ElasticSearch sink Connector, version 1.0.1

ElasticSearch

ElasticSearch is an open-source search and analytics suite that makes it easy to ingest, search, visualize, and analyze data.
Common ElasticSearch use cases include application search, log analytics, data observability, and data ingestion.

Deploying an ElasticSearch instance

This documentation page is a work in progress; as such, we do not yet have a publicly available service exposing the ElasticSearch API for you to test your Connector against.

For developers who have the means to run the Axual platform locally, we outline below the steps to deploy ElasticSearch alongside it.

  1. Clone the gitlab.com/axual/bitnami-kafka-connect-setup git repository and check out the elasticsearch branch.

  2. Deploy an ElasticSearch multi-node cluster and dashboard by running docker-compose up kibana.

  3. Confirm the deployment was successful by opening the ElasticSearch dashboard.
    Log in with username elastic and password qw34er!.
    You're done for now; you'll return to the dashboard after starting the Connector.

Configuring a new sink Connector

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be my_elasticsearch_stream.
    The key/value types will be JSON/JSON.

  2. Produce some data as JSON/JSON events to this stream.
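
    For example, here is a minimal producer sketch using the plain Apache Kafka client (org.apache.kafka:kafka-clients). The broker address is an assumption for a local setup; on Axual, the fully resolved topic name and security settings depend on your tenant and environment:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class JsonEventProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // JSON strings for key and value, matching the stream's JSON/JSON types.
            String key = "{\"id\": \"10\"}";
            String value = "{\"id\": \"10\", \"name\": \"john\", \"age\": 28}";

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my_elasticsearch_stream", key, value));
            } // close() flushes any pending records
        }
    }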

  3. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my.elasticsearch.sink.
    The plugin name is com.ibm.eventstreams.connect.elasticsink.
    If the plugin isn't available, ask a platform operator to install it.

  4. Provide the following minimal configuration in order to connect to the previously configured ElasticSearch instance.
    For advanced configuration, see the official sink connector documentation.

    connector.class=com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector
    name=elassink-dev
    topics=my_elasticsearch_stream
    environment=dev
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    errors.log.enable=true
    errors.log.include.messages=true
    errors.tolerance=all
    # The hostname should be configured as a SAN of the host certificate
    es.connection=es01:9200
    es.user.name=elastic
    es.password=qw34er!
    es.index.builder=com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder
    es.identifier.builder=com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder
    es.document.builder=com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder
    # A platform operator has to make this truststore available on every node of
    # the Connect-Cluster, and provide you with a path to use
    es.tls.truststore.location=/etc/tls/elastic/elastic.jks
    es.tls.truststore.password=qw34er
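
    A quick way to sanity-check the connection settings above, outside Kafka Connect, is to open a TLS connection to ElasticSearch with the same truststore and credentials. A minimal sketch assuming Java 11+; note that es01 resolves only inside the Docker network, so substitute localhost when running from the host (the certificate must list that name as a SAN):

    import java.io.FileInputStream;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.security.KeyStore;
    import java.util.Base64;

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManagerFactory;

    public class ElasticConnectionCheck {
        public static void main(String[] args) throws Exception {
            // Load the same truststore the connector uses (es.tls.truststore.*).
            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("/etc/tls/elastic/elastic.jks")) {
                trustStore.load(in, "qw34er".toCharArray());
            }
            TrustManagerFactory tmf =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);
            SSLContext ssl = SSLContext.getInstance("TLS");
            ssl.init(null, tmf.getTrustManagers(), null);

            // Basic auth with the es.user.name / es.password credentials.
            String auth = Base64.getEncoder().encodeToString("elastic:qw34er!".getBytes());
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://es01:9200/"))
                    .header("Authorization", "Basic " + auth)
                    .build();

            HttpResponse<String> response = HttpClient.newBuilder().sslContext(ssl).build()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode()); // expect 200 with cluster info
        }
    }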

  5. [Optional] Adding ID information for the connector:
    If you would like named IDs in the ElasticSearch index, a change must be made to the connector configuration:
    set the es.identifier.builder property to com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder. The key of each message sent in is then used as the document identifier. Add the following items to the configuration displayed above.

    es.identifier.builder=com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder
    transforms=InsertKey,extractKey
    transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields=id
    transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractKey.field=id

    An example message with an id field is displayed below:

    {
        "id": "10",
        "name": "john",
        "age": 28
    }

    Make sure to use unique IDs for each event you produce on the topic; each document will then appear in ElasticSearch under that ID.

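    To see exactly what these two transforms do to a record, the sketch below applies them directly using the standard Kafka Connect SMT classes. It assumes org.apache.kafka:connect-transforms and connect-api are on the classpath, and the schemaless record shape produced by the JSON converters with schemas.enable=false:

    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.ExtractField;
    import org.apache.kafka.connect.transforms.ValueToKey;

    public class IdentifierTransformDemo {
        public static void main(String[] args) {
            // Schemaless value, as produced with JsonConverter and schemas.enable=false.
            Map<String, Object> value = Map.of("id", "10", "name", "john", "age", 28);
            SinkRecord record =
                    new SinkRecord("my_elasticsearch_stream", 0, null, null, null, value, 0);

            // transforms.InsertKey: copy the "id" field from the value into the key.
            ValueToKey<SinkRecord> insertKey = new ValueToKey<>();
            insertKey.configure(Map.of("fields", "id"));

            // transforms.extractKey: reduce that key map to the bare "id" value.
            ExtractField.Key<SinkRecord> extractKey = new ExtractField.Key<>();
            extractKey.configure(Map.of("field", "id"));

            SinkRecord transformed = extractKey.apply(insertKey.apply(record));
            System.out.println(transformed.key()); // prints "10" -> used as the document id
        }
    }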
  6. Authorize the my.elasticsearch.sink sink Connector-Application to consume the my_elasticsearch_stream stream.

  7. You can now start the sink Connector-Application.

  8. Log back into the ElasticSearch dashboard.

  9. Go to the index-pattern creation page.

    • Click the menu button (three stripes) in the top-left corner of the screen → Management → Stack Management

    • Click Index Patterns (although it may look like it’s already selected)

  10. Click + Create index pattern

    • Index pattern name: my_elasticsearch_stream

    • Click Next step

  11. Click Create index pattern

  12. Go to the Discover page.

    • Click the menu button (three stripes) in the top-left corner of the screen → Kibana → Discover

  13. You should now see your records in ElasticSearch. If you would rather verify through the REST API, see the sketch below.
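
    A search against the index returns the documents the Connector wrote. This is a minimal sketch reusing the truststore and credentials from step 4; localhost:9200 is an assumption for the local docker-compose setup (the certificate must list localhost as a SAN):

    import java.io.FileInputStream;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.security.KeyStore;
    import java.util.Base64;

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManagerFactory;

    public class IndexSearchCheck {
        public static void main(String[] args) throws Exception {
            // Same truststore and credentials as the connector configuration.
            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("/etc/tls/elastic/elastic.jks")) {
                trustStore.load(in, "qw34er".toCharArray());
            }
            TrustManagerFactory tmf =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);
            SSLContext ssl = SSLContext.getInstance("TLS");
            ssl.init(null, tmf.getTrustManagers(), null);

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://localhost:9200/my_elasticsearch_stream/_search"))
                    .header("Authorization", "Basic "
                            + Base64.getEncoder().encodeToString("elastic:qw34er!".getBytes()))
                    .build();

            HttpResponse<String> response = HttpClient.newBuilder().sslContext(ssl).build()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body()); // hits.hits[] should contain your events
        }
    }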

Cleanup

Once you are done, stop the Connector-Application and clean up the unused Axual resources.
Run docker-compose down in the same directory where you ran the up command to stop and remove the ElasticSearch containers.

License

The ElasticSearch sink Connector is licensed under the Apache License, Version 2.0.

Source code

The source code for the Connect-Plugin can be found at github.com/ibm-messaging/kafka-connect-elastic-sink.