I have a function like the one below and want to calculate an estimated value per group, grouped by key. Within each group the data needs to be processed sorted by time.
It doesn't seem easy/possible to do this with a Spark DataFrame, so I tried with an RDD. But even when I use a custom partitioning (one partition per group), a shuffle of the partitions/records partially returns wrong results (not in all groups).
How can I avoid this and compute the estimate in parallel, with a clean grouping and the required sorting? Thanks in advance, Christian
func
def estimate(rows):
    estimated = float(0.0)
    result = []
    for row in rows:
        time, key, available, level, reduction, total = row
        if level >= 0.3:
            estimated += float(available - reduction)
            estimated = min(estimated, total)
        else:
            estimated = float(0.0)
        result.append((time, key, available, level, reduction, total, estimated))
    return iter(result)
Approach with mapPartitions
def partition_func(key):
    return hash(key)

rdd = df_input.rdd.map(lambda row: (row["key"], row))
partitioned_rdd = rdd.partitionBy(numPartitions=n, partitionFunc=partition_func)
new_df = (partitioned_rdd.map(lambda x: x[1])
          .mapPartitions(estimate)
          .toDF()
          )
An approach with groupByKey().flatMapValues(estimate) also doesn't parallelize cleanly; a sketch of what I mean is below.
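Roughly what that variant looks like (a sketch; the estimate_by_time helper is just illustrative and assumes df_input has exactly the six columns that estimate unpacks):

def estimate_by_time(rows):
    # Sort each key's rows by time before running the estimate
    return estimate(sorted(rows, key=lambda r: r["time"]))

grouped_df = (df_input.rdd
              .map(lambda row: (row["key"], row))
              .groupByKey()                      # collects all rows of one key together
              .flatMapValues(estimate_by_time)   # yields (key, result_tuple) pairs
              .values()
              .toDF(["time", "key", "available", "level", "reduction", "total", "estimated"])
              )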
I would group the DataFrame by key and then use applyInPandas to calculate the estimated values for each group. This distributes the groups (one per unique key) across the cluster, so they are computed in parallel.
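A minimal sketch of that, assuming df_input has the columns time, key, available, level, reduction and total used in estimate, and that level and total are numeric:

import pandas as pd
from pyspark.sql.types import DoubleType, StructField, StructType

# Output schema: all input columns plus the computed "estimated" column
result_schema = StructType(df_input.schema.fields + [StructField("estimated", DoubleType())])

def estimate_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # applyInPandas hands this function all rows of one key as a pandas DataFrame,
    # so the time ordering can be enforced locally before accumulating.
    pdf = pdf.sort_values("time").reset_index(drop=True)
    estimated = 0.0
    values = []
    for available, level, reduction, total in zip(
            pdf["available"], pdf["level"], pdf["reduction"], pdf["total"]):
        if level >= 0.3:
            estimated += float(available - reduction)
            estimated = min(estimated, float(total))
        else:
            estimated = 0.0
        values.append(estimated)
    pdf["estimated"] = values
    return pdf

new_df = df_input.groupBy("key").applyInPandas(estimate_group, schema=result_schema)

Each unique key becomes one pandas group, so different keys run in parallel across executors while the running estimate inside a group stays strictly ordered by time. Note that all rows of a single key must fit in one task's memory.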