I am using PySpark to ingest data from a Kafka topic and index it into Elasticsearch. Everything is deployed in Docker containers. I have two Spark workers, each with 6 cores and 10 GB of memory. After processing a certain number of documents (approx. 48,000), my Spark Structured Streaming program halts with no errors. Everything up to that point is indexed successfully, and additional data remains unconsumed in the Kafka topic. What I have noticed is that it always stops exactly three partitions short of the number I set in my

csv_tweets = csv_tweets.repartition(48)

command. For example, with it set to 48, the last printed log line is:

24/02/06 01:14:22 INFO TaskSetManager: Finished task 15.0 in stage 2.0 (TID 17) in 150639 ms on 172.21.0.14 (executor 0) (45/48)

Similarly, if I set it to 24, it halts at 21/24.
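For context, the two workers are sized roughly as in the sketch below. In my setup the limits actually come from the Spark worker containers' environment rather than from Python, so treat the property values as illustrative, not my exact configuration:

# Illustrative only: the 2 x (6 cores, 10 GB) workers expressed as app-level settings.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://spark-master:7077")
         .config("spark.executor.cores", "6")      # one executor per worker
         .config("spark.executor.memory", "10g")
         .config("spark.cores.max", "12")          # 2 workers x 6 cores
         .getOrCreate())

The full job is below: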
#!/usr/bin/env python3
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType
# (UDF helpers such as apply_fix_tweet_text / apply_calculate_word_count are
#  imported from elsewhere in the project and omitted here)
def execute_spark_processing(tweets_topic):
    # Create Spark session
    spark = SparkSession.builder \
        .appName("SoDa-TAP") \
        .master("spark://spark-master:7077") \
        .config('spark.driver.maxResultSize', '0') \
        .config("spark.local.dir", "/tmp") \
        .config('spark.sql.repl.eagerEval.enabled', True) \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.streaming.kafka.consumer.cache.enabled', 'false') \
        .config('spark.kryoserializer.buffer.max', '2000M') \
        .config('spark.network.timeout', '600s') \
        .getOrCreate()
    print("Spark version: ", spark.version)
    # .config('spark.driver.maxResultSize', '1G') \
    # .config("spark.driver.memory", "1g") \

    # Create Spark context
    sc = spark.sparkContext

    # Kafka topic used to create the schema of the data to be processed
    schema_topic = tweets_topic
    schema_inference_sample_size = 50000
    print("About to infer schema")
    def infer_topic_schema_json(schema_topic, schema_inference_sample_size):
        df_json = (spark.read.format("kafka")
                   .option("kafka.bootstrap.servers", "broker:29092")
                   .option("subscribe", schema_topic)
                   .option("startingOffsets", "earliest")
                   .option("failOnDataLoss", "false")
                   .load()
                   .withColumn("value", F.expr("string(value)"))
                   .select("value"))
        # print(df_json.show(df_json.count()))
        df_read = spark.read.json(df_json.rdd.map(lambda x: x.value), multiLine=True)
        return df_read.schema.json()
        # .option("maxOffsetsPerTrigger", schema_inference_sample_size)

    infer_schema = True
    # schema_location = "disorders/schemas/control_schema.json"  # "schemas/" + tweets_topic + "_schema.json"
    schema_location = "/app/schemas/" + tweets_topic + "_schema.json"
    if not infer_schema:
        try:
            with open(schema_location, 'r') as f:
                print("Reading pre-saved schema.")
                topic_schema_txt = json.load(f)
        except:
            infer_schema = True
            pass
    if infer_schema:
        topic_schema_txt = infer_topic_schema_json(schema_topic, schema_inference_sample_size)
        with open(schema_location, 'w') as f:
            json.dump(topic_schema_txt, f)
    topic_schema = StructType.fromJson(json.loads(topic_schema_txt))
    print("Schema Read Successfully.")
    csv_tweets = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "broker:29092") \
        .option("subscribe", tweets_topic) \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .option("checkpointLocation", "/app/tmp/read_checkpoint/") \
        .load() \
        .withColumn("value", F.expr("string(value)")) \
        .select("value") \
        .withColumn('value', F.from_json(col("value"), topic_schema)) \
        .select("value.payload.*")
    print("-----Read Stream Initialized-----")
    # .option("maxOffsetsPerTrigger", "50000") \
    # Repartition the dataframe to total cores * 4 as a rule of thumb (2 workers * 6 cores * 4 = 48)
    csv_tweets = csv_tweets.repartition(48)
    # --------------------------- TEXT TRANSFORMATIONS ---------------------------
    if "text" in csv_tweets.columns:
        csv_tweets = csv_tweets.withColumn('fix_text', apply_fix_tweet_text('text')) \
            .withColumn('text_word_count', apply_calculate_word_count('fix_text'))
        # <other transformations>
    # --------------------------- TEXT TRANSFORMATIONS ---------------------------
    query = csv_tweets.writeStream \
        .outputMode("append") \
        .format("org.elasticsearch.spark.sql") \
        .option("checkpointLocation", "/app/tmp/write_checkpoint/") \
        .option("es.resource", tweets_topic) \
        .option("es.nodes", "es01") \
        .option("es.port", "9200") \
        .start()
    query.awaitTermination()
I have been attempting to debug this for many weeks, to no avail, and would appreciate any help.
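For completeness, the unread backlog can be double-checked with a simple batch count like the sketch below (it reuses spark and tweets_topic from the job above and is not part of the streaming code itself; my actual check differs slightly):

# Sanity check (batch, not streaming): count every record currently in the topic.
# Batch Kafka reads accept startingOffsets/endingOffsets, so this scans the whole topic.
backlog = (spark.read.format("kafka")
           .option("kafka.bootstrap.servers", "broker:29092")
           .option("subscribe", tweets_topic)
           .option("startingOffsets", "earliest")
           .option("endingOffsets", "latest")
           .load())
print("Records currently in topic:", backlog.count())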