This notebook demonstrates an end-to-end Retrieval-Augmented Generation (RAG) pipeline. We will extract data from Jira using PyAirbyte, store it in a Pinecone vector store, and then use LangChain to perform RAG on the stored data. This workflow showcases how to integrate these tools to build a scalable RAG system.
Prerequisites
- Jira:
- Obtain a Jira API Token: Generate an API token from your Atlassian account settings. You will also need your Jira domain (for example, your-domain.atlassian.net) and the email address associated with the account.
- Pinecone Account:
- Create a Pinecone Account: Sign up for an account on the Pinecone website.
- Obtain Pinecone API Key: Generate a new API key from your Pinecone project settings. For detailed instructions, refer to the Pinecone documentation.
- OpenAI API Key:
- Create an OpenAI Account: Sign up for an account on OpenAI.
- Generate an API Key: Go to the API section and generate a new API key. For detailed instructions, refer to the OpenAI documentation.
Install PyAirbyte and other dependencies
!pip3 install airbyte openai langchain pinecone-client langchain-openai langchain-pinecone langchainhub
Set up the Jira source with PyAirbyte
The code below configures an Airbyte source to extract the issues stream from Jira.
To adjust the configuration to your requirements, refer to the Jira source configuration reference.
Note: The credentials are retrieved securely using the get_secret() method. This will automatically locate a matching Google Colab secret or environment variable, ensuring they are not hard-coded into the notebook. Make sure to add your keys to the Secrets section on the left.
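If you are not running in Colab, one option (a minimal sketch; the values below are placeholders) is to export the same secret names as environment variables before running the next cell, since, as noted above, get_secret() also checks environment variables:
# Optional: provide secrets via environment variables instead of Colab secrets.
# Replace the placeholder values with your own credentials.
import os
os.environ["jira_api_token"] = "<your-jira-api-token>"
os.environ["jira_domain"] = "<your-domain.atlassian.net>"
os.environ["jira_email_id"] = "<your-email>"
os.environ["projects_list"] = '["PROJ1", "PROJ2"]'  # JSON-encoded list of Jira project keys
os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"
os.environ["PINECONE_API_KEY"] = "<your-pinecone-api-key>"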
import airbyte as ab
import json
projects = json.loads(ab.get_secret('projects_list'))
source = ab.get_source(
"source-jira",
install_if_missing=True,
config={
"api_token": ab.get_secret('jira_api_token'),
"domain": ab.get_secret('jira_domain') ,
"email": ab.get_secret('jira_email_id'),
"start_date": "2021-01-01T00:00:00Z", # optional field, can be ignored
"projects": projects
},
)
# Verify the config and creds by running `check`:
source.check()
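Optionally, you can list every stream the Jira source exposes before choosing which ones to sync; this is a small sanity check using PyAirbyte's stream-listing helper:
# Optional: list all streams the Jira connector supports.
print(source.get_available_streams())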
source.select_streams(['issues']) # Select only issues stream
read_result: ab.ReadResult = source.read()
documents_list = []

# Convert every selected stream's records into LangChain documents
for key, value in read_result.items():
    docs = value.to_documents()
    for doc in docs:
        documents_list.append(doc)

print(f"Converted {len(documents_list)} records to documents.")
# Store the issues stream in a DataFrame and display it
issues_df = read_result["issues"].to_pandas()
display(issues_df)
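As a quick sanity check on the extraction, you can look at the DataFrame's shape and column names (no specific columns are assumed here):
# Optional: quick overview of the extracted issues data.
print(issues_df.shape)
print(list(issues_df.columns))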
Use LangChain to build a RAG pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.utils import filter_complex_metadata
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunked_docs = splitter.split_documents(documents_list)
chunked_docs = filter_complex_metadata(chunked_docs)
print(f"Created {len(chunked_docs)} document chunks.")
# Pinecone only accepts simple metadata types, so cast every metadata value to a string
for doc in chunked_docs:
    for md in doc.metadata:
        doc.metadata[md] = str(doc.metadata[md])
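Before embedding, it can help to eyeball a single chunk; this optional snippet prints the start of the first chunk and its metadata keys:
# Optional: inspect one chunk to confirm splitting and metadata conversion worked.
if chunked_docs:
    print(chunked_docs[0].page_content[:200])
    print(list(chunked_docs[0].metadata.keys()))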
from langchain_openai import OpenAIEmbeddings
import os
os.environ['OPENAI_API_KEY'] = ab.get_secret("OPENAI_API_KEY")
embeddings = OpenAIEmbeddings()
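To confirm the OpenAI key works and to see the embedding dimensionality (which must match the Pinecone index created below), you can embed a short test string; this is an optional check:
# Optional: sanity-check the embedding model; the default OpenAI embedding has 1536 dimensions.
test_vector = embeddings.embed_query("jira issues")
print(len(test_vector))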
Setting up Pinecone
Pinecone is a managed vector database service designed for storing, indexing, and querying high-dimensional vector data efficiently.
from pinecone import Pinecone, ServerlessSpec
os.environ['PINECONE_API_KEY'] = ab.get_secret("PINECONE_API_KEY")
index_name = "airbytejiraindex"
pc = Pinecone()
# Create the Pinecone index only if it does not already exist
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,  # dimension of OpenAI's default embedding model
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
index = pc.Index(index_name)
index.describe_index_stats()
from langchain_pinecone import PineconeVectorStore
pinecone = PineconeVectorStore.from_documents(
chunked_docs, embedding=embeddings, index_name=index_name
)
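Before building the full chain, a raw similarity search confirms that the documents were indexed (freshly upserted vectors can take a few seconds to become queryable); the query below is just an example:
# Optional: verify the vector store by running a plain similarity search.
results = pinecone.similarity_search("open issues", k=3)
for doc in results:
    print(doc.page_content[:120])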
RAG
from langchain_openai import ChatOpenAI
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
retriever = pinecone.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
os.environ['OPENAI_API_KEY'] = ab.get_secret("OPENAI_API_KEY")
llm = ChatOpenAI(model_name="gpt-3.5-turbo")
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
print("Langchain RAG pipeline set up successfully.")
print(rag_chain.invoke("Summarize the issue of key IT-20"))
print(rag_chain.invoke("What is the source data about?"))