Building a Real-Time Change Data Capture Pipeline with Debezium, Kafka, and PostgreSQL

Change Data Capture (CDC) is a key pattern in modern data engineering: it tracks changes in a database and streams them in near real time. This is useful for syncing microservices, building data lakes, or feeding analytics systems with the latest data.

In this article, we will look at how to set up a CDC pipeline using:

  • PostgreSQL as the source database
  • Debezium as the CDC engine
  • Apache Kafka for event streaming
  • Kafka Connect to integrate Debezium and PostgreSQL
  • A Kafka consumer application in Python to read changes

Project Setup

Create a file called docker-compose.yml with the contents below. It defines containers for ZooKeeper, Kafka, PostgreSQL, and Kafka Connect with Debezium:

version: '3.7' 

services:
   zookeeper:
     image: confluentinc/cp-zookeeper:7.3.0
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181

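   kafka:
     image: confluentinc/cp-kafka:7.3.0
     depends_on:
       - zookeeper
     ports:
       # host port 9092 maps to the listener advertised as localhost:9092
       - 9092:29092
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       # PLAINTEXT serves other containers; PLAINTEXT_HOST serves clients on the host
       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1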


   postgres:
     image: postgres:13
     environment:
       POSTGRES_USER: demo
       POSTGRES_PASSWORD: demo
       POSTGRES_DB: demo

   connect:
     image: debezium/connect:2.2
     depends_on:
       - kafka
       - postgres
     ports:
       - 8083:8083
     environment:
       BOOTSTRAP_SERVERS: kafka:9092
       GROUP_ID: 1
       CONFIG_STORAGE_TOPIC: my_connect_configs
       OFFSET_STORAGE_TOPIC: my_connect_offsets
       STATUS_STORAGE_TOPIC: my_connect_statuses
       KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
       CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"

Start the containers from the directory that contains the file:

docker-compose up -d
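Kafka Connect can take a little while to start, so it may help to wait for its REST API before moving on. The sketch below is one way to do that with only the standard library. The helper names connect_is_up and wait_until are illustrative, and the URL assumes the 8083 port mapping from the compose file.

```python
import time
import urllib.request


def connect_is_up(url="http://localhost:8083/connectors", timeout=2.0):
    """Return True if the Kafka Connect REST API answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, connection refused, and socket timeouts are all OSError subclasses
        return False


def wait_until(probe, attempts=30, delay=2.0, sleep=time.sleep):
    """Call probe() until it returns True.

    Returns the number of the successful attempt, or None if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        if probe():
            return attempt
        if attempt < attempts:
            sleep(delay)
    return None
```

Calling wait_until(connect_is_up) blocks for up to about a minute with these defaults and returns None if the service never responds.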

Step-1 Enable Logical Replication in PostgreSQL

Debezium relies on PostgreSQL logical decoding, so the server must run with wal_level set to logical. The connector creates its replication slot itself when it starts.

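Enter the PostgreSQL container (docker ps lists its name or ID) and open psql:

docker exec -it <postgres_container> bash
psql -U demo -d demo

Set the WAL level and create the sample table:

ALTER SYSTEM SET wal_level = logical;
CREATE TABLE customers (
   id SERIAL PRIMARY KEY,
   name VARCHAR(255),
   email VARCHAR(255)
);

A change to wal_level only takes effect after a server restart; SELECT pg_reload_conf() is not enough. Exit psql and the container, then restart PostgreSQL:

docker-compose restart postgres

Afterwards, SHOW wal_level; in psql should return logical.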

Step-2 Register Debezium PostgreSQL connector

Send a POST request to the Kafka Connect REST API to register the connector:

curl -X POST http://localhost:8083/connectors \
   -H "Content-Type: application/json" \
   -d '{
     "name": "postgres-connector",
     "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "database.hostname": "postgres",
       "database.port": "5432",
       "database.user": "demo",
       "database.password": "demo",
       "database.dbname": "demo",
       "topic.prefix": "dbserver1",
       "table.include.list": "public.customers",
       "plugin.name": "pgoutput",
       "slot.name": "debezium",
       "publication.name": "dbz_publication"
     }
  }'

Debezium 2.x uses topic.prefix for the logical server name; it replaces the database.server.name property of the 1.x releases.

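Debezium publishes change events for the table to the Kafka topic dbserver1.public.customers, named after the logical server name, the schema, and the table. The topic appears once the first event is written.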
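Before inserting data, it can be worth confirming that the connector actually started. The sketch below queries the status endpoint of the Kafka Connect REST API (GET /connectors/<name>/status) and checks the reported states. The helper names fetch_status and is_healthy are illustrative, and the base URL assumes the 8083 port mapping from the compose file.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from Kafka Connect."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_healthy(status):
    """True when the connector and every task (at least one) report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

For example, is_healthy(fetch_status("postgres-connector")) should return True once the connector is up. A task in the FAILED state usually carries a trace field in the status document that explains the error.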

Step-3 Insert Data

Back in psql, insert sample rows into the table:

INSERT INTO customers (name, email) VALUES ('Tom', '[email protected]');
INSERT INTO customers (name, email) VALUES ('Cruise', '[email protected]');

Step-4 Consume Data Using Python

Install the Kafka Python client on the host:

pip install kafka-python

The following script consumes the change events and prints them:

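from kafka import KafkaConsumer
import json


def decode(raw):
    # A delete is followed by a tombstone record whose value is null
    return json.loads(raw.decode('utf-8')) if raw is not None else None


consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=decode
)

print("Listening for changes...\n")

for message in consumer:
    event = message.value
    if event is None:
        continue  # skip tombstones
    # With schemas disabled in the converter (as in docker-compose.yml) the
    # change event is the message value itself; with schemas enabled it is
    # nested under "payload"
    payload = event.get("payload", event)
    if payload:
        print(f"Operation: {payload['op']}")
        print(f"Before: {payload['before']}")
        print(f"After: {payload['after']}")
        print("=" * 40)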

Expected output for the first insert:

Operation: c
Before: None
After: {'id': 1, 'name': 'Tom', 'email': '[email protected]'}

The op field indicates the operation:

  • c: insert (create)
  • u: update
  • d: delete
  • r: read (a row captured during the initial snapshot)
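As a rough illustration of how a consumer might act on the op field, the sketch below turns a change event into a one-line description. It accepts both the flat event and the variant nested under "payload". The names OP_NAMES and describe are illustrative, and the "r" code is the one Debezium uses for rows read during the initial snapshot.

```python
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe(event):
    """Summarize a Debezium change event as '<operation>: <row>'."""
    # Unwrap the schema envelope if the converter was configured to include it
    body = event["payload"] if "payload" in event else event
    op = body["op"]
    # Deletes carry the old row in "before"; other operations carry the new row in "after"
    row = body["before"] if op == "d" else body["after"]
    return f"{OP_NAMES.get(op, 'unknown')}: {row}"
```

With PostgreSQL's default replica identity, the "before" image of a delete generally contains only the primary key values, so a description of a delete may show less than the full row.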

Common applications of this type of pipeline include:

  • Data lakes: Streaming DB changes into S3 or Delta Lake.
  • Microservices: Syncing state changes across distributed services.
  • Analytics: Feeding real-time dashboards with up-to-the-minute data.
  • Auditing: Tracking who changed what and when in compliance-heavy industries.
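To make the microservices case a little more concrete, here is a minimal sketch of a service keeping an in-memory copy of the customers table in step with the change stream. The function name apply_change and the choice of a plain dict keyed by id are assumptions for illustration; a real service would more likely write to its own datastore.

```python
def apply_change(replica, event, key="id"):
    """Apply one Debezium change event to a dict keyed by primary key."""
    # Accept both the flat event and the variant nested under "payload"
    body = event["payload"] if "payload" in event else event
    op = body["op"]
    if op in ("c", "r", "u"):
        # Inserts, snapshot reads, and updates all carry the current row in "after"
        row = body["after"]
        replica[row[key]] = row
    elif op == "d":
        # Deletes carry the removed row (at least its key) in "before"
        replica.pop(body["before"][key], None)
    return replica
```

Because each event is applied as an upsert or a keyed delete, replaying the same event twice leaves the replica unchanged, which matters when the consumer restarts and re-reads messages.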

Final thoughts

Change Data Capture with Debezium, Kafka, and PostgreSQL enables real-time data replication without intrusive changes to your application code. To take this setup toward production, integrate a schema registry, add Prometheus/Grafana monitoring, secure the cluster with TLS and ACLs, and containerize your consumer services. If you are building data products or moving to event-driven systems, CDC is one of the most powerful tools in your data engineering toolkit.
