What is the correct way to take the most recent data in a reactivex ReplaySubject?

I'm new to reactivex (.NET) and I need help with the following problem.

I have a ReplaySubject constructed by passing a time interval of 1 hour. It continuously receives data from a sensor. I want to have "something" that only gives access to the most recent data that has already arrived within a certain amount of time (e.g. one minute). This amount of time is not pre-defined. It is not known at the time the ReplaySubject is constructed. Receiving the most recent data in a List<T> is sufficient.

Which reactivex operators can be used for this, and how? Can you propose a solution?

I've already tried several operators, in particular the ones with "last" in the name, but without success, because they wait for the ReplaySubject to complete.

There are 2 best solutions below

Theodor Zoulias On

The ReplaySubject<T> internally stores a timestamp for each buffered element (source code), but it doesn't expose this information. So, unless you are willing to implement a custom ReplaySubject<T> from scratch, you'll have to record the timestamps yourself. You could use DateTime.UtcNow, a Stopwatch (as the native implementation does), or the Environment.TickCount64 property, as in the example below:

ReplaySubject<(Item Value, long TimeStamp)> subject = new(
    TimeSpan.FromMinutes(60.0), Scheduler.Immediate);

//...

subject.OnNext((new Item(), Environment.TickCount64));

//...

long now = Environment.TickCount64; // Milliseconds elapsed since the system started
Item[] recent = subject
    .Where(e => e.TimeStamp >= now - 60_000) // Last 60 seconds
    .Select(e => e.Value)
    .TakeImmediate()
    .ToArray()
    .Wait();

The TakeImmediate operator propagates the elements that are pushed synchronously during the subscription, and then immediately completes:

private static IObservable<T> TakeImmediate<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        source.Subscribe(observer).Dispose();
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

Configuring the ReplaySubject<T> with Scheduler.Immediate is required for the TakeImmediate operator to work correctly (so that it does not filter out very recent items).
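
Since the question says the time window is not known in advance, the pieces above could be wrapped in a helper that takes the window as a parameter. This is only a sketch: GetRecent and the RecentItems class are hypothetical names, not part of Rx.NET, and it assumes that every item is stamped with Environment.TickCount64 when pushed (.NET Core 3.0 or later) and that the subject was created with Scheduler.Immediate.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RecentItems
{
    // Same operator as above: relays whatever is replayed synchronously
    // during the subscription, then completes.
    public static IObservable<T> TakeImmediate<T>(this IObservable<T> source)
    {
        return Observable.Create<T>(observer =>
        {
            source.Subscribe(observer).Dispose();
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }

    // Hypothetical helper (not an Rx.NET API): returns a snapshot of the
    // values whose timestamp falls within the given window.
    // Assumes the subject was constructed with Scheduler.Immediate.
    public static T[] GetRecent<T>(
        this ReplaySubject<(T Value, long TimeStamp)> subject, TimeSpan window)
    {
        long cutoff = Environment.TickCount64 - (long)window.TotalMilliseconds;
        return subject
            .Where(e => e.TimeStamp >= cutoff)
            .Select(e => e.Value)
            .TakeImmediate()
            .ToArray()
            .Wait();
    }
}
```

With this in place, the window can be chosen at the call site, for example subject.GetRecent(TimeSpan.FromMinutes(1)) or subject.GetRecent(TimeSpan.FromSeconds(10)). The subject's own one-hour window still caps how far back a snapshot can reach.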

Enigmativity On

You simply need to do this:

ReplaySubject<int> rs = new(TimeSpan.FromMinutes(1.0));

The subject will only replay values that were received within the given TimeSpan.


If the timespan is changing, perhaps play with something like this:

Subject<TimeSpan> t = new();
Subject<int> s = new();

// Lambda parameter renamed so that it does not shadow the subject `t`.
IObservable<int> cq = t.Select(span => s.Replay(span).RefCount()).Switch();