How to make ZMQ pub client socket buffer messages while sub server socket is down


Given 2 applications, where application A uses a publisher client to continuously stream data to application B, which has a sub server socket to accept that data: how can we configure the pub client socket in application A so that while B is unavailable (e.g. it is being redeployed or restarted), A buffers all pending messages, and when B becomes available again the buffered messages go through and the socket catches up with the real-time stream?

In a nutshell: how do we make a PUB client socket buffer messages, with some limit, while the SUB server is unavailable?

The default behaviour for a PUB client is to drop messages while in the mute state, but it would be great if we could change that to a size-limited buffer. Is that possible with ZeroMQ, or do I need to do it at the application level?

I've tried setting HWM and LINGER on my sockets, but if I'm not mistaken they only cover the slow-consumer case, where my publisher is connected to a subscriber that is so slow that the publisher starts to buffer messages (the HWM limits the number of those messages).

I'm using JeroMQ, since I'm targeting the JVM platform.

There are 3 answers below.

Answer by vach (BEST ANSWER)

I'm posting a quick update, since the other two answers (though very informative) are actually wrong, and I don't want others to be misinformed by my accepted answer. Not only can you do this with ZeroMQ, it is actually the default behaviour.

The trick is that if your publisher client has never connected to the subscriber server, it keeps dropping messages (which is why I was thinking it does not buffer messages). But if your publisher has connected to the subscriber and you then restart the subscriber, the publisher will buffer messages until the HWM is reached, which is exactly what I asked for. In short, the publisher wants to know there is someone on the other end accepting messages; only after that will it buffer them.

Here is some sample code which demonstrates this (you might need to do some basic edits to compile it).

The only dependency I used is org.zeromq:jeromq:0.5.1.
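For reference, pulling that in with Gradle's Kotlin DSL would look roughly like this (a minimal sketch, assuming Maven Central is already configured as a repository):

// build.gradle.kts
dependencies {
    implementation("org.zeromq:jeromq:0.5.1")
}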

zmq-publisher.kt

import org.zeromq.SocketType
import org.zeromq.ZContext

// Helpers such as log(), Msg, telegramMessage(), notInterrupted()
// and sleepInterruptible() come from the author's own codebase.
fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.PUB)

   // Buffer at most 10 000 outbound messages per connected peer.
   socket.hwm = 10000
   // Do not block on close waiting for unsent messages.
   socket.linger = 0
   "connecting to $uri".log()
   socket.connect(uri)

   fun publish(path: String, msg: Msg) {
      ">> $path | ${msg.json()}".log()
      socket.sendMore(path)          // first frame: the topic
      socket.send(msg.toByteArray()) // second frame: the payload
   }

   var count = 0

   while (notInterrupted()) {
      val msg = telegramMessage("message : ${++count}")
      publish("/some/feed", msg)
      println()

      sleepInterruptible(1.second)
   }
}

and of course zmq-subscriber.kt


import org.zeromq.SocketType
import org.zeromq.ZContext

fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.SUB)

   socket.hwm = 10000
   // Time out recv() calls after 250 ms so the loop keeps spinning.
   socket.receiveTimeOut = 250

   "connecting to $uri".log()
   socket.bind(uri) // the subscriber is the stable, binding end

   socket.subscribe("/some/feed")

   while (true) {
      val path = socket.recvStr() ?: continue // null on timeout
      val bytes = socket.recv()
      val msg = Msg.parseFrom(bytes)
      "<< $path | ${msg.json()}".log()
   }
}

Try running the publisher first, without the subscriber; when you then launch the subscriber, you will have missed all the messages published so far. Now, without restarting the publisher, stop the subscriber, wait for some time, and start it again.

Here is an example of one of my services actually benefiting from this. The structure is: [current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers].

Because I restart the service in the middle, all the publishers start buffering their messages. The final service, which was observing ~200 messages per second, sees the rate drop to 0 (the 1 or 2 remaining are heartbeats), then a sudden burst of 1000+ messages comes in as all the publishers flush their buffers (the restart took about 5 seconds). I am not losing a single message here.


Note that you must have the subscriber:server <= publisher:client pairing. This way the publisher knows "there is only one place I need to deliver these messages to". You can try binding on the publisher and connecting on the subscriber, but you will no longer see the publisher buffering messages, simply because it is questionable whether a subscriber that just disconnected did so because it no longer needs the data or because it failed.

Answer by smac89

As we've discussed in the comments, there is no way for the publisher to buffer messages while it has nothing connected to it; it will simply drop any new messages:

From the docs:

If a publisher has no connected subscribers, then it will simply drop all messages.

This means your buffer needs to live outside of ZeroMQ's care. Your buffer could be a list, a database, or any other method of storage you choose, but you cannot use your publisher for doing that.

The next problem is detecting when a subscriber has connected or disconnected. This tells us when to start reading from the buffer, or filling it.

I suggest using Socket.monitor and listening for the ZMQ_EVENT_CONNECTED and ZMQ_EVENT_DISCONNECTED events, as these will tell you when a client has connected or disconnected and thus enable you to switch to filling your buffer of choice. Of course, there might be other ways of doing this that do not directly involve ZeroMQ, but that's up to you to decide.
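As a rough illustration, here is a minimal JeroMQ sketch of that idea. The inproc endpoint name, the queue choice and the buffering policy are placeholder assumptions of mine, not anything prescribed by the library:

import org.zeromq.SocketType
import org.zeromq.ZContext
import org.zeromq.ZMQ

fun main() {
   val context = ZContext(1)
   val pub = context.createSocket(SocketType.PUB)

   // Ask the PUB socket to report connect/disconnect events on an
   // inproc endpoint (the endpoint name is arbitrary).
   pub.monitor("inproc://pub.monitor", ZMQ.EVENT_CONNECTED or ZMQ.EVENT_DISCONNECTED)

   // A PAIR socket reads that event stream.
   val monitor = context.createSocket(SocketType.PAIR)
   monitor.connect("inproc://pub.monitor")

   pub.connect("tcp://localhost:3006")

   var subscriberUp = false
   val buffer = ArrayDeque<String>() // the app-side "buffer of choice"

   fun send(payload: String) {
      if (subscriberUp) pub.send(payload) else buffer.addLast(payload)
   }

   while (!Thread.currentThread().isInterrupted) {
      // Non-blocking poll of the monitor; null means no event pending.
      val event = ZMQ.Event.recv(monitor, ZMQ.DONTWAIT)
      when (event?.event) {
         ZMQ.EVENT_CONNECTED -> {
            subscriberUp = true
            // Drain the backlog first (beware the slow-joiner window:
            // the subscription may land slightly after the connect).
            while (buffer.isNotEmpty()) pub.send(buffer.removeFirst())
         }
         ZMQ.EVENT_DISCONNECTED -> subscriberUp = false
      }
      send("tick @ ${System.currentTimeMillis()}")
      Thread.sleep(1000)
   }
}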

Answer by user3666197

First of all, welcome to the world of Zen-of-Zero, where latency matters most.

PROLOGUE:

ZeroMQ was designed by Pieter HINTJENS' team of ultimately experienced masters (Martin SUSTRIK to be named first). The design was professionally crafted so as to avoid any unnecessary latency. So, asking about having a (limited) persistence? No, sir, not confirmed: the PUB/SUB Scalable Formal Communication Pattern Archetype will not have it built-in, precisely because of the added problems and the decreased performance and scalability (add-on latency, add-on processing, add-on memory-management).

If one needs (limited) persistence (for absent remote-SUB-side agents' connections), feel free to implement it on the app-side, or one may design and implement a new ZMTP-compliant behaviour-pattern Archetype, extending the ZeroMQ framework, if such work reaches a stable and publicly accepted state. But do not request the high-performance, latency-shaved standard PUB/SUB, polished to almost linear scalability ad astra, to get modified in this direction. It is definitely not the way to go.

Solution?

The app-side may easily implement your added logic, using dual-pointer circular buffers, working as a sort of (app-side-managed) Persistence-PROXY sitting in front of the PUB-sender.
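A minimal sketch of such a dual-pointer circular buffer, in Kotlin for consistency with the accepted answer; the capacity, the overwrite-oldest policy and all names here are illustrative assumptions of mine, not part of any ZeroMQ API:

// A fixed-capacity ring buffer with separate read/write pointers.
// When full, it overwrites the oldest entry (one possible policy;
// blocking or dropping the newest are equally valid choices).
class CircularBuffer<T>(private val capacity: Int) {
   private val slots = arrayOfNulls<Any?>(capacity)
   private var readIdx = 0L   // next slot to read
   private var writeIdx = 0L  // next slot to write

   val size: Int get() = (writeIdx - readIdx).toInt()

   fun put(item: T) {
      if (size == capacity) readIdx++ // full: drop the oldest
      slots[(writeIdx % capacity).toInt()] = item
      writeIdx++
   }

   @Suppress("UNCHECKED_CAST")
   fun poll(): T? {
      if (readIdx == writeIdx) return null // empty
      val item = slots[(readIdx % capacity).toInt()] as T
      readIdx++
      return item
   }
}

// Usage idea: while the remote SUB side is absent, put() outgoing
// messages here; once it is back, poll() and send until the buffer
// drains, then resume live publishing.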

Your design may get successful in squeezing some additional sauce from the ZeroMQ internals, in case it also enjoys using the recently made available built-in ZeroMQ socket_monitor component to set up an additional control-layer and receive there a stream of events as seen from "inside" the PUB-side Context-instance, where some additional network- and connection-management-related events may bring more light into your (app-side-managed) Persistence-PROXY.

Yet, be warned that

The zmq_socket_monitor() method supports only connection-oriented transports, that is, TCP, IPC, and TIPC.

so one may straight away forget about this in case any of the other interesting transport-classes { inproc:// | norm:// | pgm:// | epgm:// | vmci:// } was planned to be used.


Heads up!

There is inaccurate, if not wrong, information from our Community's honorable member smac89, who tried his best to address your additional interest expressed in the comment:

"...zmq optimizes publishing on topics? like if you keep publishing on some 100char long topic rapidly, is it actually sending the topic every time or it maps to some int and sends the int subsequently...?"

telling you:

"It will always publish the topic. When I use the pub-sub pattern, I usually publish the topic first and then the actual message, so in the subscriber I just read the first frame and ignore it and then read the actual message"

ZeroMQ does not work this way. There is no such thing as a "separate" <topic> followed by a <message-body>; rather the opposite.

The TOPIC and the mechanisation of topic-filtering work in a very different way.

1) you never know who .connect()-s:
i.e. one can be almost sure that version 2.x up to version 4.2+ will handle the topic-filtering in a different manner (the ZMTP RFC defines an initial capability-version handshake, to let the Context-instance decide which version of topic-filtering will have to be used):

ver 2.x used to move all messages to all peers, and let all the SUB-sides (of ver 2.x+) be delivered the message (leaving the local topic-list filter processing to the SUB-side Context-instance),

whereas

ver 4.2+ is sure to perform the topic-list filter processing on the PUB-side Context-instance (CPU-usage grows, network-transport load goes the opposite way), so your SUB-side will never be delivered a byte of "useless", read "not-subscribed-to", messages.

2) (you may, but) there is no need to separate a "topic" into the first frame of a thus-implied multi-frame message. Perhaps just the opposite: it is rather an anti-pattern in high-performance, low-latency distributed system design.

The topic-filtering process is defined and works byte-wise, from left to right, pattern-matching each topic-list member value against the delivered message payload.

Adding extra data and extra frame-management processing only increases the end-to-end latency and processing overhead. It is never a good idea to do this instead of proper design work.
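To make the byte-wise prefix rule concrete, a small Kotlin/JeroMQ fragment; the endpoint and topic strings are purely illustrative assumptions:

import org.zeromq.SocketType
import org.zeromq.ZContext

fun main() {
   val context = ZContext(1)
   val sub = context.createSocket(SocketType.SUB)
   sub.connect("tcp://localhost:3006")

   // A subscription is a plain byte-prefix, matched left-to-right
   // against the beginning of each incoming message:
   sub.subscribe("/sensor".toByteArray())

   // "/sensor/room1/temp 21.5" -> delivered   (starts with "/sensor")
   // "/sensors-offline"        -> delivered   (also starts with "/sensor"!)
   // "/actuator/valve1 open"   -> filtered out
   val msg = sub.recvStr()
   println(msg)
}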


EPILOGUE:

There are no easy wins nor any low-hanging fruit in professional design, the less so if ultra-low latency is the design target.

On the other hand, be sure that the ZeroMQ framework was made with this in mind, and these efforts were crowned with a stable, ultimately performant, well-balanced set of tools for smart (by design), fast (in operation) and scalable (as hell may envy) signaling/messaging services that people love to use right because of this design wisdom.

Wish you a happy life with ZeroMQ as it is, and feel free to add any additional set of features "in front of" the ZeroMQ layer, inside your application suite of choice.