ELT pipelines can be tricky business when you have requirements around the sequencing of data processing and referential integrity. Things become even more complicated when you have many data sources which arrive at different rates, or many downstream exposures of the data with different SLAs. Due to these common complexities, data engineers are often grappling with the dreaded “race condition” .
This post will cover the basics of what a race condition is, how they might crop up in ELT pipelines, and some examples of how to protect against them.
To follow along with this post, you’ll need prerequisite knowledge in a few areas. First, you should have some foundational knowledge of Computer Science topics, particularly concurrency , parallelism , and determinism . See here for some discussion on the differences between concurrency and parallelism. You should also be familiar with ETL / ELT pipelines, and typical strategies for moving data through such pipelines.
What is a Race Condition? First, let’s work through some of the terminology semantics. Wikipedia describes a race condition as “the condition of an electronics, software, or other system where the system's substantive behavior is dependent on the sequence or timing of other uncontrollable events, leading to unexpected results”. In other words, the system’s behavior depends on the sequencing of events, and if the sequencing of events isn’t deterministic and isn’t explicitly controlled for, you have a race condition.
A classic example of this in computing is multithreading . In a multithreaded program, a race condition occurs when two or more threads can access shared data and they try to change it at the same time.
Because the order in which threads are scheduled for execution in modern operating systems is non-deterministic, programmers can’t depend on any one thread modifying the data before another. If this non-determinism is not accounted for, each thread will “race” to access or modify the data, and results may be random and unexpected. To prevent this, a “lock” mechanism must be implemented, so that one and only one thread can modify the shared data at any given time.
The description of race conditions above uses the example of multithreading, however there are many software abstractions where the same concepts apply.
Essentially any computing abstraction involving concurrent or parallel execution of code can exhibit race conditions. This might be multiple processes executing on the same machine (multi-processing), or multiple processes executing on separate machines by entirely different systems. This is more typical of what we’ll see in ELT pipelines. It can be useful to generalize units of computing work using terms like “task” or “job”. For the purposes of this article, let’s use the term “job” to refer to one processing component of an ELT pipeline.
A “data race” is one type of race condition. It refers specifically to the situation where one process accesses a piece of data while another process is modifying it. This is probably the most common type of race condition, and is what we will typically encounter in ELT pipelines.
This is all very abstract, let’s get to an example.
Data Replication Occurring at Same Time as Transformation Queries Suppose your team is using Airbyte to replicate data from a transactional application database to your data warehouse. Data is replicated incrementally each hour, and each replication job takes about 20 minutes to complete. The tables being replicated are highly normalized, and referential integrity between those tables is enforced in the source system.
Let’s say your Airbyte deployment is configured to replicate each table in serial. First Table A, then Table B when A completes, etc. Two tables, products and orders , have a foreign key constraint and referential integrity requirement (that is, every product_id in orders must also exist in products ).
Suppose a large batch of new orders has been added to the source database since the last time an Airbyte replication job ran. It contains some brand new products which didn’t exist the last time the replication ran (so they don’t exist in the replicated products table yet).
If the orders table gets replicated to your data warehouse before the products table, and a data transformation job kicks off in between, you might run into some issues. Imagine the following sequence of events:
orders replicated to DWH (including records referencing new products) Data transformation job begins, queries both the orders and products table, and performs a join products table replicated to DWH (including new products)The results of the data transformation in step 2 are probably going to exhibit some relational integrity issues, assuming they’ve been materialized as a table, and not a view. This is because when they ran, they referenced an orders table which contained references to new products, which had not yet been inserted into the products table. This might look something like the below using dummy data. In this case, a left join was used in stg_orders_join_products , which resulted in some data quality issues in the resulting denormalized table.
The impact of Data Race Conditions In the above example, the resulting staging table contains some NULL values for the orders associated with product_id 4 , because it does not yet exist in the products table. If users are viewing this raw data, they will wonder why values are missing.
Another negative impact might be invalid aggregate counts built from this staging table. Suppose we have a report to display the number of products ordered per day, broken down by the product_brand . Since the staging model contains a left join , we can at least see that there were 2 products ordered on January 2, but not which brands they are. A count grouped by product_brand would just yield a NULL category.
If we chose to use an inner join instead of a left join in the staging model, the data quality for the records that do have a match would be okay, but then there would be records missing altogether. Then the order count for January 2 would be incorrect, and we might be in breach of data freshness SLAs, leaving users thinking “I thought this pipeline was supposed to update hourly, why are there missing records?”
Possible Solutions to Data Race Conditions Stick to Inner Joins Instead of Outer Joins As mentioned above, simply inner joining the source tables in staging models is one potential solution here, but not quite a ‘complete’ solution. In this case, we can deliberately allow the records whose ids are not present in both sources to drop out. The next time the transformation pipeline runs, the records which dropped out during the previous transformation job due to the data race condition will be available. In this way, the pipeline is still “eventually consistent”. This approach works assuming the following:
You have a way to enforce that inner joins are used consistently in your staging models (this is hard if you have a lot of developers or a large / complex transformation layer) You don’t have strict data freshness SLAs Use Data Orchestrators to Trigger Jobs Sequentially Data orchestrators like Airflow, Prefect, or Dagster can be used to ensure data replication jobs are never running at the same time as data transformation jobs in the first place. This is a simple solution, but probably doesn’t scale for many data sources with varying replication schedules, and many data models joining across those data sources. If this approach is used, it is probably best to configure your orchestrator to trigger the data transformation job upon success of the data replication job, rather than just using schedule based triggers. If the data transformation job is scheduled to run at a particular time, you risk accidentally overlapping with a replication job if it is running longer than expected.
Create a ‘Snapshot’ Raw Database Another potential solution is to create a static, unchanging ‘snapshot’ raw database to point your data transformation queries at. This can be a zero copy clone of whichever raw database data is actively replicating into.
The important part about this solution is that this clone must be created during a “dead period” in data replication jobs. That is, no new data can be actively replicating into your raw database when the clone is created. If there are ongoing replication jobs, you might just run into the same relational integrity issues between sources tables (you might happen to create the clone in between the replication of two tables with a foreign key relationship - another race condition).
Once this snapshot database is created, you can be sure that you have all the same relational integrity guarantees as the source system. Another benefit of this approach is that intraday data churn is limited for users who are consuming the transformed data via a reporting layer.
Suppose data transformations are occurring intraday via scheduled jobs or CI builds, and the results of those transformations are immediately exposed to downstream reporting systems upon successful deployments. If the raw data source is a static, unchanging snapshot, users can have very clear expectations about data freshness SLAs.
For instance, they can be confident in statements like “I can expect data as fresh as the end of the business day yesterday”. New, partial intraday data won’t be arriving to the reporting system unexpectedly throughout the day. This all depends on the frequency of data replications, transformations, and associated SLAs of course, but the general concept still applies that this raw database snapshot approach can be used to avoid surprises and set more clear expectations for users.
Common ‘High Water Mark’ Source Data Filters Yet another solution might be to implement a “global timestamp filter” to all raw data sources before transforming them. This timestamp could be the common “high water mark” of loaded_at timestamps among all source tables being transformed.
For example, if 3 source tables are being transformed, and the maximum shared timestamp among them is 11AM, then we can filter all records to be less than or equal to that before transforming, even if some tables contain records with loaded_at timestamps greater than that. This way all sources reflect data up to a consistent point in time, and we can be sure that no sources are “ahead” of others from a data replication perspective.
If you are using a system like dbt for data transformations, this might be achievable through clever usage of a macro or model pre-hook.
Global Distributed Locking Mechanism A more general solution than those proposed so far is a global distributed locking mechanism. This is a way to dynamically ensure that data replication jobs and data transformation jobs don’t overlap, and never occur at the same time.
For example, if a data transformation job is scheduled to run when a data replication job is still in progress, it will “wait” until the replication job completes, then begin. Systems like AWS DynamoDB can be used to implement such a locking mechanism. This is much harder to implement than the other solutions described, but also much more general and dynamic.
For systems with many replication and transformation jobs running concurrently, this can be more practical and maintainable than configuring schedules manually.
Conclusion Orchestrating data pipelines to be resilient to race conditions is challenging, but worth the benefits in improved data quality, consistency, and system reliability. In this post, we focused on data race conditions between data replication and transformation jobs, but similar situations can manifest in many other places in the typical pipeline. In a future post, some other types of data race conditions can potentially be explored.