Implement AI data pipelines with LangChain, Airbyte, and Dagster
Learn how to set up a maintainable and scalable pipeline for integrating diverse data sources into large language models using Airbyte, Dagster, and LangChain.
Should you build or buy your data pipelines?
Download our free guide and discover the best approach for your needs, whether it's building your ELT solution in-house or opting for Airbyte Open Source or Airbyte Cloud.
Large language models (LLMs) like ChatGPT are emerging as a powerful technology for various use cases, but they need the right contextual data. This data lives in a wide range of sources: CRM systems, external services, and various databases and warehouses. A stable pipeline is also required to keep the data up to date; this is not a one-off job you can do with some shell/Python hacking.
Airbyte, as a data integration (ELT) platform, plays an essential role in this, as it makes it very easy to get data from just about every tool right in front of your LLM. When combined with an orchestrator like Dagster and a framework like LangChain, making data accessible to LLMs like GPT becomes easy, maintainable and scalable.
This article explains how you can set up such a pipeline.
Overview of what we’ll be building
Set up your connection in Airbyte to fetch the relevant data (choose from hundreds of data sources or implement your own)
Use Dagster to set up a pipeline that processes the data loaded by Airbyte and stores it in a vector store
Combine contextual information in the vector store with LLMs using the LangChain retrieval Question/Answering (QA) module
The final code for this example can be found on GitHub.
Prerequisites
To run, you need:
Python 3 and Docker installed locally
An OpenAI API key
Install the Python dependencies used in the following steps, including LangChain, Dagster and its Airbyte integration, and the OpenAI client library.
Step 1: Set up the Airbyte connection
Configure a source. If you don’t have sample data ready, you can use the “Sample Data (Faker)” data source.
Configure a “Local JSON” destination with the path /local. Dagster will pick up the data from there.
Configure a connection from your configured source to the Local JSON destination. Set the “Replication frequency” to manual, as Dagster will take care of running the sync at the right point in time.
To keep things simple, only enable a single stream of records (in my case, I chose the “Account” stream from the Salesforce source).
Step 2: Configure the Dagster pipeline
Configure the software-defined assets for Dagster in a new file, ingest.py:
First, load the existing Airbyte connection as a Dagster asset (no need to define it manually). The load_assets_from_airbyte_instance function uses the Airbyte API to fetch existing connections from your Airbyte instance and makes them available as assets. These can then be specified as dependencies of the Python-defined assets that process the records in the subsequent steps.
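To make the idea of assets depending on other assets more concrete, here is a minimal sketch that uses only the Python standard library. It is not Dagster code and does not reflect how Dagster resolves dependencies internally; it only shows that once each asset declares its upstream assets, a valid materialization order follows. The asset names raw_documents and vectorstore, and the airbyte_asset/Account key, are hypothetical placeholders chosen for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical asset names. Each key maps to the set of assets it depends on.
asset_dependencies = {
    # Asset loaded from the existing Airbyte connection (no upstream assets).
    "airbyte_asset/Account": set(),
    # Python-defined asset that reads the records synced by Airbyte.
    "raw_documents": {"airbyte_asset/Account"},
    # Python-defined asset that embeds the documents and stores them.
    "vectorstore": {"raw_documents"},
}


def materialization_order(dependencies):
    """Return the assets in an order where every upstream asset comes first."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    for name in materialization_order(asset_dependencies):
        print(name)
```

In the real pipeline, Dagster derives this ordering for you from the dependencies declared on each asset.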
Then, add the LangChain loader to turn the raw jsonl file into LangChain documents as a dependent asset (set stream_name to the name of the stream of records in Airbyte you want to make accessible to the LLM - in my case it’s Account):
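As a rough illustration of what such a loader has to do, the following standard-library sketch reads a JSONL file and turns each record into a text document with metadata. It is not the LangChain loader itself. The Document class, the function names and the sample file name are hypothetical, and the assumption that each line wraps the record in an "_airbyte_data" field should be checked against the files your Airbyte version actually writes.

```python
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class Document:
    """Hypothetical stand-in for a LangChain document: text plus metadata."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def record_to_text(record: dict) -> str:
    """Flatten one record into 'key: value' lines."""
    return "\n".join(f"{key}: {value}" for key, value in record.items())


def load_jsonl_documents(path, stream_name: str) -> list:
    """Read one JSON object per line and convert each into a Document."""
    documents = []
    with open(path, encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # skip blank lines
            row = json.loads(line)
            # Assumption: the payload sits under "_airbyte_data".
            # If that key is missing, treat the whole row as the record.
            record = row.get("_airbyte_data", row)
            documents.append(
                Document(
                    page_content=record_to_text(record),
                    metadata={"stream": stream_name, "line": line_number},
                )
            )
    return documents


if __name__ == "__main__":
    # Write a small made-up sample file and load it again.
    with tempfile.TemporaryDirectory() as tmp_dir:
        sample_path = Path(tmp_dir) / "sample_Account.jsonl"
        sample_row = {"_airbyte_data": {"Name": "Acme", "Industry": "Energy"}}
        sample_path.write_text(json.dumps(sample_row) + "\n", encoding="utf-8")
        for document in load_jsonl_documents(sample_path, "Account"):
            print(document.page_content)
```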
Go to http://127.0.0.1:3000/asset-groups to see the Dagster UI. You can click the “Materialize” button to materialize all the assets. This will run all steps of the pipeline:
Triggering an Airbyte job to load the data from the source into a local jsonl file
Splitting the data into nice document chunks that will fit the context window of the LLM
Embedding these documents
Storing the embeddings in a local vector database for later retrieval
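The splitting step in the list above can be sketched in a few lines of plain Python. This is a simplified illustration, not the splitter used in the pipeline: it measures chunk sizes in characters, whereas real text splitters often work with tokens or try to break on sentence and paragraph boundaries. The function name and default sizes are assumptions made for the example.

```python
def split_into_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    """Split text into fixed-size character chunks that overlap slightly.

    The overlap keeps some shared context between neighbouring chunks so that
    a sentence cut at a chunk boundary is still partly visible in the next one.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current chunk reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


if __name__ == "__main__":
    for chunk in split_into_chunks("abcdefghij", chunk_size=4, chunk_overlap=1):
        print(chunk)
```

Smaller chunks leave more room in the context window for other material, while larger chunks keep more of each record together; the right trade-off depends on your data.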
Once the run completes, a vectorstore.pkl file appears in your local directory. It contains the embeddings for the data we just loaded via Airbyte.
Alternatively, you can materialize the Dagster assets directly from the command line using:
Step 3: Run a QA chain
The next step is to put it to work by running a QA chain using LLMs:
To do this, create a new file query.py.
Load the embeddings:
import pickle
from langchain.vectorstores import VectorStore
# Load the vector store that the Dagster pipeline pickled during ingestion.
vectorstore_file = "vectorstore.pkl"
with open(vectorstore_file, "rb") as f:
    local_vectorstore: VectorStore = pickle.load(f)
Initialize LLM and QA retrieval chain based on the vectorstore:
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
qa = RetrievalQA.from_chain_type(llm=OpenAI(temperature=0), chain_type="stuff", retriever=local_vectorstore.as_retriever())
Add a question-answering loop as the interface:
print("Chat LangChain Demo")
print("Ask a question to begin:")
while True:
    query = input("")
    answer = qa.run(query)
    print(answer)
    print("\nWhat else can I help you with:")
When asking questions about your use case (e.g. CRM data), LangChain will manage the interaction between the LLM and the vector store:
The LLM receives a task from the user
The LLM queries the vector store based on the given task
LangChain embeds the question in the same way as the incoming records were embedded during the ingest phase. A similarity search over the embeddings returns the most relevant documents, which are passed to the LLM
The LLM formulates an answer based on the contextual information
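The retrieval steps above can be illustrated with a toy example that runs without any API key. It is only a sketch of the idea: the embed function below counts words instead of calling a real embedding model, ToyVectorStore is a hypothetical stand-in for the pickled vector store, and build_prompt only imitates how retrieved context might be placed in front of the question. None of these names come from LangChain.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(count * b[token] for token, count in a.items())
    norm_a = math.sqrt(sum(count * count for count in a.values()))
    norm_b = math.sqrt(sum(count * count for count in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class ToyVectorStore:
    """Hypothetical in-memory store: embeds documents once, searches by similarity."""

    def __init__(self, documents):
        self._entries = [(document, embed(document)) for document in documents]

    def similarity_search(self, query: str, k: int = 2):
        # Embed the question the same way the documents were embedded.
        query_vector = embed(query)
        ranked = sorted(
            self._entries,
            key=lambda entry: cosine_similarity(query_vector, entry[1]),
            reverse=True,
        )
        return [document for document, _ in ranked[:k]]


def build_prompt(question: str, context_documents) -> str:
    """Put the retrieved documents in front of the question for the LLM."""
    context = "\n\n".join(context_documents)
    return (
        "Use the following context to answer the question.\n\n"
        f"{context}\n\nQuestion: {question}\nAnswer:"
    )


if __name__ == "__main__":
    store = ToyVectorStore([
        "Account: Acme Corp, industry: manufacturing",
        "Account: Globex, industry: energy",
        "Opportunity: renewal for Initech",
    ])
    question = "Which industry is Acme in?"
    print(build_prompt(question, store.similarity_search(question, k=1)))
```

A real embedding model captures meaning rather than exact word overlap, so it can match a question to a record even when they share no words.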
The example in this article is just a simplistic demo, but it showcases how to use Airbyte and Dagster to bring data into a format that can be used by LangChain.
Where to go from here?
From this core use case, there are a lot of directions to explore further:
Get deeper into what can be done with Dagster by reading this excellent article on the Dagster blog
Check out the Airbyte catalog to learn more about what kinds of data you are able to load with minimal effort
A big advantage of LLMs is that they can be multi-purpose - add multiple retrieval “tools” to your QA system to allow the bot to answer a wide range of questions
LangChain doesn’t stop at question answering - explore the LangChain documentation to learn about other use cases like summarization, information extraction and autonomous agents