AMQP 1.0 Source Connectors

Type

Source

Core Class

io.axual.connect.plugins.amqp.source.AmqpSourceConnector

OAuth2 Class

io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector

Target System

Messaging & Streaming (AMQP 1.0)

Maintainer

Axual

License

Proprietary (client-only)

Project

Source Code

Download

Contact Axual Support to obtain the connector library.

Description

The AMQP 1.0 Source Connectors read messages from AMQP 1.0 queues or topics and publish them as records to Kafka. They are tested with ActiveMQ Artemis and RabbitMQ (with AMQP 1.0 plugin).

Two connector variants are available:

  • Core AMQP Source Connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector) — supports anonymous or username/password (PLAIN) authentication.

  • OAuth2 AMQP Source Connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) — supports token-based OAuth2 authentication.

Both connectors share the same AMQP consumption behaviour (queue/topic address, QoS, link durability, etc.). The OAuth2 variant replaces basic authentication with token acquisition and SASL/OAuth2.

Connector variants

Select a connector based on the authentication offered by your AMQP broker or gateway:

Authentication scenario Connector to use

No authentication (anonymous)

Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector).
Do not configure amqp.auth.username or amqp.auth.password.

Username and password (SASL PLAIN)

Core connector. Set amqp.auth.username and amqp.auth.password.

OAuth2 (client credentials or private key JWT)

OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector). Configure oauth.* properties.
Do not set amqp.auth.password.

OAuth2 method selection (oauth.method):

Method Description

client_secret_basic

Client ID and secret sent via HTTP Basic.

client_secret_post

Client ID and secret sent in the token request body.

private_key_jwt

Signed client assertion using a private key.
Provide the key as JWK JSON (oauth.private.key.jwk.json) or PEM (oauth.private.key.pem.content).

Features

  • Read messages from AMQP 1.0 queues or topics and publish them as records to Kafka

  • Two variants: Core (anonymous/PLAIN) and OAuth2 (client credentials or private key JWT)

  • Configurable Quality of Service: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE

  • Durable consumer support — AMQP link persists across disconnections so no messages are lost on restart

  • Flexible key extraction chain: MESSAGE_ID, CORRELATION_ID, SUBJECT, UUID, or NULL

  • AMQP message properties forwarded as Kafka headers with msg. prefix

  • AMQP application properties forwarded as Kafka headers with app. prefix

  • Tested with ActiveMQ Artemis and RabbitMQ (AMQP 1.0 plugin)

When to Use

  • You need to bridge an AMQP 1.0 broker (RabbitMQ, ActiveMQ Artemis) to Kafka.

  • Your broker requires OAuth2 token-based authentication.

  • You need durable subscriptions to ensure no messages are lost across connector restarts.

  • You want configurable QoS levels for message delivery guarantees.

When NOT to Use

  • Your broker does not support AMQP 1.0 — AMQP 0.9.x (classic RabbitMQ default) requires a different connector.

  • You need schema-aware (structured) Kafka records — this connector always emits plain string keys and values.

  • You need to transform or enrich messages before publishing to Kafka — use Kafka Streams or ksqlDB after ingestion instead.

Installation

The AMQP 1.0 Source Connectors are maintained in the Axual public repository.

Contact the Axual Support team to obtain the connector library.

For installation steps, see Installing Connector Plugins.

Configuration

Core AMQP Source Connector

The following options are available for the Core connector (io.axual.connect.plugins.amqp.source.AmqpSourceConnector).

Property Description Type Default Valid values Importance

amqp.host

The hostname where the AMQP server can be reached.

string

non-empty string

high

amqp.port

The port number where the AMQP server listens.

int

5672

[1,…​]

high

amqp.tls.enabled

Enable TLS connection to the AMQP server.

boolean

false

high

amqp.auth.enabled

When set to false, no authentication is attempted. Default is true.

boolean

true

high

amqp.auth.username

Username for connecting to the AMQP server. Uses anonymous authentication if not set.

string

null

high

amqp.auth.password

Password for connecting to the AMQP server.

password

null

high

amqp.consumer.source

The AMQP queue or topic address to consume messages from.

string

non-empty string

high

amqp.consumer.qos

Quality of Service level: AT_MOST_ONCE (fire and forget), AT_LEAST_ONCE (guaranteed, may duplicate), EXACTLY_ONCE (requires broker support).

string

AT_LEAST_ONCE

AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE

medium

amqp.consumer.durable

Use a durable consumer. The AMQP link remains in place after disconnection so messages are not lost. Essential for persistent topic subscriptions.

boolean

false

high

amqp.consumer.link.name

Explicit link name. Overrides auto-generation. When multiple tasks share the same link name, messages are load-balanced between them.

string

null

low

amqp.consumer.link.name.prefix

Prefix for auto-generated link names. The actual link name will be {prefix}-{task-id}.

string

null

medium

amqp.consumer.link.credit

Number of messages prefetched and buffered locally before requiring acknowledgments.

int

100

[1,…​]

medium

amqp.consumer.receive.timeout

Maximum time in milliseconds to wait for a message when polling.

long

1000

[100,…​]

medium

amqp.consumer.close.timeout

Maximum time in milliseconds to wait for graceful consumer shutdown.

long

5000

[1000,…​]

low

amqp.consumer.incoming.window

Session-level incoming window size for flow control.

int

2048

[1,…​]

medium

amqp.consumer.outgoing.window

Session-level outgoing window size for flow control.

int

2048

[1,…​]

medium

amqp.consumer.auto.acknowledge

Automatically acknowledge messages after successful processing.

boolean

false

medium

amqp.consumer.reject.on.error

On processing error: if true, reject the message (no redelivery); if false, release it (can be redelivered).

boolean

false

medium

amqp.idle.timeout

Maximum idle time in milliseconds before the connection is considered dead.

long

2147483647

[1,…​]

medium

amqp.max.frame.size

Maximum size in bytes for individual AMQP frames.

long

2147483647

[1,…​]

medium

amqp.input.buffer.size

Network buffer size in bytes for receiving frames.

int

131072

[1,…​]

medium

amqp.input.buffer.extend.size

Extension size in bytes when the input buffer is insufficient to receive a frame.

int

65536

[1,…​]

medium

amqp.output.buffer.size

Network buffer size in bytes for sending frames.

int

131072

[1,…​]

medium

amqp.output.buffer.extend.size

Extension size in bytes when the output buffer is insufficient to send a frame.

int

65536

[1,…​]

medium

amqp.container.id.prefix

Prefix for auto-generated AMQP container IDs. The actual ID will be {prefix}-{task-id}.

string

null

medium

amqp.container.id

Explicit container ID. Overrides auto-generation.
WARNING: Using the same ID across multiple tasks may cause broker tracking issues.

string

null

low

amqp.key.extractor

Ordered list of key extraction strategies. Each is tried in turn until a key is found.
Options (case insensitive): MESSAGE_ID, CORRELATION_ID, SUBJECT, UUID, NULL.

list

MESSAGE_ID,SUBJECT,NULL

high

topic

The Kafka topic to write messages to.

string

non-empty string

high

validate.connection

Attempt a live broker connection during connector validation.

boolean

false

low

OAuth2 AMQP Source Connector

The OAuth2 connector (io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector) supports all of the same AMQP and consumer properties as the Core connector (see above), plus the following OAuth2-specific options:

Property Description Type Default Importance

oauth.token.endpoint

The URL for the access token endpoint.

string

high

oauth.method

Client authentication method: client_secret_basic, client_secret_post, or private_key_jwt.

string

client_secret_basic

high

oauth.client.id

The client identifier registered with the authorization server.

string

null

high

oauth.client.secret

The client secret (for client_secret_basic and client_secret_post methods).

password

null

high

oauth.scope

Space-separated list of requested permissions (e.g., read_users write_data).

string

null

high

oauth.private.key.jwk.json

Private key in JWK JSON format (for private_key_jwt method).

password

null

high

oauth.private.key.pem.content

Private key in PEM encoding (alternative to JWK).

password

null

high

oauth.private.key.pem.password

Password for an encrypted PEM private key.

password

null

high

oauth.private.key.jws.algorithm

JWS algorithm used to sign the client assertion. Default is RS256.

string

RS256

high

oauth.private.key.claims.issuer

JWT Issuer Claim (iss). Defaults to oauth.client.id if not set.

string

null

high

oauth.private.key.claims.audience

JWT Audience Claim (aud). Defaults to oauth.token.endpoint if not set.

string

null

high

oauth.private.key.claims.expirationSeconds

JWT Expiration Time Claim (exp) in seconds.

int

300

medium

oauth.private.key.claims.notBeforeSeconds

JWT Not Before Claim (nbf). Seconds added to the issued-at time. Not set by default.

int

null

medium

oauth.private.key.claims.issuedAt

When true, adds an Issued At (iat) claim to the JWT.

boolean

false

medium

oauth.private.key.claims.jwtId

When true, adds a random JWT Identifier (jti) claim.

boolean

false

medium

Data conversion

The connector converts AMQP 1.0 messages to Kafka Connect records as follows:

  • Message body (payload) — always emitted as a String value using Schema.OPTIONAL_STRING_SCHEMA. AMQPNull produces a null record value.

  • Record key — extracted by the amqp.key.extractor chain using the same string formatting.

  • Message Properties — added as Kafka headers with the msg. prefix (e.g., msg.Subject, msg.ContentType, msg.CreationTime).

  • Application Properties — added as Kafka headers with the app. prefix. Container types (Array/List/Map) are skipped.

All Kafka Connect headers use Schema.OPTIONAL_BYTES_SCHEMA with UTF-8 encoded content.

Type-specific string rendering for body and header values:

AMQP type String representation

AMQPBinary

"binary : " + Base64

AMQPDecimal128

"decimal128 : " + Base64

AMQPByte / AMQPUnsignedByte

"0x" + 2-digit hex

AMQPTimestamp

Epoch milliseconds (long as string)

AMQPBoolean

"true" or "false" (lowercase)

AMQPChar

Single-character Unicode string

AMQPArray / AMQPList / AMQPMap

Not supported — results in a conversion error

Other primitive types

String.valueOf(value)

Getting Started

This section walks you through configuring the AMQP 1.0 Source Connector on Axual to stream messages from an AMQP broker into a local Kafka stream.

Prerequisites

AMQP broker

  • You have access to an AMQP 1.0 broker (e.g., RabbitMQ or ActiveMQ Artemis) reachable from the Kafka Connect cluster.

  • You have the broker hostname, port, and credentials available.

  • For OAuth2 authentication: you have a valid OAuth2 token endpoint URL, client ID, and client secret (or private key).

Axual stream

The local stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be my_amqp_source_topic.
    The key/value types will be String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_amqp_source.
    The plugin name is {plugin-name}.
    If a plugin isn’t available, ask a platform operator to install plugins.

Step 2 — Configure the connector

Always set key.converter, value.converter, and header.converter explicitly. When left unset, Kafka Connect falls back to the worker-level defaults, which vary between installations and may produce unexpected serialisation (e.g., wrapping plain strings in a JSON schema envelope). The AMQP 1.0 connector always emits keys and values as plain strings and headers as raw bytes, so the correct converters are StringConverter for keys and values, and ByteArrayConverter for headers.

  1. Provide the following minimal configuration to connect to your AMQP broker using the Core connector (anonymous or username/password). For OAuth2 authentication, see the OAuth2 configuration section below.

    connector.class

    io.axual.connect.plugins.amqp.source.AmqpSourceConnector

    amqp.host

    Hostname of the AMQP broker
    broker.example.com

    amqp.port

    5672
    Default AMQP port. Use 5671 for TLS.

    amqp.consumer.source

    The queue or topic address to consume from
    my.queue

    amqp.auth.username

    Username for SASL PLAIN authentication. Omit for anonymous access.

    amqp.auth.password

    Password for SASL PLAIN authentication. Omit for anonymous access.

    amqp.consumer.qos

    AT_LEAST_ONCE
    Quality of Service level. Options: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE.

    topic

    my_amqp_source_topic
    The Kafka topic to write messages to.

    amqp.key.extractor

    MESSAGE_ID,SUBJECT,NULL
    Key extraction chain; tries each extractor in order.

    key.converter

    org.apache.kafka.connect.storage.StringConverter

    value.converter

    org.apache.kafka.connect.storage.StringConverter

    header.converter

    org.apache.kafka.connect.converters.ByteArrayConverter

OAuth2 configuration

When using the OAuth2 connector, replace connector.class with io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector and add the following properties:

oauth.token.endpoint

Access token endpoint URL
https://auth.example.com/oauth2/token

oauth.method

client_secret_basic
One of: client_secret_basic, client_secret_post, private_key_jwt.

oauth.client.id

Your OAuth2 client ID

oauth.client.secret

Your OAuth2 client secret (for client_secret* methods)_

oauth.scope

Optional space-separated list of requested scopes
read write

  1. Authorize the my_amqp_source source Connector-Application to produce to the my_amqp_source_topic stream.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.

Step 4 — Verify

In Axual Self-Service, use stream-browse on my_amqp_source_topic to confirm messages from the AMQP broker are arriving.

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if it is no longer needed.

Examples

The following end-to-end examples show complete configurations for common authentication modes.

Anonymous (no authentication)

{
  "name": "my-amqp-source",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.source.AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue1",
    "amqp.consumer.qos": "AT_LEAST_ONCE",
    "amqp.consumer.durable": "true",
    "topic": "amqp.queue1",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "validate.connection": "false"
  }
}

Username and password (PLAIN)

{
  "name": "my-amqp-source-plain",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.source.AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue2",
    "amqp.auth.username": "amqp_user",
    "amqp.auth.password": "Amqp@Password2024",
    "amqp.consumer.qos": "AT_LEAST_ONCE",
    "amqp.consumer.durable": "true",
    "topic": "amqp.queue2",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,UUID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "validate.connection": "false"
  }
}

OAuth2 — Client Secret Basic

{
  "name": "my-amqp-source-oauth2-basic",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue-basic",
    "oauth.token.endpoint": "https://auth.example.com/oauth2/token",
    "oauth.method": "client_secret_basic",
    "oauth.client.id": "my-client",
    "oauth.client.secret": "Cl13nt$ecret2024",
    "oauth.scope": "read write",
    "topic": "amqp.oauth.basic",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

OAuth2 — Client Secret Post

{
  "name": "my-amqp-source-oauth2-post",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue-post",
    "oauth.token.endpoint": "https://auth.example.com/oauth2/token",
    "oauth.method": "client_secret_post",
    "oauth.client.id": "my-client",
    "oauth.client.secret": "Cl13nt$ecret2024",
    "oauth.scope": "read write",
    "topic": "amqp.oauth.post",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

OAuth2 — Private Key JWT with JWK JSON

{
  "name": "my-amqp-source-oauth2-pkjwt-jwk",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue-pkjwt",
    "oauth.token.endpoint": "https://auth.example.com/oauth2/token",
    "oauth.method": "private_key_jwt",
    "oauth.client.id": "my-service",
    "oauth.private.key.jwk.json": "{\"kty\":\"RSA\",\"d\":\"...\",\"n\":\"...\",\"e\":\"AQAB\",\"kid\":\"my-key\"}",
    "oauth.private.key.jws.algorithm": "RS256",
    "oauth.private.key.claims.issuer": "my-service",
    "topic": "amqp.oauth.pkjwt.jwk",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

OAuth2 — Private Key JWT with Encrypted PEM

{
  "name": "my-amqp-source-oauth2-pkjwt-pem",
  "config": {
    "connector.class": "io.axual.connect.plugins.amqp.oauth2.source.OAuth2AmqpSourceConnector",
    "tasks.max": "1",
    "amqp.host": "broker.example.com",
    "amqp.port": "5672",
    "amqp.consumer.source": "queue-pkjwt-pem",
    "oauth.token.endpoint": "https://auth.example.com/oauth2/token",
    "oauth.method": "private_key_jwt",
    "oauth.client.id": "my-service",
    "oauth.private.key.pem.content": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIF...\n-----END ENCRYPTED PRIVATE KEY-----",
    "oauth.private.key.pem.password": "changeit",
    "oauth.private.key.jws.algorithm": "RS256",
    "oauth.private.key.claims.issuer": "my-service",
    "topic": "amqp.oauth.pkjwt.pem",
    "amqp.key.extractor": "MESSAGE_ID,SUBJECT,NULL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

Known limitations

  • AMQP Data and AmqpSequence message body sections are not supported — messages using them result in a conversion error.

  • Application Properties values do not support complex Array/List/Map types — these are silently skipped.

  • The connector always emits string keys and values — schema-aware converters (e.g., Avro, Protobuf) cannot be used.

  • EXACTLY_ONCE QoS requires broker-side support and is not guaranteed on all brokers.

License

This connector is licensed under a proprietary client-only license. It is provided exclusively to authorized clients and may not be redistributed, modified, or used outside of the terms agreed with Axual.