Build an AI chatbot with Snowflake Cortex

Lean how to use data stored in Airbyte's Snowflake Cortex destination to perform RAG by building a Product Assistant—an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources.

Should you build or buy your data pipelines?

Download our free guide and discover the best approach for your needs, whether it's building your ELT solution in-house or opting for Airbyte Open Source or Airbyte Cloud.

Download now

Should you build or buy your data pipelines?

Download our free guide and discover the best approach for your needs, whether it's building your ELT solution in-house or opting for Airbyte Open Source or Airbyte Cloud.

Download now

This tutorial demonstrates how to use data stored in Airbyte's Snowflake Cortex destination to perform Retrieval-Augmented Generation (RAG). You should use this destination when you intend to use Snowflake for LLM specific vector operations like RAG.

As a practical example, we'll build a Product Assistant—an AI chatbot capable of answering product-related questions using data from multiple Airbyte-related sources. With the Product Assistant, you can ask questions across all your sales enablement data in one place.

Prerequisites:

  • Vector data stored in Snowflake via Snowflake Cortex destination. In our case we are using data from airbyte docs, Github issues and Zendesk.
  • Snowflake account with Cortex functions enabled
  • Open AI key

a. Install dependencies

# Add virtual environment support in Google Colab
!apt-get install -qq python3.10-venv

# Install openai
# tbd - add snowflake python connector
%pip install --quiet openai snowflake-connector-python langchain-openai tiktoken

b. Explore data stored in Snowflake.

Let's see what document/vecto data in Snowflake looks like.

# Fetch data from airbyte_docs table
from snowflake import connector
from google.colab import userdata
from typing import List
import pandas as pd

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

def fetch_table_data(table_name, columns):
    conn = get_db_connection()
    cursor = conn.cursor()

    # Form the query to select specific columns
    columns_str = ", ".join(columns)
    query = f"SELECT {columns_str} FROM {table_name};"

    cursor.execute(query)
    result = cursor.fetchall()

    # Fetch the column names
    col_names = [desc[0] for desc in cursor.description]

    cursor.close()
    conn.close()

    # Load the result into a pandas DataFrame
    df = pd.DataFrame(result, columns=col_names)
    return df;

# show data from airbtye_docs table
data_frame = fetch_table_data("airbyte_docs", ["document_id", "document_content", "metadata", "embedding"])
data_frame

c. Build the RAG pipeline and ask a question

Let's write the three main pieces of a RAG pipeline:

  • Embedding incoming query
  • Doing similarity search to find matching chunks
  • Send chunks to LLM for completion
from openai import OpenAI
from snowflake import connector
from langchain_openai import OpenAIEmbeddings
from google.colab import userdata
from typing import List
from rich.console import Console

def get_db_connection():
    return connector.connect(
        account=userdata.get("SNOWFLAKE_HOST"),
        role=userdata.get("SNOWFLAKE_ROLE"),
        warehouse=userdata.get("SNOWFLAKE_WAREHOUSE"),
        database=userdata.get("SNOWFLAKE_DATABASE"),
        schema=userdata.get("SNOWFLAKE_SCHEMA"),
        user=userdata.get("SNOWFLAKE_USERNAME"),
        password=userdata.get("SNOWFLAKE_PASSWORD"),
    )

# convert user's query into a vector array to prep for similiary search
def get_embedding_from_openai(query)->str:
  print(f"Embedding user's query -> {query}...")
  embeddings = OpenAIEmbeddings(openai_api_key=userdata.get("OPENAI_API_KEY"))
  return embeddings

# use Snowflake's Cortex in-build similarity search to find matching chunks
def get_similar_chunks_from_snowflake(query_vector, table_names) -> List[str]:
        print("\nRetrieving similar chunks...")
        conn = get_db_connection()
        cursor = conn.cursor()

        chunks = []
        for table_name in table_names:
            query = f"""
            SELECT document_content,
              VECTOR_COSINE_SIMILARITY(embedding, CAST({query_vector} AS VECTOR(FLOAT, 1536))) AS similarity
            FROM {table_name}
            ORDER BY similarity DESC
            LIMIT 2
            """
            cursor.execute(query)
            result = cursor.fetchall()
            print(f"Found {len(result)} matching chunks in table:{table_name}!")
            chunks += [item[0] for item in result]
        cursor.close()
        conn.close()

        return chunks

# use Snowflake's Cortex in-build completion to find matching chunks.
def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
        print(f"\nSending chunks to Snowflake (LLM: {model_name}) for completion...")
        conn = get_db_connection()
        cur = conn.cursor()

        chunks = "\n\n".join(document_chunks)

        query = f"""
        SELECT snowflake.cortex.complete(
        '{model_name}',
        CONCAT(
            'You are an Airbyte product assistant. Answer the question based on the context. Do not use any other information. Be concise. When returning a list of items, please enumerate description on separate lines','Context: ',
            $$
            {chunks}
            
 {question} $$,
        'Answer: '
        )
        ) as response;"""
        cur.execute(query)
        result = cur.fetchall()
        cur.close()
        conn.close()
        # TO-DO: better parsing here
        return result[0][0].strip()

# Putting it all together
def get_response(query, table_names, model_name="llama2-70b-chat"):
        # Step 1: embed the query
        embeddings = get_embedding_from_openai(query)

        # Step 2: get similar chunks from sources/tables in Snowflake
        chunks = get_similar_chunks_from_snowflake(embeddings.embed_query(query), table_names)

        if (len(chunks) == 0):
            return "I am sorry, I do not have the context to answer your question."
        else:
            # Step 3: send chunks to LLM for completion
            return get_completion_from_snowflake(query, chunks, model_name)

# Ask a question
query = 'How can I store vector data in Snowflake'
response = get_response(query, ["airbyte_docs"], "snowflake-arctic")

Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

d. Let's ask another question

query = 'What are the upcoming features for Snowflake Cortex?'
response = get_response(query, ["airbyte_github_issues"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

e. Closing the loop

Let's see if there are customers asking for upcoming features above.

query = 'Are there customers asking for better authorization options for Snowflake Cortex? Give me their names and email.'
response = get_response(query, ["airbyte_zendesk_tickets", "airbyte_zendesk_users"])
Console().print(f"\n\nResponse from LLM:\n\n[blue]{response}[/blue]")

Similar use cases

No similar recipes were found, but check back soon!