OpenSearch Sink Connector

Type: sink
Class: io.aiven.kafka.connect.opensearch.OpenSearchSinkConnector
Target System: OpenSearch
Maintainer: Aiven
License: Apache License 2.0
Project: github.com/aiven/opensearch-connector-for-apache-kafka
Download: GitHub Releases

This page documents version 3.1.1. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The OpenSearch Sink Connector consumes records from Kafka topics and indexes them into an OpenSearch cluster.

It is maintained by Aiven as part of the open-source project at github.com/aiven/opensearch-connector-for-apache-kafka.

OpenSearch is an open-source search and analytics suite that makes it easy to ingest, search, visualize, and analyze data. Common use cases include application search, log analytics, data observability, and data ingestion.

Although the connector is open source, it is not the official Aiven connector; the official connector is neither open source nor free.
The connector currently contains a bug that prevents configuration discovery. The Self-Service UI relies on this functionality to suggest connector properties. Until the fix is merged, use the JAR file from the git repository.

Features

  • Index Kafka records into OpenSearch

  • Configurable key and schema ignore options

  • Error tolerance with configurable logging and malformed document handling

When to Use

  • You need to index Kafka topic data into OpenSearch for search or analytics.

  • You want to visualize Kafka events in OpenSearch Dashboards.

When NOT to Use

  • Your records are not in JSON format and you do not have a converter configured.

  • Your OpenSearch cluster is not accessible from the Kafka Connect cluster.

Installation

The connector is available from the project's GitHub Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official sink connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: For Infrastructure-as-Code deployment, see the Axual Kafka Connect Boilerplates for Terraform and Management API boilerplates.

Getting Started

Prerequisites

OpenSearch instance

This documentation page is targeted at developers who can run the Axual platform locally. To deploy an OpenSearch cluster and dashboard locally:

  1. Follow the operator documentation to deploy the Axual platform locally with Connect enabled.

  2. Clone the gitlab.com/axual/bitnami-kafka-connect-setup git repository, branch opensearch-axualplatform.

  3. Deploy an OpenSearch single-node cluster and dashboard by running docker-compose up.

  4. Confirm the deployment by opening the OpenSearch dashboard and logging in with admin : admin.
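You can also confirm the deployment without the dashboard by querying GET /_cluster/health on the OpenSearch endpoint (for a local docker-compose setup this is typically https://localhost:9200 with admin : admin, but the exact host and port depend on your compose file) and inspecting the status field of the response. A minimal parsing sketch:

```python
import json

def cluster_status(health_body: str) -> str:
    """Return the 'status' field from an OpenSearch GET /_cluster/health response."""
    return json.loads(health_body)["status"]

# Example response body; a healthy single-node cluster reports "green" or "yellow".
sample = '{"cluster_name": "docker-cluster", "status": "green", "number_of_nodes": 1}'
print(cluster_status(sample))  # green
```

A "red" status means at least one primary shard is unassigned and indexing may fail.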

Axual stream

The Kafka stream this connector will consume must already exist in Axual Self-Service and contain records. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation to create a stream and deploy it onto an environment.
    Name the stream my_opensearch_stream.
    Set the key/value types to JSON/JSON.

  2. Produce some data as JSON/JSON events to this stream.

  3. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_opensearch_sink.
    The plugin name is io.aiven.kafka.connect.opensearch.OpenSearchSinkConnector.
    If the plugin isn’t available, ask a platform operator to install it.
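Any valid JSON document works as a test event for step 2 above; the field names below are purely illustrative:

```python
import json

# Hypothetical event; any valid JSON value is accepted on a JSON/JSON stream.
event = {
    "device_id": "sensor-1",
    "temperature": 21.5,
    "recorded_at": "2024-01-01T12:00:00Z",
}
record_value = json.dumps(event)
print(record_value)
```

Produce a few such events so there is something to see in OpenSearch Dashboards later.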

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    connection.url: your OpenSearch endpoint, for example https://platform.local:9200

    connection.username: admin

    connection.password: admin

    topics: my_opensearch_stream

    key.ignore: true

    schema.ignore: true

    errors.log.enable: true

    errors.log.include.messages: true

    errors.tolerance: all

    behavior.on.malformed.documents: warn

    For advanced options, see the official sink connector documentation.

  2. Authorize the my_opensearch_sink sink Connector-Application to consume the my_opensearch_stream stream.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

  1. Log back into the OpenSearch dashboard.

  2. Go to the index-pattern creation page.

    • Click the menu button → Management → Stack management → Index Patterns.

  3. Click + Create index pattern.

    • Index pattern name: my_opensearch_stream

    • Click Next step, then Create index pattern.

  4. Go to the Discover page.

    • You should now see your records in OpenSearch.

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Run docker-compose down to stop and remove the OpenSearch containers.

Known limitations

  • A bug in the connector prevents configuration discovery, so the Self-Service UI cannot auto-suggest property names. Use the JAR from the git repository until the fix is merged.

  • This documentation covers local deployment only; a publicly available OpenSearch endpoint for cloud testing is not described.

Examples

Minimal configuration

{
  "name": "my-opensearch-sink",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpenSearchSinkConnector",
    "connection.url": "https://platform.local:9200",
    "connection.username": "admin",
    "connection.password": "<your-opensearch-password>",
    "topics": "my_opensearch_stream",
    "key.ignore": "true",
    "schema.ignore": "true",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "all",
    "behavior.on.malformed.documents": "warn"
  }
}
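A quick pre-flight check for a configuration like the one above can catch missing keys before submission. This is a hypothetical sketch: the required set below reflects this example, not the connector's full validation logic.

```python
# Hypothetical pre-flight check; the required set reflects this example
# config, not the connector's complete validation.
connector = {
    "name": "my-opensearch-sink",
    "config": {
        "connector.class": "io.aiven.kafka.connect.opensearch.OpenSearchSinkConnector",
        "connection.url": "https://platform.local:9200",
        "connection.username": "admin",
        "connection.password": "<your-opensearch-password>",
        "topics": "my_opensearch_stream",
    },
}

required = {"connector.class", "connection.url", "topics"}
missing = required - connector["config"].keys()
print(sorted(missing))  # [] when nothing is missing
```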

License

The OpenSearch Sink Connector is licensed under the Apache License, Version 2.0.