AMQP 1.0 Source Connectors
Type |
Source |
Core Class |
|
OAuth2 Class |
|
Target System |
Messaging & Streaming (AMQP 1.0) |
Maintainer |
Axual |
License |
Proprietary (client-only) |
Project |
|
Download |
Contact Axual Support to obtain the connector library. |
Description
The AMQP 1.0 Source Connectors read messages from AMQP 1.0 queues or topics and publish them as records to Kafka. They are tested with ActiveMQ Artemis and RabbitMQ (with AMQP 1.0 plugin).
Two connector variants are available:
-
Core AMQP Source Connector (
io.axual.connect.plugins.amqp.source.AmqpSourceConnector) — supports anonymous or username/password (PLAIN) authentication. -
OAuth2 AMQP Source Connector (
io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) — supports token-based OAuth2 authentication.
Both connectors share the same AMQP consumption behaviour (queue/topic address, QoS, link durability, etc.). The OAuth2 variant replaces basic authentication with token acquisition and SASL/OAuth2.
Connector variants
Select a connector based on the authentication offered by your AMQP broker or gateway:
| Authentication scenario | Connector to use |
|---|---|
No authentication (anonymous) |
Core connector ( |
Username and password (SASL PLAIN) |
Core connector. Set |
OAuth2 (client credentials or private key JWT) |
OAuth2 connector ( |
OAuth2 method selection (oauth.method):
| Method | Description |
|---|---|
|
Client ID and secret sent via HTTP Basic. |
|
Client ID and secret sent in the token request body. |
|
Signed client assertion using a private key. |
Features
-
Read messages from AMQP 1.0 queues or topics and publish them as records to Kafka
-
Two variants: Core (anonymous/PLAIN) and OAuth2 (client credentials or private key JWT)
-
Configurable Quality of Service:
AT_MOST_ONCE,AT_LEAST_ONCE,EXACTLY_ONCE -
Durable consumer support — AMQP link persists across disconnections so no messages are lost on restart
-
Flexible key extraction chain:
MESSAGE_ID,CORRELATION_ID,SUBJECT,UUID, orNULL -
AMQP message properties forwarded as Kafka headers with
msg.prefix -
AMQP application properties forwarded as Kafka headers with
app.prefix -
Tested with ActiveMQ Artemis and RabbitMQ (AMQP 1.0 plugin)
When to Use
-
You need to bridge an AMQP 1.0 broker (RabbitMQ, ActiveMQ Artemis) to Kafka.
-
Your broker requires OAuth2 token-based authentication.
-
You need durable subscriptions to ensure no messages are lost across connector restarts.
-
You want configurable QoS levels for message delivery guarantees.
When NOT to Use
-
Your broker does not support AMQP 1.0 — AMQP 0.9.x (classic RabbitMQ default) requires a different connector.
-
You need schema-aware (structured) Kafka records — this connector always emits plain string keys and values.
-
You need to transform or enrich messages before publishing to Kafka — use Kafka Streams or ksqlDB after ingestion instead.
Installation
The AMQP 1.0 Source Connectors are maintained in the Axual public repository.
Contact the Axual Support team to obtain the connector library.
For installation steps, see Installing Connector Plugins.
Configuration
Core AMQP Source Connector
The following options are available for the Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector).
| Property | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
|
The hostname where the AMQP server can be reached. |
string |
non-empty string |
high |
|
|
The port number where the AMQP server listens. |
int |
|
[1,…] |
high |
|
Enable TLS connection to the AMQP server. |
boolean |
|
high |
|
|
When set to |
boolean |
|
high |
|
|
Username for connecting to the AMQP server. Uses anonymous authentication if not set. |
string |
null |
high |
|
|
Password for connecting to the AMQP server. |
password |
null |
high |
|
|
The AMQP queue or topic address to consume messages from. |
string |
non-empty string |
high |
|
|
Quality of Service level: |
string |
|
AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE |
medium |
|
Use a durable consumer. The AMQP link remains in place after disconnection so messages are not lost. Essential for persistent topic subscriptions. |
boolean |
|
high |
|
|
Explicit link name. Overrides auto-generation. When multiple tasks share the same link name, messages are load-balanced between them. |
string |
null |
low |
|
|
Prefix for auto-generated link names. The actual link name will be |
string |
null |
medium |
|
|
Number of messages prefetched and buffered locally before requiring acknowledgments. |
int |
|
[1,…] |
medium |
|
Maximum time in milliseconds to wait for a message when polling. |
long |
|
[100,…] |
medium |
|
Maximum time in milliseconds to wait for graceful consumer shutdown. |
long |
|
[1000,…] |
low |
|
Session-level incoming window size for flow control. |
int |
|
[1,…] |
medium |
|
Session-level outgoing window size for flow control. |
int |
|
[1,…] |
medium |
|
Automatically acknowledge messages after successful processing. |
boolean |
|
medium |
|
|
On processing error: if |
boolean |
|
medium |
|
|
Maximum idle time in milliseconds before the connection is considered dead. |
long |
|
[1,…] |
medium |
|
Maximum size in bytes for individual AMQP frames. |
long |
|
[1,…] |
medium |
|
Network buffer size in bytes for receiving frames. |
int |
|
[1,…] |
medium |
|
Extension size in bytes when the input buffer is insufficient to receive a frame. |
int |
|
[1,…] |
medium |
|
Network buffer size in bytes for sending frames. |
int |
|
[1,…] |
medium |
|
Extension size in bytes when the output buffer is insufficient to send a frame. |
int |
|
[1,…] |
medium |
|
Prefix for auto-generated AMQP container IDs. The actual ID will be |
string |
null |
medium |
|
|
Explicit container ID. Overrides auto-generation. |
string |
null |
low |
|
|
Ordered list of key extraction strategies. Each is tried in turn until a key is found. |
list |
|
high |
|
|
The Kafka topic to write messages to. |
string |
non-empty string |
high |
|
|
Attempt a live broker connection during connector validation. |
boolean |
|
low |
OAuth2 AMQP Source Connector
The OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) supports all of the same AMQP and consumer properties as the Core connector (see above), plus the following OAuth2-specific options:
| Property | Description | Type | Default | Importance |
|---|---|---|---|---|
|
The URL for the access token endpoint. |
string |
high |
|
|
Client authentication method: |
string |
|
high |
|
The client identifier registered with the authorization server. |
string |
null |
high |
|
The client secret (for |
password |
null |
high |
|
Space-separated list of requested permissions (e.g., |
string |
null |
high |
|
Private key in JWK JSON format (for |
password |
null |
high |
|
Private key in PEM encoding (alternative to JWK). |
password |
null |
high |
|
Password for an encrypted PEM private key. |
password |
null |
high |
|
JWS algorithm used to sign the client assertion. Default is |
string |
|
high |
|
JWT Issuer Claim ( |
string |
null |
high |
|
JWT Audience Claim ( |
string |
null |
high |
|
JWT Expiration Time Claim ( |
int |
|
medium |
|
JWT Not Before Claim ( |
int |
null |
medium |
|
When |
boolean |
|
medium |
|
When |
boolean |
|
medium |
Data conversion
The connector converts AMQP 1.0 messages to Kafka Connect records as follows:
-
Message body (payload) — always emitted as a
Stringvalue usingSchema.OPTIONAL_STRING_SCHEMA.AMQPNullproduces a null record value. -
Record key — extracted by the
amqp.key.extractorchain using the same string formatting. -
Message Properties — added as Kafka headers with the
msg.prefix (e.g.,msg.Subject,msg.ContentType,msg.CreationTime). -
Application Properties — added as Kafka headers with the
app.prefix. Container types (Array/List/Map) are skipped.
All Kafka Connect headers use Schema.OPTIONAL_BYTES_SCHEMA with UTF-8 encoded content.
Type-specific string rendering for body and header values:
| AMQP type | String representation |
|---|---|
|
|
|
|
|
|
|
Epoch milliseconds (long as string) |
|
|
|
Single-character Unicode string |
|
Not supported — results in a conversion error |
Other primitive types |
|
Getting Started
This section walks you through configuring the AMQP 1.0 Source Connector on Axual to stream messages from an AMQP broker into a local Kafka stream.
Prerequisites
AMQP broker
-
You have access to an AMQP 1.0 broker (e.g., RabbitMQ or ActiveMQ Artemis) reachable from the Kafka Connect cluster.
-
You have the broker hostname, port, and credentials available.
-
For OAuth2 authentication: you have a valid OAuth2 token endpoint URL, client ID, and client secret (or private key).
Axual stream
The local stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
-
Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
The name of the stream will bemy_amqp_source_topic.
The key/value types will beString/String. -
Follow the Configure and install a connector documentation to set up a new Connector-Application.
Let’s call itmy_amqp_source.
The plugin name is{plugin-name}.
If a plugin isn’t available, ask a platform operator to install plugins.
Step 2 — Configure the connector
|
Always set |
-
Provide the following minimal configuration to connect to your AMQP broker using the Core connector (anonymous or username/password). For OAuth2 authentication, see the OAuth2 configuration section below.
connector.classio.axual.connect.plugins.amqp.source.AmqpSourceConnectoramqp.hostHostname of the AMQP broker
broker.example.comamqp.port5672
Default AMQP port. Use5671for TLS.amqp.consumer.sourceThe queue or topic address to consume from
my.queueamqp.auth.usernameUsername for SASL PLAIN authentication. Omit for anonymous access.
amqp.auth.passwordPassword for SASL PLAIN authentication. Omit for anonymous access.
amqp.consumer.qosAT_LEAST_ONCE
Quality of Service level. Options:AT_MOST_ONCE,AT_LEAST_ONCE,EXACTLY_ONCE.topicmy_amqp_source_topic
The Kafka topic to write messages to.amqp.key.extractorMESSAGE_ID,SUBJECT,NULL
Key extraction chain; tries each extractor in order.key.converterorg.apache.kafka.connect.storage.StringConvertervalue.converterorg.apache.kafka.connect.storage.StringConverterheader.converterorg.apache.kafka.connect.converters.ByteArrayConverter
OAuth2 configuration
When using the OAuth2 connector, replace connector.class with io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector and add the following properties:
|
Access token endpoint URL |
|
|
|
Your OAuth2 client ID |
|
Your OAuth2 client secret (for |
|
Optional space-separated list of requested scopes |
-
Authorize the
my_amqp_sourcesource Connector-Application to produce to themy_amqp_source_topicstream.
Examples
The following end-to-end examples show complete configurations for common authentication modes.
Anonymous (no authentication)
{
"name": "my-amqp-source",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.source.AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue1",
"amqp.consumer.qos": "AT_LEAST_ONCE",
"amqp.consumer.durable": "true",
"topic": "amqp.queue1",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"validate.connection": "false"
}
}
Username and password (PLAIN)
{
"name": "my-amqp-source-plain",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.source.AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue2",
"amqp.auth.username": "amqp_user",
"amqp.auth.password": "Amqp@Password2024",
"amqp.consumer.qos": "AT_LEAST_ONCE",
"amqp.consumer.durable": "true",
"topic": "amqp.queue2",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,UUID",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"validate.connection": "false"
}
}
OAuth2 — Client Secret Basic
{
"name": "my-amqp-source-oauth2-basic",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue-basic",
"oauth.token.endpoint": "https://auth.example.com/oauth2/token",
"oauth.method": "client_secret_basic",
"oauth.client.id": "my-client",
"oauth.client.secret": "Cl13nt$ecret2024",
"oauth.scope": "read write",
"topic": "amqp.oauth.basic",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
OAuth2 — Client Secret Post
{
"name": "my-amqp-source-oauth2-post",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue-post",
"oauth.token.endpoint": "https://auth.example.com/oauth2/token",
"oauth.method": "client_secret_post",
"oauth.client.id": "my-client",
"oauth.client.secret": "Cl13nt$ecret2024",
"oauth.scope": "read write",
"topic": "amqp.oauth.post",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
OAuth2 — Private Key JWT with JWK JSON
{
"name": "my-amqp-source-oauth2-pkjwt-jwk",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue-pkjwt",
"oauth.token.endpoint": "https://auth.example.com/oauth2/token",
"oauth.method": "private_key_jwt",
"oauth.client.id": "my-service",
"oauth.private.key.jwk.json": "{\"kty\":\"RSA\",\"d\":\"...\",\"n\":\"...\",\"e\":\"AQAB\",\"kid\":\"my-key\"}",
"oauth.private.key.jws.algorithm": "RS256",
"oauth.private.key.claims.issuer": "my-service",
"topic": "amqp.oauth.pkjwt.jwk",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
OAuth2 — Private Key JWT with Encrypted PEM
{
"name": "my-amqp-source-oauth2-pkjwt-pem",
"config": {
"connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
"tasks.max": "1",
"amqp.host": "broker.example.com",
"amqp.port": "5672",
"amqp.consumer.source": "queue-pkjwt-pem",
"oauth.token.endpoint": "https://auth.example.com/oauth2/token",
"oauth.method": "private_key_jwt",
"oauth.client.id": "my-service",
"oauth.private.key.pem.content": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIF...\n-----END ENCRYPTED PRIVATE KEY-----",
"oauth.private.key.pem.password": "changeit",
"oauth.private.key.jws.algorithm": "RS256",
"oauth.private.key.claims.issuer": "my-service",
"topic": "amqp.oauth.pkjwt.pem",
"amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
Known limitations
-
AMQP
DataandAmqpSequencemessage body sections are not supported — messages using them result in a conversion error. -
Application Properties values do not support complex Array/List/Map types — these are silently skipped.
-
The connector always emits string keys and values — schema-aware converters (e.g., Avro, Protobuf) cannot be used.
-
EXACTLY_ONCEQoS requires broker-side support and is not guaranteed on all brokers.