I would like to combine two scalaz streams with a predicate which selects the next element from either stream. For instance, I would like this test to pass:
val a = Process(1, 2, 5, 8)
val b = Process(3, 4, 5, 7)
choose(a, b)(_ < _).toList shouldEqual List(1, 2, 3, 4, 5, 5, 7, 8)
As you can see, we can't do something clever like zip and order the two elements because one of the processes may be selected consecutively at times.
I took a stab at a solution that I thought would work. It compiled! But damn it if it doesn't do anything. The JVM just hangs :(
import scalaz.stream.Process._
import scalaz.stream._
object StreamStuff {
  def choose[F[_], I](a:Process[F, I], b:Process[F, I])(p: (I, I) => Boolean): Process[F, I] =
    (a.awaitOption zip b.awaitOption).flatMap {
      case (Some(ai), Some(bi)) =>
        if(p(ai, bi)) emit(ai) ++ choose(a, emit(bi) ++ b)(p)
        else emit(bi) ++ choose(emit(ai) ++ a, b)(p)
      case (None, Some(bi)) => emit(bi) ++ b
      case (Some(ai), None) => emit(ai) ++ a
      case _ => halt
    }
}
Note that the above was my second attempt. In my first attempt I tried to create a Tee but I couldn't figure out how to un-consume the loser element. I felt that I needed something recursive like I have here.
I am using streams version 0.7.3a.
Any tips (including incremental hints because I'd like to simply learn how to figure these things out on my own) are greatly appreciated!!
I'll give a couple of hints and an implementation below, so you might want to cover the screen if you want to work out a solution yourself.
Disclaimer: this is just the first approach that came to mind, and my familiarity with the scalaz-stream API is a little rusty, so there may be nicer ways to implement this operation, this one might be totally wrong in some horrible way, etc.
Hint 1
Instead of trying to "unconsume" the losing elements, you can pass them along in the next recursive call.
Hint 2
You can avoid having to accumulate more than one losing element by indicating which side lost last.
Hint 3
I often find it easier to sketch out an implementation using ordinary collections first when I'm working with Scalaz streams. Here's the helper method we'll need for lists:
That assumes we've already got a losing element in hand, but now we can write the method we actually want to use:
And then:
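A minimal sketch of the list version that Hints 1 and 2 describe. The names `chooseListsWithHeld` and `chooseLists`, and the use of `Either` to record which side the held (losing) element came from, are illustrative assumptions. The recursion is not stack-safe, so treat it as a way to see the shape rather than something to run on long lists.

```scala
object ListChoose {
  // `held` is the element that lost the previous comparison:
  // Left(l) was taken from `ls`, Right(r) was taken from `rs`.
  def chooseListsWithHeld[A](p: (A, A) => Boolean)(held: Either[A, A])(
    ls: List[A],
    rs: List[A]
  ): List[A] = held match {
    case Left(l) => rs match {
      // Right side is exhausted: emit the held element, then the rest of the left.
      case Nil => l :: ls
      case r :: rTail =>
        if (p(l, r)) l :: chooseListsWithHeld(p)(Right(r))(ls, rTail) // r lost, hold it
        else r :: chooseListsWithHeld(p)(held)(ls, rTail)             // l lost again
    }
    case Right(r) => ls match {
      // Left side is exhausted: emit the held element, then the rest of the right.
      case Nil => r :: rs
      case l :: lTail =>
        if (p(l, r)) l :: chooseListsWithHeld(p)(held)(lTail, rs)     // r lost again
        else r :: chooseListsWithHeld(p)(Left(l))(lTail, rs)          // l lost, hold it
    }
  }

  // Entry point: take the first left element as the initial held element.
  def chooseLists[A](p: (A, A) => Boolean)(ls: List[A], rs: List[A]): List[A] =
    ls match {
      case Nil        => rs
      case l :: lTail => chooseListsWithHeld(p)(Left(l))(lTail, rs)
    }

  // The test case from the question, on plain lists.
  val merged: List[Int] =
    chooseLists[Int](_ < _)(List(1, 2, 5, 8), List(3, 4, 5, 7))
  // merged == List(1, 2, 3, 4, 5, 5, 7, 8)
}
```

The explicit `[Int]` is needed because the predicate comes in the first parameter list, before Scala has seen the lists it could infer `A` from.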
Okay, looks fine. There are nicer ways we could write this for lists, but this implementation matches the shape of what we'll need to do for Process.
Implementation
And here's more or less the equivalent with scalaz-stream:
And let's check it again:
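A hedged sketch of how the same shape might look as a `Tee`, written against my recollection of the scalaz-stream 0.7.x API (`tee.receiveLOr`, `tee.receiveROr`, `tee.passL`, `tee.passR`, and `Process#tee`). The names `chooseWithHeld` and `chooseTee` are assumptions for illustration, and the code has not been compiled against 0.7.3a, so check the signatures before relying on it.

```scala
import scalaz.stream.{ Process, Tee, tee }
import scalaz.stream.Process.emit

object StreamChoose {
  // `held` is the element that lost the previous comparison:
  // Left(l) was read from the left input, Right(r) from the right input.
  def chooseWithHeld[A](p: (A, A) => Boolean)(held: Either[A, A]): Tee[A, A, A] =
    held match {
      case Left(l) =>
        // Read from the right; if the right has ended, flush `l` and pass the left through.
        tee.receiveROr[A, A, A](emit(l) ++ tee.passL[A]) { r =>
          if (p(l, r)) emit(l) ++ chooseWithHeld(p)(Right(r))
          else emit(r) ++ chooseWithHeld(p)(held)
        }
      case Right(r) =>
        // Read from the left; if the left has ended, flush `r` and pass the right through.
        tee.receiveLOr[A, A, A](emit(r) ++ tee.passR[A]) { l =>
          if (p(l, r)) emit(l) ++ chooseWithHeld(p)(held)
          else emit(r) ++ chooseWithHeld(p)(Left(l))
        }
    }

  // Entry point: read one left element to hold, or pass the right through if the left is empty.
  def chooseTee[A](p: (A, A) => Boolean): Tee[A, A, A] =
    tee.receiveLOr[A, A, A](tee.passR[A])(l => chooseWithHeld(p)(Left(l)))

  // The question's test case, assuming `tee` and `toList` work on pure processes as in 0.7.x.
  def check: List[Int] =
    Process(1, 2, 5, 8).tee(Process(3, 4, 5, 7))(chooseTee[Int](_ < _)).toList
}
```

If the API behaves as assumed, `StreamChoose.check` should give the same list as the question's test. Because the losing element travels as an argument to the next `Tee` state, nothing has to be pushed back onto either input.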
I wouldn't put this into production without some more testing, but it looks like it works.