Problems with the nanomsg bus protocol in mangos, the Go package

I'd like to use nanomsg/NNG as the communication basis of a fully distributed peer-to-peer multi-node network, to support dynamic topology discovery and maintenance. Now I'm stuck on its Go package, mangos.

The same work has been done in Python with pynng (a Python binding for NNG), but when I use Go and call the corresponding methods in mangos instead, the behavior is totally different. The puzzle is mainly threefold:

  1. The bus socket's Recv() blocks by default and does not seem to be configurable for non-blocking mode. The documentation says:

OptionRecvDeadline is the time until the next Recv times out. The value is a time.Duration. Zero value may be passed to indicate that no timeout should be applied. A negative value indicates a non-blocking operation. By default there is no timeout.

I tried a negative value accordingly, but Recv() still blocked. What else should I do? And how should I understand the difference between "zero timeout" and "non-blocking"?

  2. The dialer returned by (s *socket) NewDialer(...) seems to linger after dialer.Close() is called: a subsequent dialer.Dial() fails with "address in use". But when I try to Close() the dialer again, that fails too, reporting that it is already closed. I also tried different combinations of the following options, but all attempts failed:
opts := make(map[string]interface{})
opts[mangos.OptionDialAsynch] = true                    // or false
opts[mangos.OptionMaxReconnectTime] = time.Millisecond  // or zero 
opts[mangos.OptionKeepAliveTime] = time.Millisecond     // or even smaller
opts[mangos.OptionKeepAlive] = false                    // or true

What should I do when I want to kill the dialer completely, or want to reuse the "pseudo-closed" dialer some time later?

  3. The bus socket's Send() behaves strangely. In my code, each node is supposed to send a message periodically. I disconnect one node (say "Node-X") from the physical network, keep it offline for a while, and then reconnect it. As soon as Node-X is reconnected, it re-sends lots of messages at once. What I actually expect is that Node-X sends those messages "into the air" even when it has no neighbors, so they are simply lost instead of being delivered later.

I wonder if there is any way to overcome these problems. I suspect I'm missing some options or configuration, but I haven't been able to figure them out.

The following code reproduces the re-dial and re-close errors.

package main

import (
    "fmt"
    "os"
    "time"

    "go.nanomsg.org/mangos/v3"
    "go.nanomsg.org/mangos/v3/protocol/bus"

    // register transports
    _ "go.nanomsg.org/mangos/v3/transport/all"
)

var (
    sock      mangos.Socket
    DialerMap map[string]mangos.Dialer // mangos.Dialer is an interface, so store it directly
    opts      map[string]interface{}
)

func main() {
    var err error
    opts = make(map[string]interface{})
    opts[mangos.OptionDialAsynch] = true
    opts[mangos.OptionMaxReconnectTime] = time.Millisecond
    // opts[mangos.OptionKeepAliveTime] = time.Millisecond
    opts[mangos.OptionKeepAlive] = false
    DialerMap = make(map[string]mangos.Dialer)

    if sock, err = bus.NewSocket(); err != nil {
        fmt.Println("bus.NewSocket error. ", err)
        os.Exit(1)
    }
    TargetUUID := "node-A"
    TargetAddr := "tcp://192.168.0.172:60000" // change this to an available address
    MyDial(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(time.Second * 2)
    MyDial(TargetUUID, TargetAddr) // reuses the closed dialer, which is still in DialerMap
    time.Sleep(time.Second * 2)
    MyClose(TargetUUID, TargetAddr)
    time.Sleep(100 * time.Second)
}

// MyDial creates a dialer for TargetUUID on first use, then calls Dial() on it.
// Dialers are never removed from DialerMap, so later calls reuse the same dialer.
func MyDial(TargetUUID string, TargetAddr string) (mangos.Dialer, error) {
    var err error
    dialer, exists := DialerMap[TargetUUID]
    if !exists {
        dialer, err = sock.NewDialer(TargetAddr, opts)
        if err != nil {
            fmt.Println("NewDialer fails.", err)
            return nil, err // avoid using a nil dialer below
        }
        DialerMap[TargetUUID] = dialer
    }
    err = dialer.Dial()
    if err != nil {
        fmt.Println("Dialer fails to dial()", err)
    } else {
        fmt.Println("Dialer succeeds to dial()")
    }
    return dialer, err
}

// MyClose closes the dialer for TargetUUID but leaves it in DialerMap.
func MyClose(TargetUUID string, TargetAddr string) {
    dialer, exists := DialerMap[TargetUUID]
    if !exists {
        fmt.Println("Dialer does not exist")
        return // nothing to close
    }
    if err := dialer.Close(); err != nil {
        fmt.Println("dialer fails to close.", err)
    } else {
        fmt.Println("dialer succeeds to close")
    }
}

The console output is:

Dialer succeeds to dial()
dialer succeeds to close
Dialer fails to dial() address in use
dialer fails to close. object closed

Answer by Garrett D'Amore (best answer)

I don't usually monitor stackoverflow or reddit for questions like this -- we do have a discord channel (link from the mangos and NNG home pages), as well as a mailing list.

Having said that, let me see if I can help (I'm the author for both NNG and mangos):

  1. OptionRecvDeadline is supported for bus. However, you're correct that it doesn't support non-blocking mode with a negative value; instead, a negative value is treated the same as zero and acts as blocking. This is a documentation bug. To achieve logically non-blocking behavior, use a value of one nanosecond (time.Nanosecond, i.e. a time.Duration of 1). That will logically equate to non-blocking, although the granularity may be limited by scheduler latency. (In this case it would be like doing "go close(channel); <-channel", which is very nearly non-blocking.)

I'll see about fixing the documentation.

  2. Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time confounds this. I'll be honest in saying that I had not considered tiny redial times; usually it's bad form to use them, because if the peer is not available your code will spin hard on the processor trying to reconnect. I usually recommend a retry interval cap (mangos.OptionMaxReconnectTime) of at least 10 milliseconds.

  3. I think you're seeing the effect of queueing, but I'm not 100% certain -- I'd need to see a test case reproducing this. Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)

Answer by xc wang

Thanks to the reply by @Garrett D'Amore, I have now solved my problems in an alternative way, and I (as a new Go fan with little knowledge of the underlying communication layers) would like to apologize for troubling you with such an elementary question.

Problem (1) is well answered by the author.

Problem (3) might be coupled with Problem (2), since the author described the mechanism below, which rules out messages accumulating in a send buffer:

Definitely the bus protocol is best effort delivery, and if there are no connected peers then the message is dropped on the floor. (Just rechecked that to be certain.)
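The best-effort behavior quoted above can be modeled in a few lines of plain Go. This is a hypothetical sketch, not the mangos bus implementation: each connected peer is represented by a bounded outgoing queue, `busSend` tries every queue without blocking, and a message is simply dropped for any peer whose queue is full, or entirely when there are no peers. The names `peer` and `busSend` are made up for illustration.

```go
package main

import "fmt"

// peer is a hypothetical stand-in for one connected bus neighbor,
// represented by a bounded outgoing queue.
type peer struct {
	out chan string
}

// busSend delivers msg to every peer without ever blocking.
// It returns how many peers accepted the message; with no peers
// (or only full queues) the message is dropped and 0 is returned.
func busSend(peers []*peer, msg string) int {
	delivered := 0
	for _, p := range peers {
		select {
		case p.out <- msg:
			delivered++
		default:
			// queue full: drop for this peer instead of waiting
		}
	}
	return delivered
}

func main() {
	// No neighbors: nothing is queued anywhere, so there is nothing
	// to flush later when a neighbor appears.
	fmt.Println(busSend(nil, "tick-1"))

	a := &peer{out: make(chan string, 1)}
	b := &peer{out: make(chan string, 1)}
	fmt.Println(busSend([]*peer{a, b}, "tick-2"))
	// Both queues are now full, so the next message is dropped for both.
	fmt.Println(busSend([]*peer{a, b}, "tick-3"))
}
```

Under this model an offline node has no backlog to re-send when it reconnects, so a burst of old messages would suggest buffering somewhere other than the bus send path; that is a guess, not a confirmed diagnosis.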

For Problem (2), I first tried setting mangos.OptionMaxReconnectTime to 100 ms, but the problem persisted. I then tried all kinds of option combinations to configure the socket and the dialer, but those attempts failed as well.

Finally, since the author pointed out that

Calling Close() on the dialer is the right thing to do. It will linger until the pipes are closed, which it does automatically. It is possible that your use of a very short redial time might confound this.

So I turned to an alternative way to shut down an old dialer: explicitly closing all of its pipes. To do this, one can define a pipe event hook like this:

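// pipeC receives the pipes reported by the hook. It is buffered so the hook,
// which mangos calls internally, is less likely to block.
var pipeC = make(chan mangos.Pipe, 64)

func callbackHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
    if event == mangos.PipeEventAttached {
        pipeC <- pipe // mangos.Pipe is an interface, so no pointer is needed
    }
}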

Then attach callbackHandler to the socket:

sock.SetPipeEventHook(callbackHandler)

This way the application gets hold of the pipes, which are otherwise internal to mangos. To shut down the dialed connection, one can then do:

dialer.Close()               // close the dialer (it may linger while its pipes are open)
for _, p := range pipeSet {  // pipeSet holds the mangos.Pipe values collected via pipeC (not shown)
    p.Close()                // explicitly close each collected pipe
}
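Instead of sending pipes through a channel and collecting them elsewhere, the hook can maintain the set directly: add a pipe when it attaches, remove it when it detaches. The sketch below is stdlib-only so it runs on its own. The `Pipe` interface and the event constants are hypothetical stand-ins mirroring the `ID()` and `Close()` methods of `mangos.Pipe` and the attached/detached pipe events; `pipeTracker`, `Hook`, `CloseAll` and `fakePipe` are made-up names. In real code, `Hook` would need adapting to the signature `SetPipeEventHook` expects.

```go
package main

import (
	"fmt"
	"sync"
)

// Pipe is a hypothetical stand-in mirroring two mangos.Pipe methods.
type Pipe interface {
	ID() uint32
	Close() error
}

type pipeEvent int

const (
	pipeAttached pipeEvent = iota // stand-in for an "attached" event
	pipeDetached                  // stand-in for a "detached" event
)

// pipeTracker keeps the set of currently attached pipes, keyed by ID.
type pipeTracker struct {
	mu    sync.Mutex
	pipes map[uint32]Pipe
}

func newPipeTracker() *pipeTracker {
	return &pipeTracker{pipes: make(map[uint32]Pipe)}
}

// Hook updates the set; it is safe to call from multiple goroutines.
func (t *pipeTracker) Hook(ev pipeEvent, p Pipe) {
	t.mu.Lock()
	defer t.mu.Unlock()
	switch ev {
	case pipeAttached:
		t.pipes[p.ID()] = p
	case pipeDetached:
		delete(t.pipes, p.ID())
	}
}

// CloseAll closes every tracked pipe and returns how many it closed.
// The set is copied first so Close is not called while holding the lock,
// in case closing a pipe triggers a detached event that calls Hook again.
func (t *pipeTracker) CloseAll() int {
	t.mu.Lock()
	snapshot := make([]Pipe, 0, len(t.pipes))
	for _, p := range t.pipes {
		snapshot = append(snapshot, p)
	}
	t.pipes = make(map[uint32]Pipe)
	t.mu.Unlock()

	for _, p := range snapshot {
		p.Close()
	}
	return len(snapshot)
}

// fakePipe lets the sketch run without a network.
type fakePipe struct {
	id     uint32
	closed bool
}

func (f *fakePipe) ID() uint32   { return f.id }
func (f *fakePipe) Close() error { f.closed = true; return nil }

func main() {
	t := newPipeTracker()
	p1, p2 := &fakePipe{id: 1}, &fakePipe{id: 2}
	t.Hook(pipeAttached, p1)
	t.Hook(pipeAttached, p2)
	t.Hook(pipeDetached, p2) // p2 went away on its own
	fmt.Println(t.CloseAll(), p1.closed, p2.closed)
}
```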

Then simply leave the "pseudo-closed" dialer alone. To connect to the remote address again, create and use a new dialer.

I don't know whether the old "pseudo-closed" dialers accumulate in memory, but this is the only solution I could find.
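The question's repro keeps the closed dialer in `DialerMap` and calls `Dial()` on it again. The "close it, forget it, create a new one" pattern described in this answer can be sketched as a small registry. This is a stdlib-only sketch under stated assumptions: `Dialer` is a hypothetical interface mirroring the `Dial()` and `Close()` methods of `mangos.Dialer`, and `dialerRegistry`, `Connect`, `Disconnect` and `fakeDialer` are made-up names. In real code, `newDialer` would presumably wrap `sock.NewDialer(addr, opts)`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Dialer is a hypothetical stand-in mirroring mangos.Dialer's Dial and Close.
type Dialer interface {
	Dial() error
	Close() error
}

// dialerRegistry never reuses a dialer after Close: Disconnect removes it
// from the map, so the next Connect builds a fresh one via newDialer.
type dialerRegistry struct {
	mu        sync.Mutex
	dialers   map[string]Dialer
	newDialer func(addr string) (Dialer, error)
}

func (r *dialerRegistry) Connect(id, addr string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.dialers[id]; ok {
		return errors.New("already connected: " + id)
	}
	d, err := r.newDialer(addr)
	if err != nil {
		return err
	}
	if err := d.Dial(); err != nil {
		d.Close() // do not keep a half-set-up dialer around
		return err
	}
	r.dialers[id] = d
	return nil
}

func (r *dialerRegistry) Disconnect(id string) error {
	r.mu.Lock()
	d, ok := r.dialers[id]
	delete(r.dialers, id) // forget it even if Close reports an error
	r.mu.Unlock()
	if !ok {
		return errors.New("not connected: " + id)
	}
	return d.Close()
}

// fakeDialer refuses to Dial once closed, so reusing it would be an error.
type fakeDialer struct{ closed bool }

func (f *fakeDialer) Dial() error {
	if f.closed {
		return errors.New("dialer closed")
	}
	return nil
}
func (f *fakeDialer) Close() error { f.closed = true; return nil }

func main() {
	created := 0
	r := &dialerRegistry{
		dialers: make(map[string]Dialer),
		newDialer: func(addr string) (Dialer, error) {
			created++
			return &fakeDialer{}, nil
		},
	}
	fmt.Println(r.Connect("node-A", "tcp://127.0.0.1:60000"))
	fmt.Println(r.Disconnect("node-A"))
	fmt.Println(r.Connect("node-A", "tcp://127.0.0.1:60000")) // fresh dialer
	fmt.Println(created)
}
```

Keying the map by peer ID and deleting on disconnect is what keeps a closed dialer from ever being dialed again; whether the discarded dialers are fully reclaimed inside mangos is outside what this sketch can show.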