This tutorial demonstrates how to use data stored in Airbyte's Snowflake Cortex destination to perform Retrieval-Augmented Generation (RAG). Use this destination when you intend to use Snowflake for LLM-specific vector operations such as RAG.
As a practical example, we'll build a Product Assistant—an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources. With the Product Assistant, you can ask questions across all your sales enablement data in one place.
Prerequisites:

- Vector data stored in Snowflake via the Snowflake Cortex destination. In our case, we are using data from the Airbyte docs, GitHub issues, and Zendesk.
- A Snowflake account with Cortex functions enabled
- An OpenAI API key

a. Install dependencies

    # Add virtual environment support in Google Colab
    !apt-get install -qq python3.10-venv

    # Install the OpenAI client, the Snowflake Python connector, and the LangChain OpenAI integration
    %pip install --quiet openai snowflake-connector-python langchain-openai tiktoken

b. Explore data stored in Snowflake

Let's see what the document/vector data in Snowflake looks like.
    # Fetch data from the airbyte_docs table
    from snowflake import connector
    from google.colab import userdata
    from typing import List
    import pandas as pd


    def get_db_connection():
        # Connection settings are read from Google Colab's user data (secrets)
        return connector.connect(
            account=userdata.get("SNOWFLAKE_HOST"),
            role=userdata.get("SNOWFLAKE_ROLE"),
            warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
            database=userdata.get("SNOWFLAKE_DATABASE"),
            schema=userdata.get("SNOWFLAKE_SCHEMA"),
            user=userdata.get("SNOWFLAKE_USERNAME"),
            password=userdata.get("SNOWFLAKE_PASSWORD"),
        )


    def fetch_table_data(table_name, columns):
        conn = get_db_connection()
        cursor = conn.cursor()

        # Form the query to select specific columns
        columns_str = ", ".join(columns)
        query = f"SELECT {columns_str} FROM {table_name};"
        cursor.execute(query)
        result = cursor.fetchall()

        # Fetch the column names
        col_names = [desc[0] for desc in cursor.description]
        cursor.close()
        conn.close()

        # Load the result into a pandas DataFrame
        df = pd.DataFrame(result, columns=col_names)
        return df


    # Show data from the airbyte_docs table
    data_frame = fetch_table_data("airbyte_docs", ["document_id", "document_content", "metadata", "embedding"])
    data_frame

c. Build the RAG pipeline and ask a question

Let's write the three main pieces of a RAG pipeline:
1. Embed the incoming query.
2. Run a similarity search to find matching chunks.
3. Send the chunks to an LLM for completion.

    from openai import OpenAI
    from snowflake import connector
    from langchain_openai import OpenAIEmbeddings
    from google.colab import userdata
    from typing import List
    from rich.console import Console


    def get_db_connection():
        return connector.connect(
            account=userdata.get("SNOWFLAKE_HOST"),
            role=userdata.get("SNOWFLAKE_ROLE"),
            warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
            database=userdata.get("SNOWFLAKE_DATABASE"),
            schema=userdata.get("SNOWFLAKE_SCHEMA"),
            user=userdata.get("SNOWFLAKE_USERNAME"),
            password=userdata.get("SNOWFLAKE_PASSWORD"),
        )


    # Create the OpenAI embeddings client that converts the user's query into a
    # vector for similarity search. The vector itself is produced later by
    # calling embed_query() on the returned object.
    def get_embedding_from_openai(query) -> OpenAIEmbeddings:
        print(f"Embedding user's query -> {query}...")
        embeddings = OpenAIEmbeddings(openai_api_key=userdata.get("OPENAI_API_KEY"))
        return embeddings


    # Use Snowflake's built-in vector similarity function to find matching chunks
    def get_similar_chunks_from_snowflake(query_vector, table_names) -> List[str]:
        print("\nRetrieving similar chunks...")
        conn = get_db_connection()
        cursor = conn.cursor()
        chunks = []
        for table_name in table_names:
            # Keep the two most similar chunks from each table
            query = f"""
                SELECT document_content,
                       VECTOR_COSINE_SIMILARITY(embedding, CAST({query_vector} AS VECTOR(FLOAT, 1536))) AS similarity
                FROM {table_name}
                ORDER BY similarity DESC
                LIMIT 2
            """
            cursor.execute(query)
            result = cursor.fetchall()
            print(f"Found {len(result)} matching chunks in table:{table_name}!")
            chunks += [item[0] for item in result]
        cursor.close()
        conn.close()
        return chunks


    # Use Snowflake Cortex's built-in completion function to answer the question from the matching chunks
    def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
        print(f"\nSending chunks to Snowflake (LLM: {model_name}) for completion...")
        conn = get_db_connection()
        cur = conn.cursor()
        chunks = "\n\n".join(document_chunks)
        query = f"""
            SELECT snowflake.cortex.complete(
                '{model_name}',
                CONCAT(
                    'You are an Airbyte product assistant. Answer the question based on the context. Do not use any other information. Be concise. When returning a list of items, please enumerate description on separate lines. ',
                    'Context: ',
                    $$ {chunks} {question} $$,
                    'Answer: '
                )
            ) as response;"""
        cur.execute(query)
        result = cur.fetchall()
        cur.close()
        conn.close()
        # TO-DO: better parsing here
        return result[0][0].strip()


    # Putting it all together
    def get_response(query, table_names, model_name="llama2-70b-chat"):
        # Step 1: create the embeddings client for the query
        embeddings = get_embedding_from_openai(query)

        # Step 2: embed the query and get similar chunks from sources/tables in Snowflake
        chunks = get_similar_chunks_from_snowflake(embeddings.embed_query(query), table_names)

        if len(chunks) == 0:
            return "I am sorry, I do not have the context to answer your question."
        else:
            # Step 3: send chunks to LLM for completion
            return get_completion_from_snowflake(query, chunks, model_name)


    # Ask a question
    query = 'How can I store vector data in Snowflake'
    response = get_response(query, ["airbyte_docs"], "snowflake-arctic")
    Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

d. Let's ask another question

    query = 'What are the upcoming features for Snowflake Cortex?'
    response = get_response(query, ["airbyte_github_issues"])
    Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

e. Closing the loop

Let's see whether any customers are asking for the upcoming features mentioned above.
    query = 'Are there customers asking for better authorization options for Snowflake Cortex? Give me their names and email.'
    response = get_response(query, ["airbyte_zendesk_tickets", "airbyte_zendesk_users"])
    Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
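To make the similarity search step from section c more concrete, here is a minimal local sketch of what ranking by cosine similarity and keeping the top two rows amounts to. It is an illustration in plain Python, not how Snowflake implements `VECTOR_COSINE_SIMILARITY`. The helper names `cosine_similarity` and `top_k_chunks`, the sample rows, and the 3-dimensional vectors are all hypothetical (the embeddings in this tutorial have 1536 dimensions), and raising an error on a zero vector is a choice made for this sketch.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        # Assumption for this sketch: treat zero vectors as an error
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def top_k_chunks(
    query_vector: Sequence[float],
    rows: List[Tuple[str, Sequence[float]]],
    k: int = 2,
) -> List[Tuple[str, float]]:
    """Score each (content, embedding) row, sort highest first, keep k rows."""
    scored = [
        (content, cosine_similarity(embedding, query_vector))
        for content, embedding in rows
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Hypothetical rows standing in for (document_content, embedding) pairs
rows = [
    ("Snowflake Cortex destination setup", [1.0, 0.0, 0.0]),
    ("Zendesk source configuration", [0.0, 1.0, 0.0]),
    ("Loading vectors into Snowflake", [0.8, 0.6, 0.0]),
]
query_vector = [1.0, 0.0, 0.0]

for content, score in top_k_chunks(query_vector, rows):
    print(f"{score:.2f}  {content}")
```

With these sample vectors, the row pointing in the same direction as the query ranks first and the orthogonal row is dropped, which mirrors the `ORDER BY similarity DESC LIMIT 2` clause in the SQL query.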