Accepted answer (by DileeprajnarayanThumula):

You can define a Kafka connection with the options below:

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    # Consumer properties need the "kafka." prefix; a bare "group.id" is not a
    # Kafka source option. "kafka.group.id" requires Spark 3.0+; on older
    # versions, remove this line and let Spark assign its own group id.
    "kafka.group.id": group_id,
    "subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()

This configures the connection to the Kafka broker over the SASL_SSL security protocol with PLAIN credentials (replace USERNAME and PASSWORD with your own) and reads the stream from the specified topic into the DataFrame df. Note that the Kafka source exposes key and value as binary columns, so you typically cast them before use, as in the sketch below.
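
As an illustration beyond the original answer, here is a minimal sketch of a typical next step: casting the binary columns to strings and streaming the result to the console sink (the sink choice is an assumption for this sketch):

from pyspark.sql.functions import col

# The Kafka source yields binary key/value; cast them to strings.
decoded = df.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
    "topic", "partition", "offset", "timestamp",
)

# Stream to the console for quick inspection.
query = (
    decoded.writeStream
    .format("console")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()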

In production, it's recommended to keep credentials out of the application: store the JAAS configuration in a file (conventionally jaas.conf), remove the kafka.sasl.jaas.config option from your Spark application, and point the JVM at the file by passing its path to spark-submit via --driver-java-options.
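
A jaas.conf for the PLAIN mechanism could look like the sketch below; the KafkaClient entry name is the standard one the Kafka client looks up, and USERNAME and PASSWORD are placeholders:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="USERNAME"
  password="PASSWORD";
};

With the JAAS file in place, the kafka.sasl.jaas.config entry drops out of the application options: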

options = {
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.group.id": group_id,  # "kafka." prefix, Spark 3.0+ (see above)
    "subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()

For spark-submit, provide the file path to jaas.conf using --driver-java-options:

spark-submit \
  --driver-java-options "-Djava.security.auth.login.config=/path/to/jaas.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py
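
One caveat, based on general Spark deployment practice rather than the original answer: in cluster deployments the executors also open connections to Kafka, so they need the JAAS file as well. A common pattern is to ship it with --files and reference it in the executor JVM options:

spark-submit \
  --files /path/to/jaas.conf \
  --driver-java-options "-Djava.security.auth.login.config=/path/to/jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py

The executors reference the bare file name because --files places a copy of jaas.conf in each executor's working directory.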

Make sure the file path matches your deployment and that the spark-sql-kafka package matches your Spark and Scala versions (the example above targets Spark 2.4.5 built for Scala 2.11).

Reference: SO link