MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe network protocol designed for machine-to-machine (M2M) and Internet of Things (IoT) communication. The protocol is ideal for connecting devices with limited resources or on unreliable networks due to its minimal data overhead and efficient distribution of messages.
MQTT was originally developed by IBM in the late 1990s and has since become an open standard widely adopted across various industries. The protocol operates on top of TCP/IP and is designed to be straightforward, efficient, and easy to implement. MQTT has gone through several iterations, with the most notable versions being MQTT 3.1.1 and MQTT 5.0. The MQTT standard is maintained by OASIS (Organization for the Advancement of Structured Information Standards).
Common use cases for MQTT include:
- Remote monitoring and control of devices
- Sensor data collection and telemetry
- Home automation and smart home applications
- Industrial automation and control systems
This blog post explores the key features of MQTT and demonstrates its usage in Go and Java applications.
Terminology ¶
MQTT employs a publish/subscribe model, which decouples the message sender (publisher) from the message receiver (subscriber) with the help of a broker. Key concepts and terminology associated with MQTT include:
Broker: The central server or hub in the MQTT network. The broker receives all messages from publishers and routes them to the appropriate subscribers, acting as a post office for messages.
Client: Any device that connects to the broker. Publishers and subscribers are both considered clients, while the broker functions as a server.
Publisher: A client that sends messages to the broker. For example, a temperature sensor publishing temperature readings.
Subscriber: A client that receives messages from the broker. For example, a mobile app subscribing to temperature readings for display.
Topic: A string (e.g., home/livingroom/temperature) that acts as a label for messages. Publishers send messages to a specific topic, and subscribers subscribe to topics of interest to receive those messages.
Message: The actual data being sent, such as a temperature reading or a command.
An MQTT system can consist of multiple publishers and subscribers, each able to publish and subscribe to multiple topics.
When multiple subscribers subscribe to the same topic, all receive the same messages published to that topic. This differs from traditional message queue systems, where messages are typically consumed by only one subscriber.
In MQTT, the publisher and subscriber do not need to know about each other, nor do they need to be running or online at the same time.
Architecture ¶
The process is straightforward: a publisher sends messages on a specific topic to the broker. The broker then forwards these messages to all subscribers who have registered their interest in that topic. The publisher doesn't need to know who or where the subscribers are, and vice versa.
In this example, the mobile app receives humidity updates while the thermostat control system receives temperature updates because they have subscribed to the relevant topics.
Payload ¶
Over MQTT, any kind of data can be sent as a message payload. The payload is a binary blob, so text, JSON, images, or any other data format can be sent. The broker does not interpret the payload; it simply forwards it to subscribers. The MQTT protocol imposes a maximum payload size of 268,435,455 bytes (256 MB). This is the maximum size of the entire MQTT packet, including the header and payload. A broker may impose a lower limit on the maximum message size, so it is important to check the documentation of the broker in use.
For the demo applications, Protocol Buffers (protobuf) will be used to encode the payload. MQTT itself uses a binary format for sending the message, so it makes sense to use a payload format that is also compact. This is especially important when sending MQTT messages over a network with limited bandwidth.
The demo applications will use this protobuf schema for the message payload.
message SensorReading {
float value = 1;
}
Topic ¶
Topics in MQTT are used to route messages between publishers and subscribers. They provide a way to organize and categorize messages, allowing subscribers to filter the messages they receive based on their interests.
Topics are created on the fly in the broker when a publisher sends a message to a new topic, or a subscriber subscribes to a new topic. There is no need to predefine topics in the broker. This allows for a flexible and dynamic messaging system where topics can be created and used as needed.
An MQTT topic is a string that can contain multiple levels separated by slashes (/), similar to a file path. For example, a topic could be home/living_room/temperature
or sensors/device_id_1234/data/temperature
.
According to the MQTT specification, a topic can be up to 65,535 bytes in length, but a broker can, and most often does, implement a lower limit. For example, AWS IoT Core restricts topics to a maximum length of 256 bytes and seven slashes. EMQX follows the specification and allows topics up to 65kB in length. But in practice, you should not use excessively long topics. Also, be aware that the topic is part of the message header and increases the overall message size.
Best practices for MQTT topic naming include:
- Use a hierarchical structure to organize topics logically.
- Use descriptive and concise names that indicate the message's content and source.
- Avoid using spaces and non-ASCII characters in topics.
- Topics are case-sensitive, so
sensor/temp
andSensor/Temp
are different topics. - Avoid leading or trailing slashes in topics.
- Limit the number of topic levels to avoid overly complex structures.
- Avoid using topics that start with
$
, as these are reserved for broker-specific information. - Use
_
or-
to connect words (or use camelCase) within a topic level, such ashome/living_room/temperature
orhome/livingRoom/temperature
. - Consider modeling your message schema instead of relying heavily on wildcard topics for filtering.
Identifying the Publisher ¶
In MQTT, subscribers often need to know which publisher sent a message. This is important for understanding the context of the message. There are two common patterns for identifying the publisher of a message in MQTT.
Using the Topic ¶
The topic can be structured to include the unique identifier of the publisher as part of the topic itself. This way, the topic itself provides context about the source of the message.
Example: Instead of publishing to a generic topic like temperature/data, you would use a topic like:
-
sensors/living_room/temperature
-
sensors/bedroom/temperature
-
home/ground_floor/living_room/temperature
-
home/ground_floor/bedroom/temperature
-
device/id_1234/data/temperature
-
device/id_5678/data/temperature
A subscriber does not need to know the identifier of the publisher because MQTT supports wildcard subscriptions. This allows a subscriber to subscribe to multiple topics without needing to know the exact topics in advance.
For example, a subscriber could subscribe to the topic sensors/#
to receive messages from all sensors in the home. Or it could subscribe to sensors/+/temperature
to receive temperature data from all sensors, regardless of their specific location or ID.
MQTT supports two types of wildcards for subscriptions:
-
Single-level wildcard (
+
): Matches exactly one topic level. For example, subscribing tosensors/+/temperature
would matchsensors/living_room/temperature
andsensors/bedroom/temperature
, but notsensors/living_room/humidity
. -
Multi-level wildcard (
#
): Matches zero or more topic levels. For example, subscribing tosensors/#
would match all topics that start withsensors/
, includingsensors/living_room/temperature
,sensors/bedroom/humidity
, and evensensors/living_room/humidity/data
. The multi-level wildcard must be the last character in the topic, and it must be preceded by a slash (/
).
Including the Identifier in the Message Payload ¶
Another way to identify the publisher is to include the unique identifier or name of the publisher in the message payload itself.
Example:
Topic: home/temperature
Payload (JSON format): {"sensor_id": "living_room_sensor", "value": 22.5}
With this approach, the topic structure can be kept simple and generic, while still providing the necessary context in the payload. The downside is that it is no longer possible to use topic filters to subscribe to messages from a specific publisher. Instead, the subscriber needs to parse the payload to extract the publisher's identifier.
In terms of message size, there is not a huge difference between these two approaches. The topic is part of the MQTT message header. With the first approach, the topic is longer, but the payload is smaller, and vice versa with the second approach.
Broker ¶
There are many MQTT brokers available, both open-source and commercial. There are brokers that can be run on-premise and brokers that are available as a service in the cloud. The MQTT organization maintains a list of brokers on its website, which can be found here.
For this article, the EMQX broker will be used. The broker can be started in a Docker container.
docker run --name emqx-enterprise -p 1883:1883 -p 18083:18083 emqx/emqx-enterprise:5.10.0
This opens TCP port 1883 and port 18083 for the web management console. EMQX can also listen on ports for SSL (8883), WebSockets (8083), and secure WebSockets (8084), which are not used in this article, so they are not exposed.
EMQX ships with a free Community License that allows unlimited connections on a single-node deployment in an internal environment. A commercial license is needed for clustered deployments, high-availability setups, or commercial use in SaaS, hosted services, or embedded/resold products.
The web console can be accessed at http://localhost:18083/
with the default username and password admin
/public
. The web console allows management of the broker, viewing connected clients, and monitoring message traffic.
Client Libraries ¶
There are many client libraries available for MQTT for various programming languages and platforms. A list of libraries can be found on the MQTT website.
For this article, the Java and Go client libraries from the Eclipse Paho project will be used. The Eclipse Paho project provides open-source client implementations for various programming languages.
For Java, the following dependency is added to the pom.xml
:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
For Go, the following dependency is added to the go.mod
:
go get github.com/eclipse/paho.mqtt.golang
Both client libraries support MQTT 3.1.1. There are also MQTT 5.0 client libraries available, but the new features of MQTT 5.0 are not used in the following code examples. This article focuses on the basic features available in both MQTT 3.1.1 and MQTT 5.0.
In the following examples, the Go program will play the role of a publisher, and the Java program will be the subscriber. The client libraries, in most cases, provide both publisher and subscriber functionality, so you can use the same library for both roles.
Quality of Service (QoS) ¶
MQTT provides three levels of Quality of Service (QoS) to ensure message delivery reliability:
- QoS 0: At most once (fire and forget). No guarantee of delivery. Messages may be lost.
- QoS 1: At least once (guaranteed delivery). Messages may be delivered multiple times.
- QoS 2: Exactly once (guaranteed delivery with deduplication)
The QoS level can be specified when publishing messages and subscribing to topics. This means that there can be different levels between the publisher and broker and between the broker and subscriber. For example, you can publish messages with QoS 0 and subscribe to the topic with QoS 1.
QoS 0: This is the fastest and most efficient level, but it is also the most unreliable. Choose this level if it is acceptable for message loss to occur. This can be suitable for non-critical data or when the data is sent frequently, so that any lost messages can be quickly replaced by new ones.
QoS 1: This level guarantees that a message is delivered at least once, but it may be delivered multiple times. The subscriber must be able to handle duplicate messages. QoS 1 strikes a good balance between level 0 and level 2, providing a good compromise between reliability and performance.
QoS 2: Choose this level if it is important for an application to receive every message exactly once. This is the most reliable level, but it also has the highest overhead and latency. It is suitable for applications where duplicate messages can cause issues.
If you are interested in how QoS works in detail, check out this article from HiveMQ.
Publish Messages ¶
To publish a message in MQTT, a client sends data to a specific topic on the broker. The broker then distributes this message to all subscribers of that topic. To publish a message, the client needs to specify the topic, the payload, and the desired QoS level.
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("sensor-1")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to connect to broker: %v", token.Error())
}
defer client.Disconnect(250)
reading := &mqttdemo.SensorReading{
Value: 22.5,
}
payload, err := proto.Marshal(reading)
if err != nil {
log.Fatalf("Failed to marshal protobuf: %v", err)
}
// 0 is the QoS level, false means no retained message
token := client.Publish("sensors/living_room/temperature", 0, false, payload)
token.Wait()
if token.Error() != nil {
log.Fatalf("Failed to publish message: %v", token.Error())
}
The client ID is a unique identifier for the client, which is used by the broker to manage connections and sessions. The client ID must be unique across all clients connected to the broker. If a client connects with a client ID already in use, the broker will disconnect the existing client.
The MQTT specification allows a client to send an empty string as a client ID. In this case, the broker will generate a unique ID for the connection. This is only suitable for clients that do not need a persistent session. These are typically clients that only publish messages or subscribe to receive messages for the duration of their connection. The broker-assigned ID will be different every time the client connects, so no session state is maintained.
The MQTT 3.1.1 specification limits the client ID to 23 UTF-8 encoded bytes and only allows alphanumeric characters. However, most modern brokers, like Mosquitto, HiveMQ, and EMQX, have relaxed these restrictions to allow longer client IDs and a wider range of characters.
Consume Messages ¶
Consuming messages in MQTT involves subscribing to one or more topics on the broker. When a message is published to a subscribed topic, the broker forwards it to the subscriber. This allows clients to receive real-time updates without needing to poll for new data. If multiple subscribers subscribed to the same topic, they will all receive the same message.
public static void main(String[] args) throws MqttException, InterruptedException {
String clientId = "subscriber-1";
try (MqttClient client = new MqttClient("tcp://127.0.0.1:1883", clientId)) {
run(client);
}
}
private static void run(MqttClient client) throws MqttException, InterruptedException {
client.connect();
client.subscribe("sensors/living_room/temperature", (topic, message) -> {
try {
Sensor.SensorReading reading = Sensor.SensorReading
.parseFrom(message.getPayload());
System.out.printf("Received temperature reading: %.1f°C from topic: %s%n",
reading.getValue(), topic);
}
catch (Exception e) {
System.err.printf("Failed to parse message: %s%n", e.getMessage());
}
});
TimeUnit.SECONDS.sleep(30);
client.disconnect();
}
Authentication and Security ¶
The previous examples did not include any authentication or security measures and sent data unencrypted over the network. This is fine for a local test environment, but in a production environment, the MQTT broker and clients should always be secured to prevent unauthorized access and ensure data integrity.
To secure an MQTT broker, username and password authentication and TLS/SSL encryption can be used. Most MQTT brokers support these features, including EMQX. For setting up SSL in EMQX, follow these instructions.
For authentication, the most basic method is to use a username and password. The MQTT protocol provides fields in the connection message for the username and password. For security, it is highly recommended to use this in conjunction with TLS/SSL to encrypt the entire communication channel because the username and password are sent in plain text over the network.
Here are examples of how to set the username and password in the Go and Java client libraries.
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("sensor-1")
opts.SetUsername("exampleUser")
opts.SetPassword("password123")
client := mqtt.NewClient(opts)
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("exampleUser");
options.setPassword("password123".toCharArray());
client.connect(options);
A more robust and secure method is to use X.509 client certificates in addition to the server-side X.509 certificates. This method provides mutual authentication, where both the client and the broker verify each other's identities using certificates issued by a trusted Certificate Authority (CA).
EMQX supports many more authentication methods, such as JWT and MQTT 5 Enhanced Authentication methods like SCRAM and GSSAPI with Kerberos. You can find more information about the available authentication methods in the EMQX documentation.
Retained Messages ¶
In MQTT, the publisher of a message cannot guarantee that any subscriber will receive the message. It is possible that a message will never be seen by subscribers. This can be demonstrated with the previous Go publisher and Java subscriber examples.
Run the publisher first, then start the subscriber. The subscriber does not receive any messages because it was not subscribed to the topic when the publisher sent the message. Run them in the opposite order, first the subscriber and then the publisher. The subscriber receives the message.
Not knowing the current state of the topic could be a problem for the subscriber. It could take several minutes or even hours until the publisher sends a new message. Imagine a temperature sensor that sends readings every hour. If the subscriber starts subscribing shortly after the publisher published the first reading, the subscriber has to wait for almost an hour until it receives the next reading. Until then, the subscriber has no idea what the current temperature is.
This is where retained messages can help. When a publisher sends a message, the "retain" flag can be set to true. This tells the broker to store the last message sent to that topic as a retained message. When a new subscriber subscribes to that topic, it will immediately receive the last retained message, even if it was sent before the subscriber started.
In Go, this can be done by setting the retain flag to true (3rd argument) when publishing a message.
token := client.Publish("sensors/bedroom/temperature", 1, true, payload)
token.Wait()
if token.Error() != nil {
log.Fatalf("Failed to publish message: %v", token.Error())
}
After that, start the subscriber, and it will receive the last retained message immediately after subscribing.
The header of the message contains a flag that indicates whether the message was retained. The Java application can check this flag with isRetained()
. This can be an important piece of information for the subscriber because it tells the subscriber that this might be an outdated message and should be handled differently than a new message.
Only the first message received after subscribing may be a retained message; all later messages will be new messages, even when the publisher sends these messages with the retain flag set to true.
client.subscribe("sensors/bedroom/temperature", (topic, message) -> {
try {
Sensor.SensorReading reading = Sensor.SensorReading
.parseFrom(message.getPayload());
String timestamp = LocalTime.now().toString();
if (message.isRetained()) {
System.out.printf("[%s] Received RETAINED temperature: %.1f°C from topic: %s%n",
timestamp, reading.getValue(), topic);
}
else {
System.out.printf("[%s] Received NEW temperature: %.1f°C from topic: %s%n",
timestamp, reading.getValue(), topic);
}
}
catch (Exception e) {
System.err.printf("Failed to parse message: %s%n", e.getMessage());
}
});
RetainedMessageSubscriber.java
Note that this also works if the subscriber subscribes with a wildcard topic, such as sensors/#
. The broker will send all retained messages for the topics that match the wildcard subscription.
Brokers only store one retained message per topic. If a new message is published with the retain flag set to true, it replaces the previous retained message for that topic. If a publisher sends a message with the retain flag set to false, it does not remove a retained message. It will simply be treated as a regular message and will not affect the retained message on the broker.
If a retained message should be removed from the broker, an empty message can be published with the retain flag set to true. This tells the broker to delete the retained message for that topic.
token = client.Publish("sensors/bedroom/temperature", 1, true, []byte{})
token.Wait()
if token.Error() != nil {
log.Fatalf("Failed to clear retained message: %v", token.Error())
}
Persistent Sessions ¶
This feature is used for the subscriber side of MQTT. To receive messages from a broker, a subscriber establishes a connection and subscribes to topics. With a non-persistent session, if the connection between the subscriber and the broker is interrupted, the subscriber loses its subscriptions and does not receive any messages sent while it was offline (except any retained messages).
This is where a persistent session can help. When a subscriber connects to the broker, it can request a persistent session. This means that the broker will retain the subscriber's subscriptions and any undelivered messages for that subscriber, even if it disconnects.
When the subscriber reconnects, the broker resumes the session, delivering any queued messages not sent while the subscriber was offline. The broker also automatically re-subscribes the subscriber to all topics to which it was subscribed before the disconnection. This saves the subscriber from having to send subscribe requests again after reconnecting.
To start a persistent session, the subscriber needs to set the "clean session" flag to false when connecting to the broker. It is important that the subscriber sends a unique client ID; it cannot be empty.
If the connection is interrupted, the broker will store all messages that are sent to the subscribed topics. Note that brokers only store messages that are sent with a QoS level of 1 or 2.
When the subscriber reconnects with the same client ID and the "clean session" flag is set to false, the broker will resume the session, and all undelivered messages will be sent to the subscriber.
Note that when the subscriber opens a connection with a different client ID or sets the "clean session" flag to true, the broker will treat it as a new session.
If the "clean session" flag is not specified, the broker will use the default value, which is true.
The subscriber can check the "session present" flag in the connection response. This flag is true if the client has connected to an existing session. This flag can be used to determine if the subscriber needs to re-subscribe to topics. This example subscribes to a topic only if the subscriber connects with a new session, i.e., the "session present" flag is false.
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false); // Enable persistent session
options.setKeepAliveInterval(60); // seconds
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.err.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic,
org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception {
processMessage(topic, message);
}
@Override
public void deliveryComplete(
org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) {
// Not used in subscriber, but required to implement MqttCallback
}
});
var response = client.connectWithResult(options);
if (!response.getSessionPresent()) {
System.out.println("New session - subscribing to topic");
// Only subscribe if this is a new session
client.subscribe("sensors/outdoor/temperature", 1, messageListener());
}
else {
System.out.println("Resuming existing session - subscription should be restored");
}
PersistentSessionSubscriber.java
The persistent session feature is useful for applications where subscribers may be offline for extended periods, such as mobile devices or IoT devices that may lose connectivity. Persistent sessions should be used only when it is necessary to ensure that subscribers receive all messages missed while offline. Persistent sessions require the broker to store session information and undelivered messages, which can increase resource usage on the broker.
Clean sessions are suitable for clients that only publish messages or for scenarios where subscribers are interested only in the latest state of a topic, such as the current temperature. In such cases, retained messages can be used instead of persistent sessions. Clean sessions eliminate the overhead of storing session information and undelivered messages on the broker.
Similarities exist between retained messages and persistent sessions, but they serve different purposes:
- Retained messages are tied to a topic, while persistent sessions are tied to a specific client.
- Retained messages provide the last value to any new subscriber, while persistent sessions ensure a specific client receives all messages missed while offline.
- A broker stores only one retained message per topic, while a persistent session can store multiple queued messages for a specific client.
Keepalive ¶
An MQTT client opens a TCP/IP connection to the broker and attempts to keep this connection open as long as possible. This allows subscribers to receive real-time updates whenever new messages arrive. If no data is sent over a TCP/IP connection for a certain period, the connection may be closed by a network device. To prevent this, MQTT clients periodically send a ping message to the broker to keep the connection alive.
This mechanism also helps the broker detect if a client has disconnected unexpectedly and clean up resources associated with that client. If the broker does not receive any messages (including ping messages) from a client within a specified keepalive interval, it assumes that the client has disconnected and will close the connection. Typically, the broker waits 1.5 times the keepalive interval before closing the connection.
When a client connects to the broker, the keepalive interval can be specified.
opts.SetKeepAlive(60 * time.Second)
options.setKeepAliveInterval(60); // seconds
PersistentSessionSubscriber.java
The interval between ping messages is determined by the keepalive interval. The MQTT specification states that the client must send a ping message at least once within the keepalive interval. If other messages are sent during that period, a ping message is not required.
A shorter keepalive interval allows the broker to detect disconnections more quickly but increases network traffic. A longer keepalive interval reduces network traffic but may delay the detection of disconnections.
Persistent TCP/IP connections and the keepalive mechanism are important for the feature discussed next: Last Will and Testament (LWT).
Last Will and Testament (LWT) ¶
When a client unexpectedly disconnects from the broker, only the broker is aware that the client is no longer connected. If that client was a publisher, subscribers are not informed that the publisher is no longer available. To address this, MQTT provides a feature called Last Will and Testament (LWT).
When a client connects to the broker, a "last will" message can be specified. The broker will publish this message to a designated topic if the client disconnects unexpectedly, informing subscribers about the disconnection of a publisher.
The following example shows a common usage pattern for LWT. When the client connects to the broker, a last will message is set with the topic sensors/1/status
and the payload offline
. The retain flag is also set to true, so that the last will message will be published as a retained message whenever an unexpected disconnection occurs.
Immediately after connecting, the client publishes a normal message to the same topic sensors/1/status
with the payload online
, also with the retain flag set to true.
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("sensor-1")
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
// Last Will and Testament (LWT) setup. qos 1, retain true
opts.SetWill("sensors/1/status", "offline", 1, true)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to connect to broker: %v", token.Error())
}
defer client.Disconnect(250)
// qos 1, retain true
token := client.Publish("sensors/1/status", 1, true, "online")
token.Wait()
if token.Error() != nil {
log.Fatalf("Failed to publish online status: %v", token.Error())
}
If the client disconnects unexpectedly, the broker will publish the last will message with the payload offline
to the topic sensors/1/status
and retain the message. When the publisher reconnects, it publishes the online
message to the same topic, overwriting the previous retained message.
Any new subscribers interested in the status of the client can subscribe to the sensors/1/status
topic and receive either the online
or offline
message, depending on whether the client is currently connected.
An example of a monitoring application that subscribes to the status topic and prints whether a device is online or offline could look like this:
private static void run(MqttClient client) throws MqttException, InterruptedException {
client.connect();
IMqttMessageListener statusHandler = (topic, message) -> {
String deviceId = extractDeviceIdFromTopic(topic);
String status = new String(message.getPayload());
String timestamp = LocalTime.now().toString();
System.out.printf("[%s] Device %s is now %s (retained: %s)%n", timestamp, deviceId,
status, message.isRetained());
};
IMqttMessageListener dataHandler = (topic, message) -> {
String deviceId = extractDeviceIdFromTopic(topic);
String timestamp = LocalTime.now().toString();
System.out.printf("[%s] Received data from %s (size: %d bytes)%n", timestamp,
deviceId, message.getPayload().length);
};
String statusTopic = "sensors/+/status";
String dataTopic = "sensors/+/temperature";
client.subscribe(statusTopic, statusHandler);
client.subscribe(dataTopic, dataHandler);
System.out.printf("Monitoring device status on: %s%n", statusTopic);
System.out.printf("Monitoring device data on: %s%n", dataTopic);
System.out.println();
TimeUnit.MINUTES.sleep(5);
client.disconnect();
}
private static String extractDeviceIdFromTopic(String topic) {
String[] parts = topic.split("/");
if (parts.length >= 2) {
return parts[1]; // Extract device ID from sensors/{device}/...
}
return "unknown";
}
The LWT feature functions only if the client uses persistent TCP/IP connections. Publishers started periodically with cron or systemd timers cannot use LWT because as long as the publisher does not crash, it will disconnect gracefully, and the broker will never send the last will message.
For such use cases, alternative strategies must be implemented to notify subscribers about the state of the publisher.
One possible strategy to detect whether a publisher is running or has crashed is as follows:
At the beginning of the program, the publisher sends a "started" message, and at the end, a "finished" message. This allows subscribers to determine if the publisher is currently running. A monitoring program can then detect if the publisher has not sent a "finished" message within a certain time frame and assume that the publisher has crashed. If there is no "started" message, the monitoring program can assume that the publisher is not running.
Conclusion ¶
The article provided an overview of the MQTT protocol, its architecture, and key features. It explained how MQTT worked, the role of brokers, and how to publish and consume messages with Go and Java. Important concepts such as Quality of Service (QoS), retained messages, persistent sessions, and Last Will and Testament (LWT) were also discussed.
For further exploration of MQTT, I recommend the MQTT Essentials series by HiveMQ, which covers the protocol in detail. HiveMQ also offers a MQTT 5 Essentials article series that explores the new features introduced in MQTT 5.0. Additionally, the MQTT Security Fundamentals articles cover everything required to secure MQTT applications.