Load Data from Hive Data Warehouse

Hive data warehouse is built on the HDFS of the Hadoop cluster, so the data in the Hive data warehouse is also stored in HDFS. Currently, SynxDB Elastic supports writing data to and reading data from HDFS (see Load Data from Object Storage and HDFS) as well as reading data from Hive via the Hive Connector.

The Hive Connector registers tables from the Hive cluster as foreign tables in SynxDB Elastic, and these foreign tables store the paths to the data in HDFS. datalake_fdw then reads the data through these foreign tables, which loads the data from Hive into SynxDB Elastic.

The general steps to use the Hive Connector are as follows.

Step 1. Configure Hive and HDFS information on SynxDB Elastic

On containerized SynxDB Elastic, you need to create configuration files. The general steps are as follows:

  1. Add the Hive MetaStore Service domain name and the namenode domain name to the CoreDNS server. If the two services use IP addresses, skip this step.

  2. Update gphive.conf and gphdfs.conf in connector-config to add the Hive and HDFS information to your account namespace.

  3. If the Hive cluster uses Kerberos for authentication, configure krb5.conf and the keytab to add the information required for Kerberos authentication.

Example

  1. If the Hive MetaStore Service and namenode do not use IP addresses, add the Hive MetaStore Service domain name and the namenode domain name to the CoreDNS server in the target namespace (for example, kube-system). You might need to restart the CoreDNS service to apply the changes.

    kubectl -nkube-system edit cm coredns
    
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: coredns
    data:
      Corefile: |
        .:5353 {
           hosts {
              10.13.9.156 10-13-9-156
              fallthrough
           }
        }
    
  2. Configure gphive.conf and gphdfs.conf in connector-config in your account namespace.

    kubectl edit configmap connector-config -n <your-account-namespace>
    

    The following are examples of gphive.conf and gphdfs.conf in different authentication modes.

    Note

    • You need to replace the configuration values with your own. For a detailed description of each option, see Configuration options.

    • In the configuration files, the configuration options under a cluster name must be indented by 4 spaces relative to the cluster name line. For example, in the following examples, the configuration options (such as hdfs_namenode_host and hdfs_namenode_port) under hdfs-cluster-1 must be indented with 4 spaces.

    • For simple authentication mode with a single cluster.

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: connector-config
      data:
        gphdfs.conf: |
          hdfs-cluster-1:
              # namenode host
              hdfs_namenode_host: 10-13-9-156
              # namenode port
              hdfs_namenode_port: 9000
              # authentication method
              hdfs_auth_method: simple
              # RPC protection
              hadoop_rpc_protection: authentication
        gphive.conf: |
          hive-cluster-1:
              uris: thrift://10-13-9-156:9083
              auth_method: simple
      
    • For Kerberos authentication mode with high availability (two HDFS namenodes and two Hive Metastore URIs).

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: connector-config
      data:
        gphdfs.conf: |
          hdfs-cluster-1:
              hdfs_namenode_host: mycluster
              hdfs_namenode_port: 9000
              hdfs_auth_method: kerberos
              krb_principal: hdfs/10-13-9-156@EXAMPLE.COM
              krb_principal_keytab: /etc/kerberos/keytab/hdfs.keytab
              is_ha_supported: true
              hadoop_rpc_protection: authentication
              data_transfer_protocol: true
              dfs.nameservices: mycluster
              dfs.ha.namenodes.mycluster: nn1,nn2
              dfs.namenode.rpc-address.mycluster.nn1: 10.13.9.156:9000
              dfs.namenode.rpc-address.mycluster.nn2: 10.13.9.157:9000
              dfs.client.failover.proxy.provider.mycluster: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
        gphive.conf: |
          hive-cluster-1:
              uris: thrift://10.13.9.156:9083,thrift://10.13.9.157:9083
              auth_method: kerberos
              krb_service_principal: hive/10-13-9-156@EXAMPLE.COM
              krb_client_principal: hive/10-13-9-156@EXAMPLE.COM
              krb_client_keytab: /etc/kerberos/keytab/hive.keytab
      
  3. If the target Hive cluster uses Kerberos for authentication, in addition to gphive.conf and gphdfs.conf, you also need to configure the kerberos-config and keytab that exist on the proxy and all segments.

    • To configure kerberos-config, run kubectl -n<account namespace> edit cm kerberos-config. The following is an example of the kerberos-config to be configured. You can get the configuration information from the krb5.conf file of the target Hive cluster.

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kerberos-config
      data:
         krb5.conf: |
            [logging]
             default = FILE:/var/log/krb5libs.log
             kdc = FILE:/var/log/krb5kdc.log
             admin_server = FILE:/var/log/kadmind.log
            [libdefaults]
             dns_lookup_realm = false
             ticket_lifetime = 24h
             renew_lifetime = 7d
             forwardable = true
             rdns = false
             pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
             default_realm = EXAMPLE.COM
            [realms]
             EXAMPLE.COM = {
              kdc = 10.13.9.156
              admin_server = 10.13.9.156
             }
            [domain_realm]
             .example.com = EXAMPLE.COM
             example.com = EXAMPLE.COM
      
    • To configure keytab, first obtain the hdfs.keytab and hive.keytab files, and then run the following commands from the directory that contains them to load the files into the cluster. For example:

      # Loads hdfs.keytab into the cluster.
      kubectl -n<account_namespace> get secret kerberos-keytab -o json | jq --arg new_value "$(base64 -i hdfs.keytab)" '.data["hdfs.keytab"] = $new_value' | kubectl -n<account_namespace> apply -f -
      
      # Loads hive.keytab into the cluster.
      kubectl -n<account_namespace> get secret kerberos-keytab -o json | jq --arg new_value "$(base64 -i hive.keytab)" '.data["hive.keytab"] = $new_value' | kubectl -n<account_namespace> apply -f -
      

      After the loading completes, use the following commands to check that hdfs.keytab and hive.keytab are valid. The keytab files are stored in /etc/kerberos/keytab/.

      kinit -k -t hdfs.keytab hdfs/10-13-9-156@EXAMPLE.COM
      kinit -k -t hive.keytab hive/10-13-9-156@EXAMPLE.COM
      

Configuration options

This section describes the configuration options in gphive.conf and gphdfs.conf in detail.

Attention

The default port for the data_lake agent has been changed from 5888 to 3888 to avoid conflict with PXF.

gphive.conf

You can get this configuration information from the hive-site.xml file of the target Hive cluster.

  • uris: The listening address of the Hive Metastore Service (the HMS hostname). Default value: /

  • auth_method: The authentication method for the Hive Metastore Service: simple or kerberos. Default value: simple

  • krb_service_principal: The service principal required for Kerberos authentication of the Hive Metastore Service. When using the HMS HA feature, set the instance part of the principal to _HOST, for example, hive/_HOST@EXAMPLE.COM.

  • krb_client_principal: The client principal required for Kerberos authentication of the Hive Metastore Service.

  • krb_client_keytab: The keytab file of the client principal required for Kerberos authentication of the Hive Metastore Service.

  • debug: The debug flag of the Hive Connector: true or false. Default value: false

gphdfs.conf

You can get this configuration information from the hive-site.xml and hdfs-site.xml files of the target Hive cluster.

  • hdfs_namenode_host: Configures the host information of HDFS. For example, hdfs://mycluster, where hdfs:// can be omitted. Default value: /

  • hdfs_namenode_port: Configures the port information of HDFS. If not configured, the default port 9000 is used. Default value: 9000

  • hdfs_auth_method: Configures the HDFS authentication method. Use simple for regular HDFS and kerberos for HDFS secured with Kerberos. Default value: /

  • krb_principal: The Kerberos principal. Set this option when hdfs_auth_method is set to kerberos. Default value: /

  • krb_principal_keytab: The location of the user-generated keytab file. Default value: /

  • hadoop_rpc_protection: Must match the corresponding setting in hdfs-site.xml of the HDFS cluster. Default value: /

  • data_transfer_protocol: When the HDFS cluster is secured with Kerberos, DataNode transfers are protected in one of two ways: privileged ports, or SASL RPC data transfer protection with SSL for HTTP. If the cluster uses the SASL method, set data_transfer_protocol to true. Default value: /

  • is_ha_supported: Sets whether to use HDFS HA. The value true means HDFS HA is used; false means it is not. Default value: false

Step 2. Create foreign data wrapper and Hive Connector extension

Before synchronization, load the datalake_fdw extension used for reading HDFS, and create the foreign data wrapper used to read the foreign tables.

  1. Create the necessary extensions.

    CREATE EXTENSION synxdb;
    CREATE EXTENSION dfs_tablespace;
    CREATE EXTENSION gp_toolkit;
    CREATE EXTENSION datalake_fdw;
    
  2. Create the foreign data wrapper.

    CREATE FOREIGN DATA WRAPPER datalake_fdw
    HANDLER datalake_fdw_handler
    VALIDATOR datalake_fdw_validator
    OPTIONS (mpp_execute 'all segments');
    
  3. Before calling the function, you need to load the Hive Connector extension.

    CREATE EXTENSION hive_connector;
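    Optionally, you can confirm that the extensions are installed by querying the standard pg_extension catalog:

    -- Lists the extensions created above.
    SELECT extname, extversion FROM pg_extension
    WHERE extname IN ('synxdb', 'dfs_tablespace', 'gp_toolkit', 'datalake_fdw', 'hive_connector');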
    

Step 3. Create server and user mapping

After creating the foreign data wrapper and Hive Connector, you need to create the server and user mapping, as shown in the following example:

SELECT public.create_foreign_server('sync_server', 'gpadmin', 'datalake_fdw', 'hdfs-cluster-1');

In the above example, the create_foreign_server function takes the form as follows:

create_foreign_server(serverName,
                      userMapName,
                      dataWrapName,
                      hdfsClusterName);

This function creates a server and user mapping pointing to an HDFS cluster, which the Hive Connector can use to create foreign tables. datalake_fdw uses the server configuration to read data from the corresponding HDFS cluster when the foreign tables are accessed.

The parameters in the function are explained as follows:

  • serverName: The name of the server to be created.

  • userMapName: The name of the user to be created on the server.

  • dataWrapName: The name of the data wrapper used for reading HDFS data.

  • hdfsClusterName: The name of the HDFS cluster where the Hive cluster is located, as specified in the configuration file.
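Optionally, you can verify that the server and user mapping were created by querying the standard catalogs (the server name follows the example above):

-- Confirms that create_foreign_server created the server and user mapping.
SELECT srvname, srvoptions FROM pg_foreign_server WHERE srvname = 'sync_server';
SELECT srvname, usename, umoptions FROM pg_user_mappings WHERE srvname = 'sync_server';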

Tip

By default, the datalake_fdw accesses HDFS using the system role gpadmin. You can use the user option in CREATE USER MAPPING to control which HDFS user will be used when accessing the file system. This allows finer access control to HDFS resources.

Example:

CREATE SERVER foreign_server
FOREIGN DATA WRAPPER datalake_fdw
OPTIONS (
   protocol 'hdfs',
   hdfs_namenodes 'hadoop-nn',
   hdfs_port '9000',
   hdfs_auth_method 'simple',
   hadoop_rpc_protection 'authentication');

CREATE USER MAPPING FOR current_user SERVER foreign_server
OPTIONS (user 'hdfs_reader');

In this example, HDFS is accessed as the hdfs_reader user rather than the default gpadmin. This method is recommended for managing access permissions in multi-tenant or multi-user environments.
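If you later need to switch the mapped HDFS user, the standard ALTER USER MAPPING syntax can be used (the user name below is only an illustration):

-- Changes the HDFS user for the current database role on this server.
ALTER USER MAPPING FOR current_user SERVER foreign_server
OPTIONS (SET user 'another_hdfs_user');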

Step 4. Sync Hive objects to SynxDB Elastic

Syncing a Hive table

To sync a table from Hive to SynxDB Elastic, see the following example:

-- Syncs Hive tables in psql.

gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs', 'hdfs-cluster-1', 'myschema.weblogs', 'sync_server');
 sync_hive_table
-----------------
 t
(1 row)

The above example uses the sync_hive_table function to perform the synchronization. The general form of the function is as follows:

sync_hive_table(hiveClusterName,
                hiveDatabaseName,
                hiveTableName,
                hdfsClusterName,
                destTableName,
                serverName,
                forceSync);

This function syncs a table to SynxDB Elastic in either non-forced or forced mode. When forceSync is set to true, the sync is forced: if a table with the same name already exists in SynxDB Elastic, the existing table is dropped before syncing. If forceSync is not provided or is set to false, an error occurs when a table with the same name already exists. A forced sync example follows the parameter list below.

The parameters are explained as follows:

  • hiveClusterName: The name of the Hive cluster where the table to be synced is located, as specified in the configuration file.

  • hiveDatabaseName: The name of the database in Hive where the table to be synced belongs.

  • hiveTableName: The name of the table to be synced.

  • hdfsClusterName: The name of the HDFS cluster where the Hive cluster is located, as specified in the configuration file.

  • destTableName: The name of the table in SynxDB Elastic where the data will be synced.

  • serverName: The name of the server to be used when creating the foreign table with the datalake_fdw extension.

  • forceSync: Optional parameter. Default value is false. Indicates whether the sync should be forced.
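For example, to force a re-sync of the weblogs table shown above, pass true as the seventh argument:

-- Drops myschema.weblogs in SynxDB Elastic if it already exists, then syncs it again.
SELECT public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs',
                              'hdfs-cluster-1', 'myschema.weblogs',
                              'sync_server', true);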

Sync a partitioned Hive table using sync_hive_partition_table

SynxDB Elastic supports synchronizing a single partition of a Hive table using the sync_hive_partition_table function. The function syncs one partition identified by a value of the highest-level partition key (for example, prov if the table is partitioned by prov, month, and day). It does not support specifying lower-level partition keys directly (such as month or day) and returns an error if you attempt to do so.

Function prototype:

CREATE OR REPLACE FUNCTION sync_hive_partition_table(
   hiveClusterName text,
   hiveDatabaseName text,
   hiveTableName text,
   hivePartitionValue text,
   hdfsClusterName text,
   destTableName text
) RETURNS boolean
AS '$libdir/hive_connector', 'sync_hive_partition_table'
LANGUAGE C STRICT EXECUTE ON MASTER;

The hivePartitionValue parameter is the value of the highest-level partition key, which must be the first key in the partition column list.

Example Hive table:

CREATE TABLE hive_table (
   id int,
   name string
)
PARTITIONED BY (
   prov int,
   month int,
   day int
);

Example usage:

SELECT sync_hive_partition_table(
   'hive-cluster-1',
   'mydb',
   'hive_table',
   '06',
   'hdfs-cluster-1',
   'myschema.hive_table_06'
);

This call will sync only the partition data under prov=06. If you try to specify values like month=06 or day=15, the function will return an error.

Note: This function only supports specifying the value of the first partition key. Multi-level partition value specification is currently not supported.

Resulting external table structure:

CREATE TABLE mpp_table (
   id int,
   name string,
   prov int,
   month int,
   day int
)
LOCATION('gphdfs://example/prov=06/ hdfs_cluster_name=paa_cluster partitonkey=month,day partitionvalue=06')
FORMAT 'xxx';
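After the sync completes, you can query the synced partition like any other foreign table:

SELECT * FROM myschema.hive_table_06 LIMIT 10;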

More examples

Sync a Hive text table

  1. Create the following text table in Hive.

    -- Creates the Hive table in Beeline.
    
    CREATE TABLE weblogs
    (
        client_ip           STRING,
        full_request_date   STRING,
        day                 STRING,
        month               STRING,
        month_num           INT,
        year                STRING,
        referrer            STRING,
        user_agent          STRING
    ) STORED AS TEXTFILE;
    
  2. Sync the text table to SynxDB Elastic.

    -- Syncs the Hive table in psql.
    
    gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'weblogs', 'hdfs-cluster-1', 'myschema.weblogs', 'sync_server');
    sync_hive_table
    -----------------
    t
    (1 row)
    
  3. Query the external table.

    SELECT * FROM myschema.weblogs LIMIT 10;
    

Sync a Hive ORC table

  1. Create an ORC table in Hive.

    -- Creates the Hive table in Beeline.
    CREATE TABLE test_all_type
    (
        column_a tinyint,
        column_b smallint,
        column_c int,
        column_d bigint,
        column_e float,
        column_f double,
        column_g string,
        column_h timestamp,
        column_i date,
        column_j char(20),
        column_k varchar(20),
        column_l decimal(20, 10)
    ) STORED AS ORC;
    
  2. Sync the ORC table to SynxDB Elastic:

    -- Syncs the Hive table in psql.
    
    gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'test_all_type', 'hdfs-cluster-1', 'myschema.test_all_type', 'sync_server');
    sync_hive_table
    -----------------
    t
    (1 row)
    
  3. Query the external table.

    SELECT * FROM myschema.test_all_type LIMIT 10;
    

Sync a Hive ORC partitioned table

  1. Create an ORC partitioned table in Hive.

    -- Creates the Hive table in Beeline.
    
    CREATE TABLE test_partition_1_int
    (
        a tinyint,
        b smallint,
        c int,
        d bigint,
        e float,
        f double,
        g string,
        h timestamp,
        i date,
        j char(20),
        k varchar(20),
        l decimal(20, 10)
    )
    PARTITIONED BY
    (
        m int
    )
    STORED AS ORC;
    INSERT INTO test_partition_1_int VALUES (1, 1, 1, 1, 1, 1, '1', '2020-01-01 01:01:01', '2020-01-01', '1', '1', 10.01, 1);
    INSERT INTO test_partition_1_int VALUES (2, 2, 2, 2, 2, 2, '2', '2020-02-02 02:02:02', '2020-02-01', '2', '2', 11.01, 2);
    INSERT INTO test_partition_1_int VALUES (3, 3, 3, 3, 3, 3, '3', '2020-03-03 03:03:03', '2020-03-01', '3', '3', 12.01, 3);
    INSERT INTO test_partition_1_int VALUES (4, 4, 4, 4, 4, 4, '4', '2020-04-04 04:04:04', '2020-04-01', '4', '4', 13.01, 4);
    INSERT INTO test_partition_1_int VALUES (5, 5, 5, 5, 5, 5, '5', '2020-05-05 05:05:05', '2020-05-01', '5', '5', 14.01, 5);
    
  2. Sync the ORC partitioned table to SynxDB Elastic.

    -- Syncs the Hive partitioned table as one foreign table in psql.
    
    gpadmin=# select public.sync_hive_table('hive-cluster-1', 'mytestdb', 'test_partition_1_int', 'hdfs-cluster-1', 'myschema.test_partition_1_int', 'sync_server');
    sync_hive_table
    -----------------
    t
    (1 row)
    
  3. Query the external table.

    SELECT * FROM myschema.test_partition_1_int LIMIT 10;
    

Supported usage and limitations

Supported Hive file formats

You can load files in TEXT, CSV, ORC, or PARQUET formats from Hive into SynxDB Elastic.

Data type mapping

The following table shows the one-to-one mapping between table data types on a Hive cluster and table data types in SynxDB Elastic.

  Hive         SynxDB Elastic
  -----------  ------------------
  binary       bytea
  tinyint      smallint
  smallint     smallint
  int          int
  bigint       bigint
  float        float4
  double       double precision
  string       text
  timestamp    timestamp
  date         date
  char         char
  varchar      varchar
  decimal      decimal
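For example, after syncing the test_all_type ORC table from the earlier example, you can check how the Hive column types were mapped by querying the standard information_schema.columns view (schema and table names follow that example):

-- Shows the SynxDB Elastic data type chosen for each synced column.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'myschema' AND table_name = 'test_all_type'
ORDER BY ordinal_position;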

Usage limitations

  • Synchronizing Hive external tables is not supported.

  • Synchronizing Hive table statistics is not supported.

  • SynxDB Elastic can read data from HDFS and write data to HDFS, but the written data cannot be read by Hive.

  • When using sync_hive_partition_table, only the first-level partition key is supported. Specifying a value from a secondary or lower-level partition key will result in an error.

Note

Q: How are writes and updates on HDFS synchronized to SynxDB Elastic? Are there any limitations?

A: The data remains stored in HDFS, and the foreign data wrapper only reads the data from HDFS when it is queried.