Integrate PostgreSQL and Apache Kafka with Debezium

Updated on July 25, 2024

Introduction

This article demonstrates how to integrate a Vultr Managed Database for PostgreSQL with Apache Kafka using the Debezium source connector for PostgreSQL. Behind the scenes, this is made possible by Change Data Capture (also referred to as CDC), which can track row-level create, update and delete operations in PostgreSQL tables. These change data capture events can be used to integrate PostgreSQL with other systems.

Change data capture is a popular solution, and it is often combined with Apache Kafka and Kafka Connect.

Apache Kafka

Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate these data streams with other parts of your system in a secure, reliable, and scalable manner.

Some key components of Kafka include:

  • Broker (Node): A Kafka broker runs the Kafka JVM process. A best practice is to run three or more brokers for scalability and high availability. A group of Kafka brokers forms a cluster.
  • Producers: These are client applications that send messages to Kafka. Each message is a key-value pair.
  • Topics: Events (messages) are stored in topics, and each topic has one or more partitions. The partitions are distributed across the Kafka cluster, and their data can be replicated for high availability and redundancy.
  • Consumers: Just like producers, consumers are client applications. They receive and process data (events) from Kafka topics. The short command-line sketch after this list illustrates these concepts.
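
To make these concepts concrete, here is a minimal command-line sketch using the console tools that ship with every Kafka distribution (run from the bin directory of a Kafka installation). The topic name demo-topic and the broker address localhost:9092 are placeholders for illustration; older Kafka versions use --broker-list instead of --bootstrap-server for the producer.

$ # Create a topic with a single partition and no replication
$ ./kafka-topics.sh --create --topic demo-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

$ # Act as a producer: publish one message to the topic
$ echo "hello kafka" | ./kafka-console-producer.sh --topic demo-topic --bootstrap-server localhost:9092

$ # Act as a consumer: read all messages in the topic from the beginning
$ ./kafka-console-consumer.sh --topic demo-topic --bootstrap-server localhost:9092 --from-beginning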

Kafka Connect

To build integration solutions, you can use the Kafka Connect framework, which provides a suite of connectors to integrate Kafka with external systems. There are two types of Kafka connectors:

  • Source connector: Used to move data from source systems to Kafka topics.
  • Sink connector: Used to send data from Kafka topics into target (sink) systems.
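
Kafka Connect exposes a REST API (port 8083 by default) for managing connectors, and this guide uses it later to register the Debezium connector. As a quick illustration, once a Kafka Connect worker is running you can list the connector plugins it has installed; the host and port below assume the Docker Compose setup used later in this article.

$ # List the connector plugins installed in the Kafka Connect worker
$ curl http://localhost:8083/connector-plugins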

Debezium

Debezium is a set of source connectors for Kafka Connect. You can use it to capture changes in your databases so that your applications can respond to them in real-time. It records all row-level changes within each database table in a change event stream, and applications read these streams to see the change events in the same order in which they occurred.

When it comes to integrating with relational databases, the JDBC connector (source and sink) and the Debezium source connectors are widely used solutions. The JDBC connector works by polling database tables to retrieve data, while Debezium relies on change data capture.

Demonstration Scenario

The solution demonstrated in this article consists of the following components:

  • PostgreSQL - You will set up a Vultr Managed Database for PostgreSQL.
  • Apache Kafka and Zookeeper - These run as Docker containers orchestrated with Docker Compose.
  • Kafka Connect - A Kafka Connect instance will also run as a Docker container. The Debezium PostgreSQL connector will be installed in this instance.

You will create a sample table (called orders) in the PostgreSQL database. Changes made to data in this table will be detected by the Debezium PostgreSQL connector and sent to a Kafka topic. Under the hood, the connector uses a PostgreSQL logical decoding output plugin and processes the changes it produces using a combination of the PostgreSQL streaming replication protocol and the PostgreSQL JDBC driver.
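
Because the connector relies on logical decoding, the database must run with wal_level set to logical. You can verify this yourself once you have the database connection string (obtained in the Create a Table section below). This is an optional check, and the bracketed connection string is a placeholder.

$ # Confirm that logical decoding is enabled; the result must be "logical" for Debezium to work
$ psql "[your Vultr connection string]" -c "SHOW wal_level;"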

Prerequisites

Before following the steps in this guide, you need to:

  1. Deploy a new Ubuntu 22.04 LTS Vultr cloud server for Kafka.

  2. Create a non-root sudo user.

  3. Install Docker.

  4. Install Docker Compose.

  5. Install psql, which is an interactive terminal for PostgreSQL. Because this guide uses a Vultr Managed Database for PostgreSQL, you only need the command-line client to query the database.

     $ sudo apt install -y postgresql-client
  6. Install curl, which is a popular command-line HTTP client. A quick check of the installed tools follows this list.
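
To confirm the client tooling is in place before you continue, you can check the installed versions. This is an optional sanity check; the exact version numbers in the output will vary.

$ docker --version
$ docker-compose --version
$ psql --version
$ curl --version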

Create a PostgreSQL Database

Log into your Vultr account, navigate to Add Managed Database and follow the steps below.

  1. Choose the PostgreSQL database engine.


  2. Choose a Server Type: Cloud Compute, Cloud Compute High Performance (AMD or Intel), or Optimized Cloud Compute (General Purpose, Storage Optimized, or Memory Optimized). Also select the number of replica nodes (zero or more) and the cluster location. A replica node uses the same server type and plan as the primary node. For this guide, you can opt for the Cloud Compute server type without a replica node.


  3. After you add a label for the database cluster, click Deploy Now to create the cluster. It will take a few minutes for the cluster to be available, and the Status should change to Running.


After the database is ready, you can proceed with the next steps.

Create a Table

Get the connection string details for the PostgreSQL database.

  1. In the Managed Database section of the Vultr customer portal, click the Manage icon to open the Overview tab.

  2. From the Connection Details section, click Copy Connection String.

  3. SSH as the non-root user to the Kafka server you deployed in the Prerequisites section.

  4. In the SSH session, paste the psql connection string you copied from the Vultr customer portal.

    After a successful database connection, you should see this prompt as the output:

     defaultdb=>
  5. To create the orders table, enter this SQL:

     CREATE TABLE orders (
         order_id serial PRIMARY KEY,
         customer_id integer,
         order_date date
     );

    After successful table creation, you should see this output:

     CREATE TABLE
  6. To confirm, you can query the table:

     select * from orders;

    Because there are no rows in the table, you should see this output:

     order_id | customer_id | order_date 
     ----------+-------------+------------
     (0 rows)
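
Debezium only needs the table itself to start capturing inserts. If you later also want update and delete change events to carry the complete previous row state, you can change the table's replica identity. This step is optional and not required for the rest of this guide; the bracketed connection string is a placeholder.

$ # Optional: include the full previous row state in update and delete change events
$ psql "[your Vultr connection string]" -c "ALTER TABLE orders REPLICA IDENTITY FULL;"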

Start the Services

  1. On the Kafka server, create a new directory and change to it.

     $ mkdir vultr-postgres-kafka
     $ cd vultr-postgres-kafka
  2. Create a new file, docker-compose.yaml:

     $ touch docker-compose.yaml
  3. Add the following to docker-compose.yaml and save it:

     version: "2"
     services:
         zookeeper:
             image: quay.io/debezium/zookeeper
             ports:
             - 2181:2181
         kafka:
             image: quay.io/debezium/kafka
             ports:
             - 9092:9092
             links:
             - zookeeper
             depends_on:
             - zookeeper
             environment:
             - ZOOKEEPER_CONNECT=zookeeper:2181
             - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
         kafka-connect:
             image: debezium/connect
             ports:
             - 8083:8083
             links:
             - kafka
             depends_on:
             - kafka
             environment:
             - BOOTSTRAP_SERVERS=kafka:9092
             - GROUP_ID=1
             - CONFIG_STORAGE_TOPIC=kafka_connect_configs
             - OFFSET_STORAGE_TOPIC=kafka_connect_offsets
             - STATUS_STORAGE_TOPIC=kafka_connect_statuses

    • The zookeeper service is created from the quay.io/debezium/zookeeper Docker image and exposes port 2181.
    • The kafka service is created from the quay.io/debezium/kafka Docker image and exposes port 9092. It is linked to the zookeeper service, and the following Kafka-related environment variables have also been configured:
      • ZOOKEEPER_CONNECT - The Zookeeper node to connect to.
      • KAFKA_ADVERTISED_LISTENERS - Advertised Kafka listener.
    • The kafka-connect service is created from the debezium/connect Docker image and exposes port 8083. It is linked to the kafka service, and the following Kafka Connect-related environment variables have also been configured:
      • BOOTSTRAP_SERVERS - The Kafka broker to connect to.
      • GROUP_ID - Consumer group ID assigned to Kafka Connect consumer.
      • CONFIG_STORAGE_TOPIC - Topic to store connector configuration.
      • OFFSET_STORAGE_TOPIC - Topic to store connector offsets.
      • STATUS_STORAGE_TOPIC - Topic to store connector status.
  4. To start all the services, open a new SSH session to the Kafka server, change to the vultr-postgres-kafka directory, and enter the command below.

     $ docker-compose --project-name postgres-kafka-cdc up

It will take some time for all the Docker containers to be ready. Once all containers have started, proceed to the next step.
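
You can confirm that the services are ready before moving on. The Kafka Connect REST API listens on port 8083 (per the docker-compose.yaml above) and should return a small JSON response containing its version once the worker has started; this is an optional check.

$ # The Kafka Connect REST API should respond once the worker has started
$ curl http://localhost:8083/

$ # All three containers (zookeeper, kafka, and kafka-connect) should be listed as running
$ docker ps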

Configure the Debezium Connector

Before you continue, get the connection details for Vultr PostgreSQL Managed Database.

  1. Click the Manage icon to open the Overview tab.

  2. From the Connection Details section, note down the following attributes: username, password, host, port, and database.


  3. On the Kafka server, create a new file, source-connector.json:

     $ touch source-connector.json
  4. Add the following to the source-connector.json file and save it. Make sure to replace the placeholders with the connection parameters of the Vultr Managed Database for PostgreSQL that you noted in the previous step.

     {
         "name": "source-connector",
         "config": {
             "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
             "database.hostname": "[Vultr_hostname]",
             "database.port": "[Vultr_port]",
             "database.user": "[Vultr_username]",
             "database.password": "[Vultr_password]",
             "database.dbname": "defaultdb",
             "database.server.name": "myserver",
             "plugin.name": "wal2json",
             "table.include.list": "public.orders",
             "value.converter": "org.apache.kafka.connect.json.JsonConverter",
             "database.sslmode": "require"
         }
     }

    Here is a summary of the connector configuration properties:

    • connector.class - The name of the Java class for the PostgreSQL connector.
    • database.hostname - Hostname of the Vultr Managed PostgreSQL database.
    • database.port - Port number of the Vultr Managed PostgreSQL database.
    • database.user - Name of the database user for connecting to the Vultr Managed PostgreSQL database.
    • database.password - Password to use when connecting to the Vultr Managed PostgreSQL database.
    • database.dbname - The name of the PostgreSQL database from which to stream the changes.
    • database.server.name - Logical name for the database server; Debezium uses it as the prefix for the Kafka topic names it creates.
    • database.sslmode - Whether to use an encrypted connection. The value require means that a secure connection is established with the Vultr Managed PostgreSQL database.
    • plugin.name - The PostgreSQL logical decoding output plugin to use (wal2json in this example).
    • table.include.list - Name of the table whose changes you want to capture.
    • value.converter - The Kafka Connect converter class used to serialize the change event values (JSON in this example).
  5. Invoke the Kafka Connect REST endpoint to create the connector.

     $ curl -X POST -H "Content-Type: application/json" --data @source-connector.json http://localhost:8083/connectors
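
After registering the connector, you can confirm that it was created and is running. This is an optional check using the Kafka Connect REST API; the connector name source-connector matches the configuration file above, and both the connector and its task should report a RUNNING state.

$ # List the registered connectors
$ curl http://localhost:8083/connectors

$ # Check the status of the Debezium source connector and its task
$ curl http://localhost:8083/connectors/source-connector/status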

With Kafka, Zookeeper, and Kafka Connect running as Docker containers and the Debezium source connector registered, follow the steps below to test the integration and verify that data flows from the PostgreSQL table to the Kafka topic.

Test the Integration

  1. Go back to the SSH session where you connected to PostgreSQL using psql.

  2. Execute the SQL command below to insert data into the table:

     INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 300);

    If successful, you should see the following output:

     INSERT 0 1
  3. Check the inserted record:

     select * from orders;

    You should see the following output:

     order_id | customer_id | order_date 
     ----------+-------------+------------
             1 |         300 | 2022-12-17
     (1 row)
  4. Open a second SSH session as the non-root user to the Kafka server.

  5. In the new SSH session, start a Kafka consumer with the Kafka CLI and confirm data in the Kafka topic. Use docker exec to access the Kafka docker container.

     $ docker exec -it postgres-kafka-cdc_kafka_1 bash
  6. Start the Kafka consumer.

     $ cd bin && ./kafka-console-consumer.sh --topic myserver.public.orders --bootstrap-server kafka:9092 --from-beginning

The topic myserver.public.orders is auto-generated by the Debezium connector based on a pre-defined naming convention: <database server name>.<schema name>.<table name>.
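
If you want to confirm that the topic exists, you can list the topics from inside the Kafka container (for example, in a separate docker exec session). This is an optional check.

$ cd bin && ./kafka-topics.sh --list --bootstrap-server kafka:9092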

You should see records in the Kafka topic in the form of a JSON payload. The change event JSON is rather big since it contains all the information about the changed record, including its schema.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"order_id"},{"type":"int32","optional":true,"field":"customer_id"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"order_date"}],"optional":true,"name":"myserver.public.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"myserver.public.orders.Envelope"},"payload":{"before":null,"after":{"order_id":1,"customer_id":300,"order_date":19343},"source":{"version":"1.9.5.Final","connector":"postgresql","name":"myserver","ts_ms":1671285419136,"snapshot":"false","db":"defaultdb","sequence":"[null,\"1946158400\"]","schema":"public","table":"orders","txId":4195,"lsn":1946158400,"xmin":null},"op":"c","ts_ms":1671285420872,"transaction":null}}

Every change event generated by the connector contains two components, a schema and a payload, and each appears twice: once for the event key and once for the event value.

  • The first schema field corresponds to the event key and describes the structure of the table's primary key. The first payload field is part of the event key, and it contains the key for the row that was changed.
  • The second schema field corresponds to the event value and describes the structure of the row that was changed. The second payload field is part of the event value. It has the structure described by the previous schema field and contains the actual data for the row that was changed.
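
Because the full change event is verbose, it can be convenient to print only the new row state when inspecting events. The following is a minimal sketch, assuming jq is installed on the host (for example, via sudo apt install -y jq) and that the Kafka container's default working directory is the Kafka installation, as in the earlier docker exec step.

$ # Consume the change events and print only the new row state (payload.after)
$ docker exec postgres-kafka-cdc_kafka_1 bin/kafka-console-consumer.sh --topic myserver.public.orders --bootstrap-server kafka:9092 --from-beginning | jq '.payload.after'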

Go to the first SSH session, where you have the psql terminal, and continue to insert more data in the table:

INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 400);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 500);
INSERT INTO orders (order_date, customer_id) VALUES (current_timestamp, 42);

In the second SSH session, where you have the Kafka consumer terminal, you should see more change data capture records being sent to the Kafka topic.

Cleanup

To shut down the Docker containers, run the following command from a new terminal:

$ docker-compose --project-name postgres-kafka-cdc down -v
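
If you only want to remove the connector registration while keeping the rest of the stack running (for example, to re-create it with different settings), you can delete it through the Kafka Connect REST API instead. Run this before shutting the containers down; it is optional, since the command above removes the containers and their volumes anyway.

$ curl -X DELETE http://localhost:8083/connectors/source-connector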

After you have completed the tutorial in this article, you can delete the Vultr PostgreSQL Managed Database.

Delete the Vultr PostgreSQL Managed Database

Log into your Vultr account and navigate to Managed Databases.

  1. For the database you just created, choose the delete icon.
  2. In the Destroy Managed Database? pop-up window, select the Yes, destroy this Managed Database checkbox, and click Destroy Managed Database.


Conclusion

In this article, you used the Debezium Kafka source connector for PostgreSQL to integrate a PostgreSQL database with Kafka using change data capture. By doing this, changes made to your PostgreSQL database tables were detected in real-time and sent to a Kafka topic.

Next steps

Now that the data is in the Kafka topic, you can use Kafka Connect sink connectors to integrate with other systems, including databases (such as MySQL), search indices (such as Elasticsearch), and more.

You can also learn more in the following documentation: