How to Run a Data Pipeline on Vultr Kubernetes Engine (VKE) Using Kafka Connect
Introduction
Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate data streams with other parts of your system in a secure, reliable, and scalable manner.
To build integration solutions, you can use the Kafka Connect framework to integrate with external systems. There are two types of Kafka connectors:
- Source connector: Used to move data from source systems to Kafka topics.
- Sink connector: Used to send data from Kafka topics into target (sink) systems.
When integrating with relational databases, the JDBC connector (source and sink) and the Debezium (source) connector are widely used solutions. The JDBC source connector works by periodically polling database tables to retrieve data, while Debezium relies on change data capture (CDC).
In this article, you will learn how to run a data pipeline using Kafka Connect on a Vultr Kubernetes Engine (VKE) cluster with the open-source Strimzi project. Then, you will integrate a Vultr Managed Database for PostgreSQL with Apache Kafka by creating source and sink connectors to synchronize data from one PostgreSQL table to another.
Prerequisites
Before you begin:
- Deploy a Vultr Kubernetes Engine (VKE) cluster with at least 3 nodes.
- Create a Vultr Managed Database for PostgreSQL.
- Install the kubectl CLI tool to access the cluster.
- Install the PostgreSQL psql CLI tool on your computer.
- Install Docker on your computer.
Set up the PostgreSQL database
Connect to your Vultr Managed Database for PostgreSQL.
$ psql "host=mydb.vultrdb.com port=5432 dbname=defaultdb user=example-user password=database-password sslmode=require"
Replace mydb.vultrdb.com with your actual Vultr host, 5432 with your database port, example-user with your actual database user, and database-password with your actual PostgreSQL database password. When connected, your prompt should change to:
defaultdb=>
Create the orders table.

CREATE TABLE orders (
    order_id serial PRIMARY KEY,
    customer_id integer,
    order_date date
);
Create the orders_sink table that will act as a target for records sent to the Kafka topic. The order_date column uses the integer type because the Debezium source connector transmits DATE values as the number of days since the Unix epoch.

CREATE TABLE orders_sink (
    order_id serial PRIMARY KEY,
    customer_id integer,
    order_date integer
);
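Before moving on, you can optionally confirm that both tables exist. A minimal check from your terminal, reusing the connection string from the earlier step (replace the details with your own):

# List tables in the public schema whose names start with "orders"
$ psql "host=mydb.vultrdb.com port=5432 dbname=defaultdb user=example-user password=database-password sslmode=require" -c '\dt public.orders*'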
Install Strimzi on Vultr Kubernetes Engine
Create a new Kafka namespace.
$ kubectl create namespace kafka
Apply the Strimzi installation files, including ClusterRoles, ClusterRoleBindings, and Custom Resource Definitions (CRDs).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Verify that the Strimzi cluster operator pod is available and running.
$ kubectl get pod -n kafka -w
Your output should look like the one below:
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-56d64c8584-7k6sr   1/1     Running   0          43s
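As an optional sanity check (not part of the Strimzi installation itself), you can confirm that the Strimzi Custom Resource Definitions were registered:

# List the CRDs installed by Strimzi, for example kafkas.kafka.strimzi.io and kafkaconnects.kafka.strimzi.io
$ kubectl get crd | grep strimzi.io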
Set up a single-node Apache Kafka cluster
Create a directory to store cluster files.
$ mkdir vultr-vke-kafka-connect
Switch to the directory.
$ cd vultr-vke-kafka-connect
Using a text editor of your choice, create a new kafka-cluster.yml manifest.

$ nano kafka-cluster.yml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 3.3.1
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Save and close the file.
Apply the cluster.
$ kubectl apply -f kafka-cluster.yml -n kafka
Verify that the Apache Kafka cluster is created.
$ kubectl get kafka -n kafka
Output:
NAME            DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
kafka-cluster   1                        1                     True    True
Verify that the Kafka pod is available.
$ kubectl get pod/kafka-cluster-kafka-0 -n kafka
Output:
NAME                    READY   STATUS    RESTARTS   AGE
kafka-cluster-kafka-0   1/1     Running   0          1m23s
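Instead of repeatedly checking the pod list, you can also wait until the Kafka custom resource reports a Ready condition. A sketch using kubectl wait; the 300-second timeout is an arbitrary choice:

# Block until the Strimzi operator marks the Kafka cluster as Ready
$ kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n kafka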
Create a Docker image for Kafka Connect
Create a new Dockerfile.
$ nano Dockerfile
Add the following contents to the file.
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir /opt/kafka/plugins
RUN curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.0.Final/debezium-connector-postgres-1.9.0.Final-plugin.tar.gz \
    && tar -xf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz -C plugins
RUN mkdir /opt/kafka/plugins/kafka-connect-jdbc
RUN curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar \
    && mv kafka-connect-jdbc-10.6.0.jar plugins/kafka-connect-jdbc \
    && cp plugins/debezium-connector-postgres/postgresql-42.3.3.jar plugins/kafka-connect-jdbc
USER 1001
Save and close the file.
The above file uses quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as the base image, creates the /opt/kafka/plugins directory, then downloads the Debezium PostgreSQL and Kafka Connect JDBC sink plugins to that directory.

To publish the image, log in to Docker.
$ sudo docker login
Log in to your Docker Hub account when prompted, as displayed in the following output:

Login with your Docker ID to push and pull images from Docker Hub.
Username:
Password:
Build the image.
$ sudo docker build -t vke-kafka-connect-demo .
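Before pushing, you can optionally verify that the connector plugins were copied into the image. A quick check that overrides the image entrypoint to list the plugin directory, using the tag from the build command above:

# Recursively list the plugin directories baked into the image
$ sudo docker run --rm --entrypoint ls vke-kafka-connect-demo -R /opt/kafka/plugins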
Tag the image.
$ sudo docker tag vke-kafka-connect-demo example-user/vke-kafka-connect-demo
Replace example-user with your actual Docker Hub username.

Push the image to Docker Hub.
$ sudo docker push example-user/vke-kafka-connect-demo
Output:
The push refers to repository [docker.io/example-user/vke-kafka-connect-demo]
5985a7b633b1: Pushed
a1cac272dd78: Pushed
afd5d00d6422: Pushed
Deploy a Kafka Connect cluster
Create a new file named kafka-connect.yaml.

$ nano kafka-connect.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: example-user/vke-kafka-connect-demo
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: my-kafka-connect-cluster
    offset.storage.topic: kafka-connect-cluster-offsets
    config.storage.topic: kafka-connect-cluster-configs
    status.storage.topic: kafka-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  jvmOptions:
    "-Xmx": "1g"
    "-Xms": "1g"
Replace example-user/vke-kafka-connect-demo with your actual Docker image path.

Save and close the file.
Create the cluster.
$ kubectl apply -f kafka-connect.yaml -n kafka
Verify that the Kafka Connect cluster is available and running.
$ kubectl get pod -l=strimzi.io/cluster=kafka-connect-cluster -n kafka
Your output should look like the one below.
NAME                                            READY   STATUS    RESTARTS   AGE
kafka-connect-cluster-connect-f9849ccdb-5lnjc   1/1     Running   0          60s
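You can also confirm that the Debezium and JDBC connector plugins were loaded by querying the Kafka Connect REST API from inside the pod. A sketch that assumes the pod name shown above; yours will differ:

# List the connector plugins known to the Kafka Connect worker
$ kubectl exec -n kafka kafka-connect-cluster-connect-f9849ccdb-5lnjc -- curl -s http://localhost:8083/connector-plugins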
Create the Kafka source connector
Create a new file named kafka-connector-source.yaml.

$ nano kafka-connector-source.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgres-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 2
  config:
    database.hostname: mydb.vultrdb.com
    database.port: 16751
    database.user: example-admin
    database.password: "database-password"
    database.dbname: defaultdb
    database.server.name: myserver
    plugin.name: wal2json
    table.include.list: public.orders
    value.converter: org.apache.kafka.connect.json.JsonConverter
    database.sslmode: require
    publication.autocreate.mode: filtered
Replace the connector configuration properties as follows.

- database.hostname: Your Vultr Managed Database for PostgreSQL host.
- database.port: The PostgreSQL database port.
- database.user: A valid database user.
- database.password: Your Vultr Managed Database for PostgreSQL password.
- database.dbname: The name of the database to stream changes from, in this case defaultdb.
- database.sslmode: Encrypts the connection to your database. The value require enforces a secure connection to your Vultr Managed Database for PostgreSQL.
- table.include.list: The table to capture changes from. As created earlier, public.orders, where public is the database schema name.
Apply the connector to the Kafka namespace.
$ kubectl apply -f kafka-connector-source.yaml -n kafka
Verify that the connector is added.
$ kubectl get kafkaconnector -n kafka
Output:
NAME                                 CLUSTER                 CONNECTOR CLASS                                       MAX TASKS
debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector    2
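To inspect the connector state in more detail, describe the KafkaConnector resource; its status section reports whether the connector and its tasks are RUNNING:

# Show the connector status reported by the Strimzi operator
$ kubectl describe kafkaconnector debezium-postgres-source-connector -n kafka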
Verify the Source Connector functionality
In a new terminal session, access your PostgreSQL database and insert data into the orders table as below.
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 300);
In your Kubernetes session, start a Kafka consumer and confirm data in the Kafka topic as below.
$ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
You should see records in the Kafka topic in the form of a JSON payload. The change event JSON is large because it contains information about the changed record, including its schema, as shown in the truncated output below.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"}}
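If the consumer prints no records, you can first confirm that Debezium created the change topic; the topic name follows the <server>.<schema>.<table> pattern:

# List the topics on the broker and look for myserver.public.orders
$ kubectl exec -n kafka kafka-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list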
Every event generated by the connector has two components, schema and payload, each appearing twice.

- The first schema field corresponds to the event key and describes the structure of the table's primary key.
- The first payload field is part of the event key, and it contains the key for the row that was changed.
- The second schema field corresponds to the event value and describes the structure of the row that was changed.
- The second payload field is part of the event value. It has the structure described by the previous schema field and contains the actual data for the row that was changed.
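To view only the changed row data without the surrounding envelope, you can pipe the consumer output through a JSON filter. A sketch, assuming jq is installed on your workstation:

# Print only the "after" state of each change event
$ kubectl exec -n kafka kafka-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning | jq '.payload.after'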
Insert more data and verify the captured events in the Kafka topic.
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
Create the sink connector
Create a new file named kafka-connector-sink.yaml.

$ nano kafka-connector-sink.yaml
Add the following contents to the file.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: jdbc-sink-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: myserver.public.orders
    connection.url: "jdbc:postgresql://mydb.vultrdb.com:16751/defaultdb?user=example-user&password=database-password"
    dialect.name: PostgreSqlDatabaseDialect
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    auto.create: false
    insert.mode: upsert
    delete.enabled: true
    pk.fields: order_id
    pk.mode: record_key
    table.name.format: public.orders_sink
Update the connection.url value to match your actual Vultr Managed Database for PostgreSQL details. If your database enforces SSL connections, also append &sslmode=require to the URL.

Apply the connector.
$ kubectl apply -f kafka-connector-sink.yaml -n kafka
Verify that the connector is created.
$ kubectl get kafkaconnector -n kafka
Output:
NAME                                 CLUSTER                 CONNECTOR CLASS                                       MAX TASKS
debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector    2
jdbc-sink-connector                  kafka-connect-cluster   io.confluent.connect.jdbc.JdbcSinkConnector           2
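Optionally, check the sink connector status reported by the Strimzi operator before testing the pipeline end to end:

# Inspect the status section for the connector and task states
$ kubectl get kafkaconnector jdbc-sink-connector -n kafka -o yaml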
Verify Kafka Integration with the Database
In your database session, insert more data into the orders table as below.

INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 400);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 500);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
Verify that you can view all JSON event changes.
$ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
View the orders_sink table and verify that the new data is persisted.

SELECT * FROM public.orders_sink;
Your output should look like the one below.
 order_id | customer_id | order_date
----------+-------------+------------
        1 |         300 |      19511
        2 |          42 |      19511
        3 |         400 |      19511
        4 |         500 |      19511
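The order_date values appear as integers because, as noted earlier, Debezium encodes DATE columns as the number of days since the Unix epoch. To read them back as calendar dates, you can convert them in the query; a sketch using psql from your terminal (replace the connection details with your own):

# Convert the epoch-day integers in orders_sink back to calendar dates
$ psql "host=mydb.vultrdb.com port=5432 dbname=defaultdb user=example-user password=database-password sslmode=require" -c "SELECT order_id, customer_id, DATE '1970-01-01' + order_date AS order_date FROM public.orders_sink;"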
To synchronize more data to the orders_sink table, continue adding records to the orders table.
Troubleshooting
Kafka Connect is unable to connect to the PostgreSQL database.
Verify that you added the correct PostgreSQL database connection details in files such as kafka-connector-source.yaml. Check that the correct port number is used and that the host details match your Vultr database hostname. View the Kafka Connect logs to investigate the error further.
Fetch logs.
$ kubectl logs -f $(kubectl get pod -n kafka -l=strimzi.io/cluster=kafka-connect-cluster -o jsonpath='{.items[0].metadata.name}') -n kafka
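To narrow the output down to errors, you can filter the same logs (drop the -f flag so the command terminates):

# Show only log lines that mention errors
$ kubectl logs $(kubectl get pod -n kafka -l=strimzi.io/cluster=kafka-connect-cluster -o jsonpath='{.items[0].metadata.name}') -n kafka | grep -i error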
The source connector returns a myserver.orders=LEADER_NOT_AVAILABLE warning.

Verify that your database connection details are correct and match your Vultr Managed Database for PostgreSQL credentials.
Verify that all cluster pods are available and running.
Fetch list of pods.
$ kubectl get pods -n kafka
If your output looks like the one below:
NAME                                             READY   STATUS    RESTARTS   AGE
kafka-cluster-entity-operator-75d9dcc9c4-4lzch   3/3     Running   0          92m
kafka-cluster-kafka-0                            1/1     Running   0          85m
kafka-cluster-zookeeper-0                        1/1     Running   0          93m
kafka-connect-cluster-connect-68586f8c97-zcdwz   0/1     Pending   0          58m
strimzi-cluster-operator-64d7d46fc-dbh26         1/1     Running   0          98m
Describe the Pending pod, check its events, and verify that your Kubernetes cluster has enough resources to run the Kafka Connect cluster.

$ kubectl describe pods kafka-connect-cluster-connect-68586f8c97-zcdwz -n kafka
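To check whether your nodes have spare CPU and memory for the Kafka Connect pod, you can also review the resources already allocated on each node:

# Show the per-node resource allocation summary
$ kubectl describe nodes | grep -A 8 "Allocated resources"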
Check the operator logs.
$ kubectl logs deployment/strimzi-cluster-operator -n kafka -f
Conclusion
You have set up a data pipeline on a Vultr Kubernetes Engine (VKE) cluster by integrating Apache Kafka with a Vultr Managed Database for PostgreSQL. To achieve this, you configured the Debezium source and JDBC sink connectors to synchronize data from one PostgreSQL table to another.