Using Change Data Capture (CDC) to replicate data from PostgreSQL into any destination has many benefits. Chiefly, it allows you to track all changes applied to your database in real time, including delete operations. The ability to track and replicate deletes is especially beneficial for ELT pipelines.
In this tutorial, you'll learn how to set up PostgreSQL Change Data Capture in minutes using Airbyte, which relies on Debezium under the hood, to build a near real-time EL(T) pipeline.
But before we begin, let's clarify some key concepts so you can better understand what's happening at every step.
Debezium is an open-source framework for Change Data Capture. It monitors databases in real time and streams every committed row-level operation, such as inserts, updates, and deletes, preserving the order in which the operations were carried out. Airbyte uses Debezium to implement CDC for PostgreSQL and encapsulates it to hide the complexity from the user.
Debezium uses PostgreSQL's logical decoding to stream changes as they occur in the database. In this context, we will use the terms logical decoding and CDC interchangeably.
Logical decoding is a streaming representation of the write-ahead log (WAL), which keeps track of database modifications made via SQL. Logical decoding streams observe changes at the database level and are identified by logical replication slots. In other words, a replication slot represents a stream of changes from one database, and each database can have several slots.
A plugin is required to convert the write-ahead log’s internal representation into an easy-to-understand format that can be interpreted without knowing the database's internal state. The plugin's output is consumed, in this case, by the Debezium connector.
A logical decoding plugin is a program written in C and installed in the PostgreSQL server. PostgreSQL 10+ ships with pgoutput by default, so no extra plugins need to be installed. Many logical decoding plugins are available; in addition to pgoutput, Airbyte currently supports wal2json.
When using an Airbyte source with Change Data Capture, you don't need specific knowledge of the technologies mentioned above. Airbyte takes care of everything; in general, you only need to make sure your PostgreSQL instance has a compatible logical decoding plugin and a replication slot.
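To see what an output plugin actually does, you can try test_decoding. It is a simple example plugin that ships with PostgreSQL's contrib modules and is not one of the plugins Airbyte uses. The sketch below makes these assumptions: psql reaches a scratch PostgreSQL 10+ database with wal_level = logical through the standard libpq environment variables, and it connects as a user with superuser or REPLICATION privileges. The slot and table names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: watch a logical decoding plugin turn WAL records into readable text.
# Assumes PGHOST/PGUSER/PGPASSWORD/PGDATABASE point at a scratch PostgreSQL 10+
# database with wal_level = logical. demo_slot and decoding_demo are hypothetical.
set -euo pipefail

psql -v ON_ERROR_STOP=1 <<'SQL'
-- A slot only captures changes made after it is created.
SELECT pg_create_logical_replication_slot('demo_slot', 'test_decoding');

CREATE TABLE decoding_demo (id integer PRIMARY KEY, name text);
INSERT INTO decoding_demo VALUES (1, 'example');

-- Read (and consume) the decoded changes: expect BEGIN/COMMIT lines
-- wrapped around a textual INSERT line for decoding_demo.
SELECT data FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);

-- Clean up: an abandoned slot prevents PostgreSQL from recycling WAL.
SELECT pg_drop_replication_slot('demo_slot');
DROP TABLE decoding_demo;
SQL
```

Debezium consumes the output of pgoutput or wal2json in the same way, through a replication slot. The only difference is that it uses a streaming replication connection rather than this SQL function.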
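If you already run PostgreSQL elsewhere, a quick way to check both prerequisites is shown below. This is a sketch that assumes psql is installed and connects through the standard libpq environment variables (PGHOST, PGPORT, PGUSER, and so on).

```shell
# Logical decoding requires wal_level = logical (changing it needs a server restart).
psql -c "SHOW wal_level;"

# List existing replication slots and the output plugin each one uses.
psql -c "SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;"
```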
⚠️ If you’re using a PostgreSQL instance in the cloud, such as Amazon RDS, refer to the Debezium connector documentation for the specific requirements your instance may have.
Next, you will learn how to configure Airbyte to replicate data from PostgreSQL to a local file using CDC, with Docker running the PostgreSQL instance. Let's get started!
Use Docker to start a PostgreSQL container from your terminal.
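A command along these lines should work. The image tag (13) and the password value are example assumptions. Port 5432 is published so the database is reachable from the host.

```shell
# Start PostgreSQL in the background from the debezium/postgres image.
# The tag (13) and the password are example values; adjust them as needed.
docker run --name airbyte-postgres \
  -e POSTGRES_PASSWORD=password \
  -p 5432:5432 \
  -d debezium/postgres:13
```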
In this case, we are naming the container airbyte-postgres. You can select a different password.
The debezium/postgres image is used in the same manner as the official Postgres image. Additionally, it instructs Postgres to load Debezium's logical decoding plugin, enable the logical decoding feature, and configure a single replication slot used by the Debezium connector.
Now, it's time to configure a PostgreSQL schema, user, and the necessary privileges. You can use psql, which lets you execute queries interactively from the terminal.
To start psql, you first need to open a shell inside the Docker container you started in the previous step.
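This uses docker exec rather than SSH. The container name matches the one given to docker run.

```shell
# Open an interactive bash shell inside the running container.
docker exec -it airbyte-postgres /bin/bash
```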
Once in the container, start psql.
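Inside the container, connect as the default superuser. The image creates this user, named postgres, unless POSTGRES_USER is set.

```shell
# Run inside the container: open an interactive session as the postgres superuser.
psql -U postgres
```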
Now, create a schema and set the search path to tell PostgreSQL which schema to look at.
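You can type these statements at the psql prompt. For scripting, you can instead wrap them in docker exec ... psql calls that run from a regular host terminal. The sketch below assumes the container is named airbyte-postgres and uses postgresql_cdc as an example schema name.

```shell
# Create the schema (postgresql_cdc is an example name).
docker exec airbyte-postgres psql -U postgres -c "CREATE SCHEMA postgresql_cdc;"

# SET search_path only lasts for the current session, so it belongs at an
# interactive psql prompt:
#   SET search_path TO postgresql_cdc;
# A one-off psql -c call would lose it immediately, so scripted commands
# should schema-qualify names instead (e.g. postgresql_cdc.cars).
```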
Although the database can be accessed with the postgres superuser, it is advisable to use a less privileged, read-only user to read data. Create a user called airbyte and assign it a password of your choice.
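Here is a minimal sketch, run from the host against the airbyte-postgres container. Replace the example password with your own. At the psql prompt, type only the quoted SQL.

```shell
# Create the airbyte role with a password (example value shown).
docker exec airbyte-postgres psql -U postgres \
  -c "CREATE USER airbyte WITH PASSWORD 'password';"
```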
Then, grant the user access to the relevant schema.
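The grant could look like this. It assumes the example schema name postgresql_cdc and the airbyte-postgres container.

```shell
# Allow airbyte to look up objects in the schema.
docker exec airbyte-postgres psql -U postgres \
  -c "GRANT USAGE ON SCHEMA postgresql_cdc TO airbyte;"
```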
To replicate data from multiple Postgres schemas, you can re-run the command above for each schema, but you'll need to set up several Airbyte sources connecting to the same database, one per schema.
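Repeating the grant for several schemas can be scripted as a loop. In this sketch, other_schema is a hypothetical placeholder, and every listed schema must already exist.

```shell
# Grant USAGE on each schema you plan to replicate.
for schema in postgresql_cdc other_schema; do
  docker exec airbyte-postgres psql -U postgres \
    -c "GRANT USAGE ON SCHEMA ${schema} TO airbyte;"
done
```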
Next, grant the user read-only access to the relevant tables.
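GRANT SELECT ON ALL TABLES only covers tables that already exist. The cars table is created in a later step, so this sketch also sets default privileges for tables that the postgres role creates in the schema afterwards. It assumes the example schema name postgresql_cdc.

```shell
# SELECT on the tables that already exist in the schema...
docker exec airbyte-postgres psql -U postgres \
  -c "GRANT SELECT ON ALL TABLES IN SCHEMA postgresql_cdc TO airbyte;"

# ...and on tables the postgres role creates there in the future.
docker exec airbyte-postgres psql -U postgres \
  -c "ALTER DEFAULT PRIVILEGES IN SCHEMA postgresql_cdc GRANT SELECT ON TABLES TO airbyte;"
```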
The user you just created also needs to be granted REPLICATION and LOGIN permissions.
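Both are role attributes, so a single ALTER USER statement sets them.

```shell
# REPLICATION lets airbyte open replication connections; LOGIN lets it connect.
docker exec airbyte-postgres psql -U postgres \
  -c "ALTER USER airbyte REPLICATION LOGIN;"
```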
Now, it's time to create a new table and populate it with some data. Create a cars table with an id and the car's name.
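A possible version of this step follows, assuming the example schema postgresql_cdc. The car names are sample data. The primary key matters here: PostgreSQL uses it as the table's default replica identity, which logical decoding relies on to identify deleted rows. Without it, DELETEs are rejected once the table belongs to a publication that publishes deletes.

```shell
# Create and populate the cars table in one psql session (stdin heredoc).
docker exec -i airbyte-postgres psql -U postgres -v ON_ERROR_STOP=1 <<'SQL'
CREATE TABLE postgresql_cdc.cars (
  id   INTEGER PRIMARY KEY,  -- default replica identity for UPDATE/DELETE
  name VARCHAR(200)
);
INSERT INTO postgresql_cdc.cars VALUES (0, 'mustang'), (1, 'porsche'), (2, 'mclaren');
SQL
```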
Next, let's create a logical replication slot using the pgoutput plugin. In PostgreSQL, a replication slot is used to hold the state of a replication stream.
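The built-in function pg_create_logical_replication_slot takes a slot name and an output plugin. In this sketch, airbyte_slot is an example name. Whatever name you choose must also be entered in the Airbyte source's CDC settings.

```shell
# Create a logical replication slot that decodes the WAL with pgoutput.
docker exec airbyte-postgres psql -U postgres \
  -c "SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');"

# Confirm the slot exists and which plugin it uses.
docker exec airbyte-postgres psql -U postgres \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"
```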
Finally, create a publication to allow subscription to the events of the cars table. We advise adding only the tables you want to sync to the publication, rather than all tables.
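Here is a sketch limited to the cars table. It assumes the example schema postgresql_cdc and uses pub1 as an example publication name, which the Airbyte source configuration would then reference.

```shell
# Publish changes from the cars table only.
docker exec airbyte-postgres psql -U postgres \
  -c "CREATE PUBLICATION pub1 FOR TABLE postgresql_cdc.cars;"

# List the tables included in each publication.
docker exec airbyte-postgres psql -U postgres \
  -c "SELECT * FROM pg_publication_tables;"
```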
To set up a new PostgreSQL Airbyte source, go to Airbyte's UI, click on sources and add a new source.
As the connector type, select Postgres. If you used the instructions above to configure your Postgres database, fill in the configuration fields as demonstrated in the subsequent illustrations.
Then click Set up source, and Airbyte will test the connection. If everything goes well, you should see a success message.
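If the connection test fails, you can check the credentials outside Airbyte. The sketch below logs in as airbyte over TCP inside the container and reads the table. The password and the schema name postgresql_cdc are example values. Because it runs inside the container, it checks the user and grants but not network reachability from Airbyte.

```shell
# Log in as airbyte (replace the example password) and read the table.
docker exec -e PGPASSWORD=password airbyte-postgres \
  psql -h localhost -U airbyte -d postgres -c "SELECT * FROM postgresql_cdc.cars;"
```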
Go to destinations and add a new one. As demonstrated in the following diagram, select Local JSON as the destination type and fill in the details. The destination path determines where files are written; by default, a path such as /local/cdc_tutorial maps to /tmp/airbyte_local/cdc_tutorial on the host.
Then click Set up destination and let Airbyte test the destination.
Go to connections and create a new connection. Select the Postgres source you just created, then do the same for the Local JSON destination. Once you're done, you can set up the connection as follows.
Then, it's time to configure the streams, which in this case are the tables in our database. For now, we only have the cars table. If you expand it, you can see the columns it has.
Now, you should select a sync mode. If you want to take full advantage of using Change Data Capture, you should use Incremental | Append mode to only look at the rows that have changed in the source and sync them to the destination. Selecting a Full Refresh mode would sync the whole source table, which is most likely not what you want when using CDC. Learn more about sync modes in our documentation.
When using an Incremental sync mode, we would generally need to provide a Cursor field, but when using CDC, that's not necessary since the changes in the source are detected via the Debezium connector stream.
Once you're ready, save the changes. Then, you can run your first sync by clicking on Sync now. You can check your run logs to verify everything is going well. Just wait for the sync to be completed, and that's it! You've replicated data using Postgres Change Data Capture.
From the root directory of the Airbyte project, go to /tmp/airbyte_local/cdc_tutorial, and you will find a file named _airbyte_raw_cars.jsonl where the data from the PostgreSQL database was replicated.
You can check the file's contents in your preferred IDE or run the following command.
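A plain cat is enough, since the file holds one JSON record per line.

```shell
# Print every replicated record.
cat /tmp/airbyte_local/cdc_tutorial/_airbyte_raw_cars.jsonl
```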
Now, let's test the CDC setup we have configured. To do that, run the following queries to insert and delete a row from the database.
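One way to do this from the host is shown below. It assumes the example schema postgresql_cdc, and the inserted values are sample data. psql runs each -c option in turn.

```shell
# Insert a row and delete it again so the next sync captures both events.
docker exec airbyte-postgres psql -U postgres \
  -c "INSERT INTO postgresql_cdc.cars VALUES (3, 'tesla');" \
  -c "DELETE FROM postgresql_cdc.cars WHERE id = 3;"
```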
Launch a sync and, once it finishes, check the local JSON file to verify that CDC has captured the change. The JSON file should now have two new lines, showing the addition and deletion of the row from the database.
This confirms that CDC lets you see that a row was deleted, which would be impossible to detect with the regular Incremental sync mode. The _ab_cdc_deleted_at meta field is not null, which means the row with id=3 was deleted.
In this tutorial, you have learned how logical decoding works in PostgreSQL and how to leverage it to implement an EL(T) pipeline using Airbyte. Using CDC to capture database changes is one of the best ways to replicate data, especially when you have large volumes of it and need to track delete operations in the source database.
If you want to try out Airbyte easily, check out our fully managed solution, Airbyte Cloud. We also invite you to join the conversation on our community Slack channel to share your ideas with thousands of data engineers and help make everyone’s project a success!
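To pick out deletes in a larger file, you can filter on that field. The function below is a hypothetical helper, not part of Airbyte. It assumes one JSON record per line and treats any value other than null as a deletion marker.

```shell
# Hypothetical helper (not part of Airbyte): print the raw records whose
# _ab_cdc_deleted_at field holds a value instead of null, i.e. captured deletes.
show_cdc_deletes() {
  # Match the field name followed by any JSON value that does not start with
  # "n" (for null), tolerating optional whitespace around the colon.
  grep -E '"_ab_cdc_deleted_at"[[:space:]]*:[[:space:]]*[^n[:space:]]' "$1"
}

# Usage:
#   show_cdc_deletes /tmp/airbyte_local/cdc_tutorial/_airbyte_raw_cars.jsonl
```

grep is used here so the check needs no extra tools. Its pattern matching is textual, so a JSON-aware tool is safer if the record layout is more complex.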