How to implement proxy/broker for (X)PUB/(X)SUB messaging in ZMQ?

myWallJSON picture myWallJSON · Jan 29, 2013 · Viewed 7.6k times · Source

So I was reading this article on how to create proxy/broker for (X)PUB/(X)SUB messaging in ZMQ. There is this nice picture of what shall architecture look like :

data flow: user code -> PUB -> XSUB -> user code-> XPUB -> SUB -> user code; subscription flow: user code <- PUB <- XSUB <- user code <- XPUB <- SUB <- user code;

But when I look at XSUB socket description I do not get how to forward all subscriptions via it due to the fact that its Outgoing routing strategy is N/A

So how one shall implement (un)subscription forwarding in ZeroMQ, what is minimal user code for such forwarding application (one that can be inserted between simple Publisher and Subscriber samples)?

Answer

minrk picture minrk · Jan 29, 2013

XPUB does receive messages - the only messages it receives are subscriptions from connected subscribers, and these messages should be forwarded upstream as-is via XSUB.

The very simplest way to relay messages is with zmq_proxy:

xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)

which will relay messages to/from xpub and xsub. Optionally, you can add a PUB socket to monitor the traffic that passes through in either direction.

If you want user code in the middle to implement extra routing logic, you would do something like this, which re-implements the inner loop of zmq_proxy:

def broker(ctx):
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while True:
        events = dict(poller.poll(1000))
        if xpub in events:
            message = xpub.recv_multipart()
            print "[BROKER] subscription message: %r" % message[0]
            xsub.send_multipart(message)
        if xsub in events:
            message = xsub.recv_multipart()
            # print "publishing message: %r" % message
            xpub.send_multipart(message)

        # insert user code here

full working (Python) example