Unexpected Kafka request of type METADATA during SASL handshake On Linux Machine

41 Views Asked by At

I try to create a docker-compose with strimzi-kafka. And my docker-compose looks like:

version: '3.8'

networks:
  dev_net:
    external: true

services:
  zoo1:
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    hostname: zoo1.devops
    container_name: zoo1
    ports:
      - "${ZOOKEEPER_PORT_1}:${ZOOKEEPER_PORT_1}"
      - "12888:12888"
      - "13888:13888"
    volumes:
      - ./docker/zookeeper/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/zookeeper1:/var/lib/zookeeper/data
      - ./kafka-data-new/zookeeper-logs1:/var/lib/zookeeper/log
    command:
      - /bin/bash
      - -c
      - cd /opt/kafka/strimzi && ./start.sh
    networks:
      - dev_net
    environment:
      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT_1}
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1.devops:12888:13888;zoo2.devops:22888:23888;
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      LOG_DIR: /tmp/logs

  zoo2:
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    hostname: zoo2.devops
    container_name: zoo2
    ports:
      - "${ZOOKEEPER_PORT_2}:${ZOOKEEPER_PORT_2}"
      - "22888:22888"
      - "23888:23888"
    volumes:
      - ./docker/zookeeper/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/zookeeper2:/var/lib/zookeeper/data
      - ./kafka-data-new/zookeeper-logs2:/var/lib/zookeeper/log
    command:
      - /bin/bash
      - -c
      - cd /opt/kafka/strimzi && ./start.sh
    networks:
      - dev_net
    environment:
      ZOOKEEPER_CLIENT_PORT: ${ZOOKEEPER_PORT_2}
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1.devops:12888:13888;zoo2.devops:22888:23888;
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      LOG_DIR: /tmp/logs


  kafka1: &kafka
    image: quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
    hostname: kafka1.devops
    container_name: kafka1
    ports:
      - "19092:19092"
      - "${KAFKA_PLAIN_PORT_1}:${KAFKA_PLAIN_PORT_1}"
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker1/data:/var/lib/kafka/data
    command:
      - /bin/bash
      - -c
      - cd /opt/kafka/strimzi && ./start.sh
    networks:
      - dev_net
    depends_on:
      - zoo1
      - zoo2
    environment: &kafka-environment
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zoo1.devops:${ZOOKEEPER_PORT_1}
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka1.devops:9092,EXTERNAL://kafka1.devops:19092,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1}
      KAFKA_LISTENERS: EXTERNAL://kafka1.devops:19092,JWT://kafka1.devops:9092,INTROSPECT://kafka1.devops:9093,JWTPLAIN://kafka1.devops:9094,INTROSPECTPLAIN://kafka1.devops:9095,JWTREFRESH://kafka1.devops:9096,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1},SCRAM://kafka1.devops:9101
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:SASL_PLAINTEXT,JWT:SASL_PLAINTEXT,INTROSPECT:SASL_PLAINTEXT,JWTPLAIN:SASL_PLAINTEXT,INTROSPECTPLAIN:SASL_PLAINTEXT,JWTREFRESH:SASL_PLAINTEXT,PLAIN:SASL_PLAINTEXT,SCRAM:SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_INTER_BROKER_LISTENER_NAME: JWT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_PRINCIPAL_BUILDER_CLASS: io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder

      # Common settings for all the listeners
      # username extraction from JWT token claim
      OAUTH_USERNAME_CLAIM: preferred_username
      OAUTH_CONNECT_TIMEOUT_SECONDS: 20

      # Configuration of individual listeners
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.groups.claim=\"$$.realm_access.roles\" ;
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_JWT_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.groups.claim=\"$$.realm_access.roles\" ;
      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_EXTERNAL_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler


      KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.introspection.endpoint.uri=\"${KEYCLOAK_INTROSPECT_URI}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_INTROSPECT_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

      KAFKA_LISTENER_NAME_JWTPLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_JWTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler

      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    oauth.introspection.endpoint.uri=\"${KEYCLOAK_INTROSPECT_URI}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_INTROSPECTPLAIN_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler

      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required    oauth.jwks.endpoint.uri=\"${KEYCLOAK_CERTS}\"    oauth.valid.issuer.uri=\"${KEYCLOAK_ISSUER}\"    oauth.token.endpoint.uri=\"${KEYCLOAK_TOKEN_URI}\"    oauth.client.id=\"${KEYCLOAK_KAFKA_CLIENT}\"    oauth.client.secret=\"${KEYCLOAK_KAFKA_SECRET}\"    oauth.jwks.refresh.min.pause.seconds=\"2\"    unsecuredLoginStringClaim_sub=\"${UNSECURE_CLAIM}\" ;
      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      # Enable re-authentication
      KAFKA_LISTENER_NAME_JWTREFRESH_OAUTHBEARER_CONNECTIONS_MAX_REAUTH_MS: 45000

      KAFKA_LISTENER_NAME_PLAIN_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_PLAIN_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required    username=\"${KAFKA_ADMIN_USERNAME}\"    password=\"${KAFKA_ADMIN_PASSWORD}\"    user_admin=\"${KAFKA_ADMIN_PASSWORD}\"    user_monitor=\"${KAFKA_MONITOR_PASSWORD}\" ;

      KAFKA_LISTENER_NAME_SCRAM_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
      KAFKA_LISTENER_NAME_SCRAM_SCRAM__2DSHA__2D512_SASL_JAAS_CONFIG: org.apache.kafka.common.security.scram.ScramLoginModule required    username=\"${KAFKA_ADMIN_USERNAME}\"    password=\"${KAFKA_ADMIN_PASSWORD}\" ;

      # Authorizer configuration
      KAFKA_AUTHORIZER_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer

      KAFKA_STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI: ${KEYCLOAK_TOKEN_URI}
      KAFKA_STRIMZI_AUTHORIZATION_CLIENT_ID: ${KEYCLOAK_KAFKA_CLIENT}
      KAFKA_STRIMZI_AUTHORIZATION_CLIENT_SECRET: ${KEYCLOAK_KAFKA_SECRET}
      KAFKA_STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME: ${KAFKA_CLUSTER}
      KAFKA_STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL: true
      KAFKA_STRIMZI_AUTHORIZATION_READ_TIMEOUT_SECONDS: 45

      # Parameters controlling the refreshing of grants
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_POOL_SIZE: 4

      # Any change to permissions will be reflected within 10 seconds
      # Has to be set to 10 seconds for keycloak-authz*-tests/**/RefreshTest
      KAFKA_STRIMZI_AUTHORIZATION_GRANTS_REFRESH_PERIOD_SECONDS: 45

      # If a grants fetch fails, immediately perform one retry
      KAFKA_STRIMZI_AUTHORIZATION_HTTP_RETRIES: 1

      # Use grants fetched for another session if available
      KAFKA_STRIMZI_AUTHORIZATION_REUSE_GRANTS: true

      KAFKA_STRIMZI_AUTHORIZATION_ENABLE_METRICS: true

      KAFKA_SUPER_USERS: User:${KAFKA_ADMIN_USERNAME};User:${KAFKA_SERVICE_ACCOUNT}

      # OAuth metrics configuration

      OAUTH_ENABLE_METRICS: true
      #   When enabling metrics we also have to explicitly configure JmxReporter to have metrics available in JMX
      #   The following value will be available as env var STRIMZI_OAUTH_METRIC_REPORTERS
      STRIMZI_OAUTH_METRIC_REPORTERS: org.apache.kafka.common.metrics.JmxReporter

      #   The following value will turn to 'strimzi.oauth.metric.reporters=...' in 'strimzi.properties' file
      #   However, that won't work as the value may be filtered to the component that happens to initialise OAuthMetrics
      #- KAFKA_STRIMZI_OAUTH_METRIC_REPORTERS=org.apache.kafka.common.metrics.JmxReporter


      # Other configuration
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_SESSION_TIMEOUT_MS: 400000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 450000
      KAFKA_REQUEST_TIMEOUT_MS: 450000
      KAFKA_MAX_REQUEST_SIZE: 100000000
      KAFKA_MESSAGE_MAX_BYTES: 100000000
      KAFKA_MAX_PARTITION_FETCH_BYTES: 100000000

      # For start.sh script to know where the keycloak is listening
      KEYCLOAK_URI: ${KEYCLOAK_URI}
      KEYCLOAK_HOST: ${KEYCLOAK_HOST}
      REALM: ${KEYCLOAK_REALM}
      KAFKA_ADMIN_USERNAME: ${KAFKA_ADMIN_USERNAME}
      KAFKA_ADMIN_PASSWORD: ${KAFKA_ADMIN_PASSWORD}


  kafka2:
    <<: *kafka
    hostname: kafka2.devops
    container_name: kafka2
    ports:
      - "29092:29092}"
      - "${KAFKA_PLAIN_PORT_2}:${KAFKA_PLAIN_PORT_2}"
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker2/data:/var/lib/kafka/data
    environment:
      <<: *kafka-environment
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka2.devops:9092,EXTERNAL://kafka2.devops:29092,PLAIN://kafka2.devops:${KAFKA_PLAIN_PORT_2}
      KAFKA_LISTENERS: EXTERNAL://kafka2.devops:29092,JWT://kafka2.devops:9092,INTROSPECT://kafka2.devops:9093,JWTPLAIN://kafka2.devops:9094,INTROSPECTPLAIN://kafka2.devops:9095,JWTREFRESH://kafka2.devops:9096,PLAIN://kafka2.devops:${KAFKA_PLAIN_PORT_2},SCRAM://kafka2.devops:9101


  kafka3:
    <<: *kafka
    hostname: kafka3.devops
    container_name: kafka3
    ports:
      - "39092:39092"
      - "${KAFKA_PLAIN_PORT_3}:${KAFKA_PLAIN_PORT_3}"
    volumes:
      - ./docker/kafka/config:/opt/kafka/config/strimzi
      - ./docker/kafka/scripts:/opt/kafka/strimzi
      - ./kafka-data-new/broker3/data:/var/lib/kafka/data
    environment:
      <<: *kafka-environment
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: JWT://kafka3.devops:9092,EXTERNAL://kafka3.devops:39092,PLAIN://kafka3.devops:${KAFKA_PLAIN_PORT_3}
      KAFKA_LISTENERS: EXTERNAL://kafka3.devops:39092,JWT://kafka3.devops:9092,INTROSPECT://kafka3.devops:9093,JWTPLAIN://kafka3.devops:9094,INTROSPECTPLAIN://kafka3.devops:9095,JWTREFRESH://kafka3.devops:9096,PLAIN://kafka3.devops:${KAFKA_PLAIN_PORT_3},SCRAM://kafka3.devops:9101


  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    restart: always
    ports:
      - "${CMAK_PORT}:8080"
    networks:
      - dev_net
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      AUTH_TYPE: "LOGIN_FORM"
      SPRING_SECURITY_USER_NAME: ${CMAK_USERNAME}
      SPRING_SECURITY_USER_PASSWORD: ${CMAK_PASSWORD}
      KAFKA_CLUSTERS_0_NAME: ${KAFKA_CLUSTER}
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1.devops:${KAFKA_PLAIN_PORT_1},kafka2.devops:${KAFKA_PLAIN_PORT_2},kafka3.devops:${KAFKA_PLAIN_PORT_3}
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${KAFKA_ADMIN_USERNAME}\" password=\"${KAFKA_ADMIN_PASSWORD}\";"

it works very well on windows operation system but when I try to run it on linux, I got errors like: <data-plane-kafka-network-thread-2-ListenerName(EXTERNAL)-SASL_PLAINTEXT-2> [SocketServer listenerType=ZK_BROKER, nodeId=2] Failed authentication with /INTERNAL_IP (channelId=INTERNAL_IP :29092-INTERNAL_IP :56818-15) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector) and brokers are not available.

Is there any specific implementation for linux machine? Any suggestion?

2

There are 2 best solutions below

0
Sha On BEST ANSWER

Solved with two changes:

First one is, disabled network:

#networks:
#  - dev_net

Second one is, specified external ip address on advertised.listeners:

KAFKA_ADVERTISED_LISTENERS: JWT://kafka1.devops:9092,EXTERNAL://${KAFKA_EXTERNAL_URL}:19092,PLAIN://kafka1.devops:${KAFKA_PLAIN_PORT_1}
1
shemun On

I think you have to set inter.broker.listener.name for every broker so they can communicate with each other.

„ Name of listener used for communication between brokers. If this is unset, the listener name is defined by security.inter.broker.protocol. It is an error to set this and security.inter.broker.protocol properties at the same time. „

https://kafka.apache.org/documentation/#brokerconfigs