Extract data from Postgres using PyAirbyte

Learn how to use PyAirbyte with Postgres as a cache. This recipe is designed to run in Google Colab: it installs packages on the system and requires sudo access.


PyAirbyte Custom Postgres Cache Demo

Below is a pre-release demo of PyAirbyte, showcasing how to use Postgres as a Cache.

This notebook is designed to run in Google Colab only. It installs packages on the system and requires sudo access. If you want to run it in a local Jupyter notebook, proceed with caution.

Install PyAirbyte

# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv

# Install PyAirbyte
%pip install --quiet airbyte
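
If you want to confirm the installation before continuing, you can print the installed package version (a quick sketch; the distribution name `airbyte` matches the `pip install` command above):

# Confirm the installed PyAirbyte version
from importlib.metadata import version
print(version("airbyte"))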

Install and setup Postgres (optional)

If you are not running this notebook on Google Colab, or you prefer to use an existing database, please skip the following setup and proceed to the next section.

# Install postgresql server
!sudo apt-get -y -qq update
!sudo apt-get -y -qq install postgresql
!sudo service postgresql start

# Set the password for the `postgres` user to `postgres`
!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"

# Create a database named `pyairbyte_demo` for the demo
!sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS pyairbyte_demo;'
!sudo -u postgres psql -U postgres -c 'CREATE DATABASE pyairbyte_demo;'
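
As an optional sanity check, you can confirm that the server is running and the database was created by listing the databases with the same `psql` access used above:

# List all databases and confirm `pyairbyte_demo` appears
!sudo -u postgres psql -U postgres -c '\l'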

Locating your Data Source

To see what data sources are available, you can check our docs or run the following:

# Import PyAirbyte
import airbyte as ab

# Show all available connectors
ab.get_available_connectors()
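
The call returns a plain list of connector names, so you can filter it with ordinary Python if you are looking for something specific (a small sketch, here searching for Postgres-related connectors):

# Filter the connector list for names mentioning "postgres"
postgres_connectors = [name for name in ab.get_available_connectors() if "postgres" in name]
print(postgres_connectors)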

Load the Source Data using PyAirbyte

Create and install a source connector:

import airbyte as ab

# Create and install the source:
source: ab.Source = ab.get_source("source-faker")

Installing 'source-faker' into virtual environment '/content/.venv-source-faker'.
Running 'pip install airbyte-source-faker'...

Connector 'source-faker' installed successfully!
For more information, see the source-faker documentation:
https://docs.airbyte.com/integrations/sources/faker#reference

# Configure the source
source.set_config(
    config={
        "count": 50_000,  # Adjust this to get a larger or smaller dataset
        "seed": 123,
    },
)
# Verify the config and creds by running `check`:
source.check()

Connection check succeeded for `source-faker`.
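
If you want to see which streams the connector exposes before reading, you can list them from the configured source (a quick sketch; `get_available_streams()` is assumed to be available in your installed PyAirbyte version):

# List the streams exposed by source-faker
print(source.get_available_streams())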

Define a Postgres Cache

Define a PyAirbyte cache that points at the existing PostgreSQL database.

from airbyte.caches import PostgresCacheConfig, PostgresCache

# Define a Postgres Cache and pass the necessary configuration
pg_cache = PostgresCache(
    PostgresCacheConfig(
      host="localhost",
      port=5432,
      username="postgres",
      password="postgres",
      database="pyairbyte_demo"
    )
)
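
If you are pointing at an existing database rather than the throwaway Colab instance, you might prefer to pull credentials from environment variables instead of hardcoding them (a sketch; the variable names below are illustrative, not a PyAirbyte convention):

import os

# Build the cache config from environment variables, falling back to the demo defaults
pg_cache = PostgresCache(
    PostgresCacheConfig(
      host=os.environ.get("PG_HOST", "localhost"),
      port=int(os.environ.get("PG_PORT", "5432")),
      username=os.environ.get("PG_USER", "postgres"),
      password=os.environ.get("PG_PASSWORD", "postgres"),
      database=os.environ.get("PG_DATABASE", "pyairbyte_demo")
    )
)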

Read Data into the PostgreSQL Cache

# Select all of the source's streams and read data into the previously defined Postgres cache:
source.select_all_streams()
read_result: ab.ReadResult = source.read(cache=pg_cache)

Read Progress

Started reading at 22:27:43.

Read 100,100 records over 59 seconds (1,696.6 records / second).

Wrote 100,100 records over 13 batches.

Finished reading at 22:28:42.

Started finalizing streams at 22:28:42.

Finalized 13 batches over 8 seconds.

Completed 3 out of 3 streams:

  • users
  • products
  • purchases

Completed writing at 22:28:51. Total time elapsed: 1min 8s

Completed `source-faker` read operation at 22:28:51.
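
To spot-check what was loaded before moving to SQL, you can pull a stream from the cache into a pandas DataFrame (a small sketch; `to_pandas()` on the cached dataset is assumed to be available in your PyAirbyte version):

# Peek at the first few rows of the `users` stream from the cache
users_df = read_result.cache["users"].to_pandas()
users_df.head()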

Working in SQL

Since data is cached in the Postgres database, we can query the data with SQL.

We can do this in multiple ways. One way is to use the JupySQL extension, which we'll use below; an alternative that queries the cache directly with pandas is sketched after the query results.

# Install JupySQL to enable SQL cell magics
%pip install --quiet jupysql
# Load JupySQL extension
%load_ext sql
# Configure max row limit (optional)
%config SqlMagic.displaylimit = 200
# Get the SQLAlchemy 'engine' object for the cache
engine = read_result.cache.get_sql_engine()
# Pass the engine to JupySQL
%sql engine
# Get table objects for the 'users' and 'purchases' streams
users_table = read_result.cache["users"].to_sql_table()
purchases_table = read_result.cache["purchases"].to_sql_table()
display([users_table.fullname, purchases_table.fullname])

['airbyte_raw.users', 'airbyte_raw.purchases']

%%sql
-- Show most recent purchases by purchase date:
SELECT users.id, users.name, purchases.product_id, purchases.purchased_at
FROM {{ users_table.fullname }} AS users
JOIN {{ purchases_table.fullname }} AS purchases
ON users.id = purchases.user_id
ORDER BY purchases.purchased_at DESC
LIMIT 10

Running query in 'postgresql+psycopg2://postgres:***@localhost:5432/pyairbyte_demo'

10 rows affected.

id	name	product_id	purchased_at
40	Kelvin	80	None
44	Jen	1	None
24	Nestor	20	None
19	Marquitta	93	None
43	Tari	26	None
28	Porter	10	None
1	Murray	41	None
9	Travis	70	None
5	Osvaldo	89	None
46	Rodger	35	None
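
As an alternative to cell magics, you could run the same kind of query directly with pandas and the SQLAlchemy engine obtained from the cache above (a minimal sketch reusing `engine`, `users_table`, and `purchases_table`):

import pandas as pd

# Run the join through pandas using the cache's SQLAlchemy engine
query = f"""
SELECT users.id, users.name, purchases.product_id, purchases.purchased_at
FROM {users_table.fullname} AS users
JOIN {purchases_table.fullname} AS purchases
  ON users.id = purchases.user_id
ORDER BY purchases.purchased_at DESC
LIMIT 10
"""
pd.read_sql(query, engine)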

