How to build a Kafka-like queue with NATS


I need to build a queue using NATS. I'm writing a Go service, but the question is really about NATS JetStream configuration.

What I want to do:

  1. A queue that multiple consumers can connect to with the same group ID. The NATS server divides the load between the consumers within a group and rebalances it when consumers connect or disconnect. Delivery is exactly once within a group.

  2. Multiple groups can read the same messages, so I can run two groups with different names and both will get the same set of messages.

I managed to create a service that meets requirement #1 (see below). I use NATS with JetStream. The NATS server splits messages between several instances with the same group ID, and it stores messages when no consumer is connected.

The problem is that it doesn't meet requirement #2: if I run another instance with a different group ID (the Durable field in ConsumerConfig), I get an error:

nats: API error: code=400 err_code=10099 description=multiple non-filtered consumers not allowed on workqueue stream

So the question is: is it possible at all to implement what I need with NATS?

Here's my consumer code.

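import (
    "context"
    "errors"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func consumeFromNats(ctx context.Context) error {
    nc, err := nats.Connect("localhost:4222")
    if err != nil {
        return err
    }
    defer nc.Close()

    js, err := jetstream.New(nc)
    if err != nil {
        return err
    }

    streamConfig := jetstream.StreamConfig{
        Name:      "TEST.STREAM",
        Retention: jetstream.WorkQueuePolicy,
        Subjects:  []string{"events"},
    }

    stream, err := js.CreateOrUpdateStream(ctx, streamConfig)
    if err != nil {
        return err
    }

    // The Durable name serves as the group ID.
    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable: "consumer-group",
    })
    if err != nil {
        return err
    }

    // Close the connection when the context is cancelled.
    go func() {
        <-ctx.Done()
        nc.Close()
    }()

    for {
        if ctx.Err() != nil {
            return ctx.Err()
        }
        msg, err := consumer.Next()
        if errors.Is(err, nats.ErrTimeout) {
            continue // no message arrived within the wait time, poll again
        }
        if err != nil {
            return err
        }

        // process msg here, then acknowledge it
        if err := msg.Ack(); err != nil {
            return err
        }
    }
}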


There are 2 best solutions below

4
Jarema On

As @mtmk mentioned, a stream with the workqueue retention policy deletes a message once any consumer has processed it.

In NATS, each consumer is a "view" into the stream: it keeps track of its own sequence position and acks. Workqueue retention deletes a message from the stream as soon as it has been acked. Because of that, you can't have multiple consumers with overlapping subjects on the same workqueue stream.
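The "view" idea can be illustrated with a toy in-memory model. This is not NATS code: `Stream`, `Consumer`, and `drain` are hypothetical names invented for this sketch, and it ignores acks, redelivery, and concurrency. It models a stream that keeps its messages, where each consumer owns a cursor and the clients bound to one consumer share that cursor.

```go
package main

import "fmt"

// Stream is an append-only log that never deletes messages
// (a stand-in for a stream that retains what it stores).
type Stream struct {
	msgs []string
}

// Publish appends a message to the log.
func (s *Stream) Publish(m string) { s.msgs = append(s.msgs, m) }

// Consumer is a cursor over the stream. Clients bound to the same
// Consumer share the cursor, so each message goes to only one of them.
type Consumer struct {
	stream *Stream
	next   int
}

// NewConsumer creates an independent view starting at the first message.
func (s *Stream) NewConsumer() *Consumer { return &Consumer{stream: s} }

// Next returns the next undelivered message for this consumer, if any.
func (c *Consumer) Next() (string, bool) {
	if c.next >= len(c.stream.msgs) {
		return "", false
	}
	m := c.stream.msgs[c.next]
	c.next++
	return m, true
}

// drain lets `clients` callers take turns pulling from one consumer
// and records which client received which messages.
func drain(c *Consumer, clients int) map[int][]string {
	got := make(map[int][]string)
	for i := 0; ; i++ {
		m, ok := c.Next()
		if !ok {
			return got
		}
		client := i % clients
		got[client] = append(got[client], m)
	}
}

func main() {
	s := &Stream{}
	for _, m := range []string{"a", "b", "c", "d"} {
		s.Publish(m)
	}
	groupA := s.NewConsumer() // two clients share this cursor
	groupB := s.NewConsumer() // a second, independent view
	fmt.Println("group A:", drain(groupA, 2))
	fmt.Println("group B:", drain(groupB, 1))
}
```

In this model, group A's two clients split the four messages between them, while group B's single client still receives all four. If the stream deleted each message as soon as one consumer finished with it, the second view would have nothing left to read, which is why a workqueue stream rejects the second consumer.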

To get the behavior you want, you can create a limits-based stream, or an interest-based stream (which will be closer to the current behavior). Check the docs: https://docs.nats.io/nats-concepts/jetstream/streams#retentionpolicy

Then you can create as many consumers as you need, and each consumer will get all messages.

To load balance the messages for one consumer, just bind more clients to it: messages will be load balanced across all clients bound to that consumer.
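A minimal sketch of that setup, using the same `jetstream` package as the question. The stream name `EVENTS`, the command-line group argument, and the log output are assumptions made for illustration; it also assumes a NATS server with JetStream enabled at the default URL and that no other stream already captures the `events` subject.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// Hypothetical convention: the group name is the first argument.
	group := "group-a"
	if len(os.Args) > 1 {
		group = os.Args[1]
	}

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Interest retention instead of workqueue: several consumers may
	// read the same subject. jetstream.LimitsPolicy would also work.
	stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:      "EVENTS", // assumed name for this sketch
		Subjects:  []string{"events"},
		Retention: jetstream.InterestPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// One durable consumer per group. Every process started with the
	// same group name binds to the same consumer and shares its load.
	cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:   group,
		AckPolicy: jetstream.AckExplicitPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	cc, err := cons.Consume(func(msg jetstream.Msg) {
		log.Printf("[%s] %s", group, msg.Data())
		if err := msg.Ack(); err != nil {
			log.Printf("ack failed: %v", err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Stop()

	<-ctx.Done() // run until interrupted
}
```

Running two processes with `group-a` and one with `group-b` should split the messages between the two `group-a` processes while the `group-b` process receives every message. With interest retention, a message published before any consumer exists is not kept, so create the consumers before publishing, or use a limits-based stream if messages must be retained regardless. Delivery with explicit acks is at-least-once: a message that is not acked in time is redelivered, so handlers should tolerate duplicates.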

0
Sab On

I have a similar use case and used jetstream.InterestPolicy with durable consumers. Multiple processors can subscribe to the same consumer (load balancing), and consumers with different names each receive the same messages (separate processing groups).