I need to build a queue using NATS. I'm writing a Go service, but the problem seems to be more about NATS JetStream configuration.
What I want to do:
1. A queue that multiple consumers can connect to with the same group ID. The NATS server divides the load between consumers within the group and rebalances it when consumers connect or disconnect. Delivery is exactly once within a group.
2. Multiple groups can read the same messages, so I can run two groups with different names and both will get the same set of messages.
I managed to create a service that meets requirement #1 (see below). I use NATS with JetStream. The NATS server splits messages between several instances with the same group ID and stores messages when no consumer is connected.
The problem is that it doesn't meet #2: if I run another instance with a different group ID (the Durable field in ConsumerConfig), I get an error:
nats: API error: code=400 err_code=10099 description=multiple non-filtered consumers not allowed on workqueue stream
So the question is: is it possible at all to implement what I need with NATS?
Here's my consumer code.
func consumeFromNats(ctx context.Context) error {
    nc, err := nats.Connect("localhost:4222")
    // handling error
    js, err := jetstream.New(nc)
    // handling error
    streamConfig := jetstream.StreamConfig{
        Name: "TEST.STREAM",
        Retention: jetstream.WorkQueuePolicy,
        Subjects: []string{"events"},
    }
    stream, err := js.CreateOrUpdateStream(ctx, streamConfig)
    // handling error
    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable: "consumer-group",
    })
    // handling error
    go func() {
        <-ctx.Done()
        nc.Close()
    }()
    for {
        msg, err := consumer.Next()
        // handling error
    }
    return nil
}
As @mtmk mentioned, a stream with the workqueue retention policy deletes a message once it has been processed by any consumer.
In NATS, each consumer is a "view" into the stream: it keeps track of its own sequences and acks. Workqueue retention deletes a message from the stream as soon as it has been acked. Because of that, you can't have multiple consumers with overlapping subjects on the same workqueue stream.
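To make the "view" idea concrete, here is a toy in-memory model. It is not NATS code and does not use the NATS API; all names in it (`stream`, `consumer`, `drain`, and so on) are made up for illustration. It only shows why a second consumer on a workqueue-style stream would find nothing left, while on a limits-style stream each consumer sees everything. Real JetStream does not let the second consumer starve: it refuses to create it, which is the error quoted in the question.

```go
package main

import "fmt"

// retention is a simplified stand-in for a stream's retention policy.
type retention int

const (
	limits    retention = iota // messages stay in the stream after an ack
	workQueue                  // a message is deleted as soon as it is acked
)

// stream is a toy message store keyed by sequence number.
type stream struct {
	policy retention
	msgs   map[uint64]string
	last   uint64
}

func newStream(p retention) *stream {
	return &stream{policy: p, msgs: map[uint64]string{}}
}

func (s *stream) publish(data string) {
	s.last++
	s.msgs[s.last] = data
}

// consumer is only a cursor into the stream; it stores no messages itself.
type consumer struct {
	s      *stream
	cursor uint64 // last sequence this consumer has looked at
}

// next delivers and immediately acks the next message still in the stream.
func (c *consumer) next() (string, bool) {
	for seq := c.cursor + 1; seq <= c.s.last; seq++ {
		c.cursor = seq
		if data, ok := c.s.msgs[seq]; ok {
			if c.s.policy == workQueue {
				delete(c.s.msgs, seq) // the ack removes it for every consumer
			}
			return data, true
		}
	}
	return "", false
}

// drain reads until the consumer has nothing left to receive.
func drain(c *consumer) []string {
	var out []string
	for {
		data, ok := c.next()
		if !ok {
			return out
		}
		out = append(out, data)
	}
}

func main() {
	for _, p := range []retention{limits, workQueue} {
		s := newStream(p)
		s.publish("a")
		s.publish("b")
		g1, g2 := &consumer{s: s}, &consumer{s: s}
		fmt.Println(drain(g1), drain(g2))
	}
	// Prints:
	// [a b] [a b]
	// [a b] []
}
```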
To get the behavior you want, you can create a limits-based stream, or even an interest stream (which will be closer to the current behavior). Check the docs: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy
Then you can create as many consumers as you need. Each consumer will get all messages.
To load balance the messages for one consumer, just bind more clients to it: the messages will be load balanced across all clients bound to that consumer.