This notebook demonstrates simple RAG (Retrieval-Augmented Generation) pipeline with Facebook Marketing, Milvus Lite and PyAirbyte.
Prerequisites
- PyAirbyte
- PyAirbyte is an open-source that packages Airbyte connectors and makes them available in Python. We will connect to
source-facebook-marketing
, and retrive its streams - Milvus Lite
- Milvus Lite is the lightweight version of Milvus that enables vector emdeddings and similarity search into the Python application.
- OpenAI API Key
- Go to the API Keys page to create the new secret key.
1. Install and set dependencies
%pip install airbyte pymilvus openai milvus-model
import json
import os
import airbyte as ab
from openai import OpenAI
from pymilvus import MilvusClient
# in production, you might want to avoid putting the key here.
os.environ["OPENAI_API_KEY"] = "sk-proj-****"
openai_client = OpenAI()
2. Set the source
Connect source-facebook-marketing
to fetch streams
source = ab.get_source(
"source-facebook-marketing",
config={
"start_date": "2024-06-01T00:00:00Z",
"account_id": "account",
"access_token": "token"
},
install_if_missing=True,
)
source.check()
source.select_all_streams()
result = source.read()
for name, records in result.streams.items():
print(f"Streams of {name} has {len(records)} records.")
3. Milvus Lite & Text Embedding
milvus_client = MilvusClient("./milvus_source_fake.db")
milvus_client = MilvusClient("./milvus_source_fake.db")
data = []
for record in result.streams["products"]:
make = record["make"]
model = record["model"]
year = record["year"]
text = f"{make} {model} {year}"
data.append(text)
['Mazda MX-5 2023',
'Mercedes-Benz C-Class 2023',
'Honda Accord Crosstour 2023',
'GMC Jimmy 2023',
'Infiniti FX 2023']
from pymilvus import model
openai_ef = model.dense.OpenAIEmbeddingFunction(
model_name='text-embedding-3-large', # Specify the model name
api_key=os.environ["OPENAI_API_KEY"], # Provide your OpenAI API key
dimensions=512 # Set the embedding dimensionality
)
embedded_data = openai_ef.encode_documents(data)
milvus_client.create_collection("products", dimension=512)
{"type": "DEBUG", "message": "Successfully created collection: products", "data": {}}
{"type": "DEBUG", "message": "Successfully created an index on collection: products", "data": {}}
embedded_docs = []
for _id, embedded_text in enumerate(embedded_data):
embedded_docs.append({"id": _id+1, "vector": embedded_text, "text": data[_id]})
milvus_client.insert(collection_name="products", data=embedded_docs)
{'insert_count': 50,
'ids': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50],
'cost': 0}
4. Inspect the search results
question = "Give list of products from Suzuki"
search_res = milvus_client.search(
collection_name="products",
data=[
openai_ef.encode_documents([question])[0]
], # Use the `emb_text` function to convert the question to an embedding vector
limit=3, # Return top 3 results
search_params={"metric_type": "COSINE", "params": {}}, # Inner product distance
output_fields=["text"], # Return the text field
)
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
"Suzuki SJ 410 2023",
0.5219288468360901
],
[
"Isuzu VehiCROSS 2023",
0.38782158493995667
],
[
"Jaguar S-Type 2023",
0.35628464818000793
]
]
5. Use OpenAI ChatGPT to get the RAG response
Let's define the system and user prompts for the Language Model. This prompt is assembled with the retrieved documents from Milvus.
We also use OpenAI ChatGPT to generate a response based on the prompts.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in tags to provide an answer to the question enclosed in tags.
{context}
{question}
"""
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
Summary
This shows how easy to build RAG pipeline in Python for quick local development which helps us to speed our development iterations. All within the comport of Python environment.