Partition and cluster BigQuery tables with Airbyte and dbt

Learn how to modify the dbt code used by Airbyte to partition and cluster BigQuery tables.

As your business grows, the size of your BigQuery tables will inevitably grow as well. Scanning entire tables to answer business questions will become slower and more expensive as a result. Fortunately, modern data warehouses provide options to limit how much data each query scans. 

In this article, you will first learn how to use Airbyte to load data from any source to BigQuery. Then, you will learn how to customize the dbt models used by Airbyte to partition and cluster BigQuery tables.

What is a partitioned table?

A partitioned table is a table that stores your data in smaller chunks (partitions) based on the values of a column. For example, if your table contains all of your company’s orders, each partition could hold the orders for a single date. Storing data this way makes queries faster because the query engine does not have to scan the entire table, which can be very large. And since BigQuery charges you based on the bytes scanned by each query, querying a partitioned table can be significantly cheaper.
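
As an illustration, here is a minimal BigQuery SQL sketch (the orders table, dataset, and column names are hypothetical and not tied to the Airbyte walkthrough later in this article):


-- Hypothetical orders table, partitioned by order date
CREATE TABLE mydataset.orders (
  order_id STRING,
  customer_id STRING,
  amount NUMERIC,
  order_date DATE
)
PARTITION BY order_date;

-- Only the January 2021 partitions are scanned (and billed)
SELECT SUM(amount) AS revenue
FROM mydataset.orders
WHERE order_date BETWEEN DATE '2021-01-01' AND DATE '2021-01-31';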

There are different strategies for partitioning a table, depending on how you plan to query the data. Here are the options in BigQuery (short DDL sketches follow the list):

  • Using a time-unit column: you can partition the table by any column with a data type of TIMESTAMP, DATE, or DATETIME. This strategy is often used for modeled tables that business users query, answering questions like how many new users signed up this year or how many products were sold in the last 30 days.
  • Ingestion time: the table is partitioned by the time the data is ingested into BigQuery. This strategy is often used in the ETL/ELT process. A common use case is to take the data modified in the past day and merge it into a historical table while removing duplicate records.
  • Integer range: the table is partitioned using an integer column. This approach is less commonly used in practice because the number of partitions can easily exceed BigQuery’s limit of 4,000 partitions per table.
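
The time-unit option looks like the orders sketch above. For completeness, here are similarly hypothetical DDL sketches of the other two options:


-- Ingestion-time partitioning: rows land in the partition for the day they are loaded
CREATE TABLE mydataset.raw_events (payload STRING)
PARTITION BY _PARTITIONDATE;

-- Integer-range partitioning: store_id is bucketed into partitions of width 10, from 0 to 4000
CREATE TABLE mydataset.orders_by_store (store_id INT64, amount NUMERIC)
PARTITION BY RANGE_BUCKET(store_id, GENERATE_ARRAY(0, 4000, 10));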

What is a clustered table?

A clustered table works similarly to a partitioned table and serves the same purpose: faster queries and lower costs. However, unlike a partitioned table, which can have only one partition column, a table can have up to four clustering columns. BigQuery then stores rows with related values in those columns physically close together. Because the data is not split into separate segments the way it is in a partitioned table, BigQuery cannot estimate the bytes scanned up front, so you only know the cost of a query after running it.

The order in which you define the clustering columns also matters. Similar to sorting a table in Excel, BigQuery sorts the data by the first clustering column, then by the next one, and so on. Which column to put first depends on how you plan to query your data. For example, if your business operates across regions and users mostly look at their own territory, it makes sense to cluster by the region column first, followed by the order status column, rather than the other way around.
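
A hedged sketch of that regions example, again with hypothetical dataset and column names, could look like this:


-- Orders table partitioned by date and clustered by region, then order status
CREATE TABLE mydataset.orders_clustered (
  order_date DATE,
  region STRING,
  order_status STRING,
  amount NUMERIC
)
PARTITION BY order_date
CLUSTER BY region, order_status;

-- Queries that filter on the leading clustering column benefit the most
SELECT order_status, COUNT(*) AS orders
FROM mydataset.orders_clustered
WHERE order_date >= DATE '2021-01-01'
  AND region = 'EMEA'
GROUP BY order_status;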

Partitioned table vs. Clustered table

The BigQuery documentation does a great job of explaining the differences between partitioned and clustered tables. The main differences are summarized below.

  • A table can have only one partition column, but up to four clustering columns.
  • With partitioning, BigQuery can tell you how many bytes a query will scan before you run it; with clustering, the cost is only known after the query runs.
  • A partitioned table can have at most 4,000 partitions; clustered tables have no equivalent limit on the number of distinct clustering values.

Load data to BigQuery with Airbyte

There are many ways to load data into BigQuery. You can manually export data from your sources to CSV files or write custom scripts to do it. A much better and faster way is to use Airbyte’s built-in connectors. Let’s walk through the steps in the next section.

Prerequisites

  • A Google Cloud project with billing enabled
  • A Service Account with at least BigQuery User permission and a Service Account Key

Run Airbyte locally

First, start an Airbyte instance locally with:


git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

Visit the Airbyte deployment using your browser of choice at http://localhost:8000. Finish the onboarding process to get to the home screen. 

Set up a Google Cloud Storage file source

Create an Airbyte data source to extract data from a public Google Cloud Storage file with sample COVID data.

Set up a BigQuery destination

To create a BigQuery destination, fill in the Google Cloud project ID and the service account key that you created previously.

Sync data from GCS to BigQuery

The final step here is to configure the sync from your data source to BigQuery. Select a sync frequency and a sync mode for each table, and choose Basic normalization. Select no custom transformation at this stage so that Airbyte generates the base normalization code for you.

With everything set up, you can create the connection and launch the first sync. Wait for it to finish, then check that the data is present in BigQuery. You should see the covid_epidemiology table replicated in the covid_data dataset, along with a table named _airbyte_raw_covid_epidemiology. Airbyte wrote the raw JSON data to the latter and used the normalization code to create the replicated table.
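
If you want to inspect the result with SQL (assuming the covid_data dataset and table names from this walkthrough), queries like these should work:


-- Row count of the normalized table
SELECT COUNT(*) AS row_count
FROM covid_data.covid_epidemiology;

-- The raw table keeps each record as a JSON blob in the _airbyte_data column
SELECT _airbyte_data
FROM covid_data._airbyte_raw_covid_epidemiology
LIMIT 5;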


Partition and cluster BigQuery tables with dbt

Next, you will learn how to use dbt to partition and cluster data in BigQuery. By default, Airbyte uses dbt to normalize the extracted data. You can extract and modify the dbt code used by Airbyte to apply custom transformations to the loaded data.

Extract the dbt code generated by Airbyte

Based on the Transformations with SQL documentation, you first have to extract the code that Airbyte generated. Go to the connection File - Covid Data and open the latest successful job. Note down the folder the process ran in from the top section of the log. In my case, it was /tmp/workspace/1/0, where 1 corresponds to the Airbyte job_id and 0 to the attempt_id. Let’s store it in an environment variable so that you can easily reuse the code snippets below.


NORMALIZE_WORKSPACE="1/0"

Alternatively, you can run the following command to get the workspace folder for the latest job.


NORMALIZE_WORKSPACE=`docker run --rm -i -v airbyte_workspace:/data  busybox find /data -path "*normalize/models*" | sed -E "s;/data/([0-9]+/[0-9]+/)normalize/.*;\1;g" | sort | uniq | tail -n 1`

With the environment variable set, you can verify that the project is configured correctly by running:


#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=.

This snippet runs the dbt debug command from the airbyte/normalization image, inside the dbt project folder of the Airbyte workspace volume. If you receive an error, double-check that the environment variable has been set. If everything is working, you should see an output like this:


Running with dbt=0.19.0
dbt version: 0.19.0
python version: 3.8.7
python path: /usr/local/bin/python
os info: Linux-5.10.47-linuxkit-x86_64-with-glibc2.2.5
Using profiles.yml file at ./profiles.yml
Using dbt_project.yml file at /data/1/0/normalize/dbt_project.yml

Configuration:
  profiles.yml file [OK found and valid]
  dbt_project.yml file [OK found and valid]

Required dependencies:
 - git [OK found]

Connection:
  method: service-account-json
  database: demobox-313313
  schema: covid_data
  location: None
  priority: None
  timeout_seconds: 300
  maximum_bytes_billed: None
  Connection test: OK connection ok

Based on the Transformations with dbt documentation, you can extract the generated dbt code to an external directory. Replace the PROJECT_PATH variable with your desired path.


#!/usr/bin/env bash

PROJECT_PATH="$(pwd)/covid-data"
rm -rf $PROJECT_PATH/
mkdir -p $PROJECT_PATH

docker cp airbyte-server:/tmp/workspace/$NORMALIZE_WORKSPACE/normalize/. $PROJECT_PATH

cd $PROJECT_PATH
ls

And that’s it. You have extracted the dbt code that Airbyte used. Now, let’s look at how to add partition and cluster configurations for the replicated tables. 

Configure BigQuery partitioned tables

The dbt code for the covid_epidemiology table will be in the following directory: $PROJECT_PATH/models/airbyte_tables/covid_data/.

In this case, it makes sense for us to partition the table by the date column as users will most likely want to query COVID cases during a specific period. You can add a partition_by key to the dbt config block. Based on the dbt BigQuery documentation, the value of the partition_by key should follow this schema:


{
  "field": "<field name>",
  "data_type": "<timestamp | date | datetime | int64>",
  "granularity": "<hour | day | month | year>"

  # Only required if data_type is "int64"
  "range": {
    "start": <int>,
    "end": <int>,
    "interval": <int>
  }
}

Note that the partition column should have a date/time-related data type. Since you are loading the data from a file rather than a relational database, there is no table schema for Airbyte to fetch. As a result, the date column is typed as a string, and you need to convert it to the date type. Modify the covid_epidemiology.sql file to look like this:


{{ config(
   schema="covid_data",
   tags=["top-level"],
   partition_by={
       "field": "date",
       "data_type": "date"
   }
) }}
-- Final base SQL model
select
   key,
   date(date) as date,
   new_tested,
   new_deceased,
   total_tested,
   new_confirmed,
   new_recovered,
   total_deceased,
   total_confirmed,
   total_recovered,
   _airbyte_emitted_at,
   _airbyte_covid_epidemiology_hashid
from {{ ref('covid_epidemiology_ab3') }}

Alternatively, you can change the data type casting in the intermediate model at the following path: airbyte_ctes/covid_epidemiology_ab2.sql. If you have dbt installed locally, you can run these models right from your command line using:


# Install dependencies
dbt deps --profiles-dir=$PROJECT_PATH --project-dir=$PROJECT_PATH
# Run dbt models
dbt run --profiles-dir=$PROJECT_PATH --project-dir=$PROJECT_PATH --full-refresh

If you check the table in BigQuery, you can see that the table is now a partitioned table.

Note that the raw Airbyte table is not partitioned or clustered, so every time Airbyte runs, it will still scan the entire raw table. Querying the normalized table with a filter on the partition column, however, will process less data than scanning the whole table.
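
For example, a query along these lines (assuming the model above) only scans the partitions for the requested dates:


-- Partition pruning: only the January 2021 partitions are scanned
SELECT
  date,
  SUM(new_confirmed) AS new_confirmed
FROM covid_data.covid_epidemiology
WHERE date BETWEEN DATE '2021-01-01' AND DATE '2021-01-31'
GROUP BY date
ORDER BY date;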

Configure BigQuery clustered tables

Similar to how you configure a partitioned table, you can configure a clustered table by modifying the config block. The cluster_by config is used for this and accepts either a single column name or a list of up to four. The config block for the clustered table looks like this:


{{ config(
   schema="covid_data",
   tags=["top-level"],
   partition_by={
       "field": "date",
       "data_type": "date"
   },
   cluster_by=["key"]
) }}

Notice that this config creates a table that is both partitioned and clustered. After running the model, checking BigQuery confirms that the table has both a partition column and clustering columns.
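
As a rough check, a query that filters on both the partition column and the clustering column should scan even fewer bytes, although with clustering the exact amount is only known after the query runs (the key value below is hypothetical):


-- Filtering on the clustering column (key) on top of the partition filter
SELECT date, key, new_confirmed, new_deceased
FROM covid_data.covid_epidemiology
WHERE date >= DATE '2021-01-01'
  AND key = 'US_CA'  -- hypothetical location key
ORDER BY date;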


Adding custom dbt transformations to Airbyte

To add the transformations back to Airbyte, you need to create a new git repository and push your modified code there. Before pushing your code, it is important to change a variable in the dbt_project.yml file. Due to an issue outlined here, you must change "modules-path: /tmp/dbt_module" to "modules-path: ../dbt_modules".

It is also worth noting that the profiles.yml file contains sensitive information and should not be pushed to a public repository. You do not need this file to run custom transformations, so be sure to add it to your .gitignore file. If you want to push your code to a private repository, follow the instructions here.

Once your code is stored in a remote repository, open the settings page for the connection. Here you should select "Raw data - no normalization", as you will be running the custom normalization yourself. Then add two transformation steps, one for dbt deps and one for dbt run.

That’s it. The next time Airbyte triggers the connection, the custom transformations will run, and you will have a partitioned and clustered table. 

Since you have replaced the default normalization step, be careful when you make changes to the connection. For example, if you add a new table or column, the normalization step for the added entity does not exist in your custom code and will not run. Similarly, if you remove a table without updating the custom normalization code, the dbt run will fail. Be sure to repeat the "Extract the dbt code generated by Airbyte" step above to get the Airbyte-generated code for any new tables.

Conclusion

In this article, you have learned what partitioned and clustered tables are in BigQuery. You have also learned how to extract the dbt transformation code Airbyte generates and how to configure a partitioned and clustered table. From here, you can have Airbyte trigger an entire ETL/ELT pipeline for you without needing to set up any extra orchestration infrastructure.

If you would like to get involved, we hope you’ll join Airbyte’s Slack community, the most active community around data integration!

About the author

Tuan Nguyen is the CTO of Joon Solutions, a Data as a Service company. Tuan has built and managed data science teams, analytics infrastructure, and analytics use cases.
