MQTT Source Connector
| Type | source |
| Class | `io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector` |
| Target System | MQTT Broker |
| Maintainer | Lenses.io |
| License | Apache License 2.0 |
| Project | |
| Download | |
This page documents version 9.0.2. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support. |
Description
The MQTT Source Connector subscribes to MQTT topics on an MQTT broker and publishes received messages as records to Kafka topics.
It is maintained by Lenses.io as part of the open-source Stream Reactor project (github.com/lensesio/stream-reactor).
The connector uses KCQL (Kafka Query Language) to map MQTT topics to Kafka topics.
Compatible MQTT brokers include: HiveMQ, Eclipse Mosquitto, EMQX, VerneMQ, AWS IoT Core, Azure IoT Hub, and any broker implementing MQTT 3.1 or 3.1.1.
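For instance, a KCQL statement names the target Kafka topic and the source MQTT topic. The topic names below are placeholders; the Stream Reactor documentation also describes MQTT wildcard subscriptions (such as `+`) in the `FROM` clause:

```sql
-- Map one MQTT topic to one Kafka topic (placeholder names)
INSERT INTO kafka_sensor_data SELECT * FROM /sensors/temperature

-- MQTT wildcard subscription: '+' matches a single topic level
INSERT INTO kafka_sensor_data SELECT * FROM /sensors/+/reading
```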
Features
- Subscribe to MQTT broker topics and publish messages to Kafka
- KCQL-based topic mapping between MQTT and Kafka topics
- Configurable MQTT Quality of Service (QoS 0, 1, or 2)
- TLS/SSL connectivity via the `ssl://` URI scheme
- Compatible with any MQTT 3.1/3.1.1 broker
When to Use
- You need to ingest IoT sensor data or device telemetry from an MQTT broker into Kafka.
- You are bridging an edge messaging layer (MQTT) with a stream processing backbone (Kafka).
- Your devices or gateways already publish to an MQTT broker and you want to process that data at scale.
When NOT to Use
- You need to write Kafka records back to MQTT; the sink direction is not covered in this guide.
- Your source system uses a different messaging protocol; consider the JMS or AMQP connectors instead.
Installation
The connector is available from the GitHub Releases page.
- Navigate to the releases page and select the version matching your Kafka Connect installation.
- Download the JAR file.
For installation steps, see Installing Connector Plugins.
Configuration
For the complete configuration reference, see the official source connector documentation.
| To configure a connector in Axual Self-Service, see Starting Connectors. TIP: For Infrastructure-as-Code deployment, see the Axual Kafka Connect Boilerplates for Terraform and Management API boilerplates. |
Getting Started
Prerequisites
MQTT broker
- Sign up for a free trial of HiveMQ MQTT Broker.
- After registration, a free cluster is created in your cluster overview. Note down the broker URL and port for the `connect.mqtt.hosts` config.
- Click Manage Cluster → Access Management. Create a user and note down the credentials for `connect.mqtt.username` and `connect.mqtt.password`.
- Open the HiveMQ WebSocket Client and connect:
  - Host: the cluster URL
  - Port: 8884
  - Username and Password: the credentials you just created
  - SSL: tick the box
  - Click Connect.
- Expand Subscriptions → Add New Topic Subscription → Topic: `my_mqtt_topic` → Subscribe.
- Expand Publish → Topic: `my_mqtt_topic` → write a message → Publish.

You should see the message in the Messages section.
Axual stream
The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.
Steps
Step 1 — Create a connector application
- Follow the Creating streams documentation to create one stream and deploy it onto an environment. The name of the stream will be `my_mqtt_kafka_topic`, and the key/value types will be `String`/`String`.
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let's call it `my_mqtt_app`. The plugin name is `io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector`. If the plugin isn't available, ask a platform operator to install it.
Step 2 — Configure the connector
- Provide the following minimal configuration:

| `connect.mqtt.hosts` | Example: `ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883` |
| `connect.mqtt.kcql` | `INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic` |
| `connect.mqtt.service.quality` | `1` |
| `connect.mqtt.username` | Name of the user you created |
| `connect.mqtt.password` | Password of the user you created |

For advanced options, see the official source connector documentation.
- Authorize the `my_mqtt_app` source Connector-Application to produce to the `my_mqtt_kafka_topic` stream.
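The KCQL statement above is a one-to-one mapping: everything arriving on the MQTT topic after the `FROM` goes to the Kafka topic after `INSERT INTO`. As an illustration only (this is not the connector's actual parser, which uses the full Lenses KCQL grammar), a minimal sketch of extracting the two topic names:

```python
import re

def parse_kcql(stmt: str) -> tuple[str, str]:
    """Extract (kafka_topic, mqtt_topic) from a simple KCQL mapping.

    Illustrative only: the real connector uses the full KCQL parser,
    which supports many more clauses than this regex does.
    """
    m = re.match(r"INSERT INTO (\S+) SELECT \* FROM (\S+)", stmt.strip(), re.IGNORECASE)
    if not m:
        raise ValueError(f"unsupported KCQL: {stmt!r}")
    return m.group(1), m.group(2)

kafka_topic, mqtt_topic = parse_kcql(
    "INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic"
)
print(kafka_topic, mqtt_topic)  # my_mqtt_kafka_topic my_mqtt_topic
```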
Step 3 — Start the connector
Start the connector application from Axual Self-Service.
The connector will not read events produced before it started.
Step 4 — Verify
Return to the HiveMQ WebSocket Client tab and produce some more events.
In Axual Self-Service, use stream-browse on `my_mqtt_kafka_topic` to confirm the MQTT messages are arriving.
Note: the values will be base64 encoded.
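Since browsed values are base64 encoded, you can decode them locally to check the original payload; a quick sketch using only the Python standard library:

```python
import base64

def decode_value(encoded: str) -> str:
    """Decode a base64-encoded record value back to UTF-8 text."""
    return base64.b64decode(encoded).decode("utf-8")

# Example: a value as it might appear in stream-browse
print(decode_value("aGVsbG8gZnJvbSBtcXR0"))  # hello from mqtt
```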
Cleanup
When you are done:
- Stop the connector application in Axual Self-Service.
- Remove stream access for the application if no longer needed.
- Return to HiveMQ's cluster overview → Manage Clusters → delete your cluster.
Known limitations
- The connector does not read events produced to the MQTT broker before it started.
- MQTT message values are delivered to Kafka as base64-encoded strings.
Examples
Minimal configuration
```json
{
  "name": "my-mqtt-source",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883",
    "connect.mqtt.kcql": "INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic",
    "connect.mqtt.service.quality": "1",
    "connect.mqtt.username": "mqtt_user",
    "connect.mqtt.password": "Mqtt@Secure2024"
  }
}
```
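Outside Axual Self-Service, the same configuration could be registered against a plain Kafka Connect REST API. A hedged sketch: the `http://localhost:8083` endpoint is a placeholder, and in Axual deployments Self-Service normally makes this call for you:

```python
import json
import urllib.request

# The connector config from the minimal example above.
config = {
    "name": "my-mqtt-source",
    "config": {
        "connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "connect.mqtt.hosts": "ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883",
        "connect.mqtt.kcql": "INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic",
        "connect.mqtt.service.quality": "1",
        "connect.mqtt.username": "mqtt_user",
        "connect.mqtt.password": "Mqtt@Secure2024",
    },
}

def register_connector(connect_url: str, payload: dict) -> bytes:
    """POST a connector config to Kafka Connect's /connectors endpoint.

    `connect_url` is a placeholder; adjust it to your Connect cluster.
    """
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# register_connector("http://localhost:8083", config)  # requires a reachable Connect cluster
```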
License
The MQTT Source Connector is licensed under the Apache License, Version 2.0.