I have an Observable that produces values at a variable rate. To avoid being overwhelmed with values, I added a Throttle of three seconds, so I only get a value if no further values were published for three seconds. What I want, though, is to end the stream if I get a certain number of updates within a time period and replace it with another observable.
For example, if I get 50 updates within three seconds, end the stream and replace it with a different stream, similar to how Catch can replace an observable that was terminated by an exception with another one.
Something like the below, but no exception is being thrown, so I can't use Catch:
myObservable
.Throttle(TimeSpan.FromSeconds(3)) //Not sure if we need to remove Throttle
.Catch<long, Exception>(e => Observable.Return(0L)) //Instead of catching an exception, some way to monitor how many updates are coming in before throttling
.Subscribe
EDIT: I added a marble diagram to try to show what I am looking for.
The initial observable produces values at a variable rate. Values 1-6 come in without forming a burst of 50 within 3 seconds, so they pass through to Throttle, which produces the final values 1, 5, and 6.
Then the initial observable produces values 7-60 within 3 seconds. This is where I am trying to do what "???" shows. The idea is to recognize that 50 or more items were produced within the set timeframe, complete the original observable, and replace it with one I provide, similar to how you can give Catch a sequence to replace one that errored (for example, if the original sequence had thrown an exception on producing that huge burst).
After the initial observable is replaced, the sequence continues with the new one, with the produced items going through the existing Throttle.
If only 49 items come within the timespan checked in "???", those values will all pass through to Throttle and only the last one would be produced. If no updates come in at all then nothing happens and no output is produced.
Hopefully what I am asking is a bit more clear now.

You can use Scan() to build a sliding window of the last 50 items (I couldn't get it to work with Buffer() or Window(), but I guess it's possible). Each item is enriched with a timestamp. Then, for each sliding window, you compare the first and last timestamps and check whether they are too close together. If they are, you switch to the other observable.
To "enrich" the value with a timestamp, you simply use Select().
Then use Scan() to build the sliding window. Now your "values" are a list of tuples, where each entry has the value and the timestamp at which it was emitted.
We use TakeWhile() to stop emitting values when the start and end timestamps are too close. For debugging purposes, we will print the values of the first and last entry.
Then we "extract" the original value again.
Now we have a stream which will "die" when the timestamps are too close. We can switch to the other observable with a simple Concat() (or Switch()).
Here is the full source code:
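A minimal sketch of these steps, assuming Rx.NET (the System.Reactive package). The stand-in source, the replacement stream, and the names IsBurst, limit, and period are illustrative and not from the original post:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BurstSwitchSketch
{
    // Hypothetical helper: true when the window is full and its oldest and
    // newest entries arrived within `period` of each other.
    public static bool IsBurst(
        IReadOnlyList<(long Value, DateTimeOffset At)> window, int limit, TimeSpan period) =>
        window.Count >= limit && window[window.Count - 1].At - window[0].At <= period;

    public static void Main()
    {
        const int limit = 50;                  // burst size from the question
        var period = TimeSpan.FromSeconds(3);  // burst timeframe from the question

        // Stand-in source (assumption): one value every 20 ms, so 50 values
        // arrive in roughly one second and should count as a burst.
        IObservable<long> myObservable = Observable.Interval(TimeSpan.FromMilliseconds(20));
        // Stand-in replacement stream (assumption).
        IObservable<long> replacement = Observable.Return(-1L);

        var result = myObservable
            // 1. Enrich each value with the time it was emitted.
            .Select(v => (Value: v, At: DateTimeOffset.UtcNow))
            // 2. Keep a sliding window of the last `limit` items. A fresh list
            //    is built on every step so earlier windows are never mutated.
            .Scan(new List<(long Value, DateTimeOffset At)>(), (window, item) =>
            {
                var next = window.Skip(Math.Max(0, window.Count - (limit - 1))).ToList();
                next.Add(item);
                return next;
            })
            // 3. Complete as soon as a full window fits inside `period`.
            //    The item that completes the burst is not forwarded.
            .TakeWhile(window => !IsBurst(window, limit, period))
            // Debug output: first and last value of the current window.
            .Do(w => Console.WriteLine($"window: {w[0].Value}..{w[w.Count - 1].Value}"))
            // 4. Extract the original value again.
            .Select(window => window[window.Count - 1].Value)
            // 5. Continue with the replacement once the first stream completes.
            .Concat(replacement);

        using var subscription = result.Subscribe(
            v => Console.WriteLine($"value: {v}"),
            () => Console.WriteLine("completed"));

        Console.ReadLine(); // keep the process alive while the timers run
    }
}
```

With this stand-in source, the first stream should complete once the 50th value arrives, after which the replacement value is emitted. Timestamps are taken with DateTimeOffset.UtcNow inside Select(), as the steps above describe; Rx.NET also has a Timestamp() operator that could serve the same purpose.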
This can generate an output like this:
You still have to re-add the Throttle() call at the right position (for example after Concat(), if the replacement stream should go through the throttle as well).