Last SPARK Task taking forever to complete


I am running a Spark job and for the most part it runs fast, but the last task of one stage gets stuck. I can see a lot more shuffle read and many more rows processed for that task, and I have tried a bunch of repartitioning strategies to get an even distribution, but I still can't get past it. Could you please help? Attaching images for the same too.

The join I am doing looks up some private data that lives in a Delta Lake table (all of this runs on Databricks).

Table 1, with all the desired event logs/rows, is sizeInBytes=218.2 TiB, but it is filtered on a partition key (date) down to just the last 4 days. Still huge enough, I assume, as there are a lot of events.

Table 2, the lookup table for the personal fields that are hashed in the table above, is sizeInBytes=1793.9 GiB. It has just 4 columns: key, hash, timestamp and type. It is a simple lookup table.
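For context, here is roughly how the two inputs are read; the table paths and the exact date filter below are placeholders rather than my real ones:

    import org.apache.spark.sql.functions.{col, current_date, date_sub}

    // Table 1: event logs, partitioned by date; only the last 4 days are read.
    val allIncrementalEvents = spark.read.format("delta")
      .load("/mnt/datalake/events")                        // placeholder path; ~218.2 TiB table
      .filter(col("date") >= date_sub(current_date(), 4))  // partition-key filter

    // Table 2: the reverse lookup table (key, hashed, timestamp, type); ~1.8 TiB.
    val reverseLookupTableDf = spark.read.format("delta")
      .load("/mnt/datalake/reverse_lookup")                // placeholder path
      .select("key", "hashed", "timestamp", "type")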


Essentially, there are 4 hashed fields that I need to reverse-look-up, and that requires 4 separate joins with this lookup table. This is quite expensive, but at this point there is no way around it. The join happens on that hashed key, which I tried to use in the repartitioning scheme for the DataFrames. My hypothesis was that this would bring the same hashed keys into the same partition so they could be picked up by the same executor, but I still see one task running for a very long time, doing an exorbitant amount of shuffle reads and going through many more rows than the others.
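This is roughly what that repartitioning attempt looks like (a minimal sketch: `eventsDf` stands in for the filtered and unioned events DataFrame, `hashed_key` for whichever hashed column the join uses, and `partitions` is a value I tune per cluster):

    import org.apache.spark.sql.functions.col

    // Hash-partition both sides on the join column so matching keys land in the
    // same partition. A single hot key still ends up entirely in one partition,
    // which is what I suspect produces the one long-running task.
    val eventsByKey = eventsDf.repartition(partitions, col("hashed_key"))
    val lookupByKey = reverseLookupTableDf.repartition(partitions, col("hashed"))

    val joined = eventsByKey.join(lookupByKey, col("hashed_key") === col("hashed"), "leftouter")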

What could I be doing wrong? Is repartitioning not a good approach here? I read somewhere that I could try ITERATIVE broadcasting, which involves breaking the smaller table (the lookup table here) into smaller chunks (less than 8 GB each, I think), broadcasting each chunk, and merging all the results at the end.
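My (untested) understanding of that iterative broadcast is sketched below; the chunk count, the `eventsDf` / `hashed_key` / `event_id` names and the merge step are assumptions on my part:

    import org.apache.spark.sql.functions.{abs, broadcast, col, hash, max}

    val numChunks = 10  // assumed; would be chosen so each chunk stays under the broadcast limit

    // Bucket the lookup table into numChunks pieces and broadcast-join each piece.
    val joinedChunks = (0 until numChunks).map { i =>
      val chunk = reverseLookupTableDf.filter(abs(hash(col("hashed"))) % numChunks === i)
      eventsDf.join(broadcast(chunk), col("hashed_key") === col("hashed"), "leftouter")
    }

    // Each event matches in at most one chunk, so the per-chunk results have to be
    // merged afterwards, e.g. union them and keep the single non-null match per event.
    val merged = joinedChunks
      .reduce(_ union _)
      .groupBy(col("event_id"))                 // hypothetical event identifier
      .agg(max(col("key")).as("resolved_key"))  // the one non-null lookup value wins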

Any help would be appreciated, as I keep getting stuck at the same place with these strategies.

Thank you!

Doing a union on a few event types to create the first DataFrame, then joining it with the lookup table.

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "authentication")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)

UNION

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "session")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)

UNION

    allIncrementalEvents.as("e")
      .filter(col("e.type") === "other")
      .filter(lower(col("e.payload.type")).isin(eventConf.eventTypes:_*))
      .filter(lower(col("e.payload.os.name")).isin(eventConf.osNames:_*))
      .filter(lower(col("e.payload.device.manufacturer")).isin(eventConf.manufacturers:_*))
      .repartition(partitions)

Join

 
    extractAuthEvents
      .union(extractSubEvents)
      .union(extractOpenEvents)
      .union(extractSessionEvents)
      .as("ae")  // alias the unioned events so the "ae.*" columns below resolve
      .join(reverseLookupTableDf.as("adId"),
        col("ae.adId") === col("adId.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("ip"),
        col("ae.ip") === col("ip.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("ua"),
        col("ae.ua") === col("ua.hashed"),
        "leftouter"
      )
      .join(reverseLookupTableDf.as("uid"),
        col("ae.uuid") === col("uid.hashed"),
        "leftouter"
      )