ElasticSearch Sink Connector

Type: sink
Class: com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector
Target System: ElasticSearch
Maintainer: IBM
License: Apache License 2.0
Project: github.com/ibm-messaging/kafka-connect-elastic-sink
Download: GitHub Releases

This page documents version 1.0.3. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The ElasticSearch Sink Connector consumes records from Kafka topics and indexes them into an ElasticSearch cluster.

It is maintained by IBM as part of the open-source github.com/ibm-messaging/kafka-connect-elastic-sink project.

ElasticSearch is an open-source search and analytics engine that makes it easy to ingest, search, visualize, and analyze data. Common use cases include application search, log analytics, data observability, and data ingestion.

Features

  • Index Kafka records into ElasticSearch

  • Configurable document builders (JsonDocumentBuilder by default)

  • Configurable identifier builders (DefaultIdentifierBuilder, or KeyIdentifierBuilder to derive document IDs from the record key)

  • TLS truststore support for secure connections

  • Error tolerance with configurable logging

When to Use

  • You need to index Kafka topic data into ElasticSearch for search or analytics.

  • You want to visualize Kafka events in Kibana.

When NOT to Use

  • Your records are not in JSON format and you do not have a converter configured.

  • Your ElasticSearch cluster is not accessible from the Kafka Connect cluster.

Installation

The connector is available from the project's GitHub Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official sink connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: For Infrastructure-as-Code deployments, see the Axual Kafka Connect boilerplates for Terraform and the Management API.

Getting Started

Prerequisites

ElasticSearch instance

This documentation page is targeted at developers who can run the Axual platform locally. To deploy an ElasticSearch cluster and dashboard locally:

  1. Clone the gitlab.com/axual/bitnami-kafka-connect-setup git repository, branch elasticsearch.

  2. Deploy the ElasticSearch multi-node cluster and dashboard by running docker-compose up kibana.

  3. Confirm the deployment was successful by opening the ElasticSearch dashboard.
    Log in using elastic : qw34er! as credentials.

Axual stream

The Kafka stream this connector will consume must already exist in Axual Self-Service and contain records. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation to create one stream and deploy it onto an environment.
    Name the stream my_elasticsearch_stream.
    Set the key/value types to JSON/JSON.

  2. Produce some data as JSON/JSON events to this stream.

  3. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.elasticsearch.sink.
    The plugin name is com.ibm.eventstreams.connect.elasticsink.
    If a plugin isn’t available, ask a platform operator to install plugins.
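The JSON/JSON events mentioned in step 2 can be generated with a short script. This is only an illustrative sketch: the field names (id, name, age) mirror the example message used later in this guide, and the key:value print format assumes a console producer configured with ':' as the key separator.

```python
import json

def sample_events(count):
    """Build JSON-encoded key/value pairs for the my_elasticsearch_stream topic.

    Each value carries a unique "id" field so the optional KeyIdentifierBuilder
    setup later in this guide can derive document IDs from it.
    """
    events = []
    for i in range(count):
        key = str(i)
        value = json.dumps({"id": str(i), "name": f"user-{i}", "age": 20 + i})
        events.append((key, value))
    return events

if __name__ == "__main__":
    for key, value in sample_events(3):
        # One "key:value" line per event, suitable for piping into a
        # console producer that uses ':' as the key separator.
        print(f"{key}:{value}")
```

Any producer that writes JSON keys and values to the stream works equally well; the script only shows the shape of the data the connector expects.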

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    name = elassink-dev
    connector.class = com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector
    topics = my_elasticsearch_stream
    environment = dev
    es.connection = es01:9200
    es.user.name = elastic
    es.password = qw34er!
    es.index.builder = com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder
    es.document.builder = com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder
    es.identifier.builder = com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder
    es.tls.truststore.location = /etc/tls/elastic/elastic.jks
    es.tls.truststore.password = qw34er
    key.converter = org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable = false
    value.converter = org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable = false
    errors.tolerance = all
    errors.log.enable = true
    errors.log.include.messages = true

    A platform operator must make the truststore at es.tls.truststore.location available on every Connect node.

    For advanced options, see the official sink connector documentation.

  2. [Optional] To use named IDs based on a record field, change es.identifier.builder and add the following transforms:

    es.identifier.builder = com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder
    transforms = InsertKey,extractKey
    transforms.InsertKey.type = org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields = id
    transforms.extractKey.type = org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractKey.field = id

    Example message with ID:

    {
        "id": "10",
        "name": "john",
        "age": 28
    }

    Make sure to use a unique ID for every event you produce on the topic; events sharing an ID overwrite the same ElasticSearch document.
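To see what the two transforms do, here is a minimal Python simulation (not the Connect implementation, which operates on Connect's internal record types): ValueToKey copies the listed value fields into the record key as a struct, and ExtractField$Key then replaces that key struct with its single id field, which KeyIdentifierBuilder uses as the document ID.

```python
# Records are modeled as plain dicts with "key" and "value" entries.

def value_to_key(record, fields):
    """ValueToKey: replace the record key with a struct of the listed value fields."""
    record = dict(record)
    record["key"] = {f: record["value"][f] for f in fields}
    return record

def extract_field_key(record, field):
    """ExtractField$Key: replace the key struct with one of its fields."""
    record = dict(record)
    record["key"] = record["key"][field]
    return record

record = {"key": None, "value": {"id": "10", "name": "john", "age": 28}}
record = value_to_key(record, ["id"])       # key becomes {"id": "10"}
record = extract_field_key(record, "id")    # key becomes "10"
print(record["key"])
```

After both transforms the record key is "10", matching the id of the example message above, so the indexed document gets that ID.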

  3. Authorize the my.elasticsearch.sink sink Connector-Application to consume the my_elasticsearch_stream stream.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

  1. Log back into the ElasticSearch dashboard.

  2. Go to the index-pattern creation page.

    • Click the menu button → Management → Stack management → Index Patterns.

  3. Click + Create index pattern.

    • Index pattern name: my_elasticsearch_stream

    • Click Next step, then Create index pattern.

  4. Go to the Discover page.

    • You should now see your records in ElasticSearch.

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Run docker-compose down to stop and remove the ElasticSearch containers.

Known limitations

  • TLS truststore must be made available on every Connect node by a platform operator before starting the connector.

  • This documentation covers local deployment only — a publicly available ElasticSearch endpoint for cloud testing is not described.

Examples

Minimal configuration

{
  "name": "my-elasticsearch-sink",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector",
    "topics": "my_elasticsearch_stream",
    "es.connection": "es01:9200",
    "es.user.name": "elastic",
    "es.password": "<your-elasticsearch-password>",
    "es.tls.truststore.location": "/etc/tls/elastic/elastic.jks",
    "es.tls.truststore.password": "<your-truststore-password>",
    "es.document.builder": "com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder",
    "es.index.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder",
    "es.identifier.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}
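When deploying against a plain Kafka Connect cluster rather than Axual Self-Service, a configuration like the one above is submitted to Connect's REST API with a POST to /connectors. The sketch below only builds the request; the base URL and the trimmed-down config are illustrative assumptions.

```python
import json

def build_create_request(base_url, name, config):
    """Build the URL and JSON body for Kafka Connect's POST /connectors endpoint."""
    body = json.dumps({"name": name, "config": config})
    return f"{base_url}/connectors", body

url, body = build_create_request(
    "http://localhost:8083",  # assumed Connect REST endpoint
    "my-elasticsearch-sink",
    {
        "connector.class": "com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector",
        "topics": "my_elasticsearch_stream",
        "es.connection": "es01:9200",
    },
)
print(url)
```

The resulting body can be sent with any HTTP client, e.g. curl -X POST -H "Content-Type: application/json" with the JSON body against the printed URL.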

License

The ElasticSearch Sink Connector is licensed under the Apache License, Version 2.0.