End-to-end RAG using S3, PyAirbyte, Pinecone, and LangChain

Learn how to build an end-to-end RAG pipeline: extract data from an S3 bucket using PyAirbyte, store it in a Pinecone vector store, and then use LangChain to perform RAG on the stored data.

This notebook demonstrates an end-to-end Retrieval-Augmented Generation (RAG) pipeline. We will extract data from an S3 bucket using PyAirbyte, store it in a Pinecone vector store, and then use LangChain to perform RAG on the stored data. This workflow showcases how to integrate these tools to build a scalable RAG system.

Prerequisites

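  1. AWS S3 Bucket:
    • Have an S3 bucket containing the CSV files to load, along with its region and an AWS access key ID and secret access key that can read it.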
  2. Pinecone Account:
    • Create a Pinecone Account: Sign up for an account on the Pinecone website.
    • Obtain Pinecone API Key: Generate a new API key from your Pinecone project settings. For detailed instructions, refer to the Pinecone documentation.
  3. OpenAI API Key:
    • Create an OpenAI Account: Sign up for an account on OpenAI.
    • Generate an API Key: Go to the API section and generate a new API key. For detailed instructions, refer to the OpenAI documentation.

Install PyAirbyte and other dependencies

# Add virtual environment support in Google Colab
!apt-get install -qq python3.10-venv

# First, we need to install the necessary libraries.
!pip3 install airbyte openai langchain pinecone-client langchain-openai langchain-pinecone python-dotenv langchainhub

Setup Source S3 with PyAirbyte

The provided code configures an Airbyte source to extract data from an Amazon S3 bucket containing CSV files.

To adapt the configuration to your requirements, refer to the reference documentation for the S3 source connector.

Note: The credentials are retrieved securely using the get_secret() method. This will automatically locate a matching Google Colab secret or environment variable, ensuring they are not hard-coded into the notebook. Make sure to add your key to the Secrets section on the left.
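
As a rough illustration of the environment-variable half of that lookup, the sketch below reads a secret from the environment and fails loudly when it is missing. The helper name read_secret and the placeholder value are assumptions made for this example; this is not PyAirbyte's implementation, and ab.get_secret() also checks other sources such as Colab secrets.

```python
import os


def read_secret(name, env=None):
    """Return the secret called `name` from the environment, or fail loudly.

    Hypothetical helper for illustration only; it is not part of PyAirbyte.
    `env` defaults to os.environ and can be swapped for a plain dict in tests.
    """
    env = os.environ if env is None else env
    value = env.get(name)
    if not value:
        raise KeyError(f"Secret {name!r} is not set")
    return value


# Demo with a placeholder mapping; real values belong in the environment
# or in the Colab Secrets panel, never in the notebook itself.
demo_env = {"S3_BUCKET_NAME": "example-bucket"}
print(read_secret("S3_BUCKET_NAME", env=demo_env))
```

Failing early on a missing secret gives a clearer error than letting an empty value reach the connector configuration.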

import airbyte as ab

source = ab.get_source(
    "source-s3",
    config={
        "streams": [
            {
                "name": "",
                "format": {
                    "filetype": "csv",
                    "ignore_errors_on_fields_mismatch": True,
                },
                "globs": ["**"],
                "legacy_prefix": "",
                "validation_policy": "Emit Record",
            }
        ],
        "bucket": ab.get_secret("S3_BUCKET_NAME"),
        "aws_access_key_id": ab.get_secret("AWS_ACCESS_KEY"),
        "aws_secret_access_key": ab.get_secret("AWS_SECRET_KEY"),
        "region_name": ab.get_secret("AWS_REGION")
    }
)
source.check()

The next cell selects every stream in the source, reads the data from the S3 bucket, and converts the records of each stream into document objects that LangChain can process.

source.select_all_streams() # Select all streams
read_result = source.read() # Read the data
documents_list = [doc for value in read_result.values() for doc in value.to_documents()]

print(str(documents_list[10]))
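
To make the shape of such a document list concrete, here is a minimal stand-in that turns CSV rows into objects with text content and metadata. SimpleDocument, rows_to_documents, and the sample rows are hypothetical names and data invented for this sketch; the text that PyAirbyte's to_documents() actually produces may be laid out differently.

```python
import csv
import io
from dataclasses import dataclass, field


@dataclass
class SimpleDocument:
    """Hypothetical stand-in for a document: text plus metadata."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def rows_to_documents(csv_text, stream_name):
    """Render every CSV row as one text document with 'column: value' lines."""
    reader = csv.DictReader(io.StringIO(csv_text))
    documents = []
    for row_number, row in enumerate(reader):
        content = "\n".join(f"{column}: {value}" for column, value in row.items())
        documents.append(
            SimpleDocument(content, {"stream": stream_name, "row": row_number})
        )
    return documents


# Made-up sample data, only to show the resulting structure.
sample_csv = "title,year\nExample Film A,2001\nExample Film B,2015\n"
docs = rows_to_documents(sample_csv, stream_name="example_stream")
print(len(docs))
print(docs[0].page_content)
```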

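Use LangChain to build a RAG pipeline

The code uses RecursiveCharacterTextSplitter to break the documents into chunks of at most 512 characters, with a 50-character overlap between neighbouring chunks so that text cut at a boundary still appears with some context. The metadata values of each chunk are then converted to strings, which keeps them within the limited set of metadata value types that Pinecone accepts. Finally, an OpenAIEmbeddings object is created to embed the chunks.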

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunked_docs = splitter.split_documents(documents_list)
print(f"Created {len(chunked_docs)} document chunks.")

# Convert every metadata value to a string
for doc in chunked_docs:
    for md in doc.metadata:
        doc.metadata[md] = str(doc.metadata[md])

import os
from langchain_openai import OpenAIEmbeddings

os.environ['OPENAI_API_KEY'] = ab.get_secret("OPENAI_API_KEY")
# OpenAI embedding model used to vectorize the chunks
embeddings = OpenAIEmbeddings()
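
The effect of chunk_size and chunk_overlap can be seen in a simplified fixed-window sketch. This is not how RecursiveCharacterTextSplitter works internally (it prefers to split on separators such as paragraph and line breaks); chunk_text is a hypothetical helper that only shows how consecutive chunks share their boundary characters.

```python
def chunk_text(text, chunk_size, chunk_overlap):
    """Split text into fixed-size windows that overlap by chunk_overlap characters.

    Simplified illustration; LangChain's splitter also respects separators.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


chunks = chunk_text("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=3)
for chunk in chunks:
    print(chunk)
```

Each chunk starts with the last three characters of the previous one, which is the role the 50-character overlap plays in the notebook.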

Setting up Pinecone

Pinecone is a managed vector database service designed for storing, indexing, and querying high-dimensional vector data efficiently.

from pinecone import Pinecone, ServerlessSpec

os.environ['PINECONE_API_KEY'] = ab.get_secret("PINECONE_API_KEY")
pc = Pinecone()
index_name = "s3-quickstarts-index" # Replace with your index name


# Uncomment this if you have not created a Pinecone index yet

# spec = ServerlessSpec(cloud="aws", region="us-east-1")  # Replace with your cloud and region
# pc.create_index(
#         index_name,
#         dimension=1536,  # Replace with your model dimensions
#         metric='cosine',  # Replace with your model metric
#         spec=spec
# )

index = pc.Index(index_name)

index.describe_index_stats()
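
The metric='cosine' setting means that vectors are compared by the angle between them rather than by their length. The sketch below is a brute-force, in-memory version of that comparison with made-up two-dimensional vectors; cosine_similarity and top_k are hypothetical helpers, and a real Pinecone index uses approximate search over far larger vectors.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query, vectors, k):
    """Return the ids of the k stored vectors most similar to the query."""
    scored = sorted(
        vectors.items(),
        key=lambda item: cosine_similarity(query, item[1]),
        reverse=True,
    )
    return [vector_id for vector_id, _ in scored[:k]]


# Toy vectors standing in for document embeddings.
stored = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0]}
print(top_k([1.0, 0.0], stored, k=2))
```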

PineconeVectorStore is a class from the langchain-pinecone integration package for working with Pinecone indexes. Its from_documents method embeds the provided documents with the given embedding model and upserts the resulting vectors into the named Pinecone index.

from langchain_pinecone import PineconeVectorStore

pinecone = PineconeVectorStore.from_documents(
    chunked_docs, embeddings, index_name=index_name
)

Next, we set up the RAG pipeline with LangChain. It combines a retriever backed by the Pinecone index, a RAG prompt pulled from the LangChain Hub, and an OpenAI chat model that generates the response.

from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

retriever = pinecone.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("Langchain RAG pipeline set up successfully.")
print(rag_chain.invoke("What are some best documentaries to watch?"))
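
The data flow through that chain (retrieve, format, fill the prompt, call the model) can be traced with plain functions. Everything below is a stand-in written for this sketch: CORPUS, fake_retriever, build_prompt, and fake_llm are hypothetical, the retriever ranks by shared words instead of embeddings, and the model is a stub that never calls OpenAI.

```python
# Made-up passages standing in for the chunks stored in the vector index.
CORPUS = [
    "Example documentary about oceans",
    "Example drama about a family",
]


def fake_retriever(question, k=1):
    """Rank passages by how many words they share with the question."""
    question_words = set(question.lower().split())
    ranked = sorted(
        CORPUS,
        key=lambda passage: len(question_words & set(passage.lower().split())),
        reverse=True,
    )
    return ranked[:k]


def format_docs(passages):
    """Join retrieved passages into one context string."""
    return "\n\n".join(passages)


def build_prompt(context, question):
    """Fill a minimal RAG prompt template."""
    return f"Context:\n{context}\n\nQuestion: {question}"


def fake_llm(prompt):
    """Stub model: reports the prompt size instead of generating text."""
    return f"[stub answer built from a {len(prompt)}-character prompt]"


def rag_answer(question):
    """Run the four steps in the same order as the chain."""
    context = format_docs(fake_retriever(question))
    prompt = build_prompt(context, question)
    return fake_llm(prompt)


print(rag_answer("documentaries about oceans"))
```

The question is used twice: once to retrieve context and once, unchanged, inside the prompt. That second use is what RunnablePassthrough() provides in the chain.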
