MQTT source and sink connectors
MQTT
The MQTT Source Connector connects to an MQTT broker and copies data from the subscribed topics to Kafka. This is the Apache 2.0-licensed Lenses connector.
If you already have a publicly accessible MQTT deployment, you can configure and install a connector right away using the Axual Self-Service UI; use the configuration instructions below.
If you don’t have one available, follow the next section to deploy a publicly available MQTT instance.
Deploying an MQTT instance
- Sign up for a free trial of the HiveMQ Cloud broker here.
- After successful registration, a free cluster should be automatically created in your cluster overview. Note down the broker’s URL and port; we’ll use them later as part of the connect.mqtt.hosts connector config.
- Click "Manage Cluster" and then click the "Access Management" button in the top-right of the screen. Create a user and note down its credentials; we’ll use them later to connect to the cluster, as the connect.mqtt.username and connect.mqtt.password connector configs.
- We’ll use the HiveMQ WebSocket Client to create sample data for our MQTT cluster. Fill in the following fields:
  - Host: the cluster URL you saw on the cluster overview page (which you noted down for the connect.mqtt.hosts config)
  - Port: 8884 (this is a special port opened for this browser client)
  - ClientID: you can use the default provided value
  - Username: the value you just created
  - Password: the value you just created
  - SSL: tick the box
- Click "Connect".
- Expand the "Subscriptions" section. Click "Add New Topic Subscription". In the pop-up window, edit only the "Topic" field: my_mqtt_topic, then click "Subscribe".
- Expand the "Publish" section. Use my_mqtt_topic as the topic name (without a trailing '/1') and write some text in the message box, then click "Publish". You should see the message appear in the "Messages" section below. If you prefer to publish from a script instead of the browser client, see the sketch after this list.
- Keep this page open. We’ll return later to produce more events, after configuring the connector in the Self-Service portal.
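If you prefer to publish test messages from a script instead of the browser client, the following is a minimal sketch using the paho-mqtt Python package (not part of this guide's tooling; install it with pip install paho-mqtt). The host, username and password are placeholders for the HiveMQ Cloud cluster and user you created above; port 8883 is the TLS MQTT port from the connect.mqtt.hosts example.

# Minimal sketch: publish a few test messages to the MQTT cluster.
# Assumes paho-mqtt 1.x; on paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION2
# as the first argument to mqtt.Client().
import paho.mqtt.client as mqtt

BROKER_HOST = "mylonguuidfromhivemq.s1.eu.hivemq.cloud"  # placeholder: your cluster URL
BROKER_PORT = 8883                                       # TLS MQTT port
USERNAME = "my_user"                                     # placeholder: user from Access Management
PASSWORD = "my_password"                                 # placeholder: that user's password

client = mqtt.Client(client_id="sample-producer")
client.username_pw_set(USERNAME, PASSWORD)
client.tls_set()                  # HiveMQ Cloud requires TLS; use the default CA certificates

client.connect(BROKER_HOST, BROKER_PORT)
client.loop_start()               # run the network loop in a background thread

# Publish a few sample messages to the topic the connector will read from.
for i in range(3):
    client.publish("my_mqtt_topic", payload=f"hello from python #{i}", qos=1).wait_for_publish()

client.loop_stop()
client.disconnect()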
Configuring a new source connector
- Follow the Creating Streams documentation (self-service/stream-management) to create one stream and deploy it onto an environment. The name of the stream will be my_mqtt_kafka_topic. The key/value types will be String.
- Follow the Configuring Connector-Applications documentation to set up a new connector application. Let’s call it my_mqtt_app.
The plugin name is "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector".
The values you will need to supply as configuration are listed in this section.
For advanced configuration, see the official connector documentation.
Provide the following minimal configuration in order to connect to the previously configured MQTT instance (a filled-in example is collected at the end of this section):
  - connect.mqtt.hosts: the broker address, in the format ssl://<MQTT-Broker-URL>:<MQTT-Broker-Port>, for example ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883
  - connect.mqtt.kcql: INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic
  - connect.mqtt.service.quality: 1
  - connect.mqtt.username: the name of the user you created
  - connect.mqtt.password: the password of the user you created
- Authorize the my_mqtt_app application to produce to the my_mqtt_kafka_topic stream.
- You can now start the my_mqtt_app source Connector-Application. Note that the connector we just created will not read events produced in the past.
- Return to the HiveMQ WebSocket Client tab and produce some more events.
You can now check the stream to see the events published by the connector. The values will be base64 encoded.
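How you browse the stream depends on your setup (for example, with a Kafka consumer). As a minimal sketch, if you copy one base64-encoded value from a consumed record, you can decode it with Python's standard library:

# Minimal sketch: decode a base64-encoded record value copied from the stream.
import base64

encoded_value = "aGVsbG8="  # placeholder: paste the value you copied ("hello" in this example)
print(base64.b64decode(encoded_value).decode("utf-8"))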
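For reference, here is the filled-in minimal configuration from the steps above, collected in one place. It is shown as simple key=value pairs for readability; in the Self-Service UI you enter each value in its own configuration field. The host, username and password are placeholders for your own HiveMQ cluster and user.

connect.mqtt.hosts=ssl://mylonguuidfromhivemq.s1.eu.hivemq.cloud:8883
connect.mqtt.kcql=INSERT INTO my_mqtt_kafka_topic SELECT * FROM my_mqtt_topic
connect.mqtt.service.quality=1
connect.mqtt.username=my_user
connect.mqtt.password=my_password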
Cleanup
Once you are done, stop the connector application and clean up the unused Axual resources.
Don’t forget to return to HiveMQ’s cluster overview (Manage Clusters) and delete your cluster if you’re no longer using it.
License
The MQTT source and sink connector is licensed under the Apache License, Version 2.0.
Source code
The source code for the connector can be found on github.com/lensesio/stream-reactor