RX.NET batched timer processing by key - how to project List<T> instead of T?


Let's say you have a stream of stock updates StockUpdate:

public class StockUpdate
{
  public string Symbol { get; set; }
  public string Price { get; set; }
}

Every n seconds, you want to emit a List that contains the last value for each of the symbols.

How can I do this with Rx.NET? Similar questions on Stack Overflow use GroupBy/SelectMany with Sample, but that projects each item individually rather than a list. I could buffer those items into a list again, but that would add extra delay.

There are 2 best solutions below


Maybe you can try this, replacing Observable.Empty<StockUpdate>() with your own source.

Ah, sorry, you want a list as the result:

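using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Observable.Empty<StockUpdate>() is a placeholder for your own source.
IObservable<List<StockUpdate>> y = Observable.Empty<StockUpdate>()
    .Buffer(TimeSpan.FromSeconds(1))    // collect one second of updates into a batch
    .Select(b => b
        .GroupBy(su => su.Symbol)       // group the batch by symbol
        .Select(grp => new StockUpdate { Symbol = grp.Key, Price = grp.Last().Price })
        .ToList());                     // materialize so each emission is a list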
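
The per-batch step in the query above (keep only the last update for each symbol) can also be pulled out into a plain function, which makes it easy to test without any Rx plumbing. This is only a sketch: BatchReducer and LatestPerSymbol are hypothetical names, not part of Rx.NET, and StockUpdate is repeated from the question so the snippet stands on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class StockUpdate
{
    public string Symbol { get; set; }
    public string Price { get; set; }
}

// Hypothetical helper for illustration; not an Rx.NET API.
public static class BatchReducer
{
    // Returns one entry per symbol: the last update seen for that symbol in the batch.
    public static List<StockUpdate> LatestPerSymbol(IEnumerable<StockUpdate> batch)
    {
        var latest = new Dictionary<string, StockUpdate>();
        foreach (var update in batch)
        {
            // A later update for the same symbol overwrites the earlier one.
            latest[update.Symbol] = update;
        }
        // The order of symbols in the result is not guaranteed.
        return latest.Values.ToList();
    }
}
```

Assuming the same one-second buffering as above, it could be plugged in as source.Buffer(TimeSpan.FromSeconds(1)).Select(b => BatchReducer.LatestPerSymbol(b)). Keep in mind that a time-based Buffer also emits an empty batch for an interval with no updates, so the result would then be an empty list.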

Are you looking for something like this:

IObservable<StockUpdate> source = ...;

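// Split the stream into one-second windows and collect each window into a list.
// Note: each list holds every update from its window, not only the last one per symbol.
IObservable<IList<StockUpdate>> query =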
    source
        .Window(TimeSpan.FromSeconds(1.0))
        .Select(xs => xs.ToList())
        .Merge();