End-to-end RAG using Milvus Lite and PyAirbyte - fully in Python

Learn how to build a simple RAG (Retrieval-Augmented Generation) pipeline with Milvus Lite and PyAirbyte, for fully local development in Python.


This notebook demonstrates a simple RAG (Retrieval-Augmented Generation) pipeline with Milvus Lite and PyAirbyte. The focus is to show how to set up these components for fully local development in Python.

Prerequisites

  • PyAirbyte: an open-source library that packages Airbyte connectors and makes them available in Python. In this tutorial, we will just use source-faker, but it's easy to set up other sources.
  • Milvus Lite: the lightweight version of Milvus that brings vector embeddings and similarity search into a Python application.
  • OpenAI API Key: go to the API Keys page to create a new secret key.

1. Install and set up dependencies


%pip install airbyte pymilvus openai milvus-model

import json
import os
import airbyte as ab
from openai import OpenAI
from pymilvus import MilvusClient


# in production, you might want to avoid putting the key here.
os.environ["OPENAI_API_KEY"] = "sk-proj-****"
openai_client = OpenAI()

2. Set the source

For simplicity, we will just use source-faker to generate some data.


source = ab.get_source(
    "source-faker",
    config={"count": 50},
    install_if_missing=True,
)
source.check()
source.select_all_streams()
result = source.read()

for name, records in result.streams.items():
    print(f"Stream {name} has {len(records)} records.")

Read Progress

Started reading at 08:19:11.

Read 150 records over 1 seconds (150.0 records / second).

Wrote 150 records over 3 batches.

Finished reading at 08:19:13.

Started finalizing streams at 08:19:13.

Finalized 3 batches over 1 seconds.

Completed 3 out of 3 streams:

  • products
  • users
  • purchases

Completed writing at 08:19:14. Total time elapsed: 3 seconds

Completed `source-faker` read operation at 15:19:14.

Stream products has 50 records.
Stream users has 50 records.
Stream purchases has 50 records.

Here we can see there are three streams: products, users, and purchases. Each of them has 50 records.
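Each record in a stream can be read like a plain mapping, which is what the field extraction later in this tutorial relies on. A minimal sketch using a stand-in dict (illustrative only; a real source-faker products record carries these and more fields):

```python
# Stand-in for a single "products" record; the real stream yields
# dict-like records that support the same key access.
record = {"id": 1, "make": "Mazda", "model": "MX-5", "year": 2023}

# Fields can be read by key, just like a plain dict.
summary = f"{record['make']} {record['model']} ({record['year']})"
print(summary)  # Mazda MX-5 (2023)
```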

3. Milvus Lite & Text Embedding


milvus_client = MilvusClient("./milvus_source_fake.db")


{"type": "DEBUG", "message": "Created new connection using: 5a7d992d451b41db831d254213b64892", "data": {}}

This will create milvus_source_fake.db on first initialization. Milvus Lite has some limitations, but this quick setup should be enough to test the pipeline in local development.

Let's focus on the products data. We will keep it simple by extracting just the relevant fields:


data = []
for record in result.streams["products"]:
    make = record["make"]
    model = record["model"]
    year = record["year"]

    text = f"{make} {model} {year}"
    data.append(text)

data[:5]

['Mazda MX-5 2023',
 'Mercedes-Benz C-Class 2023',
 'Honda Accord Crosstour 2023',
 'GMC Jimmy 2023',
 'Infiniti FX 2023']


from pymilvus import model

# Embedding function from the milvus-model package; dimensions must
# match the dimension of the collection created below.
openai_ef = model.dense.OpenAIEmbeddingFunction(
    model_name="text-embedding-3-small",
    dimensions=512,
)
embedded_data = openai_ef.encode_documents(data)

milvus_client.create_collection("products", dimension=512)


{"type": "DEBUG", "message": "Successfully created collection: products", "data": {}}
{"type": "DEBUG", "message": "Successfully created an index on collection: products", "data": {}}


embedded_docs = []
for _id, embedded_text in enumerate(embedded_data):
    embedded_docs.append({"id": _id + 1, "vector": embedded_text, "text": data[_id]})

milvus_client.insert(collection_name="products", data=embedded_docs)
{'insert_count': 50, 'ids': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50], 'cost': 0}
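The rows inserted above follow the simple schema this collection uses: an integer id, a vector, and the original text. A minimal sketch of building such rows, with stand-in vectors (the real ones are 512-dimensional embeddings from the model):

```python
texts = ["Mazda MX-5 2023", "Honda Accord Crosstour 2023"]
fake_vectors = [[0.1, 0.2], [0.3, 0.4]]  # stand-ins; real vectors have 512 dimensions

# Pair each vector with a 1-based id and its source text.
rows = [
    {"id": i + 1, "vector": vec, "text": texts[i]}
    for i, vec in enumerate(fake_vectors)
]
print(rows[0])  # {'id': 1, 'vector': [0.1, 0.2], 'text': 'Mazda MX-5 2023'}
```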

4. Inspect the search results


question = "Give list of products from Suzuki"

search_res = milvus_client.search(
    collection_name="products",
    data=[
        openai_ef.encode_documents([question])[0]
    ],  # Convert the question to an embedding vector
    limit=3,  # Return top 3 results
    search_params={"metric_type": "COSINE", "params": {}},  # Cosine similarity
    output_fields=["text"],  # Return the text field
)

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
    [
        "Suzuki SJ 410 2023",
        0.5219288468360901
    ],
    [
        "Isuzu VehiCROSS 2023",
        0.38782158493995667
    ],
    [
        "Jaguar S-Type 2023",
        0.35628464818000793
    ]
]
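The distances above come from the COSINE metric: higher means more similar, with values close to 1.0 for vectors pointing in the same direction. A minimal pure-Python sketch of the computation:

```python
import math

def cosine_similarity(a, b):
    # Dot product of the vectors divided by the product of their norms.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 2.0], [1.0, 2.0]))  # same direction, close to 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal, close to 0.0
```

This is why "Suzuki SJ 410 2023" scores highest for the Suzuki question: its embedding points in a direction closest to the question's embedding.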

5. Use OpenAI ChatGPT to get the RAG response

Let's define the system and user prompts for the language model. The user prompt is assembled from the documents retrieved from Milvus.

We also use OpenAI ChatGPT to generate a response based on the prompts.


context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
SYSTEM_PROMPT = """Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided."""

USER_PROMPT = f"""Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
response = openai_client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)
Suzuki SJ 410 2023

Summary

This shows how easy it is to build a RAG pipeline in Python for quick local development, which helps speed up our development iterations, all within the comfort of a Python environment.

