
How we scale workflow orchestration with Temporal

Benoit Moriceau
April 14, 2022
10 min read

Designing a sync workflow

A sync in Airbyte is split into 3 activities:

  • A data replication
  • A data normalization (optional)
  • A dbt transformation (optional)

All of these activities are long-running operations that can take anywhere from a few minutes to several hours, depending on the volume of data and the throughput of the connector. They are very different from the short activities that fetch schedules or report status through database I/O, which complete in a few milliseconds. To isolate the long runs in a separately scaled worker pool, we run those activities in child workflows. A child workflow is not independent of its parent: if the parent workflow is terminated or canceled, the child workflow is as well. Because every sync runs under its parent workflow, a uniqueness constraint at the parent workflow level ensures that only one synchronization can run per connection.
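A minimal sketch of the per-connection uniqueness constraint, assuming it is enforced through the parent workflow's ID: Temporal allows at most one open workflow execution per workflow ID in a namespace. The plain-Java registry below (standard library only) models just that rule. The class name and the ID format connection_manager_<connectionId> are hypothetical, not taken from Airbyte's code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/**
 * Plain-Java model of "one open parent workflow per connection", standing in
 * for Temporal's rule that a workflow ID can have only one open execution.
 */
class ConnectionWorkflowRegistry {
    private final Set<String> openWorkflowIds = new HashSet<>();

    // Hypothetical naming scheme: one deterministic workflow ID per connection.
    static String workflowIdFor(UUID connectionId) {
        return "connection_manager_" + connectionId;
    }

    // Returns false instead of starting a second parent workflow for the same connection.
    synchronized boolean tryStart(UUID connectionId) {
        return openWorkflowIds.add(workflowIdFor(connectionId));
    }

    // Closing the parent frees the ID; its child sync workflow closes with it.
    synchronized void close(UUID connectionId) {
        openWorkflowIds.remove(workflowIdFor(connectionId));
    }
}
```

Keying the ID on the connection, rather than on each run, is what turns "one open execution per ID" into "one sync per connection".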

Implementing a Temporal workflow

We use the Temporal Java SDK to define workflows. It provides a set of annotations for decorating an interface, which tell Temporal how to run each of the methods we write. There is one annotation at the interface level, @WorkflowInterface, and we use three method-level annotations:

  • @WorkflowMethod: There can be only one per workflow. This method contains the implementation that orders the activity calls, and it is the entry point for running the workflow.
  • @SignalMethod: Decorates a method that Temporal calls when a signal is sent to the workflow. It doesn't perform any I/O; it only updates the workflow's in-memory state.
  • @QueryMethod: Decorates a method that Temporal calls when the workflow is queried. It is a synchronous, read-only method that returns part of the workflow's in-memory state.

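import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
// ConnectionUpdaterInput is an Airbyte input class; its import is omitted here.

@WorkflowInterface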
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void submitManualSync();

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class JobInformation {

   private long jobId;
   private int attemptId;

 }

 @QueryMethod
 JobInformation getJobInformation();
}

Implementing a Temporal activity

This is very similar to implementing a workflow. Temporal provides a set of annotations for decorating an interface; it then treats the annotated methods as activities and records each completed activity's result in the workflow history, so an activity that has already completed is not executed again when the workflow is replayed. For example:


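import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.time.Duration;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@ActivityInterface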
public interface ConfigFetchActivity {

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverInput {

   private UUID connectionId;

 }

 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 class ScheduleRetrieverOutput {

   private Duration timeToWait;

 }

 @ActivityMethod
 ScheduleRetrieverOutput getTimeToWait(ScheduleRetrieverInput input);

}

The activity then needs to be registered inside the workflow by creating an activity stub. Because each stub is created with its own options, this lets you associate the activity with the most suitable pool of workers.


private final ConfigFetchActivity configFetchActivity =
    Workflow.newActivityStub(ConfigFetchActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
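Activity results are recorded in the workflow history, which is what lets Temporal replay a workflow without re-running activities that already completed (a failed attempt can still be retried under the activity's retry policy). The toy model below illustrates only that replay behavior. EventHistory is a hypothetical class; real Temporal histories hold typed events, not plain objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Simplified model of replaying a workflow from its history: completed
 * activity results are reused instead of calling the activity again.
 */
class EventHistory {
    private final List<Object> recordedResults = new ArrayList<>();
    private int cursor = 0;
    private int executions = 0;

    // Runs the activity only if its result is not already recorded at this position.
    @SuppressWarnings("unchecked")
    <T> T executeActivity(Supplier<T> activity) {
        if (cursor < recordedResults.size()) {
            return (T) recordedResults.get(cursor++); // replay: reuse the recorded result
        }
        T result = activity.get(); // first execution: actually call the activity
        executions++;
        recordedResults.add(result);
        cursor++;
        return result;
    }

    // Restarts workflow code from the beginning of the history (e.g. after a worker restart).
    void replay() {
        cursor = 0;
    }

    int executions() {
        return executions;
    }
}
```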

Triggering a sync run

Sometimes an Airbyte user wants to trigger a sync manually. We use a Temporal signal method to transmit this request. Signals are an asynchronous way to communicate with a running workflow and modify its internal state. After a signal handler has modified the state, Temporal re-evaluates the workflow's wait conditions, which lets the workflow start the child workflow that runs the sync.
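To make the signal-then-wait flow concrete, here is a plain-Java model that uses only the standard library, so it runs without a Temporal server. In a real workflow the waiting would be done with Temporal's Workflow.await(timeout, condition), which is replayed deterministically; here wait/notifyAll stand in for it. We assume the wait is bounded by a schedule delay such as the timeToWait returned by getTimeToWait; the class and its methods are hypothetical.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Plain-Java model of "wait for the schedule or a manual-sync signal". Not the Temporal API. */
class ManualSyncWorkflowModel {
    // In-memory workflow state; only the signal handler sets it.
    private boolean manualSyncRequested = false;
    private int childWorkflowsStarted = 0;

    // Signal handler: no I/O, it only flips in-memory state and wakes the waiter,
    // standing in for Temporal re-evaluating the wait conditions.
    synchronized void submitManualSync() {
        manualSyncRequested = true;
        notifyAll();
    }

    // Blocks until the condition holds or the timeout elapses; returns whether it holds.
    private synchronized boolean await(Duration timeout, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000;
            if (remainingMs <= 0) {
                return false;
            }
            try {
                wait(remainingMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
        return true;
    }

    // One scheduling iteration: wait up to timeToWait unless a manual sync arrives first,
    // then start the sync. Returns true if the run was triggered manually.
    boolean runOnce(Duration timeToWait) {
        boolean manual = await(timeToWait, () -> manualSyncRequested);
        synchronized (this) {
            manualSyncRequested = false;
            childWorkflowsStarted++; // placeholder for launching the sync child workflow
        }
        return manual;
    }

    synchronized int childWorkflowsStarted() {
        return childWorkflowsStarted;
    }
}
```

A signal that arrives before the wait begins is not lost: the flag is already set, so the condition is true on the first check.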

Canceling a sync run

Cancellation is a great example of how we can combine different Temporal primitives to safely handle a variety of orchestrator states. Specifically, we will discuss how we use the following Temporal primitives: child workflows, signal methods, cancellation scopes, and continue-as-new.

Cancellations are triggered by an external user who sends a signal to the workflow via the Temporal service when the current run needs to be canceled. This can be done with the Temporal API or the Temporal CLI. To handle a signal, we first need to declare the signal method in the workflow interface and implement it in the workflow implementation. In the Java SDK, the declaration uses an annotation, as you can see here:


@WorkflowInterface
public interface ConnectionManagerWorkflow {

 @WorkflowMethod
 void run(ConnectionUpdaterInput connectionUpdaterInput);

 @SignalMethod
 void cancelJob();
}

The implementation of the signal method is very simple: it sets the state of the job to canceled and then cancels the cancellation scope (explained below). The state is then used to report the right status in our internal database's representation of the synchronization.


@Override
public void cancelJob() {
 workflowState.setCancelled(true);
 cancellableSyncWorkflow.cancel();
}
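The two steps of cancelJob can be modeled in plain Java as follows. This is a sketch under stated assumptions: ScopeModel only imitates the propagation behavior of a Temporal cancellation scope (it is not io.temporal.workflow.CancellationScope), ChildSyncHandle and CancellableSyncModel are hypothetical, and the status strings are illustrative rather than Airbyte's actual values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a child sync workflow started inside a scope. */
class ChildSyncHandle {
    private boolean cancelRequested = false;

    void requestCancel() {
        cancelRequested = true;
    }

    boolean isCancelRequested() {
        return cancelRequested;
    }
}

/** Tracks children started in the scope; cancel() requests cancellation of all of them. */
class ScopeModel {
    private final List<ChildSyncHandle> children = new ArrayList<>();

    ChildSyncHandle startChild() {
        ChildSyncHandle child = new ChildSyncHandle();
        children.add(child);
        return child;
    }

    void cancel() {
        for (ChildSyncHandle child : children) {
            child.requestCancel(); // propagate to everything started in the scope
        }
    }
}

/** Models the cancelJob() signal handler: flag the state, then cancel the scope. */
class CancellableSyncModel {
    private boolean cancelled = false;
    final ScopeModel cancellableSyncWorkflow = new ScopeModel();

    void cancelJob() {
        cancelled = true;                 // mirrors workflowState.setCancelled(true)
        cancellableSyncWorkflow.cancel(); // mirrors cancellableSyncWorkflow.cancel()
    }

    // The flag decides which status is later written to the database.
    String statusToReport(boolean syncSucceeded) {
        if (cancelled) {
            return "CANCELLED";
        }
        return syncSucceeded ? "SUCCEEDED" : "FAILED";
    }
}
```

Setting the flag before canceling the scope means that when the interrupted sync ends unsuccessfully, the workflow can still tell a user cancellation apart from a genuine failure.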

A cancellation scope wraps a block of workflow code passed in as a lambda. If the scope's cancel method is called, everything started inside that block, such as activities, timers, and child workflows, is canceled. Because our child workflow is started inside the scope, the cancellation is propagated to it. That allows us to ensure the synchronization is properly stopped. This is how to create and run a cancellation scope:


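cancellableSyncWorkflow = Workflow.newCancellationScope(() -> {
  // Implementation here
});
// The scope's code only starts executing when run() is called.
cancellableSyncWorkflow.run();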

This is how to cancel the scope:

cancellableSyncWorkflow.cancel();

As described above, canceling the scope stops the work it wraps in the parent workflow and propagates the cancellation to the child workflow. For the case where the parent workflow itself closes, we have configured the child's ParentClosePolicy to propagate a cancellation request instead of terminating the child workflow abruptly. We use PARENT_CLOSE_POLICY_REQUEST_CANCEL, which lets the child workflow shut down gracefully.
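The difference between the policies can be sketched with a plain-Java model. The enum mirrors the names of Temporal's ParentClosePolicy values (PARENT_CLOSE_POLICY_TERMINATE, PARENT_CLOSE_POLICY_REQUEST_CANCEL, PARENT_CLOSE_POLICY_ABANDON), but the classes are hypothetical and the behavior is simplified: termination stops the child without running any of its code, whereas a cancellation request gives the child a chance to clean up. Temporal's default is TERMINATE; in the Java SDK the policy is set on the child's ChildWorkflowOptions with setParentClosePolicy.

```java
/** Mirrors the names of Temporal's ParentClosePolicy values; behavior is simplified. */
enum ClosePolicy { TERMINATE, REQUEST_CANCEL, ABANDON }

/** Hypothetical model of a running child sync workflow. */
class ChildWorkflowModel {
    boolean running = true;
    boolean cleanupRan = false;

    // A cancellation request reaches the child's code, so it can clean up before stopping.
    void onCancelRequested() {
        cleanupRan = true; // e.g. stop the replication gracefully (hypothetical)
        running = false;
    }

    // Termination stops the child immediately; none of its code runs.
    void onTerminated() {
        running = false;
    }
}

class ParentCloseModel {
    // What happens to a still-running child when its parent workflow closes.
    static void onParentClosed(ClosePolicy policy, ChildWorkflowModel child) {
        switch (policy) {
            case TERMINATE:
                child.onTerminated();
                break;
            case REQUEST_CANCEL:
                child.onCancelRequested();
                break;
            case ABANDON:
                break; // the child keeps running independently
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```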

Conclusion

In this post, we presented a high-level overview of the challenges Airbyte faces when orchestrating data synchronization workflows and the reasons we chose to build a customizable solution using Temporal. We also dove deeper into some of the Temporal features we use to handle the wide array of requirements for orchestrating data synchronization at Airbyte. If you are interested in reading the entire implementation of our scheduling, the source code is open source and available on GitHub.

If these kinds of challenges seem interesting to you, please consider applying! We're currently hiring for our platform team.
