Create and Monitor a Data Pipeline Using the Airbyte API

Learn how to export data from a Postgres table to a .CSV file in an Azure Blob Storage by using the new Airbyte API

In this tutorial, you will learn how to export data from a Postgres table to a .CSV file in Azure Blob Storage by using Airbyte’s new API, which recently became Generally Available and allows developers to automate data movement tasks easily.

For this tutorial, I will use C# as the programming language and the RestSharp library to make all the API requests. However, you can use the language of your choice. If you would like to see sample code showing how to use the API in other programming languages, check out Airbyte’s API documentation.

Before going straight to the steps of how to use the API, please make sure you meet the following prerequisites:

Prerequisites

  • Airbyte account: You need to have a valid Airbyte Cloud account. If you are just starting to explore the tool, you can create a 14-day trial account without the need to register a credit card, which I think is awesome.
  • IDE: Make sure you have your IDE of choice installed and ready to be used. I will be using Visual Studio Community Edition and I will be creating a new project from scratch.
  • RestSharp library: As mentioned previously, this is the library I will be using to make all the API requests. If you are planning to use it as well and you will be using the same IDE, then all you need to do is open the command line, go to your project directory, and install the package by running the following command:
dotnet add package RestSharp
  • Newtonsoft library: This is not completely necessary, but I used it to deserialize the API response and to get some values I needed for the demo. The Newtonsoft library can be installed by running the following command:
dotnet add package Newtonsoft.Json

Now that you are all set up and ready, you can follow the next steps to learn how to use the API:

Step 1: Create an API Key

The API Key is the way you identify and authenticate yourself, or your app, to make requests to the API. For security reasons, you must create an API Key to be able to use Airbyte’s API.

The key must be created at Airbyte’s Developer Portal, which is the UI Airbyte provides to manage all administrative tasks related to the API, such as creating or revoking an API Key, monitoring your API requests, and viewing usage metrics for periods ranging from the last 24 hours to the last 30 days.

The portal’s interface is pretty easy and intuitive. To create a new API Key, you have to click on the API Keys option in the left panel, and then click on the button New Api Key:

API Keys option at Airbyte’s Developer Portal

Once you click on the button and type the name you’d like to assign to the key, you will be able to see the key value. You can copy it and save it in a file, or you can download the .key file, which will have the key value in it. You will be able to view the key only once, so make sure you store it safely.
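
Since the key is shown only once, one option is to keep it out of your source code and read it from an environment variable at run time. The snippet below is a minimal sketch of that idea, not something Airbyte prescribes: the variable name AIRBYTE_API_KEY and the helper BuildAuthHeader are my own assumptions.

```csharp
using System;

// Hypothetical helper: builds the value for the Authorization header.
string BuildAuthHeader(string key)
{
    if (string.IsNullOrWhiteSpace(key))
        throw new ArgumentException("API key is missing.", nameof(key));
    return "Bearer " + key.Trim();
}

// AIRBYTE_API_KEY is an assumed variable name; use whatever name suits your environment.
var keyFromEnv = Environment.GetEnvironmentVariable("AIRBYTE_API_KEY");
if (string.IsNullOrWhiteSpace(keyFromEnv))
    Console.WriteLine("Set the AIRBYTE_API_KEY environment variable first.");
else
    Console.WriteLine("Authorization header built (" + BuildAuthHeader(keyFromEnv).Length + " characters).");
```

The returned string can then be passed as the value of the Authorization header in place of a hard-coded "Bearer {YourApiKey}".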

Step 2: Create a Source

My Source is a Postgres table called Person, and according to the documentation, certain parameters are mandatory and some of them are not. I decided to add some extra parameters, just to make sure everything is created as I would like it to be. These are the parameters I used in my API call:

Postgres parameters - Source creation

I used the default Postgres port, and my table was created under the schema public. My database does require a secure connection, and that’s why I’m setting the ssl_mode as required. The replication method will be the standard one, and I’m not using any SSH tunnel.

Regarding the Workspace ID, you can get it by going to Airbyte’s portal and checking out the URL. The value after the section workspaces is your workspace ID.

workspaceId value - Airbyte’s portal
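
If you prefer not to copy the ID by hand, the "value after the section workspaces" rule can be sketched as a small helper. This is only an illustration: ExtractIdAfter is a hypothetical function, and the URL below is a made-up example rather than a real workspace.

```csharp
using System;

// Hypothetical helper: returns the path segment that follows a given section name in a URL.
string ExtractIdAfter(string portalUrl, string sectionName)
{
    var segments = new Uri(portalUrl).AbsolutePath
        .Split('/', StringSplitOptions.RemoveEmptyEntries);
    int index = Array.IndexOf(segments, sectionName);
    if (index < 0 || index + 1 >= segments.Length)
        throw new ArgumentException("No value found after '" + sectionName + "'.", nameof(portalUrl));
    return segments[index + 1];
}

// Made-up URL, for illustration only.
var exampleUrl = "https://cloud.airbyte.com/workspaces/00000000-0000-0000-0000-000000000000/connections";
Console.WriteLine(ExtractIdAfter(exampleUrl, "workspaces"));
```

The same helper works for any other ID that appears in a portal URL, as long as you pass the name of the section that precedes it.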

Once you have these parameters ready, you can make your API request. Below you can find the code I used. All you need to do is to replace the values according to your environment.

var client = new RestClient("https://api.airbyte.com/v1/sources");
var request = new RestRequest();
request.AddHeader("accept", "application/json");
request.AddHeader("content-type", "application/json");
request.AddHeader("Authorization", "Bearer {YourApiKey}");
request.AddParameter("application/json", "{\"configuration\":{\"host\":\"YourDatabaseServerName\",\"port\":5432,\"sourceType\":\"postgres\",\"database\":\"YourDatabaseName\",\"username\":\"YourSQLUser\",\"password\":\"YourSQLUserPassword\",\"schemas\":[\"public\"],\"ssl_mode\":{\"mode\":\"require\"},\"replication_method\":{\"method\":\"Standard\"},\"tunnel_method\":{\"tunnel_method\":\"NO_TUNNEL\"}},\"name\":\"YourSourceName\",\"workspaceId\":\"YourWorkspaceID\"}", ParameterType.RequestBody);
var response = client.ExecutePost(request);

Keep the following in mind for your first API call. It could save you a bit of time:

  • Numeric parameters must not be wrapped in the escaped quotation marks (\") that string values require. For example, the port is written as \"port\":5432, not \"port\":\"5432\". If you quote a numeric value, you will get the following error:

Unprocessable-entity error message
  • If you are planning to create a pipeline programmatically, you will probably need the IDs of the resources you are creating in subsequent steps of your process. If so, please note that you will be able to get their IDs from the response’s content:
Source ID - API response
  • If you get an error with a main message similar to Internal error, and your request is correct, please try again. I got that message twice, and I believe it was related to connectivity issues. Once I made the API call again, I didn’t get any errors.
  • Airbyte’s API-documentation portal comes in handy when you are building the parameters string. It will save you from making mistakes, such as the one I made with the numeric-parameters’ quotation mark.
API-parameters string creation - Airbyte’s API-documentation portal 
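
Another way to avoid the quotation-mark mistake is to build the body from an object and let a JSON serializer decide what gets quoted. The sketch below uses System.Text.Json, which ships with .NET, instead of a hand-escaped string. It is a shortened version of the Source body (username, password, and the ssl, replication, and tunnel settings are left out for brevity), and BuildSourceBody is a hypothetical helper name.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: builds a shortened source-creation body from typed values.
string BuildSourceBody(string host, int port, string database, string workspaceId)
{
    var body = new
    {
        configuration = new
        {
            sourceType = "postgres",
            host,
            port,                          // int, so the serializer writes it without quotes
            database,
            schemas = new[] { "public" }
        },
        name = "YourSourceName",
        workspaceId
    };
    return JsonSerializer.Serialize(body);
}

var sourceBody = BuildSourceBody("YourDatabaseServerName", 5432, "YourDatabaseName", "YourWorkspaceID");
Console.WriteLine(sourceBody);
```

The resulting string can be passed to request.AddParameter in place of the escaped literal.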

Step 3: Create a Destination

Now that you have your Source created, you can create your Destination. In this case, my destination is an Azure Blob Storage account. For this specific Destination, I used the following parameters:

Az Blob Storage parameters - Destination creation

To find the azure_blob_storage_account_name and the azure_blob_storage_endpoint_domain_name values, go to the storage account resource in the Azure portal, and then click on the Endpoints option of the Settings menu in the left panel:

azure_blob_storage_account_name and the azure_blob_storage_endpoint_domain_name values - Azure portal

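Look at the Blob service endpoint to get both values. In my case the endpoint has the form https://YourAccountName.blob.core.windows.net/, so the first part of the host name is the azure_blob_storage_account_name and the rest (blob.core.windows.net) is the azure_blob_storage_endpoint_domain_name.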
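
As a small illustration of how the two values relate to the endpoint, the sketch below splits a Blob service endpoint at the first dot of its host name. SplitBlobEndpoint is a hypothetical helper, and it assumes the endpoint has the usual form of account name followed by the endpoint domain.

```csharp
using System;

// Hypothetical helper: splits a Blob service endpoint into account name and endpoint domain.
(string AccountName, string EndpointDomain) SplitBlobEndpoint(string blobEndpoint)
{
    string hostName = new Uri(blobEndpoint).Host;   // for example youraccountname.blob.core.windows.net
    int firstDot = hostName.IndexOf('.');
    if (firstDot <= 0 || firstDot == hostName.Length - 1)
        throw new ArgumentException("Unexpected endpoint format.", nameof(blobEndpoint));
    return (hostName.Substring(0, firstDot), hostName.Substring(firstDot + 1));
}

var (account, domain) = SplitBlobEndpoint("https://youraccountname.blob.core.windows.net/");
Console.WriteLine(account + " / " + domain);
```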

To get the azure_blob_storage_account_key value, go to the storage account resource in the Azure portal, and then click on the Access keys option of the Security + networking menu in the left panel. Once there, click on the Show button of the key1, which is the principal access key:

azure_blob_storage_account_key value - Azure portal

To get the azure_blob_storage_container_name, no matter if you have a container or not, you can go to the storage account resource in the Azure portal, and then click on the Containers option of the Data storage menu in the left panel. Once there, you will be able to see the existing containers or create a new one:

azure_blob_storage_container_name value - Azure portal

Once you have these parameters ready, you can make your API request. Below you can find the code I used. All you need to do is replace the values according to your environment.

var client = new RestClient("https://api.airbyte.com/v1/destinations");
var request = new RestRequest();
request.AddHeader("accept", "application/json");
request.AddHeader("content-type", "application/json");
request.AddHeader("Authorization", "Bearer {YourApiKey}");
request.AddParameter("application/json", "{\"configuration\":{\"azure_blob_storage_endpoint_domain_name\":\"blob.core.windows.net\",\"azure_blob_storage_container_name\":\"YourContainerName\",\"azure_blob_storage_account_name\":\"YourAccountName\",\"azure_blob_storage_account_key\":\"YourAccessKey\",\"azure_blob_storage_output_buffer_size\":5,\"format\":{\"format_type\":\"CSV\",\"flattening\":\"No flattening\"},\"azure_blob_storage_spill_size\":500,\"destinationType\":\"azure-blob-storage\"},\"name\":\"Dst_AzBlob\",\"workspaceId\":\"YourWorkspaceID\"}", ParameterType.RequestBody);
var response = client.ExecutePost(request);

C# code - Create Destination with Airbyte’s API

Step 4: Create a Connection

Now that we have our Source and Destination, we can create our Connection. These are the parameters I used:

Connection parameters - Connection creation

As mentioned in the Source-creation step, you can get the Source ID from the response’s content. However, if you are just doing a test, you can get the Source and the Destination IDs by going to Airbyte’s portal, clicking on their respective options in the left panel, and going into the details of each one to see their IDs in the URLs, just as we did with the Workspace ID.

sourceId value - Airbyte’s portal
destinationId value - Airbyte’s portal

Once you have these parameters ready, you can make your API request. Below you can find the code I used. All you need to do is to replace the values according to your environment.

var client = new RestClient("https://api.airbyte.com/v1/connections");
var request = new RestRequest();
request.AddHeader("accept", "application/json");
request.AddHeader("content-type", "application/json");
request.AddHeader("Authorization", "Bearer {YourApiKey}");
request.AddParameter("application/json", "{\"sourceId\":\"YourSourceID\",\"destinationId\":\"YourDestinationID\",\"name\":\"AzPostgres-AzBlob2\",\"schedule\":{\"scheduleType\":\"manual\"},\"dataResidency\":\"auto\",\"namespaceDefinition\":\"destination\",\"namespaceFormat\":null,\"nonBreakingSchemaUpdatesBehavior\":\"ignore\",\"configurations\":{\"streams\":[{\"syncMode\":\"full_refresh_overwrite\",\"name\":\"YourTableName\"}]}}", ParameterType.RequestBody);
var response = client.ExecutePost(request);
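
If you are building the pipeline programmatically, you may want to feed the IDs from the two earlier responses straight into this request instead of copying them from the portal. The sketch below shows one way to do that with System.Text.Json. It assumes the responses expose the IDs as sourceId and destinationId, uses made-up response fragments, and builds a shortened body; ReadString and BuildConnectionBody are hypothetical helpers.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: reads a string property from a JSON response body.
string ReadString(string json, string property)
{
    using var doc = JsonDocument.Parse(json);
    return doc.RootElement.GetProperty(property).GetString() ?? "";
}

// Hypothetical helper: builds a shortened connection body from the two earlier responses.
string BuildConnectionBody(string sourceResponse, string destinationResponse, string tableName)
{
    var body = new
    {
        sourceId = ReadString(sourceResponse, "sourceId"),                 // assumed field name
        destinationId = ReadString(destinationResponse, "destinationId"), // assumed field name
        name = "YourConnectionName",
        schedule = new { scheduleType = "manual" },
        configurations = new
        {
            streams = new[] { new { syncMode = "full_refresh_overwrite", name = tableName } }
        }
    };
    return JsonSerializer.Serialize(body);
}

// Made-up response fragments, for illustration only.
var srcJson = "{\"sourceId\":\"aaaa-1111\"}";
var dstJson = "{\"destinationId\":\"bbbb-2222\"}";
Console.WriteLine(BuildConnectionBody(srcJson, dstJson, "Person"));
```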

Step 5: Trigger and monitor a Job

We are now ready to export data from our table to our storage account. Our final step is to run a Sync of our Connection between the Source and Destination. These are the parameters I used:

Job-triggering parameters - Job triggering

As mentioned before, you can get the Connection ID from the response’s content, or by going to Airbyte’s portal, clicking on the respective option in the left panel, and going into the details of the Connection to see its ID in the URL, just as we did with the Workspace ID:

connectionId value - Airbyte’s portal

Once you have this parameter ready, you can make your API request. Below you can find the code I used. All you need to do is to replace the values according to your environment.

var client = new RestClient("https://api.airbyte.com/v1/jobs");
var request = new RestRequest();
request.AddHeader("accept", "application/json");
request.AddHeader("content-type", "application/json");
request.AddHeader("Authorization", "Bearer {YourApiKey}");
request.AddParameter("application/json", "{\"connectionId\":\"YourConnectionID\",\"jobType\":\"sync\"}", ParameterType.RequestBody);
var response = client.ExecutePost(request);

Now, let’s see it in action:

This is our Source, a table called Person created in the public schema, and it has only 6 records:

Source table in Postgres database

Our target container in our storage account is completely empty:

Destination storage account

We run our Sync and monitor its execution every 10 seconds. This is what it looks like in the console:

C# code - Trigger and monitor a job every 10 seconds

The code is divided into the following steps:

  • Triggering of the job
  • Reading of the jobID
  • Checking the status of the JobID every 10 seconds for as long as the job is still running
  • Printing messages depending on the Job status

In case you would like to use the code, here it is:

//Namespaces needed by this snippet (place them at the top of your file)
using System;
using System.Threading;
using Newtonsoft.Json;
using RestSharp;

//Trigger a job
var client = new RestClient("https://api.airbyte.com/v1/jobs");
var request = new RestRequest();
request.AddHeader("accept", "application/json");
request.AddHeader("content-type", "application/json");
request.AddHeader("Authorization", "Bearer {YourApiKey}");
request.AddParameter("application/json", "{\"connectionId\":\"YourConnectionID\",\"jobType\":\"sync\"}", ParameterType.RequestBody);
dynamic response = client.ExecutePost(request);

//Deserialize response to get the jobID and other data
var apiResponse = JsonConvert.DeserializeObject(response.Content);
string jobStatus = apiResponse.status;
string jobId = apiResponse.jobId;
Console.WriteLine("JobID: " + jobId + " - Status: " + jobStatus);

//Checks the status of the job every 10 seconds while it is still in progress
//(a job can report "pending" before it starts running, so both values keep the loop going)
while (jobStatus == "pending" || jobStatus == "running")
{
    //Waits 10 seconds before asking again
    Thread.Sleep(10000);

    //Gets status of the job
    client = new RestClient("https://api.airbyte.com/v1/jobs/" + jobId);//We add the jobId
    request = new RestRequest();
    request.AddHeader("accept", "application/json");
    request.AddHeader("Authorization", "Bearer {YourApiKey}");
    dynamic statusResponse = client.ExecuteGet(request);

    //Deserialize response to get the status of the job
    apiResponse = JsonConvert.DeserializeObject(statusResponse.Content);
    jobStatus = apiResponse.status;

    //Prints info in console depending on the status
    if (jobStatus == "pending" || jobStatus == "running")
    {
        Console.WriteLine("Job running...");
    }
    else
    {
        Console.WriteLine("Job Finished...");
        Console.WriteLine(" ");
        Console.WriteLine("Sync details:");
        Console.WriteLine(" JobId:" + apiResponse.jobId);
        Console.WriteLine(" Status:" + jobStatus);
        Console.WriteLine(" Rows/Bytes Synced:" + apiResponse.rowsSynced + "/" + apiResponse.bytesSynced);
        Console.WriteLine(" Sync Start Time:" + apiResponse.startTime);
        Console.WriteLine(" Sync Last Updated:" + apiResponse.lastUpdatedAt);
        Console.WriteLine(" Sync Duration:" + apiResponse.duration);
    }
}

Once the Sync has finished, we can see that the process created a folder with the name of the Source table Person, and the file containing the data in it:

Az Blob Storage account

.CSV file Az Blob Storage account

Summary

In this tutorial, we learned how to use Airbyte’s new API to export data from a Postgres table to an Azure Blob Storage account. We went through how to build the API requests and how to make the API calls. All the code used is included, so you can run your own tests more easily.
