This notebook demonstrates a simple RAG (Retrieval-Augmented Generation) pipeline built with Milvus Lite and PyAirbyte. The focus is to showcase how to set up these components for fully local development in Python.
Prerequisites
- PyAirbyte
  - PyAirbyte is an open-source library that packages Airbyte connectors and makes them available in Python. In this tutorial, we will just use source-faker, but it's easy to set it up for other sources.
- Milvus Lite
  - Milvus Lite is the lightweight version of Milvus that brings vector embeddings and similarity search into a Python application.
- OpenAI API Key
  - Go to the API Keys page to create a new secret key.
1. Install and set up dependencies
%pip install airbyte pymilvus openai milvus-model
import json
import os
import airbyte as ab
from openai import OpenAI
from pymilvus import MilvusClient
# In production, avoid hard-coding the key like this; see the safer pattern below.
os.environ["OPENAI_API_KEY"] = "sk-proj-****"
openai_client = OpenAI()
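Instead of hard-coding the key as above, a safer pattern for local notebooks is to read it from the environment or prompt for it interactively, so it never lands in the notebook source. A minimal sketch using only the standard library:

import os
from getpass import getpass

# Only prompt when the key is not already set in the environment,
# so the secret never appears in the notebook source.
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")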
2. Set up the source
For simplicity, we will just use source-faker to generate some data.
source = ab.get_source(
"source-faker",
config={"count": 5_0},
install_if_missing=True,
)
source.check()
source.select_all_streams()
result = source.read()
for name, records in result.streams.items():
    print(f"Stream '{name}' has {len(records)} records.")
Read Progress
Started reading at 08:19:11.
Read 150 records over 1 seconds (150.0 records / second).
Wrote 150 records over 3 batches.
Finished reading at 08:19:13.
Started finalizing streams at 08:19:13.
Finalized 3 batches over 1 seconds.
Completed 3 out of 3 streams:
Completed writing at 08:19:14. Total time elapsed: 3 seconds
Completed `source-faker` read operation at 15:19:14.
Stream 'products' has 50 records.
Stream 'users' has 50 records.
Stream 'purchases' has 50 records.
Here we can see there are streams of products, users, and purchases. Each of them has 50 records.
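If you only need a subset of the data, PyAirbyte also lets you select individual streams instead of all of them. A minimal sketch with the same source object:

# Read only the products stream instead of everything.
source.select_streams(["products"])
result = source.read()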
3. Milvus Lite & Text Embedding
milvus_client = MilvusClient("./milvus_source_fake.db")
{"type": "DEBUG", "message": "Created new connection using: 5a7d992d451b41db831d254213b64892", "data": {}}
This will create the milvus_source_fake.db file on first initialization. Milvus Lite has some limitations, but this quick setup should be enough to test the pipeline during local development.
Let's focus on the products data. We will keep it simple by extracting only the relevant fields:
data = []
for record in result.streams["products"]:
make = record["make"]
model = record["model"]
year = record["year"]
text = f"{make} {model} {year}"
data.append(text)
['Mazda MX-5 2023',
'Mercedes-Benz C-Class 2023',
'Honda Accord Crosstour 2023',
'GMC Jimmy 2023',
'Infiniti FX 2023']
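Before we can embed these texts, we need an embedding function. The cell defining openai_ef is not shown above; a minimal reconstruction using the milvus-model package installed in step 1 could look like this (the model name text-embedding-3-small and dimensions=512 are assumptions, chosen to match the 512-dimensional collection created below):

# Aliased so it does not clash with the `model` variable from the loop above.
from pymilvus import model as milvus_model

# With no api_key argument, the underlying OpenAI client falls back to the
# OPENAI_API_KEY environment variable we set earlier. dimensions=512 must
# match the dimension passed to create_collection below.
openai_ef = milvus_model.dense.OpenAIEmbeddingFunction(
    model_name="text-embedding-3-small",
    dimensions=512,
)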
embedded_data = openai_ef.encode_documents(data)
milvus_client.create_collection("products", dimension=512)
{"type": "DEBUG", "message": "Successfully created collection: products", "data": {}}
{"type": "DEBUG", "message": "Successfully created an index on collection: products", "data": {}}
embedded_docs = []
for _id, embedded_text in enumerate(embedded_data):
embedded_docs.append({"id": _id+1, "vector": embedded_text, "text": data[_id]})
milvus_client.insert(collection_name="products", data=embedded_docs)
{'insert_count': 50, 'ids': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50], 'cost': 0}
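As a quick sanity check, you can read a few entities back by primary key. A minimal sketch using MilvusClient.query:

res = milvus_client.query(
    collection_name="products",
    filter="id <= 3",  # the first three entities inserted above
    output_fields=["text"],
)
print(res)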
4. Inspect the search results
question = "Give list of products from Suzuki"
search_res = milvus_client.search(
    collection_name="products",
    data=[
        openai_ef.encode_documents([question])[0]
    ],  # Embed the question with the same embedding function used for the documents
    limit=3,  # Return top 3 results
    search_params={"metric_type": "COSINE", "params": {}},  # Cosine similarity
    output_fields=["text"],  # Return the text field
)
retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
[
"Suzuki SJ 410 2023",
0.5219288468360901
],
[
"Isuzu VehiCROSS 2023",
0.38782158493995667
],
[
"Jaguar S-Type 2023",
0.35628464818000793
]
]
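Only the top hit is actually a Suzuki; the other two are merely similar-sounding. Before building the prompt, you can drop weak matches with a simple score threshold. The 0.45 cutoff below is a hypothetical value for illustration, not a tuned recommendation:

MIN_SCORE = 0.45  # hypothetical cutoff; tune per embedding model and dataset
retrieved_lines_with_distances = [
    (text, score)
    for text, score in retrieved_lines_with_distances
    if score >= MIN_SCORE
]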
5. Use OpenAI ChatGPT to get the RAG response
Let's define the system and user prompts for the language model. The user prompt is assembled from the documents retrieved from Milvus. We then use OpenAI ChatGPT to generate a response based on these prompts.
context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
SYSTEM_PROMPT = """Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided."""USER_PROMPT = f"""Use the following pieces of information enclosed in tags to provide an answer to the question enclosed in tags.{context}{question}"""
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)
print(response.choices[0].message.content)
Summary
This shows how easy it is to build a RAG pipeline in Python for quick local development, which helps speed up our development iterations, all within the comfort of a Python environment.