Alex is a Data Engineer and Technical writer at Airbyte, and was formerly employed as a Product Manager and as a Principal Consulting Architect at Elastic. Alex has an MBA from the IESE Business School in Barcelona, and a Masters degree in Electrical Engineering from the University of Toronto. Alex has a personal blog at https://alexmarquardt.com.
Data integration can be used to create a unified view of your organization's data, but for your data integration strategy to provide the maximum utility, your various systems need to be in sync. This is achieved with data synchronization, which involves replicating data from a source system into a destination system in such a way that the destination always has an up-to-date copy of the source data.
This tutorial is the third in a three-part series that explores Airbyte synchronization modes. The purpose of this series is to provide step-by-step instructions that help you understand how each mode works behind the scenes.
Previous tutorials in this series went deep into full refresh replication and incremental replication in Airbyte. This third tutorial demonstrates change data capture (CDC) replication. As this tutorial refers back to details that have been extensively discussed in the first two tutorials in this series, it is recommended that you read those tutorials before proceeding with this one.
In this tutorial you will:
Learn what CDC replication is and what benefits it provides.
Create a CDC connection between two Postgres databases.
View destination data when CDC with incremental deduped history replication is used.
See how inserts, updates, and deletes on a source database are reflected in the destination when CDC replication is used.
ℹ️ If you are not sure which replication mode is the most appropriate for your use case, you may be interested in reading An overview of Airbyte's replication modes before continuing with this tutorial.
Software versions
This tutorial was written in September 2022 and, due to the fast-changing nature of Airbyte, may become less relevant in the future. The following tools and versions were used for the examples in this tutorial:
macOS: 12.3.1
Docker desktop: 4.10.1 (82475)
Docker compose: 1.29.2
debezium/postgres: Docker image tag 14 (note: this is newer than the image used in the previous tutorials)
When Airbyte replicates data from a source to a destination, it first replicates the data into a “raw” table. Then, if normalization is enabled, this data will be transformed by SQL commands that are triggered by Airbyte in the destination. Airbyte leverages dbt to create the SQL for normalizing data.
In this tutorial you will replicate data using CDC replication in conjunction with incremental deduped history replication, and will then look at both the raw and the normalized tables in the destination database.
The following diagram represents Airbyte’s approach to ELT at a high level:
And for deduped history replication, the following diagram represents Airbyte’s approach:
An overview of change data capture replication
Many popular databases can be configured to write an on-disk log, known as a transaction log or write-ahead log (WAL), which stores a record of the changes that are made to the database. Examples of events that are written to this log include inserts, updates, and deletes.
When using CDC replication, the source database is configured to enable transaction logging. Then Airbyte is configured to read this log file and transmit the logged changes to a destination system. These changes are applied on the destination to keep it in sync with the source. Airbyte supports log-based CDC from Postgres, MySQL, and Microsoft SQL Server to a large number of destinations.
To support CDC, Airbyte uses Debezium internally. To keep this tutorial as simple as possible, for demonstration purposes a Postgres database is used as the data source and another Postgres database is used as the destination. Debezium/Airbyte will be configured to read from the Postgres WAL using the pgoutput plugin. A high-level overview of the CDC replication that is demonstrated in this tutorial is shown in the image below.
How is CDC different from other replication modes?
CDC replication can be used in conjunction with incremental replication. The main difference when CDC replication is enabled is in how modifications on the source database are detected and transmitted to the destination database.
Non-CDC (i.e. standard) incremental replication periodically executes queries on a source system for new or updated records, and then transmits the results of these queries to the destination. However, because a query cannot return deleted records, standard incremental replication does not correctly transmit deletions from the source to the destination. Additionally, the records that are transmitted in each sync only represent the state of the source database at the moment the sync is executed – any intermediate changes (such as multiple updates to a single record) will not be correctly captured. Furthermore, because each incremental sync executes queries against the source to detect modifications, it requires the source data to have a suitable cursor field such as updated_at, which is used to keep track of which records have already been replicated.
On the other hand, CDC incremental replication reads a log of the changes that have been made to the source database and transmits these changes to the destination. Because changes are read from a transaction log when using CDC, it is not necessary for the source data to have a suitable cursor field. Additionally, intermediate changes and deletions on the source are correctly transmitted to the destination because they are logged just like any other modification to the source data.
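The difference can be illustrated with a small simulation. The following Python sketch is not Airbyte code: it is a toy model that assumes a source table's history can be represented as a hypothetical, ordered list of change events, with the sequence number standing in for both a log position and an updated_at cursor. It compares a cursor-based poll, which only sees the table as it exists at sync time, with a log reader, which sees every recorded change.

```python
from typing import Dict, List, Tuple

# Hypothetical change log: (sequence_number, operation, id, name).
# The sequence number stands in for a log position such as a Postgres LSN.
CHANGE_LOG: List[Tuple[int, str, int, str]] = [
    (1, "insert", 1, "A1"),
    (2, "update", 1, "C1"),
    (3, "update", 1, "D1"),
    (4, "insert", 2, "A2"),
    (5, "delete", 2, ""),
]


def source_state(log) -> Dict[int, Tuple[str, int]]:
    """Apply the log to build the source table: id -> (name, updated_at)."""
    table: Dict[int, Tuple[str, int]] = {}
    for seq, op, rid, name in log:
        if op == "delete":
            table.pop(rid, None)
        else:
            table[rid] = (name, seq)  # the sequence number doubles as updated_at
    return table


def cursor_poll(log, last_cursor: int) -> List[Tuple[int, str]]:
    """Standard incremental: query current rows whose cursor exceeds the saved value."""
    table = source_state(log)
    return [(rid, name) for rid, (name, updated_at) in table.items()
            if updated_at > last_cursor]


def log_read(log, last_position: int) -> List[Tuple[str, int, str]]:
    """CDC: return every change recorded after the saved log position."""
    return [(op, rid, name) for seq, op, rid, name in log if seq > last_position]


print(cursor_poll(CHANGE_LOG, 0))  # [(1, 'D1')]
print(log_read(CHANGE_LOG, 0))
```

In this toy run the cursor-based poll returns only the final state of id=1, while the log reader returns all five changes, including the intermediate updates to id=1 and both the insert and the delete of id=2.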
The differences between CDC incremental replication and non-CDC incremental replication are summarized in the table below:
CDC replication can also be used in conjunction with full refresh replication. However, as mentioned in the CDC documentation, full refresh replication works the same way whether or not CDC is enabled.
Launch a Postgres source and destination
The above discussion has given a brief overview of how CDC sync modes work. The remainder of this article is focused on a step-by-step walkthrough of CDC with incremental deduped history replication.
ℹ️ The examples in this tutorial are presented with new, empty Postgres containers, but it is not a requirement that your source and destination databases are empty. Furthermore, these examples use the postgres superuser, which should not be done in production. For instructions on how to set up a more secure configuration, see the tutorial: Build an EL(T) from PostgreSQL using Change Data Capture (CDC).
Start a source Postgres container running at port 2000 on localhost as follows:
Create a new table called cdc_tut that will be populated with the records which will be replicated to the destination.
CREATE TABLE cdc_tut(
id integer PRIMARY KEY,
name VARCHAR(200)
);
ℹ️ Notice that the cdc_tut table has defined id as a primary key. As noted in the CDC documentation, CDC incremental replication is only supported for tables with primary keys.
Add some records to the table as follows:
INSERT INTO cdc_tut(id, name) VALUES(1, 'A1 CDCSyn');
INSERT INTO cdc_tut(id, name) VALUES(2, 'A2 CDCSyn');
And view the table with the following command:
SELECT * FROM cdc_tut;
Which should respond with a table that looks like the following:
id | name
----+-----------
1 | A1 CDCSyn
2 | A2 CDCSyn
(2 rows)
How to view the SQL that is used in the normalization
The previous tutorials in this series on incremental data synchronization and full refresh data synchronization gave detailed instructions about how to view the SQL that Airbyte uses for converting raw data into normalized data, along with an analysis of the SQL. The same steps for viewing the SQL can be used for viewing the SQL used by the CDC replication modes – however an analysis of the SQL used for normalization is not a focus of this article and is left as an exercise for the curious reader.
Note that if you do look at the SQL used in conjunction with CDC replication, the main difference versus non-CDC replication modes discussed in the previous tutorials is that there is a new field called _ab_cdc_deleted_at (which indicates the time of deletion) that is used for correctly transmitting and processing DELETE operations.
ℹ️ In previous tutorials I showed you how to look at log files inside of the airbyte-server container. However, the log file format has changed in the version of Airbyte used in the creation of this tutorial and logs are now JSON-encoded. Therefore you may wish to copy the dbt log file to localhost, where you may use other tools such as jq to decode and view it.
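If jq is not available, the same kind of inspection can be done with a short Python script. The sketch below assumes only that each line of the copied log file is a single JSON document; it does not assume any particular field names, and the script name and file path in the usage comment are placeholders. Lines that are not valid JSON are kept and printed unchanged.

```python
import json
import sys
from typing import Iterable, Iterator


def decode_json_lines(lines: Iterable[str]) -> Iterator[object]:
    """Yield one parsed object per JSON-encoded line.

    Blank lines are skipped; lines that are not valid JSON are yielded
    as plain (stripped) strings so that no log output is lost.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            yield line


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (placeholder names): python decode_logs.py ./dbt.log
    with open(sys.argv[1], encoding="utf-8") as f:
        for entry in decode_json_lines(f):
            if isinstance(entry, str):
                print(entry)
            else:
                print(json.dumps(entry, indent=2))
```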
Configure the Postgres source for CDC replication
After you have created the table that will be used as your data source, you should enable additional settings that are required for CDC replication to work correctly.
Then create a publication called cdc_tut_pub to allow subscription to the events of the cdc_tut table.
CREATE PUBLICATION cdc_tut_pub FOR TABLE cdc_tut;
ℹ️ Be careful not to create the replication slot and publication before creating and populating your source table. If you do, Airbyte may send the source records to the destination twice. This duplication happens because the initial synchronization is effectively a full refresh, which uses SELECT statements to retrieve records from the source. All subsequent syncs read the transaction log (write-ahead log) instead. Because the records sent in the initial sync were not read from the transaction log, the log still contains the events that created those records, and so they are re-transmitted on the very next sync.
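The duplication described in the note above can be modeled in a few lines of Python. This is a toy sketch of the explanation in the note, not a reproduction of how Debezium or Airbyte manage slot offsets; the log positions are hypothetical, and the model assumes a slot retains every change logged at or after the position where it was created.

```python
from collections import Counter
from typing import List, Tuple

# Hypothetical WAL entries for cdc_tut: (log_position, id, name), one per INSERT.
WAL: List[Tuple[int, int, str]] = [
    (10, 1, "A1 CDCSyn"),
    (11, 2, "A2 CDCSyn"),
]


def first_two_syncs(wal, slot_created_at: int, snapshot_at: int) -> List[Tuple[int, str]]:
    """Return the records sent to the destination by the first two syncs.

    Sync 1 (snapshot): SELECT returns every row inserted up to snapshot_at.
    Sync 2 (log read): the slot replays every change it has retained, i.e.
    every change logged at or after the position where the slot was created.
    """
    snapshot = [(rid, name) for pos, rid, name in wal if pos <= snapshot_at]
    replayed = [(rid, name) for pos, rid, name in wal if pos >= slot_created_at]
    return snapshot + replayed


# Recommended order: populate the table (positions 10-11), then create the slot at 20.
good = first_two_syncs(WAL, slot_created_at=20, snapshot_at=20)
# Problematic order: the slot is created at 5, before the inserts are logged.
bad = first_two_syncs(WAL, slot_created_at=5, snapshot_at=20)

print(len(good), len(bad))  # 2 4
print(Counter(bad))         # each record appears twice
```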
Define the source and destination in Airbyte
Instantiate a Postgres source connector
Create a new data source by clicking + New source as follows.
ℹ️ If you already have some connectors defined, then + New source may appear in the top right corner of the window.
Then select Postgres as the source as follows:
Define a source connector called cdc-source as follows, and be sure to select Logical Replication (CDC) as demonstrated below:
After selecting Logical Replication (CDC), enter the parameters that will be used for CDC replication as shown below.
Then click the Set up source button to create the source connector.
Instantiate a Postgres destination connector
Select Postgres as the destination as follows:
Create a destination called cdc-destination as follows:
And click on the Set up destination button to create the destination connector.
Set up the CDC connection with incremental dedupe synchronization
The orchestration of CDC syncs is similar to that of non-CDC database sources; in other words, CDC replication works in conjunction with the various sync modes that Airbyte supports. In this tutorial I will demonstrate CDC replication only with the incremental dedupe synchronization mode.
ℹ️ The steps presented in this section could also be used for testing other sync modes.
Define a new connection that will be used for incremental CDC replication as follows:
ℹ️ In the definition of a CDC replication connection, notice that a cursor field is not required (as opposed to “standard” incremental replication). Furthermore, the primary key is automatically determined from the source table, and is therefore not selected.
Once you click on Set up connection, Airbyte will start a sync operation from the source to the destination. Once the sync has completed, you should see a response similar to the following:
View the destination database
Open a Postgres shell to the destination as follows:
In addition to the field called _airbyte_data that contains a JSON representation of the source data, there are two additional fields in the raw table:
_airbyte_emitted_at, which tells you the time at which Airbyte sent the record to the destination.
_airbyte_ab_id is a UUID value added by the destination connector to each record before it is written to the destination. Because it is a UUID (not a hash of the data), every row receives a different value, and a record that is sent again in a later sync receives a new value even if its data has not been modified.
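The distinction between a random identifier and a content hash can be sketched in Python. This is illustrative only: content_hash (md5 over sorted JSON) is a hypothetical stand-in, not the way Airbyte or dbt computes its hash columns, and the timestamp values are made up. The sketch also shows why a hash over every field in _airbyte_data, such as the _airbyte_cdc_tut_hashid column, changes when only CDC metadata changes.

```python
import hashlib
import json
import uuid

# Hypothetical record; the timestamp value is made up.
record = {"id": 1, "name": "A1 CDCSyn", "_ab_cdc_updated_at": "2022-09-26T18:00:00Z"}


def content_hash(rec: dict) -> str:
    """md5 of a canonical JSON encoding of the record (an illustrative stand-in)."""
    return hashlib.md5(json.dumps(rec, sort_keys=True).encode("utf-8")).hexdigest()


# A UUID is random: sending the same record twice yields two different ids.
ids_for_two_sends = (uuid.uuid4(), uuid.uuid4())

# A hash is deterministic: identical content always yields the same digest...
same_content = content_hash(record) == content_hash(dict(record))

# ...but changing any hashed field, even CDC metadata, changes the digest.
touched = dict(record, _ab_cdc_updated_at="2022-09-26T18:05:00Z")
metadata_changed = content_hash(record) != content_hash(touched)

print(ids_for_two_sends[0] != ids_for_two_sends[1], same_content, metadata_changed)
```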
Deduped + history was chosen as the sync mode for this CDC connection, and so the normalization SQL creates two tables from the raw data: a history table (cdc_tut_scd) and a deduplicated table (cdc_tut).
You can view the history (SCD) table called cdc_tut_scd by executing the following:
Notice that the history table contains several fields that are not present in the non-CDC replication modes that were discussed in the incremental sync tutorial. These CDC metadata columns are:
_ab_cdc_lsn: the log sequence number (LSN), i.e. the position in the transaction log from which the record was read.
_ab_cdc_deleted_at: the timestamp of the database transaction that deleted the record. This is only present for records that result from DELETE statements.
_ab_cdc_updated_at: the timestamp of the database transaction that changed the record. This is present for records that result from INSERT, UPDATE, and DELETE statements.
The table also has the following fields, which were also present in the replicated data that was shown in the previous incremental sync tutorial:
_airbyte_unique_key is a hash of the primary key field (the id field in this example), with some additional logic to handle records where this is undefined.
_airbyte_unique_key_scd is a hash of several fields which guarantee a unique value for every row that appears in the history (SCD) table.
_airbyte_start_at specifies when a given version of a record starts to be valid in the source table. This is determined based on a new version of that record appearing in the source table, which is detected by a change in its updated_at field (or whichever field you have specified as a cursor). Upon insertion or update, a new entry will be created in the historical table, with _airbyte_start_at set to the updated_at time.
_airbyte_end_at specifies when a given version of a record stopped being valid in the source table. This is based on a change in the value of updated_at (or whichever field you have specified as the cursor) in the source table, which indicates that the record has been updated in the source system and that the previous historical entry for that record therefore no longer reflects its current state.
_airbyte_active_row is used for specifying which rows should be included in the deduplicated table (cdc_tut in this example).
_airbyte_normalized_at tells you when the record was normalized (i.e. when the record was created from the raw data).
_airbyte_cdc_tut_hashid is an md5 hash of the fields that appear in the _airbyte_data JSON. This is calculated and added during normalization. This article about md5 hashing on the dbt blog discusses the value of the md5 hash. Be aware that because this hash is calculated on all of the fields that are included in the JSON object stored in _airbyte_data, it will change if any value in that object (such as _ab_cdc_updated_at) is modified.
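To make the start/end/active bookkeeping concrete, here is a simplified Python model of how a history table can be derived from an ordered stream of change events. It is a sketch under stated assumptions, not Airbyte's normalization logic: the actual normalization is SQL generated by dbt, which this does not attempt to reproduce. The event dictionaries, their key names, and the integer timestamps are all hypothetical.

```python
from typing import Dict, List


def build_scd(events: List[Dict]) -> List[Dict]:
    """Turn change events into simplified SCD (history) rows.

    Each event has the keys id, name, updated_at and deleted_at; deleted_at
    is None unless the event came from a DELETE statement.
    """
    history: List[Dict] = []
    latest: Dict[int, Dict] = {}  # id -> most recent history row for that id
    for ev in sorted(events, key=lambda e: e["updated_at"]):
        prev = latest.get(ev["id"])
        if prev is not None:
            # A newer version arrived, so the previous version stops being valid.
            prev["end_at"] = ev["updated_at"]
            prev["active_row"] = 0
        row = {
            "id": ev["id"],
            "name": ev["name"],
            "start_at": ev["updated_at"],
            "end_at": None,
            # A deletion leaves no version of the record that should stay active.
            "active_row": 0 if ev["deleted_at"] is not None else 1,
        }
        history.append(row)
        latest[ev["id"]] = row
    return history


# Hypothetical events loosely mirroring this tutorial (integers stand in for timestamps).
EVENTS = [
    {"id": 1, "name": "A1 CDCSyn", "updated_at": 1, "deleted_at": None},
    {"id": 2, "name": "A2 CDCSyn", "updated_at": 2, "deleted_at": None},
    {"id": 1, "name": "C1 CDCSyn", "updated_at": 3, "deleted_at": None},
    {"id": 1, "name": "D1 CDCSyn", "updated_at": 4, "deleted_at": None},
    {"id": 4, "name": "B4 CDCSyn", "updated_at": 5, "deleted_at": None},
    {"id": 4, "name": "B4 CDCSyn", "updated_at": 6, "deleted_at": 6},
]

for row in build_scd(EVENTS):
    print(row)
```

Only the latest version of id=1 and the single version of id=2 end up with an active row value of 1; both rows for id=4 end up at 0 once the delete event is processed.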
And you can view the final table called cdc_tut by executing the following:
While it may not yet be obvious, this table contains a single row for each record that appears in the source data. This will become clearer as the source data is modified and replicated to the destination in the remainder of this tutorial.
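The relationship between the history table and the final table can be sketched as a filter on _airbyte_active_row. The Python below uses an in-memory SQLite database rather than Postgres, keeps only a reduced, hypothetical set of columns and rows, and is not the SQL that Airbyte generates; it only illustrates the idea.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cdc_tut_scd (id INTEGER, name TEXT, _airbyte_active_row INTEGER)"
)
conn.executemany(
    "INSERT INTO cdc_tut_scd VALUES (?, ?, ?)",
    [
        (1, "A1 CDCSyn", 0),  # superseded (hypothetical) version of id=1
        (1, "D1 CDCSyn", 1),  # current version of id=1
        (2, "A2 CDCSyn", 1),
    ],
)

# The deduplicated table keeps only the active version of each record.
conn.execute(
    "CREATE TABLE cdc_tut AS "
    "SELECT id, name FROM cdc_tut_scd WHERE _airbyte_active_row = 1"
)
print(conn.execute("SELECT id, name FROM cdc_tut ORDER BY id").fetchall())
# [(1, 'D1 CDCSyn'), (2, 'A2 CDCSyn')]
```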
Insert
In this section you will insert new records on the source database and view the replicated data on the destination.
Insert new records into the source table and then sync
Go back to the source Postgres shell and add some new records to the cdc_tut table as follows:
INSERT INTO cdc_tut(id, name) VALUES(3, 'B3 CDCSyn');
INSERT INTO cdc_tut(id, name) VALUES(4, 'B4 CDCSyn');
And view it with the following:
SELECT * FROM cdc_tut;
The source cdc_tut table should now look as follows:
Execute a sync by clicking Sync now, and wait for the sync to complete. At that point you should see a response indicating that 2 records have been emitted, such as the following:
ℹ️ Notice that because only two records were added to the source data, only two records were emitted and transmitted to the destination.
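Only the two new records are emitted because a CDC sync resumes from a saved position in the log rather than re-reading the table. The following Python sketch models that checkpointing idea; the LSN values are made up, and Airbyte's real state format is not shown here. It assumes the log entries are ordered by LSN.

```python
from typing import List, Tuple

# Hypothetical log entries: (lsn, operation, id). The LSN values are made up.
LOG: List[Tuple[int, str, int]] = [
    (100, "insert", 1),
    (101, "insert", 2),
    (102, "insert", 3),
    (103, "insert", 4),
]


def incremental_sync(log, saved_lsn: int) -> Tuple[List[Tuple[int, str, int]], int]:
    """Emit entries newer than the saved position and return the new checkpoint.

    Assumes log is sorted by LSN, so the last emitted entry has the highest LSN.
    """
    emitted = [entry for entry in log if entry[0] > saved_lsn]
    new_lsn = emitted[-1][0] if emitted else saved_lsn
    return emitted, new_lsn


# Assume the state saved after the initial sync points at the insert of id=2.
emitted, checkpoint = incremental_sync(LOG, saved_lsn=101)
print(len(emitted), checkpoint)  # 2 103

# Syncing again without new changes emits nothing and keeps the checkpoint.
print(incremental_sync(LOG, saved_lsn=checkpoint))  # ([], 103)
```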
Look at the raw data
View the replicated raw data in the airbyte-destination shell by executing the following:
SELECT * FROM _airbyte_raw_cdc_tut;
Which should respond with a table similar to the following:
So far, there is a single record in the raw destination table corresponding to each record in the source. This will soon change when records on the source are modified or deleted.
View the normalized tables
View the history table called cdc_tut_scd by executing the following:
As expected, the final deduplicated table (cdc_tut) contains a single record corresponding to each record in the source data.
Update
In this section you will update existing records on the source database and view the replicated data on the destination.
Modify a record in the source and then sync
Go back to the Postgres shell in the airbyte-source container that you opened earlier in this tutorial, and make two updates to a single record in the cdc_tut table as follows:
UPDATE cdc_tut SET name='C1 CDCSyn' WHERE id=1;
UPDATE cdc_tut SET name='D1 CDCSyn' WHERE id=1;
SELECT * FROM cdc_tut;
And the source cdc_tut table should now look as follows:
Execute a new sync by clicking Sync now in the connections UI and wait for the sync to complete. You should see a response indicating that two records have been emitted (one for each UPDATE statement), as follows:
View the raw table in the destination
Look at the raw table called _airbyte_raw_cdc_tut with the following command on the destination Postgres database.
SELECT * FROM _airbyte_raw_cdc_tut;
Which should respond with a table that looks as follows:
The raw table contains six records, even though the source table only contains four records. This is because there are now three entries for the record with id=1. These correspond to the original state of that record, as well as both of the updates that were done to it.
ℹ️ An important distinction versus the non-CDC incremental replication is that all of the intermediate states for each record have been captured and are stored in the destination. Contrast this with the behavior demonstrated in the previous tutorial on non-CDC incremental sync, in which only the state of the records in the source table at the moment the sync was executed was captured, and all intermediate states were lost.
View the normalized tables in the destination
View the history table called cdc_tut_scd by executing the following:
The final deduplicated table (cdc_tut) contains four records, just like the source data.
Delete
In this section you will remove a record from the source database and view the replicated data on the destination.
Delete a record on the source and then sync
Remove a record from the source as follows:
DELETE FROM cdc_tut WHERE id=4;
And view the source table as follows:
SELECT * FROM cdc_tut;
The source cdc_tut table should look as follows:
id | name
----+-----------
2 | A2 CDCSyn
3 | B3 CDCSyn
1 | D1 CDCSyn
(3 rows)
You should see that the record with the id of 4 has been removed from the source table. Execute a new sync by clicking on Sync Now in the connections UI and wait for the sync to complete.
The raw data in the above table contains a new entry which indicates that the record with id=4 has been deleted. The time of deletion is stored in _ab_cdc_deleted_at and is "2022-09-26T18:26:08.629Z" in this example.
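Because a deletion is recorded as an ordinary row whose JSON carries a timestamp in _ab_cdc_deleted_at, deleted records can also be picked out of the raw data programmatically. The following Python sketch assumes the raw rows have already been fetched as (_airbyte_ab_id, _airbyte_data) pairs; the row ids and contents are hypothetical, apart from the deletion timestamp quoted above.

```python
import json
from typing import List, Tuple

# Hypothetical rows shaped like _airbyte_raw_cdc_tut: (_airbyte_ab_id, _airbyte_data).
raw_rows: List[Tuple[str, str]] = [
    ("row-a", '{"id": 4, "name": "B4 CDCSyn", "_ab_cdc_deleted_at": null}'),
    ("row-b", '{"id": 4, "name": "B4 CDCSyn", "_ab_cdc_deleted_at": "2022-09-26T18:26:08.629Z"}'),
]


def deleted_ids(rows) -> List[int]:
    """Return the ids of records whose raw JSON carries a deletion timestamp."""
    ids: List[int] = []
    for _ab_id, data in rows:
        record = json.loads(data)
        # JSON null decodes to None, so only real timestamps count as deletions.
        if record.get("_ab_cdc_deleted_at") is not None:
            ids.append(record["id"])
    return ids


print(deleted_ids(raw_rows))  # [4]
```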
View the normalized tables
Execute the following to view the history (SCD) table:
SELECT * FROM cdc_tut_scd;
And you should see a table similar to the following:
The history table also contains an entry that indicates that the record with id=4 has been deleted, as can be seen in the _ab_cdc_deleted_at column, where a timestamp is present. Additionally, note that _airbyte_active_row is now 0 rather than 1 for the record with id=4.
Look at the final deduplicated table with the following command:
SELECT * FROM cdc_tut;
Which should respond with a table similar to the following:
Notice that the record with id=4 has been correctly removed from the final table, as desired. This is in contrast to the behavior that was demonstrated in the tutorial for the non-CDC (i.e. standard) incremental replication, where deletions of records on the source are not correctly transmitted or applied to the destination. This is one of the reasons why CDC replication is often preferred.
Conclusion
In this article I have given a brief overview of how CDC replication works, how it is different from the other synchronization options, and how data synchronized with CDC replication looks in the destination database. This article also demonstrated the two main benefits of CDC incremental replication versus non-CDC incremental replication, which are that intermediate states and deleted records are correctly reflected in the destination data.