Skipping null values in a Scan method of an Observable collection

I have an IObservable of items with a timestamp. I use the Scan method to wrap each item and add a reference to the last valid wrapped item which was received.

IObservable<IWrappedItem> wrappedItems = 
  allItems.Scan(seedItem, 
    (lastWrappedItem, currentItem) => 
      WrapItem(currentItem, lastWrappedItem));

This is the signature of WrapItem:

IWrappedItem WrapItem(Item currentItem, IWrappedItem lastItem);

We needed to change the WrapItem method so that it skips invalid (but non-null) items by returning null for them. The seedItem will most likely be null, and the WrapItem method can handle that.

I need to update the way I use Scan with something like this:

IObservable<IWrappedItem> wrappedItems = allItems.Scan(seedItem, (lastWrappedItem, currentItem) => 
{
  IWrappedItem wrappedItem = WrapItem(currentItem, lastWrappedItem);
  if (wrappedItem == null)
  {
    // Do something clever to skip this invalid item
    // Next item should have a reference to lastWrappedItem
  }
  return wrappedItem;
});

How can I implement this behavior without returning null values to the new collection, while keeping the Observable pattern? Is there a different method that I should use instead of "Scan"?

Shahar (BEST ANSWER)

EDIT: @TheodorZoulias, thanks for your important comment. I tested it and saw that two observers subscribed in parallel do not run concurrently, but the second one receives the first one's last lastWrappedItem object as its seed instead of null. So I wrapped my observable in Defer, as @Enigmativity suggested, and it now works correctly.
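
To illustrate the shared-state problem described in this edit, here is a minimal sketch. It assumes the System.Reactive package, and it uses a running sum as a stand-in for the wrapping logic; DeferDemo, SharedState, PerSubscriptionState and SecondSubscriberSees are hypothetical names made up for this illustration, not part of Rx or of my real code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferDemo
{
    // The local is captured once, when this method is called,
    // so every subscriber mutates the same variable.
    public static IObservable<int> SharedState(IObservable<int> items)
    {
        int last = 0;
        return items.Select(x => last += x);
    }

    // Defer runs the factory on every Subscribe call,
    // so each subscriber gets its own fresh `last`.
    public static IObservable<int> PerSubscriptionState(IObservable<int> items)
    {
        return Observable.Defer(() =>
        {
            int last = 0;
            return items.Select(x => last += x);
        });
    }

    // Subscribes twice and returns what the second subscriber received.
    public static List<int> SecondSubscriberSees(IObservable<int> observable)
    {
        observable.Subscribe(_ => { });
        var seen = new List<int>();
        observable.Subscribe(x => seen.Add(x));
        return seen;
    }

    public static void Main()
    {
        var source = new[] { 1, 2, 3 }.ToObservable();
        // Without Defer the second subscriber continues from the first one's final state (6).
        Console.WriteLine(string.Join(", ", SecondSubscriberSees(SharedState(source))));          // 7, 9, 12
        // With Defer the second subscriber starts from scratch.
        Console.WriteLine(string.Join(", ", SecondSubscriberSees(PerSubscriptionState(source)))); // 1, 3, 6
    }
}
```

The same thing happens with lastWrappedItem when it is declared outside of Defer: the variable belongs to the method call that built the observable, not to the individual subscription.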

I found the answer to my question: I implemented a custom generic Scan method that receives the WrapItem function and ignores the null values it returns. It implements Scan using the Select and Where methods.

This is my updated implementation:

public static IObservable<TAccumulate> ScanObservableAndFilterNulls<TSource, TAccumulate>(this IObservable<TSource> items, TAccumulate seed, Func<TSource, TAccumulate, TAccumulate> wrapItemFunc)
{
  // Defer is needed to protect against multiple observers sharing state:
  // every subscription receives its own fresh `lastWrappedItem` variable
  return Observable.Defer(() =>
  {
    // use the seed before beginning the scan implementation
    TAccumulate lastWrappedItem = seed;
    // implement the custom Scan method
    return items.Select(item => wrapItemFunc(item, lastWrappedItem))
      .Where(wrappedItem =>
      {
        if (wrappedItem != null)
        {
          // update the lastWrappedItem only when the wrapped item is valid
          lastWrappedItem = wrappedItem;
          return true;
        }
        // skip invalid wrapped items, but keep the reference to the last valid item
        return false;
      });
  });
}

This method can be used like this:

public static IObservable<IWrappedItem> ScanAndWrapItems(IObservable<Item> allItems, IWrappedItem seedItem)
{
  return allItems.ScanObservableAndFilterNulls(seedItem, WrapItem);
}

I didn't benchmark the new method to assess the performance penalty, but I believe it would be slower than the original Scan method...
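
For comparison, here is a sketch of an alternative that stays on the built-in Scan. The idea is to let the accumulator carry both the last valid wrapped item and a flag saying whether the current step produced a new one, then drop the flagged-out steps with Where. It assumes the System.Reactive package and C# 7 tuples; ScanSkippingNulls, the Wrapped class and the odd/even rule are hypothetical stand-ins for illustration, not my real types. Since Scan starts from the seed on every subscription, this version should not need Defer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for IWrappedItem: a value plus a link to the previous valid item.
public sealed class Wrapped
{
    public Wrapped(int value, Wrapped previous) { Value = value; Previous = previous; }
    public int Value { get; }
    public Wrapped Previous { get; }
}

public static class ScanSketch
{
    public static IObservable<TAccumulate> ScanSkippingNulls<TSource, TAccumulate>(
        this IObservable<TSource> source,
        TAccumulate seed,
        Func<TSource, TAccumulate, TAccumulate> wrap)
        where TAccumulate : class
    {
        return source
            // State: the last valid wrapped item, and whether this step produced a new one.
            .Scan(
                (Last: seed, IsNew: false),
                (state, item) =>
                {
                    TAccumulate wrapped = wrap(item, state.Last);
                    return wrapped != null
                        ? (Last: wrapped, IsNew: true)
                        : (Last: state.Last, IsNew: false); // keep the previous valid item
                })
            .Where(state => state.IsNew)   // drop the steps where wrap returned null
            .Select(state => state.Last);
    }

    public static void Main()
    {
        // Even numbers play the role of invalid items: wrap returns null for them.
        new[] { 1, 2, 3, 4, 5 }.ToObservable()
            .ScanSkippingNulls<int, Wrapped>(null, (item, last) =>
                item % 2 == 0 ? null : new Wrapped(item, last))
            .Subscribe(w => Console.WriteLine(
                $"{w.Value} <- {w.Previous?.Value.ToString() ?? "none"}"));
        // Prints: 1 <- none, 3 <- 1, 5 <- 3 (one per line)
    }
}
```

I haven't benchmarked this variant against the Select/Where version above either.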

JDChris100

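You should be able to simply use the Where method (the link below documents Enumerable.Where; Rx provides an equivalent Where operator for IObservable<T>): https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.where?view=net-7.0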

IObservable<IWrappedItem> wrappedItems = allItems.Where(item => item != null).Scan(seedItem, (lastWrappedItem, currentItem) => 
{
  IWrappedItem wrappedItem = WrapItem(currentItem, lastWrappedItem);
  if (wrappedItem == null)
  {
    // Do something clever to skip this invalid item
    // Next item should have a reference to lastWrappedItem
  }
  return wrappedItem;
});