Simple Message Queues (SMQ) Hub/Broker

The SMQ hub (a.k.a. broker) API is divided into three parts:

  1. API for creating an SMQ broker instance and upgrading HTTP(S) requests to persistent SMQ connections.
  2. API for publishing and subscribing, similar to how a client can subscribe and publish messages.
  3. API for broker management.

You only need the first set of APIs (Part I) if you do not plan on writing any server code that interacts with the broker.

The Peer Table

The broker maintains a Lua table for each connected SMQ client. We refer to this table as the "peer table". The broker is responsible for maintaining the following values:

key | value | description
sock | socket | The client's active socket connection object. This is the same value as the parameter passed into function sock2peer.
tid | number | The unique ephemeral topic ID associated with this client.
topics | nil|table | List of TIDs (tids) the client is subscribed to, or nil if the client is not subscribed to any topics. Keys in this table are the subscribed topic tids, and the values are the sockets associated with the tids. You can iterate this table as follows: for tid, sock in pairs(t.topics or {}) do ... end
uid | string | Client's universally unique ID provided by the client.
info | string | The optional 'info' data sent by a client when it connects.
phantom | n/a | Set to a value if the table represents a phantom connection, that is, a connection endpoint set up by the Cluster Manager.

You may read the above values and you may add additional information to the peer table, but you must under no circumstances modify the values used by the broker. Doing so will make the broker malfunction.
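
The read-only access described above can be sketched as follows. This is a minimal illustration, not part of the broker API: the helper name subscribedTids is hypothetical, and only the 'topics' field and the pairs(t.topics or {}) pattern come from the table above.

```lua
-- Hypothetical helper: returns a sorted array of the TIDs found in peer.topics.
-- It only reads the peer table and never writes to broker-owned fields.
local function subscribedTids(peer)
   local tids = {}
   -- peer.topics is nil when the client has no subscriptions
   for tid in pairs(peer.topics or {}) do
      tids[#tids + 1] = tid
   end
   table.sort(tids)
   return tids
end
```

A peer table obtained from the broker, for example via the smq:peers iterator, can be passed directly to this helper.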

I. API for creating an SMQ broker and upgrading HTTP(S) requests

create([op])

Creates and returns an SMQ instance: require"smq.hub".create([op])

Note: the broker instance also provides an SMQ client API enabling server side Lua code to communicate with other SMQ nodes. The API does not enable server to server communication; however, the cluster manager provides two specialized publish methods that can be used for inter server communication.

Example: see the above LSP page for example code.

smq:connect(cmd [,arg])

The smq:connect function is typically called from an LSP page or a directory function and upgrades (morphs) an incoming HTTP(S) request originating from an SMQ client into a persistent SMQ connection. The function grabs the active socket connection from the request object, thus invalidating any further use of this object after the function returns. The function will send an HTTP 404 response message should the incoming request not originate from an SMQ client.

Example: see the above LSP page for example code.

II. Server API for publishing and subscribing

The pub/sub API enables server code to act as any other SMQ client. Server code can both publish and subscribe to messages sent to and from both browsers and devices.

Note: Lua strings can contain any value including binary data. Strings in Lua use 8 bit wide characters. Since JavaScript code utilizes 16 bit wide characters, data strings intended for interpretation by a JavaScript client must be provided as UTF-8 by the server code. Use of a UTF-8 enabled editor is recommended if you plan on sending strings that include non-English characters to JavaScript code. JavaScript code can also receive raw data. See the subscribe method in the JavaScript client and the "datatype" setting for details.

smq:create(topic, [tid])

Create a topic and fetch the topic ID (TID). The SMQ protocol is optimized to use a number, not a string, when publishing. The server randomly creates a 32 bit number and persistently stores the topic name and number. The 'create' method can optionally be used prior to publishing a message on a specific topic. Otherwise, the publish method can be used directly with a topic string, since the publish method first creates the topic if you publish to a topic unknown to the broker. The broker will not invoke the permittop callback when the topic is created by server code.
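
As a sketch of using 'create' before publishing, the helper below resolves a topic name once and then publishes by TID. The helper name makePublisher is hypothetical, 'smq' is assumed to be a broker instance returned by require"smq.hub".create(), and the sketch assumes that smq:create returns the TID as its first return value.

```lua
-- Hypothetical helper: resolve the topic name to a TID once, then return a
-- function that publishes data to that TID.
-- Assumption: smq:create(topicName) returns the TID directly.
local function makePublisher(smq, topicName)
   local tid = smq:create(topicName)
   return function(data)
      -- smq:publish accepts a TID (number) in place of a topic name
      return smq:publish(data, tid)
   end
end
```

Calling makePublisher(smq, "alarm") would give a function that can be called repeatedly with the payload only; the topic name "alarm" is just an example.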

smq:createsub(subtopic [,stid])

Create a sub-topic and fetch the subtopic ID. The createsub method can optionally be used prior to publishing a message on a specific topic and sub-topic. Alternatively, the publish method may be used directly with topic and sub-topic strings. When the topic and/or sub-topic names are unknown to the broker, the publish method creates the topic first and then the sub-topic. The broker will not invoke the permitsubtop callback when the sub-topic is created by server code.

smq:gettid()

Get the server client's ephemeral TID. The broker creates and selects a unique random number as the ephemeral TID for each connected SMQ client; however, the server client's ephemeral TID is always set to one, so this function always returns one.

smq:observe(topic,onchange)

Request the broker to provide change notification events when the number of subscribers to a specific topic changes. Ephemeral TIDs can also be observed. The number of connected subscribers for a unique ephemeral ID can only be one, which means the client is connected. Receiving a change notification for an ephemeral ID means the client has disconnected and that you will no longer get any change notifications for the observed TID. The broker automatically "unobserves" observed ephemeral TIDs when a change notification is sent.

smq:unobserve(topic)

Stop receiving change notifications for a topic or ephemeral TID.

smq:publish(data, topic [,subtopic])

Publish messages to a topic and optionally to a sub-topic. Topics may be topic names (strings), TIDs (numbers), or ephemeral TIDs (numbers). Messages published to unresolved topic names are instantly resolved. Topic names are resolved by calling smq:create and/or smq:createsub. The server's ephemeral TID is one and the message is sent with 'ptid' set to one unless you set the optional ptid argument. By setting 'ptid', you may publish on behalf of another SMQ client. Note: max payload size is 0xFFF0.

smq:pubon(data, totid, fromtid, subtopic)

Publish a message on behalf of another SMQ client. This is an advanced function that enables server side logic to connect SMQ clients together in a system that is not using named topics, but is instead using ephemeral topic IDs only. Both totid and fromtid must be ephemeral topic IDs. A subtopic number must be provided, but the number can be set to zero if not used.
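
A minimal sketch of connecting two clients with pubon is shown below. The helper name relay is hypothetical; 'from' and 'to' are assumed to be peer tables, whose 'tid' field holds each client's ephemeral topic ID as described in the peer table section.

```lua
-- Hypothetical helper: deliver `data` to peer `to` on behalf of peer `from`.
local function relay(smq, from, to, data)
   -- Both TIDs are ephemeral topic IDs taken from the peer tables.
   -- The sub-topic number is required; 0 means "not used".
   return smq:pubon(data, to.tid, from.tid, 0)
end
```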

smq:subscribe(topic [,op])

Subscribe to a topic and optionally to a sub-topic. You may subscribe multiple times to the same topic if you use sub-topics. Subscribing to a topic without providing a sub-topic introduces a "catch all" for sub-topics that do not correspond to any subscribed sub-topics.

The topic name "self" is interpreted as subscribing to the server's own Ephemeral Topic ID. Subscribing to your own Topic ID makes it possible for other connected clients to send a message directly to the server.

smq:unsubscribe(topic)

Requests the broker to unsubscribe the server from a topic. All registered onmsg callback functions, including all callbacks for sub-topics, will be removed from the broker.

smq:onmsg(onmsg)

Install a global "onmsg" callback function. The global "onmsg" function is called if you subscribe to a topic and do not provide a callback function when subscribing. The onmsg function is also called if a "topic" callback function fails to decode the data as JSON.
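
The combination of a global "onmsg" callback and a subscription without its own callback can be sketched as follows. The helper name listenOnSelf is hypothetical. The arguments the broker passes to the callback are not described in this section, so the sketch passes the handler through unchanged rather than assuming a signature.

```lua
-- Hypothetical helper: route messages sent directly to the server
-- (topic "self") to a single global handler.
local function listenOnSelf(smq, handler)
   -- Global callback, used when subscribe is called without a callback
   smq:onmsg(handler)
   -- "self" is interpreted as the server's own ephemeral topic ID
   smq:subscribe("self")
end
```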

smq:queuesize()

smq:tid2topic(tid)

Translates TID to topic name.

smq:topic2tid(topic)

Translates topic name to TID.

smq:subtopic2tid(subtopic)

Translates sub-topic name to sub-TID.

smq:tid2subtopic(tid)

Translates sub-TID to sub-topic name.

III. API for broker management

smq:peers()

Returns an iterator that lets you iterate over all connected clients.

Example code:

for sock,peer in smq:peers() do
   trace("IP address:",sock:peername(), "UID", peer.uid)
end

See function smq:sock2peer for more information on how to use the peer table returned by the iterator.
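
Building on the iterator above, a lookup table keyed by client UID can be sketched as follows. The helper name peersByUid is hypothetical; the sketch relies only on smq:peers yielding a socket and a peer table per client, and on the 'uid' field of the peer table.

```lua
-- Hypothetical helper: map each connected client's UID to its peer table.
local function peersByUid(smq)
   local index = {}
   for sock, peer in smq:peers() do
      index[peer.uid] = peer
   end
   return index
end
```

The returned table is a snapshot taken at the time of the call, so it does not reflect clients that connect or disconnect afterwards.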

smq:sock2peer(sock)

smq:etid2peer(etid)

smq:topics()

Returns an iterator that lets you iterate over all registered tids/topic-names.

Example code:

for tid,topicname in smq:topics() do
   trace("tid:",tid, "topic", topicname)
end

smq:subtopics()

Returns an iterator that lets you iterate over all registered tids/sub-topic-names.

Example code:

for subtid,subtopname in smq:subtopics() do
   trace("sub tid:",subtid, "subtopic", subtopname)
end

smq:setkeepalive(keepidle,keepintv)

Sets the TCP keepalive timeout for persistent SMQ connections. The default time is just below 4 minutes. The SMQ broker sets the keepalive for each new connection by calling sock:setoption("keepalive",true,keepidle,keepintv).

smq:shutdown([msg [,etid]])

You may force one specific client to disconnect by providing 'etid', the client's ephemeral topic ID.

When called without the etid parameter, the function closes the socket connections of all SMQ clients. A well-behaved SMQ client should then attempt to reconnect. This mode is designed for dynamic runtime upgrade of the broker/server app. You may design the server side logic such that you can dynamically upgrade the server side Lua application and broker without restarting the server.

If msg is provided, the broker will send the SMQ control message Disconnect to all connected clients before closing the connection. A well behaved client should not attempt to reconnect when it receives a Disconnect request.

When called without the etid parameter and when the SMQ broker is used in a cluster setup, all other connected clusters are informed and all other brokers associated with the registered MTL name are also shut down.
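
Forcing a single client to disconnect can be sketched by combining smq:peers with smq:shutdown. The helper name disconnectUid is hypothetical, and the sketch assumes the caller supplies a 'msg' value, since the effect of passing nil for 'msg' together with an etid is not described here.

```lua
-- Hypothetical helper: disconnect the client whose UID equals `uid`.
-- Returns true if a matching client was found, false otherwise.
local function disconnectUid(smq, uid, msg)
   for sock, peer in smq:peers() do
      if peer.uid == uid then
         -- peer.tid is the client's ephemeral topic ID (etid)
         smq:shutdown(msg, peer.tid)
         return true -- stop iterating once the peer has been shut down
      end
   end
   return false
end
```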

isSMQ(request)

Returns true if the HTTP client request is believed to originate from an SMQ connection request.

Example code:

<?lsp
  if require("smq.hub").isSMQ(request) then
    -- Delegate to SMQ broker instance.
  else
    -- Not an SMQ client. Probably a standard HTTP client (browser).
  end
?>