Pulsar UI: subscription backlog empty - entries since first not acked message high


We have a non-persistent subscription that shows 0 messages in the backlog. However, in the storage view, the same subscription shows a large number in the "entries since first not acked message" column.

What exactly does this number mean? We are not seeing any errors in the consumer.

EDIT: From what I understand, this might be a so-called "acknowledgment hole", described as follows:

As shown in the figure, an acknowledgment hole refers to the unacknowledged message between two consecutive ranges. The number of consumers for a subscription and their consumption rate could all impact the number of acknowledgment holes. As mentioned above, when acknowledgment holes exist, the cursor (more precisely, markDeletePosition) stays before the oldest unacknowledged message. After all messages within a certain broader range are acknowledged, smaller ranges are merged. As a result, the cursor moves accordingly.

Note: How to reduce the number of acknowledgment holes in production is a very complicated topic. After all, there may be a variety of factors that could lead to acknowledgment holes, such as exceptions on brokers or clients. Generally, based on the value of individuallyDeletedMessages, we can try to reduce the number of consumers and adjust the frequency of message acknowledgment for performance optimization.
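To check my understanding of the quoted description, here is a toy model of a cursor with an acknowledgment hole. It is only a sketch, not broker code: the class `AckHoleSketch` and its methods are hypothetical, entries are plain numbers instead of ledger positions, and I am assuming that the UI column corresponds to the distance between markDeletePosition and the last entry.

```java
import java.util.TreeSet;

// Toy model of a subscription cursor (hypothetical, not Pulsar's implementation).
public class AckHoleSketch {
    // Everything up to and including this entry is acknowledged
    // (stand-in for markDeletePosition).
    private long markDeletePosition = -1;
    // Entries acknowledged ahead of the cursor
    // (stand-in for individuallyDeletedMessages).
    private final TreeSet<Long> ackedAhead = new TreeSet<>();
    // Last entry written to the topic.
    private long lastEntry = -1;

    public void publish() {
        lastEntry++;
    }

    public void ack(long entry) {
        if (entry <= markDeletePosition || entry > lastEntry) {
            return; // already covered by the cursor, or not published yet
        }
        ackedAhead.add(entry);
        // The cursor only advances while the very next entry is acknowledged.
        while (ackedAhead.remove(markDeletePosition + 1)) {
            markDeletePosition++;
        }
    }

    public long markDeletePosition() {
        return markDeletePosition;
    }

    // Entries after the cursor that are still unacknowledged.
    public long backlog() {
        return (lastEntry - markDeletePosition) - ackedAhead.size();
    }

    // Entries after the cursor, acknowledged or not (assumed meaning of the UI column).
    public long entriesSinceFirstNotAcked() {
        return lastEntry - markDeletePosition;
    }

    public static void main(String[] args) {
        AckHoleSketch cursor = new AckHoleSketch();
        for (int i = 0; i < 10; i++) {
            cursor.publish();
        }
        // Acknowledge entries 1..9 but leave entry 0 as a hole.
        for (long e = 1; e < 10; e++) {
            cursor.ack(e);
        }
        System.out.println("backlog = " + cursor.backlog());
        System.out.println("entries since first not acked = "
                + cursor.entriesSinceFirstNotAcked());
        // Closing the hole lets the cursor jump past all merged ranges.
        cursor.ack(0);
        System.out.println("markDeletePosition after closing hole = "
                + cursor.markDeletePosition());
    }
}
```

In this model a single old unacknowledged entry keeps the cursor pinned, so the "entries since" count grows with every new message while the backlog stays small. Whether the real broker computes both numbers exactly this way is an assumption on my side.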

Our consumer is configured as follows. Because of the dead-letter policy, I would expect such messages to stay on the subscription for at most 3 minutes (3 deliveries with a backoff of at most 1 minute each):

            pulsarClient.newConsumer()
                .topic(eventTopic)
                .subscriptionName(eventSubscription)
                .consumerName(consumerName + "_" + randomString(8))
                .subscriptionType(SubscriptionType.Key_Shared)
                .enableBatchIndexAcknowledgment(true)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .messageListener(this::messageListener)
                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                .startPaused(false)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(3)
                        .build())
                .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                        .minDelayMs(1000)
                        .maxDelayMs(60 * 1000)
                        .multiplier(2)
                        .build())
                .subscribeAsync();
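For reference, this is how I arrived at the upper bound for the redelivery time. It is a rough sketch under an assumption: that the delay for the n-th negative acknowledgment grows as minDelayMs * multiplier^n, capped at maxDelayMs, with n starting at 0. The class `BackoffSketch` and its methods are hypothetical helpers, not part of the Pulsar client.

```java
// Rough estimate of the total negative-ack backoff (hypothetical helper).
public class BackoffSketch {

    // Assumed delay formula: minDelayMs * multiplier^redeliveryCount, capped at maxDelayMs.
    static long delayMs(long minDelayMs, long maxDelayMs, double multiplier, int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        return (long) Math.min(maxDelayMs, minDelayMs * Math.pow(multiplier, redeliveryCount));
    }

    // Sum of the delays before each of the allowed redeliveries.
    static long totalBackoffMs(long minDelayMs, long maxDelayMs, double multiplier, int maxRedeliverCount) {
        long total = 0;
        for (int i = 0; i < maxRedeliverCount; i++) {
            total += delayMs(minDelayMs, maxDelayMs, multiplier, i);
        }
        return total;
    }

    public static void main(String[] args) {
        // Values taken from the consumer configuration above.
        long total = totalBackoffMs(1000, 60 * 1000, 2, 3);
        // Loose upper bound: every delay hits the 1 minute cap.
        long cap = 3L * 60 * 1000;
        System.out.println("estimated total backoff ms = " + total);
        System.out.println("upper bound ms = " + cap);
    }
}
```

Under this assumption the configured backoff adds up to well under the 3 minute bound. Note that this backoff only applies to messages the listener negatively acknowledges; a message that is neither acknowledged nor negatively acknowledged would not follow this schedule.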