AMQP 1.0 Source Connectors

Overview

The AMQP 1.0 Source Connectors read messages from AMQP 1.0 queues or topics and publish them as records to Kafka. They are tested with ActiveMQ Artemis and with RabbitMQ (with the AMQP 1.0 plugin enabled).

Two connector variants are available:

  • Core AMQP Source Connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector) — supports anonymous or username/password (PLAIN) authentication.

  • OAuth2 AMQP Source Connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) — supports token-based OAuth2 authentication.

Both connectors share the same AMQP consumption behavior (queue/topic address, QoS, link durability, etc.). The OAuth2 variant replaces basic authentication with token acquisition and SASL/OAuth2.

This document makes the following assumptions:

  • You have access to an AMQP 1.0 broker (e.g., RabbitMQ or ActiveMQ Artemis) that is reachable from the Kafka Connect cluster.

  • You have the broker hostname, port, and credentials available.

  • You have access to Axual Self-Service to configure the connector.

Choosing the right connector

Select a connector based on the authentication offered by your AMQP broker or gateway:

| Authentication scenario | Connector to use |
|---|---|
| No authentication (anonymous) | Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector). Do not configure amqp.auth.username or amqp.auth.password. |
| Username and password (SASL PLAIN) | Core connector. Set amqp.auth.username and amqp.auth.password. |
| OAuth2 (client credentials or private key JWT) | OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector). Configure oauth.* properties. Do not set amqp.auth.password. |
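
The selection rules above can be expressed as a small pre-flight check. This is only an illustrative sketch: the function name auth_problems and the idea of linting a configuration dictionary before submitting it are assumptions, not part of the connector. Only the two class names and the property names come from this document.

```python
CORE = "io.axual.connect.plugins.amqp.source.AmqpSourceConnector"
OAUTH2 = "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector"


def auth_problems(config):
    """Return a list of authentication-related problems (hypothetical helper)."""
    problems = []
    connector = config.get("connector.class")
    has_user = "amqp.auth.username" in config
    has_password = "amqp.auth.password" in config
    has_oauth = any(key.startswith("oauth.") for key in config)

    if connector == CORE:
        # Anonymous: neither property set. SASL PLAIN: both properties set.
        if has_user != has_password:
            problems.append("set both amqp.auth.username and amqp.auth.password, or neither")
    elif connector == OAUTH2:
        if has_password:
            problems.append("do not set amqp.auth.password with the OAuth2 connector")
        if not has_oauth:
            problems.append("configure the oauth.* properties")
    else:
        problems.append(f"unknown connector.class: {connector}")
    return problems
```

An empty list means the configuration matches one of the three scenarios in the table; it says nothing about whether the broker will accept the credentials.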

OAuth2 method selection (oauth.method):

| Method | Description |
|---|---|
| client_secret_basic | Client ID and secret sent via HTTP Basic. |
| client_secret_post | Client ID and secret sent in the token request body. |
| private_key_jwt | Signed client assertion using a private key. Provide the key as JWK JSON (oauth.private.key.jwk.json) or PEM (oauth.private.key.pem.content). |
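
To make the difference between the two client-secret methods concrete, the sketch below builds the headers and form body of a token request without sending it. It assumes the client-credentials grant and the encoding rules of RFC 6749 section 2.3.1; the function name build_token_request is hypothetical, and the connector's actual request may differ in detail.

```python
import base64
from urllib.parse import quote_plus, urlencode


def build_token_request(method, client_id, client_secret, scope=None):
    """Return (headers, body) for a client-credentials token request (sketch)."""
    form = {"grant_type": "client_credentials"}  # assumed grant type
    if scope:
        form["scope"] = scope
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    if method == "client_secret_basic":
        # Credentials travel in the Authorization header as Base64("id:secret").
        pair = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
        token = base64.b64encode(pair.encode("utf-8")).decode("ascii")
        headers["Authorization"] = "Basic " + token
    elif method == "client_secret_post":
        # Credentials travel in the form body instead of a header.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError(f"method not covered by this sketch: {method}")

    return headers, urlencode(form)
```

With client_secret_basic the body carries only the grant type and scope; with client_secret_post no Authorization header is produced. private_key_jwt is left out here because it needs a signed assertion rather than a shared secret.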

Configuring a new AMQP 1.0 source connector

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be my_amqp_source_topic.
    The key/value types will be String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_amqp_source.
    The plugin name is {plugin-name}.
    If a plugin isn’t available, ask a platform operator to install plugins.

  3. Provide the following minimal configuration to connect to your AMQP broker using the Core connector (anonymous or username/password). For OAuth2 authentication, see the OAuth2 configuration section below.

    Always set key.converter, value.converter, and header.converter explicitly. When left unset, Kafka Connect falls back to the worker-level defaults, which vary between installations and may produce unexpected serialization (e.g., wrapping plain strings in a JSON schema envelope). The AMQP 1.0 connector always emits keys and values as plain strings and headers as raw bytes, so the correct converters are StringConverter for keys and values, and ByteArrayConverter for headers.

    | Property | Example value | Description |
    |---|---|---|
    | connector.class | io.axual.connect.plugins.amqp.source.AmqpSourceConnector | |
    | amqp.host | broker.example.com | Hostname of the AMQP broker |
    | amqp.port | 5672 | Default AMQP port. Use 5671 for TLS. |
    | amqp.consumer.source | my.queue | The queue or topic address to consume from |
    | amqp.auth.username | | Username for SASL PLAIN authentication. Omit for anonymous access. |
    | amqp.auth.password | | Password for SASL PLAIN authentication. Omit for anonymous access. |
    | amqp.consumer.qos | AT_LEAST_ONCE | Quality of Service level. Options: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE. |
    | topic | my_amqp_source_topic | The Kafka topic to write messages to. |
    | amqp.key.extractor | MESSAGE_ID,SUBJECT,NULL | Key extraction chain; tries each extractor in order. |
    | key.converter | org.apache.kafka.connect.storage.StringConverter | |
    | value.converter | org.apache.kafka.connect.storage.StringConverter | |
    | header.converter | org.apache.kafka.connect.converters.ByteArrayConverter | |
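
Because a missing converter silently falls back to the worker default, it can help to check a configuration before submitting it. The following is a minimal sketch of such a check; the helper name converter_problems and the use of a Python dictionary for the configuration are assumptions made for illustration.

```python
# Converters recommended by this document for the AMQP 1.0 source connectors.
REQUIRED_CONVERTERS = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
}


def converter_problems(config):
    """List converter settings that are missing or differ from the recommendation."""
    problems = []
    for prop, expected in REQUIRED_CONVERTERS.items():
        actual = config.get(prop)
        if actual is None:
            problems.append(f"{prop} is not set; the worker-level default would apply")
        elif actual != expected:
            problems.append(f"{prop} is {actual}, expected {expected}")
    return problems
```

An empty result means all three converters are set explicitly to the values listed in the table above.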

OAuth2 configuration

When using the OAuth2 connector, replace connector.class with io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector and add the following properties:

| Property | Example value | Description |
|---|---|---|
| oauth.token.endpoint | https://auth.example.com/oauth2/token | Access token endpoint URL |
| oauth.method | client_secret_basic | One of: client_secret_basic, client_secret_post, private_key_jwt. |
| oauth.client.id | | Your OAuth2 client ID |
| oauth.client.secret | | Your OAuth2 client secret (for the client_secret_basic and client_secret_post methods) |
| oauth.scope | read write | Optional space-separated list of requested scopes |

  4. Authorize the my_amqp_source source Connector-Application to produce to the my_amqp_source_topic stream.

  5. You can now start the source connector application.

Examples

The following end-to-end examples show complete configurations for common authentication modes.

Anonymous (no authentication)

connector.class=io.axual.connect.plugins.amqp.source.AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue1

amqp.consumer.qos=AT_LEAST_ONCE
amqp.consumer.durable=true

topic=amqp.queue1
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

validate.connection=false

Username and password (PLAIN)

connector.class=io.axual.connect.plugins.amqp.source.AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue2
amqp.auth.username=myuser
amqp.auth.password=mypassword

amqp.consumer.qos=AT_LEAST_ONCE
amqp.consumer.durable=true

topic=amqp.queue2
amqp.key.extractor=MESSAGE_ID,SUBJECT,UUID

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

validate.connection=false

OAuth2 — Client Secret Basic

connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-basic

oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=client_secret_basic
oauth.client.id=my-client
oauth.client.secret=super-secret
oauth.scope=read write

topic=amqp.oauth.basic
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

OAuth2 — Client Secret Post

connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-post

oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=client_secret_post
oauth.client.id=my-client
oauth.client.secret=super-secret
oauth.scope=read write

topic=amqp.oauth.post
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

OAuth2 — Private Key JWT with JWK JSON

connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-pkjwt

oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=private_key_jwt
oauth.client.id=my-service
oauth.private.key.jwk.json={"kty":"RSA","d":"...","n":"...","e":"AQAB","kid":"my-key"}
oauth.private.key.jws.algorithm=RS256
oauth.private.key.claims.issuer=my-service

topic=amqp.oauth.pkjwt.jwk
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

OAuth2 — Private Key JWT with Encrypted PEM

connector.class=io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector
tasks.max=1

amqp.host=broker.example.com
amqp.port=5672
amqp.consumer.source=queue-pkjwt-pem

oauth.token.endpoint=https://auth.example.com/oauth2/token
oauth.method=private_key_jwt
oauth.client.id=my-service
oauth.private.key.pem.content=-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIF...\n-----END ENCRYPTED PRIVATE KEY-----
oauth.private.key.pem.password=changeit
oauth.private.key.jws.algorithm=RS256
# Optional; defaults to oauth.client.id when not set
oauth.private.key.claims.issuer=my-service

topic=amqp.oauth.pkjwt.pem
amqp.key.extractor=MESSAGE_ID,SUBJECT,NULL

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Configuration reference

Core AMQP Source Connector

The following options are available for the Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector).

| Property | Description | Type | Default | Valid values | Importance |
|---|---|---|---|---|---|
| amqp.host | The hostname where the AMQP server can be reached. | string | | non-empty string | high |
| amqp.port | The port number where the AMQP server listens. | int | 5672 | [1,…] | high |
| amqp.tls.enabled | Enable TLS connection to the AMQP server. | boolean | false | | high |
| amqp.auth.enabled | When set to false, no authentication is attempted. Default is true. | boolean | true | | high |
| amqp.auth.username | Username for connecting to the AMQP server. Uses anonymous authentication if not set. | string | null | | high |
| amqp.auth.password | Password for connecting to the AMQP server. | password | null | | high |
| amqp.consumer.source | The AMQP queue or topic address to consume messages from. | string | | non-empty string | high |
| amqp.consumer.qos | Quality of Service level: AT_MOST_ONCE (fire and forget), AT_LEAST_ONCE (guaranteed, may duplicate), EXACTLY_ONCE (requires broker support). | string | AT_LEAST_ONCE | AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE | medium |
| amqp.consumer.durable | Use a durable consumer. The AMQP link remains in place after disconnection so messages are not lost. Essential for persistent topic subscriptions. | boolean | false | | high |
| amqp.consumer.link.name | Explicit link name. Overrides auto-generation. When multiple tasks share the same link name, messages are load-balanced between them. | string | null | | low |
| amqp.consumer.link.name.prefix | Prefix for auto-generated link names. The actual link name will be {prefix}-{task-id}. | string | null | | medium |
| amqp.consumer.link.credit | Number of messages prefetched and buffered locally before requiring acknowledgments. | int | 100 | [1,…] | medium |
| amqp.consumer.receive.timeout | Maximum time in milliseconds to wait for a message when polling. | long | 1000 | [100,…] | medium |
| amqp.consumer.close.timeout | Maximum time in milliseconds to wait for graceful consumer shutdown. | long | 5000 | [1000,…] | low |
| amqp.consumer.incoming.window | Session-level incoming window size for flow control. | int | 2048 | [1,…] | medium |
| amqp.consumer.outgoing.window | Session-level outgoing window size for flow control. | int | 2048 | [1,…] | medium |
| amqp.consumer.auto.acknowledge | Automatically acknowledge messages after successful processing. | boolean | false | | medium |
| amqp.consumer.reject.on.error | On processing error: if true, reject the message (no redelivery); if false, release it (can be redelivered). | boolean | false | | medium |
| amqp.idle.timeout | Maximum idle time in milliseconds before the connection is considered dead. | long | 2147483647 | [1,…] | medium |
| amqp.max.frame.size | Maximum size in bytes for individual AMQP frames. | long | 2147483647 | [1,…] | medium |
| amqp.input.buffer.size | Network buffer size in bytes for receiving frames. | int | 131072 | [1,…] | medium |
| amqp.input.buffer.extend.size | Extension size in bytes when the input buffer is insufficient to receive a frame. | int | 65536 | [1,…] | medium |
| amqp.output.buffer.size | Network buffer size in bytes for sending frames. | int | 131072 | [1,…] | medium |
| amqp.output.buffer.extend.size | Extension size in bytes when the output buffer is insufficient to send a frame. | int | 65536 | [1,…] | medium |
| amqp.container.id.prefix | Prefix for auto-generated AMQP container IDs. The actual ID will be {prefix}-{task-id}. | string | null | | medium |
| amqp.container.id | Explicit container ID. Overrides auto-generation. WARNING: Using the same ID across multiple tasks may cause broker tracking issues. | string | null | | low |
| amqp.key.extractor | Ordered list of key extraction strategies. Each is tried in turn until a key is found. Options (case insensitive): MESSAGE_ID, CORRELATION_ID, SUBJECT, UUID, NULL. | list | MESSAGE_ID,SUBJECT,NULL | | high |
| topic | The Kafka topic to write messages to. | string | | non-empty string | high |
| validate.connection | Attempt a live broker connection during connector validation. | boolean | false | | low |
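
The link name and container ID options follow the same pattern: an explicit value overrides auto-generation, and a prefix yields {prefix}-{task-id}. The sketch below illustrates that precedence. The function name resolve_name is hypothetical, and what the connector does when neither option is set is not documented here, so the sketch returns None in that case.

```python
def resolve_name(explicit, prefix, task_id):
    """Illustrate the precedence of explicit name over prefix-based generation."""
    if explicit:
        # amqp.consumer.link.name or amqp.container.id: used as-is for every task.
        return explicit
    if prefix:
        # amqp.consumer.link.name.prefix or amqp.container.id.prefix.
        return f"{prefix}-{task_id}"
    # Neither set: the connector's fallback is not described in this document.
    return None
```

For example, resolve_name(None, "orders", 0) and resolve_name(None, "orders", 1) give each task a distinct name, whereas an explicit link name is shared by all tasks, which is the situation in which messages are load-balanced between them.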

OAuth2 AMQP Source Connector

The OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) supports all of the same AMQP and consumer properties as the Core connector (see above), plus the following OAuth2-specific options:

| Property | Description | Type | Default | Importance |
|---|---|---|---|---|
| oauth.token.endpoint | The URL for the access token endpoint. | string | | high |
| oauth.method | Client authentication method: client_secret_basic, client_secret_post, or private_key_jwt. | string | client_secret_basic | high |
| oauth.client.id | The client identifier registered with the authorization server. | string | null | high |
| oauth.client.secret | The client secret (for client_secret_basic and client_secret_post methods). | password | null | high |
| oauth.scope | Space-separated list of requested permissions (e.g., read_users write_data). | string | null | high |
| oauth.private.key.jwk.json | Private key in JWK JSON format (for private_key_jwt method). | password | null | high |
| oauth.private.key.pem.content | Private key in PEM encoding (alternative to JWK). | password | null | high |
| oauth.private.key.pem.password | Password for an encrypted PEM private key. | password | null | high |
| oauth.private.key.jws.algorithm | JWS algorithm used to sign the client assertion. Default is RS256. | string | RS256 | high |
| oauth.private.key.claims.issuer | JWT Issuer Claim (iss). Defaults to oauth.client.id if not set. | string | null | high |
| oauth.private.key.claims.audience | JWT Audience Claim (aud). Defaults to oauth.token.endpoint if not set. | string | null | high |
| oauth.private.key.claims.expirationSeconds | JWT Expiration Time Claim (exp) in seconds. | int | 300 | medium |
| oauth.private.key.claims.notBeforeSeconds | JWT Not Before Claim (nbf). Seconds added to the issued-at time. Not set by default. | int | null | medium |
| oauth.private.key.claims.issuedAt | When true, adds an Issued At (iat) claim to the JWT. | boolean | false | medium |
| oauth.private.key.claims.jwtId | When true, adds a random JWT Identifier (jti) claim. | boolean | false | medium |
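
The claim-related options can be read as a recipe for the client assertion payload used with private_key_jwt. The sketch below applies the documented defaults to build that payload as a plain dictionary; it does not sign anything. It assumes that expirationSeconds is a lifetime added to the current time and that boolean options arrive as Python booleans. The function name build_assertion_claims is hypothetical, and any claims the connector adds beyond those listed in the table are not shown.

```python
import time
import uuid


def build_assertion_claims(config, now=None):
    """Build the JWT claim set implied by the oauth.private.key.claims.* options."""
    now = int(time.time()) if now is None else int(now)
    prefix = "oauth.private.key.claims."
    claims = {
        # iss defaults to the client ID, aud to the token endpoint.
        "iss": config.get(prefix + "issuer") or config["oauth.client.id"],
        "aud": config.get(prefix + "audience") or config["oauth.token.endpoint"],
        # Assumption: exp is "now" plus the configured number of seconds.
        "exp": now + int(config.get(prefix + "expirationSeconds", 300)),
    }
    not_before = config.get(prefix + "notBeforeSeconds")
    if not_before is not None:
        claims["nbf"] = now + int(not_before)  # seconds added to the issued-at time
    if config.get(prefix + "issuedAt", False):
        claims["iat"] = now
    if config.get(prefix + "jwtId", False):
        claims["jti"] = str(uuid.uuid4())  # random identifier
    return claims
```

With only oauth.client.id and oauth.token.endpoint set, the result contains just iss, aud, and exp, which mirrors the defaults in the table.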

AMQP to Kafka conversion

The connector converts AMQP 1.0 messages to Kafka Connect records as follows:

  • Message body (payload): always emitted as a String value using Schema.OPTIONAL_STRING_SCHEMA. An AMQPNull body produces a null record value.

  • Record key: extracted by the amqp.key.extractor chain and rendered as a string with the same type-specific formatting as the body.

  • Message Properties: added as Kafka headers with the msg. prefix (e.g., msg.Subject, msg.ContentType, msg.CreationTime).

  • Application Properties: added as Kafka headers with the app. prefix. Values of container types (Array/List/Map) are skipped.
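
The key extraction chain mentioned in the list above can be pictured as a loop over the configured strategies. This is a rough sketch rather than the connector's implementation: the message is modelled as a plain dictionary with hypothetical field names, and it assumes that NULL ends the chain with a null key and that UUID generates a random key.

```python
import uuid

# Hypothetical mapping from extractor name to a field of the message dictionary.
FIELDS = {
    "MESSAGE_ID": "message_id",
    "CORRELATION_ID": "correlation_id",
    "SUBJECT": "subject",
}


def extract_key(message, chain="MESSAGE_ID,SUBJECT,NULL"):
    """Try each extractor in order and return the first key found."""
    for name in (part.strip().upper() for part in chain.split(",")):
        if name == "NULL":
            return None  # assumed: explicitly produce a record without a key
        if name == "UUID":
            return str(uuid.uuid4())  # assumed: random key as a last resort
        if name not in FIELDS:
            raise ValueError(f"unknown key extractor: {name}")
        value = message.get(FIELDS[name])
        if value is not None:
            return str(value)
    return None
```

With the default chain, a message that has no message ID but carries a subject is keyed by its subject; a message with neither gets a null key.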

All Kafka Connect headers use Schema.OPTIONAL_BYTES_SCHEMA with UTF-8 encoded content.

Type-specific string rendering for body and header values:

| AMQP type | String representation |
|---|---|
| AMQPBinary | "binary : " + Base64 |
| AMQPDecimal128 | "decimal128 : " + Base64 |
| AMQPByte / AMQPUnsignedByte | "0x" + 2-digit hex |
| AMQPTimestamp | Epoch milliseconds (long as string) |
| AMQPBoolean | "true" or "false" (lowercase) |
| AMQPChar | Single-character Unicode string |
| AMQPArray / AMQPList / AMQPMap | Not supported; results in a conversion error |
| Other primitive types | String.valueOf(value) |
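
The rendering rules above can be approximated in a few lines. In this sketch the AMQP type is passed as a short hypothetical tag such as "binary" or "timestamp" instead of a real AMQP type object, and the hex digits are lowercase, which is an assumption because the table does not specify the case.

```python
import base64


def render_value(amqp_type, value):
    """Approximate the documented string rendering for one AMQP value."""
    if value is None:
        return None  # AMQPNull becomes a null value
    if amqp_type in ("binary", "decimal128"):
        # Raw bytes are prefixed with the type name and Base64-encoded.
        return f"{amqp_type} : " + base64.b64encode(value).decode("ascii")
    if amqp_type in ("byte", "ubyte"):
        return "0x" + format(value & 0xFF, "02x")  # two hex digits
    if amqp_type == "timestamp":
        return str(int(value))  # epoch milliseconds
    if amqp_type == "boolean":
        return "true" if value else "false"
    if amqp_type in ("array", "list", "map"):
        raise ValueError(f"container type not supported: {amqp_type}")
    return str(value)  # other primitive types, including char
```

For example, render_value("byte", 10) yields "0x0a" and render_value("boolean", True) yields "true".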

Known limitations

  • AMQP Data and AmqpSequence message body sections are not supported — messages using them result in a conversion error.

  • Application Properties values do not support complex Array/List/Map types — these are silently skipped.
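
As an illustration of the header mapping and of the second limitation, the sketch below turns message and application properties into prefixed, UTF-8 encoded headers and silently skips container values. Both inputs are plain dictionaries, the function name build_headers is hypothetical, and values are converted with str() for brevity instead of the type-specific rendering described earlier.

```python
def build_headers(message_properties, application_properties):
    """Map AMQP properties to Kafka header names and UTF-8 byte values."""

    def encode(value):
        # Headers use an optional bytes schema, so None is kept as None.
        return None if value is None else str(value).encode("utf-8")

    headers = {}
    for name, value in message_properties.items():
        headers["msg." + name] = encode(value)
    for name, value in application_properties.items():
        if isinstance(value, (list, tuple, dict)):
            continue  # Array/List/Map values are skipped without an error
        headers["app." + name] = encode(value)
    return headers
```

A message with Subject "orders" and application properties region "eu" and tags ["a", "b"] would produce the headers msg.Subject and app.region only.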

Cleanup

Once you are done, stop the connector application and clean up the unused Axual resources.

License

AMQP 1.0 Source Connectors are licensed under the Apache License, Version 2.0.