confluent-kafka consumer working but not producer


Using confluent-kafka, I'm trying to work with Amazon MSK. The connection to MSK seems fine, since the consumer works, but the producer is not producing anything. I verified with kafka-console-producer.sh that the consumer module works correctly. We have several backends using MSK. Here is the Spring Boot configuration from one of our other services, which I used as a reference when developing my service.

    spring.kafka.properties.security.protocol=SASL_SSL
    spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512 
    spring.kafka.properties.sasl.jaas.config=\
        org.apache.kafka.common.security.scram.ScramLoginModule \
        required username={username} password={password};

Of course, {username} and {password} are replaced with our actual credentials.

My service's configuration is as follows:

    kafka:
      bootstrap_servers: {Our MSK End Point}
      security_protocol: SASL_SSL
      sasl_mechanism: SCRAM-SHA-512
      sasl_username: {username}
      sasl_password: {password}
      consumer:
        session_timeout_ms: 6000
        auto_offset_reset: earliest

Inside the producer controller:

    conf = {
        'bootstrap.servers': appcfg.kafka.bootstrap_servers,
        'security.protocol': appcfg.kafka.security_protocol,
        'sasl.mechanism': appcfg.kafka.sasl_mechanism,
        'sasl.username': appcfg.kafka.sasl_username,
        'sasl.password': appcfg.kafka.sasl_password
    }

Inside the consumer controller:

    conf = {
        'bootstrap.servers': appcfg.kafka.bootstrap_servers,
        'security.protocol': appcfg.kafka.security_protocol,
        'group.id': group_id,
        'sasl.mechanism': appcfg.kafka.sasl_mechanism,
        'sasl.username': appcfg.kafka.sasl_username,
        'sasl.password': appcfg.kafka.sasl_password,
        'session.timeout.ms': appcfg.kafka.consumer.session_timeout_ms,
        'auto.offset.reset': appcfg.kafka.consumer.auto_offset_reset
    }

When I run the producer inside my EC2 instance, it shows no errors. It simply produces no output at all.

The delivery callback never appears in the logs:

    def delivery_callback(err, msg):
        if err:
            log_manager.kafka.error('Message delivery failed: {}'.format(err))
        else:
            log_manager.kafka.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

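In the producer:

    import sys
    import traceback

    from confluent_kafka import KafkaException

    for line in sys.stdin:
        try:
            # produce() is asynchronous: it only enqueues the message locally.
            producer.produce(topic, line.rstrip(), callback=delivery_callback)
        except BufferError:
            log_manager.kafka.error('%% Local producer queue is full (%d messages '
                                    'awaiting delivery): try again\n' % len(producer))
        except KafkaException:
            # format_exc() formats the exception currently being handled;
            # it does not take the exception as an argument.
            log_manager.kafka.error(traceback.format_exc())
        except BaseException:
            log_manager.kafka.error(traceback.format_exc())
        # Serve delivery callbacks from earlier produce() calls.
        producer.poll(0)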
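For reference, `produce()` only enqueues a message locally, and delivery callbacks run from `poll()` or `flush()`. The following is a minimal sketch of the same loop with a final `flush()`, written against any object that behaves like `confluent_kafka.Producer`. The function name `produce_lines` and the `flush_timeout` parameter are illustrative assumptions, not part of the service described here.

```python
def produce_lines(producer, topic, lines, on_delivery, flush_timeout=10.0):
    """Enqueue each line, serve callbacks, then wait for outstanding deliveries.

    `producer` is assumed to expose produce(), poll() and flush() like
    confluent_kafka.Producer. Returns whatever flush() returns, which for
    confluent_kafka.Producer is the number of messages still in the queue.
    """
    for line in lines:
        value = line.rstrip()
        try:
            producer.produce(topic, value, callback=on_delivery)
        except BufferError:
            # Local queue is full: serve callbacks to free space, retry once.
            producer.poll(1.0)
            producer.produce(topic, value, callback=on_delivery)
        # Serve delivery callbacks for messages that have already completed.
        producer.poll(0)
    # Block until queued messages are delivered or the timeout expires.
    return producer.flush(flush_timeout)
```

In a real run this might be called as `produce_lines(producer, topic, sys.stdin, delivery_callback)`. Without the final `flush()`, a short-lived process can exit before the last callbacks have had a chance to fire.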

In the consumer:

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            print('%% %s [%d] at offset %d with key %s:\n' %
                  (msg.topic(), msg.partition(), msg.offset(),
                   str(msg.key())))
            print(msg.value())
    finally:
        # Close the consumer so it leaves the group cleanly.
        consumer.close()

The sample code above simply produces messages read from stdin, and the consumer receives them.

Any help appreciated!!!


There is 1 best solution below

ChrisNex On

I finally figured out what was wrong with my producer configuration.

I needed to add 'acks': '1' to my producer configuration.
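A sketch of what the resulting producer configuration might look like. In librdkafka, which confluent-kafka wraps, `acks` is an alias for `request.required.acks`. The helper name `build_producer_conf` and the placeholder endpoint and credentials are assumptions for illustration; only the `'acks': '1'` entry comes from the fix described above.

```python
def build_producer_conf(bootstrap_servers, username, password, acks="1"):
    """Return a confluent-kafka style producer config for SASL_SSL with SCRAM."""
    return {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': username,
        'sasl.password': password,
        # Wait only for the partition leader to acknowledge each message.
        'acks': acks,
    }


# Placeholder values; substitute the real MSK endpoint and credentials.
conf = build_producer_conf('broker-1.example.com:9096', 'user', 'secret')
# producer = confluent_kafka.Producer(conf)  # needs confluent-kafka installed
```

With `acks` set to `1`, the leader acknowledges each message on its own instead of waiting for the in-sync replicas, which trades some durability for lower latency.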