SMQ Cluster Management

Clustering can be used for the following:

- Scaling the SMQ broker horizontally across any number of servers
- Server to server communication (see the section Using SMQ for Server to Server Communication below)

Enabling clustering is best illustrated with an example:

-- Create an SMQ broker and a Cluster Manager; the Cluster Manager
-- implicitly creates a Multiplex Transport Layer (MTL) instance
smq=require"smq.hub".create()
cluster=require"smq.cluster".create(smq,"cluster password")
-- Create a Connection Manager listening on TCP port 1900
conn,err=require"smq.conn".create(cluster, 1900)
if conn then
   --Cluster nodes. Use names if you have DNS
   local list={"192.168.1.100","192.168.1.101", "192.168.1.102"}
   conn:setlist(list) -- Connect the nodes
end

Figure 1: MTL implicitly created by Cluster Manager

In the above example, we are implicitly creating a Multiplex Transport Layer (MTL) instance when creating the Cluster Manager. A more advanced setup would instead create a cluster as follows:

-- Explicitly create the MTL and a Connection Manager listening on port 1900
mtl=require"smq.mtl".create("cluster password")
conn,err=require"smq.conn".create(mtl, 1900)
if conn then
   local list={"192.168.1.100","192.168.1.101", "192.168.1.102"}
   conn:setlist(list) -- Connect the nodes
end

-- One SMQ broker/Cluster Manager combo using the explicitly created MTL
smq_1=require"smq.hub".create()
cluster_1=require"smq.cluster".create(smq_1, mtl)

Figure 2: Creating MTL, Conn, and one SMQ/Cluster Combo

The above setup is required if you plan on creating multiple SMQ broker instances and having each broker instance be part of the cluster solution.

Download Cluster Example Code from GitHub.

SMQ Cluster Manager

The Cluster Manager adds clustering support for the SMQ broker, thus enabling any number of SMQ brokers to scale up horizontally. Note that the Cluster Manager requires a Multiplex Transport Layer (MTL) instance. You can either create an MTL instance or let the Cluster Manager create one for you. You must explicitly create an MTL if you plan on creating multiple SMQ brokers and SMQ Cluster Managers. Each broker/cluster-manager combo uses the same MTL instance.
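For example, two SMQ broker instances can join the same cluster by giving each broker its own Cluster Manager while handing both Cluster Managers the same MTL instance. The following is a hedged sketch that assumes the 'mtl' and 'conn' setup from Figure 2; the optional 'name' argument (see create() below) is an assumption used here to tell the two combos apart:

-- Two SMQ broker/Cluster Manager combos sharing the MTL instance from Figure 2
smq_1=require"smq.hub".create()
cluster_1=require"smq.cluster".create(smq_1, mtl, "combo-1") -- assumed name
smq_2=require"smq.hub".create()
cluster_2=require"smq.cluster".create(smq_2, mtl, "combo-2") -- assumed name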

SMQ Cluster Manager API

create(smq, mtl [,name])
create(smq [,password [,op] ])

Creates and returns an SMQ Cluster Manager instance: require"smq.cluster".create(smq,mtl)

cluster:publish(data, [ptid,] subtopic)

Publish a message to the server SMQ client (server's ephemeral TID). The message is published to all connected cluster nodes. The function returns the number of sent messages -- i.e. the number of connected cluster nodes.
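A minimal usage sketch, assuming the 'cluster' instance and broker setup from Figure 1; the sub-topic name 'ping' is borrowed from the server to server example later in this document:

-- Send "How are you?" with sub-topic "ping" to the server side SMQ client
-- in every connected cluster node; 'sent' is the number of nodes reached
local sent = cluster:publish("How are you?", "ping")
trace("Message sent to", sent, "cluster node(s)")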

cluster:pubon(data, ptid, topic, subtopic)

Publish on behalf of 'ptid' -- that is, publish on behalf of another SMQ client connected to the SMQ broker instance registered with the Cluster Manager.

SMQ Multiplex Transport Layer

The Multiplex Transport Layer (MTL) provides communication buses for communicating with other servers. The MTL is required when using the SMQ Cluster Manager, but it can also be used for custom communication between connected servers.

The Multiplex Transport Layer is typically used in combination with the Connection Manager; however, the MTL can also be used directly if you require a more exotic clustering configuration than the Connection Manager can provide. An example of such a configuration is shown below. You can also combine the Connection Manager with your own connection manager.

The following example shows how to establish an MTL connection by initially using HTTP. You can also establish an HTTPS connection by using an https:// URL and/or use the proxy/tunneling settings available to the HTTP client library.

Common code required by both the client and the server code below (in a .preload/.config script):

mtl=require"smq.mtl".create("my password")

HTTP client in .preload/.config script:

local http=require"httpc".create()
http:request{
   url="/path/2/cluster/page.lsp",
   method="GET",
   header={SMQ="CLUSTER"}
}
if http:status() == 204 then
   local sock=ba.socket.http2sock(http)
   mtl:commence(sock)
end

Server LSP page:

<?lsp
if request:header"SMQ" == "CLUSTER" then
   response:setstatus(204)
   response:flush()
   local sock=ba.socket.req2sock(request)
   app.mtl:commence(sock)
   return
end
response:senderror(404)
?>

References: Module httpc, ba.socket.http2sock, ba.socket.req2sock

Multiplex Transport Layer (MTL) API

create(password,[op])

Creates and returns an MTL instance: require"smq.mtl".create("cluster password")

The API provided by the MTL is used by the Connection Manager and the Cluster Manager. The API may also be used by custom applications that wish to establish communication channels with the cluster nodes.

You do not need to understand the API returned by create() unless you use the MTL for your own custom transport.

mtl:open(name,statusCB,dataCB)

Open a communication channel.

mtl:isopen(name)

Check if a communication channel is open.

mtl:close(name)

Close an open communication channel.
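The following hypothetical sketch exercises the three channel functions above, assuming the 'mtl' instance from Figure 2. The status and data callback signatures are not documented here, so the callbacks simply print whatever arguments they receive:

local function statusCB(...) trace("channel status:", ...) end
local function dataCB(...) trace("channel data:", ...) end

mtl:open("my-channel", statusCB, dataCB)   -- open a named communication channel
trace("open?", mtl:isopen("my-channel"))   -- true while the channel is open
mtl:close("my-channel")                    -- close the channel again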

mtl:commence(sock, statusCB)

Start the initial handshake with the peer and commence normal operation if the handshaking succeeds. The Connection Manager calls mtl:commence for each connection it establishes with another server.

mtl:hascon(ipaddr)

Check whether an MTL connection to the cluster node with IP address 'ipaddr' is established. See also conn:status.

mtl:shutdown()

Close all active MTL connections.

sendframe(sock, id, data)

Send a data frame: require"smq.mtl".sendframe(sock, id, data)

SMQ Connection Manager

The Connection Manager (CONN) automates the connection of cluster nodes. The CONN takes a list of names or IP addresses and attempts to connect to the remote cluster nodes. The CONN also acts as a server and waits for other CONNs to connect. When a connection is established -- i.e. when a client or server socket object is created -- function mtl:commence is called and the socket object is passed to the MTL. The Connection Manager automatically attempts to reconnect broken connections.

One can manually add cluster names or IP addresses to the CONN one at a time, or one can set a predefined list. The CONN is designed to detect addresses pointing to 'self', which makes it possible to use the same list on all cluster nodes.

create(mtl, port [,op])

Creates and returns an SMQ Connection Manager instance: require"smq.conn".create(mtl, 1999)

One can create multiple CONNs for the same MTL instance. One CONN may, for example, establish non-secure communication links while another establishes secure (TLS) communication links.

The connections are non-secure by default. To enable secure communication (TLS), set op.shark to a SharkSSL client object and op.sshark to a SharkSSL server object. You must also set up a certificate for the SharkSSL server object and a certificate store for the SharkSSL client object. The signer (CA certificate) of the server certificate must be added to the client's certificate store. Untrusted connections are closed by the client.
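The following is a hedged sketch of a secure Connection Manager running alongside a non-secure one, assuming the 'mtl' instance from Figure 2. The helper functions createClientShark() and createServerShark() are hypothetical placeholders for your own SharkSSL certificate setup, and port 1901 is an example:

local shark  = createClientShark()  -- hypothetical: SharkSSL client object with a
                                    -- certificate store trusting the server CA cert
local sshark = createServerShark()  -- hypothetical: SharkSSL server object loaded
                                    -- with the server certificate
local sconn, err = require"smq.conn".create(mtl, 1901, {shark=shark, sshark=sshark})
assert(sconn, err)
sconn:setlist{"192.168.1.100", "192.168.1.101", "192.168.1.102"}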

conn:add(addr [,port])

Add one cluster node. This function can be called repeatedly, and it is safe to add nodes that are already registered.

conn:setlist(addrlist)

Set, replace, or remove the registered cluster nodes. An existing list can be removed by providing an empty table {}.

conn:status()

Returns a table with the registered cluster node addresses as keys and the connection status for each node as values (true when connected). See function oneshot in Figure 3 for an example.

conn:shutdown()

Closes the server listening socket and shuts down operation. Shutting down does not affect active connections in the MTL.

Auto Discovery of Cluster Nodes

When the cluster nodes are on the same network, we can create a simple auto discovery service that finds and connects the nodes by sending UDP broadcast messages.

The following example can be copied "as is" and used for auto discovering other nodes on the same network:

local function broadcastCosock(s,conn,udpport,tcpport)
   -- 's' (passed by ba.socket.event) is not used; it is shadowed by the UDP socket
   local s,err = ba.socket.udpcon{port=udpport}
   if s then
      s:setoption("broadcast", true)
      -- The broadcast payload is our TCP listen port encoded as a 2 byte value
      local msg=ba.socket.h2n(2, tcpport)
      local data
      while true do
         if not data then s:sendto(msg,"255.255.255.255",udpport) end
         data,err=s:read(5000,true) -- Wait up to 5 seconds for a response
         if data then
            if #data == 2 then
               -- 'err' is now the sender's IP i.e. s:read is using recvfrom
               conn:add(err, ba.socket.n2h(2, data)) -- Add IP address and port
            end
         elseif err ~= "timeout" then
            break
         end
      end
   end
   trace("Unexpected error in broadcastCosock", err)
end

local function autodiscover(conn,udpport,tcpport)
   ba.socket.event(broadcastCosock,conn,udpport,tcpport)
end

autodiscover(conn, udpport, tcpport)

Create an auto discovery service. Function autodiscover creates a UDP cosocket, which listens for messages and sends new broadcast messages every 5 seconds. When a new node is discovered or rediscovered, conn:add is called, which adds the new IP address if not already registered.
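A usage sketch, assuming the Connection Manager 'conn' from Figure 2 with its MTL listening on TCP port 1900; UDP port 1901 for the broadcast messages is an example:

-- Let nodes on the local network find each other instead of using a
-- static address list (conn:setlist is then not needed)
autodiscover(conn, 1901, 1900)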

Using SMQ for Server to Server Communication

The SMQ broker includes an integrated client that can be used by server side Lua code. The integrated client enables server side Lua code to function as any other SMQ client. By setting up an SMQ cluster of at least two nodes, SMQ can be used for communication with other connected nodes. In fact, you may use SMQ for server to server communication only and not have any other clients connected.

When communicating with other connected cluster nodes, you may use standard named (one-to-many) publish/subscribe messages, and you may send one-to-one messages by publishing directly to the ephemeral topic ID (etid), the unique ID created for each client. See the One-to-one Communication introduction for more information.

Unlike a regular connected SMQ client, the client embedded in the SMQ broker has the hard coded ephemeral topic ID (etid) one (1). The server side SMQ client receives messages from any client publishing to etid 'one', as long as that client is connected to the same broker. A client connected to another broker, including that broker's server side client, cannot simply publish to etid 'one' and expect the message to be sent over the cluster connection to another broker node. For this to work, we must first discover the client in the node we want to communicate with and have the SMQ Cluster Manager set up a communication path.

Each SMQ client has a unique etid, and the etid for the server side SMQ client is one. When an SMQ client communicates over a cluster connection, the SMQ Cluster Manager sets up a phantom connection client on the other side of the cluster connection. The phantom connection client gets its own unique etid, but this etid differs from the client's etid on the origin broker. The purpose of the phantom client is to trap messages sent to its etid, send them over the cluster connection, and then deliver them to the correct client. This logic is handled by the SMQ Cluster Manager.

The following is a fully functional example that can be installed in a .preload script and run in the Mako Server "as is", except that you must use IP addresses that work on your network. The example uses a one-to-many message named 'ping', which the server side SMQ client subscribes to. When the client receives a message addressed to 'ping', it responds by sending the one-to-one message 'pong' to the sender. In the code below, the sender of the 'ping' message is one of the server side clients connected to the cluster.

local mtl=require"smq.mtl".create("cluster password")
local conn,err=require"smq.conn".create(mtl, 1900)
assert(conn,err)
local list={"192.168.1.100","192.168.1.101", "192.168.1.102"}
conn:setlist(list) -- Connect the nodes
local smq=require"smq.hub".create()
cluster=require"smq.cluster".create(smq, mtl)

smq:create("ping", 2)
smq:createsub("ping", 3)
smq:createsub("pong", 4)

local function onping(data,ptid,tid,subtid)
   trace(string.format("%-20s %10X %d %d",data,ptid,tid,subtid))
   smq:publish("I am good, thanks!", ptid, "pong")
end

local function onpong(data,ptid,tid,subtid)
   trace(string.format("%-20s %10X %d %d",data,ptid,tid,subtid))
end

--smq:subscribe('self', {subtopic="ping",onmsg=onping})
smq:subscribe('ping', {onmsg=onping})
smq:subscribe('self', {subtopic="pong",onmsg=onpong})

local function oneshot()
   for ip,stat in pairs(conn:status()) do
      trace(ip, stat and "connected" or "broken")
   end
   --cluster:publish("How are you?", "ping")
   smq:publish("How are you?", "ping")
end

ba.timer(oneshot):set(3000,true)

Figure 3: Server to Server Communication Example

Lines 1 to 7, which are copied from Figure 2, set up an SMQ cluster.

Lines 9 to 11 are not required, but we hard code the topic and sub-topic IDs to make the printouts from the example easier to understand. You may remove these three lines and the program will still work, but the topic IDs (tids) will then be assigned random values.

Function 'onping' on line 13 is registered as the callback when we subscribe to the named topic 'ping' on line 23. The function prints the data and responds by publishing 'pong' to the sender of the message -- i.e. it responds by sending a message to the publisher's ephemeral topic ID (ptid). Notice that we publish to the named topic 'ping', but the 'pong' message is named using a sub-topic name. Different types of one-to-one messages can only be differentiated by using sub-topic names. See the sub-topic names introduction for more information.

Function oneshot on line 26 is a timer function that fires 3 seconds after startup, which is more than enough time to connect the three cluster nodes. The function prints the connection status of the two other connected nodes and then publishes a message to the named topic 'ping'.

In the cluster configuration on line 4, we list three cluster nodes, which means that each cluster node expects to connect to two other nodes. The printouts from one of the cluster nodes are shown below. Notice that function trace also prints the code line; for example, the first two printouts below are from code line 28.

28: 192.168.1.100      connected
28: 192.168.116.1      connected
onmsg 14: How are you?                  1 2 0
onmsg 14: How are you?           3C230B05 2 0
onmsg 14: How are you?           5E9E1C6E 2 0
onmsg 19: I am good, thanks!            1 1 4
onmsg 19: I am good, thanks!     3C230B05 1 4
onmsg 19: I am good, thanks!     5E9E1C6E 1 4

From the above printouts, you can see that line 14 is activated three times. The first printout is received from 'self' since the ptid is one; recall that the server side SMQ client's etid is hard coded to one. The next two printouts are ping messages received from the other two nodes. The printed ptids (3C230B05 and 5E9E1C6E) belong to the phantom connection clients handled by the Cluster Manager. The phantom connection client enables us to send a one-to-one message on line 15 in Figure 3 and respond directly to the sender of the message, which is another server side SMQ client in another node.

You may have noticed that we received three ping messages, including the one sent from the same cluster node. We could filter out messages received from 'self' by adding an 'if ptid ~= 1 then' clause to the onping callback. However, there is a better way to prevent sending messages to 'self'. In the code in Figure 3, we enable inter-cluster communication by using the message 'ping' as a broadcast (one-to-many) message to discover other server side SMQ clients in the connected nodes. Instead of publishing to a named topic (ping), we can use the specialized publish methods in the SMQ Cluster Manager API to set up one-to-one communication channels.

Notice the two code lines (22 and 30) that are commented out. You can enable those two lines and comment out lines 23 and 31. Line 30 uses the specialized cluster:publish method to send a message to the server side SMQ client in all other connected nodes, excluding 'self'. The printout below is from this code setup.

28: 192.168.1.100      connected
28: 192.168.116.1      connected
onmsg 14: How are you?           ED981D9D 1 3
onmsg 14: How are you?           680F3958 1 3
onmsg 19: I am good, thanks!     680F3958 1 4
onmsg 19: I am good, thanks!     ED981D9D 1 4

From the above printouts, you can see that we are no longer receiving the 'ping' message sent to 'self'. Also, notice that the 'ping' message type is now sent as a sub-topic name.

We recommend that you analyze the above printouts and cross-check them with the example code in Figure 3 to learn more about named topics and ephemeral topic IDs.