Glue/Spark: Filter a large dynamic frame with thousands of conditions


I am trying to filter a time-series Glue DynamicFrame with millions of rows of data:

id  val ts  
a   1.3 2022-05-03T14:18:00.000Z
a   9.2 2022-05-03T12:18:00.000Z
c   8.2 2022-05-03T13:48:00.000Z

I have another pandas DataFrame with thousands of rows:

id  start_ts                        end_ts  
a   2022-05-03T14:00:00.000Z    2022-05-03T14:18:00.000Z
a   2022-05-03T11:38:00.000Z    2022-05-03T12:18:00.000Z
c   2022-05-03T13:15:00.000Z    2022-05-03T13:48:00.000Z

I want to keep every row in the time-series DynamicFrame whose id matches a row of the pandas DataFrame and whose ts falls between that row's start_ts and end_ts.

My current approach is too slow to solve the problem:

I am first iterating over pandas_df and collecting the filtered Glue DynamicFrames into a list:

dfs = []
# pandas_df is indexed by id; bind the row values as lambda defaults so each
# filter keeps its own bounds instead of the last iteration's
for index, row in pandas_df.iterrows():
    df = Filter.apply(ts_dynamicframe,
                      f=lambda x, i=index, s=row['start_ts'], e=row['end_ts']:
                          x['id'] == i and s <= x['ts'] <= e)
    dfs.append(df)

and then unioning all the DynamicFrames together:

df = dfs[0]
dfs.pop(0)

for _df in dfs:
    df = df.union(_df)

The materialization takes too long and never finishes, presumably because the plan built from thousands of filters and unions becomes enormous:

print("Count: ", df.count())

What would be a more efficient approach to solving this problem with Spark/Glue?

1 Answer

Answered by wwnde:

Use a range join: join the two frames on id and restrict ts with a BETWEEN on start_ts and end_ts, instead of applying thousands of separate filters.

Data

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([('a', 1.3, '2022-05-03T14:18:00.000Z'),
                            ('a', 9.2, '2021-05-03T12:18:00.000Z'),
                            ('c', 8.2, '2022-05-03T13:48:00.000Z')],
                           ('id', 'val', 'ts'))

df1 = spark.createDataFrame([('a', '2022-05-03T14:00:00.000Z', '2022-05-03T14:18:00.000Z'),
                             ('a', '2022-05-03T11:38:00.000Z', '2022-05-03T12:18:00.000Z'),
                             ('c', '2022-05-03T13:15:00.000Z', '2022-05-03T13:48:00.000Z')],
                            ('id', 'start_ts', 'end_ts'))

# Convert the string columns to timestamps if not yet converted
df = df.withColumn('ts', to_timestamp('ts'))
df1 = df1.withColumn('start_ts', to_timestamp('start_ts')).withColumn('end_ts', to_timestamp('end_ts'))

Solution

# Register the DataFrames as temp views
df1.createOrReplaceTempView('df1')
df.createOrReplaceTempView('df')

# Equi-join on id with a range condition on ts
spark.sql("SELECT * FROM df, df1 WHERE df.id = df1.id AND df.ts BETWEEN df1.start_ts AND df1.end_ts").show()

Outcome

+---+---+-------------------+---+-------------------+-------------------+
| id|val|                 ts| id|           start_ts|             end_ts|
+---+---+-------------------+---+-------------------+-------------------+
|  a|1.3|2022-05-03 14:18:00|  a|2022-05-03 14:00:00|2022-05-03 14:18:00|
|  c|8.2|2022-05-03 13:48:00|  c|2022-05-03 13:15:00|2022-05-03 13:48:00|
+---+---+-------------------+---+-------------------+-------------------+
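
Since the question starts from a Glue DynamicFrame and a pandas DataFrame, a minimal sketch of wiring this range join into a Glue job could look like the following (assuming a GlueContext named glueContext, a SparkSession named spark, and the ts_dynamicframe and pandas_df from the question, with pandas_df indexed by id): convert both inputs to Spark DataFrames, broadcast the small ranges table, join, and convert back.

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import broadcast, to_timestamp

# Large time series: Glue DynamicFrame -> Spark DataFrame
ts_df = ts_dynamicframe.toDF().withColumn('ts', to_timestamp('ts'))

# Small ranges table: pandas -> Spark; reset_index assumes pandas_df is indexed by id
ranges_df = (spark.createDataFrame(pandas_df.reset_index())
             .withColumn('start_ts', to_timestamp('start_ts'))
             .withColumn('end_ts', to_timestamp('end_ts')))

# Broadcast the ranges so the join does not shuffle the millions of time-series rows
result_df = ts_df.join(
    broadcast(ranges_df),
    (ts_df.id == ranges_df.id) & ts_df.ts.between(ranges_df.start_ts, ranges_df.end_ts)
).select(ts_df['*'])

# Back to a DynamicFrame for the rest of the Glue job
result_dyf = DynamicFrame.fromDF(result_df, glueContext, "filtered_ts")

Because the ranges frame has only thousands of rows, broadcasting it keeps the join local to each partition of the large frame, replacing the per-row Filter/union approach with a single join.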