I do a lot with messaging architectures, and because I work on embedded systems so much lately, my main broker protocol has been MQTT, used with Javascript. I learned something that surprised me this morning, even though it really shouldn’t have, given some thought.
MQTT is a common protocol used in IoT. It stands for “Message Queuing Telemetry Transport“, and the current specification for MQTT is 5.0. Common brokers include Eclipse Mosquitto, EMQX, and HiveMQ.
As its name implies, MQTT is designed to be super-light. It has some pretty nice features, including termination semantics, quality of service (guaranteed delivery), retention, and other facets, but the primary use is for fairly simple communication of small messages – the header has room for a “payload length” and it’s two bytes, so your maximum packet size is somewhere in the region of 64k bytes.
The thing about MQTT that got me was the subscription model. It would be convenient to have a handler per subscription:
// this code does not do what it seems to expect.
client.subscribe('rpc/1',
(message) => {
console.log('hey, we got a message on rpc/1: ${message.toString()}');
});
client.subscribe('rpc/2',
(message) => {
client.publish('rpc/1', `hello, ${message}`);
})
As the comment says – who reads comments, right? – this code does not work. It looks like it should do something specific when a message comes in on rpc/1
, and something different when a message comes in on rpc/2
– but it doesn’t. It actually solely establishes two subscriptions for the client
connection and those “message handlers” will not be executed, ever, under normal circumstances.
MQTT clients in Javascript, using the mqtt
and async-mqtt
modules, use Javascript’s stream paradigm. Connections use on()
to establish event handlers for events as they come through. Thus, a connection event emits a connect
event, an incoming message emits a message
event, and so on.
If you wanted to subscribe to two topics, as expressed above, you’d have a simpler subscription process:
client.subscribe('rpc/1');
client.subscribe('rpc/2');
This tells the client connection (and the broker) to handle any messages matching the topic name, including any wildcards you might want. You can have as many subscriptions on a single client as you like, although I imagine the brokers have rational limits.
That applies in the same way to message handlers. You add a message handler as a callback for the message
event:
client.on('message', (topic, message) => {
// assumes 'message' is a human-readable string!
console.log(topic, message.toString());
});
Here’s the thing: you can have multiple message handlers, too. And they’ll get every message that the client
is subscribed to.
If we’re subscribed to rpc/1
and rpc/2
, that same message handler gets any message posted to either of those topics. It won’t get any other messages – presuming we haven’t added any more subscriptions – but it will get every message for those subscribed topics.
What’s more, if we add another message handler – via client.on('message', ...)
again – every message handler will get every message, without discrimination.
If the handlers should need to handle only specific messages, then they each have to implement that functionality – filtering on topic, for example, or message content.
An alternative approach – and the one I think is more appropriate, within the limits of resource consumption – is to have multiple MQTT connections, each one with subscriptions that match a specific functionality.
In our first broken example, we have two topics, rpc/1
and rpc/2
, where a message written to rpc/1
emits a sort of “hello world” message, and a message written to rpc/2
causes a message to be published to rpc/1
.
If we’re preserving connections to the broker, our message handler would have to look something like this:
client.on('message', (topic, message)=>{
if(topic.endsWith('1')) {
console.log('hello', message.toString());
}
if(topic.endsWith('2')) {
client.publish('rpc/1', message.toString());
}
});
In environments where sockets are less expensive – i.e., we aren’t worried about counting how many sockets we use – we can be a lot more clear:
const helloClient=MQTT.connect('tcp:localhost:1883');
const sayHelloClient=MQTT.connect('tcp:localhost:1883');
helloClient.on('connect', ()=>{
helloClient.subscribe('rpc/1');
});
helloClient.on('message', (topic, message)=>{
console.log('hello', message.toString());
});
sayHelloClient.on('connect', ()=>{
helloClient.subscribe('rpc/2');
});
sayHelloClient.on('message', (topic, message)=>{
sayHelloClient.publish('rpc/1', message.toString());
});
In most message queueing libraries, you would set up a handler for incoming messages on each subscription, but MQTT is designed to be lighter than that. This shows a clean way to handle topic propagation in MQTT.