
Creating a Python Destination (DuckDB)

A guide on how to create a Python Destination (DuckDB). Code snippets linked to a single PR.


This article is a walk-through on creating an Airbyte destination, using the DuckDB destination as an example. In addition to the Building a Python Destination guide in our docs, this tutorial walks you through a dedicated example with a PR you can follow along, and hopefully fills some gaps with more hands-on samples.

What is DuckDB

Before we begin writing the destination connector, you might ask: "Why are we creating a DuckDB destination?" DuckDB is an in-process SQL OLAP database management system with deep support for SQL. It is simple to use: it's a single-file database like SQLite, but optimized for analytical workloads. That makes it perfect for local use cases such as building a personal data warehouse or testing new features locally.

On top of that, DuckDB is ultra-fast for analytical workloads, with big-data examples such as the NYC taxi dataset spanning 10 years and 1.5 billion rows. All of it runs speedily on a single laptop. Check out some impressive benchmarks.
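To get a feel for how lightweight that is, here is a minimal sketch of using DuckDB from Python (file and table names are made up for illustration):

import duckdb

# connecting creates the single-file database if it does not exist yet
con = duckdb.connect("demo.duckdb")
con.execute("CREATE TABLE IF NOT EXISTS trips (id INTEGER, fare DOUBLE)")
con.execute("INSERT INTO trips VALUES (1, 12.5), (2, 7.0)")

# a simple analytical query against the local file
print(con.execute("SELECT COUNT(*), SUM(fare) FROM trips").fetchall())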

Prerequisites before We Start

Let's discuss concepts and prerequisites that are important to know beforehand.

Airbyte works with any source. We implemented a dedicated Airbyte Protocol and, with it, the Airbyte Message format. I suggest looking through these to understand more about what we are about to implement.

We have different release stages (Alpha, Beta, and Generally Available). I suggest checking out Airbyte Connector Release Stages to find out more. Also think about which programming language you want to write in. Currently, we support Java and Python with developer kits and provided templates, so you don't need to start from scratch when building your destination connector. Using Docker under the hood allows us to run any programming language, but it's recommended to use the language that works best with the destination and is most performant. Sometimes you are also constrained by the languages you know. Still, we recommend using one of the above-mentioned languages for destinations, or the existing Connector Development Kit (CDK) for source connectors.

Some administrative checks to look for:

  • Find out if an open issue or PR for your destination connector already exists
  • Try to find an existing connector that is close to your new connector; it helps to have samples to fall back on when in doubt
  • Check versions and the latest development requirements

Step 1: Choose the Best-Suited Programming Language

Every destination has its native programming language; try to use that if possible. I chose Python for my DuckDB destination as I have the most experience with it, and Python works well with DuckDB, even if it is certainly not the fastest option.

Also, make sure you check the latest requirements. At the time of writing, these were mine:

  1. Java 17
  2. Node 16
  3. Python 3.9
  4. Docker
  5. Jq

Step 2: Scaffold a New Destination

Once we are clear on the language and the prerequisites, we can generate a brand-new destination. Check whether an Issue or Pull Request (PR) for it already exists; if not, go ahead and create the issue first, followed by the PR. I created #17256 for my DuckDB example.

When you are ready to start, check out the latest master branch from our GitHub repo and create a new local branch with `git checkout -b duckdb-destination` (or any other way you manage your GitHub workflow). I use my duckdb branch throughout this tutorial as an example.

The first important step is to scaffold all needed files and functions with the existing shell script `generate.sh`. This way, the README, functions, and all other important files are generated for us, and we can focus on the most critical part: implementing the connector code.

cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.  
./generate.sh

When running the shell script, it will ask you which language you want to use.

I chose the Python Destination and added the name of the destination, duckdb. This will generate the folder `connectors/destination-duckdb` with the following files:

tree ../../connectors/destination-duckdb
../../connectors/destination-duckdb
├── Dockerfile
├── README.md
├── build.gradle
├── destination_duckdb
│   ├── __init__.py
│   ├── destination.py
│   └── spec.json
├── integration_tests
│   └── integration_test.py
├── main.py
├── requirements.txt
├── setup.py
└── unit_tests
    └── unit_test.py

3 directories, 11 files

The most important files are within the folder `destination_duckdb`: `spec.json` and `destination.py`.

📝 Creating a PR and pushing it to GitHub: At this point, you could go ahead and create a PR on GitHub and push the first generated files. Even better, create a local commit and start adding your changes.

Set up Python Virtual Environment

If you are using Python, set up a virtual environment to install all required packages and work within a dedicated environment. This is usually done with `venv` (there are alternatives such as pyenv, poetry, and many others). Below is the example with venv:

cd airbyte-integrations/connectors/destination-duckdb

# Create a virtual environment in the .venv directory
python -m venv .venv 

# activate the virtualenv
source .venv/bin/activate 

# Install with the "tests" extra which provides test requirements
pip install '.[tests]'

📝 Info: Virtual Env IDE Integration

If you want your IDE to auto-complete and resolve dependencies properly, point it at the python binary in `airbyte-integrations/connectors/destination-duckdb/.venv/bin/python`.

Make sure you set all relevant environment variables. I set these for this tutorial:

BASIC_AUTH_USERNAME=airbyte
BASIC_AUTH_PASSWORD=password

Step 3: Inspect the Parts We Need to Implement

Now that we are all set up, we can start implementing our destination logic. It mostly boils down to two functions: `check` and `write`.

Let's analyze these functions before we dive into implementing them in step 4.

Function check()

The check is executed when a user adds the destination with its credentials and configuration. Airbyte runs this function to verify that a connection can be established, along with any specific checks you add here. Also, take note of the detailed description in the comments below.

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
            e.g: if a provided API token or password can be used to connect and write to the destination.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this destination, content of this json is as specified in
        the properties of the spec.json file

        :return: AirbyteConnectionStatus indicating a Success or Failure
        """
The way we achieve that is by checking the input `config` parameters and setting up a connection to the database. If all parameters are okay and the connection can be established, we return `Status.SUCCEEDED`.

Function write()

The write function reads the data passed from the source connector to our destination. You can see in the function definition below that we get a list of Airbyte Messages. This is important to know, as Airbyte serializes data into JSON messages, which makes it possible to convert any source to any destination.

We also get the ConfiguredAirbyteCatalog, which describes the schema of the messages and how it's persisted in the destination.

    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the input stream of messages, config, and catalog to write data to the destination.

        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing, then the source is given the last state message output from this method as the starting point of the next sync.

        :param config: dict of JSON configuration matching the configuration declared in spec.json
        :param input_messages: The stream of input messages received from the source
        :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the destination

        :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
        """
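To make this concrete, the serialized messages arriving in `input_messages` look roughly like the JSON lines below (a sketch of the protocol format; the RECORD mirrors the mock data we use for testing later, and the STATE payload is an arbitrary example):

{"type": "RECORD", "record": {"stream": "airbyte_acceptance_table", "emitted_at": 1664705198575, "data": {"column1": "test", "column2": 222}}}
{"type": "STATE", "state": {"data": {"last_synced_at": "2022-06-20T18:56:18"}}}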

📝 Performance of database <-> database replications: While this is a super powerful feature, it can slow down some connections when the source and destination have structured tables, e.g., reading from a database and writing to a database/warehouse. Improving the speed is something we are working on, so stay tuned!

Step 4: Implement check and write functions

Let's implement the actual code and see how that works for writing our Airbyte Destination.

ℹ️ Follow my changes from DuckDB: You can see all the changes I made for the destination, including normalization, in my PR 🎉 New Destination: DuckDB #17494, or in the destination folder we just generated above in the master branch under destination-duckdb.

Configurations in spec.json

Besides the two functions mentioned above, we start with spec.json. Here we define which options are possible for this connector.

For example, the supported Sync Modes we want to implement for this destination: DuckDB should support `overwrite` and `append`. Other options are whether we support Normalization and custom dbt. If you want to know more about these types, please check the Airbyte Docs; in this tutorial, I assume you already know about them.

It is important to define `required` parameters; e.g., for DuckDB, I need a `destination_path` to define the path for storing the file. Without this, there will be no connection. But you can also define optional parameters by adding them to `properties` without marking them as `required`, which I did for `schema` as I use `main` as a default.

See the full `spec.json` for the DuckDB example:

{
  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/duckdb",
  "supported_destination_sync_modes": ["overwrite", "append"],
  "supportsIncremental": true,
  "supportsDBT": true,
  "supportsNormalization": true,
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Destination Duckdb",
    "type": "object",
    "required": ["destination_path"],
    "additionalProperties": false,
    "properties": {
      "destination_path": {
        "type": "string",
        "description": "Path to the destination.ducdb file. The file will be placed inside that local mount. For more information check out our docs",
        "example": "/local/destination.duckdb"
      },
      "schema": {
        "type": "string",
        "description": "database schema, default for duckdb is main",
        "example": "main"
      }
    }
  }
}

Implementation of check()

The function check() in destination.py is the easier of the two. In the function `_get_destination_path` below, you can see that we check the `destination_path` entered by the user and normalize it to the magic folder location `/local/<path-i-want-to-store>/destination.duckdb`. The path `/local` does not exist as such and is automatically mapped to `/tmp/airbyte_local` on your local filesystem.

@staticmethod
def _get_destination_path(destination_path: str) -> str:
    """
    Get a normalized version of the destination path.
    Automatically append /local/ to the start of the path
    """
    if not destination_path.startswith("/local"):
        destination_path = os.path.join("/local", destination_path)

    destination_path = os.path.normpath(destination_path)
    if not destination_path.startswith("/local"):
        raise ValueError(
            f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix"
        )

    return destination_path

Without this `/local` prefix, the file would only be stored inside the Docker container instead of on the filesystem of the host machine, where you can access it locally either with the duckdb CLI or a SQL editor such as DBeaver. That's why the function adds the prefix automatically if the user didn't.
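As a quick illustration of that normalization, assuming the generated destination class is called `DestinationDuckdb` (as in the generated `destination.py`), the behavior looks like this:

from destination_duckdb.destination import DestinationDuckdb

# a relative path gets the /local prefix prepended automatically
print(DestinationDuckdb._get_destination_path("destination.duckdb"))
# -> /local/destination.duckdb

# a path already under /local is only normalized
print(DestinationDuckdb._get_destination_path("/local/data/destination.duckdb"))
# -> /local/data/destination.duckdb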

🚨 Important: Please ensure that Docker Desktop has access to `/tmp` (and `/private` on macOS, as `/tmp` is a symlink that points to `/private`; it will not work otherwise). You allow it under "File sharing" in `Settings -> Resources -> File sharing -> add the one or two folders above` and hit the "Apply & restart" button.

After we've checked that the path is set correctly, we set up a DuckDB connection; it's important here to install duckdb and add it to setup.py.

import duckdb

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    try:
        # parse the destination path
        param_path = config.get("destination_path")
        path = self._get_destination_path(param_path)

        os.makedirs(os.path.dirname(path), exist_ok=True)
        con = duckdb.connect(database=path, read_only=False)
        con.execute("SELECT 1;")

        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    except Exception as e:
        return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")

If the connection was established, we send the succeeded status, and the user can go to the next step: adding a source connector or configuring the connection.
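For the `import duckdb` above to work inside the connector, duckdb also has to be declared as a dependency in `setup.py`. A rough sketch of what that can look like, assuming the structure the generator produces (requirement pins are illustrative):

from setuptools import find_packages, setup

# runtime dependencies of the connector
MAIN_REQUIREMENTS = [
    "airbyte-cdk",
    "duckdb",
]

# extra dependencies pulled in via pip install '.[tests]'
TEST_REQUIREMENTS = ["pytest~=6.1"]

setup(
    name="destination_duckdb",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    extras_require={"tests": TEST_REQUIREMENTS},
)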

Implementation of write()

The write function does the hard work. I followed the sqlite destination, as its single-database-file approach is very similar to DuckDB's. I also recommend finding a similar destination you can follow along with or draw inspiration from.

ℹ️ Again, make sure to check the docs Building a Python Destination for more details

First, we do the same check again to verify the destination_path is correct and, if so, establish a connection.

def write(
    self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:

    streams = {s.stream.name for s in configured_catalog.streams}
    logger.info(f"Starting write to DuckDB with {len(streams)} streams")

    path = config.get("destination_path")
    path = self._get_destination_path(path)

    logger.info(f"Opening DuckDB file at {path}")
    con = duckdb.connect(database=path, read_only=False)

Next, we create the raw tables, named `_airbyte_raw_<stream>`, for each table we get from the source connector through the streams in `configured_catalog`. A stream is a group of related records (a table, an API endpoint, files).

 # con.execute("BEGIN TRANSACTION")
    for configured_stream in configured_catalog.streams:

        name = configured_stream.stream.name
        table_name = f"_airbyte_raw_{name}"
        if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
            # delete the tables
            logger.info(f"--- {streams}")
            query = """
            DROP TABLE IF EXISTS {}
            """.format(
                table_name
            )
            con.execute(query)
        # create the table if needed
        query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            _airbyte_ab_id TEXT PRIMARY KEY,
            _airbyte_emitted_at TEXT,
            _airbyte_data TEXT
        )
        """

        con.execute(query)

Then we insert the data with an SQL `INSERT INTO` statement for each stream. Here we write the serialized JSON blob, which is why we call these raw tables.

In this example, we check the `message.type` of each message and treat it differently depending on the type. For instance, if the type is STATE, we execute the insert query and yield the message back. In the case of a RECORD type, which contains the actual data, we add it to a buffer.

    buffer = defaultdict(list)

    for message in input_messages:

        if message.type == Type.STATE:
            for stream_name in buffer.keys():

                logger.info(f"4---mesage: {message}")
                query = """
                INSERT INTO {table_name}
                VALUES (?,?,?)
                """.format(
                    table_name=f"_airbyte_raw_{stream_name}"
                )
                logger.info(f"query: {query}")

                con.executemany(query, buffer[stream_name])

            con.commit()
            buffer = defaultdict(list)
            yield message

        elif message.type == Type.RECORD:
            data = message.record.data
            stream = message.record.stream
            if stream not in streams:
                logger.debug(f"Stream {stream} was not present in configured streams, skipping")
                continue

            # add to buffer
            buffer[stream].append((str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps(data)))
        else:
            logger.info(f"Message type {message.type} not supported, skipping")

In the end, we insert any remaining messages:

    for stream_name in buffer.keys():

        query = """
        INSERT INTO {table_name}
        VALUES (?,?,?)
        """.format(
            table_name=f"_airbyte_raw_{stream_name}"
        )

        con.executemany(query, buffer[stream_name])
        con.commit()

While this is a fairly simple example, since we work with a local file, it showcases how to write the main part of a destination: consuming the data Airbyte passes along from the source connector and writing it to the destination.

📝 How `executemany` works: You can insert several rows using prepared statements that can be passed with an array of data: `con.executemany("INSERT INTO items VALUES (?, ?, ?)", [['chainsaw', 500, 10], ['iphone', 300, 2]] )`
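Here is a small self-contained sketch of that pattern outside of Airbyte (table and values are made up), in case you want to play with it:

import duckdb

con = duckdb.connect("example.duckdb")
con.execute("CREATE TABLE IF NOT EXISTS items (name TEXT, price INTEGER, qty INTEGER)")

# one prepared INSERT statement, executed once per row in the list
rows = [["chainsaw", 500, 10], ["iphone", 300, 2]]
con.executemany("INSERT INTO items VALUES (?, ?, ?)", rows)

print(con.execute("SELECT * FROM items").fetchall())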

Step 5: Testing the Connector

Let's explore how you'd test the destination after implementing the connector's first version. There are three options to choose from; we start with the native Python way, which does not require Docker or the Airbyte platform yet.

Locally with Python and a test file

We can mock the data we're receiving from the source connector and Airbyte with a simple JSON file, which serves as an example of data getting passed.

Having set up the virtual environment in step 2, we can activate it and install the required PyPI packages (if not done already) from `setup.py`, which in my case means duckdb:

source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r requirements.txt

Now let's check if our connector is working by defining the needed mock data, stored in integration_tests/messages.jsonl. For my simple test it looks like this:

{
  "type": "RECORD",
  "record": {
    "stream": "airbyte_acceptance_table",
    "emitted_at": 1664705198575,
    "data": {
      "column1": "test",
      "column2": 222,
      "column3": "2022-06-20T18:56:18",
      "column4": 33.33,
      "column5": [
        1,
        2,
        null
      ]
    }
  }
}

To run a test solely in Python, without Docker and without the Airbyte platform, you would run something like this.

📝 To run this successfully: It only works if I temporarily comment out the path handling in `_get_destination_path` in `destination.py`, because that function forces the path to start with `/local`, which does not exist on my local machine. It is required later, though, when running within Docker to map the path correctly.
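A sketch of what such a temporary tweak could look like (purely for local testing, not something to commit):

@staticmethod
def _get_destination_path(destination_path: str) -> str:
    # local-testing shortcut: skip the /local prefixing and validation
    return destination_path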

python main.py spec
python main.py check --config integration_tests/config.json
python main.py discover --config integration_tests/config.json
cat integration_tests/messages.jsonl | python main.py write --config integration_tests/config.json --catalog integration_tests/configured_catalog.json

---output
{"type": "SPEC", "spec": {"documentationUrl": "https://docs.airbyte.io/integrations/destinations/duckdb", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Destination Duckdb", "type": "object", "required": ["destination_path"], "additionalProperties": false, "properties": {"destination_path": {"type": "string", "description": "Path to the destination.ducdb file. The file will be placed inside that local mount. For more information check out our docs", "example": "/local/destination.duckdb"}, "schema": {"type": "string", "description": "database schema, default for duckdb is main", "example": "main"}}}, "supportsIncremental": true, "supportsNormalization": true, "supportsDBT": true, "supported_destination_sync_modes": ["overwrite", "append"]}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
usage: main.py [-h] {spec,check,write} ...
main.py: error: argument command: invalid choice: 'discover' (choose from 'spec', 'check', 'write')
{"type": "LOG", "log": {"level": "INFO", "message": "Begin writing to the destination..."}}
{"type": "LOG", "log": {"level": "INFO", "message": "Starting write to DuckDB with 1 streams"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Opening DuckDB file at /tmp/duckdb/destination.duckdb"}}
{"type": "LOG", "log": {"level": "INFO", "message": "--- {'airbyte_acceptance_table'}"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Writing complete."}}

And you can check if the file has been successfully created:

ls -ltr /tmp/duckdb
total 2072
-rw-r--r--  1 sspaeti  wheel  1060864 Jan 17 15:18 destination.duckdb
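You can also peek into the file directly with DuckDB to verify that the raw table exists and the records landed (a quick sketch; the table name follows the `_airbyte_raw_<stream>` convention from above):

import duckdb

con = duckdb.connect("/tmp/duckdb/destination.duckdb", read_only=True)
print(con.execute("SHOW TABLES").fetchall())
print(con.execute("SELECT * FROM _airbyte_raw_airbyte_acceptance_table LIMIT 5").fetchall())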

Docker and Gradle

In the next step, we use Docker and Gradle to test and build the destination. Gradle is the Java build tool we use in our repo for building, running unit and integration tests, and many other tasks.

Let's define the Dockerfile:

FROM python:3.9.11 as base

# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code

# upgrade pip to the latest version
RUN apt-get update && apt-get -y upgrade \
    && pip install --upgrade pip

COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .
# build a clean environment
FROM base
# RUN conda install -c conda-forge python-duckdb
WORKDIR /airbyte/integration_code

# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone

#adding duckdb manually (outside of setup.py - lots of errors)
RUN pip install duckdb

# copy payload code only
COPY main.py ./
COPY destination_duckdb ./destination_duckdb

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-duckdb

📝 Installing DuckDB from PyPI in the Dockerfile: There were some problems installing DuckDB, which is why duckdb is added manually here in addition to being part of `setup.py`. This might be corrected or removed in the future.

Now we build the docker image, either with docker:

#build destination
cd airbyte-integrations/connectors/destination-duckdb
docker build . -t airbyte/destination-duckdb:dev --no-cache

Or with Gradle, using the connector's `airbyteDocker` task from the repository root:

#build destination via Gradle
./gradlew :airbyte-integrations:connectors:destination-duckdb:airbyteDocker

ℹ️ I'm using --no-cache to ensure changes get applied, but the build will take substantially longer. Consider removing it once you know you won't change critical parts.

Acceptance, Unit, and Integration Tests

Besides building and testing our implementation, we also write acceptance tests to cover specific cases we know might go wrong, or behaviors we want to guarantee as the connector evolves.

The better the tests, the more confident we can be that the connector does what it is supposed to do, and the more confidently we can upgrade it and add features.

Unfortunately, we do not have acceptance tests yet for the Python Destination case. But we have unit and integration tests.
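A minimal unit test for the path handling could look like this (a sketch only, assuming the generated class is called `DestinationDuckdb` and lives in `destination_duckdb/destination.py`):

import pytest

from destination_duckdb.destination import DestinationDuckdb


def test_destination_path_gets_local_prefix():
    # relative paths are prefixed with the magic /local mount
    assert DestinationDuckdb._get_destination_path("destination.duckdb") == "/local/destination.duckdb"


def test_invalid_destination_path_raises():
    # paths that escape /local after normalization are rejected
    with pytest.raises(ValueError):
        DestinationDuckdb._get_destination_path("/local/../etc/passwd")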

To run the unit tests:

./gradlew :airbyte-integrations:connectors:destination-duckdb:unitTest

And likewise, to run the integration tests:

./gradlew :airbyte-integrations:connectors:destination-duckdb:integrationTest

📝 The outputs and results can be seen in the PR in the first comment under "Tests".

Airbyte Platform Build

Building and running the connector within Airbyte means that our docker image gets kicked off by the platform when a sync is triggered. Instead of testing only our destination, we test end to end.

Then we build the whole Airbyte platform:

SUB_BUILD=PLATFORM ./gradlew build

and start up Airbyte with the dev build.
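With the standard docker-compose setup, this typically means running the following from the repository root (hedged; check the deployment docs for the current command):

VERSION=dev docker-compose up

Once the platform is up, the airbyte-server container prints a banner like this: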

airbyte-server                    |
airbyte-server                    |     ___    _      __          __
airbyte-server                    |    /   |  (_)____/ /_  __  __/ /____
airbyte-server                    |   / /| | / / ___/ __ \/ / / / __/ _ \
airbyte-server                    |  / ___ |/ / /  / /_/ / /_/ / /_/  __/
airbyte-server                    | /_/  |_/_/_/  /_.___/\__, /\__/\___/
airbyte-server                    |                     /____/
airbyte-server                    |         : airbyte-server :
airbyte-server                    | --------------------------------------------------
airbyte-server                    |  Will soon be available at http://localhost:8000/
airbyte-server                    | --------------------------------------------------

Going to localhost:8000, you should see a sign-in pop-up, and after that the front page of Airbyte.

In the list of destinations, you should now see our new connector. You can also add any connector image manually via `Settings -> Destinations -> New connector`.

Creating a Sync to Test

Now we go ahead and create a sync (I used the simple Faker source) and map it to our new DuckDB destination.

If you run the sync and all works, you should see a `Succeeded` message and that the DuckDB database file is created at the defined path.

If you get any errors, please check the troubleshooting chapter after the conclusion.

Update Docs

If all tests are green and you are about to finish, make sure you update the generated `docs/integrations/destinations/duckdb.md` for your connector. The skeleton is already generated; you just add the missing pieces.

This is the place to describe what you implemented and to show less technical people what your connector implementation contains and what it doesn't.

Format your Code

Before you check all the checks running on GitHub and ask someone for a review, make sure you format your code with Gradle.
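At the time of writing, the repository exposed a Gradle format task, so this boiled down to something like the following from the repository root (check the contributing docs for the current command):

./gradlew format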

Step 6: Add Normalization

In addition, you can activate Normalization for a destination. Normalization is a lesser-known but powerful feature of Airbyte: it runs dbt as part of the sync and essentially unnests the raw JSON into tables and columns, close to a third normal form (3NF). Imagine data from the GitHub API being transformed into tabular tables related by foreign keys.

🚨 This is a big task and normally isn't done outside of Airbyte: Normalization requires a lot of experience with Airbyte and with how dbt works. Also, check Basic Normalization and Adding Normalization to a Connector in our docs to learn more about it.

As this goes beyond the scope of this simple tutorial, we'll cover it in a separate one. Still, you can see all the code changes necessary for DuckDB in the PR under Files changed and see what had to be done.

To activate Normalization, you need to edit spec.json and turn on `supportsNormalization` and `supportsDBT` by setting them to `true`. In the newest version, this has been moved to destination_definitions.yaml and destination_specs.yaml.

`supportsNormalization` is a boolean that indicates if this connector supports basic normalization via DBT. If true, `supportsDBT` must also be true.

`supportsDBT` is a boolean that indicates whether this destination is compatible with DBT. If set to true, the user can define custom DBT transformations that run on this destination after each successful sync. This must be true if `supportsNormalization` is set to `true`.

In case you want to test it for DuckDB, you can build the docker image and run the Gradle command like this:

#build normalization
cd airbyte-integrations/connectors/destination-duckdb
docker build . -t airbyte/normalization-duckdb:0.1.0 -t airbyte/normalization-duckdb:0.2.25 --no-cache

cd ../../..
./gradlew :airbyte-integrations:bases:base-normalization:airbyteDockerDuckDb && docker tag airbyte/normalization-duckdb:dev airbyte/normalization-duckdb:0.2.25

And the same for running the integration tests, either locally with pytest:

cd airbyte-integrations/bases/base-normalization
NORMALIZATION_TEST_TARGET=duckdb pytest integration_tests/test_normalization.py

Or with Gradle from the root folder:

NORMALIZATION_TEST_TARGET=duckdb ./gradlew :airbyte-integrations:bases:base-normalization:integrationTest

Don't worry if that was too fast; the goal wasn't for you to understand every Normalization step at this point. If you want to implement Normalization for your destination, don't hesitate to reach out to us so that we can support you.

Wrapping up

Congratulations, and thank you; you managed to write your first destination and contribute to a big open-source project. Once you have done everything, it's time to wait for a code review and correct anything that can be improved or needs fixing.

In case you had trouble or found yourself in need of help, I left a chapter below to troubleshoot some of the common errors I encountered while developing the DuckDB destination. Also, be aware that you can always ask in the GitHub issue/PR itself, ask on Slack in the airbyte-help channel, or search and ask in our Forum.

If you want to know more about how Airbyte synchronizes data between a source (API, DB, file, etc.) and a destination (DB, warehouse, cloud storage, etc.), we have an in-depth article, How we scale workflow orchestration with Temporal, which goes into the details of how Airbyte internally orchestrates the different activities.

References and Troubleshooting

Resetting Airbyte

Sometimes, you get stuck in a strange state. You may have docker volumes that were not removed or recreated, or the image doesn't contain what you expect. If that happens, it's a good idea to restart Airbyte from scratch.

Note that this will reset and delete existing syncs and configured sources and destinations. Make sure to save them with the Octavia CLI, or keep users and configurations in a separate place so you can add them again manually.

To fully reset Airbyte, you must delete the docker volumes associated with Airbyte. 

This is where data is stored. Assuming that you are running Airbyte by running `docker-compose up`, then what you need to do is:

  • Turn off Airbyte completely: `docker-compose down -v` (-v will remove volumes and you'll start from an empty db)
  • Turn Airbyte back on: `docker-compose up`

More on this in On Deploying | Airbyte Documentation.

Error Task :airbyte-config:init:test FAILED

> Task :airbyte-config:init:test FAILED

SpecFormatTest > testOnAllExistingConfig() FAILED
    java.lang.AssertionError:
    Expecting empty but was: [java.lang.RuntimeException: Fail JsonSecretsProcessor validation
        at io.airbyte.config.init.SpecFormatTest.lambda$testOnAllExistingConfig$3(SpecFormatTest.java:45)
        at org.assertj.core.api.iterable.ThrowingExtractor.apply(ThrowingExtractor.java:36)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        ...(96 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)]
        at io.airbyte.config.init.SpecFormatTest.testOnAllExistingConfig(SpecFormatTest.java:54)

If that happens, you can temporarily change the version tag in `destination_definitions.yaml` from `0.1.0` to `dev`.

Most probably, you will then get the next error right after `:airbyte-config:specs:generateSeedConnectorSpecs`, which is expected; see the next chapter.

Error `:airbyte-config:specs:generateSeedConnectorSpecs`

This error is expected, as we haven't uploaded the image to Docker Hub yet:

Task :airbyte-config:specs:generateSeedConnectorSpecs FAILED
2023-01-10 20:16:05,087 main INFO Loading mask data from '/seed/specs_secrets_mask.yaml
2023-01-10 20:16:05,091 main ERROR Unable to load mask data from '/seed/specs_secrets_mask.yaml': null.
2023-01-10 19:16:05 INFO i.a.c.EnvConfigs(getEnvOrDefault):1173 - Using default value for environment variable SPEC_CACHE_BUCKET: 'io-airbyte-cloud-spec-cache'
2023-01-10 19:16:05 INFO i.a.c.s.SeedConnectorSpecGenerator(run):78 - Updating seeded SOURCE definition specs if necessary...
2023-01-10 19:16:05 INFO i.a.c.s.SeedConnectorSpecGenerator(run):90 - Finished updating /Users/sspaeti/Documents/git/work/airbyte.git/simon/duckdb-destination/airbyte-config/init/src/main/resources/seed/source_specs.yaml
2023-01-10 19:16:05 INFO i.a.c.s.SeedConnectorSpecGenerator(run):78 - Updating seeded DESTINATION definition specs if necessary...
2023-01-10 19:16:05 INFO i.a.c.s.SeedConnectorSpecGenerator(fetchSpecFromGCS):119 - Seeded spec not found for docker image airbyte/destination-duckdb:dev - fetching from GCS bucket io-airbyte-cloud-spec-cache...
Exception in thread "main" java.lang.RuntimeException: Failed to fetch valid spec file for docker image airbyte/destination-duckdb:dev from GCS bucket io-airbyte-cloud-spec-cache. This will continue to fail until the connector change has been approved and published. See https://github.com/airbytehq/airbyte/tree/master/docs/connector-development#publishing-a-connector for more details.
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.lambda$fetchSpecFromGCS$4(SeedConnectorSpecGenerator.java:121)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.fetchSpecFromGCS(SeedConnectorSpecGenerator.java:121)
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.lambda$fetchUpdatedSeedSpecs$3(SeedConnectorSpecGenerator.java:114)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.fetchUpdatedSeedSpecs(SeedConnectorSpecGenerator.java:115)
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.run(SeedConnectorSpecGenerator.java:83)
        at io.airbyte.config.specs.SeedConnectorSpecGenerator.main(SeedConnectorSpecGenerator.java:74)

M1 Issues

If you have an M1 or other problems, see Developing Locally | Airbyte Documentation.

Local file share cannot be accessed

As mentioned in the article, please make sure that Docker Desktop has access to `/tmp` (and `/private` on macOS, as `/tmp` is a symlink that points to `/private`; it will not work otherwise). You allow it under "File sharing" in `Settings -> Resources -> File sharing -> add the one or two folders above` and hit the "Apply & restart" button.

Troubleshooting Normalization

More details on troubleshooting with copying data and code out of the running container can be found in Demo Hour: Debugging Normalization. Even though that's about the normalization part of Airbyte, it can still be helpful to understand more.

  • 1:07 Setting up a normalized Airbyte sync.
  • 4:08 Examining the normalization failure and getting the tables to sync properly.
  • 9:18 Looking into the destination table and examining the failure.
  • 12:41 Building and using the dev version of the connector.
  • 18:45 Questions.
