I am working with Apache Kafka and PySpark to run a data analytics job. I am using the MongoDB Kafka Source Connector to produce data that is consumed by a PySpark Structured Streaming job. In the PySpark script I use the Spark MongoDB connector to store the processed data. The entire setup works fine locally.
Now I am deploying the MongoDB Kafka connector to an AWS MSK cluster using MSK Connect. The MSK cluster and connector are set up and running. I have also set up an EMR cluster and uploaded the PySpark scripts to an S3 bucket. I created all of these clusters and the bucket through the AWS console and uploaded the scripts manually. When I execute the PySpark script as an AWS EMR step, it fails with the following error:
24/01/31 10:24:52 INFO AdminMetadataManager: [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
24/01/31 10:24:52 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_402]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_402]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[org.apache.kafka_kafka-clients-3.3.2.jar:?]
at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:499) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.partitionsAssignedToAdmin(KafkaOffsetReaderAdmin.scala:498) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.fetchEarliestOffsets(KafkaOffsetReaderAdmin.scala:288) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:249) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:246) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:98) ~[org.apache.spark_spark-sql-kafka-0-10_2.12-3.4.1.jar:3.4.1]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$2(MicroBatchExecution.scala:455) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:455) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[scala-library-2.12.15.jar:?]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableLike.map(TraversableLike.scala:286) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableLike.map$(TraversableLike.scala:279) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:108) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
I have also logged into the EMR cluster's master node over SSH and tried running the spark-submit command there, but I get the same error as above. Below is my Spark script:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, TimestampType, IntegerType, ArrayType
from pyspark.sql.functions import *
def calculateTotalTimeSaved(startTime, endTime, profileViewed):
    return (endTime - startTime) + (profileViewed * 30)
# Create a spark session
spark = SparkSession. \
    builder. \
    appName("myapp"). \
    master("local[*]"). \
    getOrCreate()
# Define schema for documents
documentSchema = StructType([
    StructField('_id', StringType(), True),
    StructField('profile_viewed', IntegerType(), True),
    StructField('user_id', IntegerType(), True),
    StructField('created_at', TimestampType(), True),
    StructField('updated_at', TimestampType(), True),
    StructField('settings_end_date', TimestampType(), True)
])
# Defining schema for mongodb ObjectId field
objectIdSchema = StructType([
    StructField('$oid', StringType(), True)
])
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "b-2.XXXXXX.l1o3qz.c16.kafka.us-east-1.amazonaws.com:9094,b-1.XXXXXXX.l1o3qz.c16.kafka.us-east-1.amazonaws.com:9094") \
    .option("subscribe", "topic") \
    .option("startingOffsets", "earliest") \
    .option("auto.commit.interval.ms", "1000") \
    .load()
# Extract the JSON document from the Kafka value column
df = df.select(from_json(col("value").cast("string"), documentSchema).alias("document"))
df = df.select("document.*", from_json(col("document._id").cast("string"), objectIdSchema).alias("id"))
df = df.select(
    col("id.$oid").alias("id"),
    "user_id",
    "profile_viewed",
    "created_at",
    "updated_at",
    "settings_end_date",
)
df = df.withColumn("start_time", col("created_at").cast("long"))
df = df.withColumn("end_time", col("settings_end_date").cast("long"))
df = df.withColumn("total_time_saved", calculateTotalTimeSaved(col("start_time"), col("end_time"), col("profile_viewed")))
# Process each micro-batch: deduplicate records and write the results to MongoDB
def processForeachBatch(batchDf, batchId):
    # Transform and write batchDf
    # Persist the dataframe since it has to be reused multiple times below
    batchDf.persist()
    batchDf = batchDf.orderBy(col("updated_at").desc())
    # Deduplicate records in the dataframe based on id
    batchDf = batchDf.dropDuplicates(["id"])
    batchDf = batchDf.select(
        col("id").cast("string").alias("log_id"),
        col("user_id").cast("integer"),
        col("profile_viewed").cast("integer"),
        col("created_at").cast("string"),
        col("updated_at").cast("string"),
        col("settings_end_date").cast("string"),
        col("start_time").cast("integer"),
        col("end_time").cast("integer"),
        col("total_time_saved").cast("integer")
    )
    # batchDf = batchDf.filter(col("user_id") == 2655)
    batchDf.show()
    mongoURL = "mongodb+srv://XXXX:[email protected]/data_analytics.tts_log?authSource=admin&replicaSet=atlas-bftscy-shard-0&readPreference=primary"
    batchDf.write \
        .format("mongodb") \
        .mode("append") \
        .option("connection.uri", mongoURL) \
        .option("database", "data_analytics") \
        .option("collection", "tts_log") \
        .option("idFieldList", "log_id") \
        .option("upsertDocument", True) \
        .save()
    # Free memory once the batch has been written
    batchDf.unpersist()
# Start the streaming query, invoking processForeachBatch for every micro-batch
outputDF = df \
    .writeStream \
    .foreachBatch(processForeachBatch)
outputDF = outputDF.start()
outputDF.awaitTermination()
I am using the following command to execute the PySpark script on the EMR cluster's master node:
spark-submit --deploy-mode client --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,org.mongodb.spark:mongo-spark-connector_2.12:10.2.0 s3://path/to/stream-data-analytics.py
I am using Kafka version 3.5.1 and Spark version 3.4.1. The MSK cluster is running in private subnets, so I suspect the problem might be network connectivity between the EMR cluster and the MSK brokers.
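To rule out basic reachability problems, I plan to run a quick TCP check from the EMR master node against the MSK bootstrap brokers. This is only a minimal sketch; the hostnames are placeholders for my actual bootstrap servers, and the ports are MSK's default listeners (9092 plaintext, 9094 TLS, 9098 SASL/IAM):

import socket

# Placeholder hostnames - replace with the actual MSK bootstrap brokers
brokers = [
    "b-1.XXXXXXX.l1o3qz.c16.kafka.us-east-1.amazonaws.com",
    "b-2.XXXXXX.l1o3qz.c16.kafka.us-east-1.amazonaws.com",
]

# Default MSK listener ports: 9092 (plaintext), 9094 (TLS), 9098 (SASL/IAM)
ports = [9092, 9094, 9098]

for host in brokers:
    for port in ports:
        try:
            # Attempt a plain TCP connection with a short timeout
            with socket.create_connection((host, port), timeout=5):
                print(f"{host}:{port} is reachable")
        except OSError as err:
            print(f"{host}:{port} is NOT reachable ({err})")

Separately, the bootstrap servers in my script use port 9094, which as far as I understand is MSK's TLS listener. Do I also need to pass kafka.security.protocol=SSL (and related TLS options) to the Kafka source, or is this timeout purely a security group / routing issue between the EMR and MSK subnets?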