Kafka producer localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused


I have a basic Kafka setup that works perfectly fine in Docker Compose:

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "bash", "-c", "echo 'ruok' | nc localhost 2181" ]
      interval: 10s
      timeout: 5s
      retries: 5

  broker:
    image: confluentinc/cp-kafka:7.5.3
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

to which I send data from a local Python script:

import json
import time

from confluent_kafka import Producer


def delivery_report(err, msg):
    if err is not None:
        print("Message delivery failed: {}".format(err))
    else:
        print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


def send_to_kafka(data):
    topic = "eco2mix-national-tr"
    producer = Producer({"bootstrap.servers": "localhost:9092"})

    for d in data:
        producer.produce(
            topic,
            key=d["date_heure"],
            value=json.dumps(d).encode("utf-8"),
            on_delivery=delivery_report,
        )

        producer.flush()

        time.sleep(5)

In order to make my project fully containerized, I added the following service to my Docker Compose file:

  data:
    build:
      context: ./kafka-streaming
    depends_on:
      broker:
        condition: service_healthy
    container_name: data

along with this Dockerfile to run my Python script:

FROM python:3.12-slim-bullseye

WORKDIR /opt/python/scripts
COPY . /opt/python/scripts

RUN pip install --no-cache-dir -r requirements.txt

ENTRYPOINT ["python", "historic_data_retrieving.py"]

I also made sure to change the Kafka producer's bootstrap.servers to broker:9092.

However, my data container now raises an error when trying to connect to the Kafka broker:

%3|1708609196.773|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
2024-02-22T13:39:57.038366371Z %3|1708609197.038|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)

I also tried other combinations of localhost, broker, 9092 and 29092, but none of them solves the issue.

Edit

When I change the Python script in the data container to use broker:29092, I get the same error as before:

%3|1708629234.267|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
2024-02-22T19:13:54.460485742Z %3|1708629234.460|FAIL|rdkafka#producer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)

Answer by OneCricketeer:

As written, your code will only work outside of Docker: inside the data container, localhost refers to that container itself, not to the broker.
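
To see why, it may help to look at KAFKA_ADVERTISED_LISTENERS in the Compose file from the question. Here is a rough sketch that splits that value into its named listeners; the helper name parse_advertised_listeners is made up for illustration and is not part of confluent_kafka:

```python
def parse_advertised_listeners(value):
    """Split 'NAME://host:port,NAME2://host2:port2' into {name: 'host:port'}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


# Value copied from the broker service in the question's Compose file.
advertised = parse_advertised_listeners(
    "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
)

print(advertised["PLAINTEXT"])       # broker:29092, for clients on the Compose network
print(advertised["PLAINTEXT_HOST"])  # localhost:9092, for clients on the host machine
```

The PLAINTEXT entry (broker:29092) is the one other containers should use, while PLAINTEXT_HOST (localhost:9092) is only meaningful from the host machine. A client that bootstraps via broker:9092 reaches the PLAINTEXT_HOST listener and is then told to reconnect to localhost:9092, which would be consistent with the localhost:9092/1 errors in your log.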

I'd suggest using an environment variable:

import os

producer = Producer({"bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")})

Then override it in Compose by setting KAFKA_BOOTSTRAP to broker:29092 in the data service's environment; that is the address the broker advertises to other containers.
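
If the error persists, a quick check run inside the data container can show which address the script actually resolves and whether it is reachable. This is only a sketch: resolve_bootstrap and can_connect are hypothetical helper names, and it assumes KAFKA_BOOTSTRAP holds a single host:port rather than a comma-separated list.

```python
import os
import socket


def resolve_bootstrap(default="localhost:9092"):
    """Return the address from KAFKA_BOOTSTRAP, or the default if it is unset."""
    return os.environ.get("KAFKA_BOOTSTRAP", default)


def can_connect(address, timeout=3.0):
    """Return True if a plain TCP connection to 'host:port' succeeds."""
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, DNS failures and timeouts.
        return False


if __name__ == "__main__":
    address = resolve_bootstrap()
    print("bootstrap.servers =", address)
    print("reachable:", can_connect(address))
```

If this still prints localhost:9092 after you set the variable, the container is probably running an image built from the old script. The Dockerfile copies the script into the image at build time, so changes need a rebuild, for example with docker compose up --build.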