Monitor ELT pipelines with DVT and dbt tests

Learn how to monitor data replication and migration pipelines using Google's Data Validation Tool (DVT) and dbt tests.

An important part of building an ELT data pipeline for data replication or data migration is the ability to monitor and get notified when there are errors. When you are unaware of errors, your data will contain inconsistencies and your reporting will be inaccurate. The complexity of most pipelines, due to the number of tools used, makes setting up a monitoring and alerting system even more challenging.

In this article, I share why it is important to set up a monitoring and alerting system for your ELT data pipeline, explain what key metrics to monitor and the common challenges you will encounter when setting one up, highlight different monitoring and alerting tools, and show how to implement a typical monitoring/alerting system using Google’s Data Validation Tool (DVT) and data build tool (dbt).

Why monitor your data pipelines?

Well, the question should be, “Why not monitor your data pipelines?” Having your pipelines run as a black box can be very costly for the business. Let me share a personal story to explain this. The initial pipeline for data replication in my company was designed with AWS Data Migration Service (DMS) replicating data from an RDS (PostgreSQL) replica to an S3 bucket. Then we had Snowpipe (Snowflake’s data ingestion service) fetch new data from the S3 bucket and pipe it into Snowflake (our data warehouse).

This architecture worked, but it was a complete black box because we had little or no visibility into what was happening under the hood. There was no alerting or notification system in place to notify us of pipeline failures. We only got to know of pipeline failures when we noticed inconsistencies in the data or inaccuracies in downstream analytics, often a day or more later. But how does this impact the business? One impact for us was an increased customer churn rate: because data arrived late and contained inconsistencies, we could not detect in time when customers had issues with KYC verification.

Below are key reasons why you should monitor your pipelines:

  • Get a complete bird’s-eye view of the health of your data.
  • Prevent inconsistencies in data delivery.
  • Establish a continuous data testing approach.
  • Detect data quality and data integrity issues early.
  • Keep track of data processing costs, metadata, and overall system performance.
  • Get feedback for optimizing pipeline performance.

What metrics should you monitor?

While it’s important to set up these monitoring/alerting systems to gain visibility into your pipelines, determining what key metrics to measure and what tools to utilize is not always a straightforward task. This is because the metrics to measure are largely dependent on the use case for your data pipeline and several other factors.

For example, if one of your pipelines delivers critical data in real-time that is used for tracking application server downtime, your priority will be to monitor delays in data arrival based on an SLA defined by the organization or team.

Below are the different categories of metrics to monitor that apply to whatever use case you or your organization may have:

Data quality monitoring

With data quality monitoring, a monitoring system is set up to continuously validate the quality of the data at the different stages of the pipeline. First, at the extract load (EL) step, the quality of data from the source is validated against the data at the target after a load job completes. The key metrics monitored here include source-target record count match, source-target column count match, data format errors, data volume errors, column name changes, referential integrity, etc. Secondly, the quality of the data is monitored at the transformation step after a transformation job runs. The key metrics monitored at this step include data type errors, null values, etc.
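To make the EL-step checks concrete, below is a minimal sketch of a source-target record count comparison you could run after a load job. It assumes SQLAlchemy and the relevant database drivers are installed; the connection URLs and table name are placeholders. A dedicated tool like DVT, which I cover later in this article, does this kind of validation for you at scale.

# Minimal sketch of a source-target record count check after a load job.
# The connection URLs and table name are placeholders for your own setup.
from sqlalchemy import create_engine, text

SOURCE_URL = "mysql+pymysql://user:password@source-host:3306/app_db"  # placeholder
TARGET_URL = "bigquery://your-project-id/analytics_dataset"           # placeholder
TABLE_NAME = "orders"                                                 # placeholder

def count_rows(engine_url: str, table: str) -> int:
    """Run a COUNT(*) against the given table and return the result."""
    engine = create_engine(engine_url)
    with engine.connect() as conn:
        return conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()

source_count = count_rows(SOURCE_URL, TABLE_NAME)
target_count = count_rows(TARGET_URL, TABLE_NAME)

if source_count != target_count:
    # Hook in your alerting here (Slack, email, PagerDuty, ...)
    print(f"Record count mismatch for {TABLE_NAME}: "
          f"source={source_count}, target={target_count}")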

Pipeline reliability monitoring

Here, monitoring is focused on the end-to-end reliability of the pipeline. The errors are monitored at the different steps of the pipeline:

  1. The Extract Load (EL) step: This step is handled by an ELT tool like Airbyte. Here, errors such as sync failures due to authentication issues, normalization errors, errors loading new columns (SCD) during a sync, JSON schema validation errors, etc. are monitored.
  2. Transformation step: This step is handled by a transformation tool like dbt. Here, errors such as failed transformation job runs, data delivery delays (longer-than-expected run durations), data lineage issues, or data loss, etc. are monitored.

Business metrics monitoring

This type of monitoring happens after the transformation stage of a pipeline. Here, the transformed data is monitored to identify anomalies based on specific business requirements. For example, currency price depreciation or appreciation, trade loss based on market value, etc. Alerts are triggered when these metrics reach a certain threshold value.
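As a simple illustration, the sketch below checks a daily trade-loss metric against a threshold after a transformation run and prints an alert when the threshold is breached. The warehouse URL, table, column, and threshold are all placeholders for your own business metrics, and the example assumes the relevant SQLAlchemy driver is installed.

# Hypothetical business metric check: alert when the daily trade loss in the
# warehouse exceeds an agreed threshold. All names here are placeholders.
import pandas as pd
from sqlalchemy import create_engine

WAREHOUSE_URL = "snowflake://user:password@account/database/schema"  # placeholder
LOSS_THRESHOLD = 10_000  # threshold agreed with the business

engine = create_engine(WAREHOUSE_URL)
query = """
    SELECT SUM(loss_amount) AS total_loss
    FROM analytics.daily_trades
    WHERE trade_date = CURRENT_DATE
"""
total_loss = pd.read_sql(query, engine)["total_loss"].iloc[0]

if pd.notna(total_loss) and total_loss > LOSS_THRESHOLD:
    # Hook in your notification logic here (see the Slack bot at the end of this article)
    print(f"Alert: daily trade loss {total_loss} exceeded {LOSS_THRESHOLD}")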

What are the challenges of monitoring data pipelines?

ELT data pipelines are built using a combination of tools including dbt, Airflow, Airbyte, SQL, cloud services, databases, data warehouses, and data lakes. This diversity of tools is good for extensibility and for using the most efficient tool at each layer of the data stack. However, it results in many moving parts in a pipeline, which can make monitoring or gaining full visibility into data pipelines a nightmare.

Before setting up a pipeline monitoring and alerting system, I strongly recommend streamlining the number of processes in the pipeline you want to monitor first. A complex pipeline with multiple possible points of failure will require setting up monitoring/alerting systems at these different layers. This will make things quite difficult to track and manage.

To simplify the complexity of my company’s pipeline in the example I gave above, we first brought in Airbyte, an open-source data ingestion tool, to handle our data replication. Airbyte helped us reduce the number of possible points of failure in our pipelines. Instead of first replicating data to an S3 bucket using Data Migration Service (DMS), we replicated data directly to our warehouse (Snowflake) using Airbyte. With this architecture, we removed the need to monitor things at three different levels of our data flow: RDS-DMS level, DMS-S3 Bucket level, and S3-Snowpipe level. Now, we only monitor our pipelines at the warehouse level.

Having streamlined the number of monitoring processes, let’s talk about data pipeline monitoring tools and how to set up a typical monitoring/alerting system for your data pipelines.

Monitoring data quality metrics in your pipeline

There are a lot of tools on the market for monitoring and triggering alerts in your data pipelines. However, these tools differ in their capabilities. While some focus on monitoring cloud infrastructure, logs, and application security, others are geared towards monitoring data quality, data validation, and data lineage. Also, some tools are proprietary cloud-based solutions while others are open source.

Some of the proprietary cloud-based solutions include Monte Carlo, Databand, Datadog, Datafold, and Accel Data. The open-source alternatives include Prometheus, Loki, Great Expectations, Data Validation Tool (DVT), dbt test, data-diff by Datafold, etc. One important point to note is that you may need to combine two or more of these tools to achieve your goal.

In the next section, I share how to set up these monitoring/alerting systems using two open-source tools: Data Validation Tool (DVT) and data build tool (dbt).

Data validation monitoring using Data Validation Tool (DVT)

Data validation is the practice of checking the integrity, accuracy, and structure of data before it is used for a business operation. 

Data validation is a crucial step in building a data pipeline as it provides the layer for checking the validity of the data before it’s put to use in downstream analytics.

Data Validation Tool (DVT) is an open-source Python CLI tool that compares heterogeneous data source tables with multi-leveled validation functions. You can run a DVT process to validate that the source and target tables match and are correct after a data load process completes. DVT supports column, row, custom query, schema, and column data type validations, with connections for many data warehouses and databases.

Datafold also provides an open-source data-diff project to efficiently compare tables between databases and data warehouses. You can learn more about using data-diff by reading our tutorial about validating data replication pipelines from Postgres to Snowflake.

Using DVT with BigQuery

You can set up and run DVT on a virtual machine in any of the cloud platforms. You also have the option of running DVT in a Docker container. Follow the instructions here to install and set up DVT on your local machine or in a cloud environment. The code walk-through in this section was run in a Cloud Shell session on Google Cloud. Once installed, DVT provides a command-line interface (CLI) for executing validation commands.

To validate a source table against a destination table, first create a source and a destination connection by running the commands below via the CLI.


# Create MySQL connection as the source connection
data-validation connections add --connection-name MYSQL_CONN MySQL --host HOST_IP --port PORT --user-name USER-NAME --password PASSWORD

# Create BigQuery connection as the target connection
data-validation connections add --connection-name BigQuery_CONN BigQuery --project-id $MY_GCP_PROJECT

The code snippet above creates a MySQL connection as the source connection and a BigQuery connection as the target connection.

Column validation

A column validation runs a COUNT(*) on both the source and target tables by default and verifies that the counts match. You can also aggregate specific columns and compare the results between source and target. To run a column validation, execute the data-validation validate column command via the CLI. Below is how a column validation between a MySQL source table and a BigQuery target table would look:


data-validation validate column \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result

The above code snippet performs a COUNT(*) on the source table and validates it against the count on the destination table. The --bq-result-handler flag outputs the result of the validation to an intermediary BigQuery table. By default, without the --bq-result-handler flag, the validation result is output to the console.

For an Airbyte sync with table normalization enabled, you will need to specify the names of the columns to be validated in your column validation. This excludes the additional metadata columns added by Airbyte during a sync. The code below shows how you can specify columns in your validation:


data-validation validate column \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--count column1,column2,column3,column4,column5 \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result

Row validation

A row validation compares data between the source and target tables row by row. With the --hash flag, DVT computes a hash over the specified columns for each row (matched on the primary key) and verifies that the source and target hashes agree. Below is how a row validation between a MySQL source table and a BigQuery target table would look:


data-validation validate row \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \
--hash '*' \
--primary-keys id \
--use-random-row --random-row-batch-size 50

The --use-random-row and --random-row-batch-size flags specify that you want to only validate a subset of rows at random. This comes in handy when you have large tables since row validation requires more memory and compute.

Schema validation

Schema validation will get the column data type for each column in the source table and verify that it matches that of the target table. A type mismatch in the source and target tables results in a failed validation status.

Below is how a schema validation between a MySQL source table and a BigQuery target table would look:


data-validation validate schema \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result

Row comparison validation

This type of validation performs a row-wise comparison of the values of specified columns in both the source and target tables. A mismatch in these values results in a failed validation status.

Below is how a row comparison validation between a MySQL source table and a BigQuery target table would look:


data-validation validate row \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \
--comparison-fields column1,column2 \
--primary-keys id

Below is a sample output of the validation results in a BigQuery table:

The column named ‘difference’ represents the difference between the column/row count in the source table and the target table. The validation_status column shows the status of the validation.

This table can then be queried and error notifications/alerts sent to an email or Slack channel. I will look at this in the last section, on sending notifications to a Slack channel.

Running validation from a YAML file

Another method of running validations is to save your validation configuration to a YAML file. This way, you can store previous validations and easily modify your validation configuration. This approach also helps automate the validation process, as the validations can be run on a schedule (see the scheduling sketch at the end of this section).

To generate a YAML configuration file for validation, you specify the --config-file flag. See code below:


data-validation validate column \
--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \
--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \
--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \
--config-file validation_config.yaml

Below is how a YAML configuration file generated from the above code looks:


result_handler:
  project_id: your-project-id
  table_id: data_validation.validation_results
  type: BigQuery
source: MYSQL_CONN
target: BigQuery_CONN
validations:
- aggregates:
  - field_alias: count
    source_column: null
    target_column: null
    type: count
  calculated_fields: []
  filter_status: null
  filters: []
  format: table
  labels: []
  random_row_batch_size: null
  schema_name: transportation_data
  table_name: citibike_stations
  target_schema_name: onesphere-analytics.master_data
  target_table_name: citibike_stations
  threshold: 0.0
  type: Column
  use_random_rows: false

Your generated YAML configuration file can be found in the directory you executed the generate validation command from.

You can now run your validations from the YAML configuration file using the command below:


data-validation run-config -c validation_config.yaml
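For the scheduled runs mentioned earlier, one option is to wrap the run-config command in your orchestrator. Below is a minimal Airflow sketch, assuming Airflow and DVT are installed in the same environment; the DAG id, schedule, and config file path are placeholders, not part of DVT itself.

# Minimal Airflow DAG that runs the saved DVT validation config daily.
# Assumes Airflow and DVT are installed in the same environment;
# the DAG id, schedule, and config path are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dvt_validations",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_dvt_validations = BashOperator(
        task_id="run_dvt_validations",
        bash_command="data-validation run-config -c /path/to/validation_config.yaml",
    )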

Data quality monitoring using data build tool (dbt)

dbt is a data transformation tool that enables data and analytics engineers to transform data in their warehouses by simply writing SQL select statements. dbt handles turning these select statements into tables and views in the warehouse. To set up a dbt project, follow this setting up a dbt project guide.

dbt provides a test functionality for performing data quality checks, including data type checks, null value checks, duplicate checks, referential integrity checks, etc. Let’s see how to perform data quality checks using dbt tests. A dbt test is defined either as a singular test (an SQL file containing logic that returns failing records) or as a generic test configured in a YAML file.

Below is an example of defining tests for a model (the orders table) using a YAML configuration file.


version: 2
models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned']
      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: id

The test configuration in the example above first checks that the order_id column of the orders table contains no duplicates and no null values, then it checks that the status column only contains the accepted values ('placed', 'shipped', 'completed', 'returned'). Finally, it checks referential integrity on the customer_id column to ensure every customer_id in the orders table has an associated id in the customers table.

To run the tests, execute the command: dbt test --store-failures.

The --store-failures flag stores the records that fail each test in the data warehouse, in a schema suffixed with ‘dbt_test__audit’ (one results table per test). You can then query these tables to send error/failure notifications/alerts.
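As with the DVT results table, you can query these stored failures and alert on them. Below is a minimal sketch for a BigQuery warehouse; the audit schema and table names are hypothetical and depend on your dbt project and test names.

# Query a dbt test failure table created by --store-failures and alert if it
# contains records. The schema and table names below are hypothetical.
import pandas as pd
from google.oauth2 import service_account

# Credentials from a GCP service account key saved in a JSON file.
credentials = service_account.Credentials.from_service_account_file('./google_credentials.json')

query = '''select *
        from analytics_dbt_test__audit.unique_orders_order_id'''
failures = pd.read_gbq(query, project_id='gcp_project_id', credentials=credentials)

if len(failures) > 0:
    # Reuse the Slack webhook approach from the next section to send an alert.
    print(f'dbt test failures found: {len(failures)} records')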

It is important to note that the approach of monitoring data quality using dbt as explained above is also applicable to business metric monitoring. 

Pipeline reliability monitoring using dbt

dbt provides a monitoring and alerting system for tracking pipeline errors or job failures. The notifications are configured on dbt Cloud and are triggered immediately after a job runs. Notifications can be sent to email or a Slack channel. Here’s a guide on this. For the extract load (EL) layer in your pipeline, Airbyte provides a reliable monitoring and alerting system for end-to-end monitoring and sending sync failure/success notifications to a Slack channel. Follow this step-by-step guide on setting up Slack notifications for Airbyte syncs.

Sending Notifications to a Slack Channel

To send notifications/alerts for tracking issues or failures in your pipelines, you can build a simple Slack bot that runs on a serverless function, e.g., AWS Lambda or Google Cloud Functions. The bot is a simple Python script that can be scheduled to run at a time interval or triggered by a data load event.

The bot will query any of the tables containing the result of a data quality and data validation test from the sections above and send a notification/alert based on some defined logic.

The code snippet below implements a notification/alerting system that runs from a Google Cloud Function and pushes notifications/alerts to a Slack channel when there’s a mismatch between the count of rows/columns from the source table and the target table in our data validation example above. Here’s a guide on creating a Slack webhook URL.


import pandas as pd
import requests
from google.oauth2 import service_account

# Credentials from a GCP service account key saved in a JSON file.
credentials = service_account.Credentials.from_service_account_file('./google_credentials.json')

# Incoming webhook URL for your Slack channel (placeholder).
slack_webhook_url = 'https://hooks.slack.com/services/XXX/XXX/XXX'

def slack_notification_bot(credentials, slack_webhook_url):
    # Fetch all failed validations from the DVT results table in BigQuery.
    query = '''select *
            from validation_result_table
            where validation_status = 'fail';'''
    validation_data = pd.read_gbq(query, project_id='gcp_project_id', credentials=credentials)
    message = '''Validation Report:\n
    Error! Incomplete rows of data loaded'''
    # Post an alert to Slack only when failed validations exist.
    if len(validation_data) > 0:
        requests.post(slack_webhook_url, json={'text': message})
    else:
        return 'done'

def main():
    # Run the Slack notification bot
    slack_notification_bot(credentials, slack_webhook_url)
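Note that when deploying this script as a Google Cloud Function, the entry point needs the signature that matches your trigger (for example, def main(request): for an HTTP trigger), and the Slack webhook URL and service account credentials are best supplied through environment variables or Secret Manager rather than hard-coded.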

Conclusion

In this article, you saw the need to set up a monitoring/alerting system for your ELT data pipelines.

Data quality, pipeline reliability, and business metrics are the key categories to monitor in your pipelines. Pipeline complexity is one of the major challenges a data team will face when planning to set up a monitoring/alerting system for an ELT pipeline, and I recommended using a data replication tool like Airbyte to reduce this complexity. I then looked at the different proprietary data pipeline monitoring/alerting tools and their open-source alternatives, and took a deep dive into how to set up a typical monitoring/alerting system using Data Validation Tool (DVT) and data build tool (dbt). Finally, we saw how to build a bot for triggering notifications/alerts for our pipelines.
