Engineering
Article

How we scale workflow orchestration with Temporal

Benoit Moriceau
April 14, 2022
10 min read

Airbyte's role is to provide an easy way to perform data synchronization between a source and a destination. Workflow orchestration is important since it ensures that the data are synchronized on the frequency specified by our customers.

In this article we will discuss the characteristics necessary for an orchestrator that can support Airbyte's data synchronization and how we leverage Temporal. Then, we will detail how we structure long-running Temporal workflows to facilitate flexible internal scheduling of syncs. We detail the Temporal-specific features that enable us to implement these workflows. 

Why did Airbyte choose Temporal?

For Airbyte's synchronization workflows, our orchestrator system needs to be able to:

  • Assign work to nodes on both Docker Compose and Kubernetes. There should be minimal differences between the configuration for running on one or the other environment.
  • Schedule workflows flexibly using intervals and crontabs while making it possible to dynamically enable, disable, or change the schedule with predictable behavior. 
  • Scale horizontally to many thousands of workflows simultaneously across many nodes.

There are many options for workflow orchestrators that take a wide range of approaches. Most workflow orchestrators handle long running asynchronous workflows that can handle status updates and retries while maintaining an execution history.

However, Temporal also enables us to programmatically configure workflows. Although most of our workflows represent data synchronization, some perform actions such as looking up the schema of a database. These actions can introduce latency that an Airbyte user feels in the UI. Temporal allows us to configure and execute a job with minimal overhead, which is not possible in Airflow and several other common orchestrators.

One characteristic we were looking for was the ability to easily embed the tool in our OSS releases of Airbyte. Airbyte is supported on both Docker Compose and Kubernetes and operators of Airbyte have a wide range of experience for maintaining distributed systems. We avoided great Kubernetes-specific tools like Argo to prevent diverging internal Airbyte sync orchestration between Docker Compose and Kubernetes systems. Temporal offers a variety of deployment options that function across all of our deployment platforms.

The Airbyte API also uses Postgres as a database. Since Temporal can use Postgres as a backing store, no additional container dependencies were required for our OSS user base even though we logically separate Airbyte's configuration database and Temporal's internal database. For production environments we recommend using separate databases for each.

The vast majority of Airbyte platform code is also written in Java. While not strictly a requirement, we preferred to use a high quality Java SDK for interacting with the orchestrator. Temporal provides an expressive Java SDK that uses code to specify the behavior of workflows and activities. You just write code and get a workflow!

In terms of scalability, we wanted to be able to handle many thousands of syncs running simultaneously through our scheduling and worker execution infrastructure. Temporal is able to address this by allowing independent scaling between different server components and provides an easy model for connecting worker nodes to separate task queues, which allows easy scaling on a multi-node system like Kuberentes.

Before going into more depth on the architecture of our solution, we will explain some of the fundamental building blocks of the Temporal workflow model. 

Temporal overview

Temporal is a workflow engine that handles state management for workflows that are split into steps or "activities''. We use the Java SDK to connect to a Temporal service, which provides an API for registering workers that can run Temporal activities, launch asynchronous workflows, send signals to running workflows, and query state for a workflow. This service is easily configured to run on Docker or Kubernetes and uses a backing data store such as Postgres or Cassandra.

If you're coming from a batch-oriented workflow engine such as Airflow, many of the core concepts have matching analogues for DAGs, sub-DAGs, and sensors.

The Java SDK allows expressing the concept of workflows (DAGs in Airflow), which execute some activities in a defined order. This is the entry point of any Temporal run. Workflows can receive signals and perform queries, which will be explained below. Worker nodes communicate with the Temporal server to determine which workflows should execute on an individual node. This is coordinated by assigning certain workflows on creation to named task queues which can be registered on individual nodes.

Activities (tasks in Airflow) are steps within a Temporal workflow that represent some operation or operations. These operations can contain any logic, but since they represent retryable units of work, they are often used to separate behavior such as calls to an external database or API. The output of an activity is serialized and stored by the backing data store of Temporal along with any relevant status information for the activity and workflow.

It is also possible to have child workflows (subDAGs in Airflow) that are workflows launched by other workflows. It is possible to configure the behavior of child workflows to fail or continue based on if the parent workflow fails or gets canceled.

There are two methods that allow interacting with workflows: signals and queries. Signals allow modifying the Temporal in memory state for a workflow. These methods are asynchronous and are not guaranteed to run immediately. If a Temporal workflow is depending on a condition modified by the signal, it will intelligently re-evaluate the condition to decide if a workflow should continue. Queries simply allow reading the state for a workflow. We usually use this feature to inspect the state of long-running workflows.

For more information on what Temporal is and how it functions, we recommend consulting the Temporal website's introduction and other guides. 

How we organize Temporal workflows

Airbyte's primary Temporal workflow is used to synchronize data between a source (API, db, file, etc…) to a destination (db, warehouse, cloud storage, etc…). In order to properly run a synchronization we need to perform the following operations in a specific order:

  • Fetch the scheduling configuration
  • Wait the appropriated amount of time
  • Fetch the job config
  • Update the job status to report that it is running
  • Run the synchronization
  • Update the job status to report a failure or a successful run

All those different steps have been isolated in a dedicated Temporal action. An action is one of the following: a Temporal activity, a child workflow or a Temporal workflow internal method. In the following sections, we will provide examples of all three types of actions.

We decided to use a permanently running Temporal workflow to represent each data synchronization. In order to limit the size of the event histories for a permanently running workflow, we use a Temporal concept called ContinueAsNew to restart from the first step after performing a run.

This design facilitates fully customizing the type of scheduling we support since the time we wait before running is determined in an activity. This will allow Airbyte to support cron-based workflow scheduling, blackout periods, and interval-based scheduling. Additionally, Temporal makes even long-running "wait" operations efficient by caching the state in a cold storage and only loading it in memory when needed.

Limitless data movement with free Alpha and Beta connectors
Limitless data movement with free Alpha and Beta connectors
Introducing: our Free Connector Program ->

Designing a sync workflow

A sync in Airbyte is split into 3 activities:

  • A data replication
  • A data normalization (optional)
  • A dbt transformation (optional)

All those activities are long running operations that can take anywhere from minutes to several hours depending on the volume of the data and the throughput of a connector. All of these activities are very different from the very short activities that fetch schedules or report the status with database I/O that runs in a few milliseconds. In order to isolate long runs in a separately scaled worker pool, we are running those activities in child workflows. This child workflow is not independent from the parent workflow; if the parent workflow is terminated or canceled, the child workflow will be as well. This allows us to ensure only one synchronization can run per connection using a uniqueness constraint at the parent workflow level. 

Implementing a Temporal workflow

We are using the Temporal Java SDK to define a workflow. It provides a set of annotations used to decorate an interface that will let Temporal know how to run the different methods we are writing. There is one main annotation on the interface level: @WorkflowInterface. There are 3 methods annotations that we are using: 

  • @WorkflowMethod: There can be only one per workflow and this method contains the implementation ordering the activity call. This is the entrypoint for running the workflow.
  • @SignalMethod: Decorates a method that will be called as a signal by temporal. It doesn’t perform an I/O and will only update the memory state of the Temporal workflow.
  • @QueryMethod: Decorates a method that will be called as a query by temporal. It is a synchronous method that will return some part of the Temporal in memory state.

@WorkflowInterface
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void submitManualSync();

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class JobInformation {

   private long jobId;
   private int attemptId;

 }

 @QueryMethod
 JobInformation getJobInformation();
}

Implementing a Temporal activity

This is very similar to the implementation of a workflow. Temporal provides a set of annotations that allows you to decorate an interface. It will then identify those methods as an activity and be able to make sure that they are only run once per workflow run. For example:


@ActivityInterface
public interface ConfigFetchActivity {

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverInput {

   private UUID connectionId;

 }

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverOutput {

   private Duration timeToWait;

 }

 @ActivityMethod
 ScheduleRetrieverOutput getTimeToWait(ScheduleRetrieverInput input);

}

It then needs to be registered inside the workflow. Doing so will allow you to associate it with the most optimal pool of workers.


private final ConfigFetchActivity configFetchActivity =
   Workflow.newActivityStub(ConfigFetchActivity.class,
ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);

Triggering a sync run

Sometimes an Airbyte user wants to manually trigger a sync. We use a Temporal signal method in order to transmit this request. The signal methods are an asynchronous way to communicate with Temporal and modify its internal state. After modifying the state, Temporal will efficiently re-evaluate the wait conditions and allow running the child workflow.

Canceling a sync run

Cancellation is a great example of how we can combine different Temporal primitives to safely handle a variety of orchestrator states. Specifically, we will discuss how we use the following Temporal primitives: child workflow, signal methods, cancellation scope and continue as new.

Cancellations are triggered by an external user that sends a signal to a workflow via the Temporal service when the current run needs to be canceled. This can be done using the Temporal API or the Temporal CLI. In order to be able to handle a signal, we first need to declare and implement the signal method in our workflow implementation. In the Java SDK, Temporal uses an annotation for that, as you can see here:


@WorkflowInterface
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void cancelJob();
}

The implementation of the signal method is very simple; it only sets the state of the job to a canceled state and then cancels the cancellation scope (which we will explain below). Then, the state will be used to report the right status in our internal database's representation of the synchronization.


@Override
public void cancelJob() {
 workflowState.setCancelled(true);
 cancellableSyncWorkflow.cancel();
}

Cancellation scopes allow you to use a callback method that will run on workflow cancellation.If the cancel method of the cancellation scope is called, it will stop the execution of the callback method and the rest of the activity. The cancellation is then propagated to the child workflow. That allows us to ensure the synchronization is properly stopped when the parent workflow is canceled. This is how to create a cancellation scope:


Workflow.newCancellationScope(() -> {
  // Implementation here
}

This is how to cancel the scope:

cancellableSyncWorkflow.cancel();

As said before, this will cancel the parent workflow and propagate the cancellation to the child workflow. We have configured the ParentClosePolicy to propagate the cancellation instead of terminating the workflow abruptly. We are using PARENT_CLOSE_POLICY_REQUEST_CANCEL which allows us to terminate the child workflow gracefully.

Conclusion

In this post, we presented a high-level overview of the challenges Airbyte faces when orchestrating data synchronization workflows and the reasons we chose to build out a customizable solution using Temporal. We also dove into greater depth to showcase some of the Temporal features we are using in order to handle the wide array of requirements for orchestrating data synchronization at Airbyte. If you are interested in reading the entire implementation of our scheduling, the source code is open-source and available here.

If these kinds of challenges seem interesting to you, please consider applying! We're currently hiring for our platform team.

Ready to unlock all your data with the power of 300+ connectors?
Try Airbyte Cloud FREE for 14 days