This notebook demonstrates simple RAG (Retrieval-Augmented Generation) pipeline with Pinecone and PyAirbyte. The focus is to showcase how to use source-file on PyAirbyte.
Prerequisites PyAirbyte PyAirbyte is an open-source that packages Airbyte connectors and makes them available in Python. In this tutorial, we will use the source-file Pinecone OpenAI API Key Go to the API Keys page to create the new secret key. Setup Install the dependencies and import them.
%pip install airbyte langchain langchain-openai langchain-pinecone langchainhub openai pinecone-client from langchain_openai import OpenAIEmbeddings from langchain_pinecone import PineconeVectorStore from pinecone import Pinecone, ServerlessSpec import json import os import time import airbyte as ab # Set your API keys PINECONE_API_KEY = 'your-pinecone-key' PINECONE_ENVIRONMENT = 'us-east-1' # e.g., "us-west1-gcp" OPENAI_API_KEY = 'sk-proj-xxxx' api_key = os.environ.get("PINECONE_API_KEY") os.environ["PINECONE_API_KEY"] = PINECONE_API_KEYData Source For this quickstart purpose we will extract CSV data related to reviews on a clothing brand that being hosted publicly so no need to create any credentials. Find more details about the data: https://cseweb.ucsd.edu/~jmcauley/pdfs/recsys18e.pdf .
In this quickstart we extract data with JSONL format and it's being compressed, we will see how it reflects on the config below. You can find the documentation related to source file specification for Airbyte here .
source = ab.get_source( "source-file", config={ "dataset_name": "ModCloth Data", "format": "jsonl", "url": "https://datarepo.eng.ucsd.edu/mcauley_group/data/modcloth/modcloth_final_data.json.gz", "provider": { "storage": "HTTPS", }, "reader_options": json.dumps( {"compression": "Gzip"} ), }, ) source.check() Connection check succeeded for `source-file`.
As we can see here, there is a reader_options that helps us to control how we are going to access the data. There are a lot of options for different file format that covers the common configurations. Make sure you check the documentation for more detailed implementation.
source.select_all_streams() read_result = source.read() Read Progress Started reading at 22:15:59.
Read 82,790 records over 14 seconds (5,913.6 records / second).
Wrote 82,790 records over 9 batches.
Finished reading at 22:16:14.
Started finalizing streams at 22:16:14.
Finalized 9 batches over 0 seconds.
Completed 1 out of 1 streams:
Completed writing at 22:16:14. Total time elapsed: 15 seconds
Completed `source-file` read operation at 05:16:14 .
Here, we are only interested on the reviews.
reviews = [doc["review_text"] for value in read_result.values() for doc in value if doc["review_text"]]['I liked the color, the silhouette, and the fabric of this dress. But the ruching just looked bunchy and ruined the whole thing. I was so disappointed, I really waned to like this dress. Runs a little small; I would need to size up to make it workappropriate.', "From the other reviews it seems like this dress either works for your body type or it doesn't. I have a small waist but flabby tummy and this dress is perfect for me! The detail around the front hides everything and the clingyness of the dress makes me look curvier than usual. The material is thick but clings to your bum (enough that when you walk the bum jiggle shows through!) and the slit is a bit high so it's not necessarily office appropriate without tights, but it's a good dress with tights or for an occasion.", "I love the design and fit of this dress! I wore it to a wedding and was comfortable all evening. The color is really pretty in person too! The fabric quality seems decent but not great so I'm not sure how many washes it will make it through.", "I bought this dress for work it is flattering and office appropriate. It hits just above my knees and I am pretty short at 5'1. Depending on how you adjust the top it can be a little low cut in the front, especially if you have a short torso. The material is on the thinner side, so should be great for summer/early fall and will work with tights underneath as well. I love it!", 'This is a very professional look. It is Great for work !']
Prepare Pinecone Index and Embedding embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY) pc = Pinecone(api_key=PINECONE_API_KEY) MODCLOTH_INDEX = "modcloth-reviews-index" if MODCLOTH_INDEX not in pc.list_indexes().names(): pc.create_index( name=MODCLOTH_INDEX, dimension=1536, metric='euclidean', spec=ServerlessSpec( cloud='aws', region=PINECONE_ENVIRONMENT ) ) # wait for index to be initialized while not pc.describe_index(MODCLOTH_INDEX).status['ready']: time.sleep(1) index = pc.Index(MODCLOTH_INDEX) Populate data for the vector store. For this demo purpose, we just load the first 100 reviews.
vector_store = PineconeVectorStore.from_texts( reviews[:100], embeddings, index_name=MODCLOTH_INDEX ) RAG Pipeline For below block of code, you can refer to this LangChain documentation . We will just use it here:
from langchain_openai import ChatOpenAI from langchain import hub from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough retriever = vector_store.as_retriever() prompt = hub.pull("rlm/rag-prompt") llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0, api_key=OPENAI_API_KEY) def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) rag_chain = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) print(rag_chain.invoke("Show 3 reviews text related with cute clothes"))
1. "Cute dress! Very comfy" 2. "Cute dress! Very comfy" 3. "This was a great dress"
Summary PyAirbyte source-file provides easy way for use to extract data from some file systems with varied formats. It also offers some flexibilities and options on how we want to extract the data, which is convenient.
About the Author About the Author