Apache Spark stops processing after a specific number of rows

I am using PySpark to ingest data from a Kafka topic and index it into Elasticsearch. All of the components are deployed in Docker containers. I have two Spark workers, each with 6 cores and 10G of memory. After processing a certain number of documents (approximately 48,000), my Spark Structured Streaming program halts with no errors. Everything up to that point is indexed successfully, and additional data remains unconsumed in the Kafka topic. What I have noticed is that it always stops exactly 3 partitions short of the partition count I set in my

csv_tweets = csv_tweets.repartition(48)

call. For example, when I set this number to 48, the last log line printed is:

24/02/06 01:14:22 INFO TaskSetManager: Finished task 15.0 in stage 2.0 (TID 17) in 150639 ms on 172.21.0.14 (executor 0) (45/48)

Similarly, if I set it to 24, it halts at 21/24.
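
So far the only signal I have is the TaskSetManager log above. One thing I could add for more visibility (not part of the code below) is to poll the StreamingQuery instead of blocking on query.awaitTermination(), so that the last reported status and progress are printed when a batch stalls; the 30-second interval here is arbitrary:

import time

# Poll the running query rather than blocking on awaitTermination(),
# so the most recent status and micro-batch metrics stay visible.
while query.isActive:
    print("Status:       ", query.status)        # data-availability / trigger flags
    print("Last progress:", query.lastProgress)  # metrics dict for the latest micro-batch
    time.sleep(30)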

#!/usr/bin/env python3
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def execute_spark_processing(tweets_topic):
    #Create Spark session
    spark = SparkSession.builder \
            .appName("SoDa-TAP") \
            .master("spark://spark-master:7077") \
            .config('spark.driver.maxResultSize', '0') \
            .config("spark.local.dir", "/tmp") \
            .config('spark.sql.repl.eagerEval.enabled', True) \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.streaming.kafka.consumer.cache.enabled', 'false') \
            .config('spark.kryoserializer.buffer.max', '2000M') \
            .config('spark.network.timeout', '600s') \
            .getOrCreate()
        
    print("Spark version: ", spark.version)
    #.config('spark.driver.maxResultSize', '1G') \
    #.config("spark.driver.memory", "1g") \

    # Create Spark context
    sc = spark.sparkContext

    #Kafka topic to create the schema of the data to be processed
    schema_topic = tweets_topic
    schema_inference_sample_size = 50000
    print("About to infer schema")
    def infer_topic_schema_json(schema_topic, schema_inference_sample_size):
        df_json = (spark.read.format("kafka")
                .option("kafka.bootstrap.servers", "broker:29092")
                .option("subscribe", schema_topic)
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .load()
                .withColumn("value", F.expr("string(value)"))
                .select("value"))
        #print(df_json.show(df_json.count()))
        df_read = spark.read.json(df_json.rdd.map(lambda x: x.value), multiLine=True)
        return df_read.schema.json()
    #.option("maxOffsetsPerTrigger", schema_inference_sample_size)

    infer_schema = True
    #schema_location = "disorders/schemas/control_schema.json" #"schemas/"+tweets_topic+"_schema.json"
    schema_location = "/app/schemas/"+tweets_topic+"_schema.json"

    if not infer_schema: 
        try:
            with open(schema_location, 'r') as f:
                print("Reading pre-saved schema.")
                topic_schema_txt = json.load(f)
        except:
            infer_schema = True
            pass

    if infer_schema:
        topic_schema_txt = infer_topic_schema_json(schema_topic, schema_inference_sample_size)
        with open(schema_location, 'w') as f:
            json.dump(topic_schema_txt, f)

    topic_schema = StructType.fromJson(json.loads(topic_schema_txt))

   
    print("Schema Read Successfully.")

    csv_tweets = spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "broker:29092") \
                .option("subscribe", tweets_topic) \
                .option("startingOffsets", "earliest") \
                .option("failOnDataLoss", "false") \
                .option("checkpointLocation", "/app/tmp/read_checkpoint/") \
                .load() \
                .withColumn("value", F.expr("string(value)")) \
                .select("value") \
                .withColumn('value', F.from_json(col("value"), topic_schema)) \
                .select("value.payload.*")

    print("-----Read Stream Initialized-----")

    #.option("maxOffsetsPerTrigger", "50000") \

    #Repartition the dataframe to (number of cores * 4) partitions as a rule of thumb
    csv_tweets = csv_tweets.repartition(48)

    #---------------------------TEXT TRANSFORMATIONS--------------------------------------
    if "text" in csv_tweets.columns:
        csv_tweets = csv_tweets.withColumn('fix_text', apply_fix_tweet_text('text'))\
        .withColumn('text_word_count', apply_calculate_word_count('fix_text'))\
        #<other transformations>
            
    #---------------------------TEXT TRANSFORMATIONS--------------------------------------

    query = csv_tweets.writeStream \
        .outputMode("append") \
        .format("org.elasticsearch.spark.sql") \
        .option("checkpointLocation", "/app/tmp/write_checkpoint/") \
        .option("es.resource", tweets_topic) \
        .option("es.nodes", "es01") \
        .option("es.port", "9200") \
        .start()

    query.awaitTermination()

I have been attempting to debug this for many weeks, to no avail. I would appreciate any help.
