Sync MySQL to Kafka using Change Data Capture

Learn how to stream changes from a MySQL database to Kafka using Change Data Capture (CDC).

Enterprise use cases often involve integrating data from various sources and analyzing logs and activity streams. For instance, if you are building an e-commerce application backed by a relational database, such as MySQL, you might need to combine incoming order data with other downstream data for operational analytics. 

Relational databases maintain a transaction log that records every event in the database. Every insert, update, and delete goes into the database's transaction log. Rather than moving all the order data in bulk, Change Data Capture (CDC) lets you stream each event from the database, as it occurs, into a streaming platform like Apache Kafka.

By consuming MySQL's transaction log through CDC, data integration tools such as Airbyte can extract data changes at very low latency and deliver them to Kafka, which serves as a central integration point for many downstream data feeds.

This tutorial will explain how to sync data from a MySQL database to Kafka using CDC. The process is similar for all Airbyte database sources that support CDC, like Postgres and MSSQL.

What is Change Data Capture (CDC)?

Change Data Capture (CDC) is an efficient replication technique that allows row-level data changes at the source database to be quickly identified, captured, and delivered in real time to the destination data store. With CDC in use, only the data that has changed since the last replication (categorized as insert, update, and delete operations) is transferred.
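To make the idea concrete, here is a minimal Python sketch of how a consumer might apply a stream of row-level change events to keep a copy of a table in sync. The event shape (`op`, `key`, `row`) and the helpers `apply_change` and `replay` are hypothetical names chosen for illustration; they are not the format produced by MySQL or Airbyte.

```python
from typing import Dict, List, Optional, TypedDict


class ChangeEvent(TypedDict):
    """Hypothetical change event; real CDC tools use their own formats."""
    op: str               # "insert", "update", or "delete"
    key: str              # primary key of the affected row
    row: Optional[dict]   # new row contents; None for deletes


def apply_change(replica: Dict[str, dict], event: ChangeEvent) -> None:
    """Apply one change event to an in-memory replica keyed by primary key."""
    if event["op"] in ("insert", "update"):
        # Inserts and updates both leave the replica holding the latest row.
        replica[event["key"]] = dict(event["row"])
    elif event["op"] == "delete":
        # Deleting a key that is already absent is treated as a no-op.
        replica.pop(event["key"], None)
    else:
        raise ValueError(f"unknown operation: {event['op']}")


def replay(events: List[ChangeEvent]) -> Dict[str, dict]:
    """Build a replica from scratch by applying events in order."""
    replica: Dict[str, dict] = {}
    for event in events:
        apply_change(replica, event)
    return replica
```

Because only the changed rows travel through the stream, the replica converges on the source state without ever re-copying the full table.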

How does CDC work in MySQL?

MySQL contains an internal feature called binary log (binlog) that records all database operations, including DDL and changes to table data.

Although enabling binary logging on a MySQL server may have a minor impact on performance, the binlog is helpful for data recovery and replication. Replication is particularly important in the context of this tutorial, since the binlog is what makes it possible to build ELT pipelines from MySQL.

The infrastructure that continually monitors the binlog for activities committed to the source database is referred to as MySQL CDC. When the list of operations in the binlog is copied to the destination, the ELT's load (L) component is accomplished.
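Log-based CDC depends on how the binlog is configured on the server. The sketch below checks a dictionary of MySQL system variables (for example, values copied from the output of `SHOW VARIABLES`) for binary logging being enabled, row-based logging, and full row images. These expected values are an assumption based on what log-based MySQL CDC tooling typically requires, so confirm them against the Airbyte MySQL source documentation. `check_binlog_settings` is a hypothetical helper, not part of any library.

```python
from typing import Dict, List

# Assumed settings for row-level CDC; verify against your connector's docs.
EXPECTED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means all checks passed."""
    problems = []
    for name, expected in EXPECTED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not set (expected {expected})")
        elif actual.upper() != expected:
            problems.append(f"{name} is {actual} (expected {expected})")
    return problems
```

Running a check like this before configuring the source can surface a misconfigured server earlier than a failed sync would.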

Prerequisites

Here are the tools you’ll need to start replicating data from a MySQL database to Kafka.

  1. You’ll need Airbyte to do the data sync for you. In this tutorial, we run Airbyte locally in a Docker container using the instructions in our documentation.
  2. You will need a MySQL instance to use as your source for CDC. MySQL instances can be set up in various ways, which can be found here. In this tutorial, we are using a hosted MySQL instance to get up and running quickly.
  3. You will need a Kafka instance that will serve as the destination. To get started quickly with Kafka, follow the instructions here. Again, for simplicity, we are using a hosted Kafka instance.

Step 1: Load data into the MySQL database

In this tutorial, we will create a MySQL database in our hosted instance and load it with some sample data. 

Use the MySQL CLI to connect to the remote MySQL instance

First, download the repo that contains the sample data. Once downloaded, run the following commands from the repo directory. The first script creates the employees database and its tables and loads the sample data; the second checks the loaded data.


$ mysql --host=<hostname> --port=<port> --user=<mysql-user> -p < employees_partitioned.sql

$ mysql --host=<hostname> --port=<port> --user=<mysql-user> -p < test_employees_sha.sql

You may be prompted to enter your MySQL password. The text between the angle brackets must be replaced with the hostname, port, and username based on your MySQL instance. 

Once the two scripts are run, the employees database will be created. You can view the created tables by logging into the MySQL CLI and running the following commands.


$ mysql --host=<hostname> --port=<port> --user=<mysql-user> -p 

> USE employees;
> SHOW TABLES;

Create a dedicated user with access to the tables

Creating a dedicated user is recommended for better permission control and auditing. Alternatively, you can use Airbyte with an existing user in your database.

To create a dedicated database user, run the following commands against your database.


CREATE USER 'airbyte'@'%' IDENTIFIED BY '<password>';

The right set of permissions differs between the STANDARD and CDC replication methods. For the STANDARD replication method, only the SELECT permission is required. For the CDC replication method, the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT permissions are required.


GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'airbyte'@'%';

Your database user should now be ready for use with Airbyte.

Step 2: Set up the MySQL CDC source

It's easy to create a MySQL source through the Airbyte UI. Make sure to select CDC as the replication method. We did not use an SSH tunnel in this example, but we recommend one if you connect to the database over the public internet in production.

Step 3: Set up the Kafka destination

Next, we will set up a Kafka destination in Airbyte. In this tutorial, we are running Kafka in our hosted instance and will connect to it using a Kafka client set up locally. Follow the quick start steps to set up the Apache Kafka client locally.

Create a new Kafka topic to sync data

First, we will create a Kafka topic named 'departments' which will be used to write the CDC data.


$ export KAFKA_HEAP_OPTS="-Xms512m -Xmx1g"

$ ./kafka-topics.sh --topic=departments --bootstrap-server <kafka-hostname>:<kafka-port> --create --replication-factor 2 --partitions 1

Next, create a destination in Airbyte as follows. 

For all of the remaining settings, we kept the default values.


Step 4: Create a MySQL CDC to Kafka connection

Once the source and destination are set up, you can create a connection from MySQL to Kafka in Airbyte. In the “select the data you want to sync” section, choose the departments table and select Incremental under Sync mode.

Airbyte's sync mode and sync frequency options control how the data is replicated (here, incrementally) and how often the sync runs.
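One way to picture an incremental sync is as a reader that remembers how far into the change log it got on the previous run and picks up from there. The Python sketch below illustrates that idea with a plain list standing in for the log. The `incremental_sync` function and the `position` state key are hypothetical and do not reflect Airbyte's actual state format or implementation.

```python
from typing import Dict, List, Tuple


def incremental_sync(log: List[str], state: Dict[str, int]) -> Tuple[List[str], Dict[str, int]]:
    """Return the events added since the saved position, plus the new state."""
    start = state.get("position", 0)   # first run starts at the beginning
    new_events = log[start:]           # only events not yet delivered
    return new_events, {"position": len(log)}
```

Each scheduled or manual sync would call such a function with the state saved by the previous run, so an idle source produces an empty batch.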

Once configured, you can see your connection on the Connections tab.

Now that your connection is set up, go back into your MySQL shell and run INSERT, UPDATE, and DELETE statements to add, update, and then delete a row in the departments table.
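The three changes can be sketched as the SQL statements held in `STATEMENTS` below. The sketch assumes the departments table has `dept_no` and `dept_name` columns, as in the MySQL sample employees database, and the department values are made up for illustration. So that the example runs on its own, it executes the statements against an in-memory SQLite table as a stand-in; against the real database, you would run the same statements in the MySQL shell.

```python
import sqlite3

# Hypothetical department values, used only for illustration.
STATEMENTS = [
    "INSERT INTO departments (dept_no, dept_name) VALUES ('d010', 'Data Platform')",
    "UPDATE departments SET dept_name = 'Data Engineering' WHERE dept_no = 'd010'",
    "DELETE FROM departments WHERE dept_no = 'd010'",
]


def make_stand_in_table() -> sqlite3.Connection:
    """Create an in-memory SQLite table that mimics the departments table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE departments ("
        "dept_no CHAR(4) PRIMARY KEY, "
        "dept_name VARCHAR(40) UNIQUE NOT NULL)"
    )
    return conn


def run_statements(conn: sqlite3.Connection) -> list:
    """Run each statement and record the table contents after it."""
    snapshots = []
    for sql in STATEMENTS:
        conn.execute(sql)
        rows = conn.execute("SELECT dept_no, dept_name FROM departments").fetchall()
        snapshots.append(rows)
    return snapshots
```

Each statement produces one row-level change, so the sync that follows should carry three events for the same `dept_no`.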

Next, go back to the Airbyte UI, select the connection you just created, and trigger a manual sync.

Once the sync is complete, run the following command in a new terminal window to read the events that were persisted to the Kafka topic.


$ ./kafka-console-consumer.sh --topic departments --from-beginning --bootstrap-server <kafka-hostname>:<kafka-port>

The screenshot below shows the CDC data for the row you just inserted, updated, and deleted with corresponding timestamps.
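If you want to process the consumed messages programmatically, a small Python sketch like the following could separate deletes from inserts and updates. It assumes each message value is a JSON document whose `_airbyte_data` field holds the row together with CDC metadata columns prefixed `_ab_cdc_`, including `_ab_cdc_deleted_at`. These field names are assumptions about the message layout and may differ between connector versions, and `classify_message` is a hypothetical helper.

```python
import json
from typing import Tuple


def classify_message(raw: str) -> Tuple[str, dict]:
    """Return ("delete" | "upsert", row) for one consumed message value."""
    record = json.loads(raw)
    # Assumed layout: the row lives under "_airbyte_data"; fall back to the
    # top-level object if that wrapper is absent.
    data = record.get("_airbyte_data", record)
    # A non-empty deletion timestamp is taken to mean the row was deleted.
    kind = "delete" if data.get("_ab_cdc_deleted_at") else "upsert"
    # Strip the assumed CDC metadata columns to leave only the table columns.
    row = {k: v for k, v in data.items() if not k.startswith("_ab_cdc_")}
    return kind, row
```

Under this assumed layout, a single message does not distinguish an insert from an update, so the sketch groups both as "upsert".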


Wrapping up

Here’s what you have accomplished with this tutorial:

  • Configured a MySQL Airbyte source
  • Configured a Kafka Airbyte destination
  • Created a connection that syncs CDC log data from MySQL to Kafka

With a combination of MySQL CDC logs, Airbyte and Kafka, distributed data platforms can be kept in sync and made aware of data changes.

You may be interested in other Airbyte tutorials and Airbyte’s blog. You can also join the conversation on our community Slack Channel, participate in discussions on Airbyte’s discourse, or sign up for our newsletter. Furthermore, if you are interested in Airbyte as a fully managed service, you can try Airbyte Cloud for free!
