HTTP Sink Connector

Type: sink

Class: io.axual.connect.plugins.http.HttpSinkConnector

Target System: HTTP / REST API

Maintainer: Axual

License: Apache License 2.0

Project: gitlab.com/axual/public/connect-plugins/http-sink-connector

Download: GitLab Releases

This page documents version 1.1.0-RC0. Later versions are expected to remain compatible unless they introduce breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The HTTP Sink Connector pushes records from Kafka topics to a remote HTTP server. By default, the connector creates a JSON payload containing the topic name, record key, and value.

It is developed and maintained by Axual as an open-source connector available at gitlab.com/axual/public/connect-plugins/http-sink-connector.

The connector is fully pluggable: the message format, header selection strategy, and authentication mechanism are each implemented as a separate class, allowing you to substitute the built-in implementations with custom ones.

Features

  • Push Kafka records to any HTTP or HTTPS endpoint

  • All HTTP methods supported (POST, PUT, PATCH, DELETE, etc.)

  • Pluggable message formatter — defaults to JSON envelope format

  • Static HTTP header definition

  • Kafka header to HTTP header forwarding (UTF-8 Strings)

  • Configurable HTTP redirect support

  • TLS/SSL with configurable Certificate Authority, protocols, and cipher suites

  • TLS with Client Certificate authentication

  • HTTP Basic Authentication (built-in provider)

  • Pluggable authentication — bring your own IAuthenticationProvider

  • Configurable retry behaviour with wait time

  • Content logger for debugging request/response bodies at DEBUG level

When to Use

  • You need to send Kafka records to a REST API or webhook endpoint.

  • Your target system exposes an HTTP API but has no dedicated Kafka connector.

  • You need flexible authentication or custom message formatting via the pluggable provider model.

When NOT to Use

  • Your target system has a dedicated Kafka connector — prefer a purpose-built connector for better reliability and throughput.

  • You need dynamic endpoint URLs built from record field values — this connector does not support dynamic endpoint templating (see Known limitations).

  • You need bidirectional sync — this connector is sink only.

Installation

The connector JAR is published on the GitLab Releases page. Download the JAR for version 1.1.0-RC0 from there.

For installation steps, see Installing Connector Plugins.

Configuration

Configuration options are grouped as follows:

  • HTTP — Endpoint URL, HTTP method, and valid response codes

  • TLS/SSL — TLS/SSL settings for HTTPS endpoints

  • Connection — Timeouts and redirect behaviour

  • Retry — Retry count and wait time

  • Content — Message formatter, header selector, authentication provider, and content type

  • Static Headers — Statically defined HTTP headers added to every request

  • Logging — Content logger for debugging

HTTP

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| endpoint | String | Yes | | The URI where the connector sends the data. |
| method | String | No | POST | The HTTP method used for delivering the data. |
| valid.status.codes | List | No | 200, 204 | A list of HTTP response status codes that indicate success. Any other status code is treated as a failure and triggers a retry. |
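
The HTTP settings can be sanity-checked before the connector is deployed. The following is a minimal sketch of such a pre-flight check, not the connector's own validation logic; the helper name check_http_settings and the rules it applies (absolute http or https URI, status codes between 100 and 599) are assumptions made for illustration.

```python
from urllib.parse import urlparse


def check_http_settings(config: dict) -> list:
    """Return a list of problems found in the HTTP-related settings.

    Hypothetical helper: the connector may validate these values differently.
    """
    problems = []

    # endpoint is mandatory and should be an absolute http(s) URI.
    parsed = urlparse(config.get("endpoint", ""))
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        problems.append("endpoint must be an absolute http(s) URI")

    # valid.status.codes is optional; the documented default is 200, 204.
    codes = config.get("valid.status.codes", "200,204")
    for code in codes.split(","):
        code = code.strip()
        if not code.isdecimal() or not 100 <= int(code) <= 599:
            problems.append(f"invalid status code: {code!r}")

    return problems


if __name__ == "__main__":
    print(check_http_settings({"endpoint": "https://api.example.com/events"}))
```

An empty list means the sketch found nothing to complain about; it does not guarantee that the connector accepts the configuration.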

TLS/SSL

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| ssl.ca.certificates | List | No | (empty) | A comma-separated list of PEM-encoded X509 certificates for trusted Certificate Authorities. Use this to trust a custom CA without modifying the JVM truststore. |
| ssl.ca.file.locations | List | No | (empty) | Paths to PEM files containing valid X509 certificates for trusted Certificate Authorities. |
| ssl.enable.hostname.verification | Boolean | No | true | When true, the hostname in the endpoint URL is matched against the names stored in the server certificate. |
| ssl.protocols | List | No | (JVM defaults) | Sets the supported SSL/TLS protocols for the HTTP client. |
| ssl.cipher.suites | List | No | (JVM defaults) | Sets the supported cipher suites for the HTTP client. |
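
Before pointing ssl.ca.file.locations at a PEM file, it can help to confirm outside the connector that the file really validates the endpoint's certificate. The following is a minimal sketch using Python's standard ssl module, not the connector's own TLS code; the helper name build_tls_context is hypothetical.

```python
import ssl
from typing import Optional


def build_tls_context(ca_file: Optional[str] = None,
                      verify_hostname: bool = True) -> ssl.SSLContext:
    """Create a client-side TLS context that trusts an optional custom CA file.

    Hypothetical helper for testing a CA file outside the connector.
    """
    # With ca_file=None the platform's default trust store is loaded instead.
    context = ssl.create_default_context(cafile=ca_file)
    # Mirrors the intent of ssl.enable.hostname.verification (default: true).
    context.check_hostname = verify_hostname
    return context


if __name__ == "__main__":
    ctx = build_tls_context()
    print(ctx.check_hostname, ctx.verify_mode == ssl.CERT_REQUIRED)
```

The resulting context can be passed to urllib.request.urlopen(url, context=ctx). If the handshake fails with a certificate verification error, the same CA file is unlikely to work in the connector either.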

Connection

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| connection.timeout.ms | Integer | No | 5000 | Maximum time in milliseconds to wait for a connection to be established. |
| connection.request.timeout.ms | Integer | No | 2000 | Maximum time in milliseconds to wait for a connection from the connection manager pool. |
| socket.timeout.ms | Integer | No | 1000 | Maximum time in milliseconds to wait between receiving data packets. |
| redirect.allow | Boolean | No | true | When true, HTTP redirects are followed automatically. |
| redirect.allowcircular | Boolean | No | false | When true, circular redirects are allowed. |
| redirect.maximum | Integer | No | 50 | Maximum number of redirects to follow before failing. |

Retry

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| retry.maximum | Integer | No | 3 | Maximum number of retries before failing. Unexpected status codes (not in valid.status.codes) also count as failures. |
| retry.wait.ms | Integer | No | 50 | Time in milliseconds to wait before retrying after an invalid status code or connection failure. |
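
The interaction between valid.status.codes, retry.maximum, and retry.wait.ms can be illustrated with a small simulation. This is a sketch of the documented semantics, not the connector's implementation. It assumes that the first attempt does not count as a retry, so a record is tried at most retry.maximum + 1 times. The function name send_with_retry and its parameters are hypothetical.

```python
import time
from typing import Callable, Iterable


def send_with_retry(send: Callable[[], int],
                    valid_status_codes: Iterable[int] = (200, 204),
                    retry_maximum: int = 3,
                    retry_wait_ms: int = 50,
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Call send() until it returns a valid status code or retries run out.

    Assumption: one initial attempt plus at most retry_maximum retries.
    Returns the number of attempts made; raises RuntimeError on failure.
    """
    valid = set(valid_status_codes)
    attempts = 0
    while True:
        attempts += 1
        try:
            status = send()
        except ConnectionError:
            # A connection failure is handled like an invalid status code.
            status = None
        if status in valid:
            return attempts
        if attempts > retry_maximum:
            raise RuntimeError(
                f"giving up after {attempts} attempts (last status: {status})")
        # Wait retry.wait.ms before the next attempt.
        sleep(retry_wait_ms / 1000.0)
```

Under these assumptions, the default settings allow up to four attempts per record, with three waits of 50 ms in between.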

Content

The connector uses three pluggable components for formatting, header selection, and authentication. Each component is configured by its class name, and its specific parameters are passed using a prefix (e.g. message.formatter.param.*, header.selector.param.*, authentication.provider.param.*).

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| message.formatter.class | String | No | io.axual.connect.plugins.http.formatter.JsonEnvelopeMessageFormatter | Class that formats the Kafka record into the HTTP request body. Must implement io.axual.connect.plugins.http.sender.IMessageFormatter. |
| header.selector.class | String | No | io.axual.connect.plugins.http.headerselection.BasicHeaderSelector | Class that selects which Kafka headers to forward as HTTP headers. Must implement io.axual.connect.plugins.http.sender.IHeaderSelector. |
| authentication.provider.class | String | No | io.axual.connect.plugins.http.authentication.NoopAuthenticationProvider | Class that handles HTTP authentication. Must implement io.axual.connect.plugins.http.sender.IAuthenticationProvider. Set to io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider to enable HTTP Basic Authentication. |
| content.type | String | No | application/json; charset=utf-8 | Value of the Content-Type header. Can be a bare MIME type or a MIME type with a charset (e.g. application/json; charset=utf-8). |
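
The prefix convention for component parameters can be pictured as a filter over the flat connector configuration. The following is a minimal sketch of that idea; how the connector actually hands parameters to a component is an assumption here, and the helper name params_with_prefix is hypothetical.

```python
from typing import Dict


def params_with_prefix(config: Dict[str, str], prefix: str) -> Dict[str, str]:
    """Return the entries whose key starts with prefix, with the prefix removed."""
    return {key[len(prefix):]: value
            for key, value in config.items()
            if key.startswith(prefix)}


config = {
    "authentication.provider.class":
        "io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider",
    "authentication.provider.param.username": "my-service-account",
    "authentication.provider.param.preemptive.authentication.enabled": "true",
}

# Only the keys under authentication.provider.param. remain, without the prefix.
provider_params = params_with_prefix(config, "authentication.provider.param.")
print(provider_params)
```

In this picture, a custom component only sees its own parameters and never needs to know the prefix under which they were configured.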

Basic Authentication Provider parameters

When authentication.provider.class is set to io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider, the following parameters are required (prefixed with authentication.provider.param.):

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| authentication.provider.param.username | String | Yes | | The username to use when connecting to the target service. |
| authentication.provider.param.password | Password | Yes | | The password to use when connecting to the target service. |
| authentication.provider.param.preemptive.authentication.enabled | Boolean | No | false | When true, credentials are sent with every request without waiting for a 401 challenge. |
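
For troubleshooting, it can be useful to know what a Basic Authentication header looks like on the wire. The sketch below builds the header value as defined in RFC 7617; it is independent of the connector, and the function name basic_auth_header is hypothetical.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header (RFC 7617)."""
    # The credentials are joined with a colon and then Base64-encoded.
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


if __name__ == "__main__":
    print(basic_auth_header("my-service-account", "example-password"))
```

With preemptive authentication enabled, a header of this form is expected on the first request. Without it, the client typically sends the header only after the server has answered with a 401 challenge.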

Basic Header Selector parameters

When header.selector.class is set to io.axual.connect.plugins.http.headerselection.BasicHeaderSelector (the default), Kafka headers can be forwarded as HTTP headers using aliases (prefixed with header.selector.param.):

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| header.selector.param.headers | List | No | (empty) | Comma-separated list of header aliases to forward. Example: headers=ab,cd requires headers.ab.key and headers.cd.key to be set. |
| header.selector.param.headers.{alias}.key | String | Conditional | | The exact Kafka header name to forward for the given alias. Required for each alias defined in header.selector.param.headers. |

Kafka header forwarding is limited to headers stored as UTF-8 Strings.
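
The alias mechanism can be sketched as a lookup from aliases to Kafka header names, followed by UTF-8 decoding. This is an illustration only. It assumes that headers which are absent or not valid UTF-8 are silently skipped and that the HTTP header reuses the Kafka header name; the connector's actual behaviour in those cases is not documented here. The function name select_headers is hypothetical.

```python
from typing import Dict


def select_headers(config: Dict[str, str],
                   kafka_headers: Dict[str, bytes]) -> Dict[str, str]:
    """Pick the Kafka headers named by the configured aliases.

    Assumptions: absent or non-UTF-8 headers are skipped, and the HTTP
    header reuses the Kafka header name.
    """
    prefix = "header.selector.param.headers"
    aliases = [a.strip() for a in config.get(prefix, "").split(",") if a.strip()]

    selected = {}
    for alias in aliases:
        key_property = f"{prefix}.{alias}.key"
        if key_property not in config:
            raise ValueError(f"missing required property: {key_property}")
        name = config[key_property]
        raw = kafka_headers.get(name)
        if raw is None:
            continue  # the record does not carry this header
        try:
            selected[name] = raw.decode("utf-8")
        except UnicodeDecodeError:
            continue  # binary header values cannot be forwarded
    return selected
```

The ValueError branch reflects the rule that every alias listed in header.selector.param.headers needs a matching .key property.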

Static Headers

Static HTTP headers are added to every request. Define a list of aliases first, then configure the name and value for each alias.

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| header.static.aliases | List | No | (empty) | A list of aliases identifying static headers to include in every request. |
| header.static.{alias}.name | String | Conditional | | The HTTP header name for the given alias. Required for each alias defined in header.static.aliases. |
| header.static.{alias}.value | String | Conditional | | The HTTP header value for the given alias. Required for each alias defined in header.static.aliases. |
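
Because each alias needs both a name and a value property, a quick consistency check can catch typos before deployment. The following is a minimal sketch of how the alias list might resolve to concrete headers; it is not the connector's code, and the function name static_headers is hypothetical.

```python
from typing import Dict


def static_headers(config: Dict[str, str]) -> Dict[str, str]:
    """Resolve header.static.aliases into a mapping of header name to value."""
    raw_aliases = config.get("header.static.aliases", "")
    aliases = [a.strip() for a in raw_aliases.split(",") if a.strip()]

    resolved = {}
    for alias in aliases:
        name_key = f"header.static.{alias}.name"
        value_key = f"header.static.{alias}.value"
        # Both properties are required for every alias in the list.
        missing = [k for k in (name_key, value_key) if k not in config]
        if missing:
            raise ValueError("missing required properties: " + ", ".join(missing))
        resolved[config[name_key]] = config[value_key]
    return resolved


if __name__ == "__main__":
    print(static_headers({
        "header.static.aliases": "source",
        "header.static.source.name": "X-Source",
        "header.static.source.value": "axual-connect",
    }))
```

The alias itself never appears in the HTTP request; it only ties the name and value properties together.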

Logging

| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| content.logger.enabled | Boolean | No | false | When true, logs the request and response contents at DEBUG level using the SLF4J logger io.axual.connect.plugins.http.sender.ContentLogger. |
| content.logger.name | String | No | (connector name) | The SLF4J logger name to use for content logging. Defaults to the connector name if not set. |

Getting Started

This section walks through configuring the HTTP Sink Connector on Axual to push records from a Kafka stream to an HTTP endpoint.

Prerequisites

HTTP endpoint

You need an HTTP or HTTPS endpoint that accepts requests and is reachable from the Kafka Connect cluster. For testing, you can use a service like httpbin.org to inspect the requests the connector sends.

Axual stream with records

The stream this connector consumes must already exist in Axual Self-Service and contain records. See Creating streams if you need to create a stream, and Produce some data to produce test events.

Steps

Step 1 — Create a connector application

  1. In Axual Self-Service, go to Applications and create a new application.

  2. Request Consumer access to the stream you want to push to the HTTP endpoint.

  3. Wait for the stream access request to be approved.

See Configure and install a connector for detailed steps.

Step 2 — Configure the connector

In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.

| Property | Value |
|---|---|
| connector.class | io.axual.connect.plugins.http.HttpSinkConnector |
| tasks.max | 1 |
| topics | Your Axual stream name |
| endpoint | https://api.example.com/events |
| method | POST |
| content.type | application/json; charset=utf-8 |
| value.converter | org.apache.kafka.connect.json.JsonConverter |
| value.converter.schemas.enable | false |

For all available properties, see the Configuration section above.

Step 3 — Start the connector

Start the connector application from Axual Self-Service. Once running, records consumed from the stream will be posted to the configured HTTP endpoint.

Step 4 — Verify

Check your HTTP endpoint’s server logs or a request inspection tool (e.g. httpbin.org/post) to confirm requests are arriving as records are consumed from the stream.

By default, each request contains a JSON envelope with the topic, key, and value:

{
  "topic": "my_http_topic",
  "key": "record-key",
  "value": { ... }
}
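
To check that the endpoint accepts this shape of payload before starting the connector, a similar request can be built by hand. The sketch below constructs, but does not send, a POST resembling the default envelope. The exact way the connector serializes the key and value may differ, and the function name build_test_request and the sample value are hypothetical.

```python
import json
import urllib.request


def build_test_request(endpoint: str, topic: str, key: str,
                       value: dict) -> urllib.request.Request:
    """Build (but do not send) a POST that resembles the default JSON envelope."""
    body = json.dumps({"topic": topic, "key": key, "value": value}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )


request = build_test_request("https://api.example.com/events",
                             "my_http_topic", "record-key", {"id": 1})

# Uncomment to actually send it; this requires network access to the endpoint.
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(response.status)
```

If the endpoint answers a hand-built request with a status code outside valid.status.codes, the connector is likely to treat its own requests as failures too.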

Cleanup

When you are done testing:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

Known limitations

  • No dynamic endpoint support. The endpoint property is static — paths and query strings cannot be built dynamically from Kafka record field values.

  • Kafka header forwarding is limited to UTF-8 Strings. Kafka headers with binary or non-UTF-8 values cannot be forwarded as HTTP headers.

  • One request per record. The connector issues one HTTP request per Kafka record. For high-throughput streams, ensure your target endpoint can handle the request rate.

Examples

Example 1 — Minimal POST to a JSON endpoint

{
  "name": "my-http-sink",
  "config": {
    "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "my_http_topic",
    "endpoint": "https://api.example.com/events",
    "method": "POST",
    "content.type": "application/json; charset=utf-8",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Example 2 — POST with HTTP Basic Authentication

{
  "name": "my-http-sink-basic-auth",
  "config": {
    "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "my_http_topic",
    "endpoint": "https://api.example.com/events",
    "method": "POST",
    "authentication.provider.class": "io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider",
    "authentication.provider.param.username": "my-service-account",
    "authentication.provider.param.password": "${keyvault:connectors/<tenant>/<instance>/<env>/<app>:http.password}",
    "authentication.provider.param.preemptive.authentication.enabled": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Example 3 — POST with static Authorization header (Bearer token)

{
  "name": "my-http-sink-bearer",
  "config": {
    "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "my_http_topic",
    "endpoint": "https://api.example.com/events",
    "method": "POST",
    "header.static.aliases": "auth",
    "header.static.auth.name": "Authorization",
    "header.static.auth.value": "Bearer <your-token>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Example 4 — HTTPS with custom Certificate Authority and Kafka header forwarding

{
  "name": "my-http-sink-tls",
  "config": {
    "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "my_http_topic",
    "endpoint": "https://internal-api.example.com/events",
    "method": "POST",
    "ssl.ca.file.locations": "/etc/ssl/certs/internal-ca.pem",
    "ssl.enable.hostname.verification": "true",
    "header.selector.param.headers": "correlation-id",
    "header.selector.param.headers.correlation-id.key": "X-Correlation-Id",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Example 5 — Static headers combined with Kafka header forwarding

This example sends two static headers on every request (an API key and a custom source identifier) and also forwards the Kafka header named X-Correlation-Id, selected through the alias correlation-id. The content.type is set explicitly and content logging is enabled for debugging.

{
  "name": "my-http-sink-headers",
  "config": {
    "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "my_http_topic",
    "endpoint": "https://api.example.com/events",
    "method": "POST",
    "content.type": "application/json; charset=utf-8",
    "header.static.aliases": "api-key,source",
    "header.static.api-key.name": "X-Api-Key",
    "header.static.api-key.value": "${keyvault:connectors/<tenant>/<instance>/<env>/<app>:http.api-key}",
    "header.static.source.name": "X-Source",
    "header.static.source.value": "axual-connect",
    "header.selector.class": "io.axual.connect.plugins.http.headerselection.BasicHeaderSelector",
    "header.selector.param.headers": "correlation-id",
    "header.selector.param.headers.correlation-id.key": "X-Correlation-Id",
    "content.logger.enabled": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

The default message formatter (JsonEnvelopeMessageFormatter) sends a JSON body containing topic, key, and value fields. It accepts no additional parameters.

License

The HTTP Sink Connector is licensed under the Apache License, Version 2.0.