Load Data from Kafka Using Kafka Connector
SynxDB Kafka Connector is a plugin based on the Apache Kafka Connect framework. It consumes data from Kafka topics and loads it into SynxDB tables, enabling scenarios such as data loading and real-time data warehousing.
Data Formats
Kafka messages can be in one of the following three formats:
Delimited Plain Text
678,mike,abc
456,jack,zzz
JSON Format
{"id":"100","name":"zhangsan","address":"beijing"} {"id":"200","name":"lisi","address":"shanghai"}
JSON Format for Change Data Capture (CDC)
Supports both DML (Data Manipulation Language) and DDL (Data Definition Language) operations, for example messages in the Debezium format.
// insert
{
  "payload": {
    "before": null,
    "after": { "c1": 10, "c2": "abcd" },
    "source": {
      "version": "2.1.1.Final", "connector": "postgresql", "name": "debezium",
      "ts_ms": 1697280773034, "snapshot": "false", "db": "postgres",
      "sequence": [ "23413560", "23413848" ], "schema": "s2", "table": "t2",
      "txId": 771, "lsn": 23413848, "xmin": null
    },
    "op": "c",
    "ts_ms": 1697280773249,
    "transaction": null
  }
}
// delete
{
  "payload": {
    "before": { "c1": 11, "c2": null },
    "after": null,
    "source": {
      "version": "2.1.1.Final", "connector": "postgresql", "name": "debezium",
      "ts_ms": 1697280877563, "snapshot": "false", "db": "postgres",
      "sequence": [ "23415392", "23415448" ], "schema": "s2", "table": "t2",
      "txId": 774, "lsn": 23415448, "xmin": null
    },
    "op": "d",
    "ts_ms": 1697280878032,
    "transaction": null
  }
}
When the Kafka message is in JSON CDC format, the connector supports DML (insert/update/delete) and DDL operations. For DML operations, it performs the following merge actions:
Converts an update operation into a delete+insert operation.
Operations on the same row (with the same primary key) will be merged. For example, if a row with the same primary key is updated 10,000 times, only the final value will be sent.
During data loading, SynxDB uses its gpfdist external table mechanism to load the merged result into the target table with maximum throughput:
For inserts, it loads the data directly into the user table through gpfdist external tables.
For deletes, it creates a temporary table, loads the delete records into it through gpfdist, and then performs a join delete (that is, DELETE FROM user_table WHERE user_table.pk = temp_table.pk) to update the user table.
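A minimal SQL sketch of this join-delete flow, assuming a hypothetical user table public.t1 with primary key pk (the connector generates its own temporary and external table names, so the names here are illustrative only):

-- Temporary table that receives the merged delete keys loaded via gpfdist
CREATE TEMP TABLE t1_del_tmp (pk int);

-- After the delete keys are loaded into t1_del_tmp through a gpfdist external table,
-- the matching rows are removed from the user table with a join delete:
DELETE FROM public.t1
USING t1_del_tmp
WHERE public.t1.pk = t1_del_tmp.pk;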
SynxDB Installation Directory: SynxDB Kafka Connector uses SynxDB’s gpfdist external table mechanism to load data with maximum throughput, so the machine running the connector must have access to the SynxDB installation directory.
Installation
Preparation
Before using SynxDB Kafka Connector, make sure to prepare the following components:
Java JDK: SynxDB Kafka Connector is released as a Java JAR file. Because Java is a cross-platform product, you only need to have the appropriate JDK installed on your system to run the connector. [Download JDK here].
Kafka: SynxDB Kafka Connector is based on the Kafka Connect framework, so it must run from within a Kafka installation. [Download Kafka here].
SynxDB Kafka Connector JAR File: Check the available versions and download the appropriate JAR file from SynxDB.
Configuration File: Prepare a connector configuration file. For example:
# entry class
connector.class=cn.synxdb.kafka.connector.SynxDBSinkConnector
# max number of tasks
tasks.max=2
# connector name
name=z1connector
# kafka topic name
topics=delimited1topic
# kafka topic message format
synxdb.topic.format=delimited_file
synxdb.format.delimiter=|
# target table
synxdb.topic.table=public.test1table
# database connection info
synxdb.url.name=jdbc:postgresql://192.168.176.110:5432/db1
synxdb.user.name=gpadmin
synxdb.user.password=gpadmin
synxdb.database.name=db1
# directory for gpfdist data and the Kafka offset
synxdb.data.dir=/xxx/synxdb_kafka_connector/data
# data flush condition, e.g. flush after consuming 10k records, 5 MB, or 1 second
buffer.count.records=10000
buffer.size.bytes=5000000
buffer.flush.time=1
# converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Installation Steps
Extract Kafka.
tar xvfz kafka_2.13-3.1.0.tgz
Edit the configuration file kafka_2.13-3.1.0/config/connect-standalone.properties:
# Modify the following two settings
# 1. Kafka server address
bootstrap.servers=localhost:9092
# 2. Location of the Kafka Connector JAR file
plugin.path=/xxx/synxdb_kafka_connector/synxdb-kafka-connector-0.0.12.jar
Edit synxdb-kafka-connector-standalone.properties. For example:
# Custom connector name
name=zyzxconnector
# Kafka topic to consume
topics=zyzx1topic
# Data format in the Kafka topic
synxdb.topic.format=delimited_file
synxdb.format.delimiter=|
# Database connection details
synxdb.url.name=jdbc:postgresql://192.168.176.110:5432/db1
synxdb.user.name=gpadmin
synxdb.user.password=gpadmin
synxdb.database.name=db1
# Directory for intermediate data: create it in advance with mkdir -p /xxx/synxdb_kafka_connector/data
synxdb.data.dir=/xxx/synxdb_kafka_connector/data
Tip
Explanation of synxdb.data.dir
This directory stores a metadata file that tracks the Kafka message offsets that have already been replicated. You can edit this file manually to skip unwanted data before restarting the connector.
Set the SynxDB environment variables.
source /<installation directory>/greenplum_path.sh
Start SynxDB Kafka Connector.
kafka_2.13-3.1.0/bin/connect-standalone.sh -daemon kafka_2.13-3.1.0/config/connect-standalone.properties synxdb-kafka-connector-standalone.properties
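As an optional smoke test, assuming the example configuration above (topic zyzx1topic, "|" delimiter), you can publish a couple of delimited records with Kafka's console producer and then check that they appear in the table configured in synxdb.topic.table once a flush condition is met:

# Start a console producer for the topic the connector consumes
kafka_2.13-3.1.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic zyzx1topic
# Then type the delimited records at the prompt, one Kafka message per line:
678|mike|abc
456|jack|zzz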
Parameter Descriptions
The following section explains the parameters in the synxdb-kafka-connector-standalone.properties file.
Kafka Connect Framework Parameters
name: Connector name. Example: name=zyzxconnector
topics: Kafka topic name. Example: topics=zyzx1topic
tasks.max: Number of tasks to run in the Kafka Connect framework; generally should not exceed the number of partitions in the topic. Example: tasks.max=2
connector.class: Fixed value. Example: connector.class=cn.synxdb.kafka.connector.SynxDBSinkConnector
key.converter: Fixed value. Example: key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter: Fixed value. Example: value.converter=org.apache.kafka.connect.storage.StringConverter
SynxDB Kafka Connector Parameters
Kafka Data Format Parameters
synxdb.topic.format: Format of the messages in the Kafka topic. Example: synxdb.topic.format=delimited_file
synxdb.format.delimiter: Delimiter for plain-text (non-JSON) data in Kafka. Example: synxdb.format.delimiter=|
synxdb.topic.table: Target table name. Example: synxdb.topic.table=schema1.table1
Database Connection Parameters
synxdb.url.name: Database JDBC connection string. Example: synxdb.url.name=jdbc:postgresql://192.168.176.110:5432/db1
synxdb.user.name: Database username. Example: synxdb.user.name=gpadmin
synxdb.user.password: Database user password. Example: synxdb.user.password=gpadmin
synxdb.database.name: Database name. Example: synxdb.database.name=db1
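Before starting the connector, it can be worth confirming that these settings actually allow a connection from the machine that will run the connector. A quick check with psql, using the example values above (substitute your own host, user, and database):

psql -h 192.168.176.110 -p 5432 -U gpadmin -d db1 -c "SELECT version();"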
Real-Time Data Loading Parameters
buffer.count.records: Number of records to consume from Kafka before loading the data into the database. Example: buffer.count.records=10000
buffer.size.bytes: Number of bytes to consume from Kafka before loading the data into the database. Example: buffer.size.bytes=5000000
buffer.flush.time: Maximum time (in seconds) to wait before loading the data into the database. Example: buffer.flush.time=1
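Per the comment in the example configuration above ("flush after consuming 10k records, 5 MB, or 1 second"), a load is triggered as soon as any one of these thresholds is reached, so smaller values trade throughput for latency. A hypothetical lower-latency setting might look like:

buffer.count.records=1000
buffer.size.bytes=1000000
buffer.flush.time=1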
Working Directory Configuration
synxdb.data.dir: Working directory used during operation. It stores the Kafka topic offsets (important; do not delete) and other temporary files (deleted once the operation completes). Example: synxdb.data.dir=/xxx/synxdb_kafka_connector/data
synxdb.ext.table.log.errors: Whether to include a LOG ERRORS clause when creating the gpfdist external table. Useful for skipping invalid data during import into SynxDB (a query sketch for reading this error log follows this parameter list). Example: synxdb.ext.table.log.errors=LOG ERRORS SEGMENT REJECT LIMIT 5
synxdb.table.upsert: Specifies whether the INSERT ... SELECT ... FROM <gpfdist external table> statement includes an upsert clause. With the clause, existing rows are updated; without it, a plain insert is performed.
Example without upsert: insert into public.t1 select * from public_t1_0_ins_ext
Example with upsert: insert into public.t1 select * from public_t1_0_ins_ext ON CONFLICT (c1) DO UPDATE SET c2 = EXCLUDED.c2, c3 = EXCLUDED.c3
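When LOG ERRORS is enabled, rejected rows land in the error log of the gpfdist external table. Since SynxDB uses the Greenplum toolchain (see greenplum_path.sh above), that log can typically be read with gp_read_error_log(); the external table name below is the illustrative one from the upsert examples above:

-- Inspect rows rejected while loading through the gpfdist external table
SELECT * FROM gp_read_error_log('public_t1_0_ins_ext');

Note also that the ON CONFLICT clause follows standard PostgreSQL semantics and requires a unique index or primary key on the conflict target, so a target table used with the upsert option would need a definition along these lines (table and column names are the hypothetical ones from the examples above):

-- The primary key on c1 is what ON CONFLICT (c1) resolves against
CREATE TABLE public.t1 (
    c1 int PRIMARY KEY,
    c2 text,
    c3 text
);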
Parameter Descriptions for synxdb.topic.format
The synxdb.topic.format parameter defines the format of the data in the Kafka topic. The valid values and their descriptions are:
delimited_file: Data with delimiters.
Delimiter example: When synxdb.format.delimiter=|, the Kafka topic messages appear as:
678|mike|abc
456|jack|zzz
Alternate delimiter: When synxdb.format.delimiter=@, the Kafka topic messages appear as:
678@mike@abc
456@jack@zzz
debeziumjson: Data in Debezium format.
// insert
{
  "payload": {
    "before": null,
    "after": { "c1": 10, "c2": "abcd" },
    "source": {
      "version": "2.1.1.Final", "connector": "postgresql", "name": "debezium",
      "ts_ms": 1697280773034, "snapshot": "false", "db": "postgres",
      "sequence": [ "23413560", "23413848" ], "schema": "s2", "table": "t2",
      "txId": 771, "lsn": 23413848, "xmin": null
    },
    "op": "c",
    "ts_ms": 1697280773249,
    "transaction": null
  }
}
// delete
{
  "payload": {
    "before": { "c1": 11, "c2": null },
    "after": null,
    "source": {
      "version": "2.1.1.Final", "connector": "postgresql", "name": "debezium",
      "ts_ms": 1697280877563, "snapshot": "false", "db": "postgres",
      "sequence": [ "23415392", "23415448" ], "schema": "s2", "table": "t2",
      "txId": 774, "lsn": 23415448, "xmin": null
    },
    "op": "d",
    "ts_ms": 1697280878032,
    "transaction": null
  }
}
// update
{
  "payload": {
    "before": { "c1": 10, "c2": "abcd" },
    "after": { "c1": 10, "c2": "upd-c2-kkk" },
    "source": {
      "version": "2.1.1.Final", "connector": "postgresql", "name": "debezium",
      "ts_ms": 1697285262028, "snapshot": "false", "db": "postgres",
      "sequence": [ "23508840", "23508896" ], "schema": "s2", "table": "t3",
      "txId": 784, "lsn": 23508896, "xmin": null
    },
    "op": "u",
    "ts_ms": 1697285262211,
    "transaction": null
  }
}
Note: When using debeziumjson, the following parameters are ignored:
synxdb.format.delimiter
synxdb.topic.table
zyzxjson/csgjson/ttjson: Custom JSON data formats for specific clients.
// DML
{
  "_source_schema": "PUBLIC",
  "_source_table": "PERSON",
  "_committime": "2023-03-14 14:57:35.863",
  "_optype": "INSERT",
  "_seqno": "2261",
  "record": {
    "PKID": "825", "ID": "20211128", "NAME": "zhangsan",
    "LOADING_DATE": "2023-03-14 00:00:00.0", "DELETE_FLAG": "1",
    "MOD_USER": "annoy", "MOD_USER_ID": "75589"
  }
}
{
  "_source_schema": "PUBLIC",
  "_source_table": "PERSON",
  "_committime": "2023-03-14 18:13:43.622",
  "_optype": "UPDATE",
  "_seqno": "2264",
  "record": {
    "PKID": "279", "ID": "20210582", "NAME": "zhangsan",
    "LOADING_DATE": "2023-03-14 00:00:00.0", "DELETE_FLAG": "1",
    "MOD_USER": "admin", "MOD_USER_ID": "94950"
  },
  "key": { "PKID": "279" }
}
{
  "_source_schema": "PUBLIC",
  "_source_table": "PERSON",
  "_committime": "2023-03-17 15:02:05.19",
  "_optype": "DELETE",
  "_seqno": "2267",
  "record": { "PKID": "5" },
  "key": { "PKID": "5" }
}
// DDL
{
  "_source_schema": "PUBLIC",
  "_committime": "2023-03-17 15:06:05.249",
  "_optype": "DDL",
  "_seqno": "2268",
  "record": "alter table PUBLIC.PERSON add add_column integer"
}
Troubleshooting
SynxDB Kafka Connector’s log files are located in the kafka_2.13-3.1.0/logs directory. Check these files to troubleshoot issues.
To print more detailed information, you can raise the logging level by editing the kafka_2.13-3.1.0/config/connect-log4j.properties file. Change the log level for the Kafka Connector from INFO to DEBUG by setting the following line:
log4j.logger.cn.synxdb=DEBUG
This will enable more detailed logging for troubleshooting.