End-to-end RAG with Airbyte Cloud, Google Drive, and Snowflake Cortex

Learn how to build an end-to-end Retrieval-Augmented Generation (RAG) pipeline: we will extract data from Google Drive using Airbyte Cloud and load it into Snowflake Cortex.


Introduction

Investing in cryptocurrency has been around for a while, yet many people invest blindly, without solid knowledge of the topic. By building a comprehensive understanding of the underlying principles, market dynamics, and the potential risks and rewards associated with cryptocurrencies, investors can make more informed decisions. This guide uses analytical data about Bitcoin to explore its potential as an investment, the factors influencing its value, and strategies for managing and mitigating risk.

In this tutorial, we'll walk you through the process of using Airbyte to pass a document titled "Is Bitcoin a Good Investment" to Snowflake Cortex for processing. This process, called retrieval-augmented generation (RAG), leverages Snowflake's LLM functions to seamlessly consume and analyze the data. By the end, you'll have a comprehensive understanding of how to extract valuable insights from this document, using advanced data tools to better inform your cryptocurrency investment decisions.

TL;DR

We will move data from Google Drive into Snowflake Cortex using Airbyte, which allows us to perform cosine similarity search. Finally, we can extract insights from our document.

Understanding RAG

LLMs are quite helpful when you are interested in general information. Unfortunately, we cannot say the same for domain-specific information, which is where they start to hallucinate. By providing LLMs with up-to-date information from your own data sources, you address this limitation, since the LLM can now ground its answers in that data. This process is called Retrieval-Augmented Generation (RAG).

Prerequisites

I provided my document download link, but feel free to use your own custom source.

1. Data Source: In this tutorial we use a Google Drive folder.

2. Airbyte Cloud Account: Log in here.

3. Snowflake Account: Ensure Cortex functions are enabled. Log in here.

4. OpenAI API key: Ensure your key is not rate-limited before you continue. Access yours here.

STEP 1: Set Up the Airbyte Data Source

In Airbyte Cloud source connectors, select the Google Drive connector as your source, paste your folder URL (mandatory), and create a stream with the Document File Type Format (Experimental). Finally, test the source to ensure it's set up correctly.
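If you prefer to script the source instead of using the UI, here is a rough sketch using PyAirbyte. The config keys mirror the Google Drive connector form but are assumptions here; check them against the connector spec for your version.

import airbyte as ab

# Sketch only: config field names are assumptions based on the Google Drive
# connector form -- verify them against the connector spec before use.
source = ab.get_source(
    "source-google-drive",
    config={
        "folder_url": "https://drive.google.com/drive/folders/<your-folder-id>",
        "credentials": {
            "auth_type": "Service",
            "service_account_info": "<your service account JSON>",
        },
        "streams": [
            {
                "name": "bitcoin_doc",
                "globs": ["**/*.pdf"],
                "format": {"filetype": "unstructured"},
            }
        ],
    },
    install_if_missing=True,
)
source.check()  # equivalent to "test" in the UI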

STEP 2: Set Up the Snowflake Cortex Destination

To set up a Snowflake instance, you need to create the required entities (warehouse, database, schema, user, and role) in the Snowflake console, as explained in this documentation.

In short, run the following worksheet in the Snowflake console (make sure you run all statements):

-- set variables (these need to be uppercase)
set airbyte_role = 'AIRBYTE_ROLE';
set airbyte_username = 'AIRBYTE_USER';
set airbyte_warehouse = 'AIRBYTE_WAREHOUSE';
set airbyte_database = 'AIRBYTE';
set airbyte_schema = 'AIRBYTE_SCHEMA';

-- set user password
set airbyte_password = 'YOUR PASSWORD';

begin;

-- create Airbyte role
use role securityadmin;
create role if not exists identifier($airbyte_role);
grant role identifier($airbyte_role) to role SYSADMIN;

-- create Airbyte user
create user if not exists identifier($airbyte_username) password = $airbyte_password default_role = $airbyte_role default_warehouse = $airbyte_warehouse;
grant role identifier($airbyte_role) to user identifier($airbyte_username);

-- change role to sysadmin for warehouse / database steps
use role sysadmin;

-- create Airbyte warehouse
create warehouse if not exists identifier($airbyte_warehouse) warehouse_size = xsmall warehouse_type = standard auto_suspend = 60 auto_resume = true initially_suspended = true;

-- create Airbyte database
create database if not exists identifier($airbyte_database);

-- grant Airbyte warehouse access
grant USAGE on warehouse identifier($airbyte_warehouse) to role identifier($airbyte_role);

-- grant Airbyte database access
grant OWNERSHIP on database identifier($airbyte_database) to role identifier($airbyte_role);

commit;

begin;
USE DATABASE identifier($airbyte_database);

-- create schema for Airbyte data
CREATE SCHEMA IF NOT EXISTS identifier($airbyte_schema);

commit;

begin;

-- grant Airbyte schema access
grant OWNERSHIP on schema identifier($airbyte_schema) to role identifier($airbyte_role);

commit;
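
Before heading back to Airbyte, you can optionally sanity-check the new entities from Python. This is a minimal sketch using the snowflake-connector-python package; the account identifier and password are placeholders for your own values.

import snowflake.connector

# Connect as the user created by the worksheet above; the account
# identifier and password are placeholders.
conn = snowflake.connector.connect(
    account="<your_account_identifier>",
    user="AIRBYTE_USER",
    password="<your password>",
    role="AIRBYTE_ROLE",
    warehouse="AIRBYTE_WAREHOUSE",
    database="AIRBYTE",
    schema="AIRBYTE_SCHEMA",
)
cur = conn.cursor()
cur.execute("SELECT current_role(), current_warehouse(), current_database();")
print(cur.fetchone())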

This will spin up a database, ready to store your data. Now move back to the Airbyte Cloud destination connectors and set up Snowflake Cortex. Make sure to fill in the credentials using the values from the script above. Finally, test the destination to make sure it's working as expected.

  •  Chunk size - Different embedding models have different token limits. In this tutorial I used 1000 for the OpenAI embedding option. The best chunk size also depends on the data you are dealing with.
  •  Embedding Model - Paste your OpenAI API key and save.
  •  Indexing - Set up the indexes as shown below:

STEP 3: Move Data

Next, we create a connection and sync the data so we can access it in our Snowflake instance. Here is an example of successful connections after a sync:

STEP 4: Explore Data in Snowflake

At this point you should be able to see the data in Snowflake. The tables in your database will have the following columns:

  •  DOCUMENT_ID - unique, based on the primary key
  •  CHUNK_ID - randomly generated UUID
  •  DOCUMENT_CONTENT - text content from the source
  •  METADATA - metadata from the source
  •  EMBEDDING - vector representation of the document_content

Here is a snippet of how one of my results appears.
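If you want to inspect a few rows yourself, a query along these lines works, reusing the cursor from the connectivity check earlier. The table name is a placeholder: Airbyte names the table after your stream.

# "GOOGLE_DRIVE_STREAM" is a placeholder -- Airbyte names the table after
# the stream you configured in the source.
cur.execute("""
    SELECT document_id, chunk_id, document_content, metadata
    FROM AIRBYTE.AIRBYTE_SCHEMA.GOOGLE_DRIVE_STREAM
    LIMIT 5;
""")
for row in cur.fetchall():
    print(row)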

STEP 5: Building the RAG with Snowflake Cortex Functions

RAG relies heavily on semantic comparison, and measuring the similarity between vectors is its fundamental operation. This operation finds the top N vectors closest to a query vector, which is what powers semantic search. Vector search also lets developers improve the accuracy of their generative AI responses by supplying related documents to a large language model.

The key elements in the RAG process are:

  1. Generate embeddings from the query: converting the question into a vector array.

You can embed data using OpenAI, Cohere, an OpenAI-compatible endpoint, or Fake (all from the Airbyte Cloud UI). You then have to embed the question with the same model that was used for the data, since vectors produced by different models are not comparable.

If you used Fake to embed the data, you will need to replace the fake embeddings in Snowflake with Snowflake Cortex embeddings.

You can use the following functions to embed data instantly on Snowflake Cortex:
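For example, EMBED_TEXT_768 can re-embed the document content in place. This is a sketch assuming the e5-base-v2 model and the placeholder table name from earlier; note that the resulting 768-dimension vectors must match the dimension of your EMBEDDING column.

# Replace the fake embeddings with Snowflake Cortex embeddings in place.
# EMBED_TEXT_768 returns VECTOR(FLOAT, 768); the table name is a placeholder.
cur.execute("""
    UPDATE AIRBYTE.AIRBYTE_SCHEMA.GOOGLE_DRIVE_STREAM
    SET embedding = SNOWFLAKE.CORTEX.EMBED_TEXT_768('e5-base-v2', document_content);
""")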

If you used OpenAI as the data embedding model, generate the query embedding using OpenAI's embeddings API, for example as sketched below.
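Here is a minimal sketch using the openai Python package, assuming the default OpenAI embedding model (text-embedding-ada-002, 1536 dimensions) was used for the data:

from openai import OpenAI

client = OpenAI(api_key="<your OpenAI API key>")

def get_question_embedding(question: str) -> list[float]:
    # Must use the same model that embedded the documents, otherwise the
    # vectors are not comparable.
    response = client.embeddings.create(
        model="text-embedding-ada-002",
        input=question,
    )
    return response.data[0].embedding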

  2. Similarity search to find matching chunks

Snowflake Cortex provides three vector similarity functions:

  •  VECTOR_INNER_PRODUCT
  •  VECTOR_L2_DISTANCE
  •  VECTOR_COSINE_SIMILARITY - we will use this function in our demo.
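A retrieval query with VECTOR_COSINE_SIMILARITY can then look like the sketch below; the table name is a placeholder, and the question embedding comes from the embedding step above.

def get_similar_chunks(cur, question_embedding: list[float], top_n: int = 5) -> list[str]:
    # The Python list renders as an array literal, which Snowflake casts to
    # a VECTOR for comparison with the EMBEDDING column (1536 dims for OpenAI).
    cur.execute(f"""
        SELECT document_content,
               VECTOR_COSINE_SIMILARITY(embedding,
                   {question_embedding}::VECTOR(FLOAT, 1536)) AS score
        FROM AIRBYTE.AIRBYTE_SCHEMA.GOOGLE_DRIVE_STREAM
        ORDER BY score DESC
        LIMIT {top_n};
    """)
    return [row[0] for row in cur.fetchall()]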

  3. Leverage the built-in Snowflake Cortex completion: this generates the final answer from the matching chunks.

Learn how to manage privileges in Snowflake so you can use Cortex functions like COMPLETE here.
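For reference, enabling Cortex for the Airbyte role usually comes down to granting it the SNOWFLAKE.CORTEX_USER database role, for example:

# Run as a sufficiently privileged role (e.g. ACCOUNTADMIN).
cur.execute("GRANT DATABASE ROLE SNOWFLAKE.CORTEX_USER TO ROLE AIRBYTE_ROLE;")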

from typing import List

# Use Snowflake Cortex's built-in COMPLETE function to generate an answer
# from the matching chunks. Assumes get_db_connection() returns a configured
# snowflake.connector connection.
def get_completion_from_snowflake(question, document_chunks: List[str], model_name):
    prompt = ('You are a cryptocurrency investment advisor and specialize in bitcoin. '
              'Answer the question based on the context. Do not use any other information. '
              'Be concise. When returning a list of items, please enumerate description '
              'on separate lines')

    print(f"\nSending chunks to Snowflake (LLM: {model_name}) for completion...")
    conn = get_db_connection()
    cur = conn.cursor()

    # Join the retrieved chunks into a single context block.
    chunks = "\n\n".join(document_chunks)

    # $$ ... $$ delimits a Snowflake string literal, so the chunks and the
    # question can contain quotes without breaking the query.
    query = f"""
    SELECT snowflake.cortex.complete(
        '{model_name}',
        CONCAT(
            '{prompt}', 'Context: ',
            $$
            {chunks}

            {question} $$,
            'Answer: '
        )
    ) as response;"""
    cur.execute(query)
    result = cur.fetchall()
    cur.close()
    conn.close()
    return result[0][0].strip()

Finally, get the response.
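Putting the pieces together, a call could look like this; "mistral-large" is one example of a model supported by Cortex COMPLETE, and the helper functions are the sketches defined above.

question = "Is Bitcoin a good investment?"
top_chunks = get_similar_chunks(cur, get_question_embedding(question))
answer = get_completion_from_snowflake(question, top_chunks, "mistral-large")
print(answer)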

To gain a better understanding of the logic, visit the following resources: the Google Colab to use OpenAI embeddings, and the codelab to use a fake model.

Demo of our crypto advisor RAG

Conclusion

This tutorial provides a step-by-step guide to leveraging Airbyte data in your Snowflake infrastructure, using Snowflake Cortex and LLM functions to perform RAG operations. As we saw in the demo, measuring similarity between vectors is the fundamental operation in semantic comparison. By following the tutorial, you can easily turn valuable data into high-quality insights.

