I'm developing a solution where subscribers need to detect when communication with the broker has been lost. However, I couldn't find a way to properly accomplish this, using the Golang API.
Here is a simple example of the situation:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
fmt.Println("Error creating the client:", err)
return
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/my-topic",
SubscriptionName: "my-sub",
})
if err != nil {
fmt.Println("Error creating the consumer:", err)
return
}
defer consumer.Close()
for {
// From this point on, no matter what happens with the client's connection,
// my consumer doesn't receive an error about it and keeps waiting for new messages
msg, err := consumer.Receive(context.Background())
if err != nil {
fmt.Println("Error receiving the message:", err)
break
}
fmt.Printf("Message received: %s\n", string(msg.Payload()))
consumer.Ack(msg)
}
Even if I bring the broker down, my consumer keeps waiting without notice.
I can make it detect a timeout, via context, but it's not what I want. I need to know that the connection has been lost.
Am I missing something?