How to Run a Data Pipeline on Vultr Kubernetes Engine (VKE) Using Kafka Connect

Updated on June 28, 2023

Introduction

Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate data streams with other parts of your system in a secure, reliable, and scalable manner.

To build integration solutions, you can use the Kafka Connect framework to integrate with external systems. There are two types of Kafka connectors:

  • Source connector: Used to move data from source systems to Kafka topics.
  • Sink connector: Used to send data from Kafka topics into target (sink) systems.

When integrating with relational databases, the JDBC connector (source and sink) and the Debezium connector (source) are widely used solutions. The JDBC connector works by polling the database table to retrieve data, while Debezium relies on change data capture.
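To make the difference concrete, here is a minimal sketch of query-based polling, the general approach a JDBC source connector takes. It is not the connector's implementation: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, and the poll_new_rows helper and the incrementing-ID strategy are assumptions chosen for illustration.

```python
import sqlite3


def poll_new_rows(conn, last_seen_id):
    """Return rows with an order_id above last_seen_id and the new high-water mark."""
    rows = conn.execute(
        "SELECT order_id, customer_id FROM orders WHERE order_id > ? ORDER BY order_id",
        (last_seen_id,),
    ).fetchall()
    new_last = rows[-1][0] if rows else last_seen_id
    return rows, new_last


# In-memory SQLite database used only as a stand-in for the real orders table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER)")
conn.execute("INSERT INTO orders (customer_id) VALUES (300)")
conn.execute("INSERT INTO orders (customer_id) VALUES (42)")

first_poll, last_id = poll_new_rows(conn, 0)
print(first_poll)   # [(1, 300), (2, 42)]

# Between two polls, one row is deleted and another is inserted.
conn.execute("DELETE FROM orders WHERE order_id = 1")
conn.execute("INSERT INTO orders (customer_id) VALUES (400)")

second_poll, last_id = poll_new_rows(conn, last_id)
print(second_poll)  # [(3, 400)] - the delete is never observed
```

The second poll returns only the new row. The deletion leaves no trace in the query result, which is one reason log-based change data capture is often preferred for keeping two tables in sync.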

In this article, you will learn how to run a data pipeline using Kafka Connect on a Vultr Kubernetes Engine (VKE) cluster with the open-source Strimzi project. Then, you will integrate a Vultr Managed Database for PostgreSQL with Apache Kafka by creating source and sink connectors to synchronize data from one PostgreSQL table to another.

Prerequisites

Before you begin:

  1. Deploy a Vultr Kubernetes Engine (VKE) cluster with at least 3 nodes.
  2. Create a Vultr Managed Database for PostgreSQL.
  3. Install the kubectl CLI tool to access the cluster.
  4. Install the PostgreSQL psql CLI tool on your computer.
  5. Install Docker on your computer.

Set up the PostgreSQL database

  1. Connect to your Vultr Managed Database for PostgreSQL.

     $ psql "host=mydb.vultrdb.com port=5432 dbname=defaultdb user=example-user password=database-password sslmode=require"

    Replace mydb.vultrdb.com and 5432 with your actual Vultr database host and port, example-user with your actual database user, and database-password with your actual PostgreSQL database password.

    When connected, your prompt should change to:

     defaultdb=>
  2. Create the orders table.

     CREATE TABLE orders (
         order_id serial PRIMARY KEY,
         customer_id integer,
         order_date date
     );
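  3. Create the orders_sink table that will act as a target for records sent to the Kafka topic. The order_date column is an integer because Debezium represents PostgreSQL date values as the number of days since the Unix epoch (io.debezium.time.Date), and the sink connector writes that number as-is.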

     CREATE TABLE orders_sink (
         order_id serial PRIMARY KEY,
         customer_id integer,
         order_date integer
     );

Install Strimzi on Vultr Kubernetes Engine

  1. Create a new namespace named kafka.

     $ kubectl create namespace kafka
  2. Apply the Strimzi installation files, including ClusterRoles, ClusterRoleBindings, and Custom Resource Definitions (CRDs).

     $ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
  3. Verify that the Strimzi cluster operator pod is available and running.

     $ kubectl get pod -n kafka -w

    Your output should look like the one below:

     NAME                                        READY   STATUS    RESTARTS   AGE
     strimzi-cluster-operator-56d64c8584-7k6sr   1/1     Running   0          43s

Set up a single-node Apache Kafka cluster

  1. Create a directory to store cluster files.

     $ mkdir vultr-vke-kafka-connect
  2. Switch to the directory.

     $ cd vultr-vke-kafka-connect
  3. Using a text editor of your choice, create a new kafka-cluster.yml manifest.

     $ nano kafka-cluster.yml
  4. Add the following contents to the file.

     apiVersion: kafka.strimzi.io/v1beta2
     kind: Kafka
     metadata:
       name: kafka-cluster
     spec:
       kafka:
         version: 3.3.1
         replicas: 1
         listeners:
           - name: plain
             port: 9092
             type: internal
             tls: false
           - name: tls
             port: 9093
             type: internal
             tls: true
         config:
           offsets.topic.replication.factor: 1
           transaction.state.log.replication.factor: 1
           transaction.state.log.min.isr: 1
           default.replication.factor: 1
           min.insync.replicas: 1
           inter.broker.protocol.version: "3.3"
         storage:
           type: ephemeral
       zookeeper:
         replicas: 1
         storage:
           type: ephemeral
       entityOperator:
         topicOperator: {}
         userOperator: {}

    Save and close the file.

  5. Apply the cluster.

     $ kubectl apply -f kafka-cluster.yml -n kafka
  6. Verify that the Apache Kafka cluster is created.

     $ kubectl get kafka -n kafka

    Output:

     NAME            DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
     kafka-cluster   1                        1                     True    True
  7. Verify that the Kafka pod is available.

     $ kubectl get pod/kafka-cluster-kafka-0 -n kafka

    Output:

     NAME                    READY   STATUS    RESTARTS   AGE
     kafka-cluster-kafka-0   1/1     Running   0          1m23s

Create a Docker image for Kafka Connect

  1. Create a new Dockerfile.

     $ nano Dockerfile
  2. Add the following contents to the file.

     FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
     USER root:root
    
     RUN mkdir /opt/kafka/plugins
    
     RUN curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.0.Final/debezium-connector-postgres-1.9.0.Final-plugin.tar.gz \
      && tar -xf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz -C plugins
    
     RUN mkdir /opt/kafka/plugins/kafka-connect-jdbc
    
     RUN curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar \
      && mv kafka-connect-jdbc-10.6.0.jar plugins/kafka-connect-jdbc \
      && cp plugins/debezium-connector-postgres/postgresql-42.3.3.jar plugins/kafka-connect-jdbc
    
     USER 1001

    Save and close the file.

    The above file uses quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as the base image, creates the /opt/kafka/plugins directory, then downloads the Debezium PostgreSQL and Kafka Connect JDBC sink plugins to the directory.

  3. To publish the image, log in to Docker.

     $ sudo docker login

    When prompted, log in to your Docker account as shown in the following output:

     Login with your Docker ID to push and pull images from Docker Hub.
     Username: 
     Password:
  4. Build the image.

     $ sudo docker build -t vke-kafka-connect-demo .
  5. Tag the image.

     $ sudo docker tag vke-kafka-connect-demo example-user/vke-kafka-connect-demo

    Replace example-user with your actual Docker account username.

  6. Push the image to Docker Hub.

     $ sudo docker push example-user/vke-kafka-connect-demo

    Output:

     The push refers to repository [docker.io/example-user/vke-kafka-connect-demo]
     5985a7b633b1: Pushed
     a1cac272dd78: Pushed
     afd5d00d6422: Pushed

Deploy a Kafka Connect cluster

  1. Create a new file kafka-connect.yaml.

     $ nano kafka-connect.yaml
  2. Add the following contents to the file.

     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaConnect
     metadata:
      name: kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
     spec:
      image: example-user/vke-kafka-connect-demo
      replicas: 1
      bootstrapServers: kafka-cluster-kafka-bootstrap:9092
      config:
        group.id: my-kafka-connect-cluster
        offset.storage.topic: kafka-connect-cluster-offsets
        config.storage.topic: kafka-connect-cluster-configs
        status.storage.topic: kafka-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      resources:
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      jvmOptions:
        "-Xmx": "1g"
        "-Xms": "1g"

    Replace example-user/vke-kafka-connect-demo with your actual Docker image path.

    Save and close the file.

  3. Create the cluster.

     $ kubectl apply -f kafka-connect.yaml -n kafka
  4. Verify that the Kafka Connect cluster is available and running.

     $ kubectl get pod -l=strimzi.io/cluster=kafka-connect-cluster -n kafka

    Your output should look like the one below.

     NAME                                            READY   STATUS    RESTARTS   AGE
     kafka-connect-cluster-connect-f9849ccdb-5lnjc   1/1     Running   0          60s

Create the Kafka source connector

  1. Create a new file kafka-connector-source.yaml.

     $ nano kafka-connector-source.yaml
  2. Add the following contents to the file.

     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaConnector
     metadata:
      name: debezium-postgres-source-connector
      labels:
        strimzi.io/cluster: kafka-connect-cluster
     spec:
      class: io.debezium.connector.postgresql.PostgresConnector
      tasksMax: 2
      config:
        database.hostname: mydb.vultrdb.com
        database.port: 16751
        database.user: example-admin
        database.password: "database-password"
        database.dbname: defaultdb
        database.server.name: myserver
        plugin.name: wal2json
        table.include.list: public.orders
        value.converter: org.apache.kafka.connect.json.JsonConverter
        database.sslmode: require
        publication.autocreate.mode: filtered

    Replace the connector configuration properties as follows.

    • database.hostname: Your Vultr Managed Database for PostgreSQL host.
    • database.port: The PostgreSQL database port.
    • database.user: A valid database user.
    • database.password: Your Vultr Managed Database for PostgreSQL password.
    • database.dbname: The name of the database to stream changes from.
    • database.server.name: A logical name for the database server. Debezium uses it as the prefix of the Kafka topic names, so changes to public.orders are published to the myserver.public.orders topic.
    • database.sslmode: Encrypts the connection to your database. The value require enforces a secure connection to your Vultr Managed Database for PostgreSQL.
    • table.include.list: The table to capture changes from, in schema.table format. Here it is public.orders, the table created earlier, where public is the database schema name.
  3. Apply the connector to the Kafka namespace.

     $ kubectl apply -f kafka-connector-source.yaml -n kafka
  4. Verify that the connector is added.

     $ kubectl get kafkaconnector -n kafka

    Output:

     NAME                                 CLUSTER                 CONNECTOR CLASS                                      MAX TASKS
     debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector   2

Verify the Source Connector functionality

  1. In a new terminal session, access your PostgreSQL database and insert data into the orders table as below.

     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 300);
  2. In your Kubernetes session, start a Kafka consumer and confirm data in the Kafka topic as below.

     $ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning

    You should see records in the Kafka topic in the form of a JSON payload. Each change event is large because it contains information about the changed record, including its schema. The output below is a truncated excerpt.

     {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"}}

    Every event generated by the connector consists of a key and a value, and each of them has two components: schema and payload. These fields therefore appear twice per event.

    • The first schema field corresponds to the event key and describes the structure of the table's primary key.
    • The first payload field is part of the event key and it contains the key for the row that was changed.
    • The second schema field corresponds to the event value and describes the structure of the row that was changed.
    • The second payload field is part of the event value. It has the structure described by the previous schema field and contains the actual data for the row that was changed.
  3. Insert more data and verify that the change events are captured in the Kafka topic.

     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
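The event structure described above can be explored with a few lines of Python. The following is a minimal sketch, assuming the event value uses the schema and payload layout produced by the JSON converter with schemas enabled. The sample event is hypothetical and heavily shortened, and changed_row is an illustrative helper, not part of Debezium or Kafka.

```python
import json

# Hypothetical, shortened change event value for an INSERT into public.orders.
# Real events carry a much larger "schema" section and additional metadata.
SAMPLE_EVENT_VALUE = json.dumps({
    "schema": {"type": "struct", "name": "myserver.public.orders.Envelope"},
    "payload": {
        "before": None,
        "after": {"order_id": 1, "customer_id": 300, "order_date": 19511},
        "op": "c",  # c = create, u = update, d = delete, r = snapshot read
    },
})


def changed_row(event_value):
    """Return (operation, row) from a change event value serialized as JSON."""
    payload = json.loads(event_value)["payload"]
    # A delete event has no new state, so fall back to the previous state.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return payload["op"], row


op, row = changed_row(SAMPLE_EVENT_VALUE)
print(op, row)
```

The payload holds the row state before and after the change, so a consumer that only needs the current row can ignore the schema section and read payload.after.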

Create the sink connector

  1. Create a new file kafka-connector-sink.yaml.

     $ nano kafka-connector-sink.yaml
  2. Add the following contents to the file.

     apiVersion: kafka.strimzi.io/v1beta2
     kind: KafkaConnector
     metadata:
      name: jdbc-sink-connector
      labels:
        strimzi.io/cluster: kafka-connect-cluster
     spec:
      class: io.confluent.connect.jdbc.JdbcSinkConnector
      tasksMax: 2
      config:
        topics: myserver.public.orders
        connection.url: "jdbc:postgresql://mydb.vultrdb.com:16751/defaultdb?user=example-user&password=database-password"
        dialect.name: PostgreSqlDatabaseDialect
        transforms: unwrap
        transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
        transforms.unwrap.drop.tombstones: false
        auto.create: false
        insert.mode: upsert
        delete.enabled: true
        pk.fields: order_id
        pk.mode: record_key
        table.name.format: public.orders_sink

    Update the connection.url value to match your actual Vultr Managed Database for PostgreSQL host, port, database name, user, and password.

  3. Apply the connector.

     $ kubectl apply -f kafka-connector-sink.yaml -n kafka
  4. Verify that the connector is created.

     $ kubectl get kafkaconnector -n kafka

    Output:

     NAME                                 CLUSTER                 CONNECTOR CLASS                                      MAX TASKS
     debezium-postgres-source-connector   kafka-connect-cluster   io.debezium.connector.postgresql.PostgresConnector   2
     jdbc-sink-connector                  kafka-connect-cluster   io.confluent.connect.jdbc.JdbcSinkConnector          2

Verify Kafka Integration with the Database

  1. In your database session, insert more data into the orders table as below.

     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 400);
     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 500);
     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);
  2. Verify that you can view all JSON change events.

     $ kubectl exec -n kafka kafka-cluster-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic myserver.public.orders --from-beginning
  3. View the orders_sink table and verify that the new data is persisted.

     SELECT * FROM public.orders_sink;

    Your output should look like the one below.

      order_id | customer_id | order_date 
     ----------+-------------+------------
             1 |         300 |      19511
             2 |          42 |      19511
             3 |         400 |      19511
             4 |         500 |      19511

    To synchronize more data in the orders_sink table, continue adding records.
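The order_date values in the output above are integers because Debezium encodes date columns as the number of days since January 1, 1970. The following minimal sketch converts such a value back to a calendar date. The days_to_date helper is a hypothetical name used for illustration.

```python
from datetime import date, timedelta

# Debezium's io.debezium.time.Date counts days since the Unix epoch.
EPOCH = date(1970, 1, 1)


def days_to_date(days_since_epoch):
    """Convert a day count since 1970-01-01 into a calendar date."""
    return EPOCH + timedelta(days=days_since_epoch)


print(days_to_date(19511))  # 2023-06-03
```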

Troubleshooting

  1. Unable to View Kafka Connections to the PostgreSQL Database.

    • Verify that you added the correct PostgreSQL database connection details in your files such as kafka-connector-source.yaml. Check that the correct port number is used, and the host details match your Vultr Database hostname.

    • View the Kafka Connect logs to investigate the error further.

    Fetch logs.

     $ kubectl logs -f $(kubectl get pod -n kafka -l=strimzi.io/cluster=kafka-connect-cluster -o jsonpath='{.items[0].metadata.name}') -n kafka
  2. The Source connector returns a myserver.orders=LEADER_NOT_AVAILABLE warning.

    • Verify that your database connection details are correct and match your Vultr Managed Database for PostgreSQL credentials.

    • Verify that all cluster pods are available and running.

    Fetch list of pods.

     $ kubectl get pods -n kafka

    Your output may show a pod that is stuck in the Pending state, like the one below:

     NAME                                                READY   STATUS    RESTARTS   AGE
     kafka-cluster-entity-operator-75d9dcc9c4-4lzch      3/3     Running   0          92m
     kafka-cluster-kafka-0                               1/1     Running   0          85m
     kafka-cluster-zookeeper-0                           1/1     Running   0          93m
     kafka-connect-cluster-connect-68586f8c97-zcdwz      0/1     Pending   0          58m
     strimzi-cluster-operator-64d7d46fc-dbh26            1/1     Running   0          98m

    Describe the Pending pod, review its events, and verify that your Kubernetes cluster has enough resources to run it.

     $ kubectl describe pods kafka-connect-cluster-connect-68586f8c97-zcdwz -n kafka

    Check the operator logs.

     $ kubectl logs deployment/strimzi-cluster-operator -n kafka -f
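When a namespace holds many pods, the pod listing shown earlier can also be filtered with a short script. The following is a minimal sketch that parses the plain-text output of kubectl get pods and reports every pod that is not in the Running state. The not_running helper is hypothetical, and the sample text is copied from the listing in this section.

```python
SAMPLE_OUTPUT = """\
NAME                                                READY   STATUS    RESTARTS   AGE
kafka-cluster-entity-operator-75d9dcc9c4-4lzch      3/3     Running   0          92m
kafka-cluster-kafka-0                               1/1     Running   0          85m
kafka-cluster-zookeeper-0                           1/1     Running   0          93m
kafka-connect-cluster-connect-68586f8c97-zcdwz      0/1     Pending   0          58m
strimzi-cluster-operator-64d7d46fc-dbh26            1/1     Running   0          98m
"""


def not_running(kubectl_output):
    """Return (name, status) for every pod whose STATUS column is not Running."""
    lines = [line.split() for line in kubectl_output.strip().splitlines()]
    header, rows = lines[0], lines[1:]
    name_col = header.index("NAME")
    status_col = header.index("STATUS")
    return [(row[name_col], row[status_col]) for row in rows if row[status_col] != "Running"]


for name, status in not_running(SAMPLE_OUTPUT):
    print(f"{name}: {status}")
```

For the sample above, only the Kafka Connect pod is reported, which is the pod to inspect with kubectl describe.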

Conclusion

You have set up a data pipeline on a Vultr Kubernetes Engine (VKE) cluster by integrating Apache Kafka with a Vultr Managed Database for PostgreSQL. To achieve this, you configured the source (Debezium) and sink (JDBC Sink) connectors to synchronize data from one PostgreSQL table to another.

Next Steps