Suppose there is a Subject<T> at endpoint A and an IObservable<T> on endpoint B.
Endpoint A sends exactly one object of T using OnNext() and never calls OnComplete(). I don't have a control over that, it's in an external assembly. When I subscribe on endpoint B in my one, I don't get my lambda called:
endpoint.Subscribe(t => { doSomething(); });
However, when I subscribe like this:
using var cts = new CancellationTokenSource();
await endpoint.ForEachAsync(t => { doSomething(); /* cts.Cancel(); */ }, cts.Token);
I do get my lambda called. But there's a drawback. Since the publisher never calls OnCompleted(), I can't escape that async task unless I call cts.Cancel() after doSomething() method. It kinda works, but I don't like the idea of throwing exceptions as a part of a flow. Is there a better way?
Ok, one of complicated solutions could be:
And thanks to @Enigmativity for a hint, it can be that simple: