Airbyte Cloud is a data integration platform that has gained significant popularity among businesses of all sizes. With Airbyte Cloud, customers can quickly and easily connect to various data sources and load their data into their preferred destinations. Customers are drawn to Airbyte Cloud for several reasons, including its ease of use, cost-effectiveness, scalability, and reliability.
Because of these benefits, many customers are interested in using Airbyte Cloud, and some may require a way to interact with their Cloud-based ELT pipelines from orchestration tools such as Airflow. To meet this requirement, Airbyte has released a beta version (as of March 2023) of the Airbyte API, which can be used for orchestrating Airbyte Cloud deployments.
In this article you will learn about the difference between the existing (internal) Config API and the new Airbyte API. Then you will see how to use curl to interact with Airbyte Cloud via the new Airbyte API. Finally, you will see how to use Airflow to interact with the new Airbyte API, which gives you access to powerful orchestration capabilities.
Two sets of APIs with different purposes
Airbyte has two sets of APIs which are intended for different uses. The Config API is an internal API that is designed for communications between different Airbyte components. Because it is an internal API, it may be changed at any time without notice, it was not explicitly designed for simplicity, and it may return responses that are more detailed than necessary for external use. It is generally not recommended that customers interact with the Config API. The Airbyte API, on the other hand, is designed for stability (i.e. fewer breaking changes in the future) and ease of use. If customers need to programmatically interact with Airbyte Cloud, the Airbyte API is the recommended path forward.
ℹ️ As of March 2023 the Airbyte API is only accessible in Airbyte Cloud. However, the intention is to release it for Airbyte OSS in the future.
Does the new API work with orchestrators?
Up until now, orchestrators including Airflow, Dagster, and Prefect have been designed and documented to work with Airbyte OSS and make use of the Config API. Furthermore, they make use of usernames and passwords for authentication. On the other hand, Airbyte Cloud uses an API token for authentication and should preferably be orchestrated via the Airbyte API. This means that modifications to these orchestrators may be required for them to interact with Airbyte Cloud.
Given that Airflow has built-in HTTP operators that can be used to send arbitrary HTTP requests, it is debatable if a custom Airbyte operator is needed for interacting with Airbyte Cloud – and in this article I will demonstrate how to make use of these built-in operators to orchestrate Airbyte Cloud via the new Airbyte API.
ℹ️ If you are looking for more details and background on how to orchestrate Airbyte from Airflow, you may be interested in reading Airflow and Airbyte OSS - Better Together. That article provides a more comprehensive overview of setting up and using Airflow to orchestrate Airbyte, with a focus on using Airflow’s Airbyte provider.
Get an Authentication token for connecting to Airbyte Cloud
In order to interact with Airbyte Cloud, you will need an API token. Go to the Airbyte Developer Portal to generate a JSON Web Token (JWT). Keep a copy of this token, as it will need to be included in HTTP requests to Airbyte Cloud.
Get the ID of the connection that you wish to orchestrate
In this tutorial you will trigger a sync on a given Airbyte connection. This will require a connection identifier, which can be extracted from the URL in Airbyte Cloud as annotated below:
Get the Airbyte Cloud connection identifier
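For reference, the connection page URL typically has a shape along the following lines, where the final identifier is the connectionId to copy. This is only an illustrative example; the exact path may differ slightly in your workspace:

https://cloud.airbyte.com/workspaces/<workspace-id>/connections/<connection-id>/status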
Verify that you can connect to the Airbyte API on Airbyte Cloud via curl
You can use the Airbyte API reference to generate and execute a curl command to validate that the Airbyte API is responding as you expect. Below I show how to trigger a sync by specifying the connectionId and the Bearer token that was obtained in the previous two steps. Be sure to select sync as the jobType, and then click on Try it as follows:
Generate a curl command to sync an Airbyte Cloud connection
Notice that the sync returns a jobId in its response. We will make use of this in our Airflow DAG to periodically poll Airbyte to determine if a sync job has successfully completed.
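For reference, the generated command looks roughly like the following sketch. It assumes the https://api.airbyte.com base URL used by the API reference; replace the placeholder values with your own token and connection identifier:

curl -X POST "https://api.airbyte.com/v1/jobs" \
  -H "Authorization: Bearer <your-api-token>" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -d '{"connectionId": "<your-connection-id>", "jobType": "sync"}'

If the request succeeds, the JSON response includes a jobId field, which is exactly what the DAG presented later in this article extracts and polls.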
You can check the status of the sync job that you just launched by going to the get job page and entering the jobId as shown below:
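Expressed as a curl command, the equivalent status check is a simple GET on the jobs endpoint. As a sketch (again assuming the https://api.airbyte.com base URL, with 12345 standing in for the jobId returned above):

curl -X GET "https://api.airbyte.com/v1/jobs/12345" \
  -H "Authorization: Bearer <your-api-token>" \
  -H "Accept: application/json"

The response contains a status field (for example "running" or "succeeded"), which is the value the Airflow sensor shown later will check.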
Generate a curl command to check Airbyte Cloud job status
Define an Airflow connection to use the Airbyte API
Next we will define a new Airflow connection that will be used by Airflow to connect to Airbyte. Note that this is a different “connection” from the Airbyte connection which we are going to orchestrate with Airflow. The Airflow connection will be used by the SimpleHttpOperator to send HTTP requests to the Airbyte API.
Create a new connection in the Airflow UI by selecting Admin → Connections, and then clicking on the “+” button to add a new connection. Then create the connection as shown below:
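If you prefer to script this step instead of using the UI shown below, an equivalent HTTP connection can be created with the Airflow CLI. This is a sketch that assumes the Airbyte API is served from https://api.airbyte.com; the connection id must match the http_conn_id referenced in the DAG later in this article:

airflow connections add 'airbyte-api-cloud-connection' \
    --conn-type 'http' \
    --conn-host 'https://api.airbyte.com'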
Define an Airflow connection to interact with Airbyte Cloud
Define the variables that will be used by the DAG
As an alternative to hard-coding the connectionId and Bearer token in the DAG, you can make use of Airflow variables to define the required values. In Airflow, go to Admin → Variables and click on the “+” symbol to add a new variable.
I store my Bearer token in an Airflow variable called CLOUD_API_TOKEN as shown below:
Store the Airbyte Cloud token in an Airflow variable
I store my connectionId in a variable called MY_EXAMPLE_CONNECTION_ID as shown below:
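In addition to the UI screenshots shown here, both variables can also be created from the Airflow CLI, for example (replace the placeholders with your own values):

airflow variables set CLOUD_API_TOKEN '<your-api-token>'
airflow variables set MY_EXAMPLE_CONNECTION_ID '<your-connection-id>'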
Store the Airbyte Cloud connection identifier in an Airflow variable
ℹ️ For production deployments, you should consider securing your variables. However, a discussion of this topic is beyond the scope of this article.
Define a DAG that triggers a sync and then waits for it to complete
The following Python code demonstrates a DAG that triggers a sync, retrieves the job identifier from the JSON response, and then polls the job status until it completes. You can also find a copy of this code on GitHub.
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.models import Variable
import pendulum
import json
"""
This file demonstrates a simple Airflow DAG that triggers a sync using the Airbyte API on a connection that
is running in Airbyte Cloud, and that then waits for that sync to succeed.
The Airbyte API endpoints that are demonstrated are "sync" and "jobs" (status).
For more information, consult the Airbyte API at https://reference.airbyte.com/reference/start
"""
AIRBYTE_CONNECTION_ID = Variable.get("MY_EXAMPLE_CONNECTION_ID")
API_KEY = f'Bearer {Variable.get("CLOUD_API_TOKEN")}'
with DAG(dag_id='airbyte_api_sync_demo',
         default_args={'owner': 'airflow'},
         schedule='@daily',
         start_date=pendulum.today('UTC').add(days=-1)
         ) as dag:

    trigger_sync = SimpleHttpOperator(
        method="POST",
        task_id='start_airbyte_sync',
        http_conn_id='airbyte-api-cloud-connection',
        headers={
            "Content-Type": "application/json",
            "User-Agent": "fake-useragent",  # Airbyte Cloud requires that a user agent is defined
            "Accept": "application/json",
            "Authorization": API_KEY},
        endpoint='/v1/jobs',
        data=json.dumps({"connectionId": AIRBYTE_CONNECTION_ID, "jobType": "sync"}),
        do_xcom_push=True,
        response_filter=lambda response: response.json()['jobId'],
        log_response=True,
    )

    wait_for_sync_to_complete = HttpSensor(
        method='GET',
        task_id='wait_for_airbyte_sync',
        http_conn_id='airbyte-api-cloud-connection',
        headers={
            "Content-Type": "application/json",
            "User-Agent": "fake-useragent",  # Airbyte Cloud requires that a user agent is defined
            "Accept": "application/json",
            "Authorization": API_KEY},
        endpoint='/v1/jobs/{}'.format("{{ task_instance.xcom_pull(task_ids='start_airbyte_sync') }}"),
        poke_interval=5,
        response_check=lambda response: json.loads(response.text)['status'] == "succeeded"
    )

    trigger_sync >> wait_for_sync_to_complete

if __name__ == "__main__":
    dag.test()
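Note that because the file ends with a __main__ guard that calls dag.test() (available in Airflow 2.5 and later), you can also do a quick local test run without the scheduler, assuming the variables and connection above are already defined. The file name and path below are hypothetical; adjust them to wherever you saved the DAG:

# hypothetical file name/path; adjust to where you saved the DAG
python dags/airbyte_api_sync_demo.py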
Test the connection from Airflow
After you have stored the above DAG in a Python file located in your Airflow dags folder, a DAG called airbyte_api_sync_demo should appear in your Airflow UI. To see it, select DAGs at the top of the Airflow UI. Next, click on airbyte_api_sync_demo as annotated in the following image:
Click on the Airflow DAG
You can then manually trigger the DAG to execute by clicking on Trigger DAG as follows:
Trigger the Airflow DAG
After the DAG has successfully completed, you should see a screen similar to the following:
View the status of the Airflow DAG execution
Conclusion
This article has shown you how to create a DAG that makes use of Airflow’s built-in HTTP operators to interact with the new user-facing Airbyte API. Specifically, you learned how to trigger a synchronization of a connection running in Airbyte Cloud and to wait for the synchronization to successfully complete. The instructions presented here can be used as a starting point towards building complex data integration pipelines that are orchestrated by Airflow!
If you have enjoyed this article, you may be interested in reading Airflow and Airbyte OSS - Better Together, ETL Pipelines with Airflow: the Good, the Bad and the Ugly, and Automate your Data Scraping with Apache Airflow and Beautiful Soup. Furthermore, you may be interested in other Airbyte tutorials, or in Airbyte’s blog. You may also consider joining the conversation on our community Slack Channel, participating in discussions on Airbyte’s discourse, or signing up for our newsletter.