I want to know how to connect Confluent Cloud to Databricks. I want to read data from Confluent into a Spark DataFrame.
I have used this code:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", confluentBootstrapserver) \
.option("kafka.security.protocol", "SSL") \
.option("subscribe", confluentTopic) \
.option("startingOffsets", "earliest") \
.option("kafka.sasl.jaas.config",
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username = \"**********\" password = \"******************************\";").load()
I have used the API key as the username and the secret as the password, and provided the topic name in confluentTopic.
I am getting various errors, such as "java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics". Before that I was getting "consumer cannot be created". I am new to this, so please elaborate in your answer.
You can use the code blocks below.
First, create a new API key from the API Keys tab of your cluster in Confluent Cloud. Copy and save the API key and secret.
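To keep them out of the notebook, one option is a Databricks secret scope; the scope and key names below are just placeholders for illustration.

# Placeholder scope/key names -- use whichever secret scope you created for these values.
confluentApiKey = dbutils.secrets.get(scope="confluent", key="api-key")
confluentApiSecret = dbutils.secrets.get(scope="confluent", key="api-secret")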
Next, pass the API key and secret as the username and password in the kafka.sasl.jaas.config Spark option, as in the sketch below. I think you did the same, and it worked in my environment. Then create a function to decode the binary value column and register it as a UDF.
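Here is a minimal sketch of both steps, assuming the usual Confluent Cloud settings (SASL_SSL with the PLAIN mechanism) and that confluentApiKey / confluentApiSecret hold the key and secret from the step above:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# JAAS config: API key as username, secret as password. The kafkashaded prefix is the
# Kafka client bundled with the Databricks runtime.
jaasConfig = (
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="{}" password="{}";'
).format(confluentApiKey, confluentApiSecret)

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", confluentBootstrapserver) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", jaasConfig) \
    .option("subscribe", confluentTopic) \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers key/value as binary, so decode the value column with a UDF.
def binary_to_string(b):
    return b.decode("utf-8") if b is not None else None

binary_to_string_udf = udf(binary_to_string, StringType())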
Display the results.
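Continuing from the sketch above, apply the UDF to the value column and show the stream with the Databricks display() helper:

from pyspark.sql.functions import col

# Decode the binary value column and render the streaming result in the notebook.
decoded_df = df.withColumn("value", binary_to_string_udf(col("value")))
display(decoded_df)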
Output: the messages from the topic show up both in the Confluent Cloud topic view and in the Spark streaming display.
If it still fails, check whether you are using the correct port (9092) in the bootstrap server address; otherwise your batch of data may be too large, or there may be a problem with network access from Databricks to Confluent.
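For reference, the bootstrap server value should include the port; the hostname below is only a placeholder (copy the real "Bootstrap server" value from your cluster settings in Confluent Cloud):

# Placeholder hostname -- replace with your cluster's actual bootstrap server.
confluentBootstrapserver = "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"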