Kafka consumer using AWS_MSK_IAM: ClassCastException error


I have MSK running on AWS and I'd like to consume information using AWS_MSK_IAM authentication.

My MSK is properly configured and I can consume the information using Kafka CLI with the following command:

../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning

My client_auth.properties has the following information:

# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

When I try to consume from my Databricks cluster using Spark, I get the following error:

Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler

Here is my cluster config: [cluster configuration screenshot]

The libraries I'm using in the cluster:

[installed libraries screenshot]

And the code I'm running on Databricks:

raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
        .option('subscribe', 'TopicTest') 
        .option('startingOffsets', 'earliest')
        .option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
        .option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
        .load()
)

3 Answers

Answer from Vishnu Bhagyanath:

Though I haven't tested this, based on Andrew's comment that the dependency could in theory be relocated, I dug into the source of aws-msk-iam-auth. Its build.gradle declares compileOnly('org.apache.kafka:kafka-clients:2.4.1'), so the published uber jar does not contain the Kafka client classes; at runtime they are picked up from whatever Databricks ships (which is shaded).

The build also relocates all of its dependencies under a prefix. So changing compileOnly to implementation and rebuilding the uber jar with gradle clean shadowJar should include and relocate the Kafka classes as well, avoiding the conflict when the jar is uploaded to Databricks.
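Untested as well, but a minimal sketch of that build.gradle change (the version matches the one the project declares; the shadowJar relocation rules already in the project are assumed to stay as they are):

dependencies {
    // Was: compileOnly('org.apache.kafka:kafka-clients:2.4.1')
    // Bundling kafka-clients so the existing shadow/relocate rules apply to it too
    implementation('org.apache.kafka:kafka-clients:2.4.1')
}

Then rebuild the uber jar with gradle clean shadowJar and upload the resulting jar to Databricks.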

Answer from Carlo Abi Chahine:

I faced the same issue, so I forked aws-msk-iam-auth to make it compatible with Databricks. Just add the jar from the following release to your cluster: https://github.com/Iziwork/aws-msk-iam-auth-for-databricks/releases/tag/v1.1.2-databricks
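One way to attach the jar, assuming the legacy databricks-cli is installed and configured; the DBFS path, jar file name, and cluster id below are placeholders:

# Upload the release jar to DBFS, then install it on the cluster
databricks fs cp aws-msk-iam-auth-for-databricks.jar dbfs:/FileStore/jars/aws-msk-iam-auth-for-databricks.jar
databricks libraries install --cluster-id 1234-567890-abcd123 --jar dbfs:/FileStore/jars/aws-msk-iam-auth-for-databricks.jar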

Answer from Kudrat:

I also faced this issue and found the mistake in the config. Databricks uses shaded Kafka libraries, which is why the config has to reference the shaded class paths, as shown here:

sasl.jaas.config = shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class = shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler
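Applied to the Spark reader from the question, that would look like this (a sketch that assumes the Databricks-compatible jar from the answer above is attached to the cluster; the bootstrap server is the placeholder from the question):

raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'b-1.kafka.*********.***********.amazonaws.com:9098')
        .option('subscribe', 'TopicTest')
        .option('startingOffsets', 'earliest')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
        # Shaded class paths, so the handler implements the Kafka interface Databricks actually loads
        .option('kafka.sasl.jaas.config', 'shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;')
        .option('kafka.sasl.client.callback.handler.class', 'shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler')
        .load()
)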