ZMQ_ROUTER Semantics: Dropping Disconnected Peers

I wanted to develop a reliable multi-client, single-server req/rep communication pattern. I decided to use ZMQ_REQ and ZMQ_ROUTER sockets in order to accomplish this task.

The ZeroMQ RFC states the following:

The ROUTER socket type SHALL create a double queue when a peer connects to it. If this peer disconnects, the ROUTER socket SHALL destroy its double queue and SHALL discard any messages it contains.

Given these semantics, I expect the ROUTER to drop messages from disconnected clients. I wrote the following code to test my design.

// Client Code
// cppzmq: v4.10.0
// libzmq: v4.3.5
#include <zmq.hpp>
#include <iostream>

zmq::context_t context{1};

bool reqrep(zmq::message_t &req, std::string addr)
{
    zmq::socket_t client{context, zmq::socket_type::req};
    client.set(zmq::sockopt::rcvtimeo, 2500);
    client.set(zmq::sockopt::sndtimeo, 2500);
    client.set(zmq::sockopt::immediate, true);
    client.set(zmq::sockopt::linger, 0);

    client.connect(addr);
    if (!client.send(req))
    {
        std::cerr << "E: send timeout\n";
        return false;
    }

    zmq::message_t rep;
    if (client.recv(rep))
    {
        std::cout << "I: server replied OK (" << rep.to_string() << ")\n";
        return true;
    }
    else
    {
        std::cerr << "E: receive timeout\n";
        return false;
    }
}

int main()
{
    std::string addr{"ipc:///tmp/server"};

    int seq{};
    while (true)
    {
        zmq::message_t req{std::to_string(seq++)};
        reqrep(req, addr);
    }

    return 0;
}
// Server Code
// cppzmq: v4.10.0
// libzmq: v4.3.5

#include <unistd.h>
#include <zmq_addon.hpp>
#include <cstdlib>   // random(), RAND_MAX
#include <iostream>
#include <iterator>  // std::back_inserter

//  Provide random number from 0..(num-1)
#define within(num) (int)((float)((num) * random()) / (RAND_MAX + 1.0))

int main()
{
    zmq::context_t context(1);
    zmq::socket_t server(context, ZMQ_ROUTER);

    server.bind("ipc:///tmp/server");

    while (1)
    {
        zmq::multipart_t mp;
        auto req = zmq::recv_multipart(server, std::back_inserter(mp));
        std::cout << "I: message received";
        std::cout << mp.str() << std::endl;
        if (!within(20))
        {
            std::cout << "I: simulating CPU overload" << std::endl;
            sleep(20);
        }
        sleep(1); // Do work
        zmq::send_multipart(server, mp);
    }
    return 0;
}

The results don't satisfy my expectations (screenshot: server on the left, client on the right). After the 20-second sleep, the server still holds the messages from the disconnected peers and processes them.

Is there something I'm missing? Any feedback is welcome.

Answer by jamesdillonharvey

Connections are managed asynchronously by the ZeroMQ context's I/O threads, so unless you call disconnect on the client or unbind on the server (or there is a real network issue), no disconnect will happen.

You can investigate further with zmq_socket_monitor():

https://libzmq.readthedocs.io/en/latest/zmq_socket_monitor.html

Answer by bazza

To extend James Harvey's answer: if you are running on Linux, there's a neat trick you can use. First, though, did you come across this because you were simply Ctrl-C'ing the client (or terminating the process instantly in some equivalent way, such as exit() or returning from main())?

If the answers are "no" and "no", read no further!

On Linux you can use signalfd as a means for signals to be delivered to your process and handled without resorting to signal handler routines. The idea is that you can set up signalfd to capture specified signals, and you get information about them delivered through a file descriptor. The nice thing about this - especially with ZeroMQ - is that you can include the signalfd file descriptor in a call to zmq_poll().

If you do set this up for Ctrl-C, or hang-up, or any other signal, you can choose what to do about them at leisure in the program's main loop that's calling zmq_poll(). For instance, I'll have it set to capture Ctrl-C, and when that is delivered my programs will typically kick off whatever is required to "finish", usually culminating in calls to zmq_close(), etc., and finally cleaning up the ZMQ context.
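
The signalfd part of this can be sketched as follows. This is a Linux-only illustration under stated assumptions: make_signal_fd and wait_for_signal are hypothetical helper names, and plain poll() stands in for zmq_poll() so the snippet compiles without ZeroMQ.

```cpp
#include <sys/signalfd.h>
#include <signal.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: block SIGINT and SIGHUP for the calling thread
// and return a file descriptor that reports them. Returns -1 on failure.
int make_signal_fd()
{
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGHUP);
    // The signals must be blocked, otherwise their default action
    // (terminating the process) still runs before we can read them.
    if (sigprocmask(SIG_BLOCK, &mask, nullptr) == -1)
        return -1;
    return signalfd(-1, &mask, SFD_CLOEXEC);
}

// Hypothetical helper: wait up to timeout_ms for a signal on sfd.
// Returns the signal number, 0 on timeout, or -1 on error.
int wait_for_signal(int sfd, int timeout_ms)
{
    pollfd item{};
    item.fd = sfd;
    item.events = POLLIN;
    int rc = poll(&item, 1, timeout_ms);
    if (rc <= 0)
        return rc; // 0: timed out, -1: poll failed
    signalfd_siginfo info{};
    if (read(sfd, &info, sizeof info) != static_cast<ssize_t>(sizeof info))
        return -1;
    return static_cast<int>(info.ssi_signo);
}
```

With ZeroMQ, the same descriptor would go into the fd field of a zmq_pollitem_t (with socket set to nullptr and events set to ZMQ_POLLIN) next to the items for your sockets, so a single zmq_poll() call wakes up for both messages and signals. When the signal item fires, the main loop can close its sockets and the context in an orderly way.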

That way, anything that ZMQ is going to do to clean up the connection to something else (perhaps sending a ZMTP packet to tell the remote to drop the connection) can happen, and things become a lot less confusing.

Signalfd is AFAIK a purely Linux thing. Other OSes may have their own way of achieving the same effect. I consider signalfd to be the sole way any new code should deal with signals, finally getting rid of the tyranny of signal handlers and their limitations and complications.