End-to-end RAG using Facebook Marketing, PyAirbyte, Milvus (Zilliz), and LangChain

Learn how to use PyAirbyte to load data from Facebook Marketing, store it in a Milvus (Zilliz) vector store, and run a short RAG demo (using OpenAI/LangChain).

This notebook demonstrates a simple RAG (Retrieval-Augmented Generation) pipeline with Facebook Marketing, Milvus Lite, and PyAirbyte.

Prerequisites

  • PyAirbyte
    PyAirbyte is an open-source library that packages Airbyte connectors and makes them available in Python. We will connect to
    source-facebook-marketing and retrieve its streams.
  • Milvus Lite
    Milvus Lite is the lightweight version of Milvus that brings vector embeddings and similarity search to Python applications.
  • OpenAI API Key
    Go to the API Keys page to create a new secret key.

1. Install and set up dependencies

%pip install airbyte pymilvus openai milvus-model
import json
import os
import airbyte as ab
from openai import OpenAI
from pymilvus import MilvusClient


# In production, avoid hard-coding the key here; load it from the environment or a secret store.
os.environ["OPENAI_API_KEY"] = "sk-proj-****"
openai_client = OpenAI()
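
The cell above hard-codes the key for brevity. As a minimal sketch of one alternative, the key can be read from the environment, with an interactive prompt as a fallback. The helper name `load_api_key` and its parameters are hypothetical and not part of the original notebook.

```python
import os
from getpass import getpass


def load_api_key(env=None, prompt=getpass):
    """Return the OpenAI key from the environment, or ask for it interactively.

    `env` and `prompt` are injectable so the helper can be exercised
    without touching the real environment (hypothetical design choice).
    """
    env = os.environ if env is None else env
    key = env.get("OPENAI_API_KEY")
    if not key:
        # Nothing exported in the shell: ask without echoing the input.
        key = prompt("OpenAI API key: ")
    if not key:
        raise ValueError("No OpenAI API key provided")
    return key
```

In the notebook this could be used as `os.environ["OPENAI_API_KEY"] = load_api_key()` before creating the `OpenAI()` client.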

2. Set the source

Connect to source-facebook-marketing to fetch its streams.

source = ab.get_source(
    "source-facebook-marketing",
    config={
        "start_date": "2024-06-01T00:00:00Z",
        "account_id": "account",
        "access_token": "token"
    },
    install_if_missing=True,
)
source.check()
source.select_all_streams()
result = source.read()

for name, records in result.streams.items():
    print(f"Stream {name} has {len(records)} records.")
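
Reading every stream can be slow for a large ad account. The following is a hedged sketch of reading only a subset instead, using PyAirbyte's `get_available_streams()` and `select_streams()`. The wrapper `read_selected_streams` is hypothetical, and the stream names in the usage note are assumptions that should be checked against what the connector actually reports.

```python
def read_selected_streams(source, wanted):
    """Select only the wanted streams that the source offers, then read them.

    `source` is expected to behave like a PyAirbyte source object.
    """
    available = set(source.get_available_streams())
    # Keep the caller's order, but drop names the connector does not expose.
    selected = [name for name in wanted if name in available]
    if not selected:
        raise ValueError(
            f"None of {list(wanted)} are available; got {sorted(available)}"
        )
    source.select_streams(selected)
    return source.read()
```

A call such as `result = read_selected_streams(source, ["campaigns", "ads"])` would replace the `select_all_streams()` and `read()` pair, assuming those stream names exist for the account.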

3. Milvus Lite & Text Embedding

milvus_client = MilvusClient("./milvus_source_fake.db")
data = []
for record in result.streams["products"]:
    make = record["make"]
    model = record["model"]
    year = record["year"]

    text = f"{make} {model} {year}"
    data.append(text)
data[:5]
['Mazda MX-5 2023',
 'Mercedes-Benz C-Class 2023',
 'Honda Accord Crosstour 2023',
 'GMC Jimmy 2023',
 'Infiniti FX 2023']
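
The loop above assumes every record has `make`, `model`, and `year` fields. Streams with a different schema need a different text template. As a minimal sketch, a small helper can build the text from whichever fields are present. The name `record_to_text` and the example field names are hypothetical.

```python
def record_to_text(record, fields):
    """Join the selected fields of a dict-like record into one line of text.

    Missing or empty fields are skipped rather than rendered as 'None'.
    """
    parts = []
    for field in fields:
        value = record.get(field)
        if value is not None and value != "":
            parts.append(f"{field}: {value}")
    return "; ".join(parts)


# Example with a made-up record; the field names are assumptions.
example = {"name": "Summer Sale", "status": "ACTIVE", "objective": None}
print(record_to_text(example, ["name", "status", "objective"]))
```

Labelling each value with its field name gives the embedding model a little more context than bare values.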
from pymilvus import model

openai_ef = model.dense.OpenAIEmbeddingFunction(
    model_name='text-embedding-3-large', # Specify the model name
    api_key=os.environ["OPENAI_API_KEY"], # Provide your OpenAI API key
    dimensions=512 # Set the embedding dimensionality
)
embedded_data = openai_ef.encode_documents(data)
milvus_client.create_collection("products", dimension=512)
{"type": "DEBUG", "message": "Successfully created collection: products", "data": {}}
{"type": "DEBUG", "message": "Successfully created an index on collection: products", "data": {}}
embedded_docs = []
for _id, embedded_text in enumerate(embedded_data):
    embedded_docs.append({"id": _id+1, "vector": embedded_text, "text": data[_id]})
milvus_client.insert(collection_name="products", data=embedded_docs)
{'insert_count': 50,
 'ids': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50],
 'cost': 0}
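
With 50 records, one embedding call and one insert are enough. For larger streams it may help to process the texts in batches. This is a sketch only: the helper `batched` and the batch size of 16 are hypothetical, not something the original notebook uses.

```python
def batched(items, batch_size):
    """Yield consecutive slices of `items`, each at most `batch_size` long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# 50 texts split into batches of 16 (illustrative size).
texts = [f"doc {i}" for i in range(50)]
sizes = [len(batch) for batch in batched(texts, 16)]
print(sizes)  # [16, 16, 16, 2]
```

Each batch could then be passed to `openai_ef.encode_documents` and `milvus_client.insert` in turn, keeping the `id` values unique across batches.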

4. Inspect the search results

question = "Give list of products from Suzuki"
search_res = milvus_client.search(
    collection_name="products",
    data=[
        openai_ef.encode_documents([question])[0]
    ],  # Embed the question with the same OpenAI embedding function
    limit=3,  # Return top 3 results
    search_params={"metric_type": "COSINE", "params": {}},  # Cosine similarity
    output_fields=["text"],  # Return the text field
)
retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
    [
        "Suzuki SJ 410 2023",
        0.5219288468360901
    ],
    [
        "Isuzu VehiCROSS 2023",
        0.38782158493995667
    ],
    [
        "Jaguar S-Type 2023",
        0.35628464818000793
    ]
]
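
With the COSINE metric, the value returned as `distance` is a cosine similarity, so larger values mean a closer match. The following pure-Python sketch shows the quantity being computed. It is for illustration only, since Milvus performs this computation internally.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical direction
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal vectors
```

Read this way, the Suzuki hit at about 0.52 is clearly closer to the question than the other two hits at about 0.39 and 0.36.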

5. Use OpenAI ChatGPT to get the RAG response

Let's define the system and user prompts for the language model. The user prompt is assembled from the documents retrieved from Milvus.

We then call the OpenAI Chat Completions API (gpt-3.5-turbo) to generate a response based on these prompts.
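
The prompt assembly described here can also be written as a small reusable function. This is a sketch under assumptions: the helper name `build_user_prompt` is hypothetical, and it assumes the context and question are delimited with `<context>` and `<question>` tags.

```python
def build_user_prompt(context_lines, question):
    """Assemble the user prompt from retrieved text lines and the question."""
    context = "\n".join(context_lines)
    return (
        "Use the following pieces of information enclosed in <context> tags "
        "to provide an answer to the question enclosed in <question> tags.\n"
        f"<context>\n{context}\n</context>\n"
        f"<question>\n{question}\n</question>\n"
    )


prompt = build_user_prompt(
    ["Suzuki SJ 410 2023", "Isuzu VehiCROSS 2023"],
    "Give list of products from Suzuki",
)
print(prompt)
```

Keeping the assembly in one function makes it easy to reuse the same template for different questions.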

context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)
SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""
response = openai_client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)

Suzuki SJ 410 2023
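
The three hits retrieved in step 4 include two non-Suzuki vehicles with noticeably lower scores. As a minimal sketch, low-scoring hits can be dropped before the context is built. The helper `filter_hits` and the 0.45 cutoff are hypothetical, and the cutoff was chosen only to separate the scores shown in step 4. A useful threshold depends on the data and the embedding model.

```python
def filter_hits(hits, min_score):
    """Keep only (text, score) pairs whose similarity score is at least min_score."""
    return [(text, score) for text, score in hits if score >= min_score]


# Scores copied from the search output in step 4.
hits = [
    ("Suzuki SJ 410 2023", 0.5219288468360901),
    ("Isuzu VehiCROSS 2023", 0.38782158493995667),
    ("Jaguar S-Type 2023", 0.35628464818000793),
]
kept = filter_hits(hits, min_score=0.45)
print(kept)  # [('Suzuki SJ 410 2023', 0.5219288468360901)]
```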

Summary

This shows how easy it is to build a RAG pipeline in Python for quick local development, which helps speed up development iterations, all within the comfort of a Python environment.
