MQTT source and sink-connector
MQTT
The MQTT Source Connector is a connector which connects to a MQTT broker and copies the information from the subscribed topics to Kafka. This is the Apache 2.0 Lenses Connector
If you already have a publicly accessible deployment of MQTT, you can already configure and install a connector using the Axual Self-Service UI. Use the configuration instructions below.
If you don’t have one available, follow the next section to deploy a publicly available instance of MQTT.
Deploying a MQTT instance
-
Sign up for a free trial of Hive MQTT Broker here.
-
After successful registration, a free cluster should be automatically created in your cluster overview. Note down the broker’s URL and port. we’ll use them later, as part of the
connect.mqtt.hosts
connector-config. -
Click "Manage Cluster" and then click the "Access Management" button in the top-right of the screen. Create a user and note down its credentials: we’ll use them later to connect to the cluster, as the
connect.mqtt.username
andconnect.mqtt.password
connector-config. -
We’ll use the HiveMQ WebSocket Client to create sample data for our MQTT cluster. Fill the following fields:
-
Host: The cluster URL you saw in the cluster overview page (which you noted down for the
connect.mqtt.hosts
config) -
Port:
8884
(this is a special port opened for this browser-client) -
ClientID: can use the default provided value
-
Username: value you just created
-
Password: value you just created
-
SSL: tick the box
-
-
Click "Connect".
-
Expand the "Subscriptions" section. Click "Add New Topic Subscription". In the pop-up window, edit only the "Topic" field:
my_mqtt_topic
, then click "Subscribe". -
Expand the "Publish" section. Use
my_mqtt_topic
as the topic name (without a trailing '/1') and write some text in the message box, then click "Publish". You should see the message appear in the "Messages" section below. -
Keep this page open. We’ll return later to produce more events, after configuring the Connector in the self-service portal.
Configuring a new source connector
Follow the 2022.1@axual::self-service/stream-management.html.adoc#creating-streams documentation in order to create one stream and deploy it onto an environment. The name of the stream will also be my_mqtt_topic
. The key/value types will be String
.
Follow the Configuring Connector-Applications documentation to set up a new connector application. Let’s call it my_mqtt_app
. The plugin name is "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector".The values you will need to supply as configuration will be listed in this section.
For advanced configuration, see the official connector documentation.
Provide the following minimal configuration in order to connect to the previously configured MQTT instance.
Format:
|
Example value:
|
|
INSERT INTO my_mqtt_topic SELECT * FROM my_mqtt_topic |
|
|
|
Name of user you created |
|
Password of user you created |
You can now start the source and sink Connector-Application.
The connector we just created will not read events produced in the past. Return to the HiveMQ WebSocket Client tab and produce some more events.
You can now check the stream to see the events published by the connector. The values will be base64 encoded.
Cleanup
Once you are done, stop the connector application and cleanup the unused axual resources.
Don’t forget to return to the cluster overview → Manage Clusters → and delete your cluster if you’re not using it anymore.
License
MQTT source and sink-connector is licensed under the Apache License, Version 2.0.
Source code
The source code for the connector can be found on github.com/lensesio/stream-reactor