Airbyte’s newly launched Snowflake Cortex destination enables users to create their own dedicated vector store directly within Snowflake - with no coding required! Creating a new pipeline with Snowflake Cortex takes only a few minutes, allowing you to load any of 300+ sources into your new Snowflake-backed vector store.
In this blog post, we will show you how you can utilize the Snowflake Cortex destination to load vector data into Snowflake, followed by leveraging Snowflake’s Cortex functions to perform Retrieval-Augmented Generation (RAG). Cortex offers general-purpose ML functions and LLM-specific functions, along with vector functions for semantic search, each backed by multiple models, all without needing to manage or host the underlying models or infrastructure.
As a practical example, we'll build an AI Product Assistant, an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources. With the AI Product Assistant, you can ask questions across all your company’s data in one place.
We will use the following three data sources specific to Airbyte:
Airbyte’s official documentation for the Snowflake Cortex destination
GitHub issues documenting planned features, known bugs, and work in progress
Zendesk tickets comprising customer details and inquiries/requests

RAG Overview
LLMs are general-purpose tools trained on an enormous corpus of data, allowing them to answer questions on almost any topic. However, if you are looking to extract information from your proprietary data, LLMs fall short, leading to potential inaccuracies and hallucinations. This is where RAG becomes super relevant. RAG addresses this limitation by enabling LLMs to access relevant and up-to-date data from any data source. The LLM can then use this information to answer relevant questions about your proprietary data.
Prerequisites
To follow the whole process, you will need the following accounts. You can also use your own custom sources or work with a single source:
Airbyte Cloud instance
Snowflake account with Cortex functions enabled
OpenAI account (for an API key)
Source-specific accounts: Google Drive, GitHub, and Zendesk

Step 1: Set up Snowflake Cortex destination
To select a destination in Airbyte Cloud, click the Destinations tab on the left, select “New Destination”, and filter for the destination you are looking for.
Configure the Snowflake Cortex destination in Airbyte:
Destination name: Provide a friendly name.
Processing: Set chunk size to 1000. The size refers to the number of tokens, not characters, so this is roughly 4KB of text; the best chunk size depends on the data you are dealing with. You can leave the other options empty. You would usually set the metadata and text fields to limit reading of records to specific columns in a given stream.
Embedding: Select OpenAI from the dropdown and provide your OpenAI API key to power the embedding service. You can also use any of the other available models. Just remember to embed the questions you ask the product assistant with the same model.
Indexing: Provide credentials for connecting to your Snowflake instance.

Step 2: Set up Google Drive, GitHub and Zendesk sources
To select a source, click the Sources tab on the left, select “New Source”, and filter for the source you are looking for.
Google Drive
Select Google Drive on the sources page and configure the Google Drive source. You will need to authenticate and provide the following:
Folder URL: Provide a link to your Google Drive folder. For this tutorial you can move the snowflake-cortex.md file from here over to your Google Drive. Alternatively, you can use your own file. Refer to the accompanying documentation to learn about valid file formats.
The list of streams to sync
Format: Set the format dropdown to “Document File Type Format”.
Name: Provide a stream name.

GitHub
Select GitHub on the sources page and configure the GitHub source. You will need to authenticate with your GitHub account and provide the following:
GitHub repositories: You can use the repo airbytehq/PyAirbyte or provide your own.

Zendesk
Optional: Select Zendesk on the sources page and configure the Zendesk source. You can authenticate using OAuth or an API token, and provide the following:
Subdomain name: If your Zendesk account URL is https://MY_SUBDOMAIN.zendesk.com/, then MY_SUBDOMAIN is your subdomain.
After you have set up all sources, this is what your source screen will look like:
Step 3: Set up all connections
Next, we will set up the following connections:
Google Drive → Snowflake Cortex
GitHub → Snowflake Cortex
Zendesk → Snowflake Cortex

Google Drive → Snowflake Cortex
Select the Connections tab in the left navigation in Airbyte Cloud, then click “New connection” at the top right.
In the configuration flow, pick the existing Google Drive source and Snowflake Cortex as the destination.
On the select streams screen, select Full Refresh | Overwrite as the sync mode.
On the configure connection screen, set the Schedule type to every 24 hours. Optionally provide a stream prefix for your table. In our case we prefix airbyte_ to all streams.
Click on Finish and Sync.
If everything went well, there should now be a connection syncing data from Google Drive to the Snowflake Cortex destination. Give the sync a few minutes to run. Once the first run has completed, you can check Snowflake to confirm the data sync. The data should be synced to a table named “[optional prefix][stream name]”.

GitHub → Snowflake Cortex
Select the Connections tab in the left navigation in Airbyte Cloud, then click “New connection” at the top right.
In the configuration flow, pick the existing GitHub source and Snowflake Cortex as the destination.
On the select streams screen, make sure all streams except issues are unchecked. Also, set the sync mode to Full Refresh | Overwrite.
On the configure connection screen, set the Schedule type to every 24 hours. Optionally provide a stream prefix for your table. In our case we prefix airbyte_ to all streams.
Click on Finish and Sync.
If everything went well, there should now be a connection syncing data from GitHub to the Snowflake Cortex destination. Give the sync a few minutes to complete. Once the first run has completed, you can check Snowflake to confirm the data sync. The data should be synced to a table named “[optional prefix]issues”.

Zendesk → Snowflake Cortex
Select the Connections tab in the left navigation in Airbyte Cloud, then click “New connection” at the top right.
In the configuration flow, pick the existing Zendesk source and Snowflake Cortex as the destination.
On the select streams screen, make sure all streams except tickets and users are unchecked. Also, set the sync mode to Full Refresh | Overwrite.
On the configure connection screen, set the Schedule type to every 24 hours. Optionally provide a stream prefix for your table. In our case we prefix airbyte_ to all streams.
Click on Finish and Sync.
If everything went well, there should now be a connection syncing data from Zendesk to the Snowflake Cortex destination. Give the sync a few minutes to complete. Once the first run has completed, you can check Snowflake to confirm the data sync. The data should be synced to tables named “[optional prefix]tickets” and “[optional prefix]users”.
Once all three connections are set up, the Connections screen should look like this:
Step 4: Explore data in Snowflake
You should see the following tables in Snowflake. Depending on your stream prefix, your table names might be different.
Google Drive related: airbyte_docs
GitHub related: airbyte_github_issues
Zendesk related: airbyte_zendesk_tickets, airbyte_zendesk_users
All tables have the following columns:
DOCUMENT_ID - unique, based on the primary key
CHUNK_ID - randomly generated UUID
DOCUMENT_CONTENT - text content from the source
METADATA - metadata from the source
EMBEDDING - vector representation of DOCUMENT_CONTENT
Results from one of these tables should look like the following:
Step 5: RAG building blocks
In this section, we'll go over the essential building blocks of a Retrieval-Augmented Generation (RAG) system. This involves embedding the user's question, retrieving matching document chunks, and using a language model to generate the final answer. The first step is done using OpenAI; the second and third steps use Snowflake’s Cortex functions. Let's break down these steps:
5.1 Embedding user’s question
When a user asks a question, it's crucial to transform the question into a vector representation using the same model employed for generating vectors in the vector database. In this example, we use LangChain to access OpenAI's embedding model. The helper below returns the embeddings client; calling its embed_query method on the question produces the vector. This step ensures that the question is represented in a format that can be compared directly with the document vectors.
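from google.colab import userdata  # Colab secrets store (assumes the code runs in Colab)
from langchain_openai import OpenAIEmbeddings  # assumes the langchain-openai package
def get_embedding_from_openai(query) -> OpenAIEmbeddings:
    # Returns the embeddings client, not a vector; the caller runs embed_query() on it
    embeddings = OpenAIEmbeddings(openai_api_key=userdata.get("OPENAI_API_KEY"))
    return embeddings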
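The functions in the next two steps call a get_db_connection() helper that this post does not show. A minimal sketch of what it might look like follows, assuming the Snowflake Python Connector (snowflake-connector-python) is installed and that credentials live in environment variables. The SNOWFLAKE_* variable names and the snowflake_params_from_env function are hypothetical, so adapt them to wherever you keep your secrets.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names, mapped to connect() keyword arguments.
_ENV_TO_PARAM = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
    "SNOWFLAKE_DATABASE": "database",
    "SNOWFLAKE_SCHEMA": "schema",
}


def snowflake_params_from_env(env: Mapping[str, str]) -> Dict[str, str]:
    """Collect connection parameters, failing early if any are missing."""
    missing = [name for name in _ENV_TO_PARAM if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {param: env[name] for name, param in _ENV_TO_PARAM.items()}


def get_db_connection():
    """Open a new Snowflake connection; callers are responsible for closing it."""
    # Imported lazily so this module still loads where the connector is absent.
    import snowflake.connector

    return snowflake.connector.connect(**snowflake_params_from_env(os.environ))
```

Opening a fresh connection per call keeps the helper simple and matches how the functions below close the connection when they finish; a longer-running app would probably reuse one connection instead.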
5.2 Retrieving matching chunks
The embedded question is then sent to Snowflake, where document chunks with similar vector representations are retrieved. The two most similar chunks from each table are returned.
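from typing import List

def get_similar_chunks_from_snowflake(query_vector, table_names) -> List[str]:
    conn = get_db_connection()
    cursor = conn.cursor()
    chunks = []
    for table_name in table_names:
        # The Python list renders as an array literal, which Snowflake casts to a
        # vector with the same dimension as the EMBEDDING column (1536 here)
        query = f"""
            SELECT document_content,
                   VECTOR_COSINE_SIMILARITY(embedding, CAST({query_vector} AS VECTOR(FLOAT, 1536)))
                   AS similarity
            FROM {table_name}
            ORDER BY similarity DESC
            LIMIT 2
        """
        cursor.execute(query)
        result = cursor.fetchall()
        # Keep only the text column of the top two rows for this table
        chunks += [item[0] for item in result]
    # Close the cursor and connection once every table has been queried
    cursor.close()
    conn.close()
    return chunks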
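To make the ranking concrete: cosine similarity is the dot product of two vectors divided by the product of their lengths, so vectors pointing in the same direction score 1 and orthogonal ones score 0. The pure-Python sketch below mirrors what the query above asks Snowflake to do (score every chunk, sort descending, keep the top two). It is an illustration using toy 3-dimensional vectors and made-up chunk text, not Snowflake's implementation; cosine_similarity and top_k_chunks are names chosen for this example.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product of a and b divided by the product of their Euclidean norms."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


def top_k_chunks(
    query_vector: Sequence[float],
    rows: Sequence[Tuple[str, Sequence[float]]],
    k: int = 2,
) -> List[str]:
    """Return the text of the k rows whose embedding is closest to the query."""
    ranked = sorted(
        rows,
        key=lambda row: cosine_similarity(query_vector, row[1]),
        reverse=True,
    )
    return [content for content, _ in ranked[:k]]


# Toy (document_content, embedding) rows standing in for a Snowflake table.
rows = [
    ("How to configure the destination", [1.0, 0.0, 0.0]),
    ("Release notes", [0.0, 1.0, 0.0]),
    ("Destination setup checklist", [0.9, 0.1, 0.0]),
]
print(top_k_chunks([1.0, 0.05, 0.0], rows))
```

With real data the same ranking runs inside Snowflake, so only the two winning chunks per table travel back to the client.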
5.3 Passing chunks to LLM for final answer
All the chunks are concatenated into a single string and sent to Snowflake (using the Snowflake Python Connector) along with a prompt (a set of instructions) and the user’s question.
def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
    conn = get_db_connection()
    cur = conn.cursor()
    # Concatenate the retrieved chunks into a single context string
    chunks = "\n\n".join(document_chunks)
    # $$ ... $$ is Snowflake's dollar-quoted string syntax: quotes inside the chunks or
    # question need no escaping, but a literal $$ in the text would end the string early
    query = f"""
        SELECT snowflake.cortex.complete(
            '{model_name}',
            CONCAT(
                'You are an Airbyte product assistant. Answer the question based on the context.
                Do not use any other information. Be concise. When returning a list of items,
                please enumerate description on separate lines. ','Context: ',
                $$
                {chunks}
                $$,
                'Question: ',
                $$ {question} $$,
                'Answer: '
            )
        ) as response;"""
    cur.execute(query)
    result = cur.fetchall()
    cur.close()
    conn.close()
    # The first column of the first row holds the model's reply
    return result[0][0].strip()
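One thing to watch in the function above: because the chunks and the question are pasted into the SQL text, a document that happens to contain $$ (or a model name containing a quote) would break the statement. A possible variation, sketched below, assembles the prompt in Python and passes it as a bind parameter instead. It assumes the connector's default pyformat parameter style (%s placeholders); build_prompt and complete_with_binds are hypothetical names, and the cursor is passed in so the sketch works with any DB-API cursor.

```python
from typing import Any, List

# Same instructions as the prompt above, kept in Python instead of in the SQL text.
INSTRUCTIONS = (
    "You are an Airbyte product assistant. Answer the question based on the context. "
    "Do not use any other information. Be concise. When returning a list of items, "
    "please enumerate description on separate lines."
)


def build_prompt(question: str, document_chunks: List[str]) -> str:
    """Assemble instructions, context, and question into one prompt string."""
    context = "\n\n".join(document_chunks)
    return f"{INSTRUCTIONS}\nContext:\n{context}\nQuestion: {question}\nAnswer: "


def complete_with_binds(
    cursor: Any, question: str, document_chunks: List[str], model_name: str
) -> str:
    """Run the completion with the model name and prompt passed as bind values."""
    cursor.execute(
        "SELECT snowflake.cortex.complete(%s, %s) AS response",
        (model_name, build_prompt(question, document_chunks)),
    )
    # The first column of the first row holds the model's reply.
    return cursor.fetchall()[0][0].strip()
```

Keeping the prompt out of the SQL string also makes it easy to print or log exactly what was sent to the model while debugging.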
Step 6: Putting it all together
Google Colab
For convenience, we have integrated everything into a Google Colab notebook. Explore the fully functional RAG code in Google Colab.
# Putting it all together
def get_response(query, table_names, model_name="llama2-70b-chat"):
    # Step 1: embed the query
    embeddings = get_embedding_from_openai(query)
    # Step 2: get similar chunks from sources/tables in Snowflake
    chunks = get_similar_chunks_from_snowflake(embeddings.embed_query(query), table_names)
    if len(chunks) == 0:
        return "I am sorry, I do not have the context to answer your question."
    else:
        # Step 3: send chunks to LLM for completion
        return get_completion_from_snowflake(query, chunks, model_name)
Every time a question is asked, we pass the list of relevant tables and the model name. For the first question below, the answer is retrieved from the Airbyte docs only, whereas for the second question, the answer is retrieved from GitHub issues.
You can also provide multiple tables as input, and the resulting response will be a combination of data from both sources.
Streamlit app
We have also integrated the above RAG functionality into a Streamlit chat app. Explore the fully functional code on GitHub.
Conclusion
Using Airbyte’s catalog of 300+ connectors and the LLM capabilities of Snowflake Cortex, you can build an insightful AI assistant to answer questions across a number of data sources. We have kept things simple here, but as your number of sources increases, consider integrating an AI agent to interpret user intent and retrieve answers from the appropriate source.