Prerequisites
- PostgreSQL 17 installed and running.
• Java 11 or later installed.
• Apache Kafka and Kafka Connect.
• Debezium PostgreSQL connector JAR placed in Kafka Connect plugin path.
• A PostgreSQL user with replication privileges.
• curl and unzip utilities installed.
Install Apache Kafka
- Download Kafka:
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
- Start Zookeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
- Start Kafka broker:
./bin/kafka-server-start.sh config/server.properties
Install and Configure Kafka Connect
- Kafka Connect is includedwithKafka distribution.
- DownloadDebeziumPostgreSQL connector and place in plugin path:
mkdir -p plugins/debezium-connector-postgres
tar -xzf debezium-connector-postgres-2.5.1.Final-plugin.tar.gz -C plugins/debezium-connector-postgres
- Start Kafka Connect:
./bin/connect-distributed.sh config/connect-distributed.properties
Configure PostgreSQL for Logical Replication
Edit postgresql.conf:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
Edit pg_hba.conf:
host replication debezium_user 0.0.0.0/0 md5
Restart PostgreSQL.
Create Replication User
Run the following SQL in psql:
CREATE ROLE debezium_user WITH LOGIN PASSWORD ‘password’ REPLICATION;
GRANT CONNECT ON DATABASE mydb TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
Verify Logical Decoding
Run in psql:
SHOW wal_level;
Ensure it returns ‘logical’. Debezium uses the built-in ‘pgoutput’ plugin in PostgreSQL 17.
Configure Kafka Connect with Debezium
Create a JSON config file (e.g., postgres17-connector.json):
{
“name”: “postgres17-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “localhost”,
“database.port”: “5432”,
“database.user”: “debezium_user”,
“database.password”: “password”,
“database.dbname”: “mydb”,
“database.server.name”: “pgserver17”,
“plugin.name”: “pgoutput”,
“slot.name”: “debezium_slot”,
“publication.name”: “debezium_pub”,
“topic.prefix”: “postgres17”,
“snapshot.mode”: “initial”
}
}
Deploy Connector
Send the config to Kafka Connect REST API:
curl -X POST -H “Content-Type: application/json” \
–data @postgres17-connector.json \
http://localhost:8083/connectors
Validate Setup
Check connector status:
curl http://localhost:8083/connectors/postgres17-connector/status
Insert/update/delete rows in PostgreSQL and verify Kafka topic:
kafka-console-consumer –bootstrap-server localhost:9092 \
–topic postgres17.public.mytable –from-beginning