PySpark - sequential estimate function in parallel


I have a function like the one below and want to calculate the estimated value per group, grouped by key. The data needs to be sorted by time within each group.

It doesn't seem easy/possible to do this with the Spark DataFrame API, so I tried it with an RDD, but even when I use a custom partitioning (one partition per group), the shuffling of records within the partitions returns partially wrong results (not in all groups).

How can I avoid this and calculate the result in parallel, with clean grouping and the required sorting? Thanks in advance, Christian

func

def estimate(rows):
    estimated = float(0.0)
    result = []

    for row in rows:
        time, key, available, level, reduction, total = row
        if level >= 0.3:
            estimated += float(available - reduction)
            estimated = min(estimated, total)
        else:
            estimated = float(0.0)
        result.append((time, key, available, level, reduction, total, estimated))
    return iter(result)
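
For concreteness, a small local run of the function (the sample values are made up): while level stays at or above 0.3 the estimate accumulates available - reduction and is capped at total; a row with level below 0.3 resets it to zero.

# Made-up sample rows in (time, key, available, level, reduction, total) order
sample = [
    ("2024-01-01 00:00", "A", 5.0, 0.5, 1.0, 10.0),  # estimated: 4.0
    ("2024-01-01 01:00", "A", 8.0, 0.6, 1.0, 10.0),  # estimated: capped at total = 10.0
    ("2024-01-01 02:00", "A", 3.0, 0.1, 1.0, 10.0),  # level < 0.3 -> reset to 0.0
]
for rec in estimate(sample):
    print(rec)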

Approach with mapPartitions

def partition_func(key):
    return hash(key)

rdd = df_input.rdd.map(lambda row: (row["key"], row))
partitioned_rdd = rdd.partitionBy(numPartitions=n, partitionFunc=partition_func)


new_df = (partitioned_rdd.map(lambda x: x[1])        
          .mapPartitions(estimate)
          .toDF()
         )
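
As far as I can tell, the partially wrong results are consistent with two things in the attempt above: after the shuffle the records inside a partition are not ordered by time, and several keys can hash into the same partition, so the running estimated state leaks from one key into the next. A rough, untested sketch of what I think would be needed to make this deterministic (reusing n, partition_func and estimate from above, and assuming the six columns appear in the order estimate unpacks them): sort within each partition by (key, time) and reset the state at every key boundary.

from itertools import groupby

pair_rdd = df_input.rdd.map(lambda row: ((row["key"], row["time"]), row))

# partition by the group key only, but sort within each partition by (key, time)
sorted_rdd = pair_rdd.repartitionAndSortWithinPartitions(
    numPartitions=n,
    partitionFunc=lambda kt: partition_func(kt[0]),
)

def estimate_partition(records):
    # records arrive sorted by (key, time); run estimate separately per key
    rows = (row for _, row in records)
    for _, group in groupby(rows, key=lambda row: row["key"]):
        yield from estimate(group)

new_df = sorted_rdd.mapPartitions(estimate_partition).toDF([*df_input.columns, "estimated"])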

An approach with groupByKey().flatMapValues(estimate) also doesn't parallelize cleanly.
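
For reference, a minimal untested sketch of that groupByKey route: the whole group is pulled onto one executor, and the values still need an explicit sort by time before estimate runs.

new_df = (df_input.rdd
          .map(lambda row: (row["key"], row))
          .groupByKey()
          .flatMapValues(lambda rows: estimate(sorted(rows, key=lambda r: r["time"])))
          .values()
          .toDF([*df_input.columns, "estimated"])
         )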


There is 1 answer below.

Shubham Sharma

I would group the dataframe by key and then use applyInPandas to calculate the estimated values for each group. This approach distributes the groups corresponding to each unique key in parallel for efficient computation.

from pyspark.sql import types as T

def estimate(pdf):
    acc, result = 0.0, []
    # applyInPandas gives no ordering guarantee, so sort each group by time first
    pdf = pdf.sort_values('time')

    for r in pdf.itertuples():
        # same logic as the question: accumulate while level >= 0.3, capped at total,
        # otherwise reset the running estimate to zero
        if r.level >= 0.3:
            acc = min(acc + r.available - r.reduction, r.total)
        else:
            acc = 0.0
        result.append(acc)

    return pdf.assign(estimated=result)

schema = T.StructType([*df_input.schema.fields, T.StructField('estimated', T.DoubleType())])
df_result = df_input.groupBy('key').applyInPandas(estimate, schema=schema)
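
For context, a minimal self-contained way to try this out; the SparkSession setup and the sample rows below are illustrative and not part of the original post, and estimate is the pandas function defined above.

from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()

# made-up input with the column layout from the question
df_input = spark.createDataFrame(
    [
        ("2024-01-01 00:00", "A", 5.0, 0.5, 1.0, 10.0),
        ("2024-01-01 01:00", "A", 8.0, 0.6, 1.0, 10.0),
        ("2024-01-01 00:00", "B", 3.0, 0.1, 1.0, 10.0),
    ],
    ["time", "key", "available", "level", "reduction", "total"],
)

schema = T.StructType([*df_input.schema.fields, T.StructField("estimated", T.DoubleType())])
df_result = df_input.groupBy("key").applyInPandas(estimate, schema=schema)
df_result.orderBy("key", "time").show()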