MQTT 5 Client

For a comprehensive guide on utilizing the MQTT client, as well as access to hands-on examples, refer to the tutorial:

Your First MQTT Lua Program

The MQTT 5 client fully supports QoS 0, 1, and 2 of MQTT message delivery and is designed for continuous operation even if the connection temporarily goes down. Messages are queued internally and are sent as soon as the connection recovers. Although connection recovery is automatic, your onstatus() callback must accept each new connection request.

The MQTT 5 client does not support the older protocol MQTT 3.1.1. Use the MQTT 3.1.1 Client if you require the older protocol. The code is backward compatible with older clients designed for the original MQTT stack. Code requiring the original stack will automatically be re-routed to the MQTT 3.1.1 Client API.

The MQTT client is implemented in Lua and uses the Barracuda App Server socket API. An MQTT client runs as a cosocket, so an understanding of how cosockets work is recommended. Unlike a standard cosocket, an MQTT client instance has an unlimited queue size, so take care when publishing messages from a non-cosocket. A cosocket publishing messages will automatically suspend if the TCP queue is full.

The following example shows how to connect to an MQTT server running at localhost:

Example 1: Creating an MQTT instance with the minimum set of required functions.

The above example shows the required parameters when creating and starting an MQTT client instance (line 10). Function create() in the MQTT module takes additional optional arguments.

The onstatus() is called when a connection status changes or a connection request fails. The above onstatus() example may be all that you need for your particular design if you always connect to the same broker and you expect the connection to always be available. The above onstatus() tells the MQTT client to keep trying to connect/reconnect to the MQTT server indefinitely.

In the following example, we have an onstatus() that only accepts a new and successful connection. Anything else will be denied. Returning false or nothing terminates the MQTT client’s internal coroutine.

local done=false

local function onstatus(type,code,status)
   if "mqtt" == type and "connect" == code and 0 == status.reasoncode then
      print"Successful new connection"
      print("Server properties:", ba.json.encode(status.properties))
      return true -- Accept connection
   end
   print("Disconnect or connect failed",type,code)
   return false -- Deny reconnect
end

local function onpublish(topic,payload,prop)
   print("Received:",topic,payload,ba.json.encode(prop))
   done=true
end

local mqtt=require("mqttc").create("broker.emqx.io",onstatus,onpublish)
mqtt:subscribe"/my-topic"
mqtt:publish("/my-topic","my payload",{retain=true},{myprop="myval"})

local cnt=1
while not done do
   if cnt > 100 then break end -- Giving up
   ba.sleep(50)
   cnt = cnt+1
end

mqtt:disconnect(0)

Example 2: An MQTT example designed for our online tutorial server.

Clicking the above Run button sends the above example to our online tutorial server, where you can run it. The online tutorial server is designed to run LSP pages, and an LSP page runs in an ephemeral request/response environment. This ephemeral environment, which includes function print(), is incompatible with asynchronous cosockets. Recall that the MQTT stack runs as a cosocket. A simple solution to the incompatibility is the code between lines 22 and 27: the LSP page waits for the asynchronous MQTT messaging to complete before continuing. This enables the asynchronous MQTT callbacks to use function print(), which sends the message to the browser.

An MQTT instance is typically not created in an LSP page, but doing so can be useful during development. See Example 3 for more information on developing MQTT code.

The callback function onstatus() receives MQTT connect/disconnect state information. We have a connect request if type=mqtt and code=connect. Line 4 also checks if the server’s response code (reasoncode) is zero (success).
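The connect check described above can be combined with a retry limit, as a middle ground between always reconnecting and never reconnecting. The following is a minimal sketch, not part of the mqttc module: makeOnStatus and maxFailures are hypothetical names, and the sketch assumes that returning true after a failed or dropped connection makes the client try again, as described for onstatus() earlier.

```lua
-- Hypothetical helper (not part of the mqttc module): builds an onstatus
-- callback that tolerates at most maxFailures consecutive failures.
local function makeOnStatus(maxFailures)
   local failures = 0
   return function(type, code, status)
      if "mqtt" == type and "connect" == code and 0 == status.reasoncode then
         failures = 0 -- Successful connection: reset the counter
         return true  -- Accept connection
      end
      failures = failures + 1
      -- true: keep trying to connect, false: terminate the client
      return failures <= maxFailures
   end
end

-- Possible use with the client (requires the Barracuda App Server):
-- local mqtt=require("mqttc").create("broker.emqx.io",makeOnStatus(5),onpublish)
```

The counter is reset on every successful connection, so only an unbroken run of failures terminates the client.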

The callback function onpublish() is called when the broker sends any PUBLISH message to this client and when a per subscribe callback has not been installed for the topic.

Lua includes a garbage collector, and all object instances created in an LSP page’s ephemeral environment would normally be garbage collected eventually. However, a connected cosocket is self-referencing as long as it has a valid (connected) socket and will not be garbage collected before it disconnects. We specifically prevent the MQTT stack from running in the background by calling mqtt:disconnect() on line 29.

MQTT 5 includes properties with name/value pairs. MQTT properties are converted to/from Lua tables. Lines 6 and 14 above simply encode the properties table as JSON to make it easy to visualize the server properties.

Line 19 subscribes to a topic and line 20 publishes to a topic before the MQTT server connection has been established (before onstatus has been called). Messages are queued internally and automatically sent once the connection has been established.

TLS Connection

Example 2 connects to the broker using a non-secure connection. To use a secure TLS connection, change line 18 to the following:

local mqtt=require("mqttc").create(
   "broker.emqx.io",onstatus,onpublish,{secure=true})

MQTT 5 Properties

The MQTT 5 specification includes properties. We use the naming from the specification, but the words are concatenated and all lowercase. Example: specification 'Payload Format Indicator', code payloadformatindicator. The names from the specification are reserved. An MQTT user property will be encoded if you use a non-reserved word e.g. hello="world".
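The naming rule above can be illustrated with a small helper. This is a sketch only: propkey is a hypothetical function, not part of the MQTT client, and the value 1 for the Payload Format Indicator (UTF-8 payload) comes from the MQTT 5 specification.

```lua
-- Hypothetical helper: converts a property name from the MQTT 5
-- specification into the key form used in property tables by
-- removing whitespace and converting to lowercase.
local function propkey(specname)
   return (specname:gsub("%s+", ""):lower())
end

-- A properties table mixing a reserved name and a user property
local props = {
   [propkey("Payload Format Indicator")] = 1, -- Reserved name from the spec
   hello = "world"                            -- Non-reserved: user property
}

print(propkey("Payload Format Indicator")) -- payloadformatindicator
```

In practice the keys are normally written directly, for example {payloadformatindicator=1, hello="world"}; the helper only makes the convention explicit.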

API for creating an MQTT Client

create(addr, onstatus [, onpub] [, options [, properties]])

Creates and starts an MQTT client instance. The function returns the mqtt instance. See the MQTT Object Methods for details on the returned object.

MQTT Object Methods

function mqtt:publish(topic,payload [,options [,properties]])

Publish a message. Messages are queued up internally if the MQTT stack is not connected. These messages will be sent once the MQTT client connects to a broker. When the MQTT client is connected and the internal queue is empty, a cosocket rapidly publishing messages will automatically suspend if the TCP queue is full. The automatic suspension only works for cosockets when the internal MQTT queue is empty. In all other cases, including non-cosocket senders, a sender publishing faster than the network can consume makes the internal queue grow. Method mqtt:status() returns the number of elements in the internal queue. See Cosockets flow control for details on cosocket flow control. See Example 3 for how to publish messages using a cosocket. Method mqtt:publish() returns the connection status.
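When publishing from a non-cosocket, one way to keep the internal queue from growing without bound is to check the queue length first. The following is a minimal sketch under stated assumptions: publishBounded and maxQueued are hypothetical names, and the only client calls used are mqtt:status(), whose first return value is the queue length, and mqtt:publish().

```lua
-- Hypothetical helper: hands the message to mqtt:publish() only when
-- fewer than maxQueued messages are waiting in the internal queue.
-- Returns true if the message was published (or queued), false if dropped.
local function publishBounded(mqtt, maxQueued, topic, payload, ...)
   local queuelen = mqtt:status() -- First return value: queue length
   if queuelen >= maxQueued then
      return false -- Queue is full enough: drop the message
   end
   mqtt:publish(topic, payload, ...) -- Forward optional options/properties
   return true
end

-- Possible use: publishBounded(mqtt, 50, "/my-topic", "my payload")
```

Dropping is only one possible policy; a caller could instead retry later or overwrite older data, depending on the application.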

mqtt:subscribe(topic [,onsuback] [,options [,properties]])

Subscribe to a topic.

mqtt:unsubscribe(topic [,onunsubscribe [,properties]])

Unsubscribe from a topic.

mqtt:disconnect([reason])

Send a disconnect message to the server and shut down the MQTT client. The onstatus callback is not called when you call mqtt:disconnect().

mqtt:status()

Returns the number of messages in the publish queue, the connection status, and the disconnect status. The connection status is true if the MQTT client is connected. The disconnect status is false unless mqtt:disconnect() has been called. Note: all mqtt methods return the connection status. Example:

local queuelen,connstatus,terminated=mqtt:status()

mqtt:setwill(will)

You may change/update the will message on reconnect.

MQTT Client Examples

Tutorials and Examples

Hot Code Replacement During Development (and Speed Test)

An MQTT client normally runs for the lifetime of the application and is typically created in a .preload script or in a Lua module loaded by the .preload script. However, the ability to hot swap the code speeds up the development process. Example 3 below is similar to Example 2 in that it is designed to be run as an LSP page. Hot swapping code during development is particularly useful in embedded devices when using Xedge. See the tutorial Rapid Firmware Development with the Barracuda App Server for an introduction to this technique.

In Example 2, we implemented a technique that redirected the MQTT client’s printed output to the browser. While this works, a better solution is to use the trace() function, which sends data directly to the console (trace log). Where console access is limited, the trace log can be viewed via the TraceLogger's web interface.

The following MQTT speed test is specifically set up to work as an xlua file in Xedge.

trace"Starting MQTT test"

local mqttBroker = "192.168.1.100" -- Change this
local fmt=string.format
local mqtt,startTime

function onunload()
   trace"Stopping MQTT test"
   mqtt:disconnect()
end
 
local function cosocketPub(sock)
   sock:read(1) -- Wait for onstatus to return
   startTime = ba.clock()
   for i=1,1000 do
      mqtt:publish("/my-topic",fmt("my payload %d",i))
   end
end
 
local function onstatus(type,code,status)
   if "mqtt" == type and "connect" == code and 0 == status.reasoncode then
      trace"New connection"
      ba.socket.event(cosocketPub) -- Run as a cosocket
      return true -- Accept connection
   end
   trace("Disconnect or connect failed",type,code)
   return false -- Deny reconnect
end
 
local recCnt=0 -- Number of messages received so far
local function onpublish(...)
   recCnt = recCnt + 1
   if recCnt == 1000 then
      trace("Send and receive time in milliseconds:", ba.clock() - startTime)
   end
end
  
mqtt=require("mqttc").create(mqttBroker, onstatus, onpublish)
mqtt:subscribe"/my-topic"

Example 3: MQTT QoS 0 speed test designed to run as an xlua file; sending and receiving 2000 messages.

Function cosocketPub() (line 12) publishes 1000 messages in one burst. We mentioned in the introduction that the MQTT stack has an unlimited message queue, which could make a small device run out of memory when queueing this many messages. We avoid this problem by running function cosocketPub() as a cosocket (see line 23). Function cosocketPub() runs as an unconnected cosocket; we only use the cosocket for rapidly publishing messages. A cosocket automatically yields when the TCP/IP queue is full, thereby avoiding queuing up messages. See Cosockets flow control for details.

Note that function ba.socket.event() is similar to coroutine.resume() in that it immediately resumes the coroutine, making the caller wait on line 23. We want to delay the 1000-message publishing burst until the onstatus() function has returned and the MQTT client has completed the broker connection establishment. We solve this by sleeping for one millisecond on line 13, where sock:read(1) waits with a one millisecond timeout. The 1000 messages would otherwise queue up internally in the MQTT stack before the broker connection establishment had completed. See method mqtt:publish() for details on publishing from a cosocket.

You may copy Example 3, paste the code into a new Xedge file, and run the file, but make sure to change the broker name (line 3). You should set up a local broker such as the Mosquitto broker. Do not use public online test brokers, as they may not permit sending this many messages in a burst. The example fails and gets stuck if it does not receive as many messages as it sends.

Example 3 sends and receives a total of 2000 messages. We tested the example on a low-end ESP32 microcontroller and it took around 4.2 seconds to send and receive 2000 messages. We could make the same example execute in just under three seconds by removing line 13. The likely reason for this speed increase is that the TCP transfer is more efficient when all messages are queued prior to sending. In any event, the speed increase shows that the bottleneck is not the execution time of Lua, but rather the WiFi transfer.

MQTT Sessions and Quality of Service

We use QoS 2 in the following example, which guarantees delivery of the message even if the TCP connection breaks during transmission. TCP by itself guarantees delivery only as long as the connection does not break. When QoS > 0, MQTT’s messaging logic recovers from a broken connection by retransmitting unacknowledged messages once the connection is re-established.

Note that you should have a good understanding of the MQTT protocol before using QoS > 0.

MQTT 5 servers and clients maintain the connection state as long as they have an active session. A Session Expiry Interval must be set if you plan on using QoS > 0. The connection will otherwise always be clean and no session will be resumed. In the following example we set the Session Expiry Interval (line 15) to the maximum value, which never expires. Note that the server may change the time, which is detected in the onstatus() callback.

Example 4: Sending and receiving messages with QoS 2.

The MQTT client initially connects to the MQTT server with the Clean Start flag set to one. If the TCP connection breaks and a new TCP connection is established, the Clean Start flag is set to zero when sending the CONNECT message. The server should then respond and inform the client that it has a session. You can test session recovery by running the above example. Initially, line 3 prints NO. You can forcefully close the TCP connection by using a tool such as CurrPorts. When the connection is re-established, line 3 prints YES.

Note that all information is lost, including subscriptions, if the MQTT server does not indicate it can resume with a session. A recommendation is to create a new MQTT client and re-subscribe to all required topics if the session cannot be resumed.
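The session setup described in this section can be sketched as follows. This sketch is independent of Example 4 and rests on several assumptions: the property key sessionexpiryinterval follows the naming convention from the MQTT 5 Properties section, the properties argument of create() is assumed to carry the CONNECT properties, the server's answer is assumed to appear in status.properties under the same key, and effectiveExpiry is a hypothetical helper. The value 0xFFFFFFFF is the maximum Session Expiry Interval in the MQTT 5 specification and means the session never expires.

```lua
local NEVER_EXPIRES = 0xFFFFFFFF -- Maximum Session Expiry Interval (MQTT 5)

-- CONNECT properties, using the lowercase concatenated naming convention
local connectProps = { sessionexpiryinterval = NEVER_EXPIRES }

-- Hypothetical helper: returns the interval in effect after the server's
-- response. If the server does not send its own value, the requested
-- value applies (MQTT 5 specification).
local function effectiveExpiry(requested, status)
   local props = status and status.properties
   local granted = props and props.sessionexpiryinterval
   return granted or requested
end

-- Possible use (requires the Barracuda App Server):
-- local mqtt=require("mqttc").create(
--    "broker.emqx.io",onstatus,onpublish,{},connectProps)
-- Inside onstatus(): effectiveExpiry(NEVER_EXPIRES, status)
```

Comparing the returned value with the requested one inside onstatus() is one way to detect that the server shortened the session lifetime.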

Using QoS > 0 substantially increases message transmission time. MQTT’s handling of QoS messages, especially for QoS 2, is very inefficient. For example, converting Example 3 from QoS 0 to QoS 2 will make the example 4 times slower.

WebSocket Example

The following example shows how to connect to an MQTT server using WebSockets, which can be useful if the client is within an intranet blocking all external access, except for the secure HTTPS port 443. This would require an MQTT server listening for secure WebSocket connections on port 443. The following example connects to the public test broker provided by emqx.io using a non-secure WebSocket connection.

The example also shows how to use a custom connection handler. Argument one provided to the connect function (line 31) is now a function and not a string. See the WebSockets Documentation for an introduction to creating a client WebSocket connection using the HTTP Client Library. See the Socket API for all socket functions used.

Note that some MQTT brokers, such as AWS IoT Core, offer the option to connect using HTTPS on port 443 with TLS ALPN. If you are interested in exploring this option, be sure to check out the tutorial on how to connect to AWS IoT Core using MQTT and ALPN. This can be a convenient way to establish a secure connection and may be particularly useful if your network has restrictions on certain ports.

Example 5: Using the HTTP client library for establishing a WebSocket connection.

The custom connection handler wsconnect() in the above example (line 12) runs in the context of the MQTT client’s cosocket, which is in an unconnected state. The HTTP client library, which is required for initiating a WebSocket connection, is designed for the blocking socket API and is not compatible with coroutines. We solve this problem by running the HTTP client request in a native thread (line 15). The next step is to disable (freeze) the cosocket on line 26 and wait for the native thread to complete. When the native thread starts, it initiates a WebSocket HTTP request on line 17. We resume the cosocket on line 24 when the HTTP request has completed. Line 27 then starts executing, and the custom connection handler returns the new WebSocket connection, or nil if the connection failed.

Connecting Via a SOCKS5 or HTTP Proxy

The following example is similar to the above WebSocket example in that it uses the HTTP client library for initiating the connection. The example shows how to design a custom connection handler called proxyconnect(). See the HTTP Client Library for details on the various options.

Example 6: Using the HTTP client library for connecting via a proxy.