I'm trying to get confluent-kafka working with Amazon MSK. It seems to connect to MSK, since the consumer works, but the producer doesn't produce anything. I verified with kafka-console-producer.sh that my consumer module receives messages correctly. We have several backends using the same MSK cluster. Here is the Spring Boot configuration (from another of our services) that I referred to when developing my service:
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config=\
org.apache.kafka.common.security.scram.ScramLoginModule \
required username={username} password={password};
Of course, {username} and {password} are replaced with our actual credentials.
My service's configuration looks like this:
kafka:
  bootstrap_servers: {Our MSK End Point}
  security_protocol: SASL_SSL
  sasl_mechanism: SCRAM-SHA-512
  sasl_username: {username}
  sasl_password: {password}
  consumer:
    session_timeout_ms: 6000
    auto_offset_reset: earliest
Inside the producer controller:
conf = {
'bootstrap.servers': appcfg.kafka.bootstrap_servers,
'security.protocol': appcfg.kafka.security_protocol,
'sasl.mechanism': appcfg.kafka.sasl_mechanism,
'sasl.username': appcfg.kafka.sasl_username,
'sasl.password': appcfg.kafka.sasl_password
}
Inside the consumer controller:
conf = {
'bootstrap.servers': appcfg.kafka.bootstrap_servers,
'security.protocol': appcfg.kafka.security_protocol,
'group.id': group_id,
'sasl.mechanism': appcfg.kafka.sasl_mechanism,
'sasl.username': appcfg.kafka.sasl_username,
'sasl.password': appcfg.kafka.sasl_password,
'session.timeout.ms': appcfg.kafka.consumer.session_timeout_ms,
'auto.offset.reset': appcfg.kafka.consumer.auto_offset_reset
}
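Since both dicts share the same SASL base, one way to keep them from drifting apart is to derive both from a single base dict. A minimal sketch (endpoint, credentials, and group id below are placeholder values, not our real ones):

```python
# Shared SASL/SSL settings (placeholder values).
base_conf = {
    'bootstrap.servers': 'b-1.example.kafka.us-east-1.amazonaws.com:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'username',
    'sasl.password': 'password',
}

# The producer uses the base settings as-is.
producer_conf = dict(base_conf)

# The consumer layers its own keys on top of the same base.
consumer_conf = {
    **base_conf,
    'group.id': 'my-group',
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'earliest',
}
```

This way a change to the SASL settings only has to be made in one place.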
On my EC2 instance, the producer shows no errors; it simply does nothing, and the delivery callback never appears in the logs.
def delivery_callback(err, msg):
if err:
log_manager.kafka.error('Message delivery failed: {}'.format(err))
else:
log_manager.kafka.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
In the producer:
for line in sys.stdin:
    try:
        # produce() is asynchronous; delivery is reported via the callback
        producer.produce(topic, line.rstrip(), callback=delivery_callback)
    except BufferError:
        log_manager.kafka.error('%% Local producer queue is full (%d messages '
                                'awaiting delivery): try again\n' % len(producer))
    except KafkaException:
        log_manager.kafka.error(traceback.format_exc())
    except BaseException:
        log_manager.kafka.error(traceback.format_exc())
    # Serve delivery callbacks for previously produced messages
    producer.poll(0)

# Wait for any outstanding messages to be delivered before exiting
producer.flush()
In the consumer:
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            print('%% %s [%d] at offset %d with key %s:' %
                  (msg.topic(), msg.partition(), msg.offset(),
                   str(msg.key())))
            print(msg.value())
finally:
    # Commit final offsets and leave the group cleanly
    consumer.close()
The sample code above just produces messages read from stdin, and the consumer receives them.
Any help appreciated!!!
I finally figured out what was wrong with my producer configuration:
I needed to add 'acks': '1' to it.
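For anyone hitting the same issue, the working producer configuration ends up looking like this (the endpoint and credentials below are placeholders, not our real values):

```python
# Producer configuration with the fix applied: note the added 'acks' key.
conf = {
    'bootstrap.servers': 'b-1.example.kafka.us-east-1.amazonaws.com:9096',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'username',
    'sasl.password': 'password',
    'acks': '1',  # wait for the partition leader to acknowledge each message
}
```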