MQTT Source Connector

Type

source

Class

io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector

Target System

MQTT Broker

Maintainer

Lenses.io

License

Apache License 2.0

Project

github.com/lensesio/stream-reactor

Download

GitHub Releases

This page documents version 9.0.2. Newer versions should be compatible unless there are breaking changes, but field names or default values may differ. If you notice discrepancies, please contact Axual Support.

Description

The MQTT Source Connector subscribes to MQTT topics on an MQTT broker and publishes received messages as records to Kafka topics.

It is maintained by Lenses.io as part of the open-source Stream Reactor project github.com/lensesio/stream-reactor.

The connector uses KCQL (Kafka Query Language) to map MQTT topics to Kafka topics.
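Each KCQL statement maps one MQTT topic to one Kafka topic. A sketch of the pattern (topic names are illustrative; per the Stream Reactor documentation, multiple statements are typically separated by semicolons):

```sql
-- route one MQTT topic to one Kafka topic
INSERT INTO kafka_telemetry SELECT * FROM /sensors/temperature;
INSERT INTO kafka_alerts SELECT * FROM /sensors/alerts
```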

Compatible MQTT brokers include: HiveMQ, Eclipse Mosquitto, EMQX, VerneMQ, AWS IoT Core, Azure IoT Hub, and any broker implementing MQTT 3.1 or 3.1.1.

Features

  • Subscribe to MQTT broker topics and publish messages to Kafka

  • KCQL-based topic mapping between MQTT and Kafka topics

  • Configurable MQTT Quality of Service (QoS 0, 1, or 2)

  • TLS/SSL connectivity via SSL URI scheme

  • Compatible with any MQTT 3.1/3.1.1 broker
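MQTT topic filters support single-level (+) and multi-level (#) wildcards, which is relevant when choosing the source topic in a KCQL mapping. A minimal Python sketch of the standard MQTT 3.1.1 matching rules (illustrative only; confirm wildcard support for your connector version in the official documentation):

```python
# Sketch of MQTT 3.1.1 topic-filter matching: "+" matches exactly one
# level, "#" matches any number of remaining levels (in a valid filter
# "#" must be the last level; this sketch does not validate that).
def matches(topic_filter: str, topic: str) -> bool:
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)

print(matches("sensors/+/temperature", "sensors/device42/temperature"))  # True
print(matches("sensors/#", "sensors/device42/humidity"))                 # True
print(matches("sensors/+", "sensors/device42/humidity"))                 # False
```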

When to Use

  • You need to ingest IoT sensor data or device telemetry from an MQTT broker into Kafka.

  • You are bridging an edge messaging layer (MQTT) with a stream processing backbone (Kafka).

  • Your devices or gateways already publish to an MQTT broker and you want to process that data at scale.

When NOT to Use

  • You need to write Kafka records back to MQTT; the sink direction is not covered in this guide.

  • Your source system uses a different messaging protocol — consider the JMS or AMQP connectors instead.

Installation

The connector is available from the GitHub Releases page.

  1. Navigate to the releases page and select the version matching your Kafka Connect installation.

  2. Download the JAR file.

For installation steps, see Installing Connector Plugins.

Configuration

For the complete configuration reference, see the official source connector documentation.

To configure a connector in Axual Self-Service, see Starting Connectors. TIP: For Infrastructure-as-Code deployment, see the Axual Kafka Connect Boilerplates for Terraform and Management API boilerplates.

Getting Started

Prerequisites

MQTT broker

  1. Sign up for a free trial of HiveMQ MQTT Broker.

  2. After registration, a free cluster will be created in your cluster overview.
    Note down the broker URL and port for the connect.mqtt.hosts config.

  3. Click Manage Cluster → Access Management.
    Create a user and note down the credentials for connect.mqtt.username and connect.mqtt.password.

  4. Open the HiveMQ WebSocket Client and connect:

    • Host: The cluster URL

    • Port: 8884

    • Username and Password: the credentials you just created

    • SSL: tick the box

    • Click Connect.

  5. Expand Subscriptions → Add New Topic Subscription → Topic: my_mqtt_topic → Subscribe.

  6. Expand Publish → Topic: my_mqtt_topic → write a message → Publish.
    You should see the message in the Messages section.

Axual stream

The stream where the connector will produce events must already exist in Axual Self-Service. See Creating streams if you need to create it.

Steps

Step 1 — Create a connector application

  1. Follow the Creating streams documentation to create a stream and deploy it onto an environment.
    Name the stream my_mqtt_kafka_topic.
    Set the key/value types to String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_mqtt_app.
    The plugin name is io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

Step 2 — Configure the connector

  1. Provide the following minimal configuration:

    connect.mqtt.hosts

    Example value:
    ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883

    connect.mqtt.kcql

    INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic

    connect.mqtt.service.quality

    1

    connect.mqtt.username

    Name of the user you created

    connect.mqtt.password

    Password of the user you created

    For advanced options, see the official source connector documentation.

  2. Authorize the my_mqtt_app source Connector-Application to produce to the my_mqtt_kafka_topic stream.

Step 3 — Start the connector

Start the connector application from Axual Self-Service.
The connector will not read events produced before it started.

Step 4 — Verify

Return to the HiveMQ WebSocket Client tab and produce some more events.

In Axual Self-Service, use stream-browse on my_mqtt_kafka_topic to confirm MQTT messages are arriving. Note: the values will be base64 encoded.
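To read a browsed value locally, decode it from base64; a minimal Python sketch (the encoded string is an illustrative sample, not real stream-browse output):

```python
import base64

# Example value as it might appear in stream-browse (illustrative sample).
encoded = "aGVsbG8gZnJvbSBtcXR0"

decoded = base64.b64decode(encoded).decode("utf-8")
print(decoded)  # hello from mqtt
```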

Cleanup

When you are done:

  1. Stop the connector application in Axual Self-Service.

  2. Remove stream access for the application if no longer needed.

  3. Return to HiveMQ’s cluster overview → Manage Clusters → delete your cluster.

Known limitations

  • The connector does not read events produced to the MQTT broker before it started.

  • MQTT message values are delivered to Kafka as base64-encoded strings.

Examples

Minimal configuration

{
  "name": "my-mqtt-source",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883",
    "connect.mqtt.kcql": "INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic",
    "connect.mqtt.service.quality": "1",
    "connect.mqtt.username": "mqtt_user",
    "connect.mqtt.password": "Mqtt@Secure2024"
  }
}
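Before submitting the configuration, a quick local sanity check can catch missing keys; a minimal Python sketch (the required-key list reflects this guide's minimal configuration, not an exhaustive list):

```python
import json

# Minimal config from the example above (password is a sample value).
connector_json = """
{
  "name": "my-mqtt-source",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883",
    "connect.mqtt.kcql": "INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic",
    "connect.mqtt.service.quality": "1",
    "connect.mqtt.username": "mqtt_user",
    "connect.mqtt.password": "Mqtt@Secure2024"
  }
}
"""

# Keys this guide treats as required for a working source connector.
required = ["connector.class", "connect.mqtt.hosts", "connect.mqtt.kcql"]

config = json.loads(connector_json)["config"]
missing = [key for key in required if key not in config]
print("missing keys:", missing)  # missing keys: []
```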

License

The MQTT Source Connector is licensed under the Apache License, Version 2.0.