Azure Cosmos DB Sink Connector

Type: sink

Class: com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector

Target System: Azure Cosmos DB

Maintainer: Microsoft

License: MIT License

Project: github.com/microsoft/kafka-connect-cosmosdb

Download: GitHub Releases

This page documents version 1.19.0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The Azure Cosmos DB Sink Connector consumes records from Kafka topics and writes them into an Azure Cosmos DB container.

It is maintained by Microsoft as part of the open-source kafka-connect-cosmosdb project (github.com/microsoft/kafka-connect-cosmosdb).

Features

  • Write Kafka records into Azure Cosmos DB containers

  • Topic-to-container mapping via the configurable connect.cosmos.containers.topicmap setting

  • Supports JSON record formats
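
The topicmap setting maps each topic to a container using the topic#container format, with multiple mappings separated by commas. A minimal Python sketch of how such a value can be assembled (the build_topicmap helper is ours, not part of the connector):

```python
def build_topicmap(mapping):
    """Join topic -> container pairs into the connector's "topic#container,..." format."""
    return ",".join(f"{topic}#{container}" for topic, container in mapping.items())

# One mapping per topic; order follows dict insertion order.
print(build_topicmap({"hotels": "cosmosdbtesting"}))  # hotels#cosmosdbtesting
```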

When to Use

  • You need to write Kafka records into an Azure Cosmos DB container as part of a data pipeline.

  • You want to materialise a Kafka topic as a Cosmos DB document collection.

When NOT to Use

  • Your Kafka records are not JSON-formatted; the connector only supports JSON records.

Installation

The connector is available from the project's GitHub Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: For Infrastructure-as-Code deployment, see the Axual Kafka Connect Boilerplates for Terraform and Management API boilerplates.

Getting Started

Prerequisites

Azure Cosmos DB account

  • You already have an Azure Cosmos DB Account.

  • You have an Azure Cosmos DB container (e.g. cosmosdb-testing).

  • You have access to the master key (Azure Cosmos DB primary key).

Axual stream

The Kafka stream this connector will consume must already exist in Axual Self-Service and contain records.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation to create a stream and deploy it onto an environment.
    The name of the stream will be hotels.
    The key/value types will be JSON/JSON.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my-custom-cosmosdb-instance.
    The plugin name is com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector.
    If the plugin isn’t available, ask a platform operator to install it.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    key.converter: org.apache.kafka.connect.json.JsonConverter

    topics: hotels

    value.converter: org.apache.kafka.connect.json.JsonConverter

    connect.cosmos.connection.endpoint: Insert the Azure Cosmos DB endpoint URL. Example: https://my-cosmos-instance.documents.azure.com:443/

    connect.cosmos.master.key: Insert the Azure Cosmos DB master key (primary key).

    connect.cosmos.containers.topicmap: Format is topic#containername; multiple mappings are comma-separated. Example: hotels#cosmosdbtesting

    connect.cosmos.databasename: Insert the name of the database. Example: cosmosdbtesting

    For advanced options, see the official connector documentation.

  2. Authorize the my-custom-cosmosdb-instance sink Connector-Application to consume the hotels stream.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

Produce some JSON/JSON events to this stream and check the Azure container to see the documents written by the connector.
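
Such a JSON event could look like the following sketch, serialized the way a producer would send it. The field names are illustrative; we include an id property, since Cosmos DB documents are keyed by an "id" field:

```python
import json

# A sample value for the hotels stream; field names are made up for illustration.
hotel_event = {
    "id": "hotel-001",  # Cosmos DB documents carry an "id" property
    "name": "Grand Hotel",
    "city": "Utrecht",
    "rating": 4.8,
}

# Serialize to a JSON string, as a JSON producer would before sending to the topic.
payload = json.dumps(hotel_event)
print(payload)
```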

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Return to Azure and delete the Cosmos DB container (and the account, if it is no longer needed).

Known limitations

  • The connector requires JSON-formatted Kafka records.

Examples

Minimal configuration

{
  "name": "my-cosmosdb-sink",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "hotels",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.connection.endpoint": "https://my-cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "<your-cosmos-master-key>",
    "connect.cosmos.containers.topicmap": "hotels#cosmosdbtesting",
    "connect.cosmos.databasename": "cosmosdbtesting"
  }
}
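
As a quick sanity check before deploying, you can verify that every topic listed in topics has an entry in the topicmap. This is an ad-hoc helper of ours, not part of the connector:

```python
import json

# The minimal configuration from above, as a JSON document.
connector = json.loads("""
{
  "name": "my-cosmosdb-sink",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "hotels",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.connection.endpoint": "https://my-cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "<your-cosmos-master-key>",
    "connect.cosmos.containers.topicmap": "hotels#cosmosdbtesting",
    "connect.cosmos.databasename": "cosmosdbtesting"
  }
}
""")

config = connector["config"]
topics = config["topics"].split(",")
# Each topicmap entry is "topic#container"; collect the topic half of each.
mapped = {entry.split("#", 1)[0] for entry in config["connect.cosmos.containers.topicmap"].split(",")}
missing = [t for t in topics if t not in mapped]
assert not missing, f"topics without a container mapping: {missing}"
print("topicmap covers all topics")
```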

License

The Azure Cosmos DB Sink Connector is licensed under the MIT License.