Leverage PyAirbyte with this demo

This demo shows how you can use PyAirbyte to load data from a source, read it back from the PyAirbyte cache, monitor read progress, create graphs, and more.


PyAirbyte Demo

Below is a pre-release demo of PyAirbyte.

Install PyAirbyte

# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv

# Install PyAirbyte
%pip install --quiet airbyte

Locating your Data Source

To see what data sources are available, you can check our docs or run the following:

# Import PyAirbyte
import airbyte as ab

# Show all available connectors
ab.get_available_connectors()
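The result is a plain list of connector names, so you can filter it with ordinary Python. For example, a small sketch (not part of the original demo) that lists sources only:

# Filter for source connectors, which follow the "source-" naming convention
source_names = [name for name in ab.get_available_connectors() if name.startswith("source-")]
print(f"{len(source_names)} source connectors available")
print(source_names[:5])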

Load the Source Data using PyAirbyte

Create and install a source connector:

import airbyte as ab

# Create and install the source:
source: ab.Source = ab.get_source("source-faker")
Installing 'source-faker' into virtual environment '/content/.venv-source-faker'.
Running 'pip install airbyte-source-faker'...
Connector 'source-faker' installed successfully!
For more information, see the source-faker documentation:
https://docs.airbyte.com/integrations/sources/faker#reference
# Configure the source
source.set_config(
    config={
        "count": 50_000,  # Adjust this to get a larger or smaller dataset
        "seed": 123,
    },
)
# Verify the config and creds by running `check`:
source.check()
Connection check succeeded for `source-faker`.
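If you only need a subset of the data, you can also select individual streams before reading instead of selecting everything. A small sketch, assuming PyAirbyte's get_available_streams() and select_streams() helpers (the stream names come from source-faker's output later in this demo):

# Inspect the streams this source offers
print(source.get_available_streams())

# Select specific streams instead of all of them
source.select_streams(["products", "users"])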

Read Data from the PyAirbyte Cache

Once the data has been read, we can do anything we want with the resulting streams. This includes to_pandas(), which returns a Pandas dataframe, and to_sql_table(), which returns a SQLAlchemy Table object that we can use to run SQL queries.

# Select all of the source's streams and read data into the internal cache:
source.select_all_streams()
read_result: ab.ReadResult = source.read()
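By default, read() writes to PyAirbyte's default local DuckDB cache. If you want to keep a project's data separate, you can pass an explicit cache instead. A hedged sketch, assuming the new_local_cache() helper (the cache name here is just an example):

# Optional alternative: read into an explicitly named local DuckDB cache
cache = ab.new_local_cache("faker_demo")  # hypothetical cache name
read_result = source.read(cache=cache)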

Read Progress

Started reading at 20:10:41.

Read 100,100 records over 1min 17s (1,300.0 records / second).

Wrote 100,100 records over 11 batches.

Finished reading at 20:11:59.

Started finalizing streams at 20:11:59.

Finalized 11 batches over 1 seconds.

Completed 3 out of 3 streams:

  • users
  • purchases
  • products

Completed writing at 20:12:01. Total time elapsed: 1min 19s

Completed `source-faker` read operation at 20:12:01.

# Display or transform the loaded data
products_df = read_result["products"].to_pandas()
display(products_df)
     id   make           model             year  price    created_at           updated_at
0    1    Mazda          MX-5              2008  2869.0   2022-02-01 17:02:19  2024-02-12 20:10:42
1    2    Mercedes-Benz  C-Class           2009  42397.0  2021-01-25 14:31:33  2024-02-12 20:10:42
2    3    Honda          Accord Crosstour  2011  63293.0  2021-02-11 05:36:03  2024-02-12 20:10:42
3    4    GMC            Jimmy             1998  34079.0  2022-01-24 03:00:03  2024-02-12 20:10:42
4    5    Infiniti       FX                2004  17036.0  2021-10-02 03:55:44  2024-02-12 20:10:42
..   ...  ...            ...               ...   ...      ...                  ...
95   96   BMW            330               2006  14494.0  2021-09-17 20:52:48  2024-02-12 20:10:42
96   97   Audi           R8                2008  17642.0  2021-09-21 11:56:24  2024-02-12 20:10:42
97   98   Cadillac       CTS-V             2007  19914.0  2021-09-02 15:38:46  2024-02-12 20:10:42
98   99   GMC            1500 Club Coupe   1997  82288.0  2021-04-20 18:58:15  2024-02-12 20:10:42
99   100  Buick          Somerset          1986  64148.0  2021-06-10 19:07:38  2024-02-12 20:10:42

100 rows × 7 columns
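From here it's ordinary Pandas. As a small illustrative sketch using the columns shown above, you could compute the average product price by model year:

# Average product price by model year, most recent years first
avg_price_by_year = products_df.groupby("year")["price"].mean().sort_index(ascending=False)
print(avg_price_by_year.head())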

Creating graphs

PyAirbyte integrates with Pandas, which in turn integrates with matplotlib and many other popular visualization libraries, so we can quickly create graphs from the loaded data.

%pip install matplotlib

import matplotlib.pyplot as plt

users_df = read_result["users"].to_pandas()

plt.hist(users_df["age"], bins=10, edgecolor="black")
plt.title("Histogram of Ages")
plt.xlabel("Ages")
plt.ylabel("Frequency")
plt.show()
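The same approach works for any stream. As another sketch (the purchases stream's product_id column appears in the SQL results below), you could chart the most frequently purchased products:

# Bar chart of the ten most frequently purchased products
purchases_df = read_result["purchases"].to_pandas()
purchases_df["product_id"].value_counts().head(10).plot(kind="bar")
plt.title("Top 10 Products by Purchase Count")
plt.xlabel("Product ID")
plt.ylabel("Purchases")
plt.show()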

Working in SQL

Since data is cached in a local DuckDB database, we can query the data with SQL.

We can do this in several ways; one is the JupySQL extension, which we'll use below.

# Install JupySQL to enable SQL cell magics
%pip install --quiet jupysql
# Load JupySQL extension
%load_ext sql
# Configure max row limit (optional)
%config SqlMagic.displaylimit = 200
# Get the SQLAlchemy 'engine' object for the cache
engine = read_result.cache.get_sql_engine()
# Pass the engine to JupySQL
%sql engine
# Get table objects for the 'users' and 'purchases' streams
users_table = read_result.cache["users"].to_sql_table()
purchases_table = read_result.cache["purchases"].to_sql_table()
display([users_table.fullname, purchases_table.fullname])

['main.users', 'main.purchases']
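JupySQL is only one option. Since we already have a SQLAlchemy engine, we can also query the cache directly from Python, for example with Pandas. A minimal sketch, using the table names displayed above:

# Query the cache directly with Pandas and the SQLAlchemy engine
import pandas as pd

user_count_df = pd.read_sql("SELECT COUNT(*) AS user_count FROM main.users", engine)
print(user_count_df)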

%%sql
-- Show the most recent purchases by purchase date:
SELECT users.id, users.name, purchases.product_id, purchases.purchased_at
FROM {{ users_table.fullname }} AS users
JOIN {{ purchases_table.fullname }} AS purchases
ON users.id = purchases.user_id
ORDER BY purchases.purchased_at DESC
LIMIT 10

Running query in 'duckdb:///.cache/default_cache_db.duckdb'

id     name     product_id  purchased_at
21589  Torie    48          2024-12-10 20:32:08
39842  Kareen   45          2024-11-28 11:26:13
19248  Jerry    41          2024-11-05 00:59:42
30780  Dwana    82          2024-10-20 20:09:11
4669   Frankie  100         2024-10-15 16:23:02
42204  Lashaun  9           2024-10-06 08:06:59
13251  Charlie  46          2024-09-19 17:55:52
47798  Issac    40          2024-09-13 05:13:17
34970  Eleni    39          2024-09-13 00:15:55
24782  Jose     40          2024-09-08 09:49:06
# Show tables for the other streams
%sqlcmd tables
Name
products
purchases
users
_PyAirbyte_state
_PyAirbyte_streams
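JupySQL's %sqlcmd can also inspect an individual table's schema. A sketch of its column-inspection command:

# Inspect the columns of the 'users' table
%sqlcmd columns --table users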
