I have a stream of integers that I need to process using a structure that uses a CurrentValueSubject to know whether or not it is allowed to do the processing. Since CurrentValueSubject doesn't complete, if I do the processing inside a flatMap(), the overall stream (fooTheBars) only gets to complete if the stream of integers is empty.
If I comment out the processing part (.flatMap { bar in fooer.doFoo(with: bar) }) the stream completes normally because the CurrentValueSubject is not involved. But I need it to complete normally after all the items are processed regardless of the number of items.
Output I see:
[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value
What I want it to output:
[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value
received completion // hooray, completion!
Output when the stream is empty (completes normally which is what I want):
[]
received completion // hooray, completion!
If I comment out the processing (.flatMap()), everything is fine:
[0, 1, 2]
received value
received value
received value
received completion // hooray, completion!
How do I modify the code below the comment so that the overall stream completes when all the items finish processing? Is using CurrentValueSubject like that an inherently bad pattern?
struct Fooer {
private let readyToFoo = CurrentValueSubject<Bool, Never>(true)
func doFoo(with item: Int) -> AnyPublisher<Void, Never> {
readyToFoo
.filter { $0 }
.map { _ in () }
.handleEvents(receiveOutput: {
print("doing foo with \(item)") // processing
})
.delay(for: .seconds(1), scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
}
func getBars() -> AnyPublisher<Int, Never> {
let bars = (0..<(Bool.random() ? 0 : 3))
print(Array(bars))
return bars.publisher
.eraseToAnyPublisher()
}
// can't change anything above this comment
let fooer = Fooer()
let fooTheBars = getBars()
.flatMap { bar in
fooer.doFoo(with: bar)
}
.sink(receiveCompletion: { completion in
print("received completion")
}, receiveValue: { _ in
print("received value")
})
If I understand correctly, you only want
doFooto publish one value. i.e. the "processing" of abaris complete as soon asdoFoopublishes one value.Therefore, you can stop the
doFoopublisher by just taking thefirst().This produces your intended output. And if
readyToFoohappens to holdfalsewhen you calldoFoo,doFoowon't publish. It is only whenreadToFoosends atrue, doesdoFoopublishes its first value.