In this demo, we use PyAirbyte to ingest cryptocurrency data from CoinAPI.io into Snowflake.
Prerequisites
- CoinAPI API key.
- A Snowflake account with a warehouse, database, role, and schema that PyAirbyte can write to. Find specific configuration details in our documentation.
Install PyAirbyte
# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv
# Install PyAirbyte
%pip install --quiet airbyte
Define a Snowflake Cache
Define a PyAirbyte Cache for Snowflake.
from airbyte.caches import SnowflakeCache
from google.colab import userdata
# Define a Snowflake Cache and pass the necessary configuration
sf_cache = SnowflakeCache(
    account=userdata.get("SNOWFLAKE_ACCOUNT"),
    username=userdata.get("SNOWFLAKE_USERNAME"),
    password=userdata.get("SNOWFLAKE_PASSWORD"),
    warehouse="AIRBYTE_DEVELOP_WAREHOUSE",
    database="AIRBYTE_DEVELOP",
    role="AIRBYTE_DEVELOPER",
    schema_name="PYAIRBYTE_DEMO",
)
Load the Source Data using PyAirbyte
In this section, we establish a connection to CoinAPI.io to access cryptocurrency data via PyAirbyte. The connector is configured with necessary parameters like the API key, environment setting, symbol ID for the specific cryptocurrency index (in this case, COINBASE_SPOT_INDEX_USD), and the data period we are interested in. Check the docs for more details.
We select all available streams for the source; you can list them with the get_available_streams() method or consult the docs. Then we read from the source into Snowflake.
import airbyte as ab
# Configure and read from the source
read_result = ab.get_source(
    "source-coin-api",
    config={
        "api_key": userdata.get("API_KEY"),
        "environment": "production",
        "symbol_id": "COINBASE_SPOT_INDEX_USD",
        "period": "1DAY",
        "start_date": "2023-01-01T00:00:00",
    },
    streams=["ohlcv_historical_data", "trades_historical_data", "quotes_historical_data"],
).read(cache=sf_cache)
Read data from Snowflake
Read from the already-written Snowflake table into a pandas DataFrame. After the data is in the cache, you can read it without re-configuring or re-creating the source object.
# Read from the cache into a pandas DataFrame:
ohlcv_df = read_result["ohlcv_historical_data"].to_pandas()
Run data transformations
- Convert time_period_start to datetime for easy handling of dates.
- Convert numeric columns to numeric types for calculations.
- Calculate daily_movement to analyze daily price changes in the market.
import pandas as pd
# Convert 'time_period_start' to datetime format and necessary columns to numeric
ohlcv_df['time_period_start'] = pd.to_datetime(ohlcv_df['time_period_start'])
numeric_columns = ['price_open', 'price_high', 'price_low', 'price_close', 'volume_traded', 'trades_count']
ohlcv_df[numeric_columns] = ohlcv_df[numeric_columns].apply(pd.to_numeric, errors='coerce')
# Calculate daily price movement
ohlcv_df['daily_movement'] = ohlcv_df['price_close'] - ohlcv_df['price_open']
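To sanity-check the transformation logic without a CoinAPI key or a Snowflake connection, you can run the same steps on a tiny synthetic OHLCV frame (the column names mirror the real stream; the prices here are made up for illustration):

```python
import pandas as pd

# Tiny synthetic stand-in for the real ohlcv_historical_data stream
df = pd.DataFrame({
    "time_period_start": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
    "price_open": [100.0, 102.0, 101.0],
    "price_close": [102.0, 101.0, 105.0],
})

# Same calculation as above, plus a percent-change view of the daily move
df["daily_movement"] = df["price_close"] - df["price_open"]
df["daily_return_pct"] = df["daily_movement"] / df["price_open"] * 100

print(df[["time_period_start", "daily_movement", "daily_return_pct"]])
```

The percent-change column is optional, but it makes days comparable when the price level drifts over a long start_date range.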
Write DataFrame to Snowflake
Get a SQL engine from the Snowflake cache
engine = sf_cache.get_sql_engine()
Now, we can write our transformed DataFrame back to Snowflake into a new table called daily_movement.
from snowflake.connector.pandas_tools import pd_writer
ohlcv_df.to_sql('daily_movement', engine, index=False, method=pd_writer, if_exists='replace')
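If you want to verify the to_sql round trip locally before pointing it at a live warehouse, the same pattern works against an in-memory SQLite engine. This is a local sketch only: it drops the Snowflake-specific pd_writer method and uses pandas' default insert path, since pd_writer only works with a Snowflake connection.

```python
import pandas as pd
from sqlalchemy import create_engine

# Local stand-in engine; the real pipeline uses sf_cache.get_sql_engine()
engine = create_engine("sqlite:///:memory:")

# Minimal frame with the same shape of columns we computed above
out_df = pd.DataFrame({"price_open": [100.0], "price_close": [102.0], "daily_movement": [2.0]})
out_df.to_sql("daily_movement", engine, index=False, if_exists="replace")

# Read the table back to confirm the write landed
round_trip = pd.read_sql("SELECT * FROM daily_movement", engine)
print(round_trip)
```

Using if_exists='replace' in both cases keeps the demo idempotent: rerunning the notebook overwrites the table rather than appending duplicate rows.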