PyAirbyte Custom Postgres Cache Demo
Below is a pre-release demo of PyAirbyte, showcasing how to use Postgres as a Cache.
This notebook is designed to be run in Google Colab only. It installs packages at the system level and requires sudo access. If you want to run it in a local Jupyter notebook instead, proceed with caution.
Install PyAirbyte
# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv
# Install PyAirbyte
%pip install --quiet airbyte
Install and set up Postgres (optional)
If you are not running this notebook on Google Colab, or you prefer to use an existing database, please skip the following setup and proceed to the next section.
# Install postgresql server
!sudo apt-get -y -qq update
!sudo apt-get -y -qq install postgresql
!sudo service postgresql start
# Set up a password `postgres` for username `postgres`
!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"
# Create a database named `pyairbyte_demo` for the demo
!sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS pyairbyte_demo;'
!sudo -u postgres psql -U postgres -c 'CREATE DATABASE pyairbyte_demo;'
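If you want to confirm the setup before moving on, listing the server's databases should show `pyairbyte_demo` (an optional check, assuming the same Colab environment as above):
# (Optional) Confirm the demo database exists
!sudo -u postgres psql -U postgres -c '\l' | grep pyairbyte_demo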
Locating your Data Source
To see what data sources are available, you can check our docs or run the following:
# Import PyAirbyte
import airbyte as ab
# Show all available connectors
ab.get_available_connectors()
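If you already know which connector you need, a quick membership check on that same list confirms it is available before you install it; a minimal sketch reusing the call above:
# Confirm the connector we plan to use is in the registry
assert "source-faker" in ab.get_available_connectors()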
Load the Source Data using PyAirbyte
Create and install a source connector:
import airbyte as ab
# Create and install the source:
source: ab.Source = ab.get_source("source-faker")
Installing 'source-faker' into virtual environment '/content/.venv-source-faker'.
Running 'pip install airbyte-source-faker'...
Connector 'source-faker' installed successfully!
For more information, see the source-faker documentation:
https://docs.airbyte.com/integrations/sources/faker#reference
# Configure the source
source.set_config(
    config={
        "count": 50_000,  # Adjust this to get a larger or smaller dataset
        "seed": 123,
    },
)
# Verify the config and creds by running `check`:
source.check()
Connection check succeeded for `source-faker`.
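If you only need part of the dataset, you can also select individual streams rather than all of them. A minimal sketch, assuming `select_streams` is available in your PyAirbyte version (the next sections select all streams instead):
# Sketch: select only specific streams before reading
# (assumes `select_streams` exists in your PyAirbyte version)
source.select_streams(["users", "purchases"])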
Define a Postgres Cache
Define a PyAirbyte Cache backed by the existing PostgreSQL database.
from airbyte.caches import PostgresCacheConfig, PostgresCache
# Define a Postgres Cache and pass the necessary configuration
pg_cache = PostgresCache(
    PostgresCacheConfig(
        host="localhost",
        port=5432,
        username="postgres",
        password="postgres",
        database="pyairbyte_demo",
    )
)
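Hardcoded credentials are fine for this throwaway Colab database, but against a real database you would more likely read them from the environment. A minimal sketch, assuming the standard `PG*` environment variables are set:
import os
from airbyte.caches import PostgresCache, PostgresCacheConfig

# Sketch: the same cache, configured from environment variables
# (assumes PGHOST, PGPORT, PGUSER, PGPASSWORD, and PGDATABASE are set)
pg_cache = PostgresCache(
    PostgresCacheConfig(
        host=os.environ["PGHOST"],
        port=int(os.environ.get("PGPORT", "5432")),
        username=os.environ["PGUSER"],
        password=os.environ["PGPASSWORD"],
        database=os.environ["PGDATABASE"],
    )
)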
Read Data into the PostgreSQL Cache
# Select all of the source's streams and read data into the previously defined Postgres cache:
source.select_all_streams()
read_result: ab.ReadResult = source.read(cache=pg_cache)
Read Progress
Started reading at 22:27:43.
Read 100,100 records over 59 seconds (1,696.6 records / second).
Wrote 100,100 records over 13 batches.
Finished reading at 22:28:42.
Started finalizing streams at 22:28:42.
Finalized 13 batches over 8 seconds.
Completed 3 out of 3 streams:
  - users
  - products
  - purchases
Completed writing at 22:28:51. Total time elapsed: 1min 8s
Completed `source-faker` read operation at 22:28:51.
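The cached streams are also reachable straight from the `ReadResult`, without writing any SQL; for example, converting a stream to a pandas DataFrame (a sketch, assuming `to_pandas()` is available on cached datasets in your PyAirbyte version):
# Sketch: load a cached stream into pandas
# (assumes `to_pandas()` on cached datasets)
users_df = read_result["users"].to_pandas()
display(users_df.head())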
Working in SQL
Since data is cached in the Postgres database, we can query the data with SQL.
We can do this in multiple ways. One way is to use the JupySQL extension, which we'll use below.
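Another of those ways, shown here for comparison, is to query the cache's SQLAlchemy engine directly, e.g. with pandas (a minimal sketch; the `airbyte_raw.users` table name matches the cache output shown further down):
import pandas as pd

# Sketch: query the Postgres cache through its SQLAlchemy engine
engine = read_result.cache.get_sql_engine()
row_counts = pd.read_sql_query("SELECT COUNT(*) AS n FROM airbyte_raw.users", engine)
display(row_counts)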
# Install JupySQL to enable SQL cell magics
%pip install --quiet jupysql
# Load JupySQL extension
%load_ext sql
# Configure max row limit (optional)
%config SqlMagic.displaylimit = 200
# Get the SQLAlchemy 'engine' object for the cache
engine = read_result.cache.get_sql_engine()
# Pass the engine to JupySQL
%sql engine
# Get table objects for the 'users' and 'purchases' streams
users_table = read_result.cache["users"].to_sql_table()
purchases_table = read_result.cache["purchases"].to_sql_table()
display([users_table.fullname, purchases_table.fullname])
['airbyte_raw.users', 'airbyte_raw.purchases']
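These are regular SQLAlchemy `Table` objects, so their metadata can be inspected before querying, for example listing the column names:
# Inspect the columns of the cached 'users' table
display([column.name for column in users_table.columns])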
%%sql
-- Show most recent purchases by purchase date
SELECT users.id, users.name, purchases.product_id, purchases.purchased_at
FROM {{ users_table.fullname }} AS users
JOIN {{ purchases_table.fullname }} AS purchases
ON users.id = purchases.user_id
ORDER BY purchases.purchased_at DESC
LIMIT 10
Running query in 'postgresql+psycopg2://postgres:***@localhost:5432/pyairbyte_demo'
10 rows affected.
id  name       product_id  purchased_at
40  Kelvin     80          None
44  Jen        1           None
24  Nestor     20          None
19  Marquitta  93          None
43  Tari       26          None
28  Porter     10          None
1   Murray     41          None
9   Travis     70          None
5   Osvaldo    89          None
46  Rodger     35          None
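As a final sanity check, the cached row counts should add up to the 100,100 records reported in the read progress above. A sketch using the same engine (source-faker's three streams are users, products, and purchases):
from sqlalchemy import text

# Sketch: confirm cached row counts match the read summary above
with engine.connect() as conn:
    for stream in ("users", "products", "purchases"):
        count = conn.execute(text(f"SELECT COUNT(*) FROM airbyte_raw.{stream}")).scalar()
        print(f"{stream}: {count:,} records")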