See below the C# code for a Combinator that zips together two inputs:

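using System;
using System.Reactive.Linq;

public class TransformScript
{
    // Pairs the n-th element of source1 with the n-th element of source2.
    public IObservable<Tuple<bool,bool>> Process(IObservable<bool> source1, IObservable<bool> source2)
    {
        return source1.Zip(source2,(s1,s2) => Tuple.Create(s1,s2));
    }
}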

This code takes two input streams of booleans and emits a tuple whenever both slots of the zip are filled. In contrast, CombineLatest waits until both slots have been filled for the first time, emits, and thereafter emits whenever either slot is updated, even if the other remains unchanged.
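
To make the difference concrete, here is a minimal sketch. It assumes the System.Reactive package and uses two Subject<bool> instances as hypothetical stand-ins for the real sources; the class and method names are only illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipVersusCombineLatest
{
    // Pushes the same three values through Zip and CombineLatest
    // and records what each operator emits.
    public static List<string> Run()
    {
        var source1 = new Subject<bool>();   // stand-in for the first source
        var source2 = new Subject<bool>();   // stand-in for the second source
        var log = new List<string>();

        source1.Zip(source2, (s1, s2) => Tuple.Create(s1, s2))
               .Subscribe(t => log.Add("Zip " + t));
        source1.CombineLatest(source2, (s1, s2) => Tuple.Create(s1, s2))
               .Subscribe(t => log.Add("CombineLatest " + t));

        source1.OnNext(true);   // neither emits: source2 has produced nothing yet
        source2.OnNext(false);  // both emit (True, False)
        source2.OnNext(true);   // only CombineLatest emits, with (True, True)

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Zip holds the second value from source2 until source1 produces a partner for it, whereas CombineLatest reuses the last value from source1.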

I would like a behaviour that is similar, in some sense, to both of these, but that differs in one key way, for which I think a Tuple is probably not the right type.

Each time an element is received from a given source stream (say source1), that element should be passed forward immediately, without waiting for the other stream to yield an element. I am, in effect, compressing the two source streams into a single output stream that yields labelled output, so that I can pass that single stream through a bottleneck. Explicitly attaching a label to each stream, merging them into a new stream and then extracting the elements based on the label post hoc is not an option.

Another illustration is to compare this to a merge (note that the objects in my case are of the same type). See the marble diagram below for the merge operator:

source 1  -----1-----1-----1-----
source 2  ---2----2---------2----
merge     ---m-m--m--m-----mm----

Here the output is a single stream, which is all my bottleneck can take, but the identity of the individual items has been lost. What I want instead is this:

source 1  -----1-----1-----1-----
source 2  ---2----2---------2----
whatIwant ---2-1--2--1-----12----

This would allow me, on the other side of the bottleneck, to instantly separate the output into two streams and recover source 1 and source 2. But again, I want to do so without adding labels to them.

How can I modify (or completely rewrite) the code above to achieve this kind of behaviour?

ibebbs On

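Use an Either type (a value that holds exactly one of two cases) with the following code. Either is not part of the .NET base class library, so you need to define it yourself or take it from a functional library: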

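using System;
using System.Reactive.Linq;

public class TransformScript
{
    // Wraps each element in the case matching its source, then merges both into one stream.
    // Either<T1,T2> is assumed to be user-defined, with static First/Second factory methods.
    public IObservable<Either<T1,T2>> Process<T1,T2>(IObservable<T1> source1, IObservable<T2> source2)
    {
        return Observable.Merge(
            source1.Select(value => Either<T1,T2>.First(value)),
            source2.Select(value => Either<T1,T2>.Second(value))
        );
    }
}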

You can then discriminate between the First and Second cases of the Either later in the Rx pipeline.
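
As a rough sketch of how that could look end to end, the following defines a minimal Either type and splits the merged stream back into its two sources. Everything here is an assumption made for illustration: the shape of Either<T1,T2> (the IsFirst, FirstValue and SecondValue members and the factory methods on the generic type), the EitherDemo class, and the Subject<bool> stand-ins for the real sources. It needs the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical minimal Either: holds a value from exactly one of two sources.
public sealed class Either<T1, T2>
{
    public bool IsFirst { get; }
    public T1 FirstValue { get; }    // meaningful only when IsFirst is true
    public T2 SecondValue { get; }   // meaningful only when IsFirst is false

    private Either(bool isFirst, T1 first, T2 second)
    {
        IsFirst = isFirst;
        FirstValue = first;
        SecondValue = second;
    }

    public static Either<T1, T2> First(T1 value)
    {
        return new Either<T1, T2>(true, value, default(T2));
    }

    public static Either<T1, T2> Second(T2 value)
    {
        return new Either<T1, T2>(false, default(T1), value);
    }
}

public static class EitherDemo
{
    public static List<string> Run()
    {
        var source1 = new Subject<bool>();   // stand-in for the first source
        var source2 = new Subject<bool>();   // stand-in for the second source
        var log = new List<string>();

        // The single stream that goes through the bottleneck.
        IObservable<Either<bool, bool>> merged = Observable.Merge(
            source1.Select(v => Either<bool, bool>.First(v)),
            source2.Select(v => Either<bool, bool>.Second(v)));

        // On the far side, recover the two original streams.
        IObservable<bool> recovered1 = merged.Where(e => e.IsFirst).Select(e => e.FirstValue);
        IObservable<bool> recovered2 = merged.Where(e => !e.IsFirst).Select(e => e.SecondValue);

        recovered1.Subscribe(v => log.Add("1:" + v));
        recovered2.Subscribe(v => log.Add("2:" + v));

        // Elements arrive in interleaved order and are forwarded one at a time.
        source2.OnNext(true);
        source1.OnNext(false);
        source2.OnNext(false);
        source1.OnNext(true);

        return log;   // 2:True, 1:False, 2:False, 1:True
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Each element passes through as soon as it arrives, and the case it is wrapped in records which source it came from. Note that merged is subscribed twice here, which is harmless with hot sources such as Subject; for a cold source you would probably want to share the subscription, for example with Publish().RefCount().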