Airflow and Airbyte OSS - Better Together

Learn how to create an Airflow DAG (directed acyclic graph) that triggers Airbyte synchronizations.

Apache Airflow and Airbyte are complementary tools that can be used together to meet your data integration requirements. Airbyte can be used to extract data from hundreds of sources and load it into any of its supported destinations. Airflow can be used for scheduling and orchestration of tasks, including triggering Airbyte synchronizations. The combination of Airflow and Airbyte provides a flexible, scalable, and maintainable solution for managing your data integration and data processing requirements.

In this tutorial, you will install Airbyte OSS and Apache Airflow running in a local Docker Desktop environment. After installation, you will configure a simple Airbyte connection. Next, you will create an Airflow directed acyclic graph (DAG) which triggers a data synchronization over the newly created Airbyte connection, and then triggers (orchestrates) some additional tasks which depend on the completion of the Airbyte data synchronization.

ℹ️ If you want to orchestrate Airbyte Cloud instead, see Using the new Airbyte API to orchestrate Airbyte Cloud with Airflow.

What is Apache Airflow

Apache Airflow is an open-source orchestrator tool that is used for programmatically scheduling and monitoring workflows. It is often used to manage a sequence of tasks performed by a data integration pipeline.

With Airflow, users can define workflows as directed acyclic graphs (DAGs), where each task in the DAG represents an individual operation. Tasks can be executed in parallel or sequentially and can be scheduled to run at specific times or in response to certain events.

Airbyte and Apache Airflow together

Airflow can execute tasks that are required as a prerequisite to triggering an Airbyte sync run and/or can be used for scheduling tasks that depend on the completion of an Airbyte sync run. 

In this tutorial, you will create a simple Airflow DAG that executes tasks on your local environment, and that does the following:

  1. Triggers an Airbyte ELT pipeline that loads data from the faker source, and writes the data to the local filesystem.
  2. Waits for the ELT pipeline’s synchronization to complete.
  3. Verifies that the expected local file exists.
  4. Renames a local file that was created by the synchronization.

ℹ️ The purpose of this tutorial is to show how easy it is to set up an Airflow DAG to interact with Airbyte, as well as to give a small taste of the power of Airflow DAGs. This is demonstrated with a simple example, which may be used as a starting point for implementing a more complex real-world use case.
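
To make the idea of a DAG concrete before any Airflow code appears, the four steps can be written down as a dependency graph and ordered with Python's standard library. This is only an illustrative sketch: the task names are hypothetical, and Airflow uses its own scheduler rather than graphlib.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
# The task names are hypothetical labels for the four steps of this tutorial.
dependencies = {
    "trigger_sync": set(),
    "wait_for_sync": {"trigger_sync"},
    "check_file_exists": {"wait_for_sync"},
    "rename_file": {"check_file_exists"},
}

def execution_order(deps):
    """Return one valid order in which the tasks can run."""
    return list(TopologicalSorter(deps).static_order())

order = execution_order(dependencies)
print(order)
```

Because every task here depends on exactly one predecessor, there is only one valid order. If two tasks depended on each other, graphlib would raise a CycleError, which is the "acyclic" requirement in practice.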

Versions

There may be future modifications to the API and/or Airflow that could render some of the instructions given in this tutorial obsolete. The instructions presented in this tutorial were created in February 2023, and the following tools were used:

  • Airbyte OSS 0.40.32
  • Docker Desktop v4.10.1
  • macOS Monterey Version 12.5.1
  • MacBook Pro with the Apple M1 Pro Chip
  • Airflow v2.5.1 Git Version: .release:2.5.1+49867b660b6231c1319969217bc61917f7cf9829

Install Airbyte

If you already have a local copy of Airbyte running, then you may skip this section. Otherwise, follow the instructions to deploy Airbyte.

[Optional] Modify BASIC_AUTH_USERNAME and BASIC_AUTH_PASSWORD in the (hidden) .env file. For this tutorial I use the following default values: 


BASIC_AUTH_USERNAME=airbyte
BASIC_AUTH_PASSWORD=password

Once Airbyte is running, open localhost:8000 in your browser, which should prompt you for a username and password as follows:

Airbyte OSS login prompt

Create a connection

Create a connection that sends data from the Sample Data (Faker) source to the Local JSON (file system) output. Click on “Create your first connection” as shown below:

Create your first connection prompt

You should then see an option to set up a source connection. Select the Faker source from the dropdown as shown below.

Select Sample Data (Faker) as a source

After selecting Sample Data as the source, you will see a screen that should look as follows. Click on Set up source as shown below. 

Configure Sample Data (Faker) as a source

You will then wait a few seconds for the Sample Data source to be verified, at which point you will be prompted to configure the destination that will be used for the connection. Select Local JSON as shown below:

Select Local JSON as a destination

After selecting Local JSON as the output, you will need to specify where the JSON files should be written. By default the path that you specify will be located inside /tmp/airbyte_local. In this tutorial I set the destination to /json_from_faker, which means that the data will be copied to /tmp/airbyte_local/json_from_faker on the localhost where Airbyte is running. After specifying the Destination Path, click on Set up destination. 

Configure the Local JSON destination

This will take you to a page to set up the connection. Set the replication frequency to Manual (since we will use Airflow to trigger Airbyte syncs rather than using Airbyte’s scheduler) and then click on Set up connection as highlighted in the image below.

Specify connection settings

Trigger a sync from the Sample Data (faker) source to the Local JSON output by clicking on Sync now as highlighted in the image below.

Manually trigger a sync from the UI

The sync should take a few seconds, at which point you should see that the sync has succeeded as shown below.

After the sync has completed

You can now confirm that the sample data has been copied to the expected location. As previously mentioned, for this example the JSON data can be seen in /tmp/airbyte_local/json_from_faker. Because there were three streams generated, the following three JSON files should be available:


_airbyte_raw_products.jsonl
_airbyte_raw_users.jsonl
_airbyte_raw_purchases.jsonl
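
If you would rather check for these files from a script than by eye, a minimal sketch along the following lines should work. It assumes the destination path /json_from_faker used in this tutorial, and the helper name missing_files is made up for illustration.

```python
from pathlib import Path

# File names from the list above.
EXPECTED_FILES = (
    "_airbyte_raw_products.jsonl",
    "_airbyte_raw_users.jsonl",
    "_airbyte_raw_purchases.jsonl",
)

def missing_files(directory, expected=EXPECTED_FILES):
    """Return the expected file names that are not present in `directory`."""
    base = Path(directory)
    return [name for name in expected if not (base / name).is_file()]

if __name__ == "__main__":
    # Path assumed from this tutorial's destination setting; adjust if yours differs.
    missing = missing_files("/tmp/airbyte_local/json_from_faker")
    print("All files present" if not missing else f"Missing: {missing}")
```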

You have now created a simple example connection in Airbyte which can be manually triggered. A manually triggered connection is ideal for situations where you wish to use an external orchestrator. 

In the next section you will see how to trigger a manual sync on this connection by hitting a REST endpoint directly. After that, you will see how Airflow can be used to hit that same endpoint to trigger synchronizations. 

Test the API endpoints with cURL

Before using the REST endpoint from within Airflow, it is useful to verify that it is working as expected. Get the connectionId from the URL shown in your browser as annotated in the following image: 

Get the Airbyte connection ID
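
If you prefer not to copy the ID by hand, the following sketch pulls it out of a pasted URL. It assumes that the URL contains a /connections/ segment followed by a UUID; the workspace ID in the example URL is made up for illustration.

```python
import re

# A UUID is 8-4-4-4-12 hexadecimal characters.
UUID = r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}"

def extract_connection_id(url):
    """Return the UUID that follows '/connections/' in `url`, or None."""
    match = re.search(rf"/connections/({UUID})", url)
    return match.group(1) if match else None

# Hypothetical URL: the workspace ID is a placeholder, not a real value.
example = (
    "http://localhost:8000/workspaces/00000000-0000-0000-0000-000000000000"
    "/connections/1ab174f8-fa2c-4204-9442-2900be4fd28a/status"
)
print(extract_connection_id(example))
```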

You can use cURL to verify that Airbyte’s API endpoint is working as expected. Be sure to update the connectionId in the following command to reflect the value extracted from the URL above. Execute a call to the REST API as follows:


curl -u 'airbyte:password' -X POST "http://localhost:8000/api/v1/connections/sync" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  -d '{"connectionId":"[REPLACE WITH YOUR CONNECTION ID]"}'

The above command should respond with the following, which indicates that a Sync has started:


{"job":{"id":303,"configType":"sync","configId":"1ab174f8-fa2c-4204-9442-2900be4fd28a","createdAt":1675690032,"updatedAt":1675690032,"status":"running"},"attempts":[{"attempt":{"id":0,"status":"running","createdAt":1675690032,"updatedAt":1675690032},"logs":{"logLines":[]}}]}
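
For readers who would rather script this step, here is a rough Python equivalent using only the standard library. It builds the same POST request (endpoint, headers, basic-auth credentials and body are taken from the cURL command) and extracts the job id and status from a response shaped like the one shown. The function names are made up for this sketch, and the call that actually sends the request is left commented out.

```python
import base64
import json
import urllib.request

def build_sync_request(connection_id, username="airbyte", password="password",
                       base_url="http://localhost:8000"):
    """Build (but do not send) the POST request that the cURL command issues."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/connections/sync",
        data=json.dumps({"connectionId": connection_id}).encode(),
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )

def job_summary(response_text):
    """Extract the job id and status from a sync response body."""
    job = json.loads(response_text)["job"]
    return job["id"], job["status"]

# To trigger a sync against a running Airbyte instance, uncomment:
# with urllib.request.urlopen(build_sync_request("<your connection id>")) as resp:
#     print(job_summary(resp.read().decode()))
```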

If you look in the UI, you will see that a sync executes each time that you run the cURL command. I ran the command twice within a minute, so my UI looks as follows:

View the Sync History

Install and Launch Airflow

Now that you have verified that the REST endpoint is functioning as expected, we’ll start working with Airflow, which will trigger that same Airbyte API endpoint to execute a sync. The instructions for this section are based on Running Airflow in Docker, with additional information about how to get the Airbyte provider installed. 

Create a new folder which will be used for your Airflow installation and cd into it:


mkdir airflow 
cd airflow


Download the docker compose file for running Airflow as follows:


curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

Then create additional required sub-directories, and assign the Airflow UID as follows:


mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

In order to support Airbyte, the Airflow Docker image requires Airbyte provider functionality. The first step is to create a file called Dockerfile in your airflow folder with the following contents:


FROM ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
USER airflow
RUN pip install apache-airflow-providers-airbyte[http] \
&& pip install apache-airflow-providers-airbyte

In order to enable the building of a new Airflow image using the Dockerfile that you have just created, you should uncomment the following line in docker-compose.yaml:


# build: . 

Additionally, add the following line to the volumes section of docker-compose.yaml so that Airflow can see and manipulate files in the local folder that Airbyte writes to:


- /tmp/airbyte_local:/tmp/airbyte_local

The relevant changes to docker-compose.yaml are highlighted in the following image: 

The relevant portions of the docker-compose.yaml file
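
As an optional sanity check before building, a few lines of Python can confirm that both edits are present as active (uncommented) lines. This is a plain text check rather than a YAML parse, the helper name is made up, and the sample text is an abbreviated, hypothetical excerpt of the real file.

```python
def compose_edits_applied(compose_text):
    """Report whether the two edits appear as active lines in the file text."""
    lines = [line.strip() for line in compose_text.splitlines()]
    return {
        "build_enabled": "build: ." in lines,
        "airbyte_volume_mounted": "- /tmp/airbyte_local:/tmp/airbyte_local" in lines,
    }

# Abbreviated, hypothetical excerpt; the real docker-compose.yaml is much longer.
sample = """\
  build: .
  volumes:
    - /tmp/airbyte_local:/tmp/airbyte_local
"""
print(compose_edits_applied(sample))
```

To check your own file, pass in its contents, for example compose_edits_applied(open("docker-compose.yaml").read()).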

Build the docker image as follows:


docker compose build

This should show that the Airbyte providers specified in the Dockerfile have been installed, as shown in the following image:

Confirmation that the Airflow provider has been installed

Next you can initialize Airflow as follows:


docker compose up airflow-init

You are now ready to start Airflow! Execute the following command to launch Airflow and its associated containers:


docker compose up

Once the containers are running, you can view the list of the containers with the following command:


docker ps

In my case, there are several containers running for Airbyte, and several containers running for Airflow, as shown in the image below: 

The Docker containers which are running for Airflow and Airbyte

Confirm that the Airbyte folders are visible from Airflow by logging into the Airflow scheduler container with the following command:


docker exec -it airflow-airflow-scheduler-1 bash  

And from within that container you should be able to see an airbyte_local folder in the /tmp directory as follows:

Confirmation that the airbyte_local directory is visible

Next you will log in to Airflow by pointing your browser to localhost:8080, which should look as follows:

Airflow login prompt

As documented in Airflow’s REST API instructions, the default username is airflow and default password is also airflow. Once you have logged in, you will see a screen that looks as follows: 

Airflow screen after login

Create Airflow connections

Airflow has its own concept of connections, and we will make use of an Airflow connection to trigger a synchronization using an Airbyte connection.

In order to demonstrate how Airflow can execute additional tasks that depend on the completion of an Airbyte synchronization, we will also define an Airflow connection that will be used for accessing and modifying files on the local filesystem. 

To define the connection that Airflow will use to communicate with Airbyte, go to Admin → Connections as shown below:

Create an Airflow connection

Then click on the + symbol as annotated in the image below:

Click on the button to create a new Airflow connection

Fill in the details of the connection that Airflow will use to reach Airbyte, as shown in the following image, and then click on the Test button:

Configure an Airflow connection to Airbyte

The connection parameters are:

  • Connection Id: Define an identifier that Airflow DAGs can use to communicate with Airbyte. In this example the identifier is given the name airflow-call-to-airbyte-example, which will be used in the DAG definition (shown later). 
  • Connection Type: Specifies that this is a connection to Airbyte. Note that if you do not see Airbyte in the dropdown menu, then the Docker image has not been correctly built. Adding the Airbyte provider to the Docker image was done earlier in this tutorial. 
  • Host: The host which is running Airbyte. Note the use of host.docker.internal, which resolves to the internal IP address used by the host, as discussed in Docker’s instructions on network interfaces.
  • Login: The default user to connect to Airbyte is airbyte. If you have changed this, then use whichever username you have defined.
  • Password: If you are using the default then the value is password. If you have changed this, then use whichever password you have defined.
  • Port: By default, Airbyte listens on port 8000.

Click on Save which should take you back to the Connections screen.
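
Airflow can also describe a connection as a single URI of the form type://login:password@host:port. The sketch below assembles such a URI from the parameters listed above, which can be a handy way to record the settings. It assumes that the scheme for the Airbyte connection type is airbyte, and this tutorial itself only uses the UI, so treat it as illustrative.

```python
from urllib.parse import quote

def connection_uri(conn_type, login, password, host, port):
    """Assemble a connection URI of the form type://login:password@host:port."""
    # Percent-encode the credentials so characters such as '@' or ':' stay unambiguous.
    return f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}@{host}:{port}"

uri = connection_uri("airbyte", "airbyte", "password", "host.docker.internal", 8000)
print(uri)  # airbyte://airbyte:password@host.docker.internal:8000
```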

Because the DAG that we will define is also going to manipulate files, we will also create a Files connection. Again, click on the + symbol as shown below:

Create another Airflow connection

This should take you to a screen that looks like the following:

Create an Airflow connection to manipulate files on the local filesystem

The connection parameters are:

  • Connection Id: As mentioned above, this will be used in the DAG to connect to the file system. In this example the value is set to airflow-file-connector.
  • Connection Type: Select File (path). This connector will be used in the DAG to interact with files on the local filesystem. 

After saving the above connection, your Connections screen should look as follows:

The Airflow connections that have just been created

Now that the relevant Airflow connections are defined, they can be used in an Airflow DAG.

Create an Airflow DAG

In this section, I present Python code for a simple DAG that performs the following tasks: 

  1. trigger_airbyte_sync: Uses AirbyteTriggerSyncOperator to asynchronously trigger Airbyte to perform a synchronization from the Sample Data (Faker) input to the Local JSON (file) output using the Airbyte connection that we defined above. Because this is executed asynchronously, it returns immediately with a job id that is used for determining the completion of the synchronization.
  2. wait_for_sync_completion: Uses AirbyteJobSensor to wait for Airbyte to complete the synchronization.
  3. raw_products_file_sensor: Uses FileSensor to confirm that the file created by Airbyte exists. One of the files created by the Sample Data (Faker) source is called _airbyte_raw_products.jsonl, and this task waits for that file to exist.
  4. move_raw_products_file: Uses BashOperator to rename the raw products file.

The code which demonstrates these steps is given below.


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.sensors.filesystem import FileSensor
import pendulum

# ID of the Airbyte connection to sync (taken from the connection URL in the Airbyte UI)
AIRBYTE_CONNECTION_ID = '[REPLACE THIS WITH YOUR CONNECTION ID]'
# File written by the Local JSON destination, and the name it will be renamed to
RAW_PRODUCTS_FILE = '/tmp/airbyte_local/json_from_faker/_airbyte_raw_products.jsonl'
COPY_OF_RAW_PRODUCTS = '/tmp/airbyte_local/json_from_faker/moved_raw_products.jsonl'

with DAG(
    dag_id='airbyte_example_airflow_dag',
    default_args={'owner': 'airflow'},
    schedule='@daily',
    start_date=pendulum.today('UTC').add(days=-1),
) as dag:

    # Start the sync and return immediately with the Airbyte job id
    trigger_airbyte_sync = AirbyteTriggerSyncOperator(
        task_id='airbyte_trigger_sync',
        airbyte_conn_id='airflow-call-to-airbyte-example',
        connection_id=AIRBYTE_CONNECTION_ID,
        asynchronous=True,
    )

    # Wait until the job started by the previous task has completed
    wait_for_sync_completion = AirbyteJobSensor(
        task_id='airbyte_check_sync',
        airbyte_conn_id='airflow-call-to-airbyte-example',
        airbyte_job_id=trigger_airbyte_sync.output,
    )

    # Confirm that the raw products file written by Airbyte exists
    raw_products_file_sensor = FileSensor(
        task_id='check_if_file_exists_task',
        timeout=5,
        filepath=RAW_PRODUCTS_FILE,
        fs_conn_id='airflow-file-connector',
    )

    # Rename the raw products file
    move_raw_products_file = BashOperator(
        task_id='move_raw_products_file',
        bash_command=f'mv {RAW_PRODUCTS_FILE} {COPY_OF_RAW_PRODUCTS}',
    )

    # Run the four tasks strictly one after another
    trigger_airbyte_sync >> wait_for_sync_completion >> raw_products_file_sensor >> move_raw_products_file

Copy the code into a file called example_dag.py in the airflow/dags directory that you created earlier in this tutorial. Then set the AIRBYTE_CONNECTION_ID to the value that you extracted from the Airbyte connection URL earlier. 

Furthermore, the file paths assume that you specified /json_from_faker as the Destination Path of the Local JSON destination that we defined earlier. If you used a different path, then update RAW_PRODUCTS_FILE and COPY_OF_RAW_PRODUCTS in the code to reflect the correct path.
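
If you did use a different destination path, one way to avoid editing two long strings is to derive both constants from it. The following is a small sketch with a made-up helper name; it assumes the /tmp/airbyte_local root described earlier in this tutorial.

```python
import posixpath

# Root folder used by the Local JSON destination in this tutorial.
AIRBYTE_LOCAL_ROOT = "/tmp/airbyte_local"

def local_json_path(destination_path, file_name, root=AIRBYTE_LOCAL_ROOT):
    """Join the root, the Destination Path configured in Airbyte, and a file name."""
    return posixpath.join(root, destination_path.strip("/"), file_name)

RAW_PRODUCTS_FILE = local_json_path("/json_from_faker", "_airbyte_raw_products.jsonl")
COPY_OF_RAW_PRODUCTS = local_json_path("/json_from_faker", "moved_raw_products.jsonl")
print(RAW_PRODUCTS_FILE)
print(COPY_OF_RAW_PRODUCTS)
```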

In order to see the new DAG, click on DAGs on the top of the screen and then click on the refresh button highlighted below:

View a list of the DAGs and click on the refresh button

After some time, the DAG which you just added to the DAGs folder will appear. The name that will appear corresponds to the dag_id you specify in the code:


dag_id='airbyte_example_airflow_dag' 

This will appear in the list of DAGs as follows:

Ensure that the DAG that you have created appears in the list

View the new DAG

To view the DAG that is specified by the above code, click on the Graph button that is annotated in the following illustration:

View a graph of the tasks in the selected DAG

Execute the Airflow DAG

Click on the newly created DAG called airbyte_example_airflow_dag highlighted in the image above. This will take you to a screen which gives more information about the DAG. Run the DAG by clicking on the button in the top right corner as annotated in the following image: 

Trigger the Airflow DAG that executes an Airbyte synchronization


After triggering the DAG, you will see a screen similar to the following, which indicates that it is executing:

The status of the Airflow DAG

Each time the above DAG is executed, you should see an associated Sync in Airbyte’s Sync History UI as follows:

The status of the synchronization that has been executed by Airbyte
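
The wait_for_sync_completion task is what keeps the DAG from moving on until Airbyte reports that the job has finished. Conceptually, a sensor of this kind is a polling loop. The sketch below illustrates that idea in plain Python; it is not the AirbyteJobSensor implementation, and the status strings and the stand-in status function are assumptions made for the example.

```python
import time

def wait_for_job(get_status, poke_interval=1.0, timeout=60.0, sleep=time.sleep):
    """Poll `get_status` until it returns 'succeeded'; raise on failure or timeout."""
    waited = 0.0
    while True:
        status = get_status()
        if status == "succeeded":
            return status
        if status in ("failed", "cancelled"):
            raise RuntimeError(f"job ended with status {status!r}")
        if waited >= timeout:
            raise TimeoutError(f"job still {status!r} after {waited} seconds")
        sleep(poke_interval)
        waited += poke_interval

# Stand-in for a call to Airbyte: reports 'running' twice, then 'succeeded'.
statuses = iter(["running", "running", "succeeded"])
print(wait_for_job(lambda: next(statuses), poke_interval=0.01))
```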

Finally, once the DAG has completed, you can look in your local file system to see the files that Airbyte created, as well as the file that Airflow renamed from _airbyte_raw_products.jsonl to moved_raw_products.jsonl. Your /tmp/airbyte_local/json_from_faker folder should look as follows:

Relevant files that exist on the local filesystem
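
As a final check, you may want to confirm that the renamed file actually contains data. The sketch below counts the records in a JSON Lines file, where each non-empty line is one JSON document. The helper name is made up, and the path assumes the /json_from_faker destination used in this tutorial.

```python
import json

def count_records(jsonl_path):
    """Count the non-empty lines in a JSON Lines file, checking each parses as JSON."""
    count = 0
    with open(jsonl_path, encoding="utf-8") as handle:
        for line in handle:
            if line.strip():
                json.loads(line)  # raises ValueError on a malformed line
                count += 1
    return count

if __name__ == "__main__":
    # Path assumed from this tutorial; adjust if your destination path differs.
    path = "/tmp/airbyte_local/json_from_faker/moved_raw_products.jsonl"
    try:
        print(f"{count_records(path)} records in {path}")
    except FileNotFoundError:
        print(f"{path} not found; has the DAG finished?")
```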

Conclusion

This article has shown you how to set up a simple Airbyte connection for which synchronizations are triggered by an Airflow DAG. After the completion of each synchronization, Airflow checks that the expected file exists and then renames that file. While the tasks that are demonstrated in this tutorial are simple, the concepts which they have demonstrated are powerful. You can extend the functionality demonstrated here to build complex Airbyte ELT pipelines that are orchestrated by Airflow! 

If you have enjoyed this article, you may be interested in reading ETL Pipelines with Airflow: the Good, the Bad and the Ugly and Automate your Data Scraping with Apache Airflow and Beautiful Soup. Furthermore, you may be interested in other Airbyte tutorials, or in Airbyte’s blog. You may also consider joining the conversation on our community Slack Channel, participating in discussions on Airbyte’s discourse, or signing up for our newsletter.
