Spark UDAF/Aggregator to process groups of records in order


I want to do some custom groupBy aggregations with Spark that require processing the records of each group in timestamp order, where processing the n-th record needs the output of processing the previous (n-1) records (it sounds a bit like a streaming task). The input is a big set of files partitioned by date.
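For context, the input is loaded roughly like this; the path, schema and field names below are placeholders rather than my real ones:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ordered-agg").getOrCreate()
import spark.implicits._

// Placeholder record type: a grouping key, a timestamp to order by
// (epoch millis here just to keep the sketch simple), and some payload.
case class Event(key: String, ts: Long, value: Double)

// Placeholder path: a large set of files laid out in date=YYYY-MM-DD folders.
val myInputDS = spark.read.parquet("/data/events").as[Event]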

The current solution I have is to implement a custom org.apache.spark.sql.expressions.Aggregator, which incrementally inserts every input record into a sorted buffer and does all the aggregation only at the very end. The pseudo-code looks like this:

class MyAgg extends Aggregator[IN, SortedList[IN], OUT] {
    override def zero: SortedList[IN] = SortedList.empty

    // No real reduction happens here: each record is just inserted
    // into the buffer, kept sorted by timestamp.
    override def reduce(b: SortedList[IN], e: IN): SortedList[IN] =
        insert_into_sorted_buffer(b, e)

    // Merging two partial buffers is a sorted-list merge.
    override def merge(b1: SortedList[IN], b2: SortedList[IN]): SortedList[IN] =
        merge_two_lists(b1, b2)

    // All of the actual aggregation happens only here, once the whole group is buffered.
    override def finish(b: SortedList[IN]): OUT =
        b.foldLeft(...)   // my main aggregation happens here

    // bufferEncoder and outputEncoder omitted
}

val result = myInputDS.groupByKey(_.key).agg((new MyAgg()).toColumn)
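For completeness, here is a self-contained toy version of the same pattern that actually compiles (reusing the placeholder Event type and myInputDS from the first snippet); the buffer handling and the fold inside finish are stand-ins for my real logic, and the two encoders are the part the pseudo-code glosses over:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Buffer = every event of the group, kept sorted by timestamp.
class MyToyAgg extends Aggregator[Event, List[Event], Double] {
  override def zero: List[Event] = Nil

  // Insert the event at its timestamp position; nothing is actually reduced.
  override def reduce(b: List[Event], e: Event): List[Event] = {
    val (earlier, later) = b.span(_.ts <= e.ts)
    earlier ::: e :: later
  }

  // Merging two partial buffers is just a sorted merge.
  override def merge(b1: List[Event], b2: List[Event]): List[Event] =
    (b1 ++ b2).sortBy(_.ts)

  // The whole aggregation happens only here, over the fully buffered, sorted group.
  override def finish(b: List[Event]): Double =
    b.foldLeft(0.0)((state, e) => state + e.value)   // stand-in for my real logic

  override def bufferEncoder: Encoder[List[Event]] = Encoders.kryo[List[Event]]
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

It is used exactly like the line above, i.e. myInputDS.groupByKey(_.key).agg(new MyToyAgg().toColumn).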

This solution works, but I have a big concern about performance: the reduce phase doesn't actually reduce anything, and all records of a group have to be kept in memory until the very end. I hope there is a better solution for this.
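To make the kind of alternative I am hoping for a bit more concrete: I was wondering about something along the lines of a "secondary sort", i.e. repartition by key, sort each partition by (key, timestamp), and then walk every partition once with mapPartitions, keeping only the running state instead of the whole group. Here is an untested sketch (again with the placeholder Event type from above and a trivial stand-in for my real per-record update; it relies on the spark.implicits._ import from the first snippet for the Result encoder). I am not sure whether this is the right direction, which is part of what I am asking:

import org.apache.spark.sql.functions.col

case class Result(key: String, out: Double)   // placeholder output type

val alternative = myInputDS
  .repartition(col("key"))                       // all records of a key land in one partition
  .sortWithinPartitions(col("key"), col("ts"))   // per key, records now come in timestamp order
  .mapPartitions { events =>
    // Walk the sorted iterator once: consecutive records with the same key form a group,
    // and only the running state is kept instead of the whole group.
    val buffered = events.buffered
    new Iterator[Result] {
      def hasNext: Boolean = buffered.hasNext
      def next(): Result = {
        val key = buffered.head.key
        var state = 0.0                          // stand-in for my real running state
        while (buffered.hasNext && buffered.head.key == key)
          state += buffered.next().value         // stand-in for my real update rule
        Result(key, state)
      }
    }
  }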

Could you please help? Thanks.
