Simple Message Queues (SMQ) Broker

The SMQ broker API is divided into three parts:

  1. API for creating an SMQ broker instance and upgrading HTTP(S) requests to persistent SMQ connections.
  2. API for publishing and subscribing, similar to how a client can subscribe and publish messages.
  3. API for broker management.

You only need the first set of APIs (Part I) if you do not plan on writing any server code that interacts with the broker.

The Peer Table

The broker maintains a Lua table for each connected SMQ client. We refer to this table as the "peer table". The broker is responsible for maintaining the following values:

Key      Type       Description
sock     socket     The client's active socket connection object. This is the same value as the parameter passed into function sock2peer.
tid      number     The unique ephemeral topic ID associated with this client.
topics   nil|table  List of TIDs (tids) the client is subscribed to or nil if the client is not subscribed to any topics. Keys in this table are the subscribed topic tids, and the values are the sockets associated with the tids. You can iterate this table as follows: for tid, sock in pairs(t.topics or {}) do ....
uid      string     Client's universally unique ID provided by the client.
info     string     The optional 'info' data sent by a client when it connects.
phantom  n/a        Set to a value if the table represents a phantom connection.

You may read the above values and you may add additional information to the peer table, but you must under no circumstances modify the values used by the broker. Doing so will make the broker malfunction.
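The access rules above can be illustrated with a minimal sketch. The peer table below is a hand-built stand-in, an assumption made so the code runs outside the server; a real peer table is supplied by the broker. The key name "myapp" is hypothetical application data. The sketch only reads the broker's fields and stores its own data under a separate key.

```lua
-- Hand-built stand-in for a peer table (assumption: real peer tables come
-- from the broker; the values below are placeholders).
local peer = {
   sock = "socket-placeholder",
   tid = 1001,
   topics = { [42] = "socket-a", [43] = "socket-b" },
   uid = "device-1234",
   info = "sensor",
}

-- Read-only helper: collect the TIDs the peer is subscribed to.
-- 'topics' is nil when the peer has no subscriptions, hence the 'or {}'.
local function subscribedTids(p)
   local tids = {}
   for tid in pairs(p.topics or {}) do
      tids[#tids + 1] = tid
   end
   table.sort(tids)
   return tids
end

-- Application data goes under its own key (hypothetical name "myapp"),
-- leaving sock, tid, topics, uid, info, and phantom untouched.
peer.myapp = { label = "kitchen sensor" }

print(#subscribedTids(peer))          -- number of subscribed topics
print(#subscribedTids({ uid = "x" })) -- a peer without subscriptions
```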

I. API for creating an SMQ broker and upgrading HTTP(S) requests

The broker is stored as a Lua module, and the module is loaded by using the "require" function:

local broker = require("smq.broker") 

Parentheses () are optional in Lua when calling a function with a single argument that is a literal string or a table constructor. We can therefore write the above line as:

local broker = require"smq.broker" 

The returned value, saved as the local variable 'broker' above, is a Lua table (object). This table includes the "create" function, which is used in the example below.

local broker = require"smq.broker" -- Fetch "broker" module
local smq = broker.create() -- Create an SMQ broker instance

The "create" function creates an SMQ broker instance. We can simplify the above Lua module loading and creating of an SMQ instance as follows:

 local smq = require"smq.broker".create()

The Lua scripting language includes a garbage collector that eventually removes all unreferenced data. The above construction keeps the SMQ instance (smq) in a "local" variable, which does not anchor the instance once the variable goes out of scope. The instance will therefore be garbage collected. We can simply remove the keyword "local" in front of "smq", thus referencing the smq instance in the current environment.

In Lua, an environment is a Lua table. The Barracuda Application Server provides many environments. One example is the request/response environment in an LSP page. It may seem natural to create the SMQ instance here, since SMQ clients connecting to an SMQ broker start by sending an HTTP request. The HTTP request is upgraded to a persistent SMQ connection by function "connect". The following LSP page illustrates how a server-side LSP page can upgrade an incoming HTTP or HTTPS request to an SMQ connection.

 
<?lsp
   smq = require"smq.broker".create() -- Create one broker instance
   smq.connect(request) -- Upgrade HTTP(S) request to an SMQ connection
?>

In the above LSP page, we keep 'smq' as a global variable in the request/response environment, and the SMQ instance will not be garbage collected as long as the request/response environment is referenced. However, the request/response environment, which is also known as the command environment, is an ephemeral (short-lived) environment that only exists for the duration of the HTTP request. A new command environment is created for each request. The above construction therefore creates a new SMQ instance for each incoming HTTP request and then leaves the entire environment unreferenced, which allows the garbage collector to collect the environment and all data referenced in it.

One way to make sure we only create one SMQ instance in an LSP page is to reference the instance in the LSP page table, a table that remains persistent as long as the application that owns the LSP page remains referenced.

 
<?lsp
   local smq = page.smq -- fetch from 'page' table
   if not smq then -- first time accessed
      smq = require"smq.broker".create() -- Create one broker instance
      page.smq = smq -- Store (reference) broker instance
   end
   smq.connect(request) -- Upgrade HTTP(S) request to an SMQ connection
?>

The "if" statement on line 3 triggers the first time the page is accessed after a server start. The code on lines 4 and 5 creates a broker instance and saves it in the page table, which protects the instance from garbage collection.

Line 7 passes the LSP page's request object into the smq.connect() function. An SMQ connection request is a standard HTTP or HTTPS request, which is upgraded to a persistent SMQ connection by the "connect" function. This connection is then maintained until the SMQ client decides to disconnect. For security reasons, the "connect" function analyzes the incoming request and sends a [404 page not found] error message if the request does not originate from an SMQ client. You may test this by using a browser to navigate to an LSP page with the above code; the browser should display a [404 page not found] error message.
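The create-once pattern used in the LSP page above can be tried outside the server with a small sketch. The 'page' table and the 'createBroker' function below are stand-ins (assumptions): 'page' mimics the persistent LSP page table and 'createBroker' mimics require"smq.broker".create. The point is only to show that the factory runs once, no matter how many requests arrive.

```lua
-- Stand-ins (assumptions) so the sketch runs in plain Lua:
local page = {}    -- mimics the persistent LSP 'page' table
local created = 0  -- counts how many broker instances were created

local function createBroker() -- mimics require"smq.broker".create
   created = created + 1
   return { id = created }
end

-- Same logic as the LSP page: fetch from 'page', create on first access.
local function getBroker()
   local smq = page.smq
   if not smq then
      smq = createBroker()
      page.smq = smq -- anchor the instance in the persistent table
   end
   return smq
end

-- Simulate three incoming requests.
local first = getBroker()
local second = getBroker()
local third = getBroker()
print(first == second and second == third, created)
```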

Creating and storing a broker within an LSP page may be suitable for very basic operations or limited use of the broker, as shown in the above example. However, it is not recommended if you are designing any extensive server-side logic that interacts with the broker. Instead, keep the logic separate from the LSP page by creating the broker in a separate module or in an application environment such as the .preload script.

The following example shows how to separate the logic from the LSP page and where the LSP page only calls the "connect" function.

We start by creating a .preload file. This file is stored in the root of the application. A Barracuda Application Server powered product such as the Mako Server can be instructed to load applications and the .preload script is where you include code logic for your given application.

 
local function authenticate(credentials, info)
   local username,pwdhash=credentials:match("([^:]+):(.+)")
   pwdhash = pwdhash and ba.b64decode(pwdhash)
   if username and pwdhash then
      -- The local data will typically be fetched from a database
      local dbuname="username" -- For test only
      local dbpasswd="password"
      local data=string.format("%u%s%s%s",
                               info.seed,info.sock:peername(),dbuname,dbpasswd)
      if pwdhash == ba.crypto.hash("sha512")(data)(true) then
         return 0 -- Authenticated
      end
   end
   return 6, "Incorrect credentials"
end

-- Set 'smq' as global in application's environment
smq=require"smq.broker".create{authenticate=authenticate}

In the above code example, we create an SMQ instance on line 18 and pass in a parameter table that sets an authenticator callback function. The callback function (lines 1-15) is called each time a new client connects. The authenticator function is designed for hash-based password authentication and may be used with the JavaScript client onauth example. The username and password are hard coded in the above authenticator example. You would normally fetch this information from a database.
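The first step of the authenticator, splitting the credentials string into a username and a hash, can be sketched in isolation. The helper name 'splitCredentials' is hypothetical, and the nil check is an added precaution that assumes the callback could be handed a missing credentials value; the pattern is the one used in the example above.

```lua
-- Hypothetical helper: split "username:hash" into its two parts.
-- Returns nil when the input is not a string or does not contain a
-- non-empty username followed by ':' and a non-empty remainder.
local function splitCredentials(credentials)
   if type(credentials) ~= "string" then
      return nil
   end
   -- Same pattern as in the authenticator: everything up to the first
   -- ':' is the username, the rest (which may contain ':') is the hash.
   return credentials:match("([^:]+):(.+)")
end

print(splitCredentials("alice:c2VjcmV0"))
print(splitCredentials("no-separator"))
```

An authenticator built on this helper would return 6 and an error message, as in the example above, whenever the helper returns nil.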

The smq variable set in the application's environment is available in LSP pages via the "app." prefix. We then create a very small LSP page that simply calls the "connect" function as follows:

 
<?lsp
   app.smq.connect(request)
?>

More information on how the various environments may be made available to work with server side code can be found within the documentation for the Command (Request/Response) Environment.

If you are using the Mako Server, we also suggest reading the Mako Server information page on how to use the .preload script. The Mako Server is a Barracuda Application Server derivative product.

create([op])
Creates and returns an SMQ instance: require"smq.broker".create([op])

Example: see the above LSP page for example code.

smq.connect(cmd [,arg])

The smq.connect function is typically called from an LSP page or a directory function and upgrades (morphs) an incoming HTTP(S) request originating from an SMQ client into a persistent SMQ connection. The function grabs the active socket connection from the request object, thus invalidating any further use of this object after the function returns. The function sends an HTTP 404 response message if the incoming request does not originate from an SMQ client.

Example: see the above LSP page for example code.

II. Server API for publishing and subscribing

The pub/sub API enables server code to act as any other SMQ client. Server code can both publish and subscribe to messages sent to and from both browsers and devices.

Note: Lua strings can contain any value, including binary data. Strings in Lua use 8-bit wide characters. Since JavaScript code uses 16-bit wide characters, data strings intended for interpretation by a JavaScript client must be provided as UTF-8 by the server code. Use of a UTF-8 enabled editor is recommended if you plan on sending strings that include non-English characters to JavaScript code. JavaScript code can also receive raw data. See the subscribe method in the JavaScript client and the "datatype" setting for details.
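The difference between bytes and characters in a Lua string can be checked with the standard utf8 library. The sketch below assumes Lua 5.3 or later (required for the utf8 library and the \u{...} escape); the helper name 'isUtf8' is hypothetical. Such a check could be run on a string before publishing it to JavaScript clients.

```lua
-- Hypothetical helper: true if 's' is well-formed UTF-8.
-- utf8.len returns nil (plus the failing position) for invalid input.
local function isUtf8(s)
   return utf8.len(s) ~= nil
end

-- "Grüße" written with escapes so the source file encoding does not matter.
local greeting = "Gr\u{FC}\u{DF}e"

print(#greeting)          -- 7: length in bytes
print(utf8.len(greeting)) -- 5: length in characters
print(isUtf8(greeting))   -- true
print(isUtf8("\xFC"))     -- false: a lone Latin-1 byte is not valid UTF-8
```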

smq.create(topic, [tid])

Create a topic and fetch the topic ID (TID). The SMQ protocol is optimized and does not directly use a string when publishing, but a number. The server randomly creates a 32 bit number and persistently stores the topic name and number. The 'create' method can optionally be used prior to publishing a message on a specific topic. Otherwise, the publish method can be used directly with a topic string since the publish method takes care of first creating a topic if you publish to a topic unknown to the broker. The broker will not invoke the permittop callback when the topic is created by server code.
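The relationship between topic names and numeric TIDs can be pictured with a small in-memory model. This is a sketch for illustration only and not the broker's implementation: the 'registry' table, the local 'create' function, and its collision handling are all assumptions. It shows why a name only needs to be resolved once and why a number can be used afterwards.

```lua
-- Illustrative model only (assumption): a two-way map between topic
-- names and numeric topic IDs.
local registry = { byName = {}, byTid = {} }

local function create(topic, tid)
   local existing = registry.byName[topic]
   if existing then
      return existing -- already resolved: same name, same TID
   end
   if tid then
      if registry.byTid[tid] then
         return nil, "tid already in use"
      end
   else
      -- Pick a random unused number in the positive 32-bit range.
      repeat
         tid = math.random(1, 0x7FFFFFFF)
      until not registry.byTid[tid]
   end
   registry.byName[topic] = tid
   registry.byTid[tid] = topic
   return tid
end

local tempTid = create("sensors/temp", 7) -- caller-supplied TID
local alertTid = create("alerts")         -- randomly assigned TID
print(tempTid, registry.byTid[tempTid])
print(alertTid == create("alerts"))       -- resolving again gives the same TID
```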

smq.createsub(subtopic [,stid])

Create a sub-topic and fetch the sub-topic ID. The createsub method can optionally be used prior to publishing a message on a specific topic and sub-topic. Alternatively, the publish method may be used directly with topic and sub-topic strings. The publish method first creates the topic and then the sub-topic when the topic and/or sub-topic names are unknown to the broker. The broker will not invoke the permitsubtop callback when the sub-topic is created by server code.

smq.gettid()

Get the server client's ephemeral TID. Each client is assigned a unique TID, and this TID is included when publishing a message. All subscribers receiving the message published by a client can use this TID for identification purposes or for sending messages directly to the publisher of the message. See the subscribe onmsg for example code.

smq.observe(topic,onchange)

Request the broker to provide change notification events when the number of subscribers to a specific topic changes. Ephemeral TIDs can also be observed. The number of connected subscribers for a unique ephemeral ID can only be one, which means the client is connected. Receiving a change notification for an ephemeral ID means the client has disconnected and that you no longer will get any change notifications for the observed TID. The broker automatically "unobserves" observed ephemeral TIDs when a change notification is sent.

smq.unobserve(topic)

Stop receiving change notifications for a topic or ephemeral TID.

smq.publish(data, topic [,subtopic])

Publish messages to a topic and optionally to a sub-topic. Topics may be topic names (strings), TIDs (numbers), or ephemeral TIDs (numbers). Messages published to unresolved topic names are instantly resolved. Topic names are resolved by calling smq.create and/or smq.createsub.

smq.subscribe(topic [,op])

Subscribe to a topic and optionally to a sub-topic. You may subscribe multiple times to the same topic if you use sub-topics. Subscribing to a topic without providing a sub-topic introduces a "catch all" for sub-topics that do not correspond to any subscribed sub-topics.

The topic name "self" is interpreted as subscribing to the server's own Ephemeral Topic ID. Subscribing to your own Topic ID makes it possible for other connected clients to send a message directly to the server.
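The "catch all" rule for sub-topics can be pictured with a small dispatch model. This is an illustrative sketch and not the broker's code: the local 'subscribe' and 'dispatch' functions and the option field names 'subtopic' and 'onmsg' are assumptions made for the example. A message goes to the handler registered for its sub-topic if there is one, otherwise to the handler registered without a sub-topic.

```lua
-- Illustrative model only (assumption): handlers[topic] holds per-sub-topic
-- callbacks plus an optional catch-all callback.
local handlers = {}

local function subscribe(topic, op)
   op = op or {}
   local h = handlers[topic] or { bySub = {} }
   handlers[topic] = h
   if op.subtopic then
      h.bySub[op.subtopic] = op.onmsg
   else
      h.catchAll = op.onmsg -- no sub-topic given: acts as the catch-all
   end
end

-- Returns true if some handler received the message.
local function dispatch(topic, subtopic, data)
   local h = handlers[topic]
   if not h then return false end
   local fn = (subtopic and h.bySub[subtopic]) or h.catchAll
   if not fn then return false end
   fn(data, subtopic)
   return true
end

local log = {}
subscribe("sensors", { subtopic = "temp",
   onmsg = function(data) log[#log + 1] = "temp:" .. data end })
subscribe("sensors", {
   onmsg = function(data) log[#log + 1] = "catch-all:" .. data end })

dispatch("sensors", "temp", "21.5")   -- handled by the "temp" callback
dispatch("sensors", "humidity", "40") -- no match: falls to the catch-all
print(table.concat(log, ", "))
```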

smq.unsubscribe(topic)

Requests the broker to unsubscribe the server from a topic. All registered onmsg callback functions, including all callbacks for sub-topics will be removed from the broker.

smq.onmsg(onmsg)

Install a global "onmsg" callback function. The global "onmsg" function is called if you subscribe to a topic and do not provide a callback function when subscribing. The onmsg function is also called if a "topic" callback function fails to decode the data as JSON.

smq.queuesize()

smq.tid2topic(tid)

Translates TID to topic name.

smq.topic2tid(topic)

Translates topic name to TID.

smq.subtopic2tid(subtopic)

Translates sub-topic name to sub-TID.

smq.tid2subtopic(tid)

Translates sub-TID to sub-topic name.

III. API for broker management

smq.peers()

Returns an iterator that lets you iterate over all connected clients.

Example code:

for sock,peer in smq.peers() do
   trace("IP address:",sock:peername(), "UID", peer.uid)
end

See function smq.sock2peer for more information on how to use the peer table returned by the iterator.
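The iterator can also be used to build summaries of the connected clients. In the sketch below, the 'smq' table is a stand-in (an assumption) whose 'peers' function iterates a hand-built sock-to-peer map, so that the code runs in plain Lua; the helper name 'subscribedUids' is hypothetical. With a real broker instance, the helper would be passed that instance instead.

```lua
-- Stand-in for a broker instance (assumption): peers() iterates a
-- hand-built map from socket to peer table.
local mockPeers = {
   ["sock-1"] = { uid = "device-a", tid = 11 },
   ["sock-2"] = { uid = "device-b", tid = 12, topics = { [42] = "sock-2" } },
}
local smq = { peers = function() return pairs(mockPeers) end }

-- Hypothetical helper: UIDs of all peers with at least one subscription.
-- Only reads the peer tables; nothing maintained by the broker is changed.
local function subscribedUids(broker)
   local uids = {}
   for sock, peer in broker.peers() do
      if peer.topics and next(peer.topics) then
         uids[#uids + 1] = peer.uid
      end
   end
   table.sort(uids)
   return uids
end

print(table.concat(subscribedUids(smq), ", "))
```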

smq.sock2peer(sock)

smq.etid2peer(etid)

smq.topics()

Returns an iterator that lets you iterate over all registered tids/topic-names.

Example code:

for tid,topicname in smq.topics() do
   trace("tid:",tid, "topic", topicname)
end

smq.subtopics()

Returns an iterator that lets you iterate over all registered tids/sub-topic-names.

Example code:

for subtid,subtopname in smq.subtopics() do
   trace("sub tid:",subtid, "sub topic", subtopname)
end

smq.setkeepalive(time)

Sets the TCP keepalive timeout for persistent SMQ connections. The default time is just below 4 minutes. The SMQ broker sets the keepalive for each new connection by calling sock:setoption("keepalive",true,keepalive,keepalive)

smq.shutdown([msg [,etid]])

When called without any arguments, the function closes the socket connections of all SMQ clients. A well-behaved SMQ client should then attempt to reconnect. This mode is designed for dynamic runtime upgrade of the broker/server app. You may design the server-side logic such that you can dynamically upgrade the server-side Lua application and broker without restarting the server.

If msg is provided, the broker will send the SMQ control message Disconnect to all connected clients before closing the connection. A well behaved client should not attempt to reconnect when it receives a Disconnect request.

You may force one specific client to disconnect by providing 'etid', the client's ephemeral topic ID.
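The three ways of calling shutdown described above can be summarized in a short sketch. The 'smq' table here is a recording stub (an assumption) so that the code runs outside the server, and the message strings and the ephemeral topic ID 1001 are made-up values; passing the message as a string is also an assumption.

```lua
-- Recording stub (assumption): stands in for a real broker instance and
-- simply remembers how shutdown was called.
local calls = {}
local smq = {
   shutdown = function(msg, etid)
      calls[#calls + 1] = { msg = msg, etid = etid }
   end,
}

-- 1. No arguments: close all client connections; clients may reconnect.
smq.shutdown()

-- 2. With a message: send Disconnect to all clients before closing;
--    well-behaved clients should not reconnect.
smq.shutdown("Server maintenance")

-- 3. With a message and an ephemeral topic ID: disconnect one client.
smq.shutdown("Session revoked", 1001)

print(#calls)
```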

isSMQ(request)

Returns true if the HTTP client request is believed to originate from an SMQ connection request.

Example code:

<?lsp
   if require("smq.broker").isSMQ(request) then
      -- Delegate to SMQ broker instance.
   else
      -- Not an SMQ client. Probably a standard HTTP client (browser).
   end
?>