Build an EL(T) from PostgreSQL using Change Data Capture (CDC)

Set up PostgreSQL CDC in minutes using Airbyte, leveraging a powerful tool like Debezium to build a near real-time EL(T).

The benefits of using Change Data Capture (CDC) to replicate data from PostgreSQL into any destination are many – mainly, it allows you to track all changes applied to your database in real-time, including delete operations. The ability to track and replicate delete operations is especially beneficial for ELT pipelines.

In this tutorial, you'll learn how to set up PostgreSQL Change Data Capture in minutes using Airbyte, leveraging a powerful tool like Debezium to build a near real-time EL(T). 

But before we begin, let's clarify some key concepts so you better understand what's happening in every step.

PostgreSQL logical decoding and Debezium

Debezium is an open-source framework for Change Data Capture. It scans databases in real-time and streams every row-level committed operation – such as insert, update, and delete – maintaining the sequence in which the operations were carried out. Airbyte uses Debezium to implement CDC for PostgreSQL, and it encapsulates it to hide the complexity from the user.

Debezium uses PostgreSQL's logical decoding to stream changes as they occur in the database. In this context, we will use the terms logical decoding and CDC interchangeably. 

Logical decoding is a streaming representation of the write-ahead log (WAL), which keeps track of the database modifications made via SQL. Logical decoding streams observe changes at the database level and are identified by logical replication slots. Hence, a replication slot represents a stream of changes in a database, and each database might have several slots.
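
For illustration, you can list the replication slots in a database with the query below (pg_replication_slots is a standard PostgreSQL system view); you will create the slot used in this tutorial in step 2.


SELECT slot_name, plugin, slot_type, database, active FROM pg_replication_slots;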

A plugin is required to convert the write-ahead log’s internal representation into an easy-to-understand format that can be interpreted without knowing the database's internal state. The plugin's output is consumed, in this case, by the Debezium connector.

A logical decoding plugin is a program written in C and installed on the PostgreSQL server. PostgreSQL 10+ ships with pgoutput by default, so no extra plugin needs to be installed. Many logical decoding plugins are available; in addition to pgoutput, Airbyte currently supports wal2json.

When using an Airbyte source with Change Data Capture, you don't need to have specific knowledge of the technologies mentioned above. Airbyte takes care of everything, and in general, you only need to make sure to have a compatible logical decoding plugin and a replication slot in your PostgreSQL instance. 

⚠️ If you’re using a PostgreSQL instance in the cloud, such as Amazon RDS, refer to the Debezium connector documentation for the specific requirements your instance may have.
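
For example, on Amazon RDS, logical replication is turned on through the rds.logical_replication parameter in the instance's parameter group (an RDS-specific setting) rather than by editing postgresql.conf. Assuming an RDS instance, you can check its current value from psql.


SHOW rds.logical_replication;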

Now, you will learn how to configure Airbyte to replicate data from PostgreSQL to a local JSON file using CDC. You will use Docker to start the PostgreSQL instance. Let's get started!

Prerequisites

Versions used in this tutorial

  • Docker: 4.8.2
  • Compose: 1.29.2
  • debezium/postgres Docker image tag: 13
  • Airbyte: 0.38.1-alpha

Step 1: Start a PostgreSQL Docker container

Use Docker to start a PostgreSQL container. To do that, run the following command in your terminal.


docker run --name airbyte-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 -d debezium/postgres:13

In this case, we are naming the container airbyte-postgres. You can select a different password.

The debezium/postgres image is used in the same manner as the official Postgres image. Additionally, it instructs Postgres to load Debezium's logical decoding plugin, enables the logical decoding feature, and configures a single replication slot to be used by the Debezium connector.
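
As a quick sanity check (assuming the container name used in the command above), you can confirm that logical decoding is enabled; the command below should return logical.


docker exec airbyte-postgres psql -U postgres -c 'SHOW wal_level;'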

Step 2: Configure your PostgreSQL database

Now, it's time to configure a PostgreSQL schema, a user, and the necessary privileges. You can use psql, which allows you to execute queries interactively from the terminal.

To start psql, open a shell inside the Docker container you started in the previous step.


docker exec -it airbyte-postgres /bin/bash

Once in the container, start psql.


psql -U postgres

Now, create a schema and set the search path to tell PostgreSQL which schema to look at.


CREATE SCHEMA postgres;
SET search_path TO postgres;

Although the database can be accessed with the default postgres superuser, it is advisable to use a less privileged read-only user to read data. Create a user called airbyte and assign a password of your choice.


CREATE USER airbyte PASSWORD 'password';

Then, grant the user access to the relevant schema.


GRANT USAGE ON SCHEMA postgres TO airbyte;

To replicate data from multiple Postgres schemas, you can re-run the command above, but you'll need to set up numerous Airbyte sources connecting to the same database on the different schemas.
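
For example, if your database also had a schema named sales (a hypothetical name used here for illustration), you would additionally run the following and configure a second source for that schema.


GRANT USAGE ON SCHEMA sales TO airbyte;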

Next, grant the user read-only access to the relevant tables.


GRANT SELECT ON ALL TABLES IN SCHEMA postgres TO airbyte;

-- Allow the airbyte user to see tables created in the future
ALTER DEFAULT PRIVILEGES IN SCHEMA postgres GRANT SELECT ON TABLES TO airbyte;

The user you just created also needs to be granted REPLICATION and LOGIN permissions. 


ALTER USER airbyte REPLICATION LOGIN;

Now, it's time to create a new table and populate it with some data. Create a cars table with an id and the car's name.


CREATE TABLE cars(id INTEGER, NAME VARCHAR(200), PRIMARY KEY (id));
INSERT INTO cars VALUES(0, 'mazda');
INSERT INTO cars VALUES(1, 'honda');

Next, let's create a logical replication slot using the pgoutput plugin. In PostgreSQL, a replication slot is used to hold the state of a replication stream.


SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');

Finally, create a publication to allow subscribing to the events of the cars table. We advise adding only the tables you want to sync to the publication, rather than all tables.


CREATE PUBLICATION pub1 FOR TABLE cars;
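
You can verify which tables the publication includes by querying the standard pg_publication_tables view.


SELECT * FROM pg_publication_tables WHERE pubname = 'pub1';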

Step 3: Configure a PostgreSQL source in Airbyte

To set up a new PostgreSQL Airbyte source, go to Airbyte's UI, click on Sources, and add a new source.

Select Postgres as the connector type. Then, assuming you followed the instructions above to configure your Postgres database, fill in the following configuration fields.

  • Name: Postgres CDC Tutorial (or any name you'd like)
  • Host: localhost
  • Port: 5432
  • DB Name: postgres
  • Schemas: postgres
  • User: airbyte
  • Password: password (or any password you assigned to the airbyte user)
  • Connect using SSL: disabled
  • Replication method: Logical replication (CDC)
  • Plugin: pgoutput
  • Replication_slot: airbyte_slot
  • Publication: pub1
  • SSH Tunnel Method: No Tunnel

Then click on Set up source and Airbyte will test the connection. If everything goes well, you should see a success message.

Step 4: Configure a local JSON destination in Airbyte

Go to Destinations and add a new one. Select Local JSON as the destination type and fill in the following details.

  • Name: JSON CDC Tutorial (or any name you would like)
  • Destination_path: /cdc_tutorial (or any path where you'd like to store the Postgres data)

Then click on Set up destination and let Airbyte test it.

Step 5: Create an Airbyte connection

Go to Connections and create a new connection. Then, select the Postgres source you just created, and do the same for the Local JSON destination. Once you're done, you can set up the connection as follows.

  • Replication Frequency: I recommend setting it to "manual" while testing. You can change it to whatever frequency makes sense for your use case when you're ready.
  • Destination Namespace: Mirror source structure
  • Destination Stream Prefix: You can leave this option blank, as we don't want a prefix at the destination.

Then, it's time to configure the streams, which in this case are the tables in our database. For now, we only have the cars table. If you expand it, you can see the columns it has.

Now, you should select a sync mode. If you want to take full advantage of using Change Data Capture, you should use Incremental | Append mode to only look at the rows that have changed in the source and sync them to the destination. Selecting a Full Refresh mode would sync the whole source table, which is most likely not what you want when using CDC. Learn more about sync modes in our documentation.

When using an Incremental sync mode, we would generally need to provide a Cursor field, but when using CDC, that's not necessary since the changes in the source are detected via the Debezium connector stream.

Once you're ready, save the changes. Then, you can run your first sync by clicking on Sync now. You can check your run logs to verify everything is going well. Just wait for the sync to be completed, and that's it! You've replicated data using Postgres Change Data Capture.

Step 6: Verify that the sync worked

The Local JSON destination writes data under /tmp/airbyte_local on the machine where Airbyte runs. Go to /tmp/airbyte_local/cdc_tutorial, and you will find a file named _airbyte_raw_cars.jsonl containing the data replicated from PostgreSQL.

You can check the file's contents in your preferred IDE or run the following command.


cat _airbyte_raw_cars.jsonl
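
If you have jq installed (an assumption; it's a common command-line JSON processor, not part of this setup), you can pretty-print each record instead.


jq . _airbyte_raw_cars.jsonl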

Step 7: Test CDC in action by creating and deleting an object from the database

Now, let's test the CDC setup we have configured. To do that, run the following queries to insert and delete a row from the database.


INSERT INTO cars VALUES(3, 'tesla');
DELETE FROM cars WHERE NAME = 'tesla';

Launch a sync and, once it finishes, check the local JSON file to verify that CDC has captured the change. The JSON file should now have two new lines, showing the addition and deletion of the row from the database. 

This confirms that CDC allows you to see that a row was deleted, which would be impossible to detect using the regular Incremental sync mode. A non-null _ab_cdc_deleted_at meta field indicates that the row with id=3 was deleted.
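
As a rough illustration (the overall shape follows Airbyte's raw JSON output, but every value below is made up), the record for the deletion looks something like this, with the _ab_cdc_deleted_at meta field populated.


{"_airbyte_ab_id": "9b6b8778-0000-0000-0000-000000000000", "_airbyte_emitted_at": 1653050000000, "_airbyte_data": {"id": 3, "name": null, "_ab_cdc_lsn": 23896935, "_ab_cdc_updated_at": "2022-05-20T13:00:00Z", "_ab_cdc_deleted_at": "2022-05-20T13:00:00Z"}}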

Wrapping up

In this tutorial, you have learned how logical decoding works in PostgreSQL and how to leverage it to implement an EL(T) using Airbyte. Using CDC to capture database changes is one of the best ways to replicate data, especially when you have huge amounts of data and need to track delete operations in the source database.

If you want to easily try out Airbyte, you might want to check our fully managed solution: Airbyte Cloud. We also invite you to join the conversation on our community Slack Channel to share your ideas with thousands of data engineers and help make everyone's project a success!
