I'm trying to create a message bus with ZeroMQ using an XPUB-XSUB proxy.
The number of subscribers and publishers will vary as they come and go. The goal is to have a number of daemons that can communicate through this message bus.
The problem is that when a subscriber is already connected to the XPUB side of the proxy and a new publisher connects and immediately starts sending messages, the subscriber doesn't receive the first messages.
I assume this happens because, when the publisher connects, the information about existing subscriptions doesn't reach it immediately, so the first messages are discarded on the publisher's socket.
An easy but unreliable workaround is to add a short sleep on the publisher side between connecting and sending messages.
Is there a good way to wait for the subscriptions to be forwarded? Or should I use some other type of socket?
I have the following example:
import threading
import time

from zmq import Context, Socket, proxy
from zmq.constants import PUB, SUB, XPUB, XPUB_VERBOSE, XSUB


def message_bus():
    # Proxy: publishers connect to the XSUB side, subscribers to the XPUB side.
    context = Context.instance()
    in_socket: Socket = context.socket(XSUB)
    in_socket.bind("ipc:///tmp/in_socket.ipc")
    out_socket: Socket = context.socket(XPUB)
    out_socket.bind("ipc:///tmp/out_socket.ipc")
    out_socket.setsockopt(XPUB_VERBOSE, True)
    proxy(in_socket, out_socket)


def publisher():
    context = Context.instance()
    bus_in_socket: Socket = context.socket(PUB)
    bus_in_socket.connect("ipc:///tmp/in_socket.ipc")
    count = 1
    while True:
        # The first send happens immediately after connect().
        bus_in_socket.send_string(f"message number {count}")
        count += 1
        time.sleep(0.5)


def subscriber():
    context = Context.instance()
    bus_out_socket: Socket = context.socket(SUB)
    bus_out_socket.connect("ipc:///tmp/out_socket.ipc")
    bus_out_socket.subscribe("")  # empty prefix: subscribe to everything
    while True:
        print(f"subscriber {bus_out_socket.recv_multipart()}")


if __name__ == "__main__":
    message_bus_thread = threading.Thread(target=message_bus, daemon=True)
    subscriber_thread = threading.Thread(target=subscriber, daemon=True)
    publisher_thread = threading.Thread(target=publisher, daemon=True)
    message_bus_thread.start()
    subscriber_thread.start()
    # Give the subscriber time to connect before the publisher starts.
    time.sleep(1)
    publisher_thread.start()
    message_bus_thread.join()
    subscriber_thread.join()
    publisher_thread.join()
Output:
subscriber [b'message number 2']
subscriber [b'message number 3']
subscriber [b'message number 4']
subscriber [b'message number 5']
subscriber [b'message number 6']
subscriber [b'message number 7']
subscriber ...
As you can see, the first message, [b'message number 1'], never reaches the subscriber.
One solution is to use PUSH-PULL sockets instead of PUB-XSUB.
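
Another option that keeps the PUB-SUB fan-out might be to create the publisher's socket as XPUB rather than PUB and wait for a forwarded subscription message before sending anything. Below is a minimal sketch of that idea, not a tested drop-in fix. The helper names `parse_subscription_frame` and `wait_for_subscription` and the one-second default timeout are assumptions made for this sketch. The ZeroMQ behaviour it relies on is that an XPUB socket delivers subscription messages as a frame whose first byte is 1 (subscribe) or 0 (unsubscribe), followed by the topic prefix. The helper only calls `poll()` and `recv()` on the socket it is given.

```python
import time


def parse_subscription_frame(frame: bytes):
    """Return (is_subscribe, topic), or None if the frame is not a
    subscription message (first byte must be 0 or 1)."""
    if not frame or frame[0] not in (0, 1):
        return None
    return frame[0] == 1, frame[1:]


def wait_for_subscription(xpub_socket, timeout_ms: int = 1000):
    """Wait until a subscribe frame arrives on an XPUB socket.

    Returns the subscribed topic prefix (b"" means "everything"),
    or None if no subscribe frame arrived within timeout_ms.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        # poll() returns 0 when nothing is readable within the timeout.
        if remaining_ms <= 0 or not xpub_socket.poll(remaining_ms):
            return None
        parsed = parse_subscription_frame(xpub_socket.recv())
        # Skip unsubscribe frames and anything that is not a subscription.
        if parsed is not None and parsed[0]:
            return parsed[1]
```

In the publisher above, this would mean using `context.socket(XPUB)` instead of `context.socket(PUB)` and calling `wait_for_subscription(bus_in_socket)` right after `connect()`. A `None` result means no subscription was announced within the timeout, so the publisher still has to decide whether to send anyway or keep waiting.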