HTTP Sink Connector
| Property | Value |
|---|---|
| Type | sink |
| Class | io.axual.connect.plugins.http.HttpSinkConnector |
| Target System | HTTP / REST API |
| Maintainer | Axual |
| License | Apache License 2.0 |
| Project | gitlab.com/axual/public/connect-plugins/http-sink-connector |
| Download | |

Note: This page documents version 1.1.0-RC0. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.
Description
The HTTP Sink Connector pushes records from Kafka topics to a remote HTTP server. By default, the connector creates a JSON payload containing the topic name, record key, and value.
It is developed and maintained by Axual as an open-source connector available at gitlab.com/axual/public/connect-plugins/http-sink-connector.
The connector is fully pluggable: the message format, header selection strategy, and authentication mechanism are each implemented as a separate class, allowing you to substitute the built-in implementations with custom ones.
Features
- Push Kafka records to any HTTP or HTTPS endpoint
- All HTTP methods supported (POST, PUT, PATCH, DELETE, etc.)
- Pluggable message formatter, defaulting to a JSON envelope format
- Static HTTP header definition
- Kafka header to HTTP header forwarding (UTF-8 Strings)
- Configurable HTTP redirect support
- TLS/SSL with configurable Certificate Authority, protocols, and cipher suites
- TLS with Client Certificate authentication
- HTTP Basic Authentication (built-in provider)
- Pluggable authentication: bring your own IAuthenticationProvider
- Configurable retry behaviour with wait time
- Content logger for debugging request/response bodies at DEBUG level
When to Use
- You need to send Kafka records to a REST API or webhook endpoint.
- Your target system exposes an HTTP API but has no dedicated Kafka connector.
- You need flexible authentication or custom message formatting via the pluggable provider model.
When NOT to Use
- Your target system has a dedicated Kafka connector. Prefer a purpose-built connector for better reliability and throughput.
- You need dynamic endpoint URLs built from record field values. This connector does not support dynamic endpoint templating (see Known limitations).
- You need bidirectional sync. This connector is sink only.
Installation
The connector is available from the project's GitLab Releases page.
- Navigate to the GitLab Releases page and download the JAR for version 1.1.0-RC0.
For installation steps, see Installing Connector Plugins.
Configuration
Configuration options are grouped as follows:
- HTTP: Endpoint URL, HTTP method, and valid response codes
- TLS/SSL: TLS/SSL settings for HTTPS endpoints
- Connection: Timeouts and redirect behaviour
- Retry: Retry count and wait time
- Content: Message formatter, header selector, authentication provider, and content type
- Static Headers: Statically defined HTTP headers added to every request
- Logging: Content logger for debugging
HTTP
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| endpoint | String | Yes | (none) | The URI where the connector sends the data. |
| method | String | No | POST | The HTTP method used for delivering the data. |
| | List | No | 200, 204 | A list of HTTP response status codes that indicate success. Any other status code is treated as a failure and triggers a retry. |
TLS/SSL
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| | List | No | (empty) | A comma-separated list of PEM-encoded X509 certificates for trusted Certificate Authorities. Use this to trust a custom CA without modifying the JVM truststore. |
| ssl.ca.file.locations | List | No | (empty) | Paths to PEM files containing valid X509 certificates for trusted Certificate Authorities. |
| ssl.enable.hostname.verification | Boolean | No | true | When |
| | List | No | (JVM defaults) | Sets the supported SSL/TLS protocols for the HTTP client. |
| | List | No | (JVM defaults) | Sets the supported cipher suites for the HTTP client. |
Connection
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| | Integer | No | 5000 | Maximum time in milliseconds to wait for a connection to be established. |
| | Integer | No | 2000 | Maximum time in milliseconds to wait for a connection from the connection manager pool. |
| | Integer | No | 1000 | Maximum time in milliseconds to wait between receiving data packets. |
| | Boolean | No | true | When |
| | Boolean | No | false | When |
| | Integer | No | 50 | Maximum number of redirects to follow before failing. |
Retry
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| | Integer | No | 3 | Maximum number of retries before failing. Unexpected status codes (those not in the list of valid status codes) trigger a retry. |
| | Integer | No | 50 | Time in milliseconds to wait before retrying after an invalid status code or connection failure. |
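The retry settings work together with the list of valid status codes from the HTTP section. The Python sketch below illustrates that interplay using the documented defaults (3 retries, 50 ms wait, success codes 200 and 204). It is not the connector's implementation: `deliver_with_retry` and the `send` callable are hypothetical names, and the sketch assumes that the retry count is applied in addition to the first attempt.

```python
import time
from typing import Callable, Iterable


def deliver_with_retry(
    send: Callable[[], int],
    valid_status_codes: Iterable[int] = (200, 204),
    max_retries: int = 3,
    retry_wait_ms: int = 50,
) -> int:
    """Call send() until it returns a valid status code or retries run out.

    send is a hypothetical callable that performs one HTTP request and
    returns the response status code, or raises ConnectionError.
    Returns the number of attempts made; raises RuntimeError on failure.
    """
    valid = set(valid_status_codes)
    attempts = 0
    while True:
        attempts += 1
        try:
            if send() in valid:
                return attempts
        except ConnectionError:
            pass  # connection failures are retried like invalid status codes
        if attempts > max_retries:
            # Assumption: one initial attempt plus max_retries retries.
            raise RuntimeError(f"delivery failed after {attempts} attempts")
        time.sleep(retry_wait_ms / 1000)
```

With the defaults, an endpoint that keeps answering with a status code outside the valid list is called four times in this sketch before the delivery is reported as failed.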
Content
The connector uses three pluggable components for formatting, header selection, and authentication.
Each component is configured by its class name, and its specific parameters are passed using a
component-specific prefix: message.formatter.param., header.selector.param., or authentication.provider.param.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
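| | String | No | JsonEnvelopeMessageFormatter | Class that formats the Kafka record into the HTTP request body. Must implement |
| header.selector.class | String | No | io.axual.connect.plugins.http.headerselection.BasicHeaderSelector | Class that selects which Kafka headers to forward as HTTP headers. Must implement |
| authentication.provider.class | String | No | | Class that handles HTTP authentication. Must implement |
| content.type | String | No | | Value of the |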
Basic Authentication Provider parameters
When authentication.provider.class is set to
io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider, the following
parameters are required (prefixed with authentication.provider.param.):
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| username | String | Yes | (none) | The username to use when connecting to the target service. |
| password | Password | Yes | (none) | The password to use when connecting to the target service. |
| preemptive.authentication.enabled | Boolean | No | false | When |
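For reference, HTTP Basic Authentication (RFC 7617) sends the credentials as a Base64-encoded username:password pair in the Authorization header. The Python sketch below shows how that header value is formed, which can help when checking what arrives at the target service. `basic_auth_header` is a hypothetical helper for illustration, not part of the connector.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Return the Authorization header for HTTP Basic Authentication."""
    # RFC 7617: join user and password with a colon, then Base64-encode.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

Base64 is an encoding, not encryption, so Basic Authentication should only be used over HTTPS endpoints.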
Basic Header Selector parameters
When header.selector.class is set to
io.axual.connect.plugins.http.headerselection.BasicHeaderSelector (the default), Kafka headers
can be forwarded as HTTP headers using aliases (prefixed with header.selector.param.):
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| headers | List | No | (empty) | Comma-separated list of header aliases to forward. Example: correlation-id |
| headers.<alias>.key | String | Conditional | (none) | The exact Kafka header name to forward for the given alias. Required for each alias defined in headers. |

Note: Kafka header forwarding is limited to headers stored as UTF-8 Strings.
Static Headers
Static HTTP headers are added to every request. Define a list of aliases first, then configure the name and value for each alias.
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| header.static.aliases | List | No | (empty) | A list of aliases identifying static headers to include in every request. |
| header.static.<alias>.name | String | Conditional | (none) | The HTTP header name for the given alias. Required for each alias defined in header.static.aliases. |
| header.static.<alias>.value | String | Conditional | (none) | The HTTP header value for the given alias. Required for each alias defined in header.static.aliases. |
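The alias scheme can be pictured as a two-step lookup: read the alias list, then read the name and value properties of each alias. The Python sketch below resolves a flat property map into the headers that would be added to every request. `resolve_static_headers` is a hypothetical helper, and raising an error for an incomplete alias is an assumption made for illustration; the connector's own validation may behave differently.

```python
def resolve_static_headers(config: dict) -> dict:
    """Build the static HTTP headers from flat connector properties."""
    raw_aliases = config.get("header.static.aliases", "")
    aliases = [a.strip() for a in raw_aliases.split(",") if a.strip()]
    headers = {}
    for alias in aliases:
        name_key = f"header.static.{alias}.name"
        value_key = f"header.static.{alias}.value"
        if name_key not in config or value_key not in config:
            # Assumption: an alias without both properties is a config error.
            raise ValueError(f"alias '{alias}' needs {name_key} and {value_key}")
        headers[config[name_key]] = config[value_key]
    return headers
```

Applied to the static header properties used in Example 5, this sketch yields an X-Api-Key header and an X-Source header.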
Logging
| Key | Type | Mandatory | Default | Description |
|---|---|---|---|---|
| content.logger.enabled | Boolean | No | false | When |
| | String | No | (connector name) | The SLF4J logger name to use for content logging. Defaults to the connector name. |
Getting Started
This section walks through configuring the HTTP Sink Connector on Axual to push records from a Kafka stream to an HTTP endpoint.
Prerequisites
HTTP endpoint
You need an HTTP or HTTPS endpoint that accepts requests and is reachable from the Kafka Connect cluster. For testing, you can use a service like httpbin.org to inspect the requests the connector sends.
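If you prefer a throwaway endpoint under your own control, a small receiver can be built with the Python standard library. The sketch below accepts POST requests, prints the path, headers, and body, and answers with 204, which is one of the connector's default valid status codes. The names `EchoHandler`, `start_server`, and `received` are illustrative, the port is arbitrary, and the host running it must still be reachable from the Kafka Connect cluster.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # request bodies seen so far, kept for inspection


class EchoHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly the number of bytes announced by the client.
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8")
        received.append(body)
        print(self.path, dict(self.headers.items()), body)
        self.send_response(204)  # no response body needed
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence the default access log; print() above is enough


def start_server(host: str = "0.0.0.0", port: int = 8080) -> HTTPServer:
    """Start the receiver in a background thread and return the server."""
    server = HTTPServer((host, port), EchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Only POST is handled here; other methods would need their own do_PUT, do_PATCH, or similar handler methods.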
Axual stream with records
The stream this connector consumes must already exist in Axual Self-Service and contain records. See Creating streams if you need to create a stream, and Produce some data to produce test events.
Steps
Step 1 — Create a connector application
- In Axual Self-Service, go to Applications and create a new application.
- Request Consumer access to the stream you want to push to the HTTP endpoint.
- Wait for the stream access request to be approved.
See Configure and install a connector for detailed steps.
Step 2 — Configure the connector
In Axual Self-Service, open the application and add a new connector. Use the following minimal configuration as a starting point. Replace all placeholder values with those of your environment.
| Property | Value |
|---|---|
| | |
| | |
| topics | Your Axual stream name |
| | |
| | |
| | |
| | |
| | |
For all available properties, see the Configuration section above.
Step 3 — Start the connector
Start the connector application from Axual Self-Service. Once running, records consumed from the stream will be posted to the configured HTTP endpoint.
Step 4 — Verify
Check your HTTP endpoint’s server logs or a request inspection tool (e.g. httpbin.org/post) to confirm requests are arriving as records are consumed from the stream.
By default, each request contains a JSON envelope with the topic, key, and value:
    {
      "topic": "my_http_topic",
      "key": "record-key",
      "value": { ... }
    }
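When writing a receiver or a test for the target API, it can help to generate bodies of the same shape. The Python sketch below builds an envelope with the three documented fields. `json_envelope` is a hypothetical helper; how the connector serializes keys or values that are not plain JSON types is not covered here and is not implied by this sketch.

```python
import json


def json_envelope(topic: str, key, value) -> str:
    """Return a JSON body with the topic, key, and value fields."""
    # Field order follows the sample above; JSON consumers should not rely on it.
    return json.dumps({"topic": topic, "key": key, "value": value})
```

For example, json_envelope("my_http_topic", "record-key", {"id": 1}) produces a body with the same three top-level fields as the sample above.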
Known limitations
- No dynamic endpoint support. The endpoint property is static: paths and query strings cannot be built dynamically from Kafka record field values.
- Kafka header forwarding is limited to UTF-8 Strings. Kafka headers with binary or non-UTF-8 values cannot be forwarded as HTTP headers.
- One request per record. The connector issues one HTTP request per Kafka record. For high-throughput streams, ensure your target endpoint can handle the request rate.
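To check in advance whether the headers on a topic fall under the UTF-8 limitation, you can test whether their raw bytes decode as UTF-8. The Python sketch below separates forwardable headers from the rest. `utf8_headers` is a hypothetical helper, and silently skipping undecodable headers is an assumption for illustration; it does not describe how the connector reacts to such headers.

```python
def utf8_headers(kafka_headers: dict) -> dict:
    """Keep only the headers whose raw bytes decode as UTF-8 text."""
    forwardable = {}
    for name, raw in kafka_headers.items():
        try:
            forwardable[name] = raw.decode("utf-8")
        except UnicodeDecodeError:
            continue  # assumption: binary values are skipped in this sketch
    return forwardable
```

A header such as correlation-id with a textual value passes this check, while a header carrying arbitrary binary data does not.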
Examples
Example 1 — Minimal POST to a JSON endpoint
    {
      "name": "my-http-sink",
      "config": {
        "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "my_http_topic",
        "endpoint": "https://api.example.com/events",
        "method": "POST",
        "content.type": "application/json; charset=utf-8",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }
Example 2 — POST with HTTP Basic Authentication
    {
      "name": "my-http-sink-basic-auth",
      "config": {
        "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "my_http_topic",
        "endpoint": "https://api.example.com/events",
        "method": "POST",
        "authentication.provider.class": "io.axual.connect.plugins.http.authentication.BasicAuthenticationProvider",
        "authentication.provider.param.username": "my-service-account",
        "authentication.provider.param.password": "${keyvault:connectors/<tenant>/<instance>/<env>/<app>:http.password}",
        "authentication.provider.param.preemptive.authentication.enabled": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }
Example 3 — POST with static Authorization header (Bearer token)
    {
      "name": "my-http-sink-bearer",
      "config": {
        "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "my_http_topic",
        "endpoint": "https://api.example.com/events",
        "method": "POST",
        "header.static.aliases": "auth",
        "header.static.auth.name": "Authorization",
        "header.static.auth.value": "Bearer <your-token>",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }
Example 4 — HTTPS with custom Certificate Authority and Kafka header forwarding
    {
      "name": "my-http-sink-tls",
      "config": {
        "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "my_http_topic",
        "endpoint": "https://internal-api.example.com/events",
        "method": "POST",
        "ssl.ca.file.locations": "/etc/ssl/certs/internal-ca.pem",
        "ssl.enable.hostname.verification": "true",
        "header.selector.param.headers": "correlation-id",
        "header.selector.param.headers.correlation-id.key": "X-Correlation-Id",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }
Example 5 — Static headers combined with Kafka header forwarding
This example sends two static headers on every request (an API key and a custom source identifier)
and also forwards the correlation-id Kafka header as X-Correlation-Id in the HTTP request.
The content.type is set explicitly and logging is enabled for debugging.
    {
      "name": "my-http-sink-headers",
      "config": {
        "connector.class": "io.axual.connect.plugins.http.HttpSinkConnector",
        "tasks.max": "1",
        "topics": "my_http_topic",
        "endpoint": "https://api.example.com/events",
        "method": "POST",
        "content.type": "application/json; charset=utf-8",
        "header.static.aliases": "api-key,source",
        "header.static.api-key.name": "X-Api-Key",
        "header.static.api-key.value": "${keyvault:connectors/<tenant>/<instance>/<env>/<app>:http.api-key}",
        "header.static.source.name": "X-Source",
        "header.static.source.value": "axual-connect",
        "header.selector.class": "io.axual.connect.plugins.http.headerselection.BasicHeaderSelector",
        "header.selector.param.headers": "correlation-id",
        "header.selector.param.headers.correlation-id.key": "X-Correlation-Id",
        "content.logger.enabled": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }
Note: The default message formatter (JsonEnvelopeMessageFormatter) sends a JSON body containing
topic, key, and value fields. It accepts no additional parameters.
License
The HTTP Sink Connector is licensed under the Apache License, Version 2.0.