Build an AI chatbot with Snowflake Cortex

Learn how to use data stored in Airbyte's Snowflake Cortex destination to perform RAG by building a Product Assistant, an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources.


This tutorial demonstrates how to use data stored in Airbyte's Snowflake Cortex destination to perform Retrieval-Augmented Generation (RAG). Use this destination when you intend to run LLM-specific vector operations, such as RAG, directly in Snowflake.

As a practical example, we'll build a Product Assistant—an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources. With the Product Assistant, you can ask questions across all your sales enablement data in one place.

Prerequisites:

  • Vector data stored in Snowflake via the Snowflake Cortex destination. In this tutorial, we use data from Airbyte docs, GitHub issues, and Zendesk.
  • A Snowflake account with Cortex functions enabled
  • An OpenAI API key
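If you're running this tutorial in Google Colab, store your credentials in Colab's Secrets manager (the key icon in the left sidebar); the code below reads them via google.colab.userdata. As a quick sanity check, the following sketch (an addition to the recipe, not part of the original) verifies that the secret names this tutorial expects are set:

# Confirm the Colab secrets used throughout this tutorial are available.
# Add any missing ones in Colab's "Secrets" panel before continuing.
from google.colab import userdata

required_secrets = [
    "SNOWFLAKE_HOST", "SNOWFLAKE_ROLE", "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA", "SNOWFLAKE_USERNAME",
    "SNOWFLAKE_PASSWORD", "OPENAI_API_KEY",
]

for name in required_secrets:
    try:
        userdata.get(name)
        print(f"{name}: found")
    except Exception:
        print(f"{name}: missing - add it in the Secrets panel")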

a. Install dependencies

# Add virtual environment support in Google Colab
!apt-get install -qq python3.10-venv

# Install the OpenAI SDK, the Snowflake Python connector, LangChain's OpenAI integration, and tiktoken
%pip install --quiet openai snowflake-connector-python langchain-openai tiktoken

b. Explore data stored in Snowflake

Let's see what the document/vector data stored in Snowflake looks like.

# Fetch data from airbyte_docs table
from snowflake import connector
from google.colab import userdata
from typing import List
import pandas as pd

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

def fetch_table_data(table_name, columns):
    conn = get_db_connection()
    cursor = conn.cursor()

    # Form the query to select specific columns
    columns_str = ", ".join(columns)
    query = f"SELECT {columns_str} FROM {table_name};"

    cursor.execute(query)
    result = cursor.fetchall()

    # Fetch the column names
    col_names = [desc[0] for desc in cursor.description]

    cursor.close()
    conn.close()

    # Load the result into a pandas DataFrame
    df = pd.DataFrame(result, columns=col_names)
    return df

# Show data from the airbyte_docs table
data_frame = fetch_table_data("airbyte_docs", ["document_id", "document_content", "metadata", "embedding"])
data_frame
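Each row pairs a document chunk with its metadata and embedding. Depending on the connector and driver versions, the embedding column may come back as a Python list or as a JSON-encoded string; the short sketch below (an addition to the recipe, assuming OpenAI's 1536-dimension embeddings) inspects one row either way:

import json

# Inspect the first row: preview the chunk text and check the embedding's dimensionality.
# Snowflake typically upper-cases unquoted column names, hence "DOCUMENT_CONTENT"/"EMBEDDING".
first_row = data_frame.iloc[0]
embedding = first_row["EMBEDDING"]
if isinstance(embedding, str):
    embedding = json.loads(embedding)

print("Chunk preview:", str(first_row["DOCUMENT_CONTENT"])[:200])
print("Embedding dimensions:", len(embedding))  # expected: 1536 for OpenAI embeddings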

c. Build the RAG pipeline and ask a question

Let's write the three main pieces of a RAG pipeline:

  • Embed the incoming query
  • Perform a similarity search to find matching chunks
  • Send the chunks to the LLM for completion

from openai import OpenAI
from snowflake import connector
from langchain_openai import OpenAIEmbeddings
from google.colab import userdata
from typing import List
from rich.console import Console

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

# Create an OpenAI embeddings client used to convert the user's query into a vector for similarity search
def get_embedding_from_openai(query) -> OpenAIEmbeddings:
    print(f"Embedding user's query -> {query}...")
    embeddings = OpenAIEmbeddings(openai_api_key=userdata.get("OPENAI_API_KEY"))
    return embeddings

# use Snowflake's built-in vector similarity search (VECTOR_COSINE_SIMILARITY) to find matching chunks
def get_similar_chunks_from_snowflake(query_vector, table_names) -> List[str]:
        print("\nRetrieving similar chunks...")
        conn = get_db_connection()
        cursor = conn.cursor()

        chunks = []
        for table_name in table_names:
            query = f"""
            SELECT document_content,
              VECTOR_COSINE_SIMILARITY(embedding, CAST({query_vector} AS VECTOR(FLOAT, 1536))) AS similarity
            FROM {table_name}
            ORDER BY similarity DESC
            LIMIT 2
            """
            cursor.execute(query)
            result = cursor.fetchall()
            print(f"Found {len(result)} matching chunks in table:{table_name}!")
            chunks += [item[0] for item in result]
        cursor.close()
        conn.close()

        return chunks

# use Snowflake Cortex's COMPLETE function to generate an answer from the matching chunks
def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
        print(f"\nSending chunks to Snowflake (LLM: {model_name}) for completion...")
        conn = get_db_connection()
        cur = conn.cursor()

        chunks = "\n\n".join(document_chunks)

        query = f"""
        SELECT snowflake.cortex.complete(
        '{model_name}',
        CONCAT(
            'You are an Airbyte product assistant. Answer the question based on the context. Do not use any other information. Be concise. When returning a list of items, enumerate each item on a separate line.',
            'Context: ',
            $$
            {chunks}

            {question}
            $$,
            'Answer: '
        )
        ) as response;"""
        cur.execute(query)
        result = cur.fetchall()
        cur.close()
        conn.close()
        # TO-DO: better parsing here
        return result[0][0].strip()

# Putting it all together
def get_response(query, table_names, model_name="llama2-70b-chat"):
        # Step 1: embed the query
        embeddings = get_embedding_from_openai(query)

        # Step 2: get similar chunks from sources/tables in Snowflake
        chunks = get_similar_chunks_from_snowflake(embeddings.embed_query(query), table_names)

        if not chunks:
            return "I am sorry, I do not have the context to answer your question."
        else:
            # Step 3: send chunks to LLM for completion
            return get_completion_from_snowflake(query, chunks, model_name)

# Ask a question
query = 'How can I store vector data in Snowflake?'
response = get_response(query, ["airbyte_docs"], "snowflake-arctic")

Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
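Since get_response accepts a list of table names, you can also pull context from several sources in a single call. For example, to search both the docs and GitHub issues tables used in this tutorial:

# Ask the same question across multiple sources at once
query = 'How can I store vector data in Snowflake?'
response = get_response(query, ["airbyte_docs", "airbyte_github_issues"], "snowflake-arctic")
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")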

d. Let's ask another question

query = 'What are the upcoming features for Snowflake Cortex?'
response = get_response(query, ["airbyte_github_issues"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

e. Closing the loop

Let's see if there are customers asking for the upcoming features mentioned above.

query = 'Are there customers asking for better authorization options for Snowflake Cortex? Give me their names and email.'
response = get_response(query, ["airbyte_zendesk_tickets", "airbyte_zendesk_users"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")
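To make this feel like a chatbot rather than a one-off query, you can wrap get_response in a simple input loop. Here is a minimal sketch (not part of the original recipe) that searches all of the tables used above on every turn:

# Minimal interactive loop for the Product Assistant; type 'exit' to stop.
all_tables = [
    "airbyte_docs",
    "airbyte_github_issues",
    "airbyte_zendesk_tickets",
    "airbyte_zendesk_users",
]

while True:
    user_query = input("\nAsk the Product Assistant (or type 'exit'): ").strip()
    if user_query.lower() in ("exit", "quit"):
        break
    answer = get_response(user_query, all_tables, "snowflake-arctic")
    Console().print(f"\n[blue]{answer}[/blue]")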

