ZeroMQ subscribe with xsub
I had a scenario where I wanted multiple publishers to broadcast information to a single subscriber. This subscriber would make a best effort to dedupe the information and present a "realtime" view of what is happening across multiple nodes.
Reliability wasn't the primary concern, back chatter wasn't wanted, and the publishers didn't care whether anyone was listening. Functionally, this is a check that each publisher is online and working as expected.
A traditional PUB/SUB setup would require the subscriber to connect to each publisher and listen for events. In this scenario, I need many publishers to connect to a single subscriber. Enter XSUB.
XSUB is commonly paired with XPUB to provide a many-pubs-to-many-subs proxy configuration. XSUB is an extended subscriber socket and allows a subscriber to bind to an address. PUB natively supports connecting to a remote address. Armed with these two, it's possible to build a multi-pub to single-sub topology.
The publisher will connect to the remote subscriber and begin broadcasting messages:
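package main

import (
	"log"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func main() {
	// create a PUB socket and connect it to the remote subscriber
	pub, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		log.Fatal(err)
	}
	defer pub.Close()

	if err := pub.Connect("tcp://127.0.0.1:9002"); err != nil {
		log.Fatal(err)
	}
	log.Println("zmq publisher connected to 127.0.0.1:9002")

	// once per second, send two messages, each with a topic frame and a body frame
	for {
		log.Println("sending message")
		if _, err := pub.SendMessage("test", "testing"); err != nil {
			log.Println(err)
		}
		if _, err := pub.SendMessage("greet", "hello world"); err != nil {
			log.Println(err)
		}
		time.Sleep(time.Second)
	}
}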
The subscriber will create an XSUB socket and bind to an address. The main sticking point is that XSUB does not support setting socket options for subscribing to message filters. Based on the code in the xsub source, we can simply send a message that subscribes to the appropriate channel.
package main

import (
	"log"

	zmq "github.com/pebbe/zmq4"
)

func main() {
	// create an XSUB socket and bind it to a port
	xsub, err := zmq.NewSocket(zmq.XSUB)
	if err != nil {
		log.Fatal(err)
	}
	defer xsub.Close()

	if err := xsub.Bind("tcp://*:9002"); err != nil {
		log.Fatal(err)
	}
	log.Println("zmq xsub listening on *:9002")

	// subscribe the XSUB socket to all messages
	_, err = xsub.SendBytes([]byte{0x01}, 0)
	if err != nil {
		log.Fatal(err)
	}

	// process incoming messages
	for {
		msg, e := xsub.RecvMessageBytes(0)
		if e != nil {
			log.Println(e)
			continue
		}
		// expect two frames (topic, body); skip anything shorter
		if len(msg) < 2 {
			log.Println("skipping message with fewer than two frames")
			continue
		}
		msgType := string(msg[0])
		msgBody := string(msg[1])
		switch msgType {
		case "test":
			log.Println("all", "test", msgBody)
		case "greet":
			log.Println("all", "greet", msgBody)
		}
	}
}
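In the above code we want to subscribe to all channels, so we short-circuit this by sending a single message with a 0x01 payload. The 0x01 byte marks the message as a subscription, and the empty topic prefix after it matches every message.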
	_, err = xsub.SendBytes([]byte{0x01}, 0)
	if err != nil {
		log.Fatal(err)
	}
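The same mechanism should work for narrower filters. An XSUB subscription message is a 0x01 byte followed by the topic prefix, and a 0x00 byte followed by the prefix cancels it. Here is a minimal sketch of building those payloads. The helper names subscribeFrame and unsubscribeFrame are my own for illustration and are not part of zmq4.

```go
package main

import "fmt"

// subscribeFrame (hypothetical helper) builds the payload that asks an XSUB
// socket to deliver messages whose first frame starts with prefix.
// An empty prefix matches every message.
func subscribeFrame(prefix string) []byte {
	return append([]byte{0x01}, prefix...)
}

// unsubscribeFrame (hypothetical helper) builds the payload that cancels
// a subscription for the same prefix.
func unsubscribeFrame(prefix string) []byte {
	return append([]byte{0x00}, prefix...)
}

func main() {
	fmt.Printf("% x\n", subscribeFrame(""))        // 01
	fmt.Printf("% x\n", subscribeFrame("greet"))   // 01 67 72 65 65 74
	fmt.Printf("% x\n", unsubscribeFrame("greet")) // 00 67 72 65 65 74
}
```

With the subscriber above, passing subscribeFrame("greet") to xsub.SendBytes instead of []byte{0x01} should limit delivery to the greet messages.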
That's all there is to it!