End-to-end RAG using a file source, PyAirbyte, Pinecone, and Langchain

Learn how to build a RAG pipeline, extracting data from a file source using PyAirbyte, storing it in a Pinecone vector store, and then using LangChain to perform RAG on the stored data.


This notebook demonstrates a simple RAG (Retrieval-Augmented Generation) pipeline with Pinecone and PyAirbyte. The focus is on showcasing how to use the source-file connector with PyAirbyte.

Prerequisites

  • PyAirbyte: an open-source library that packages Airbyte connectors and makes them available in Python. In this tutorial, we will use the source-file connector.
  • Pinecone account and API key
  • OpenAI API key: go to the API Keys page to create a new secret key.

Setup

Install the dependencies and import them.

%pip install airbyte langchain langchain-openai langchain-pinecone langchainhub openai pinecone-client

from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

import json
import os
import time


import airbyte as ab


# Set your API keys
PINECONE_API_KEY = 'your-pinecone-key'
PINECONE_ENVIRONMENT = 'us-east-1'  # AWS region for the serverless index
OPENAI_API_KEY = 'sk-proj-xxxx'

os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY

Data Source

For this quickstart we will extract data containing reviews of a clothing brand (ModCloth). The file is hosted publicly, so there is no need to create any credentials. Find more details about the data here: https://cseweb.ucsd.edu/~jmcauley/pdfs/recsys18e.pdf.

The data is in JSONL format and gzip-compressed; we will see how this is reflected in the config below. You can find the Airbyte documentation for the source-file specification here.


source = ab.get_source(
    "source-file",
    config={
        "dataset_name": "ModCloth Data",
        "format": "jsonl",
        "url": "https://datarepo.eng.ucsd.edu/mcauley_group/data/modcloth/modcloth_final_data.json.gz",
        "provider": {
            "storage": "HTTPS",
        },
        "reader_options": json.dumps(
            {"compression": "Gzip"}
        ),
    },
)
source.check()


Connection check succeeded for `source-file`.

As you can see, reader_options lets us control how the file is read. There are many options covering the common configurations for each file format; make sure you check the documentation for the details.
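
For example, reading a remote CSV instead would look roughly like the config below. The URL is a placeholder and the reader_options keys shown (pandas-style sep and nrows) are an illustrative sketch; check the source-file documentation for the options your connector version supports:

csv_source = ab.get_source(
    "source-file",
    config={
        "dataset_name": "My CSV Data",
        "format": "csv",
        "url": "https://example.com/data.csv",  # placeholder URL
        "provider": {"storage": "HTTPS"},
        # pandas-style options: semicolon delimiter, read only the first 1,000 rows
        "reader_options": json.dumps({"sep": ";", "nrows": 1000}),
    },
)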

Extract the data


source.select_all_streams()
read_result = source.read()


Read Progress

Started reading at 22:15:59.

Read 82,790 records over 14 seconds (5,913.6 records / second).

Wrote 82,790 records over 9 batches.

Finished reading at 22:16:14.

Started finalizing streams at 22:16:14.

Finalized 9 batches over 0 seconds.

Completed 1 out of 1 streams:

  • ModCloth Data

Completed writing at 22:16:14. Total time elapsed: 15 seconds

Completed `source-file` read operation at 05:16:14.

Here, we are only interested in the review text.


reviews = [doc["review_text"] for value in read_result.values() for doc in value if doc["review_text"]]

reviews[:5]

['I liked the color, the silhouette, and the fabric of this dress. But the ruching just looked bunchy and ruined the whole thing. I was so disappointed, I really waned to like this dress. Runs a little small; I would need to size up to make it workappropriate.',
"From the other reviews it seems like this dress either works for your body type or it doesn't. I have a small waist but flabby tummy and this dress is perfect for me! The detail around the front hides everything and the clingyness of the dress makes me look curvier than usual. The material is thick but clings to your bum (enough that when you walk the bum jiggle shows through!) and the slit is a bit high so it's not necessarily office appropriate without tights, but it's a good dress with tights or for an occasion.",
"I love the design and fit of this dress!  I wore it to a wedding and was comfortable all evening. The color is really pretty in person too!  The fabric quality seems decent but not great so I'm not sure how many washes it will make it through.",
"I bought this dress for work  it is flattering and office appropriate.  It hits just above my knees and I am pretty short at 5'1.  Depending on how you adjust the top it can be a little low cut in the front, especially if you have a short torso. The material is on the thinner side, so should be great for summer/early fall and will work with tights underneath as well.  I love it!",
'This is a very professional look. It is Great for work !']
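
If you prefer exploring the data as a DataFrame first, each stream in the read result can also be converted to pandas. A small sketch, assuming the stream is named after the dataset_name shown in the read progress above:

# Load the stream into a pandas DataFrame for easier inspection
df = read_result["ModCloth Data"].to_pandas()
df["review_text"].head()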

Prepare Pinecone Index and Embedding


embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)

pc = Pinecone(api_key=PINECONE_API_KEY)

MODCLOTH_INDEX = "modcloth-reviews-index"


# Create the index if it doesn't exist yet. dimension=1536 matches
# OpenAI's default embedding model; 'cosine' and 'dotproduct' are
# alternative metrics if you prefer them over 'euclidean'.
if MODCLOTH_INDEX not in pc.list_indexes().names():
    pc.create_index(
        name=MODCLOTH_INDEX,
        dimension=1536,
        metric='euclidean',
        spec=ServerlessSpec(
            cloud='aws',
            region=PINECONE_ENVIRONMENT
        )
    )

    # wait for index to be initialized
    while not pc.describe_index(MODCLOTH_INDEX).status['ready']:
        time.sleep(1)


index = pc.Index(MODCLOTH_INDEX)


Populate the vector store. For this demo, we load only the first 100 reviews.


vector_store = PineconeVectorStore.from_texts(
    reviews[:100], embeddings, index_name=MODCLOTH_INDEX
)
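
Before wiring up the full RAG chain, you can sanity-check retrieval with a direct similarity search. The query string here is just an example:

# Retrieve the 3 reviews most similar to the query
docs = vector_store.similarity_search("flattering dress for work", k=3)
for doc in docs:
    print(doc.page_content[:120])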


RAG Pipeline

For the block of code below, you can refer to this LangChain documentation. We will just use it as-is here:


from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

retriever = vector_store.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0, api_key=OPENAI_API_KEY)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

print(rag_chain.invoke("Show 3 reviews text related with cute clothes"))


1. "Cute dress! Very comfy"
2. "Cute dress! Very comfy"
3. "This was a great dress"

Summary

The PyAirbyte source-file connector provides an easy way to extract data from various file systems in a variety of formats. It also offers flexibility and options in how we extract the data, which is convenient.

