Extract crypto data to Snowflake using PyAirbyte

Learn how to use PyAirbyte to extract cryptocurrency data from CoinAPI.io and load it into Snowflake, followed by a series of transformations and analyses that derive meaningful insights from the data.

Should you build or buy your data pipelines?

Download our free guide and discover the best approach for your needs, whether it's building your ELT solution in-house or opting for Airbyte Open Source or Airbyte Cloud.

Download now

PyAirbyte Custom Snowflake Cache Demo

In this demo, we use PyAirbyte to ingest cryptocurrency data from CoinAPI.io into Snowflake.

Prerequisites

  • A CoinAPI.io API key.
  • A Snowflake account with a database configured to work with PyAirbyte. You can find the specific configuration details in our documentation.

Install PyAirbyte

# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv

# Install PyAirbyte
%pip install --quiet airbyte

Define a Snowflake Cache

Define a PyAirbyte Cache for Snowflake.

from airbyte.caches import SnowflakeCache
from google.colab import userdata

# Define a Snowflake Cache and pass the necessary configuration
sf_cache = SnowflakeCache(
    account=userdata.get("SNOWFLAKE_ACCOUNT"),
    username=userdata.get("SNOWFLAKE_USERNAME"),
    password=userdata.get("SNOWFLAKE_PASSWORD"),
    warehouse="AIRBYTE_DEVELOP_WAREHOUSE",
    database="AIRBYTE_DEVELOP",
    role="AIRBYTE_DEVELOPER",
    schema_name="PYAIRBYTE_DEMO",
)

Load the Source Data using PyAirbyte

In this section, we establish a connection to CoinAPI.io to access cryptocurrency data via PyAirbyte. The connector is configured with necessary parameters like the API key, environment setting, symbol ID for the specific cryptocurrency index (in this case, COINBASE_SPOT_INDEX_USD), and the data period we are interested in. Check the docs for more details.

We select all available streams for the source, which you can consult using the get_available_streams() method, or the docs. Then, we proceed to read from the source into Snowflake.

import airbyte as ab

# Configure and read from the source
read_result = ab.get_source(
    "source-coin-api",
    config={
        "api_key": userdata.get("API_KEY"),
        "environment": "production",
        "symbol_id": "COINBASE_SPOT_INDEX_USD",
        "period": "1DAY",
        "start_date": "2023-01-01T00:00:00",
    },
    streams=["ohlcv_historical_data", "trades_historical_data", "quotes_historical_data"],
).read(cache=sf_cache)

Read data from Snowflake

Read from the already-written Snowflake table into a pandas DataFrame. Once the data is in the cache, you can read it without re-configuring or re-creating the source object.

# Read from the cache into a pandas DataFrame:
ohlcv_df = read_result["ohlcv_historical_data"].to_pandas()
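Before transforming, a quick sanity check on the frame is often useful. The sketch below uses a tiny hypothetical frame standing in for the cached OHLCV stream; as with many API sources, numeric values typically arrive as strings, which is why the next section converts them.

```python
import pandas as pd

# Hypothetical rows standing in for the cached ohlcv_historical_data stream
ohlcv_df = pd.DataFrame({
    "time_period_start": ["2023-01-01T00:00:00.0000000Z", "2023-01-02T00:00:00.0000000Z"],
    "price_open": ["16541.2", "16625.1"],
    "price_close": ["16625.1", "16688.4"],
})

# Shape and dtypes tell you what cleanup is needed
print(ohlcv_df.shape)   # (rows, columns)
print(ohlcv_df.dtypes)  # object dtype here means string values that need conversion
```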

Run data transformations

  • Convert time_period_start to datetime for easy handling of dates.
  • Convert numeric columns to numeric types for calculations.
  • Calculate daily_movement to analyze daily price changes in the market.

import pandas as pd

# Convert 'time_period_start' to datetime format and necessary columns to numeric
ohlcv_df['time_period_start'] = pd.to_datetime(ohlcv_df['time_period_start'])
numeric_columns = ['price_open', 'price_high', 'price_low', 'price_close', 'volume_traded', 'trades_count']
ohlcv_df[numeric_columns] = ohlcv_df[numeric_columns].apply(pd.to_numeric, errors='coerce')

# Calculate daily price movement
ohlcv_df['daily_movement'] = ohlcv_df['price_close'] - ohlcv_df['price_open']
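With daily_movement in place, a couple of lightweight analyses follow naturally. This is a sketch on a small hypothetical frame with the same columns as above; the rolling window of 2 is only for illustration (7 is a common choice for daily data).

```python
import pandas as pd

# Hypothetical sample rows with the columns used above
ohlcv_df = pd.DataFrame({
    "time_period_start": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
    "price_open": [100.0, 104.0, 101.0],
    "price_close": [104.0, 101.0, 107.0],
})
ohlcv_df["daily_movement"] = ohlcv_df["price_close"] - ohlcv_df["price_open"]

# A rolling mean of the close smooths out short-term noise
ohlcv_df["close_rolling"] = ohlcv_df["price_close"].rolling(window=2).mean()

# Day with the largest absolute price move
biggest = ohlcv_df.loc[ohlcv_df["daily_movement"].abs().idxmax()]
print(biggest["daily_movement"])  # 6.0
```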

Write Dataframe to Snowflake

Get a SQL engine from the Snowflake cache

engine = sf_cache.get_sql_engine()

Now we can write our transformed DataFrame back to Snowflake into a new table called daily_movement.

from snowflake.connector.pandas_tools import pd_writer

ohlcv_df.to_sql('daily_movement', engine, index=False, method=pd_writer, if_exists='replace')
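To confirm the write, you can read the table back through the same engine. The sketch below substitutes an in-memory SQLite engine (and omits the Snowflake-specific pd_writer) purely so it runs anywhere; in the notebook you would reuse the engine from sf_cache.get_sql_engine().

```python
import pandas as pd
from sqlalchemy import create_engine

# Stand-in engine for illustration; in the notebook, reuse engine = sf_cache.get_sql_engine()
engine = create_engine("sqlite:///:memory:")

# Write a tiny frame, then read it back to confirm the round trip
df = pd.DataFrame({"daily_movement": [4.0, -3.0, 6.0]})
df.to_sql("daily_movement", engine, index=False, if_exists="replace")

count = pd.read_sql("SELECT COUNT(*) AS n FROM daily_movement", engine)["n"].iloc[0]
print(count)  # 3
```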
