How to detect that the connection to the broker has been lost with the Go Pulsar client API?

I'm developing a solution where subscribers need to detect when communication with the broker has been lost. However, I couldn't find a way to do this with the Go client API.

Here is a simple example of the situation:

    package main

    import (
        "context"
        "fmt"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // Connect to a broker running locally.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            fmt.Println("Error creating the client:", err)
            return
        }
        defer client.Close()

        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "persistent://public/default/my-topic",
            SubscriptionName: "my-sub",
        })
        if err != nil {
            fmt.Println("Error creating the consumer:", err)
            return
        }
        defer consumer.Close()

        for {
            // From this point on, no matter what happens with the client's connection,
            // my consumer doesn't receive an error about it and keeps waiting for new messages.
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                fmt.Println("Error receiving the message:", err)
                break
            }
            fmt.Printf("Message received: %s\n", string(msg.Payload()))
            consumer.Ack(msg)
        }
    }

Even if I bring the broker down, the consumer keeps waiting and never reports an error.

I can make it detect a timeout by passing a context with a deadline to Receive, but that's not what I want. I need to know that the connection has been lost.
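
To make the timeout variant concrete, here is a minimal sketch of it. It runs without a broker: the `receiver` interface, `silentReceiver`, `readyReceiver`, and `receiveOnce` are hypothetical names introduced only for this illustration, and the sketch assumes that the real `consumer.Receive` returns the context's error when the deadline expires.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receiver is a hypothetical stand-in for the single consumer method used
// here, so the sketch can run without a Pulsar broker.
type receiver interface {
	Receive(ctx context.Context) (string, error)
}

// silentReceiver simulates a consumer that never gets a message:
// Receive blocks until the context is done, then returns the context's error.
type silentReceiver struct{}

func (silentReceiver) Receive(ctx context.Context) (string, error) {
	<-ctx.Done()
	return "", ctx.Err()
}

// readyReceiver simulates a consumer with a message already available.
type readyReceiver struct{ payload string }

func (r readyReceiver) Receive(ctx context.Context) (string, error) {
	return r.payload, nil
}

// receiveOnce waits up to timeout for one message. timedOut is true when the
// deadline expired, which only says that nothing arrived in time; it does not
// say whether the connection is up or down.
func receiveOnce(r receiver, timeout time.Duration) (payload string, timedOut bool, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	payload, err = r.Receive(ctx)
	if errors.Is(err, context.DeadlineExceeded) {
		return "", true, nil
	}
	return payload, false, err
}

func main() {
	_, timedOut, err := receiveOnce(silentReceiver{}, 50*time.Millisecond)
	fmt.Println("silent receiver: timedOut =", timedOut, "err =", err)

	msg, timedOut, err := receiveOnce(readyReceiver{payload: "hello"}, 50*time.Millisecond)
	fmt.Println("ready receiver: msg =", msg, "timedOut =", timedOut, "err =", err)
}
```

The limitation is visible in `silentReceiver`: an idle topic and an unreachable broker both end in the same deadline error, so the timeout alone cannot tell them apart.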

Am I missing something?
