How to Build ETL Workflows and Processes?

October 28, 2024
20 min read

Data migration is an essential component of most organizations that deal with huge amounts of data on a daily basis. However, moving data between platforms can be error-prone and time-consuming if not paid attention. To achieve better performance and optimize costs associated with business processes, you must develop efficient data pipelines that connect different platforms.

One of the commonly used data migration processes includes ETL workflows, which enable you to extract, transform, and load data into a destination. The ETL process has been popular for a long time, with users frequently searching for the term on Google.

ETL Process Interest Over Time

This guide will take you through the concept of ETL workflows and how to use them to enhance your day-to-day data integration requirements.

What Is ETL Anyways?

ETL

Extract, Transform, and Load, or ETL, is the process of extracting raw data from a source, transforming it, and loading it into a desired location. Some of the most commonly used sources in an ETL process include marketing platforms, ERP solutions, and productivity tools. Migrating data from these sources into a data warehouse or a database allows you to create a centralized repository.

By consolidating data into a centralized repository, you can enhance data accessibility within your organization. With the help of your analytics team, you can generate impactful insights from this data, which can lead to data-driven decision-making.

How to Build ETL Workflows & How Airbyte 1.0 Saves the Day?

Building an ETL workflow depends on your specific needs and how you want your data to appear at the destination. There are two different methods to perform the ETL process, including batch and streaming ETL. In batch ETL, the data movement occurs in large data batches. Streaming ETL, on the other hand, enables you to stream data from source to destination in real-time.

Batch ETL Workflow

In this ETL workflow, data is processed in batches from numerous sources to a destination of your choice. Let’s explore the steps involved in building a batch ETL workflow.

  • Create a dataset or select existing data in any source that you want to migrate to your destination system.
  • Extract data from the chosen source. This extracted data could be in any form, including JSON, CSV, or XML.
  • Transform and validate the data by performing data cleaning, aggregating, and applying business rules, making it compatible with the destination format.
  • Move your data to the required destination.

Streaming ETL Workflow

In streaming ETL workflows, data is streamed from source to destination as soon as it is generated in the source system. This process requires you to clean and process data as it travels to the destination. To build a streaming ETL pipeline, you can use stream processing tools like Apache Kafka.

Streaming ETL Workflow

You can follow these steps to create a streaming ETL workflow:

  • Data Extraction: To extract data from different sources into a Kafka topic, you can use JDBC connectors for that source or tools like Kafka Magic. Whenever you add data rows to the source table, Kafka automatically replicates the data to the Kafka topic as messages.
  • Pulling Data from Kafka Topics: In the next step, you can pull data from Kafka topics in AVRO format, which is useful in generating KStream objects.
  • Stream Processing: The stream processor enables you to transform KStream objects by filtering and executing operations like aggregation.
  • Stream Data to a Destination: Stream the data into a destination system like a data warehouse, data lake, or a storage application like Amazon S3.

Although both the manual batch and streaming ETL processes are effective in migrating data between different applications, they have a few limitations. These processes require you to have a thorough understanding of the tools and programming languages to conduct ETL operations. When performing manual ETL processes, you can encounter errors that might be time-consuming to resolve.

To overcome these challenges, you can use ETL tools and streamline your data integration requirements. Tools like Airbyte enable you to simplify building data pipelines with their extensive library of 400+ pre-built connectors.

Airbyte

If the connector you seek is unavailable, you can use Airbyte’s Connector Builder or Connector Development Kit (CDK) to build custom connectors within minutes. To enhance your connector building experience, the no-code connector builder offers an AI-assist functionality. The AI-enabled assistant quickly reads through your preferred platform’s API docs and automatically fills most of the necessary fields.

Step-by-Step Process to Build ETL Workflows with Airbyte

Let’s explore how to develop data pipelines using Airbyte’s open-source Python library, PyAirbyte. This library enables you to leverage Airbyte connectors to extract data from various sources and load it into SQL caches. With its incremental data reading feature, PyAirbyte enables you to automate tracking source data changes. These changes are replicated in the destination, allowing you to work on the latest data automatically.

For this example, suppose you want to migrate data from a cryptocurrency market API like CoinAPI into Snowflake for analysis. To get started with this migration, you must have a CoinAPI API key and a Snowflake account with a database.

Execute the code in this section in your preferred code editor to migrate data using PyAirbyte:

  • Install PyAirbyte by executing the code below.
%pip install --quiet airbyte
  • Define a PyAirbyte cache for Snowflake by mentioning all the placeholder credentials in this code snippet.

from airbyte.caches import SnowflakeCache
from google.colab import userdata

sf_cache = SnowflakeCache(
      account=userdata.get("SNOWFLAKE_ACCOUNT"),
      username=userdata.get("SNOWFLAKE_USERNAME"),
      password=userdata.get("SNOWFLAKE_PASSWORD"),
      warehouse="AIRBYTE_DEVELOP_WAREHOUSE",
      database="AIRBYTE_DEVELOP",
      role="AIRBYTE_DEVELOPER",
      schema_name="PYAIRBYTE_DEMO"
)
  • Configure the CoinAPI source connector and extract data by replacing the necessary credentials. If you only want to use selected parts of the dataset, you can mention the streams that you want in your data.

import airbyte as ab

read_result = ab.get_source(
    "source-coin-api",
    config={
          "api_key": userdata.get("API_KEY"),
          "environment": "production",
          "symbol_id": "COINBASE_SPOT_INDEX_USD",
          "period": "1DAY",
          "start_date": "2023-01-01T00:00:00"
    },
    streams=["ohlcv_historical_data", "trades_historical_data", "quotes_historical_data"],
).read(cache=sf_cache)
  • Convert this data into Pandas DataFrame to perform transformations. By executing the code below, you can create a table with open, high, low, close, and volume data of cryptocurrencies.

ohlcv_df = read_result["ohlcv_historical_data"].to_pandas()
  • To manipulate data, we will convert the date into the ‘DateTime’ datatype and the numeric columns into the numerical datatype. Pandas’ capabilities allow you to create a daily movement column to analyze the coin’s daily price movements.

import pandas as pd

ohlcv_df['time_period_start'] = pd.to_datetime(ohlcv_df['time_period_start'])
numeric_columns = ['price_open', 'price_high', 'price_low', 'price_close', 'volume_traded', 'trades_count']
ohlcv_df[numeric_columns] = ohlcv_df[numeric_columns].apply(pd.to_numeric, errors='coerce')

ohlcv_df['daily_movement'] = ohlcv_df['price_close'] - ohlcv_df['price_open']
  • Get SQL engine from Snowflake cache to interact with Snowflake database.

engine = sf_cache.get_sql_engine()
  • After transforming the data, load it directly into Snowflake using its Python library.

pip install snowflake

import snowflake

from snowflake.connector.pandas_tools import pd_writer

ohlcv_df.to_sql('daily_movement', engine, index=False, method=pd_writer, if_exists='replace')

Executing the given process provides you with the flexibility to transform your data according to your specific requirements using Python. However, if you want to build an ELT workflow, use the simplified no-code UI that Airbyte provides to perform data integration. Airbyte’s pre-built connectors enable you to replicate data quickly from multiple sources and move it to your preferred destination. To perform transformations on the data, you have the flexibility to use dbt, a robust data transformation tool.

Monitoring & Maintaining ETL Workflows

When building an ETL workflow using a manual method, you must pay additional attention to monitor the process. This is to ensure that the data quality is not degraded during migration. Here are a few data pipeline monitoring and maintenance tips:

  • You must monitor the extraction process on multiple levels to ensure optimal resource usage and eliminate errors that might corrupt the data.
  • When working with APIs, ensure that API changes don’t break your ETL workflow. This can happen due to version updates or the expiration of access tokens.
  • In real-world systems, the source data changes frequently. You must validate your ETL workflow to account for these changes and keep your destination updated.

However, if you prefer a SaaS-based solution like Airbyte, most of your monitoring needs can be automated. With features like schema management, refreshes, and change data capture, Airbyte reduces manual intervention while monitoring data pipelines.

Scaling ETL Processes

  • Scalability is a necessity for modern-day applications. You must consider building ETL workflows that accommodate growing data volumes.
  • Understand the amount of data to be processed within the service level agreements (SLAs). This can help you build systems that meet your business requirements.
  • It’s better to design a flexible processing system before deploying the model for production to handle growing data loads.
  • You can also use data integration tools to automate data pipelines, freeing you from stressing over infrastructure management and scaling issues.

ETL Workflow Practical Examples

There are multiple use cases of ETL workflows. Let’s explore a few examples that can help you better understand the benefits.

Suppose you want to build a chatbot to search issues from your GitLab repositories. For this purpose, you can use Retrieval-Augmented Generation (RAG), a type of generative AI, to enhance your chatbot’s ability to answer context-specific questions.

You can follow along with this tutorial by executing the code snippets in your preferred code editor, like Jupyter Notebook. In this tutorial, we will use PyAirbyte to extract data from GitLab and build an RAG data pipeline.

Configure a PyAirbyte connection

After establishing a connection with GitLab, you can select the number of streams you want to work with. For instance, you can extract the issues stream from the data by executing the code below. When working with AI frameworks, storing this data in document format can help you improve training natural language processing (NLP) models.

Select issues stream from GitLab

You can now use AI frameworks like LangChain to perform chunking and embedding operations on this data to transform it and make it compatible with LLM applications.

Generate Vector Embeddings from GitLab Issues

These embeddings can be stored in one of the popular vector databases, such as Qdrant, Pinecone, or Weaviate, to enhance high-dimensional vector data management. You can execute the code below to store the data in Qdrant.

Setting Up Qdrant

After performing the ETL steps, you can use LangChain’s capabilities to train LLMs with this data. With the code mentioned below, you can use the data stored in Qdrant to develop a model using the ChatOpenAI extension of LangChain.

Train LLM using genereted vector embeddings

When you execute the code above, the success message indicated in the last line of the code will appear on your screen.

After performing these steps, you can ask questions from your model to ensure the process was performed correctly.

Testing the model

With this method, you can generate your own LLM to answer internal technical queries related to GitLab issues. The ETL workflow plays an essential role in simplifying data migration between dispersed data sources and vector databases, reducing the time required to move data. Follow this tutorial to learn more about building an RAG pipeline.

A Great Example of How to Extract Data from GA4 with Airbyte

Let’s look at another example of how Airbyte helps you extract data from Google Analytics 4—a web analytics platform. This data can be analyzed to derive actionable insights, which allows you to improve website traffic management.

Build this pipeline by utilizing PyAirbyte. Start configuring the GA4 connection by executing the code mentioned in this section.

GA4 Connection in PyAirbyte

Select the required streams you want to analyze and load them into the cache.

Extracting Streams from GA4

After extracting the GA4 data into the cache, you can use Python’s capabilities to analyze this data.

Converting GA4 Data into Pandas DataFrame

Next, import the necessary libraries to analyze and visualize this data. For instance, you can convert date, screenpageviews, and bouncerate columns into an analysis-ready format.

Transforming the data

To analyze the trends in page views and the bounce rate, you can aggregate and group the data by date.

Analyzing the data

You can also use the Seaborn library to compare metrics across different pages on weekdays and weekends to better understand your online business.

Data Visualization

Eventually, you can load this data and insights into your preferred destination, such as a data warehouse or a storage solution, to gain a unified view. Consolidating this data enables you to quickly share insights with your team and create a collaborative environment.

Conclusion

ETL workflow provides you with the capabilities to simplify complex processes, improve data quality, and enhance data management, fostering a data-driven environment within your organization. Before building an ETL workflow, there are a few factors that you must consider to improve performance. Understanding aspects like specific business requirements and the amount of data to deal with is crucial to developing effective ETL workflows.

To automate your processes, improve scalability, and reduce infrastructure overhead, you can use data integration tools like Airbyte.

Limitless data movement with free Alpha and Beta connectors
Introducing: our Free Connector Program
The data movement infrastructure for the modern data teams.
Try a 14-day free trial