In this notebook, we'll illustrate how to load data from an Airbyte source into Snowflake using PyAirbyte, and then convert the stream data into vector embeddings. We use source-github and its issues stream here, but you can replace the source according to your requirements.
Prerequisites
- GitHub Access Token: a personal access token with read access to the repositories you want to sync.
- Snowflake: an account with a warehouse, database, schema, and role you can write to, plus the credentials to connect (see the sketch after this list).
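PyAirbyte's ab.get_secret() can resolve these values from environment variables as well as from Colab's Secrets panel. Below is a minimal sketch of supplying them as environment variables when running outside Colab; the secret names match the ones referenced later in this notebook, and the placeholder values are yours to fill in.
import os

# Hedged sketch: set the secrets this notebook reads via ab.get_secret().
# Replace the placeholders with your own credentials; skip this cell if you
# are using Colab's Secrets panel instead.
os.environ["GITHUB_API_KEY"] = "<your-github-personal-access-token>"
os.environ["SNOWFLAKE_HOST"] = "<your-account-identifier>"
os.environ["SNOWFLAKE_ROLE"] = "<role>"
os.environ["SNOWFLAKE_WAREHOUSE"] = "<warehouse>"
os.environ["SNOWFLAKE_DATABASE"] = "<database>"
os.environ["SNOWFLAKE_SCHEMA"] = "<schema>"
os.environ["SNOWFLAKE_USERNAME"] = "<username>"
os.environ["SNOWFLAKE_PASSWORD"] = "<password>"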
Install PyAirbyte and other dependencies
# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv
# First, we need to install the necessary libraries.
!pip3 install airbyte snowflake-connector-python
Set up the GitHub source
import airbyte as ab
source = ab.get_source(
    "source-github",
    config={
        "repositories": ["airbytehq/quickstarts"],
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_API_KEY"),
        },
    },
)
source.check()
Read the data from the selected issues stream, extracting the GitHub issues for further processing.
source.get_available_streams()
source.select_streams(["issues"]) # we are only interested in issues stream
read_result = source.read()
issues = [doc for doc in read_result["issues"].to_documents()]  # used later for vector embedding
issue_df = read_result["issues"].to_pandas()  # convert the stream records to a pandas DataFrame
print(str(issues[5]))
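As a quick sanity check, you can peek at a few rows of the DataFrame. The column selection below is illustrative and assumes the issues stream exposes fields such as number, title, and state.
# Illustrative peek at the extracted issues; adjust the column list to your stream's schema.
print(issue_df.shape)
print(issue_df[["number", "title", "state"]].head())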
Loading data into Snowflake
The code below uses the snowflake.connector module to connect to Snowflake with credentials fetched from secrets. Make sure to add your keys to the Secrets section on the left.
from snowflake import connector
conn = connector.connect(
    account=ab.get_secret("SNOWFLAKE_HOST"),
    role=ab.get_secret("SNOWFLAKE_ROLE"),
    warehouse=ab.get_secret("SNOWFLAKE_WAREHOUSE"),
    database=ab.get_secret("SNOWFLAKE_DATABASE"),
    schema=ab.get_secret("SNOWFLAKE_SCHEMA"),
    user=ab.get_secret("SNOWFLAKE_USERNAME"),
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
)
cur = conn.cursor()
print(ab.get_secret("SNOWFLAKE_SCHEMA"))
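Optionally, you can confirm that the session is pointed at the expected context using Snowflake's standard context functions before creating any tables.
# Optional: confirm role, warehouse, database, and schema for this session.
cur.execute("SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA()")
print(cur.fetchone())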
Define a function that creates a Snowflake table based on the schema of a pandas DataFrame, then use it to create a github_issue table in Snowflake from the issue_df DataFrame.
import pandas as pd

def create_table_from_dataframe(conn, df, table_name):
    cursor = conn.cursor()

    database = ab.get_secret('SNOWFLAKE_DATABASE')
    print(database)
    cursor.execute(f'USE DATABASE {database}')

    schema_name = ab.get_secret('SNOWFLAKE_SCHEMA')
    cursor.execute(f'USE SCHEMA {schema_name}')

    # Map each pandas dtype to a Snowflake column type
    columns = []
    for column, dtype in zip(df.columns, df.dtypes):
        if pd.api.types.is_integer_dtype(dtype):
            snowflake_type = 'INTEGER'
        elif pd.api.types.is_float_dtype(dtype):
            snowflake_type = 'FLOAT'
        elif pd.api.types.is_bool_dtype(dtype):
            snowflake_type = 'BOOLEAN'
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            snowflake_type = 'TIMESTAMP'
        else:
            snowflake_type = 'STRING'
        columns.append(f'"{column}" {snowflake_type}')

    create_table_sql = f'CREATE TABLE {table_name} ({", ".join(columns)});'
    cursor.execute(create_table_sql)

# Example usage:
create_table_from_dataframe(conn, issue_df, 'github_issue')  # Keep the table name according to your requirements
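If you want to verify the generated schema, a quick optional check with DESCRIBE TABLE shows the column names and the Snowflake types the function mapped them to.
# Optional: inspect the columns created for the new table.
cur.execute("DESCRIBE TABLE github_issue")
for row in cur.fetchall():
    print(row[0], row[1])  # column name, declared type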
Define upload_dataframe_to_snowflake, which uses Snowflake's pandas integration (write_pandas) to upload a pandas DataFrame (issue_df) into a Snowflake table (GITHUB_ISSUE).
from snowflake.connector.pandas_tools import write_pandas
def upload_dataframe_to_snowflake(conn, df, table_name):
    success, nchunks, nrows, _ = write_pandas(conn, df, table_name)
    if success:
        print(f"Successfully inserted {nrows} rows into {table_name}.")
    else:
        print("Failed to insert data.")

upload_dataframe_to_snowflake(conn, issue_df, 'GITHUB_ISSUE')  # Remember to use the table name in uppercase
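A minimal follow-up check, assuming the upload succeeded, is to compare the table's row count with the DataFrame you uploaded.
# Optional: verify the row count matches the uploaded DataFrame.
cur.execute("SELECT COUNT(*) FROM GITHUB_ISSUE")
print(cur.fetchone()[0], "rows in GITHUB_ISSUE vs", len(issue_df), "rows in issue_df")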
Vector Embedding the Data
Now we utilize the RecursiveCharacterTextSplitter from langchain.text_splitter to segment documents (issues) into smaller chunks based on specified parameters (chunk_size and chunk_overlap).
Then we organize the chunked documents into a Pandas DataFrame (df) with columns for page content (PAGE_CONTENT), metadata (META), and type (TYPE), ensuring all data is represented as strings for consistency and analysis.
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
chunked_docs = splitter.split_documents(issues)
print(f"Created {len(chunked_docs)} document chunks.")
for doc in chunked_docs:
    for md in doc.metadata:
        doc.metadata[md] = str(doc.metadata[md])

df = pd.DataFrame(chunked_docs, columns=['PAGE_CONTENT', 'META', 'TYPE'])  # use uppercase column names so they match Snowflake's unquoted identifiers later
# Convert all columns to string
df = df.astype(str)
print(df.head(3))
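As a quick sanity check on the splitter settings (chunk_size counts characters by default), you can look at the distribution of chunk lengths; most chunks should sit at or below 512 characters.
# Optional: distribution of chunk lengths produced by the splitter.
print(df["PAGE_CONTENT"].str.len().describe())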
Now we establish a new table for storing the vector-embedded data. First, we create a table and load the chunked data, and then we embed it.
create_table_from_dataframe(conn, df, 'vector_github_issues')
upload_dataframe_to_snowflake(conn, df, 'VECTOR_GITHUB_ISSUES') #use uppercase
Now, using Snowflake Cortex, we will turn the page content column into embeddings and store them in an embedding column. Different embedding models are available in the Snowflake Cortex documentation.
cur = conn.cursor()
# Step 1: Add the new column to store the embeddings
# We are using vector dimension 1024
alter_table_query = """
ALTER TABLE VECTOR_GITHUB_ISSUES
ADD COLUMN embedding VECTOR(FLOAT, 1024);
"""
cur.execute(alter_table_query)
# Step 2: Update the new column with embeddings from Cortex
# Note: Using a subquery to avoid issues with updating the same table in place
update_query = """
UPDATE VECTOR_GITHUB_ISSUES
SET embedding = (
SELECT SNOWFLAKE.CORTEX.EMBED_TEXT_1024('nv-embed-qa-4', page_content)
);
"""
cur.execute(update_query)
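With the embeddings in place, here is a hedged sketch of how you might query them: embed a free-text query with the same Cortex model and rank the stored chunks by VECTOR_COSINE_SIMILARITY. The query text is just an illustrative placeholder.
# Hypothetical usage: semantic search over the embedded issue chunks.
query_text = "authentication error while syncing"  # placeholder query

search_sql = """
SELECT page_content,
       VECTOR_COSINE_SIMILARITY(
           embedding,
           SNOWFLAKE.CORTEX.EMBED_TEXT_1024('nv-embed-qa-4', %s)
       ) AS score
FROM VECTOR_GITHUB_ISSUES
ORDER BY score DESC
LIMIT 5;
"""
cur.execute(search_sql, (query_text,))
for page_content, score in cur.fetchall():
    print(f"{score:.3f}  {page_content[:100]}")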
This approach demonstrates how to seamlessly integrate data retrieval from an Airbyte source, such as GitHub issues, and efficiently store it in Snowflake for further analysis. By using PyAirbyte for data extraction and Snowflake for data warehousing and vector search, you get a simple end-to-end pipeline from raw GitHub issues to queryable embeddings.