# Add virtual environment support for running in Google Colab
!apt-get install -qq python3.10-venv
# Install PyAirbyte
%pip install --quiet airbyte
Locating your Data Source
To see what data sources are available, you can check our docs or run the following:
# Import PyAirbyte
import airbyte as ab
# Show all available connectors
ab.get_available_connectors()
Load the Source Data using PyAirbyte
Create and install a source connector:
import airbyte as ab
# Create and install the source:
source: ab.Source = ab.get_source("source-faker")
Installing 'source-faker' into virtual environment '/content/.venv-source-faker'.
Running 'pip install airbyte-source-faker'...
Connector 'source-faker' installed successfully!
For more information, see the source-faker documentation:
https://docs.airbyte.com/integrations/sources/faker#reference
# Configure the source
source.set_config(
    config={
        "count": 50_000,  # Adjust this to get a larger or smaller dataset
        "seed": 123,
    },
)
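The faker source needs no credentials, but most real connectors do. For those, PyAirbyte's get_secret() helper lets you resolve secrets from environment variables instead of hardcoding them in the config. A minimal sketch (MY_API_KEY is a hypothetical variable name, used only for illustration):
# `MY_API_KEY` is a hypothetical environment variable name;
# source-faker itself needs no credentials.
api_key = ab.get_secret("MY_API_KEY")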
# Verify the config and creds by running `check`:
source.check()
Connection check succeeded for `source-faker`.
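Before reading, you can also inspect which streams the connector offers and select only the ones you need. A sketch using source-faker's stream names (note that the next section calls select_all_streams(), which overrides any earlier selection):
# List the streams this connector can produce
print(source.get_available_streams())
# Optionally select just a subset of streams by name
source.select_streams(["products", "users"])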
Read Data from the PyAirbyte Cache
Once the data is read, we can work with the resulting streams however we like. This includes to_pandas(), which returns a Pandas DataFrame, and to_sql_table(), which gives us a SQLAlchemy Table object we can use to run SQL queries.
# Select all of the source's streams and read data into the internal cache:
source.select_all_streams()
read_result: ab.ReadResult = source.read()
Read Progress
Started reading at 20:10:41.
Read 100,100 records over 1min 17s (1,300.0 records / second).
Wrote 100,100 records over 11 batches.
Finished reading at 20:11:59.
Started finalizing streams at 20:11:59.
Finalized 11 batches over 1 seconds.
Completed 3 out of 3 streams:
users
purchases
products
Completed writing at 20:12:01. Total time elapsed: 1min 19s
Completed `source-faker` read operation at 20:12:01.
# Display or transform the loaded data
products_df = read_result["products"].to_pandas()
display(products_df)
PyAirbyte integrates with Pandas, which in turn integrates with matplotlib and many other popular libraries. We can use this to quickly create graphs.
%pip install matplotlib
import matplotlib.pyplot as plt
users_df = read_result["users"].to_pandas()
plt.hist(users_df["age"], bins=10, edgecolor="black")
plt.title("Histogram of Ages")
plt.xlabel("Ages")
plt.ylabel("Frequency")
plt.show()
Working in SQL
Since data is cached in a local DuckDB database, we can query the data with SQL.
We can do this in multiple ways; one way is to use the JupySQL extension, which we'll use below.
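JupySQL isn't preinstalled in Colab, so first install it and load its SQL cell magic:
# Install JupySQL and load its SQL cell magic
%pip install --quiet jupysql
%load_ext sql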
# Get the SQLAlchemy 'engine' object for the cache
engine = read_result.cache.get_sql_engine()
# Pass the engine to JupySQL
%sql engine
# Get table objects for the 'users' and 'purchases' streams
users_table = read_result.cache["users"].to_sql_table()
purchases_table = read_result.cache["purchases"].to_sql_table()
display([users_table.fullname, purchases_table.fullname])
['main.users', 'main.purchases']
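As an aside, since engine is a regular SQLAlchemy engine, you can also query these tables directly from Python without the cell magics, for example with pandas. A sketch against the main.users table shown above:
import pandas as pd
# Run a query through the engine directly, without cell magics
top_users = pd.read_sql("SELECT id, name FROM main.users LIMIT 5", engine)
display(top_users)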
%%sql
-- Show most recent purchases by purchase date:
SELECT users.id, users.name, purchases.product_id, purchases.purchased_at
FROM {{ users_table.fullname }} AS users
JOIN {{ purchases_table.fullname }} AS purchases
ON users.id = purchases.user_id
ORDER BY purchases.purchased_at DESC
LIMIT 10
Running query in 'duckdb:///.cache/default_cache_db.duckdb'
# Show tables for the other streams
%sqlcmd tables
Name
products
purchases
users
_PyAirbyte_state
_PyAirbyte_streams
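Because the cache is a plain DuckDB file on disk, it outlives the notebook session. In a later session you can reattach to it and work with the data without re-reading from the source; a minimal sketch:
import airbyte as ab
# Reattach to the default local DuckDB cache from a previous run
cache = ab.get_default_cache()
users_df = cache["users"].to_pandas()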