In this blog post, we will go through how to set up data pipelines using the flexible and customizable features that PyAirbyte offers to data engineers looking for new open-source libraries for this purpose.
We will review PyAirbyte's features and put them into action to build ETL processes that are useful, scalable, robust, and efficient.
Quick overview of a common ETL process
The main role of any ETL (Extract, Transform, Load) process is to gather data from a wide variety of sources (relational or non-relational databases, or even APIs), transform it into a format that can be used later on, and then load it so it can be analyzed in your favorite BI tools to get actionable insights.
ETLs are a fundamental stage for every data management strategy, as they play an important role in data warehousing, business intelligence, and data analytics. Choosing the right tools and building robust and efficient pipelines becomes key to delivering accurate data to business stakeholders.
Some of the basic tasks that we, as data producers, expect our ETL processes to accomplish are:
Extracting features from raw data (the 'E' part of the process): Data in its raw format is not very useful for data consumers, so we need efficient ways to interpret that data and turn it into a usable format. A common example of the tasks we usually perform during the extraction phase is shown in the chunk of code below: a Python function uses the requests library to connect to an API endpoint, and the JSON response is parsed with the response's json() method and loaded into a pandas DataFrame.
import time

import pandas as pd
import requests

url = 'https://api.github.com'

# Defines a function to extract data from an API endpoint
def get_users(url: str) -> pd.DataFrame:
    """
    Extracts data from the GitHub API endpoint.
    Args:
        url (str): URL that points to the API endpoint.
    Returns:
        df: pandas DataFrame with the JSON data extracted from the API call.
    """
    # Defines an empty DataFrame
    df = pd.DataFrame()
    for i in range(0, 9):
        # The `since` parameter pages through users by user id
        r = requests.get(f'{url}/users?since={i}')
        data = r.json()
        # Converts the JSON response to a DataFrame
        table = pd.DataFrame.from_dict(data)
        # Appends the newly created DataFrame that contains the API response
        df = pd.concat([df, table])
        # Waits 2 seconds before performing the next API call
        time.sleep(2)
    return df
Source data transformation (the 'T' part of the process): Here, the raw data is transformed from different formats into a unified format that is suitable for data consumers (CSV, Excel, txt files, etc.). Some transformations are frequently performed within any ETL process, and pandas is a very suitable library for turning raw JSON data into something more readable for business users. In the code below, the API data extracted in the previous stage is cleaned up: the first row is dropped and only the columns of interest are kept.
# Function that parses the extracted data
def parse_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Parses the data extracted from the API call.
    Args:
        df (pd.DataFrame): Data extracted from the API converted into a DataFrame
    Returns:
        df: pandas DataFrame with parsed data.
    """
    # Drops the first row of the DataFrame
    df = df.iloc[1:, :]
    # Keeps only the columns of interest
    df = df[['id', 'login', 'node_id', 'avatar_url', 'gravatar_id', 'url', 'html_url',
             'followers_url', 'following_url', 'gists_url', 'starred_url',
             'subscriptions_url', 'organizations_url', 'repos_url', 'events_url',
             'received_events_url', 'type', 'site_admin']]
    return df
Loading the data (the 'L' part of the process): This is the final step of every ETL process, when the data is ready to be consumed from data warehouses, data silos, or repositories. The data that was extracted, manipulated, and transformed according to the end user's needs is usually persisted in a data warehouse for further consumption. In the example below, the data is loaded into a Postgres database.
from sqlalchemy import create_engine

# The connection string below is a placeholder; replace it with your own credentials
engine = create_engine('postgresql://user:password@localhost:5432/mydatabase')

# Defines a method to load data into a Postgres table
def load_data_to_postgres(df: pd.DataFrame, table_name: str):
    """
    Loads the data extracted from the API into a Postgres database.
    Args:
        df (pd.DataFrame): name of the DataFrame that holds the extracted data
        table_name (str): name of the Postgres table
    Returns:
        None
    """
    # Loads data into the PostgreSQL database
    try:
        # Appends the rows to the table, creating it if it does not exist
        df.to_sql(table_name, con=engine, if_exists='append')
    except Exception as e:
        print(f'Data was not loaded to the database: {e}')
There are many use cases in which an ETL solution will come in handy, from data warehousing to business intelligence, data migration, and data integration.
Nowadays, more modern and performant alternatives to traditional ETL have emerged that rely on the ELT approach, where the data is transformed after it has been loaded. This approach can make the entire process more efficient and reliable.
Getting to know PyAirbyte
PyAirbyte is a library, currently in its beta release, that provides a set of utilities for using Airbyte connectors within Python: all the power and versatility of Python together with Airbyte's data integration capabilities, the best of both worlds.
Its functionality can be tested and executed in any environment that supports Python >= 3.9. It offers many useful features and connectors for designing fully automated ETL processes; with its widespread list of pre-built connectors, it simplifies integration into existing workflows and reduces the need for developers to write costly and error-prone ETL scripts.
PyAirbyte architecture
According to PyAirbyte's official documentation, its architecture comprises the following components:
Source: A source object uses a Python connector and includes a configuration object. The configuration object is a dictionary that contains the configuration of the connector, such as authentication or connection settings. The source object is used to read data from the connector.
Cache: Data can be read directly from the source object; however, it is recommended to use a cache object to store the data. The cache object temporarily stores records from the source in a SQL database, such as a local DuckDB file or a Postgres or Snowflake instance.
Result: An object holding the records from a read operation on a source. It allows quick access to the records of each synced stream via the cache object used. Data can be accessed as a list of records, a pandas DataFrame, or via SQLAlchemy queries.
Putting these pieces together, PyAirbyte talks to a set of third-party APIs that send the data requested by the configured source connectors; the Python script running Airbyte then loads those records through SQLAlchemy into one of several SQL caches, such as DuckDB or Postgres.
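To make these three concepts concrete, here is a minimal sketch of how a source, a cache, and a read result fit together, loosely based on the public PyAirbyte quickstart examples. The use of the source-faker demo connector and its count setting is an assumption on my part; any connector follows the same pattern.

import airbyte as ab

# Source: a connector plus its configuration dictionary
source = ab.get_source("source-faker", config={"count": 1000}, install_if_missing=True)
source.check()
source.select_all_streams()

# Cache: a local DuckDB file by default
cache = ab.get_default_cache()

# Result: the outcome of reading the source into the cache
result = source.read(cache=cache)

# Records of each synced stream can be accessed through the cache
for stream_name, records in result.streams.items():
    print(stream_name, len(list(records)))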
Building an ETL process using the PyAirbyte GitHub connector
Now that we have a clear picture of what a traditional ETL process involves, and a first look at what PyAirbyte is, what it can offer, and what its main features are, we can put it into action by building a sample ETL process using one of its built-in connectors, source-github.
Installing PyAirbyte
PyAirbyte can be installed easily in your local Python environment, although it is strongly recommended to create a virtual environment with the Python venv library, or a conda environment, and then just run pip install --quiet airbyte from your command line. A great option is to use Google Colab, a cloud-hosted Jupyter Notebook with all the PyData ecosystem libraries already installed and ready to use. PyAirbyte, of course, is not installed by default in Google Colab, so the first thing we need to do is set up our virtual environment in the Colab notebook:
# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv
# Install PyAirbyte
%pip install --quiet airbyte
Inside that virtual environment in Google Colab, we run the command to install PyAirbyte. The code used in this tutorial is available in a repository on my GitHub page; you can find it by following this link.
Choosing a Data Source
There are plenty of connectors (data sources) to choose from to start pulling data into our data pipelines; they are listed on the Getting Started page in the Available Connectors section. To choose a connector to work with, we first import the Airbyte library and then list all the available connectors:
# Import PyAirbyte
import airbyte as ab
# Show all available connectors
ab.get_available_connectors()
By executing the code above in your notebook, you will get a long list of sources as the output.
Configuring your ETL pipeline
In the traditional ETL process pulling data from the GitHub API that we walked through earlier, we had to create helper functions to pull the data from the API, parse the extracted data, and perform some custom transformations.
In the ETL process built with PyAirbyte, we will instead pull data from GitHub using the connector specially designed for this purpose. Once we have pulled some data, we will be ready to take advantage of the flexible data stream management and versatile caching options (we will talk more about caching later) that PyAirbyte offers.
To continue working on this tutorial, you will need to set up your GitHub personal access token. For more details on how to get one, follow this link.
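One simple way to make the token available to PyAirbyte is to expose it as an environment variable, since ab.get_secret() falls back to environment variables (in Google Colab you can also use the built-in Secrets panel). The token value below is a placeholder you need to replace with your own.

import os

# Hypothetical placeholder: replace with your own personal access token.
# ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN") will pick it up from the environment.
os.environ["GITHUB_PERSONAL_ACCESS_TOKEN"] = "<your-github-personal-access-token>"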
In the chunk of code below, we use the GitHub connector to pull some data. When configuring the source, we can set any repository name in the config parameter. In the code I prepared for this tutorial, I am using a medium-sized repository so we don't spend too much time pulling data; you can use your own repository, or any other public repo, to give this connector a try.
Once we have set up our pipeline, we will perform some custom transformations, manipulations, and analysis using SQL and pandas. First, we establish a connection to the source-github connector by setting the required parameters.
import airbyte as ab

# Create and configure the source:
source = ab.get_source(
    "source-github",
    install_if_missing=True,
    config={
        "repositories": ["rilldata/rill"],
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        },
    },
)
# Verify the config and creds by running `check`:
source.check()
After configuring the source connector, we use the check() function to ensure that the configuration is correct and that we can connect to the API.
Executing Data Transformations
Once you have selected a data source and loaded it, the fun begins: you can choose a stream. By using get_available_streams(), you get a wide range of streams to choose from and start pulling data.
Then you can choose one or more of those streams to start performing transformations, wrangling, and data manipulation. In the Jupyter Notebook, I selected the pull_requests, issues, and commits data streams, as shown in the sketch below.
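A minimal sketch of that step, based on the PyAirbyte quickstart examples, using the same stream names as in the notebook and the default local DuckDB cache:

# List the streams exposed by the GitHub source
source.get_available_streams()

# Select the streams we want to work with
source.select_streams(["pull_requests", "issues", "commits"])

# Read the selected streams into the default local cache (a DuckDB file)
cache = ab.get_default_cache()
result = source.read(cache=cache)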
A very nice feature of the PyAirbyte source-github connector is that the data is read incrementally by default. What does that mean? After the first successful data sync, only new records are synced into the cache on subsequent reads, which makes data pipelines more efficient.
So if we read data from the source a second time and nothing has changed, no new records will be loaded into the cache.
Reading Data from Cache
Now that we have some data to work with and it is cached, we can read it and use it without re-configuring or re-creating the source object. Cached data can easily be turned into pandas DataFrames by running the code below:
# Read from the cache into a pandas dataframe:
pull_requests = cache["pull_requests"].to_pandas()
issues = cache["issues"].to_pandas()
commits = cache["commits"].to_pandas()
Once we have stored some data in pandas DataFrames, things become easier for those familiar with this versatile data analysis library: we can start performing transformations and data enrichment, and we can even incorporate other data manipulation and visualization libraries into our analysis, such as numpy and matplotlib.
So far we have three DataFrames that contain the pull_requests, issues, and commits data from our public repository. As stated before, we will perform some EDA on this data, transformations where needed, and some visualizations to better understand it; a small example follows below.
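As a taste of that EDA, here is a small sketch that counts issues by state and plots how many were opened per month. The state and created_at column names are assumptions based on the GitHub issues schema, so adjust them to whatever your synced streams actually contain.

import matplotlib.pyplot as plt
import pandas as pd

# Count issues by state (e.g. open vs. closed)
issue_counts = issues["state"].value_counts()
print(issue_counts)

# Plot the number of issues opened per month
issues["created_at"] = pd.to_datetime(issues["created_at"])
issues_per_month = issues.groupby(issues["created_at"].dt.to_period("M")).size()

issues_per_month.plot(kind="bar", title="Issues opened per month")
plt.tight_layout()
plt.show()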
Note that in the cache-reading code earlier, the cached data was converted into pandas DataFrames by using the to_pandas() method, which passes the record iterator to the pandas DataFrame constructor.
The other PyAirbyte method that can be used is to_sql_table(), which returns a SQLAlchemy table object backed by the cache, so the cached streams can be queried as SQL tables using ANSI SQL.
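For example, here is a minimal sketch of querying the cached issues stream through SQLAlchemy. It assumes the cache exposes a SQLAlchemy engine via get_sql_engine(), as in the PyAirbyte quickstarts, and the state column is again an assumption about the stream's schema.

from sqlalchemy import func, select

# Get a SQLAlchemy Table object for the cached "issues" stream
issues_table = cache["issues"].to_sql_table()

# Count issues grouped by state, using ANSI SQL generated by SQLAlchemy
query = (
    select(issues_table.c.state, func.count().label("n_issues"))
    .group_by(issues_table.c.state)
)

with cache.get_sql_engine().connect() as conn:
    for row in conn.execute(query):
        print(row)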
You can find more details on how to work with SQL-like queries inside your PyAirbyte pipelines in the Jupyter Notebook I prepared for this tutorial on my GitHub, and also on the PyAirbyte GitHub quickstarts page.
Conclusion
As we have shown in this tutorial, migrating your traditional ETL processes written with pandas and other data manipulation libraries to PyAirbyte is a breeze. Incorporating PyAirbyte into your data pipelines makes data ingestion very straightforward, as it offers a wide variety of fully configurable source connectors with flexible data stream management. Caching is one of its coolest features: ingesting your data right into SQL and then simply consuming it makes your ETL process simple, robust, and efficient.