ElasticSearch Sink Connector
| Type | sink |
| Class | com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector |
| Target System | ElasticSearch |
| Maintainer | IBM |
| License | Apache License 2.0 |
| Project | github.com/ibm-messaging/kafka-connect-elastic-sink |
| Download | GitHub Releases |

This page documents version 1.0.3. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.
Description
The ElasticSearch Sink Connector consumes records from Kafka topics and indexes them into an ElasticSearch cluster.
It is maintained by IBM as part of the open-source github.com/ibm-messaging/kafka-connect-elastic-sink project.
ElasticSearch is an open-source search and analytics suite that makes it easy to ingest, search, visualize, and analyze data. Common use cases include application search, log analytics, data observability, and data ingestion.
Features
- Index Kafka records into ElasticSearch
- Configurable document builders (JSON, default)
- Configurable identifier builders (default key-based or record field-based)
- TLS truststore support for secure connections
- Error tolerance with configurable logging
When to Use
- You need to index Kafka topic data into ElasticSearch for search or analytics.
- You want to visualize Kafka events in Kibana.
When NOT to Use
- Your records are not in JSON format and you do not have a converter configured.
- Your ElasticSearch cluster is not accessible from the Kafka Connect cluster.
Installation
The connector is available from GitHub Releases.
- Navigate to the releases page and select the version matching your Kafka Connect installation.
- Download the JAR file.
For installation steps, see Installing Connector Plugins.
Configuration
For the complete configuration reference, see the official sink connector documentation.
To configure a connector in Axual Self-Service, see Starting Connectors.

TIP: For Infrastructure-as-Code deployment, see the Axual Kafka Connect Boilerplates for Terraform and Management API boilerplates.
Getting Started
Prerequisites
ElasticSearch instance
This documentation page is targeted at developers who can run the Axual platform locally. To deploy an ElasticSearch cluster and dashboard locally:
- Clone the gitlab.com/axual/bitnami-kafka-connect-setup git repository, branch elasticsearch.
- Deploy the ElasticSearch multi-node cluster and dashboard by running docker-compose up kibana.
- Confirm the deployment was successful by opening the ElasticSearch dashboard. Log in using elastic:qw34er! as credentials.
Axual stream
The Kafka stream this connector will consume must already exist and contain records in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- Follow the Creating streams documentation to create one stream and deploy it onto an environment. The name of the stream will be my_elasticsearch_stream. The key/value types will be JSON/JSON.
- Produce some data as JSON/JSON events to this stream.
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let’s call it my.elasticsearch.sink. The plugin name is com.ibm.eventstreams.connect.elasticsink. If the plugin isn’t available, ask a platform operator to install plugins.
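The JSON/JSON events produced in the step above can be sketched in plain Python. This is only an illustration of the serialization, not a prescribed producer setup: the field names (id, name, age) follow the example message used later in this guide, and the connector configuration in the next step uses a StringConverter for keys and a JsonConverter for values, so the key is a plain string and the value a schemaless JSON document. An actual producer client (for example confluent-kafka, an assumption on our part) would send the resulting bytes to my_elasticsearch_stream.

```python
import json

def serialize(record):
    """Serialize one record as the connector configuration expects:
    a plain-string key (StringConverter) and a schemaless JSON value
    (JsonConverter with schemas.enable=false)."""
    key = record["id"].encode("utf-8")            # plain string key
    value = json.dumps(record).encode("utf-8")    # JSON document value
    return key, value

# Example records matching the example message used later in this guide.
records = [
    {"id": "10", "name": "john", "age": 28},
    {"id": "11", "name": "jane", "age": 31},
]

for rec in records:
    key, value = serialize(rec)
    print(key, value)
```

A real producer would pass these key and value byte strings to its send call for the my_elasticsearch_stream topic.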
Step 2 — Configure the connector
- Provide the following minimal configuration:

| name | elassink-dev |
| connector.class | com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector |
| topics | my_elasticsearch_stream |
| environment | dev |
| es.connection | es01:9200 |
| es.user.name | elastic |
| es.password | qw34er! |
| es.tls.truststore.location | /etc/tls/elastic/elastic.jks |
| es.tls.truststore.password | qw34er |
| es.document.builder | com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder |
| es.index.builder | com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder |
| es.identifier.builder | com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder |
| key.converter | org.apache.kafka.connect.storage.StringConverter |
| key.converter.schemas.enable | false |
| value.converter | org.apache.kafka.connect.json.JsonConverter |
| value.converter.schemas.enable | false |
| errors.tolerance | all |
| errors.log.enable | true |
| errors.log.include.messages | true |

A platform operator must make the truststore at es.tls.truststore.location available on every Connect node.

For advanced options, see the official sink connector documentation.
- [Optional] To use named IDs based on a record field, change es.identifier.builder and add:

| es.identifier.builder | com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder |
| transforms | InsertKey,extractKey |
| transforms.InsertKey.type | org.apache.kafka.connect.transforms.ValueToKey |
| transforms.InsertKey.fields | id |
| transforms.extractKey.type | org.apache.kafka.connect.transforms.ExtractField$Key |
| transforms.extractKey.field | id |

Example message with ID:

{ "id": "10", "name": "john", "age": 28 }

Make sure to use unique IDs for every event you produce on the topic.
- Authorize the my.elasticsearch.sink sink Connector-Application to consume the my_elasticsearch_stream stream.
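The two transforms in the optional step above can be sketched in plain Python. This is a simulation of what the real Kafka Connect SMTs do (they operate on ConnectRecord objects, not dicts): ValueToKey copies the id field from the record value into the key, and ExtractField$Key then unwraps that key struct into the bare field, which the KeyIdentifierBuilder uses as the ElasticSearch document ID.

```python
def value_to_key(record, fields):
    """Simulates org.apache.kafka.connect.transforms.ValueToKey:
    builds the record key from the named value fields."""
    record["key"] = {f: record["value"][f] for f in fields}
    return record

def extract_field_key(record, field):
    """Simulates org.apache.kafka.connect.transforms.ExtractField$Key:
    replaces the key struct with a single field from it."""
    record["key"] = record["key"][field]
    return record

record = {"key": None, "value": {"id": "10", "name": "john", "age": 28}}
record = value_to_key(record, ["id"])      # key becomes {"id": "10"}
record = extract_field_key(record, "id")   # key becomes "10"
print(record["key"])  # prints: 10
```

After the chain runs, the record key is "10", so the indexed document gets that ID, which is why the IDs you produce must be unique.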
Step 3 — Verify
- Log back into the ElasticSearch dashboard.
- Go to the index-pattern creation page: click the menu button → Management → Stack management → Index Patterns.
- Click + Create index pattern:
  - Index pattern name: my_elasticsearch_stream
  - Click Next step, then Create index pattern.
- Go to the Discover page. You should now see your records in ElasticSearch.
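The index pattern name above matches the topic because the DefaultIndexBuilder configured in Step 2 derives the ElasticSearch index name from the Kafka topic name. A rough sketch of that mapping, assuming the builder simply lowercases the topic (ElasticSearch index names must be lowercase; the connector's exact normalization may differ):

```python
def default_index_name(topic: str) -> str:
    """Rough sketch of DefaultIndexBuilder's topic-to-index mapping.

    Assumption: the topic name is lowercased to satisfy ElasticSearch's
    lowercase index-name requirement; consult the connector source for
    the exact behavior.
    """
    return topic.lower()

print(default_index_name("my_elasticsearch_stream"))  # prints: my_elasticsearch_stream
```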
Known limitations
-
TLS truststore must be made available on every Connect node by a platform operator before starting the connector.
-
This documentation covers local deployment only — a publicly available ElasticSearch endpoint for cloud testing is not described.
Examples
Minimal configuration
{
"name": "my-elasticsearch-sink",
"config": {
"connector.class": "com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector",
"topics": "my_elasticsearch_stream",
"es.connection": "es01:9200",
"es.user.name": "elastic",
"es.password": "<your-elasticsearch-password>",
"es.tls.truststore.location": "/etc/tls/elastic/elastic.jks",
"es.tls.truststore.password": "<your-truststore-password>",
"es.document.builder": "com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder",
"es.index.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder",
"es.identifier.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
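When deploying outside Axual Self-Service, the minimal configuration above is the JSON body you would submit to a plain Kafka Connect worker's REST API (POST /connectors). The sketch below builds and sanity-checks that payload in Python before submission; the required-key set is our own illustrative choice, the passwords are placeholders, and most config keys from the example are omitted for brevity.

```python
import json

# The "Minimal configuration" example as a Python dict (abbreviated).
payload = {
    "name": "my-elasticsearch-sink",
    "config": {
        "connector.class": "com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector",
        "topics": "my_elasticsearch_stream",
        "es.connection": "es01:9200",
        "es.user.name": "elastic",
        "es.password": "<your-elasticsearch-password>",
    },
}

def validate(payload):
    """Sanity-check a few required keys before submitting the config.
    The required set here is illustrative, not the connector's full
    validation."""
    required = {"connector.class", "topics", "es.connection"}
    missing = required - payload["config"].keys()
    if missing:
        raise ValueError(f"missing required keys: {sorted(missing)}")
    return json.dumps(payload)

body = validate(payload)  # raises ValueError if a required key is absent
```

The returned body is what an HTTP client would POST with Content-Type: application/json to the Connect worker.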
License
The ElasticSearch Sink Connector is licensed under the Apache License, Version 2.0.