Learn how to monitor data replication and migration pipelines using Google's Data Validation Tool (DVT) and dbt tests.
An important part of building an ELT data pipeline for data replication or data migration is the ability to monitor and get notified when there are errors. When you are unaware of errors, your data will contain inconsistencies and your reporting will be inaccurate. The complexity of most pipelines, due to the number of tools used, makes setting up a monitoring and alerting system even more challenging.
In this article, I share why it is important to set up a monitoring and alerting system for your ELT data pipeline. I explain which key metrics to monitor and the common challenges you will encounter when setting up such a system. I also highlight the different monitoring and alerting tools and show how to implement a typical monitoring/alerting system using Google’s Data Validation Tool (DVT) and data build tool (dbt).
Well, the question should be, “Why not monitor your data pipelines?” Running your pipelines as a black box can be very costly for the business. Let me share a personal story to explain this. The initial data replication pipeline at my company used AWS Database Migration Service (DMS) to replicate data from an RDS (PostgreSQL) replica to an S3 bucket. Snowpipe (Snowflake's continuous data ingestion service) then fetched new data from the S3 bucket and loaded it into Snowflake, our data warehouse.
This architecture worked, but it was a complete black box: we had little or no visibility into what was happening under the hood, and no alerting system to notify us of pipeline failures. We only learned of failures when we noticed inconsistent data or inaccurate downstream analytics, a day or more later. But how does this impact the business? For us, one impact was an increased customer churn rate. Because data arrived late and contained inconsistencies, we could not detect in time when customers had issues with KYC verification.
Below are key reasons why you should monitor your pipelines:
While it’s important to set up these monitoring/alerting systems to gain visibility into your pipelines, determining what key metrics to measure and what tools to utilize is not always a straightforward task. This is because the metrics to measure are largely dependent on the use case for your data pipeline and several other factors.
For example, if one of your pipelines delivers critical data in real-time that is used for tracking application server downtime, your priority will be to monitor delays in data arrival based on an SLA defined by the organization or team.
Below are the different categories of metrics to monitor that apply to whatever use case you or your organization may have:
With data quality monitoring, a monitoring system is set up to continuously validate the quality of the data at the different stages of the pipeline. First, at the extract and load (EL) step, the data at the target is validated against the data at the source after a load job completes. The key metrics monitored here include source-target record count match, source-target column count match, data format errors, data volume errors, column name changes, and referential integrity. Second, the quality of the data is monitored at the transformation step after a transformation job runs. The key metrics monitored at this step include data type errors and null values.
Here, monitoring is focused on the end-to-end reliability of the pipeline. The errors are monitored at the different steps of the pipeline:
This type of monitoring happens after the transformation stage of a pipeline. Here, the transformed data is monitored to identify anomalies based on specific business requirements. For example, currency price depreciation or appreciation, trade loss based on market value, etc. Alerts are triggered when these metrics reach a certain threshold value.
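A threshold rule of this kind can be sketched in a few lines of Python. The metric name, observed value, and limits below are hypothetical; in practice the observed values would come from a query against the transformed tables.

```python
from dataclasses import dataclass


@dataclass
class ThresholdRule:
    """Alert when a metric moves outside an allowed range (hypothetical rule shape)."""
    metric: str
    min_value: float
    max_value: float

    def breached(self, value: float) -> bool:
        # A value outside [min_value, max_value] counts as an anomaly.
        return not (self.min_value <= value <= self.max_value)


def find_breaches(rules, observed):
    """Return the names of metrics whose observed value breaches its rule."""
    return [
        rule.metric
        for rule in rules
        if rule.metric in observed and rule.breached(observed[rule.metric])
    ]


# Hypothetical example: daily percentage change in a currency price.
rules = [ThresholdRule("currency_daily_change_pct", min_value=-5.0, max_value=5.0)]
alerts = find_breaches(rules, {"currency_daily_change_pct": -7.2})
print(alerts)
```

Any metric name returned by `find_breaches` would then be passed to whatever notification channel you use.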
ELT data pipelines are built using a combination of tools including dbt, Airflow, Airbyte, SQL, cloud services, databases, data warehouses, and data lakes. This diversity of tools is good for extensibility and for using the most efficient tool at each layer of the data stack. However, it results in many moving parts in a pipeline, which can make monitoring or gaining full visibility into data pipelines a nightmare.
Before setting up a pipeline monitoring and alerting system, I strongly recommend first streamlining the number of processes in the pipeline you want to monitor. A complex pipeline with multiple possible points of failure requires monitoring/alerting systems at each of these layers. This makes things quite difficult to track and manage.
To simplify my company’s pipeline in the example I gave above, we first brought in Airbyte, an open-source data ingestion tool, to handle our data replication. Airbyte helped us reduce the number of possible points of failure in our pipelines. Instead of first replicating data to an S3 bucket using Database Migration Service (DMS), we replicated data directly to our warehouse (Snowflake) using Airbyte. With this architecture, we removed the need to monitor things at three different levels of our data flow: RDS-DMS level, DMS-S3 bucket level, and S3-Snowpipe level. Now, we only monitor our pipelines at the warehouse level.
Having streamlined the number of monitoring processes, let’s talk about data pipeline monitoring tools and how to set up a typical monitoring/alerting system for your data pipelines.
There are a lot of tools on the market for monitoring data pipelines and triggering alerts. However, these tools differ in their capabilities. While some tools focus on monitoring cloud infrastructure, logs, and application security, others specialize in monitoring data quality, data validation, and data lineage. Also, some tools are proprietary cloud-based solutions while others are open-source.
Some of the proprietary cloud-based solutions include Monte Carlo, Databand, Datadog, Datafold, and Acceldata. The open-source alternatives include Prometheus, Loki, Great Expectations, Data Validation Tool (DVT), dbt test, and data-diff by Datafold. One important point to note is that you may need to combine two or more of these tools to achieve your goal.
In the next section, I share how to set up these monitoring/alerting systems using two open-source tools: Data Validation Tool (DVT) and data build tool (dbt).
Data validation is the practice of checking the integrity, accuracy, and structure of data before it is used for a business operation.
Data validation is a crucial step in building a data pipeline as it provides the layer for checking the validity of the data before it’s put to use in downstream analytics.
Data Validation Tool (DVT) is an open-source Python CLI tool that compares heterogeneous data source tables with multi-leveled validation functions. You can run a DVT process to validate that the source and target tables match and are correct after a data load process is complete. DVT supports column, row, custom query, schema, and column data type validations, with connections for many data warehouses and databases.
Datafold also provides an open-source data-diff project to efficiently compare tables between databases and data warehouses. You can learn more about using data-diff by reading our tutorial about validating data replication pipelines from Postgres to Snowflake.
You can set up and run DVT on a virtual machine in any of the cloud platforms. You also have the option of running DVT in a Docker container. Follow the instructions here to install and set up DVT on your local machine or a cloud environment. The code walk-through in this section was run on a cloud shell session on Google Cloud. DVT provides a command-line interface (CLI) for executing dvt commands once installed.
To validate a source table against a destination table, first create a source and a destination connection. We do this by running the below code via the CLI.
The code snippet above will create a MySQL connection as source connection and a BigQuery connection as target connection.
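As a minimal sketch, the target connection could be created as follows, wrapped in Python so the command can be scripted. It assumes DVT's `connections add` subcommand with the `BigQuery` connection type and its `--project-id` flag; the connection name `my_bq_conn` and the project ID `my-gcp-project` are placeholders. The MySQL source connection follows the same pattern with host, port, and credential flags, whose exact names are best taken from `data-validation connections add -h` for your installed version.

```python
import shlex

# Placeholder values: replace with your own connection name and GCP project.
CONNECTION_NAME = "my_bq_conn"
GCP_PROJECT = "my-gcp-project"

# `connections add` registers a named connection that later commands refer to.
add_target_conn = [
    "data-validation", "connections", "add",
    "--connection-name", CONNECTION_NAME,
    "BigQuery",
    "--project-id", GCP_PROJECT,
]

# Print the shell-ready command.
# To execute it from Python: subprocess.run(add_target_conn, check=True)
print(shlex.join(add_target_conn))
```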
By default, a column validation runs COUNT(*) on both the source and the target. COUNT(*) counts the rows in each table, and the validation verifies that the source count matches the target count; counts and other aggregates can also be computed over specific columns. To run a column validation, execute the data-validation run command via the CLI. Below is how a column validation between a MySQL source table and a BigQuery target table would look:
The above code snippet will run the count on the source table and validate it against the count on the destination table. The --bq-result-handler flag writes the result of the validation to an intermediary BigQuery table. Without the --bq-result-handler flag, the validation result is printed to the console by default.
For an Airbyte sync with table normalization enabled, you will need to specify the names of the columns to be validated in your column validation. This excludes the additional metadata columns added by Airbyte during a sync. The code below shows how you can specify columns in your validation:
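The following sketch builds such a column validation command in Python. It assumes a recent DVT release, where column validations run through the `validate column` subcommand (older releases used a different entry point, so check `data-validation --help`). The connection names, table names, column names, and results table are all placeholders.

```python
import shlex


def column_validation_cmd(source_conn, target_conn, tables,
                          count_columns=None, bq_result_table=None):
    """Build a DVT column validation command as an argv list."""
    cmd = [
        "data-validation", "validate", "column",
        "-sc", source_conn,
        "-tc", target_conn,
        "-tbls", tables,  # format: source_schema.table=target_dataset.table
    ]
    if count_columns:
        # Count only the named columns, e.g. to skip Airbyte metadata columns.
        cmd += ["--count", ",".join(count_columns)]
    if bq_result_table:
        # Write results to a BigQuery table instead of the console.
        cmd += ["--bq-result-handler", bq_result_table]
    return cmd


cmd = column_validation_cmd(
    "my_mysql_conn", "my_bq_conn", "shop.orders=analytics.orders",
    count_columns=["order_id", "status"],
    bq_result_table="my-gcp-project.dvt_results.results",
)
print(shlex.join(cmd))
```

Leaving out `count_columns` falls back to DVT's default count, and leaving out `bq_result_table` keeps the output on the console.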
A row validation runs a COUNT(*) on both source and target. This counts the number of rows in the source table and verifies that it matches the count on the target table. Below is how a row validation between a MySQL source table and a BigQuery target table would look:
The --use-random-row and --random-row-batch-size flags specify that you want to only validate a subset of rows at random. This comes in handy when you have large tables since row validation requires more memory and compute.
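Here is a hedged sketch of a sampled row validation, again built in Python. It assumes a recent DVT release, in which `validate row` needs a primary key plus either `--hash` or `--comparison-fields` to decide what is compared for each row. The connection names, table names, key column, and batch size are placeholders.

```python
import shlex

row_validation_cmd = [
    "data-validation", "validate", "row",
    "-sc", "my_mysql_conn",
    "-tc", "my_bq_conn",
    "-tbls", "shop.orders=analytics.orders",
    "--primary-keys", "order_id",       # key used to match rows across both tables
    "--hash", "*",                      # compare a hash of all columns per row
    "--use-random-row",                 # validate a random subset of rows
    "--random-row-batch-size", "100",   # size of that random subset
]

# shlex.join quotes the '*' so the shell does not expand it.
print(shlex.join(row_validation_cmd))
```

With Airbyte metadata columns on the target, you would likely list the columns to hash explicitly rather than use `*`.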
Schema validation will get the column data type for each column in the source table and verify that it matches that of the target table. A type mismatch in the source and target tables results in a failed validation status.
Below is how a schema validation between a MySQL source table and a BigQuery target table would look:
Row comparison validation
This type of validation performs a row-wise comparison of the values of a specified column in both the source and target tables. A mismatch in these values results in a failed validation status.
Below is how a row comparison validation between a MySQL source table and a BigQuery target table would look:
Below is a sample of the validation results in a BigQuery table:
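The schema validation and the row comparison described above might be invoked as follows. This is a sketch that assumes the `validate schema` and `validate row` subcommands of a recent DVT release; the connection names, table names, primary key, and compared column are placeholders.

```python
import shlex

# Arguments shared by both validations (placeholder names).
COMMON = [
    "-sc", "my_mysql_conn",
    "-tc", "my_bq_conn",
    "-tbls", "shop.orders=analytics.orders",
]

# Schema validation: compares column data types between source and target.
schema_cmd = ["data-validation", "validate", "schema", *COMMON]

# Row comparison: matches rows on the primary key and compares the values
# of the listed column(s) row by row.
row_comparison_cmd = [
    "data-validation", "validate", "row", *COMMON,
    "--primary-keys", "order_id",
    "--comparison-fields", "status",
]

for cmd in (schema_cmd, row_comparison_cmd):
    print(shlex.join(cmd))
```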
The column named ‘difference’ represents the difference between the column/row count in the source table and the target table. The validation_status column shows the status of the validation.
This table can then be queried and error notifications/alerts sent to an email or Slack channel.
I will look at this in the last section on Sending Notifications/Alerts to a Slack Channel.
Running validation from a YAML file
Another method of running validations is by saving your validation configuration to a YAML file. This way you can store previous validations and modify your validation configuration easily. This approach also helps automate the validation process as the validations can be run on schedule.
To generate a YAML configuration file for validation, you specify the --config-file flag. See code below:
Below is what a YAML configuration generated from the above code looks like.
Your generated YAML configuration file can be found in the directory you executed the generate validation command from.
You can now run your validations from the YAML configuration file using the code below:
dbt is a data transformation tool that enables data and analytics engineers to transform data in their warehouses by simply writing SQL select statements. dbt handles turning these select statements into tables and views in the warehouse. To set up a dbt project, follow this guide on setting up a dbt project.
dbt provides test functionality for performing data quality checks, including data type checks, null value checks, duplicate checks, and referential integrity checks. Let’s see how to perform data quality checks using dbt test. A dbt test is defined either as a singular test, which is a SQL file containing a query that returns failing records, or as a generic test, which is a reusable parameterized query that you apply to models, sources, and columns in a YAML configuration file.
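As a sketch of the two steps, the commands below first save a column validation to a YAML file and then run it from that file. The `configs run -c` form is assumed from recent DVT releases (older releases used a different subcommand for this), and the file name, connection names, and table names are placeholders.

```python
import shlex

CONFIG_FILE = "orders_column_validation.yaml"  # hypothetical file name

# Step 1: adding --config-file saves the validation to a YAML file.
generate_cmd = [
    "data-validation", "validate", "column",
    "-sc", "my_mysql_conn",
    "-tc", "my_bq_conn",
    "-tbls", "shop.orders=analytics.orders",
    "--config-file", CONFIG_FILE,
]

# Step 2: run the saved validation later, for example from a scheduler.
run_cmd = ["data-validation", "configs", "run", "-c", CONFIG_FILE]

for cmd in (generate_cmd, run_cmd):
    print(shlex.join(cmd))
```

Because the second command only needs the file path, it is the natural one to call from a cron job or an orchestrator task.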
Below is an example of defining a test for a source table (orders table) using a YAML configuration file.
The test configuration in the example above first checks that the order_id column of the orders table is unique and contains no null values. It then checks that the status column contains only the accepted values ('placed', 'shipped', 'completed', 'returned'). Finally, it checks referential integrity on the customer_id column to ensure every customer_id in the orders table has an associated id in the customers table.
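To make the semantics of these four checks concrete, here is a plain-Python illustration of what each one looks for. This is not how dbt runs them (dbt compiles each test to a SQL query against the warehouse), and the rows are made up. Like a dbt test, each function returns the failing records, so an empty list means the check passed.

```python
from collections import Counter

# Made-up sample data with one deliberate failure per check.
orders = [
    {"order_id": 1, "status": "placed", "customer_id": 10},
    {"order_id": 2, "status": "lost", "customer_id": 11},         # invalid status
    {"order_id": 2, "status": "shipped", "customer_id": 99},      # duplicate id, unknown customer
    {"order_id": None, "status": "returned", "customer_id": 10},  # null id
]
customers = [{"id": 10}, {"id": 11}]


def not_null(rows, column):
    """Rows where the column is missing a value."""
    return [r for r in rows if r[column] is None]


def unique(rows, column):
    """Rows whose non-null column value appears more than once."""
    counts = Counter(r[column] for r in rows if r[column] is not None)
    return [r for r in rows if counts.get(r[column], 0) > 1]


def accepted_values(rows, column, values):
    """Rows whose non-null column value is outside the allowed set."""
    return [r for r in rows if r[column] is not None and r[column] not in values]


def relationships(rows, column, parent_rows, parent_column):
    """Rows whose non-null foreign key has no match in the parent table."""
    parent_keys = {p[parent_column] for p in parent_rows}
    return [r for r in rows if r[column] is not None and r[column] not in parent_keys]


failures = {
    "unique": unique(orders, "order_id"),
    "not_null": not_null(orders, "order_id"),
    "accepted_values": accepted_values(
        orders, "status", {"placed", "shipped", "completed", "returned"}
    ),
    "relationships": relationships(orders, "customer_id", customers, "id"),
}
for name, rows in failures.items():
    print(name, len(rows))
```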
To run the test, run the command: dbt test --store-failures.
The --store-failures flag stores the result of the tests in an intermediary table. You can then query this table to send error/failure notifications/alerts. The records that didn't pass the tests are saved in a result table in the schema suffixed with ‘dbt_test__audit’ in the data warehouse.
It is important to note that the approach of monitoring data quality using dbt as explained above is also applicable to business metric monitoring.
dbt provides a monitoring and alerting system for tracking pipeline errors or job failures. The notifications are configured in dbt Cloud and are triggered after a job runs. Notifications can be sent to email or a Slack channel. Here’s a guide on this. For the extract and load (EL) layer in your pipeline, Airbyte provides a monitoring and alerting system for end-to-end monitoring and for sending sync failure/success notifications to a Slack channel. Follow this step-by-step guide on setting up Slack notifications for Airbyte sync.
To send notifications/alerts for tracking issues or failures with your pipelines, you will build a simple Slack bot that can run on a serverless function, e.g., AWS Lambda or Google Cloud Functions. The bot is a simple Python script that can be scheduled to run at a fixed interval or triggered by a data load event.
The bot will query any of the tables containing the results of the data quality and data validation tests from the sections above and send a notification/alert based on some defined logic.
The code snippet below implements a notification/alerting system that runs from a Google Cloud Function and pushes notifications/alerts to a Slack channel when there’s a mismatch between the count of rows/columns from the source table and the target table in our data validation example above. Here’s a guide on creating a Slack webhook URL.
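A minimal sketch of such a bot is shown here. It assumes the DVT results live in a BigQuery table with the `validation_status` and `difference` columns described earlier; the `'fail'` status value, the `source_table_name` and `target_table_name` columns, the table ID, and the webhook URL are assumptions or placeholders to adjust to your own setup. The BigQuery client library is imported lazily, so the message-building logic can be tried without it installed.

```python
import json
import urllib.request

# Placeholders: point these at your own results table and Slack webhook.
RESULTS_TABLE = "my-gcp-project.dvt_results.results"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def fetch_failed_validations(table=RESULTS_TABLE):
    """Query BigQuery for failed validations (needs google-cloud-bigquery)."""
    from google.cloud import bigquery  # lazy import, only needed at run time

    client = bigquery.Client()
    query = f"SELECT * FROM `{table}` WHERE validation_status = 'fail'"
    return [dict(row.items()) for row in client.query(query).result()]


def build_message(failures):
    """Format failed validations into a Slack message, or None if all passed."""
    if not failures:
        return None
    lines = [f":warning: {len(failures)} data validation(s) failed"]
    for failure in failures:
        source = failure.get("source_table_name", "?")
        target = failure.get("target_table_name", "?")
        lines.append(f"- {source} -> {target}: difference={failure.get('difference')}")
    return "\n".join(lines)


def post_to_slack(text, webhook_url=SLACK_WEBHOOK_URL):
    """Send the message to a Slack incoming webhook as a JSON payload."""
    body = json.dumps({"text": text}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return response.status


def notify(request=None):
    """Entry point for an HTTP-triggered Cloud Function."""
    message = build_message(fetch_failed_validations())
    if message:
        post_to_slack(message)
    return "ok"
```

Keeping the query, the message formatting, and the HTTP call in separate functions makes it straightforward to point the same bot at the dbt test failure tables by swapping only the query.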
In this article you saw the need to set up a monitoring/alerting system for your ELT data pipelines.
Data quality, pipeline reliability, and business metrics are the key areas to monitor for your pipelines. Pipeline complexity is one of the major challenges a data team will face when planning to set up a monitoring/alerting system for an ELT pipeline. I recommended using a data replication tool like Airbyte to reduce this complexity. I then looked at the different proprietary data pipeline monitoring/alerting tools and their open-source alternatives, and took a deep dive into how to set up a typical monitoring/alerting system using Data Validation Tool (DVT) and data build tool (dbt). Finally, I showed how to build a bot for triggering notifications/alerts for your pipelines.