I have been trying to send messages to AWS MSK using the Python confluent-kafka library.
I want to ensure that each message is delivered exactly once, which is why I am using a transactional producer. I am currently sending 500k messages per transaction.
The sending part of the transaction works fine and gives us the required throughput. However, when I commit the transactions, some of them randomly time out.
In the normal flow, when the issue does not occur, committing the transaction takes only a few seconds. However, even with a timeout of 10 minutes on commit_transaction, some commits still time out.
Here is the code that I am using:
import json
import logging

from confluent_kafka import Producer

# server_url, topic_name, data, receipt (delivery callback) and
# producer_timeout are defined elsewhere.
connection_config = {
    "bootstrap.servers": server_url,
    "security.protocol": "SASL_SSL",
    "sasl.username": "test",
    "sasl.password": "test",
    "sasl.mechanism": "SCRAM-SHA-512",
    # Placeholder value; init_transactions() requires a transactional.id.
    "transactional.id": "my-transactional-id",
    "enable.idempotence": "True",
    "transaction.timeout.ms": 1200000,
    "acks": "all",
    "queue.buffering.max.messages": 200,
    "retries": 50
}
p = Producer(connection_config)
p.init_transactions()
p.begin_transaction()
logging.info("Connection successful, writing messages..")
for index, record in enumerate(data):
    try:
        p.produce(topic_name, json.dumps(record).encode('utf-8'), callback=receipt)
        p.poll(0)
    except BufferError as e:
        # Local queue is full: wait for outstanding deliveries, then retry this record.
        p.flush()
        p.produce(topic_name, json.dumps(record).encode('utf-8'), callback=receipt)
logging.info("Flushing remaining messages to kafka")
p.flush()
logging.info("Sending complete for producer, committing transaction")
p.commit_transaction(int(producer_timeout))
Here is the configuration that I am using for MSK (Kafka):
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=50
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.max.timeout.ms=1200000
num.network.threads=10
Error on timeout:
cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="Transactional API operation (commit_transaction) timed out"}
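For reference, the error object carried by this exception can be inspected to decide what to do next. The following is only a minimal sketch, assuming the `retriable()` and `txn_requires_abort()` methods that confluent-kafka exposes on `KafkaError`; the helper name `commit_with_retry` and its parameters are hypothetical and not part of the library. The `ImportError` fallback is there only so the sketch can be read and run without the client installed.

```python
import time

try:
    from confluent_kafka import KafkaException
except ImportError:
    # Fallback so the sketch runs without the client installed.
    KafkaException = Exception


def commit_with_retry(producer, timeout_s=60.0, max_attempts=5, backoff_s=1.0):
    """Hypothetical helper: commit the current transaction, retrying retriable errors.

    Returns the attempt number on which the commit succeeded.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            producer.commit_transaction(timeout_s)
            return attempt
        except KafkaException as exc:
            err = exc.args[0]  # the KafkaError describing the failure
            if err.txn_requires_abort():
                # The transaction cannot be committed any more; abort and re-raise.
                producer.abort_transaction()
                raise
            if not err.retriable() or attempt == max_attempts:
                # Not safe to retry, or out of attempts: let the caller decide.
                raise
            time.sleep(backoff_s)
```

A call such as `commit_with_retry(p, timeout_s=60)` would replace the single `p.commit_transaction(...)` call. Retrying does not explain why the commit is slow in the first place; it only keeps a slow commit from immediately failing the whole job.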
I have looked at the server logs and could not find anything relevant to why this is happening. Can someone please help me debug this issue? Thanks a lot.
I have tried decreasing the number of messages per transaction, and the failure rate improves when there are fewer messages. However, from what I have found online, the more messages per transaction, the better.
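The smaller-transaction experiment described above can be sketched as follows. This is a minimal illustration, not a tuned implementation: the helper names `chunked` and `send_in_transactions`, the default `batch_size` of 50,000, and the 60-second commit timeout are all assumptions chosen for the example. It expects a producer on which `init_transactions()` has already been called.

```python
import json
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def send_in_transactions(producer, topic, records, batch_size=50_000,
                         commit_timeout=60.0):
    """Hypothetical helper: send `records` using one transaction per batch.

    Returns the number of committed transactions.
    """
    committed = 0
    for batch in chunked(records, batch_size):
        producer.begin_transaction()
        for record in batch:
            payload = json.dumps(record).encode("utf-8")
            while True:
                try:
                    producer.produce(topic, payload)
                    break
                except BufferError:
                    # Local queue is full: serve delivery callbacks, then retry.
                    producer.poll(1)
            producer.poll(0)
        # Each batch is committed on its own before the next one starts.
        producer.commit_transaction(commit_timeout)
        committed += 1
    return committed
```

With the producer from the question, this would be called as `send_in_transactions(p, topic_name, data, batch_size=50_000)`. The batch size trades per-transaction overhead against how much work a single commit has to wait for, so it is worth measuring a few values rather than assuming that larger is always better.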
Records are being sent faster than they can be delivered to the broker. Please adjust the following producer-side properties: