I have an fs2 Stream[F, C] where C <: Coproduct, and I want to transform it into a Stream[F, H] where H <: HList. This HList should contain all members that the coproduct C had.
So, essentially, a Pipe[F, C, H].
The pipe should wait until at least one value of each of the coproduct's members has been pulled, and then combine them into an HList and output it.
So, it would be used pretty much like so:
type MyCoprod = A :+: B :+: C :+: CNil
type MyHList = A :: B :: C :: HNil
val stream: Stream[F, MyHList] = Stream
  .emits(List(A, B, C)) // my coproducts
  .through(pullAll) // I want to wait until A, B and C have each been pulled at least once, then output them
  .map { hlist => ... }
I am very very new to Shapeless, and this is what I could think of before hitting a roadblock:
import fs2.{Pipe, Pull, Stream}
import shapeless._
import shapeless.ops.coproduct.ToHList

trait WaitFor[F[_], C <: Coproduct] {
  type Out <: HList
  def apply: Pipe[F, C, Out]
}
object WaitFor {
  type Aux[F[_], C <: Coproduct, Out0 <: HList] =
    WaitFor[F, C] { type Out = Out0 }
  implicit def make[F[_], C <: Coproduct, L <: HList](implicit
    toHList: ToHList.Aux[C, L]
  ): Aux[F, C, L] = new WaitFor[F, C] {
    override type Out = L
    override def apply: Pipe[F, C, Out] = {
      def go(s2: Stream[F, C], currHList: L): Pull[F, L, Unit] = {
        s2.pull.uncons1.flatMap {
          case Some((coproduct, s3)) => {
            // add or update coproduct member to currHList
            // if currHList is the same as L (our output type) then output it (Pull.output1(currHList)) and clear currHList
            // if not, keep iterating:
            go(s3, ???)
          }
          case None => Pull.done
        }
      }
      // a Pipe is a function from the input stream to the output stream
      s1 => go(s1, ???).stream
    }
  }
  def pullAll[F[_], C <: Coproduct](
    stream: Stream[F, C]
  )(implicit ev: WaitFor[F, C]): Stream[F, ev.Out] = {
    stream.through(ev.apply)
  }
}
My roadblock starts here:
override def apply: Pipe[F, C, Out] = ???
and that's where my knowledge of Shapeless runs out.
My idea is to keep track of all coproduct members in a tuple (Option[C1], Option[C2], ...).
Once every element in the tuple is Some, I'll convert them to an HList and output it in the Stream.
(I'll be using FS2 Pull to keep track of the state recursively so I'm not worried about that).
But my issue is that, at the value level, there's no way for me to know how long the tuple will be, or to construct such a tuple.
Any pointers so I can solve this?
Thanks
Let's do it step by step:
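- your input will be A :+: B :+: C :+: CNil
- you will store the newest A, newest B, etc.
- once you have all of them, you should emit A :: B :: C :: HNil
- when you emit a new HList value, you should also reset your intermediate values storage
- so it would be handy to store the intermediate values as Option[A] :: Option[B] :: Option[C] :: HNil

So, let's write a type class which would help us with it: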
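A minimal sketch of such a type class, assuming shapeless 2.3.x on Scala 2. The names Updater, emptyCache and attemptEmit are made up for illustration (only updateCache is named in the text), so treat this as one possible shape rather than the definitive one. It derives, by induction over the coproduct, a cache type made of Options and a result type made of plain values.

```scala
import shapeless._

// Hypothetical type class: for a coproduct Input it fixes
//   Cache  = Option[A] :: Option[B] :: ... :: HNil
//   Result = A :: B :: ... :: HNil
trait Updater[Input <: Coproduct] {
  type Cache <: HList
  type Result <: HList

  def emptyCache: Cache                               // all slots are None
  def updateCache(input: Input, cache: Cache): Cache  // overwrite the matching slot
  def attemptEmit(cache: Cache): Option[Result]       // Some only when every slot is filled
}

object Updater {
  type Aux[I <: Coproduct, C <: HList, R <: HList] =
    Updater[I] { type Cache = C; type Result = R }

  // Summoner that keeps the computed Cache and Result types visible.
  def apply[I <: Coproduct](implicit u: Updater[I]): Aux[I, u.Cache, u.Result] = u

  // Base case: nothing to store, nothing to wait for.
  implicit val cnil: Aux[CNil, HNil, HNil] = new Updater[CNil] {
    type Cache = HNil
    type Result = HNil
    def emptyCache: HNil = HNil
    def updateCache(input: CNil, cache: HNil): HNil = cache
    def attemptEmit(cache: HNil): Option[HNil] = Some(HNil)
  }

  // Inductive case: one Option[H] slot in front of the tail's cache.
  implicit def ccons[H, T <: Coproduct, TC <: HList, TR <: HList](
    implicit tail: Aux[T, TC, TR]
  ): Aux[H :+: T, Option[H] :: TC, H :: TR] = new Updater[H :+: T] {
    type Cache = Option[H] :: TC
    type Result = H :: TR
    def emptyCache: Cache = Option.empty[H] :: tail.emptyCache
    def updateCache(input: H :+: T, cache: Cache): Cache = input match {
      case Inl(h) => Option(h) :: cache.tail
      case Inr(t) => cache.head :: tail.updateCache(t, cache.tail)
    }
    def attemptEmit(cache: Cache): Option[Result] =
      for {
        h <- cache.head
        t <- tail.attemptEmit(cache.tail)
      } yield h :: t
  }
}
```

Resetting the cache after a successful emit is deliberately left to the caller, who can simply go back to emptyCache.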
This code doesn't assume how we would store our cache nor how we would update it. So we might test it with some impure code:
We can check with Scastie that it prints what we expect it would.
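A rough sketch of such an impure test, again assuming shapeless 2.3.x. It repeats a compact copy of the hypothetical Updater type class so that it compiles on its own; ImpureUpdaterTest, feed and the Int/String/Double input type are illustrative names and choices only. The cache lives in a plain var on purpose.

```scala
import shapeless._

// Compact copy of the hypothetical Updater type class (names are assumptions).
trait Updater[Input <: Coproduct] {
  type Cache <: HList
  type Result <: HList
  def emptyCache: Cache
  def updateCache(input: Input, cache: Cache): Cache
  def attemptEmit(cache: Cache): Option[Result]
}
object Updater {
  type Aux[I <: Coproduct, C <: HList, R <: HList] =
    Updater[I] { type Cache = C; type Result = R }
  def apply[I <: Coproduct](implicit u: Updater[I]): Aux[I, u.Cache, u.Result] = u
  implicit val cnil: Aux[CNil, HNil, HNil] = new Updater[CNil] {
    type Cache = HNil
    type Result = HNil
    def emptyCache: HNil = HNil
    def updateCache(input: CNil, cache: HNil): HNil = cache
    def attemptEmit(cache: HNil): Option[HNil] = Some(HNil)
  }
  implicit def ccons[H, T <: Coproduct, TC <: HList, TR <: HList](
    implicit tail: Aux[T, TC, TR]
  ): Aux[H :+: T, Option[H] :: TC, H :: TR] = new Updater[H :+: T] {
    type Cache = Option[H] :: TC
    type Result = H :: TR
    def emptyCache: Cache = Option.empty[H] :: tail.emptyCache
    def updateCache(input: H :+: T, cache: Cache): Cache = input match {
      case Inl(h) => Option(h) :: cache.tail
      case Inr(t) => cache.head :: tail.updateCache(t, cache.tail)
    }
    def attemptEmit(cache: Cache): Option[Result] =
      for { h <- cache.head; t <- tail.attemptEmit(cache.tail) } yield h :: t
  }
}

object ImpureUpdaterTest {
  type Input = Int :+: String :+: Double :+: CNil
  val updater = Updater[Input]

  // Deliberately impure: the intermediate values are kept in a mutable variable.
  private var cache: updater.Cache = updater.emptyCache

  // Feed one value; returns Some(hlist) once every member has been seen, then resets.
  def feed(input: Input): Option[updater.Result] = {
    val updated = updater.updateCache(input, cache)
    val emitted = updater.attemptEmit(updated)
    cache = if (emitted.isDefined) updater.emptyCache else updated
    emitted
  }

  def main(args: Array[String]): Unit =
    List(
      Coproduct[Input](1),
      Coproduct[Input]("a"),
      Coproduct[Input](2),   // overwrites the stored Int
      Coproduct[Input](3.0), // completes the set
      Coproduct[Input]("b")  // starts a fresh round
    ).foreach(in => println(feed(in)))
}
```

Only the fourth call should yield a value, carrying the newest Int (2) rather than the first one.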
Now, it's a matter of how we'll thread this intermediate result through the FS2 Stream. One way would be to use Ref.
One might modify this code to fit their aesthetics: extract updateCache to some function, use a state monad, or whatever. I guess turning it into a pipe would be, e.g.:
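A possible sketch of a Ref-based pipe, assuming fs2 3.x, cats-effect 3 and shapeless 2.3.x (on cats-effect 2 the Ref import and the way IO is run would differ). The pullAll signature here, which takes no stream argument and returns a Pipe, is an assumption, as are the Updater member names; the type class is repeated in compact form so the block stands alone.

```scala
import cats.effect.{IO, Ref, Sync}
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}
import shapeless._

// Compact copy of the hypothetical Updater type class (names are assumptions).
trait Updater[Input <: Coproduct] {
  type Cache <: HList
  type Result <: HList
  def emptyCache: Cache
  def updateCache(input: Input, cache: Cache): Cache
  def attemptEmit(cache: Cache): Option[Result]
}
object Updater {
  type Aux[I <: Coproduct, C <: HList, R <: HList] =
    Updater[I] { type Cache = C; type Result = R }
  implicit val cnil: Aux[CNil, HNil, HNil] = new Updater[CNil] {
    type Cache = HNil
    type Result = HNil
    def emptyCache: HNil = HNil
    def updateCache(input: CNil, cache: HNil): HNil = cache
    def attemptEmit(cache: HNil): Option[HNil] = Some(HNil)
  }
  implicit def ccons[H, T <: Coproduct, TC <: HList, TR <: HList](
    implicit tail: Aux[T, TC, TR]
  ): Aux[H :+: T, Option[H] :: TC, H :: TR] = new Updater[H :+: T] {
    type Cache = Option[H] :: TC
    type Result = H :: TR
    def emptyCache: Cache = Option.empty[H] :: tail.emptyCache
    def updateCache(input: H :+: T, cache: Cache): Cache = input match {
      case Inl(h) => Option(h) :: cache.tail
      case Inr(t) => cache.head :: tail.updateCache(t, cache.tail)
    }
    def attemptEmit(cache: Cache): Option[Result] =
      for { h <- cache.head; t <- tail.attemptEmit(cache.tail) } yield h :: t
  }
}

object PullAll {
  // Keeps the cache in a Ref; emits an HList whenever all slots are filled, then resets.
  def pullAll[F[_], I <: Coproduct](
    implicit F: Sync[F], u: Updater[I]
  ): Pipe[F, I, u.Result] =
    in =>
      Stream.eval(Ref.of[F, u.Cache](u.emptyCache)).flatMap { ref =>
        in.evalMap { input =>
          ref.modify { cache =>
            val updated = u.updateCache(input, cache)
            val emitted = u.attemptEmit(updated)
            (if (emitted.isDefined) u.emptyCache else updated, emitted)
          }
        }.unNone // drop the steps where the cache was still incomplete
      }
}

object PullAllDemo {
  type Input = Int :+: String :+: CNil

  val inputs: List[Input] = List(
    Coproduct[Input](1),
    Coproduct[Input](2),
    Coproduct[Input]("a"),
    Coproduct[Input]("b"),
    Coproduct[Input](3)
  )

  def run(): List[Int :: String :: HNil] =
    Stream
      .emits(inputs)
      .covary[IO]
      .through(PullAll.pullAll[IO, Input])
      .compile
      .toList
      .unsafeRunSync()
}
```

Since a pipe processes its input sequentially, the same state could also be threaded without a Ref (for example with a Pull loop as in the question); the Ref version is just the shortest to write down.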