MQTT PubSub Examples

Monitoring OPC UA server changes

An MQTT client can be connected to an OPC UA server so that changes to server nodes are published over MQTT. To do this, create an MQTT client and connect it to an OPC UA server that has already been created.

local ua = require("opcua.api")

-- create server
local uaServer = ua.newServer()
uaServer:initialize()

local ObjectsFolder = "i=85"
local uint32DataValue = {
  Type = ua.VariantType.UInt32,
  Value = 10
}
local request = {
  NodesToAdd = {ua.newVariableParams(ObjectsFolder, "writeHook", uint32DataValue)}
}

-- Add a node
local resp = uaServer:addNodes(request)
local results = resp.Results
-- Compare explicitly: assert(v, message) only tests v for truthiness
assert(results[1].StatusCode == ua.StatusCode.Good)
local nodeId = results[1].AddedNodeId

-- Create MQTT client
local config = {
  bufSize = 128 -- max size of MQTT message
}

local uaMmqtt = ua.newMqttClient(config, uaServer)

-- Array of field parameters.
local fields = {
  -- #1
  {
    nodeId = nodeId,   -- ID of the node whose changes will be monitored
    name = "MqttNode", -- Field name; it is used as the field name in the JSON message
  }
}

-- create dataset with fields
local classId = "5fa38ebb-44d2-a3ec-d251-1030c777f10a"
uaMmqtt:createDataset(fields, classId)

-- Connect to MQTT broker
local tranportProfileUri = ua.TranportProfileUri.MqttJson
local endpointUrl = "opc.mqtt://test.mosquitto.org:1883"
uaMmqtt:connect(endpointUrl, tranportProfileUri)

-- Start periodic publishing
local dataTopic = "rtl/json/data/urn:arykovanov-note:opcua:server/group/dataset"
uaMmqtt:startPublishing(dataTopic, "test_cyclic_publisher", 2000)

-- Run server.
uaServer:run()

-- Write a new value to the address space.
-- The change is hooked by the MQTT client and picked up for publishing.
local writeRequest = {
  NodesToWrite = {
    {
      NodeId = nodeId,
      AttributeId = ua.AttributeId.Value,
      Value = {
        Type = ua.VariantType.UInt32,
        Value = 123
      }
    }
  }
}

uaServer:write(writeRequest)

uaMmqtt:stopPublishing()
uaServer:shutdown()

Full source
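The data topic strings in these examples appear to follow a fixed order of levels: prefix, encoding ("json" or "uadp"), message type ("data"), a publisher URN, a group name, and a dataset name. The following is a minimal sketch of composing such a topic. `buildDataTopic` is a hypothetical helper written for illustration and is not part of the opcua.api module; the level order is inferred from the topic strings shown here.

```lua
-- Hypothetical helper, not part of the opcua.api module.
-- Joins topic levels in the order seen in the examples:
--   prefix/encoding/data/publisher/group/dataset
local function buildDataTopic(prefix, encoding, publisher, group, dataset)
  assert(encoding == "json" or encoding == "uadp",
         "encoding must be 'json' or 'uadp'")
  local levels = {prefix, encoding, "data", publisher, group, dataset}
  local count = 6 -- fixed count, so a nil level is detected instead of skipped
  for i = 1, count do
    local level = levels[i]
    -- A topic level must be a non-empty string without '/', '#' or '+'
    assert(type(level) == "string" and level ~= "" and not level:find("[/#+]"),
           "invalid topic level at position " .. i)
  end
  return table.concat(levels, "/", 1, count)
end

local publisher = "urn:arykovanov-note:opcua:server"
local jsonTopic = buildDataTopic("rtl", "json", publisher, "group", "dataset")
local uadpTopic = buildDataTopic("rtl", "uadp", publisher, "group", "dataset")
print(jsonTopic)
print(uadpTopic)
```

Building both topics from the same parts keeps the JSON and binary topics aligned when a publisher and a subscriber are configured separately.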

Publishing data to MQTT broker

Data can also be published to an MQTT broker without an OPC UA server. To do this, create an MQTT client, connect it to an MQTT broker, configure the fields of the messages, and publish the values.

local ua = require("opcua.api")

local uaMqtt = ua.newMqttClient()

local fields = {
  { name = "Value1" },
  { name = "Value2" }
}

local tranportProfileUri = ua.TranportProfileUri.MqttBinary
local endpointUrl = "opc.mqtt://test.mosquitto.org:1883"
uaMqtt:connect(endpointUrl, tranportProfileUri)

local datasetId = uaMqtt:createDataset(fields)

local dataTopic = "rtl/uadp/data/urn:arykovanov-note:opcua:server/group/dataset"
local publisherId = "test_manual_publisher"

for i=64, 74 do
  uaMqtt:setValue(datasetId, "Value1", {Type=ua.VariantType.UInt32, Value=i})
  uaMqtt:setValue(datasetId, "Value2", {Type=ua.VariantType.UInt32, Value=i*2})
  uaMqtt:publish(dataTopic, publisherId)

  ba.sleep(1000)
end

Full source
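The publishing loop tags every value as UInt32. As a minimal sketch, a value can be checked against the unsigned 32-bit range before it is handed to `setValue`. `isUInt32` is a hypothetical helper written for illustration and is not part of the opcua.api module; how the library itself treats out-of-range values is not covered here.

```lua
-- Hypothetical helper, not part of the opcua.api module.
local UINT32_MAX = 4294967295 -- 2^32 - 1

-- Returns true when v is a whole number in the range 0 .. 2^32 - 1
local function isUInt32(v)
  return type(v) == "number"
    and v == math.floor(v)      -- no fractional part
    and v >= 0
    and v <= UINT32_MAX
end

-- Same value range as the publishing loop: i and i*2 for i = 64 .. 74
local allValid = true
for i = 64, 74 do
  if not (isUInt32(i) and isUInt32(i * 2)) then
    allValid = false
  end
end
print("all values fit UInt32:", allValid)
```

In the publishing loop, such a check would sit directly before each `setValue` call so that a bad value is skipped or reported instead of being published.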

Subscribing to MQTT messages

OPC UA PubSub messages can be received and decoded by the OPC UA MQTT client. Binary messages must be decoded before an application can use them. JSON messages are also decoded and transformed into the same format as decoded binary messages. An application can therefore subscribe to either binary or JSON messages, and the OPC UA MQTT client returns them in the same format.

local ua = require("opcua.api")

-- Create MQTT client instance
local mqttClient = ua.newMqttClient()

-- Connect to MQTT broker; the callback receives the connection status
local function connectCallback(status)
  ua.Tools.printTable("status", status)
end
mqttClient:connect("opc.mqtt://test.mosquitto.org:1883", connectCallback)

-- A single message callback handles both JSON and binary data
local function messageCallback(payload, err)
  if err then
    print("Error: " .. tostring(err))
    return -- nothing to print when the message could not be processed
  end
  ua.Tools.printTable("payload", payload)
end

-- Subscribe to a topic with binary data
mqttClient:subscribe("rtl/uadp/data/urn:arykovanov-note:opcua:server/group/dataset", messageCallback)
-- Subscribe to a topic with JSON data
mqttClient:subscribe("rtl/json/data/urn:arykovanov-note:opcua:server/group/dataset", messageCallback)

-- Wait for some time
local cnt = 1
while cnt <= 3 do
  ba.sleep(1000)
  cnt = cnt + 1
end

Full source
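Because one callback serves both topics, an application may prefer to store messages rather than print them. The following is a minimal sketch of such a callback, assuming the `(payload, err)` signature used above. `newCollector` is a hypothetical helper, and the payload table passed in the simulated calls is a stand-in, since the actual structure of a decoded message is not shown here.

```lua
-- Hypothetical helper, not part of the opcua.api module.
-- Returns a message callback and the table in which it records results.
local function newCollector()
  local state = {messages = {}, errors = {}}
  local function callback(payload, err)
    if err then
      -- Keep the error text and skip the payload
      state.errors[#state.errors + 1] = tostring(err)
      return
    end
    state.messages[#state.messages + 1] = payload
  end
  return callback, state
end

-- Simulated deliveries standing in for the MQTT client invoking the callback.
-- The payload content is invented for illustration only.
local callback, state = newCollector()
callback({Value1 = 64})
callback(nil, "decode failed")
print(#state.messages, #state.errors)
```

The returned `callback` could be passed to `mqttClient:subscribe` in place of `messageCallback`, with `state` inspected after the wait loop finishes.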