Streaming Postgres Database Changes to Apache Pulsar Using Debezium

This example shows how to consume change events programmatically using Debezium's embedded mode. This approach lets you stream database changes to arbitrary destinations. The demo streams changes from a Postgres database to Apache Pulsar. However, if you want to stream change events into Apache Pulsar in a production scenario, take a look at the ready-made sink for Debezium Server.

Note: An alternative approach for ingesting change events from Debezium into Apache Pulsar is to use Pulsar IO, which comes with support for Debezium's connectors as of Pulsar 2.3.

Prerequisites

  • Java 11 development environment
  • Docker installation

Starting Apache Pulsar and Postgres

  • Start up a single Pulsar node via Docker:
docker run -it -p 6650:6650 -p 8080:8080 --rm --name pulsar apachepulsar/pulsar:2.11.0 bin/pulsar standalone
  • Start up Postgres via Docker, with the example database used in the Debezium tutorial:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres quay.io/debezium/example-postgres:2.1
  • Launch psql to run some SQL queries:
docker run -it --rm -e PGOPTIONS="--search_path=inventory" -e PGPASSWORD=postgres --link postgres:postgres quay.io/debezium/example-postgres:2.1 psql -h postgres -U postgres

Building the Source Code and Running Debezium Embedded

mvn clean package
java -jar target/apache-pulsar-1.0-SNAPSHOT-jar-with-dependencies.jar

To configure Debezium parameters, refer to config.properties. All config.properties keys can be overridden using environment variables. For example, to set database.password=password, set the environment variable DATABASE_PASSWORD=password.
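The single mapping given above (database.password to DATABASE_PASSWORD) suggests that a key is upper-cased and its dots are replaced with underscores. The sketch below assumes that rule applies to every key. The helper name to_env_name is hypothetical and not part of this project.

```shell
# Hypothetical helper: derive the environment variable name for a config.properties key.
# Assumption: the key is upper-cased and each '.' becomes '_',
# as in the documented example database.password -> DATABASE_PASSWORD.
to_env_name() {
  printf '%s\n' "$1" | tr 'a-z.' 'A-Z_'
}

to_env_name database.password
```

With that name, an override for a single run could look like: DATABASE_PASSWORD=password java -jar target/apache-pulsar-1.0-SNAPSHOT-jar-with-dependencies.jar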

Each table will be published to its own topic, named like so: persistent://public/default/<server>.<schema>.<table>.
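As a small illustration of this naming scheme, the following sketch builds the topic name from its three parts. The helper name topic_for is hypothetical. The values test, inventory and customers are taken from the consume command in the Testing section.

```shell
# Hypothetical helper: build the Pulsar topic name for a captured table.
# Arguments: $1 = server name, $2 = schema, $3 = table.
topic_for() {
  printf 'persistent://public/default/%s.%s.%s\n' "$1" "$2" "$3"
}

topic_for test inventory customers
```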

Testing

Modify a record in psql, for example:

update customers set first_name = 'Sarah' where id = 1001;

Consume the corresponding change event from the Pulsar topic, e.g. using the command-line client. By default the client consumes one event and then returns; the -n 0 option used here keeps it consuming until interrupted:

docker exec -i pulsar bin/pulsar-client consume -s my-subscription -n 0 persistent://public/default/test.inventory.customers