AMQP 1.0 Source Connectors
Overview
The AMQP 1.0 Source Connectors read messages from AMQP 1.0 queues or topics and publish them as records to Kafka. They are tested with ActiveMQ Artemis and RabbitMQ (with AMQP 1.0 plugin).
Two connector variants are available:
- Core AMQP Source Connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector): supports anonymous or username/password (PLAIN) authentication.
- OAuth2 AMQP Source Connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector): supports token-based OAuth2 authentication.
Both connectors share the same AMQP consumption behavior (queue/topic address, QoS, link durability, etc.). The OAuth2 variant replaces basic authentication with token acquisition and SASL/OAuth2.
This document makes the following assumptions:
- You have access to an AMQP 1.0 broker (e.g., RabbitMQ or ActiveMQ Artemis) that is reachable from the Kafka Connect cluster.
- You have the broker hostname, port, and credentials available.
- You have access to Axual Self-Service to configure the connector.
Choosing the right connector
Select a connector based on the authentication offered by your AMQP broker or gateway:
| Authentication scenario | Connector to use |
|---|---|
| No authentication (anonymous) | Core connector |
| Username and password (SASL PLAIN) | Core connector. Set amqp.auth.username and amqp.auth.password. |
| OAuth2 (client credentials or private key JWT) | OAuth2 connector |
OAuth2 method selection (oauth.method):
| Method | Description |
|---|---|
| client_secret_basic | Client ID and secret sent via HTTP Basic. |
| client_secret_post | Client ID and secret sent in the token request body. |
| private_key_jwt | Signed client assertion using a private key. |
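The difference between the two client-secret methods is only where the credentials travel in the token request. The following is a minimal sketch of that difference for a client-credentials grant, following the generic OAuth2 conventions; it is not the connector's implementation, and the function name `build_token_request` is hypothetical.

```python
import base64
from urllib.parse import quote_plus, urlencode


def build_token_request(method, client_id, client_secret, scope=None):
    """Return (headers, body) for a client-credentials token request.

    Illustrative only: shows where the credentials go for each method.
    """
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope

    if method == "client_secret_basic":
        # Credentials travel in the Authorization header (HTTP Basic).
        pair = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
        token = base64.b64encode(pair.encode("utf-8")).decode("ascii")
        headers["Authorization"] = f"Basic {token}"
    elif method == "client_secret_post":
        # Credentials travel as form fields in the request body.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError(f"unsupported method for this sketch: {method}")

    return headers, urlencode(form)


headers, body = build_token_request(
    "client_secret_post", "my-client", "super-secret", scope="read write"
)
print(body)
```

With `client_secret_basic` the body carries only the grant type and scope, while the `Authorization` header carries the encoded credentials.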
Configuring a new AMQP 1.0 source connector
- Follow the Creating streams documentation to create one stream and deploy it onto an environment.
  The name of the stream will be my_amqp_source_topic.
  The key/value types will be String/String.
- Follow the Configure and install a connector documentation to set up a new Connector-Application.
  Let’s call it my_amqp_source.
  The plugin name is {plugin-name}.
  If a plugin isn’t available, ask a platform operator to install plugins.
- Provide the following minimal configuration to connect to your AMQP broker using the Core connector (anonymous or username/password). For OAuth2 authentication, see the OAuth2 configuration section below.
  Always set key.converter, value.converter, and header.converter explicitly. When left unset, Kafka Connect falls back to the worker-level defaults, which vary between installations and may produce unexpected serialization (e.g., wrapping plain strings in a JSON schema envelope). The AMQP 1.0 connector always emits keys and values as plain strings and headers as raw bytes, so the correct converters are StringConverter for keys and values, and ByteArrayConverter for headers.

| Property | Example value | Description |
|---|---|---|
| connector.class | io.axual.connect.plugins.amqp.source.AmqpSourceConnector | |
| amqp.host | broker.example.com | Hostname of the AMQP broker |
| amqp.port | 5672 | Default AMQP port. Use 5671 for TLS. |
| amqp.consumer.source | my.queue | The queue or topic address to consume from |
| amqp.auth.username | | Username for SASL PLAIN authentication. Omit for anonymous access. |
| amqp.auth.password | | Password for SASL PLAIN authentication. Omit for anonymous access. |
| amqp.consumer.qos | AT_LEAST_ONCE | Quality of Service level. Options: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE. |
| topic | my_amqp_source_topic | The Kafka topic to write messages to. |
| amqp.key.extractor | MESSAGE_ID,SUBJECT,NULL | Key extraction chain; tries each extractor in order. |
| key.converter | org.apache.kafka.connect.storage.StringConverter | |
| value.converter | org.apache.kafka.connect.storage.StringConverter | |
| header.converter | org.apache.kafka.connect.converters.ByteArrayConverter | |
OAuth2 configuration
When using the OAuth2 connector, replace connector.class with io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector and add the following properties:
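| Property | Description |
|---|---|
| oauth.token.endpoint | Access token endpoint URL |
| oauth.method | One of the methods listed under "OAuth2 method selection" |
| oauth.client.id | Your OAuth2 client ID |
| oauth.client.secret | Your OAuth2 client secret (used by the client_secret_basic and client_secret_post methods) |
| oauth.scope | Optional space-separated list of requested scopes |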
- Authorize the my_amqp_source source Connector-Application to produce to the my_amqp_source_topic stream.
- You can now start the source connector application.
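Before starting the connector, it can help to check the configuration for the pitfalls described in the steps above, in particular unset converters. The sketch below is a hypothetical pre-flight check and not part of the connector or of Axual Self-Service. The set of required keys and the username/password pairing rule are assumptions drawn from the minimal configuration in this section.

```python
# Assumed required properties, based on the minimal configuration above.
REQUIRED_KEYS = ("connector.class", "amqp.host", "amqp.consumer.source", "topic")

# Converters this document recommends setting explicitly.
EXPECTED_CONVERTERS = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
}


def check_config(config):
    """Return a list of human-readable problems found in a config dict."""
    problems = []
    for key in REQUIRED_KEYS:
        if not config.get(key):
            problems.append(f"missing required property: {key}")
    for key, expected in EXPECTED_CONVERTERS.items():
        actual = config.get(key)
        if actual is None:
            problems.append(f"{key} is unset; the worker default would apply")
        elif actual != expected:
            problems.append(f"{key} is {actual}, expected {expected}")
    # Assumption: username and password only make sense as a pair.
    if ("amqp.auth.username" in config) != ("amqp.auth.password" in config):
        problems.append("set both amqp.auth.username and amqp.auth.password, or neither")
    return problems


config = {
    "connector.class": "io.axual.connect.plugins.amqp.source.AmqpSourceConnector",
    "amqp.host": "broker.example.com",
    "amqp.consumer.source": "my.queue",
    "topic": "my_amqp_source_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}
for problem in check_config(config):
    print(problem)
```

In this usage example the header converter is deliberately left out, so the check reports that the worker default would apply.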
Examples
The following end-to-end examples show complete configurations for common authentication modes.
Anonymous (no authentication)
connector.class=io.axual.connect.plugins.amqp.source.AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue1
amqp.consumer.qos=AT_LEAST_ONCE
amqp.consumer.durable=true
topic=amqp.queue1
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
validate.connection=false
Username and password (PLAIN)
connector.class=io.axual.connect.plugins.amqp.source.AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue2
amqp.auth.username=myuser
amqp.auth.password=mypassword
amqp.consumer.qos=AT_LEAST_ONCE
amqp.consumer.durable=true
topic=amqp.queue2
amqp.key.extractor=MESSAGE_ID,SUBJECT,UUID
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
validate.connection=false
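The two examples above differ in the last element of amqp.key.extractor. The following sketch illustrates how an ordered extractor chain can be evaluated. It models the message as a plain dict, which is a simplification. It also assumes that NULL ends the chain with a null key and that UUID generates a random key; neither is specified in this document.

```python
import uuid


def extract_key(message, chain):
    """Try each extractor in order and return the first key found."""
    for name in (part.strip() for part in chain.split(",")):
        if name == "MESSAGE_ID":
            key = message.get("message_id")
        elif name == "SUBJECT":
            key = message.get("subject")
        elif name == "UUID":
            # Assumed behavior: always produces a fresh random key.
            key = str(uuid.uuid4())
        elif name == "NULL":
            # Assumed behavior: stop here and emit a record without a key.
            return None
        else:
            raise ValueError(f"unknown extractor in this sketch: {name}")
        if key is not None:
            return str(key)
    return None


# A message without a message ID falls through to its subject.
print(extract_key({"subject": "orders"}, "MESSAGE_ID,SUBJECT,NULL"))
```

Under these assumptions, a chain ending in NULL can produce records with a null key, whereas a chain ending in UUID always yields a key.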
OAuth2 — Client Secret Basic
connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-basic
oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=client_secret_basic
oauth.client.id=my-client
oauth.client.secret=super-secret
oauth.scope=read write
topic=amqp.oauth.basic
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
OAuth2 — Client Secret Post
connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-post
oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=client_secret_post
oauth.client.id=my-client
oauth.client.secret=super-secret
oauth.scope=read write
topic=amqp.oauth.post
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
OAuth2 — Private Key JWT with JWK JSON
connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-pkjwt
oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=private_key_jwt
oauth.client.id=my-service
oauth.private.key.jwk.json={"kty":"RSA","d":"...","n":"...","e":"AQAB","kid":"my-key"}
oauth.private.key.jws.algorithm=RS256
oauth.private.key.claims.issuer=my-service
topic=amqp.oauth.pkjwt.jwk
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
OAuth2 — Private Key JWT with Encrypted PEM
connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1
amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-pkjwt-pem
oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=private_key_jwt
oauth.client.id=my-service
oauth.private.key.pem.content=-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIF...\n-----END ENCRYPTED PRIVATE KEY-----
oauth.private.key.pem.password=changeit
oauth.private.key.jws.algorithm=RS256
# Optional; defaults to oauth.client.id when not set
oauth.private.key.claims.issuer=my-service
topic=amqp.oauth.pkjwt.pem
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
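For the private_key_jwt examples, it may help to see what a client assertion typically contains before it is signed. The sketch below builds the JWT header and claims in the usual layout for JWT client authentication and stops short of signing, which requires a cryptography library. The audience value, the 300-second lifetime, and the function name `build_assertion_parts` are assumptions; the connector's actual claim handling may differ.

```python
import base64
import json
import time
import uuid


def b64url(data):
    """Base64url-encode bytes without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def build_assertion_parts(client_id, token_endpoint, issuer=None,
                          algorithm="RS256", lifetime_seconds=300,
                          key_id=None, now=None):
    """Return (header, claims, signing_input) for an unsigned client assertion."""
    now = int(time.time()) if now is None else now
    header = {"alg": algorithm, "typ": "JWT"}
    if key_id:
        header["kid"] = key_id
    claims = {
        # The issuer falls back to the client ID when not set explicitly.
        "iss": issuer or client_id,
        "sub": client_id,
        # Assumption: the token endpoint is used as the audience.
        "aud": token_endpoint,
        "iat": now,
        "exp": now + lifetime_seconds,
        "jti": str(uuid.uuid4()),
    }
    signing_input = (
        b64url(json.dumps(header).encode("utf-8"))
        + "."
        + b64url(json.dumps(claims).encode("utf-8"))
    )
    return header, claims, signing_input


header, claims, signing_input = build_assertion_parts(
    "my-service", "https://auth.example.com/oauth2/token", key_id="my-key"
)
print(claims["iss"], claims["exp"] - claims["iat"])
```

The signing input would then be signed with the private key using the configured JWS algorithm, and the resulting JWT sent to the token endpoint as the client assertion.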
Configuration reference
Core AMQP Source Connector
The following options are available for the Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector).
| Property | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| amqp.host | The hostname where the AMQP server can be reached. | string | | non-empty string | high |
| amqp.port | The port number where the AMQP server listens. | int | | [1,…] | high |
| | Enable TLS connection to the AMQP server. | boolean | | | high |
| | When set to | boolean | | | high |
| amqp.auth.username | Username for connecting to the AMQP server. Uses anonymous authentication if not set. | string | null | | high |
| amqp.auth.password | Password for connecting to the AMQP server. | password | null | | high |
| amqp.consumer.source | The AMQP queue or topic address to consume messages from. | string | | non-empty string | high |
| amqp.consumer.qos | Quality of Service level. | string | | AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE | medium |
| amqp.consumer.durable | Use a durable consumer. The AMQP link remains in place after disconnection so messages are not lost. Essential for persistent topic subscriptions. | boolean | | | high |
| | Explicit link name. Overrides auto-generation. When multiple tasks share the same link name, messages are load-balanced between them. | string | null | | low |
| | Prefix for auto-generated link names. The actual link name will be | string | null | | medium |
| | Number of messages prefetched and buffered locally before requiring acknowledgments. | int | | [1,…] | medium |
| | Maximum time in milliseconds to wait for a message when polling. | long | | [100,…] | medium |
| | Maximum time in milliseconds to wait for graceful consumer shutdown. | long | | [1000,…] | low |
| | Session-level incoming window size for flow control. | int | | [1,…] | medium |
| | Session-level outgoing window size for flow control. | int | | [1,…] | medium |
| | Automatically acknowledge messages after successful processing. | boolean | | | medium |
| | On processing error: if | boolean | | | medium |
| | Maximum idle time in milliseconds before the connection is considered dead. | long | | [1,…] | medium |
| | Maximum size in bytes for individual AMQP frames. | long | | [1,…] | medium |
| | Network buffer size in bytes for receiving frames. | int | | [1,…] | medium |
| | Extension size in bytes when the input buffer is insufficient to receive a frame. | int | | [1,…] | medium |
| | Network buffer size in bytes for sending frames. | int | | [1,…] | medium |
| | Extension size in bytes when the output buffer is insufficient to send a frame. | int | | [1,…] | medium |
| | Prefix for auto-generated AMQP container IDs. The actual ID will be | string | null | | medium |
| | Explicit container ID. Overrides auto-generation. | string | null | | low |
| amqp.key.extractor | Ordered list of key extraction strategies. Each is tried in turn until a key is found. | list | | | high |
| topic | The Kafka topic to write messages to. | string | | non-empty string | high |
| validate.connection | Attempt a live broker connection during connector validation. | boolean | | | low |
OAuth2 AMQP Source Connector
The OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) supports all of the same AMQP and consumer properties as the Core connector (see above), plus the following OAuth2-specific options:
| Property | Description | Type | Default | Importance |
|---|---|---|---|---|
| oauth.token.endpoint | The URL for the access token endpoint. | string | | high |
| oauth.method | Client authentication method. | string | | high |
| oauth.client.id | The client identifier registered with the authorization server. | string | null | high |
| oauth.client.secret | The client secret (for | password | null | high |
| oauth.scope | Space-separated list of requested permissions (e.g., | string | null | high |
| oauth.private.key.jwk.json | Private key in JWK JSON format (for | password | null | high |
| oauth.private.key.pem.content | Private key in PEM encoding (alternative to JWK). | password | null | high |
| oauth.private.key.pem.password | Password for an encrypted PEM private key. | password | null | high |
| oauth.private.key.jws.algorithm | JWS algorithm used to sign the client assertion. Default is | string | | high |
| oauth.private.key.claims.issuer | JWT Issuer Claim ( | string | null | high |
| | JWT Audience Claim ( | string | null | high |
| | JWT Expiration Time Claim ( | int | | medium |
| | JWT Not Before Claim ( | int | null | medium |
| | When | boolean | | medium |
| | When | boolean | | medium |
AMQP to Kafka conversion
The connector converts AMQP 1.0 messages to Kafka Connect records as follows:
- Message body (payload): always emitted as a String value using Schema.OPTIONAL_STRING_SCHEMA. AMQP Null produces a null record value.
- Record key: extracted by the amqp.key.extractor chain using the same string formatting.
- Message Properties: added as Kafka headers with the msg. prefix (e.g., msg.Subject, msg.ContentType, msg.CreationTime).
- Application Properties: added as Kafka headers with the app. prefix. Container types (Array/List/Map) are skipped.
All Kafka Connect headers use Schema.OPTIONAL_BYTES_SCHEMA with UTF-8 encoded content.
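The header mapping described above can be sketched as follows. This is an illustration and not the connector's code: it models properties as plain dicts, uses Python's `str()` as a stand-in for the connector's type-specific string rendering, and assumes that a null property value yields a null header value. The function name `to_headers` is hypothetical.

```python
# Python stand-ins for the AMQP Array/List/Map container types.
CONTAINER_TYPES = (list, tuple, dict)


def to_headers(message_properties, application_properties):
    """Map AMQP properties to (header name, UTF-8 bytes) pairs."""
    headers = []
    for name, value in message_properties.items():
        headers.append((f"msg.{name}", encode(value)))
    for name, value in application_properties.items():
        if isinstance(value, CONTAINER_TYPES):
            # Container values are skipped rather than converted.
            continue
        headers.append((f"app.{name}", encode(value)))
    return headers


def encode(value):
    """Render a value as a string and encode it as UTF-8 bytes."""
    # Assumption: null values stay null (the header schema is optional).
    return None if value is None else str(value).encode("utf-8")


headers = to_headers(
    {"Subject": "orders"},
    {"region": "eu", "tags": ["a", "b"], "count": 3},
)
for name, value in headers:
    print(name, value)
```

In the usage example, the `tags` application property is a list and therefore produces no header.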
Type-specific string rendering for body and header values:
| AMQP type | String representation |
|---|---|
| | |
| | |
| | |
| | Epoch milliseconds (long as string) |
| | |
| | Single-character Unicode string |
| | Not supported; results in a conversion error |
| Other primitive types | |
Known limitations
- AMQP Data and AmqpSequence message body sections are not supported; messages using them result in a conversion error.
- Application Properties values do not support complex Array/List/Map types; these are silently skipped.
License
AMQP 1.0 Source Connectors are licensed under the Apache License, Version 2.0.