Issue with sequence/explode on date columns - PySpark


I'm having issues processing a DataFrame with SEQUENCE and EXPLODE. The DataFrame has three columns: Employee_ID, HireDate and LeftDate. I'm generating a sequence to get one record per month between both dates for each employee. HireDate and LeftDate differ between employees, so the generated arrays vary in length, and there are at least 480k employee records before the explode.

This is the code I've been using:

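from pyspark.sql.functions import col, explode, expr, sequence, when

# One row per employee per month, from HireDate up to LeftDate
# (or today's date when the employee has not left yet).
df_exploded = df.select(
    col("Employee_ID"),
    explode(
        sequence(
            col("HireDate"),
            when(col("LeftDate").isNotNull(), col("LeftDate")).otherwise(expr("current_date")),
            expr("interval 1 month")
        )
    ).alias("Month"))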
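For each employee, the expression above produces one row per month starting at HireDate. The pure-Python model below is only a rough sketch of that per-row behaviour: the helpers `add_months` and `monthly_sequence` are hypothetical names, this is not Spark's implementation, and month-end handling may differ from Spark in edge cases. It shows the expected output for a normal row and the boundary rule behind the error: with a positive one-month step, a start later than the stop is rejected instead of producing an empty array.

```python
from datetime import date
import calendar


def add_months(d: date, n: int) -> date:
    """Shift d by n calendar months, clamping the day to the target month's length."""
    month_index = d.month - 1 + n
    year = d.year + month_index // 12
    month = month_index % 12 + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def monthly_sequence(start: date, stop: date) -> list[date]:
    """Simplified model of sequence(start, stop, interval 1 month).

    Mirrors Spark's rule that a positive step with start > stop is an
    error rather than an empty result.
    """
    if start > stop:
        raise ValueError(f"Illegal sequence boundaries: {start} to {stop} by 1 month")
    result = []
    i = 0
    # Each element is computed from the start date, not from the previous element.
    while (current := add_months(start, i)) <= stop:
        result.append(current)
        i += 1
    return result


# Hired 2023-11-15, left 2024-02-20: four monthly rows after the explode.
print(monthly_sequence(date(2023, 11, 15), date(2024, 2, 20)))
```

Calling `monthly_sequence(date(2024, 3, 11), date(2024, 3, 6))` raises, which mirrors the failing row in the error message.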

When I run show() or display() there is no issue. The real problem comes with other actions such as count() or write(), where I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 8.0 failed 4 times, most recent failure: Lost task 3.3 in stage 8.0 (TID 113) (10.139.64.100 executor 0): java.lang.IllegalArgumentException: Illegal sequence boundaries: 1710115200000000 to 1709683200000000 by 12

I also tried the same logic in SQL, reducing the number of elements in each array, but got the same error:

df_explode_sql = spark.sql("""
    WITH TABLE1 AS (
        SELECT
            EMPLOYEE_ID,
            HIREDATE,
            -- Start the sequence no earlier than 2023-01-01 to shorten the arrays
            CAST(
                CASE
                    WHEN HIREDATE < '2023-01-01' THEN '2023-01-01'
                    ELSE HIREDATE
                END AS DATE
            ) AS HIREDATE_ALT,
            LEFTDATE
        FROM TABLA
        WHERE LEFTDATE > '2023-01-01'
    )
    SELECT *,
        EXPLODE(sequence(HIREDATE_ALT, LEFTDATE, interval 1 month))
    FROM TABLE1
""")

I also tried persist() and cache(), with the same result.

Any ideas how I can solve this? My data is stored in ADLS Gen2 as a Delta table. I'm using Databricks, and my cluster has 4 workers, 256 GB RAM and 32 cores.

Thanks in advance.

Answer by Antonio García:

Have you already verified that LeftDate (the leaving date) is never earlier than HireDate for every record? Those integers are so large that the Java error looks strange at first:

1710115200000000 to 1709683200000000 by 12

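1710115200000000 is greater than 1709683200000000 (read as microseconds since the Unix epoch, they are 2024-03-11 and 2024-03-06 UTC), so it seems you are building a sequence from a later date to an earlier one. With a positive step such as interval 1 month, Spark rejects that instead of returning an empty array.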
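The conversion of the two boundaries can be checked with the Python standard library. This sketch assumes, based on both values landing exactly on midnight UTC, that they are microseconds since the Unix epoch; `from_micros` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Boundaries copied from the error message. They appear to be microseconds
# since the Unix epoch (1970-01-01 00:00:00 UTC).
start_us = 1710115200000000
stop_us = 1709683200000000

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_micros(us: int) -> datetime:
    """Convert epoch microseconds to a timezone-aware UTC datetime (exact, no floats)."""
    return EPOCH + timedelta(microseconds=us)


start = from_micros(start_us)
stop = from_micros(stop_us)
print(start.date(), stop.date(), start > stop)  # 2024-03-11 2024-03-06 True
```

A row like this could come from an employee whose HireDate is after LeftDate, or from one whose HireDate is in the future while LeftDate is null, because the stop value then falls back to current_date. Assuming the column names from the question, such rows can be listed, or excluded before the explode, with a filter along the lines of `df.filter(col("HireDate") > coalesce(col("LeftDate"), current_date()))` (all three functions are in `pyspark.sql.functions`). This likely also explains why show() and display() succeed: they only evaluate enough partitions to return the first rows, while count() and write() process every row and eventually reach the invalid one.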