MQTT PubSub Examples
Monitoring OPCUA server changes
It is possible to connect MQTT client to OPC UA server and subscribe to node changes. To do this you need to create MQTT client and connect it to a created OPC UA server.
local ua = require("opcua.api")
-- create server
local uaServer = ua.newServer()
uaServer:initialize()
local ObjectsFolder = "i=85"
local int32DataValue = {
Type = ua.VariantType.UInt32,
Value = 10
}
local request = {
NodesToAdd = {ua.newVariableParams(ObjectsFolder, "writeHook", int32DataValue)}
}
-- Add a node
local resp = uaServer:addNodes(request)
local results = resp.Results
assert(results[1].StatusCode, ua.StatusCode.Good)
local nodeId = results[1].AddedNodeId
-- Create MQTT client
local config = {
bufSize = 128 -- max size of MQTT message
}
local uaMmqtt = ua.newMqttClient(config, uaServer)
-- Array with fields parameters.
local fields = {
-- #1
{
nodeId = nodeId, -- ID of a node, which changes will be monitored
name = "MqttNode", -- Name of a field, will as a field in JSON
}
}
-- create dataset with fields
local classId = "5fa38ebb-44d2-a3ec-d251-1030c777f10a"
uaMmqtt:createDataset(fields, classId)
-- Connect to MQTT broker
local tranportProfileUri = ua.TranportProfileUri.MqttJson
local endpointUrl = "opc.mqtt://test.mosquitto.org:1883"
uaMmqtt:connect(endpointUrl, tranportProfileUri)
-- Start periodic publishing
local dataTopic = "rtl/json/data/urn:arykovanov-note:opcua:server/group/dataset"
uaMmqtt:startPublishing(dataTopic, "test_cyclic_publisher", 2000)
-- Run server.
uaServer:run()
-- Function which will periodically write data to address space
-- Those changes will be hooked by MQTT client for publishing data.
local writeRequest = {
NodesToWrite = {
{
NodeId = nodeId,
AttributeId = ua.AttributeId.Value,
Value = {
Type = ua.VariantType.UInt32,
Value=123
}
}
}
}
uaServer:write(writeRequest)
uaMmqtt:stopPublishing()
uaServer:shutdown()
Publishing data to MQTT broker
It is possible to publish data to MQTT broker without OPC UA server. To do this you need to create MQTT client, configure fields for messages, connect it to a MQTT broker and start publishing.
local ua = require("opcua.api")
local uaMqtt = ua.newMqttClient()
local fields = {
{ name = "Value1" },
{ name = "Value2" }
}
local tranportProfileUri = ua.TranportProfileUri.MqttBinary
local endpointUrl = "opc.mqtt://test.mosquitto.org:1883"
uaMqtt:connect(endpointUrl, tranportProfileUri)
local datasetId = uaMqtt:createDataset(fields)
local dataTopic = "rtl/uadp/data/urn:arykovanov-note:opcua:server/group/dataset"
local publisherId = "test_manual_publisher"
for i=64, 74 do
uaMqtt:setValue(datasetId, "Value1", {Type=ua.VariantType.UInt32, Value=i})
uaMqtt:setValue(datasetId, "Value2", {Type=ua.VariantType.UInt32, Value=i*2})
uaMqtt:publish(dataTopic, publisherId)
ba.sleep(1000)
end
Subscribing to MQTT messages
OPCUA PubSub messages can be received and decoded by OPCUA MQTT client. Decoding of a message is required to decode binary messages. JSON messages are decoded and transformed to same the format as decoded binary messages. Thus application can subscribe to either binary pr JSON messages and OPCUA MQTT client will return messages in the same format.
local ua = require("opcua.api")
-- Create MQTT client instance
local mqttClient = ua.newMqttClient()
-- Connect to MQTT broker
local function callbackCallback(status)
ua.Tools.printTable("status", status)
end
mqttClient:connect("opc.mqtt://test.mosquitto.org:1883", callbackCallback)
-- The only message callback for both JSON and binary data
local function messageCallback(payload, err)
if err then
print("Error:" .. tostring(err))
end
ua.Tools.printTable("payload", payload)
end
-- Subscribe on a topic with binary data
mqttClient:subscribe("rtl/uadp/data/urn:arykovanov-note:opcua:server/group/dataset", messageCallback)
-- Subscribe on a topic with JSON data
mqttClient:subscribe("rtl/json/data/urn:arykovanov-note:opcua:server/group/dataset", messageCallback)
-- Wait for some time
local cnt=1
while cnt <= 3 do
ba.sleep(1000)
cnt = cnt+1
end