We are facing a scenario where our akka-stream-kafka consumer's processing rate decreases whenever there is lag on the partitions. When we start it without any lag in the partitions, the processing rate increases suddenly.
MSK cluster - 10 topics - 40 partitions each => 400 total leader partitions
To achieve high throughput and parallelism in the system, we implemented akka-stream-kafka consumers subscribing to each topic-partition separately, resulting in a 1:1 mapping between consumers and partitions.
Here is the consumer setup:
- Number of EC2 service instances: 7
- Each service instance spins up 6 consumers for each of the 10 topics, resulting in 60 consumers per service instance.
- Total consumers = number of instances (7) * consumers per service instance (60) = 420
So, in total we are starting 420 consumers spread across the instances. As per the RangeAssignor partition strategy (the default), each partition gets assigned to a different consumer, so 400 consumers serve the 400 partitions and 20 consumers remain unused. We have verified this allocation and it looks good.
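For illustration, here is roughly how each service instance spins up its consumer streams. This is a minimal sketch, not our exact code: the topic names, deserializers, group id and processing function are stand-ins, and it assumes all consumers share one group.id so the RangeAssignor can spread partitions across them.

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.ActorMaterializer
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object ConsumerBootstrapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("Consumer1")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Stand-ins for the real settings and processing function shown later in this post.
  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumer-group-1") // assumed: one group shared across all instances
  val committerSettings = CommitterSettings(system)
  val parallelism       = 1500
  def f(key: String, value: String): Future[Done] = Future.successful(Done)

  val topics            = (1 to 10).map(i => s"topic-$i") // assumed topic names
  val consumersPerTopic = 6

  // 6 independent committable streams per topic => 60 streams on this instance.
  // The broker-side RangeAssignor then spreads each topic's 40 partitions over the
  // 42 group members (6 x 7 instances) subscribed to that topic, leaving 2 idle.
  val controls: Seq[DrainingControl[Done]] =
    for {
      topic <- topics
      _     <- 1 to consumersPerTopic
    } yield
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsync(parallelism)(msg => f(msg.record.key(), msg.record.value()).map(_ => msg.committableOffset))
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()
}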
Instance Type used: c5.xlarge
MSK Config:
Apache Kafka version - 2.4.1.1
Total number of brokers - 9 (spread across 3 AZs)
Broker type: kafka.m5.large
Brokers per zone: 3
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=40
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.ms=259200000
This is the configuration we are using for each consumer:
akka.kafka.consumer {
  kafka-clients {
    bootstrap.servers = "localhost:9092"
    client.id = "consumer1"
    group.id = "consumer1"
    auto.offset.reset = "latest"
  }
  aws.glue.registry.name = "Registry1"
  aws.glue.avroRecordType = "GENERIC_RECORD"
  aws.glue.region = "region"
  kafka.value.deserializer.class = "com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"
  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {
    # Flag to turn on connection checker
    enable = true
    # Amount of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3
    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s
    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }
}
akka.kafka.committer {
  # Maximum number of messages in a single commit batch
  max-batch = 10000
  # Maximum interval between commits
  max-interval = 5s
  # Parallelism for async committing
  parallelism = 1500
  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
  delivery = WaitForAck
  # API may change.
  # Controls when a `Committable` message is queued to be committed.
  # OffsetFirstObserved: When the offset of a message has been successfully produced.
  # NextOffsetObserved: When the next offset is observed.
  when = OffsetFirstObserved
}
akka.http {
  client {
    idle-timeout = 10s
  }
  host-connection-pool {
    idle-timeout = 10s
    client {
      idle-timeout = 10s
    }
  }
}
consumer.parallelism=1500
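For reference, a minimal sketch (not our exact code) of how configuration blocks like these can be turned into the consumerSettings and committerSettings used below; the StringDeserializer here is a stand-in for the AWS Glue Avro deserializer wired above:

import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings}
import org.apache.kafka.common.serialization.StringDeserializer

val system = ActorSystem("Consumer1")

// ConsumerSettings(system, ...) reads the `akka.kafka.consumer` section, including the
// `kafka-clients` block (bootstrap.servers, group.id, auto.offset.reset, ...).
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)

// CommitterSettings(system) reads the `akka.kafka.committer` section
// (max-batch, max-interval, parallelism, delivery, when).
val committerSettings = CommitterSettings(system)

// Application-specific top-level key, read explicitly from the loaded config.
val parallelism = system.settings.config.getInt("consumer.parallelism")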
We are using the code below to materialise the flow from Kafka to an empty sink:
override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = actorSystem.dispatcher

val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")

val supervisionDecider: Supervision.Decider = {
  case _ => Supervision.Resume
}
val committer = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicsName))
    .mapAsync(parallelism) { msg =>
      f(msg.record.key(), msg.record.value())
        .map(_ => msg.committableOffset)
        .recoverWith {
          case _ => Future.successful(msg.committableOffset)
        }
    }
    .toMat(Committer.sink(committer).withAttributes(supervisionStrategy))(DrainingControl.apply)
    .withAttributes(supervisionStrategy)
    .run()
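Since the graph materialises a DrainingControl, a small helper along these lines (the name shutdownGracefully and the 30-second timeout are our own choices, not part of the original code) can drain in-flight messages and stop the consumer cleanly, e.g. from a shutdown hook:

import akka.Done
import akka.kafka.scaladsl.Consumer.DrainingControl

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

// Hypothetical helper: stop polling, wait for in-flight offsets to be committed,
// then close the underlying Kafka consumer.
def shutdownGracefully(control: DrainingControl[Done])(implicit ec: ExecutionContext): Done =
  Await.result(control.drainAndShutdown(), 30.seconds)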
Library versions used in the code:
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.30"
The observations are as follows:
- In successive intervals of, say, 1 hour, only some of the consumers are actively consuming their lag and processing at the expected rate.
- In the next hour, some other consumers become active, consume from their partitions, and then stop processing.
- All the lag gets cleared in a single shot, as observed from the offset-lag graph.
We want all the consumers to run in parallel and process the messages in real time. This lag of 3 days in processing is causing major downtime for us. I tried following this issue, but we are already on a version that includes the fix: https://github.com/akka/alpakka-kafka/issues/549
Can anyone help with what we are missing in terms of consumer configuration, or point out some other issue?
That lag graph seems to me to indicate that your overall system isn't capable of handling all the load, and it almost looks like only one partition at a time is actually making progress.
That phenomenon indicates to me that the processing being done in `f` is ultimately gated on the rate at which some queue can be cleared, and that the parallelism in the `mapAsync` stage is too high, effectively racing the partitions against each other. Since the Kafka consumer batches records (by default in batches of 500, assuming that the consumer's lag is more than 500 records), if the parallelism is higher than that, all of those records enter the queue at basically the same time, as a block. It looks like the parallelism in the `mapAsync` is 1500; given the apparent use of the Kafka default batch size of 500, this seems way too high: there's no reason for it to be greater than the Kafka batch size, and if you want a more even consumption rate between partitions, it should be a lot less than that batch size.
Without details on what happens in `f`, it's hard to say what that queue is and how much the parallelism should be reduced, but there are some general guidelines I can share. An I/O-bound or blocking situation will be evidenced by very low CPU utilization on your instances. Since you're using akka-http, it's possible that the processing of messages in `f` involves sending an HTTP request to some other service; in that case, it's important to remember that Akka HTTP maintains a queue per targeted host, and it's also likely that there's a queue on the target side which governs throughput there. This is somewhat a special case of the I/O-bound situation. If you're filling the queue per targeted host, you'll see log messages about "Exceeded configured max-open-requests value".
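As an illustration only, reducing that parallelism might look like the sketch below. It reuses the names from the question's code (consumerSettings, committer, topicsName, f); the value 50 is an arbitrary starting point to tune against observed throughput, not a recommendation derived from measurements.

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl

// Same stream shape as in the question, but with mapAsync parallelism kept well
// below max.poll.records (500 by default) so one partition's batch cannot flood
// the downstream queue ahead of the others.
val tunedParallelism = 50 // assumed starting point; tune up or down as needed

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topicsName))
    .mapAsync(tunedParallelism) { msg =>
      f(msg.record.key(), msg.record.value()).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committer))(DrainingControl.apply)
    .run()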
Another thing worth noting is that, because the Kafka consumer is inherently blocking, the Alpakka Kafka consumer actors run in their own dispatcher, whose size is 16 by default, meaning that per host only at most 16 consumers or producers can be doing work at a time. Setting `akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size` to at least the number of consumers your app starts up per instance (60, given 6 consumers for each of the 10 topics) is probably a good idea. Thread starvation in the Alpakka Kafka dispatcher can cause consumer rebalances, which will disrupt consumption.
Without making any other changes, I would suggest, for a more even consumption rate across partitions, setting the `mapAsync` parallelism substantially lower than the 500-record poll batch size.
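To make the dispatcher suggestion concrete, a hedged sketch of one way to apply it; the value 60 matches the per-instance consumer count described in the question, and the ActorSystem name is taken from the question's code:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Enlarge the Alpakka Kafka dispatcher at ActorSystem creation so that all consumers
// on one instance can be polling at the same time. (Independently of this, lower the
// mapAsync parallelism as discussed above.)
val tunedConfig = ConfigFactory
  .parseString("akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 60")
  .withFallback(ConfigFactory.load())

implicit val system: ActorSystem = ActorSystem("Consumer1", tunedConfig)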