Implement AI data pipelines with LangChain, Airbyte, and Dagster

Learn how to set up a maintainable and scalable pipeline for integrating diverse data sources into large language models using Airbyte, Dagster, and LangChain.

Large language models (LLMs) like ChatGPT are emerging as a powerful technology for many use cases, but they need the right contextual data. This data lives in many different sources: CRM systems, external services, databases, and warehouses. Keeping it up to date also requires a stable pipeline. This is not a one-off job you can do with some shell or Python hacking.

Airbyte, as a data integration (ELT) platform, plays an essential role in this, as it makes it very easy to get data from just about every tool right in front of your LLM. When combined with an orchestrator like Dagster and a framework like LangChain, making data accessible to LLMs like GPT becomes easy, maintainable and scalable.

This article explains how you can set up such a pipeline. 

Overview of what we’ll be building

  • Set up your connection in Airbyte to fetch the relevant data (choose from hundreds of data sources or implement your own)
  • Use Dagster to set up a pipeline that processes the data loaded by Airbyte and stores it in a vector store
  • Combine contextual information in the vector store with LLMs using the LangChain retrieval question-answering (QA) module

The final code for this example can be found on GitHub.

Prerequisites

To run, you need:

  • Python 3 and Docker installed locally
  • An OpenAI API key

Install the Python dependencies used in the following steps:

pip install openai faiss-cpu requests beautifulsoup4 tiktoken dagster_managed_elements langchain dagster dagster-airbyte dagit

Step 1: Set up the Airbyte connection

First, start Airbyte locally, as described on https://github.com/airbytehq/airbyte#quick-start.

Set up a connection:

  • Configure a source - if you don’t have sample data ready, you can use the “Sample Data (Faker)” data source
  • Configure a “Local JSON” destination with path /local - Dagster will pick up the data from there (with a default Docker setup, this path maps to /tmp/airbyte_local on the host, which is where the code below reads from)
  • Configure a connection from your configured source to the Local JSON destination. Set the “Replication frequency” to manual, as Dagster will take care of running the sync at the right point in time.
  • To keep things simple, only enable a single stream of records (in my case, I chose the “Account” stream from the Salesforce source)

Step 2: Configure the Dagster pipeline

Configure the software-defined assets for Dagster in a new file ingest.py:

First, load the existing Airbyte connection as a Dagster asset (no need to define it manually). The load_assets_from_airbyte_instance function uses the Airbyte API to fetch existing connections from your Airbyte instance and makes them available as assets. These can then be specified as dependencies of the Python-defined assets that process the records in the subsequent steps.

from dagster_airbyte import load_assets_from_airbyte_instance, AirbyteResource

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    key_prefix="airbyte_asset",
)

Then, add the LangChain loader to turn the raw jsonl file into LangChain documents as a dependent asset (set stream_name to the name of the stream of records in Airbyte you want to make accessible to the LLM - in my case it’s Account):

from langchain.document_loaders import AirbyteJSONLoader
from dagster import asset, AssetKey

stream_name = "<stream name>"

# The Local JSON destination writes one JSONL file per stream to this path.
airbyte_loader = AirbyteJSONLoader(
    f"/tmp/airbyte_local/_airbyte_raw_{stream_name}.jsonl"
)

# Depend on the Airbyte asset so the sync runs before the file is read.
@asset(
    non_argument_deps={AssetKey(["airbyte_asset", stream_name])},
)
def raw_documents():
    return airbyte_loader.load()
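
To make the loader step less opaque, here is a rough standard-library approximation of what it produces. It parses each line of the JSONL file, flattens the record payload into "key: value" lines, and concatenates everything into one text. This is a sketch, not LangChain's actual code: the _airbyte_data field name, the single-document output, and the helper names stringify and load_airbyte_jsonl are assumptions, so check them against your own file and LangChain version.

```python
import json
from pathlib import Path


def stringify(value, indent=0):
    """Flatten a (possibly nested) record into indented 'key: value' lines."""
    pad = "  " * indent
    if isinstance(value, dict):
        lines = []
        for key, inner in value.items():
            if isinstance(inner, (dict, list)):
                lines.append(f"{pad}{key}:")
                lines.append(stringify(inner, indent + 1))
            else:
                lines.append(f"{pad}{key}: {inner}")
        return "\n".join(lines)
    if isinstance(value, list):
        return "\n".join(stringify(item, indent) for item in value)
    return f"{pad}{value}"


def load_airbyte_jsonl(path):
    """Return one text blob plus metadata for all records in the file."""
    parts = []
    with Path(path).open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            # Assumption: the record payload lives under "_airbyte_data".
            parts.append(stringify(record["_airbyte_data"]))
    return {"page_content": "\n".join(parts), "metadata": {"source": str(path)}}
```

Because all records end up in one long text, the splitting step that follows is what turns it into chunks small enough for the LLM.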

Then, add another step to the pipeline splitting the documents up into chunks so they will fit the LLM context later:

from langchain.text_splitter import RecursiveCharacterTextSplitter

@asset
def documents(raw_documents):
    return RecursiveCharacterTextSplitter(chunk_size=1000).split_documents(raw_documents)
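
To illustrate what chunking means here, the sketch below splits a text into fixed-size character windows with some overlap. It is deliberately simpler than RecursiveCharacterTextSplitter, which also tries to break on separators such as paragraphs and sentences. The function name split_text and the chunk_overlap default of 200 are assumptions chosen for illustration.

```python
def split_text(text, chunk_size=1000, chunk_overlap=200):
    """Split text into windows of at most chunk_size characters.

    Consecutive chunks share chunk_overlap characters, so text near a
    boundary appears in both neighbouring chunks.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks
```

Smaller chunks make retrieval more precise but give the LLM less context per hit, so chunk_size is worth tuning for your data.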

The next step generates the embeddings for the documents:

from langchain.vectorstores.faiss import FAISS
from langchain.embeddings import OpenAIEmbeddings
import pickle

@asset
def vectorstore(documents):
    vectorstore_contents = FAISS.from_documents(documents, OpenAIEmbeddings())
    with open("vectorstore.pkl", "wb") as f:
        pickle.dump(vectorstore_contents, f)

Finally, export the definitions for Dagster. No custom IO manager is configured here; for this example, the vectorstore asset simply dumps its output to a file on local disk:

from dagster import Definitions

defs = Definitions(
    assets=[airbyte_assets, raw_documents, documents, vectorstore]
)

See the full ingest.py file here

Step 3: Load your data

Now, we can run the pipeline: 

  • Set your OpenAI API key:
export OPENAI_API_KEY=<your api key>
  • Start Dagster:
dagster dev -f ingest.py
  • Go to http://127.0.0.1:3000/asset-groups to see the Dagster UI. Click the “Materialize” button to materialize all the assets. This runs all steps of the pipeline:
      • Triggering an Airbyte job to load the data from the source into a local JSONL file
      • Splitting the data into document chunks that will fit the context window of the LLM
      • Embedding these documents
      • Storing the embeddings in a local vector database for later retrieval
  • A vectorstore.pkl file should now appear in your local directory. It contains the embeddings for the data we just loaded via Airbyte.

Alternatively, you can materialize the Dagster assets directly from the command line using:

dagster asset materialize --select \* -f ingest.py

Step 4: Create a QA application with LangChain

The next step is to put it to work by running a QA chain using LLMs:

To do this, create a new file query.py.

Load the embeddings:

from langchain.vectorstores import VectorStore
import pickle

vectorstore_file = "vectorstore.pkl"

# Only unpickle files you created yourself: pickle can execute arbitrary code.
with open(vectorstore_file, "rb") as f:
    local_vectorstore: VectorStore = pickle.load(f)

Initialize LLM and QA retrieval chain based on the vectorstore:

from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

qa = RetrievalQA.from_chain_type(llm=OpenAI(temperature=0), chain_type="stuff", retriever=local_vectorstore.as_retriever())

Add a question-answering loop as the interface:

print("Chat LangChain Demo")
print("Ask a question to begin:")
while True:
    query = input("")
    answer = qa.run(query)
    print(answer)
    print("\nWhat else can I help you with:")
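
The loop above runs until the process is interrupted. As a possible refinement, the sketch below wraps it in a function that exits on "exit" or "quit", ignores empty input, and ends cleanly on Ctrl+C or Ctrl+D. The name chat_loop and its read and write parameters are hypothetical; they exist only so the loop can be exercised without a terminal or an LLM.

```python
def chat_loop(answer_fn, read=input, write=print):
    """Answer questions until the user types 'exit' or 'quit', or input ends."""
    write("Ask a question to begin (type 'exit' to quit):")
    while True:
        try:
            query = read("> ").strip()
        except (EOFError, KeyboardInterrupt):
            break  # Ctrl+D or Ctrl+C ends the session cleanly
        if query.lower() in {"exit", "quit"}:
            break
        if not query:
            continue  # ignore empty lines instead of sending them to the LLM
        write(answer_fn(query))
        write("\nWhat else can I help you with:")
```

In query.py, this would be called as chat_loop(qa.run).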

See the full query.py file here

You can run the QA bot with:

OPENAI_API_KEY=YOUR_API_KEY python query.py

When asking questions about your use case (e.g. CRM data), LangChain will manage the interaction between the LLM and the vector store:

  • The user asks a question
  • LangChain embeds the question in the same way as the incoming records were embedded during the ingest phase
  • A similarity search over the embeddings in the vector store returns the most relevant documents, which are passed to the LLM as context
  • The LLM formulates an answer based on this contextual information
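
The retrieval flow described above can be sketched with toy stand-ins. A bag-of-words count vector replaces the OpenAI embeddings, a brute-force cosine search replaces FAISS, and the prompt is built by concatenating the retrieved chunks, which is the idea behind the "stuff" chain type. All function names, the sample records, and the prompt wording are hypothetical and are not what LangChain uses internally.

```python
import math
import re
from collections import Counter


def embed(text):
    """Toy embedding: a bag-of-words count vector (stand-in for a real model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[word] for word, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(question, chunks, k=2):
    """Return the k chunks most similar to the question."""
    q = embed(question)
    ranked = sorted(chunks, key=lambda chunk: cosine(q, embed(chunk)), reverse=True)
    return ranked[:k]


def build_prompt(question, context_chunks):
    """'Stuff' all retrieved chunks into a single prompt for the LLM."""
    context = "\n\n".join(context_chunks)
    return (
        "Use the following context to answer the question.\n\n"
        f"{context}\n\nQuestion: {question}\nAnswer:"
    )


# Made-up sample chunks, shaped like flattened CRM records.
chunks = [
    "Name: Acme Corp\nIndustry: Manufacturing\nCity: Berlin",
    "Name: Globex\nIndustry: Energy\nCity: Springfield",
    "Name: Initech\nIndustry: Software\nCity: Austin",
]
question = "Which city is Globex in?"
prompt = build_prompt(question, retrieve(question, chunks, k=1))
print(prompt)
```

Real embeddings capture meaning rather than exact word overlap, so they can match a question to a record even when the two share no words.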

This is just a simplistic demo, but it showcases how to use Airbyte and Dagster to bring data into a format that can be used by LangChain.

Where to go from here?

From this core use case, there are a lot of directions to explore further:

  • Get deeper into what can be done with Dagster by reading this excellent article on the Dagster blog
  • Check out the Airbyte catalog to learn more about what kinds of data you are able to load with minimal effort
  • If you are dealing with large amounts of data, consider storing your data on S3 or a similar service - this is supported by the Airbyte S3 destination and built-in Dagster IO managers
  • Look into the various vector stores supported by LangChain - a managed service such as those offered by Pinecone or Weaviate can simplify things a lot
  • Productionize your pipeline by deploying Dagster properly and running your pipeline on a schedule
  • A big advantage of LLMs is that they can be multi-purpose - add multiple retrieval “tools” to your QA system to allow the bot to answer a wide range of questions

LangChain doesn’t stop at question answering - explore the LangChain documentation to learn about other use cases like summarization, information extraction, and autonomous agents.
