Since we launched PyAirbyte in February, our goal has been to bring the power of Airbyte to every Python developer. We started by releasing our source connectors, making them accessible within the Python environment and simplifying data syncing. But we knew there was more.
Today we’re announcing a feature that many of you have been asking for and that marks the next phase of capabilities in PyAirbyte – the ability to manage and orchestrate hosted Airbyte jobs in Python.
This addition means that users of Airbyte Cloud, OSS, and Enterprise can run their hosted Airbyte jobs from the comfort of their Python scripts. Just as with the rest of PyAirbyte, you can get started with just a few lines of code. What's more, we have added the ability to read data from a hosted Airbyte sync result in Python as well!
This blog post will walk you through all the new objects and methods added to the PyAirbyte library that are relevant to hosted Airbyte connections management.
Prerequisites
If you’d like to get hands-on and follow along, here’s what you’ll need:
Familiarity with Airbyte
If you're already an Airbyte user, you’ll find this post particularly useful, as it builds on what you might already know about the platform. And if you’re somewhat new to Airbyte or just need a quick overview, there’s a helpful tutorial to get you started on setting up your first connection.
Getting your Airbyte Workspace ID and Connection IDs
Don’t have an Airbyte workspace yet? No problem – you can start a 14-day free trial on Airbyte Cloud by following this link.
Once in your Airbyte workspace, you'll need to get your <span class="text-style-code">Workspace ID</span> and any <span class="text-style-code">Connection ID</span> you plan to use. Here’s how:
Workspace ID: Look at the URL in your workspace and you’ll find the Workspace ID right after workspace/.
Connection ID: Go to the “Connections” tab in your workspace and click on any connection. Check the URL again; you'll find the Connection ID right after connection/.
Getting your Airbyte API Key
To interact with Airbyte programmatically, you’ll need an API key for authentication. Here's how to get it:
Navigate to the Airbyte Developer Portal and enter your credentials to log in.
Once logged in, look for the option to create a new API key. The portal will guide you through the steps to generate this key.
After obtaining your API key, make sure to store it safely. We recommend using your favorite secrets manager for this purpose. You'll need this key later.
Installing PyAirbyte
If you haven’t done so, here's how you can install PyAirbyte:
Virtual environment (recommended): It’s a good practice to use a virtual environment. Create one with <span class="text-style-code">python -m venv myenv</span> and activate it (<span class="text-style-code">source myenv/bin/activate</span> on Unix/macOS, <span class="text-style-code">myenv\Scripts\activate</span> on Windows).
Install PyAirbyte: In your activated environment, run <span class="text-style-code">pip install airbyte</span>.
That's it! You're ready to start using PyAirbyte.
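To confirm everything is wired up, here's a quick sanity check you can run from Python (it uses <span class="text-style-code">get_available_connectors()</span>, which lists the source connectors PyAirbyte knows about):
import airbyte as ab
# If the import succeeds and this prints a list, PyAirbyte is installed correctly
print(ab.get_available_connectors()[:5])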
Getting started with PyAirbyte and hosted Airbyte
To get started with PyAirbyte Cloud Interop, you’ll create a <span class="text-style-code">CloudWorkspace</span> object in PyAirbyte. This corresponds to your workspace in Airbyte Cloud.
from airbyte import cloud
workspace = cloud.CloudWorkspace(
workspace_id="...",
api_key="...",
)
If you instead want to connect to an OSS or Enterprise workspace, simply override the <span class="text-style-code">api_root</span> to match your hosted environment:
from airbyte import cloud
workspace = cloud.CloudWorkspace(
workspace_id="...",
api_key="...",
api_root="https://airbyte.internal.adventureworks.com",
)
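In either case, it's a good practice to keep credentials out of your script. Here's a minimal sketch that reads them from environment variables (the variable names are our own convention):
import os
from airbyte import cloud
# AIRBYTE_WORKSPACE_ID and AIRBYTE_API_KEY are our own naming convention
workspace = cloud.CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_WORKSPACE_ID"],
    api_key=os.environ["AIRBYTE_API_KEY"],
)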
Running a Sync Job Remotely Using PyAirbyte
Once you’ve created a workspace object, you can trigger a sync remotely by passing the connection ID of the connection you want to sync:
sync_result = workspace.run_sync(connection_id="...")
print(f"Sync was completed. Status is {sync_result.get_status()}.")
That’s it! You’ve successfully executed the connection that is hosted on Airbyte Cloud. PyAirbyte takes care of executing the job and waiting for completion, and it makes it easy for you to check the latest job status.
Working with SyncResult Objects
Once you have a <span class="text-style-code">SyncResult</span> object, you can check status directly as shown above.
Other helpful things you can do with a SyncResult object include the following:
<span class="text-style-code">get_job_status()</span> - Returns the latest status. (This is cached after the job is complete, for better performance.) <span class="text-style-code">is_job_complete()</span> - Check if the job is in a completed status. (Also cached after completion, for performance.) <span class="text-style-code">wait_for_completion()</span> - Wait for a completed status to be reached. You can optionally include a specific maximum time to wait (default timeout is 30 minutes). <span class="text-style-code">raise_failure_status()</span> - Raises an exception if the job status is failed; Otherwise, does nothing. Getting Sync Results from Prior Executions You can use <span class="text-style-code">get_sync_history()</span> to get a SyncResult object from a previously run job, or <span class="text-style-code">get_sync_job()</span> if you already know the job ID.
# Get the last n job results
last_5_sync_results: list[SyncResult] = workspace.get_sync_history(
connection_id=connection_id,
limit=5,
)
# Or get a specific job result by ID:
my_job_id = "..."
sync_result_lookup: SyncResult = workspace.get_sync_job(
connection_id=connection_id,
job_id=my_job_id,
)
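Putting these helpers together, here's a minimal sketch that runs a sync and inspects the result (it assumes the <span class="text-style-code">workspace</span> and <span class="text-style-code">connection_id</span> objects from the earlier examples):
# Run the sync; by default this blocks until the job reaches a terminal status
sync_result = workspace.run_sync(connection_id=connection_id)
# Confirm the job finished and report its status
if sync_result.is_job_complete():
    print(f"Job finished with status: {sync_result.get_job_status()}")
# Raise an exception if the job failed; otherwise this does nothing
sync_result.raise_failure_status()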
Reading Data From Sync Results
This feature is currently available only for SQL-based destinations, such as Snowflake and BigQuery.
Querying Stream Data with SQL
PyAirbyte provides two foundational methods for querying data in a sync result:
<span class="text-style-code">SyncResult.get_sql_engine()</span> - Retrieve a SQLAlchemy <span class="text-style-code">Engine</span> object to connect to the Destination backend. <span class="text-style-code">SyncResult.get_sql_table(STREAM_NAME)</span> - Retrieve a SQLAlchemy <span class="text-style-code">Table</span> object for a specific stream in the sync result. # Assuming a sync result set previously
sync_result: SyncResult = workspace.get_sync_job(...)
# Get a SQLAlchemy Engine object
sql_engine = sync_result.get_sql_engine()
# Get a SQLAlchemy Table object for a specific stream
my_stream_table = sync_result.get_sql_table("my_stream")
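From there, standard SQLAlchemy patterns apply. As a minimal sketch (the stream name is illustrative, and it assumes the engine and table objects created above), here's how you might count the rows in that stream's table:
from sqlalchemy import func, select
# Run a simple aggregate query against the stream's table using the engine above
with sql_engine.connect() as conn:
    row_count = conn.execute(
        select(func.count()).select_from(my_stream_table)
    ).scalar()
print(f"'my_stream' has {row_count} rows in the destination")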
Working with PyAirbyte Datasets
When using any of the supported SQL database backends, you can access the raw data of a stream through the <span class="text-style-code">streams</span> property.
# Assuming a sync result was obtained previously
sync_result: SyncResult = workspace.get_sync_job(...)
my_dataset: CachedDataset = sync_result.streams["my_stream"]
# Retrieve data as a Pandas Dataframe
my_pandas_df: pandas.DataFrame = my_dataset.to_pandas()
# Retrieve data as LLM documents (assumes PyAirbyte was imported as `ab`)
my_llm_docs: Iterable[ab.Document] = my_dataset.to_documents()
# Retrieve SQL table for data stream
my_sql_table: sqlalchemy.Table = my_dataset.to_sql_table()
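From here, the data behaves like any other pandas DataFrame. As a quick illustration (what you do next depends on your stream's schema):
# Basic inspection of the stream data we just loaded into pandas
print(my_pandas_df.head())
print(f"Rows: {len(my_pandas_df)}, columns: {list(my_pandas_df.columns)}")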
When accessing data through the <span class="text-style-code">streams</span> dataset, you are reading directly from the backend SQL data store. If you are retrieving data using a past <span class="text-style-code">SyncResult</span>, note that you will see the latest data in the destination table, which is not necessarily specific to that particular execution.
Advanced Options
Submitting Jobs Asynchronously
By default, PyAirbyte will wait for completion when you call <span class="text-style-code">run_sync()</span>. You can instead return immediately by setting <span class="text-style-code">wait=False</span>. Then you can check on the job yourself by calling <span class="text-style-code">get_job_status()</span> or <span class="text-style-code">is_job_complete()</span> on the <span class="text-style-code">SyncResult</span> object:
from time import sleep

sync_result = workspace.run_sync(connection_id="...", wait=False)
while not sync_result.is_job_complete():
    # Do other things while we wait...
    sleep(5)
print(f"Sync was completed. Status is {sync_result.get_job_status()}.")
Opening up Hybrid Use Cases
Now you can harness the power of Airbyte running locally and remotely. Here are some example use cases for the “hybrid” deployment options.
PyAirbyte as a Mini-Orchestrator
With the latest version of PyAirbyte, you can use PyAirbyte as a mini-orchestrator. For instance, you can run a sequence of jobs in a specific order, or trigger Airbyte jobs in response to custom business logic. For example, you can trigger Python code in response to a request; that code can run a sync, check the job status, and report the results back as a Slack message.
Here’s a working example of how you can do it!
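If you just want the general shape of such a script, here's a minimal sketch; the connection IDs and Slack webhook URL are placeholders, and it assumes the third-party <span class="text-style-code">requests</span> package is installed:
import requests
from airbyte import cloud

workspace = cloud.CloudWorkspace(workspace_id="...", api_key="...")

# Run two connections in sequence; run_sync() blocks until each job completes
first_result = workspace.run_sync(connection_id="...")
second_result = workspace.run_sync(connection_id="...")

# Report the outcome to Slack (the incoming-webhook URL is a placeholder)
requests.post(
    "https://hooks.slack.com/services/...",
    json={
        "text": f"Airbyte syncs finished: {first_result.get_job_status()}, "
                f"{second_result.get_job_status()}"
    },
)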
PyAirbyte as a SlackBot backend
Have you ever wanted to trigger a job directly from Slack? Now you can build a simple PyAirbyte chatbot as a Slack app, responding to Slack messages (or other triggers) and running ELT jobs in response.
Chaining Transform and Publish Jobs after EL
You can now run a full ELTP job, where EL (extract/load), T(ransform), and P(ublish) steps are run in sequence, each running immediately after the prior step has completed.
# This is the job that pulls raw data:
extractload_job_connection_id = "..."
# This job publishes to an external service, like a vector DB:
publish_job_connection_id = "..."
def do_my_custom_transforms():
    # Custom transformation code goes here...
    ...

def run_eltp():
    """Runs Extract, Load, Transform, and Publish."""
    # Start by running the extract-load job to capture raw data
    workspace.run_sync(connection_id=extractload_job_connection_id)
    # Transform data using your own custom methods:
    do_my_custom_transforms()
    # Publish the resulting datasets
    workspace.run_sync(connection_id=publish_job_connection_id)

# Run it!
run_eltp()
Coming Soon
We’re very excited about this release, but we’re not done yet! There is one more important release on our roadmap we’d like to share with you.
Using PyAirbyte for Automated Deployments to Airbyte Cloud
We have heard from existing Airbyte users that it would be helpful to go further and automate this process, so that a working PyAirbyte job can be promoted directly to a remote Airbyte instance without requiring interaction with the web UI. This automated deployment will speed up iteration and open up new use cases for “Infrastructure as Code”, rapid development, CI/CD-based deployments, and more.
FAQ
Q: Does PyAirbyte have a scheduler?
No. PyAirbyte does not have a native scheduler, but you can connect your PyAirbyte jobs to other scheduling and orchestration tools.
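For example, here's a minimal sketch that triggers a nightly sync using the third-party <span class="text-style-code">schedule</span> package (one option among many; Airflow, Dagster, or plain cron work just as well, and the connection details are placeholders):
import time

import schedule
from airbyte import cloud

workspace = cloud.CloudWorkspace(workspace_id="...", api_key="...")

def nightly_sync():
    workspace.run_sync(connection_id="...")

# Trigger the sync every day at 2:00 AM local time
schedule.every().day.at("02:00").do(nightly_sync)

while True:
    schedule.run_pending()
    time.sleep(60)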
Q: How does PyAirbyte Interop work?
PyAirbyte orchestrates your remote Airbyte instance by wrapping the Airbyte API in a convenient Python developer experience. We leverage the powerful Speakeasy SDK development library for code generation of Python interfaces on top of the REST API.
Wrapping Up
At Airbyte, we are always looking for ways to make your data management journey smooth and efficient. We know that most data professionals build their solutions in Python, and we wanted to meet you where you are and enhance your data tooling integration experience! These updates empower you to orchestrate multiple Airbyte syncs into a single flow like never before.
Do you have any questions or feedback for us? You can keep in touch by joining our Slack channel. If you would like to keep up to date with new Airbyte and PyAirbyte features, subscribe to our newsletter.