This notebook demonstrates an end-to-end Retrieval-Augmented Generation (RAG) pipeline. We will extract data from Google Drive using PyAirbyte, store it in a Pinecone vector store, and then use LangChain to perform RAG on the stored data. This workflow showcases how to integrate these tools to build a scalable RAG system.
Prerequisites
- Google Drive:
- Prepare a Folder: Create or identify a Google Drive folder containing the CSV data you want to sync.
- Set Up Access Credentials: Create a Google Cloud service account with access to that folder and download its JSON key; it is used as the `service_json` secret below.
- Pinecone Account:
- Create a Pinecone Account: Sign up for an account on the Pinecone website.
- Obtain Pinecone API Key: Generate a new API key from your Pinecone project settings. For detailed instructions, refer to the Pinecone documentation.
- OpenAI API Key:
- Create an OpenAI Account: Sign up for an account on OpenAI.
- Generate an API Key: Go to the API section and generate a new API key. For detailed instructions, refer to the OpenAI documentation.
Install PyAirbyte and other dependencies
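The exact install cell is not shown here; the sketch below lists the packages this notebook's imports appear to require (package names are an assumption, not a verified pin list):

```python
# Install PyAirbyte plus the LangChain, Pinecone, and OpenAI packages used below.
# The package list is inferred from the imports in this notebook.
%pip install --quiet airbyte langchain langchain-community langchain-pinecone langchain-openai langchainhub pinecone-client sentence-transformers
```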
Set up the Google Drive source with PyAirbyte
The code below configures an Airbyte source that extracts data from a Google Drive folder containing a CSV file named NFLX.csv.
To adapt the configuration to your own data, refer to the Google Drive source connector reference.
Note: The credentials are retrieved securely using the get_secret() method. This will automatically locate a matching Google Colab secret or environment variable, ensuring they are not hard-coded into the notebook. Make sure to add your key to the Secrets section on the left.
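If you are not running in Colab, the same secrets can be supplied as environment variables, which `get_secret()` also picks up. A minimal sketch with placeholder values:

```python
import os

# Alternative to Colab secrets: ab.get_secret() also reads environment
# variables with a matching name (placeholder values shown).
os.environ["service_json"] = "<contents of your service account JSON key>"
os.environ["PINECONE_API_KEY"] = "<your Pinecone API key>"
os.environ["OPENAI_API_KEY"] = "<your OpenAI API key>"
```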
import airbyte as ab
service_json = ab.get_secret('service_json')
source = ab.get_source(
    "source-google-drive",
    install_if_missing=True,
    config={
        "folder_url": "https://drive.google.com/drive/folders/1txtyBv_mfXYjn0R_-oxV3Vg5QOi-6XaI",
        "credentials": {
            "auth_type": "Service",
            "service_account_info": f"""{service_json}""",
        },
        "streams": [{
            "name": "NFLX",
            "globs": ["**/*.csv"],
            "format": {
                "filetype": "csv"
            },
            "validation_policy": "Emit Record",
            "days_to_sync_if_history_is_full": 3
        }]
    },
)
# Verify the config and creds by running `check`:
source.check()
Connection check succeeded for `source-google-drive`.
The code below fetches data from the Google Drive CSV source using Airbyte and converts it into a list of document objects, making it suitable for further processing or analysis.
# This code reads data from the Google Drive CSV source and converts it into a list of document objects.
source.select_all_streams()  # Select all streams from the Google Drive source
read_result = source.read()  # Read the data from the selected streams

documents_list = []
# Convert the read data into document objects and add them to the list
for key, value in read_result.items():
    docs = value.to_documents()
    for doc in docs:
        documents_list.append(doc)

# Print a single row of the CSV
print(str(documents_list[0]))
Read Progress
Started reading at 11:20:50.
Read 0 records over 2 seconds (0.0 records / second).
Finished reading at 11:20:53.
Started finalizing streams at 11:20:53.
Finalized 0 batches over 0 seconds.
Completed `source-google-drive` read operation at 16:50:53.
```yaml
_ab_source_file_last_modified: '2024-06-04T04:00:24.000000Z'
_ab_source_file_url: NFLX.csv
_airbyte_extracted_at: 2024-06-07 11:20:25.946000
_airbyte_meta: {}
_airbyte_raw_id: 01HZS6VC3573GHEKRHMW1KA41T
adj_close: '254.259995'
close: '254.259995'
date: '2018-02-05'
high: '267.899994'
low: '250.029999'
open: '262.000000'
volume: '11896100'
```
Use LangChain to build a RAG pipeline
We use RecursiveCharacterTextSplitter from the langchain library to split the documents into smaller chunks of 512 characters with a 50-character overlap.
The code then prints the total number of document chunks created and converts all metadata values in each chunk to strings, so they can be stored as Pinecone metadata.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.utils import filter_complex_metadata
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunked_docs = splitter.split_documents(documents_list)
chunked_docs = filter_complex_metadata(chunked_docs)
print(f"Created {len(chunked_docs)} document chunks.")
for doc in chunked_docs:
    for md in doc.metadata:
        doc.metadata[md] = str(doc.metadata[md])
Created 1009 document chunks.
from langchain_community.embeddings import HuggingFaceEmbeddings

# Create embeddings using the HuggingFace sentence-transformers/all-mpnet-base-v2 model (the default)
embeddings = HuggingFaceEmbeddings()
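As a quick sanity check (not part of the original notebook), you can embed a test string and confirm the vector has 768 dimensions, which is the dimension we give the Pinecone index below:

```python
# Sanity check: all-mpnet-base-v2 produces 768-dimensional vectors,
# matching the dimension of the Pinecone index created below.
sample_vector = embeddings.embed_query("Netflix stock price")
print(len(sample_vector))  # expected: 768
```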
Setting up Pinecone
Pinecone is a managed vector database service designed for fast similarity search and real-time recommendation systems, offering scalability, efficiency, and ease of integration.
from pinecone import Pinecone, ServerlessSpec
import os
os.environ['PINECONE_API_KEY'] = ab.get_secret("PINECONE_API_KEY")
index_name = "gdriveairbyteindex"
pc = Pinecone()
# Create the Pinecone index if it does not already exist
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=768,  # must match the embedding model's output dimension
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
index = pc.Index(index_name)
index.describe_index_stats()
{'dimension': 768,
'index_fullness': 0.0,
'namespaces': {'': {'vector_count': 1009}},
'total_vector_count': 1009}
PineconeVectorStore
We use PineconeVectorStore to store and index the high-dimensional vectors extracted from the documents, using the Hugging Face embeddings created above.
from langchain_pinecone import PineconeVectorStore
pinecone = PineconeVectorStore.from_documents(
    chunked_docs, embedding=embeddings, index_name=index_name
)
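Optionally (not in the original notebook), you can verify the upload with a direct similarity search against the vector store; the query string here is just an example:

```python
# Retrieve the chunks most similar to a test query and show their source file.
results = pinecone.similarity_search("Netflix closing price", k=2)
for doc in results:
    print(doc.metadata.get("_ab_source_file_url"), "->", doc.page_content[:80])
```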
RAG
Retrieval-Augmented Generation (RAG) provides the Large Language Model (LLM) with context and asks it to use that context when generating its response.
This RAG implementation stores the text embeddings (generated from the data in your knowledge base) in the Pinecone vector database. Given a query, the code retrieves the relevant chunks from Pinecone and adds that text as context to the prompt, which the LLM then uses to generate the response.
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
import os
os.environ['OPENAI_API_KEY'] = ab.get_secret("OPENAI_API_KEY")
retriever = pinecone.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
# Combine the retrieved documents into a single context string
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Chain: retrieve context, fill the prompt, call the LLM, parse the output to a string
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("Langchain RAG pipeline set up successfully.")
Langchain RAG pipeline set up successfully.
print(rag_chain.invoke("What is the source data about?"))
The source data is about stock market information for Netflix (NFLX) on different dates, including details like opening price, closing price, high, low, adjusted close, and volume traded. The data includes specific dates ranging from 2018-10-31 to 2019-10-18. The information is extracted from a CSV file named NFLX.csv.
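The chain retrieves fresh context for every call, so you can keep asking questions. A sketch of a follow-up query, using a date that appears in the sample record printed earlier:

```python
# Ask a follow-up question; the retriever pulls new context from Pinecone each time.
print(rag_chain.invoke("What was Netflix's closing price on 2018-02-05?"))
```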