The Bytes

Scaling data pipelines on Kubernetes

Airbyte was originally created as a Docker Compose application. Containers are dynamically created to perform work - which at Airbyte is mostly replicating data from a source to a destination. Docker is great for small to medium loads, where a single node deployment suffices. As Airbyte grew in popularity, our users began to see larger transfer sizes and higher data volumes. Even the biggest cloud VMs struggled to serve our power users, who relied on Airbyte to orchestrate thousands of data pipelines (or ELT jobs) and move tens of TBs everyday. So we asked ourselves: How do we scale Airbyte to serve bigger loads?

A natural choice for scaling data pipelines is Kubernetes - a popular container orchestration tool developed to horizontally scale workloads. However, using Kubernetes comes with its own pitfalls. When running Docker on a single node, any message passing between the Docker containers is automatically handled by the OS as pipes between Linux processes. Kubernetes’ distributed nature isolates containers by design, making it more difficult to pass data between our data sources and destinations, especially since there are no guarantees that pods are scheduled on the same Kubernetes nodes.

As an open-source project, we want to allow any company to easily operate, develop and contribute to Airbyte. Because of this, we need to be able to support various container implementations and cannot make any container runtime assumptions. This means any solution we design cannot leave Kubernetes’ abstractions.

In this article, we’ll talk about the challenges we faced adapting Airbyte from Docker to Kubernetes, chiefly piping data between different Kubernetes pods. We’ll discuss several approaches we explored, and present the solution we eventually picked.

Using the Kubernetes API to dynamically create job pods

Let’s talk about the easier problem first: horizontally scaling Airbyte data integration jobs on Kubernetes. To do this, we can use the Kubernetes API to dynamically create job containers as Kubernetes pods. Whenever Airbyte receives a job, either scheduled or manually triggered by a user, we create any required containers with the necessary configuration as a Kube pod. Simple jobs include checking to see if a configured connector is valid. More complicated and typical jobs include syncing data from a source to a destination or performing data transformations.

Figuring this out was pretty straightforward due to the good folks at Fabric that have built a robust Java Kubernetes client. We use this client to interact with the Kubernetes API server. One gotcha we ran into - our initial Kubernetes pod creation/destruction implementation overwhelmed the Kubernetes API server by checking for pod statuses too frequently and actually brought our cluster down! Make sure you don’t do the same!

One easy alternative to dynamically creating job pods is to have a fixed pool of job pods waiting to pick up specific jobs. This sounds fine in theory but is unfeasible in practice. This is because Airbyte supports over 150 connectors, which translates to needing at least 150 pools of connector job pods, with each pool waiting to pick up that specific type of job. This number only grows with the number of connectors we support - the opposite of scaling! While this might be feasible in a Cloud service like Airbyte Cloud with multiple tenants, it is a massive waste of resources for our open-source users who often only use a handful of connectors.

Using socat to redirect STDIO between pods in different nodes

Since we’ve handled how to horizontally scale Kubernetes jobs, how do we pipe data between our newly created containers? This is required as the pod reading data from the source might not share the same runtime as the pod writing data to the destination. We cannot reliably schedule the containers together.

The main challenge here is the distributed nature of Kubernetes pods. When creating separate pods, there is no guarantee that pods will end up on the same node. Furthermore, our open source users often have tight constraints on node sizes, due to existing clusters or operating on clusters they do not control, so allowing for splitting jobs across nodes is an additional benefit to many users.

An interesting point to note - if we had access to a Docker agent from inside a Kube pod, it would be possible to create arbitrary containers on the same node, simplifying our message passing dilemma. However this comes with it’s own set of challenges. Namely, running Docker-in-Docker is a common practice to bypass Docker’s security safeguards, and mostly used for development or CI purposes — not a risk we want to take as Airbyte is meant to run arbitrary containers.

Let’s revisit what we need. Using Linux process terminology - we need a way to pipe STDIO from arbitrary Kubernetes pods regardless of the node they are on. More specifically, we need a way to create a connection to a remote ip/port, wrap the Kubernetes pod’s main container’s STDIO in a network protocol, with reasonable default configuration options (e.g. SSL, timeouts, forking, lower-level networking options). Part of this requires overriding the main container’s entrypoint like we share in this awesome article.

Fortunately, Linux’s minimalist and modular approach to software development means a plethora of networking tools have sprung up over the decades. The two most famous one being netcat and socat. We chose socat as it’s more fully featured.

For example, using socat to listen on port 7000 and redirect incoming data sent via the TCP protocol to STDOUT is as simple as <span class='text-style-code'>socat -d TCP-L:7000 STDOUT</span>. 

Using the sidecar pattern to create a sidecar socat container

Now that we know we want to use socat, how do we bundle this with our containers?

Recall earlier that Airbyte makes no demands on a job container’s runtime besides needing to implement the Airbyte protocol. Airbyte users love this as it means they can pick any language to implement the various connector containers. 

The simplest way to bundle socat is to make it a requirement for all Airbyte containers. However, this presents another issue: how does Airbyte validate socat is properly installed? Airbyte users now have something else to worry about. What happens if we change our tech stack later on? Would we have to change all the compiled Airbyte images?

The sidecar pattern is a perfect fit for our concerns of isolation and encapsulation. Instead of bundling socat with the main container, we create a sidecar socat container alongside the main container, as part of the Kubernetes pod. The individual sidecar container is added by Airbyte and is invisible to users. This lets us switch socat for another tool whenever, and lets users continue to be able to pick the right runtime environment for their containers. Win win!

Using named pipes to pipe data between the sidecar and the main containers

We’ve talked about using socat to pipe data between Kube pods. With the socat sidecar, we now have two containers, a sidecar container and a main container, within the same Kube pod we want to pipe data to and from.

As they are separate containers, each has a separate entrypoint and aren’t aware of the other running container. Because of this, the beloved Linux redirection operator >, e.g. <span class='text-style-code'>socat_container > main_container</span>, does not work. Furthermore, since this is Kubernetes, there isn’t even any parent process across both containers to make such a command possible.

Instead, we are going to use another tool from the Linux arsenal: named pipes. Named pipes are pipes that appear within the file system. This works as containers within a Kubernetes pod scheduled on the same node, which means access to the same local file system is guaranteed.

Let’s say we create the named pipes stdout-pipe and stderr-pipe. We can use these pipes within the container entrypoint with <span class='text-style-code'><entrypoint> <args> 2> stderr-pipe > stdout-pipe</span>.

Here, we redirect the entrypoint process’ stderr and stdout streams, via 2> via > respectively, to the stdout-pipe and stderr-pipe named pipes.

Our socat sidecar can now redirect the entrypoint process’ stdout stream out of the Kubernetes pod to a remote TCP port by reading from the stdout-pipe named pipe with <span class='text-style-code'>cat stdout-pipe | socat -d - TCP:[remote-ip]:[remote-port]</span>

The same can be done with the stderr-pipe named pipe. All we need is a pod listening on the other side!

Interested readers can check out how named pipes are used in our codebase today here.

Using an Airbyte Worker to dynamically create job pods

So far we’ve covered:

  1. How to horizontally scale: use Kubernetes.
  2. How to pipe data between Kube pods: use socat.
  3. How to bundle socat with containers: use the sidecar pattern.
  4. How to pipe data between our two containers: use named pipes.

How does all of this fit within an Airbyte Kubernetes deployment?

Let’s briefly talk about Airbyte workers. Airbyte workers pick up a pending job and orchestrate all the necessary pods to complete the job. Here, the worker pod is the pod using the above-mentioned Kubernetes client to create the aforementioned job pods.

Here’s a diagram illustrating how this works with a sync job.

  1. The worker pod creates a source job pod. It passes the job pod it’s IP address and the ports it’s listening on so the job pod’s socat sidecars can redirect stdout/stderr to it.
  2. The worker pod creates a destination job pod and tells the job pod to listen on a specific port so it can send input to the destination job pod. The worker pod also opens ports for the destination pod to pipe stdout/stderr to.
  3. Using socat and the above mechanism, the source job pod pipes data, read from the source, to the Airbyte worker. The worker process validates this before sending it to the destination job pod.
  4. The destination pod’s stdin socat sidecar receives this and sends it to the main container. The data is transformed to a destination-compatible format before being written to the destination.
  5. The destination pod’s socat sidecars pipe the destination job pod main container’s stdout/stderr back to the worker for status and error handling.

In summary, the worker pod sits between a dynamically created source and destination pod. It reads the source pod’s stdout stream, performs basic validation on the streamed data, and feeds that to the destination pod’s stdin stream. All this is possible through a holistic combination of the Kubernetes API, socat, named pipes, and sidecar containers. This mechanism powers all of Airbyte’s Kubernetes jobs.

Conclusion

There you have it - combining various low level Linux tools with the sidecar pattern allows one to easily pipe data between dynamically created runtime-agnostic applications in Kubernetes. This was crucial in adapting Airbyte for Kubernetes, and lets us scale Airbyte to the next level as we carry out our vision to power data movement for every company.

We’ve seen two main downsides while operating this architecture over the last 6 months:

  1. Because TCP does not contain instructions on how to read the data within a TCP connection, a separate port is required for every STDIO stream piped between two ports. Cleanly managing this is non-trivial.
  2. Data traverses 3 different pods - the source, the worker and the destination pods - before arriving at its final destination. This results in chatty and inefficient networking. We can do better!

We are already working on V2 of this architecture and will share what we learn. Stay tuned!

Getting started is easy

Start breaking your data siloes with Airbyte.