This tutorial demonstrates how to use data stored in Airbyte's Snowflake Cortex destination to perform Retrieval-Augmented Generation (RAG). Use this destination when you intend to run LLM-specific vector operations, such as RAG, directly in Snowflake.
As a practical example, we'll build a Product Assistant: an AI chatbot that answers product-related questions using data from multiple Airbyte-related sources. With the Product Assistant, you can ask questions across all your sales enablement data in one place.
Prerequisites:
- Vector data stored in Snowflake via the Snowflake Cortex destination. In this tutorial we use data from the Airbyte docs, GitHub issues, and Zendesk.
- A Snowflake account with Cortex functions enabled
- An OpenAI API key
a. Install dependencies
# Add virtual environment support in Google Colab
!apt-get install -qq python3.10-venv
# Install openai, the Snowflake Python connector, LangChain's OpenAI integration, and tiktoken
%pip install --quiet openai snowflake-connector-python langchain-openai tiktoken
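The cells below read Snowflake and OpenAI credentials from Colab's secrets manager via google.colab.userdata. Here is a minimal sanity check, assuming you have stored secrets under the names used throughout this notebook (the names themselves are an assumption of this tutorial's setup):
# Sketch: confirm the Colab secrets this notebook expects are present.
# Adjust the names if your setup differs.
from google.colab import userdata

required_secrets = [
    "SNOWFLAKE_HOST", "SNOWFLAKE_ROLE", "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA", "SNOWFLAKE_USERNAME",
    "SNOWFLAKE_PASSWORD", "OPENAI_API_KEY",
]

for name in required_secrets:
    try:
        userdata.get(name)
        print(f"{name}: found")
    except Exception:
        print(f"{name}: missing - add it in the Colab Secrets panel")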
b. Explore data stored in Snowflake.
Let's see what the document and vector data stored in Snowflake looks like.
# Fetch data from the airbyte_docs table
from snowflake import connector
from google.colab import userdata
from typing import List
import pandas as pd

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

def fetch_table_data(table_name, columns):
    conn = get_db_connection()
    cursor = conn.cursor()

    # Form the query to select specific columns
    columns_str = ", ".join(columns)
    query = f"SELECT {columns_str} FROM {table_name};"
    cursor.execute(query)
    result = cursor.fetchall()

    # Fetch the column names
    col_names = [desc[0] for desc in cursor.description]
    cursor.close()
    conn.close()

    # Load the result into a pandas DataFrame
    df = pd.DataFrame(result, columns=col_names)
    return df

# Show data from the airbyte_docs table
data_frame = fetch_table_data("airbyte_docs", ["document_id", "document_content", "metadata", "embedding"])
data_frame
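Before building the pipeline, it can also help to confirm that each synced table contains rows. A minimal sketch, reusing the get_db_connection() helper defined above; the table names match the ones queried later in this tutorial, so adjust them if your destination uses different stream names:
# Optional sanity check: count rows in each table written by the Cortex destination.
for table in ["airbyte_docs", "airbyte_github_issues", "airbyte_zendesk_tickets", "airbyte_zendesk_users"]:
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM {table};")
    print(f"{table}: {cursor.fetchone()[0]} rows")
    cursor.close()
    conn.close()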
c. Build the RAG pipeline and ask a question
Let's write the three main pieces of a RAG pipeline:
- Embed the incoming query
- Run a similarity search to find the matching chunks
- Send the matching chunks to an LLM for completion
from openai import OpenAI
from snowflake import connector
from langchain_openai import OpenAIEmbeddings
from google.colab import userdata
from typing import List
from rich.console import Console

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

# Convert the user's query into a vector to prepare for similarity search
def get_embedding_from_openai(query) -> List[float]:
    print(f"Embedding user's query -> {query}...")
    embeddings = OpenAIEmbeddings(openai_api_key=userdata.get("OPENAI_API_KEY"))
    return embeddings.embed_query(query)

# Use Snowflake's built-in vector similarity search to find matching chunks
def get_similar_chunks_from_snowflake(query_vector, table_names) -> List[str]:
    print("\nRetrieving similar chunks...")
    conn = get_db_connection()
    cursor = conn.cursor()
    chunks = []
    for table_name in table_names:
        query = f"""
        SELECT document_content,
            VECTOR_COSINE_SIMILARITY(embedding, CAST({query_vector} AS VECTOR(FLOAT, 1536))) AS similarity
        FROM {table_name}
        ORDER BY similarity DESC
        LIMIT 2
        """
        cursor.execute(query)
        result = cursor.fetchall()
        print(f"Found {len(result)} matching chunks in table: {table_name}!")
        chunks += [item[0] for item in result]
    cursor.close()
    conn.close()
    return chunks

# Use Snowflake Cortex's COMPLETE function to generate an answer from the matching chunks
def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
    print(f"\nSending chunks to Snowflake (LLM: {model_name}) for completion...")
    conn = get_db_connection()
    cur = conn.cursor()

    chunks = "\n\n".join(document_chunks)
    query = f"""
    SELECT snowflake.cortex.complete(
        '{model_name}',
        CONCAT(
            'You are an Airbyte product assistant. Answer the question based on the context. Do not use any other information. Be concise. When returning a list of items, please enumerate description on separate lines','Context: ',
            $$
            {chunks}
            {question} $$,
            'Answer: '
        )
    ) as response;"""
    cur.execute(query)
    result = cur.fetchall()
    cur.close()
    conn.close()
    # TO-DO: better parsing here
    return result[0][0].strip()

# Putting it all together
def get_response(query, table_names, model_name="llama2-70b-chat"):
    # Step 1: embed the query
    query_vector = get_embedding_from_openai(query)

    # Step 2: get similar chunks from sources/tables in Snowflake
    chunks = get_similar_chunks_from_snowflake(query_vector, table_names)
    if len(chunks) == 0:
        return "I am sorry, I do not have the context to answer your question."

    # Step 3: send chunks to LLM for completion
    return get_completion_from_snowflake(query, chunks, model_name)
# Ask a question
query = 'How can I store vector data in Snowflake'
response = get_response(query, ["airbyte_docs"], "snowflake-arctic")
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
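The answer above was generated with the snowflake-arctic model, but Cortex's COMPLETE function supports several models. As a rough sketch, you can compare answers across models; the model list below is an assumption, since availability depends on your Snowflake region and account:
# Sketch: run the same question through a few Cortex models and compare answers.
for model in ["snowflake-arctic", "llama2-70b-chat", "mistral-large"]:
    answer = get_response(query, ["airbyte_docs"], model)
    Console().print(f"\n[bold]{model}[/bold]\n[blue]{answer}[/blue]")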
d. Let's ask another question
query = 'What are the upcoming features for Snowflake Cortex?'
response = get_response(query, ["airbyte_github_issues"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
e. Closing the loop
Let's see whether any customers are asking for the upcoming features mentioned above.
query = 'Are there customers asking for better authorization options for Snowflake Cortex? Give me their names and email.'
response = get_response(query, ["airbyte_zendesk_tickets", "airbyte_zendesk_users"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
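Since get_response accepts a list of tables, you can also ask one question across every source at once, which gives you the "all your sales enablement data in one place" experience described at the start. A minimal sketch, assuming all four tables synced via the Snowflake Cortex destination exist in your schema:
# Sketch: query across all synced sources in a single call.
all_tables = ["airbyte_docs", "airbyte_github_issues", "airbyte_zendesk_tickets", "airbyte_zendesk_users"]
query = 'What feedback do we have about Snowflake Cortex across docs, GitHub, and Zendesk?'
response = get_response(query, all_tables)
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")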