Can't get previous emitted values from Flow


Can't get previous emitted values from Flow.

class TestActivity: ComponentActivity() {
...

private val flowA = MutableStateFlow(0)

private val flowB = MutableStateFlow("")

init {
    flowB.onEach { Log.d("flowtest", "test - $it") }
        .launchIn(lifecycleScope)
}

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    flowB.value = "1"
    flowB.value = "2"
    flowB.value = "3"

    flowA.map { "ignore" }
        .onEach {
            flowB.value = "4"
            flowB.value = "5"
            flowB.value = "6"
        }
        .launchIn(lifecycleScope)

    flowA.value = 0
    
...
}
}

Expected output:

test - 1
test - 2
test - 3
test - 4
test - 5
test - 6

Actual output:

test - 1
test - 2
test - 3
test - 6

What am I missing about the concept of Flow?

How can I get previous emitted values?

Tenfour04 On BEST ANSWER

This is a bit of a guess since I'm not bothering to put together a project and test it.

First, there are two things to remember about StateFlow.

  1. It is limited to a history of 1. It can only replay a single value to new subscribers.
  2. It is conflated. This means that if a collector is slower than the emissions coming in, the collector will miss some of those emitted values and only get the latest one.
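
The two properties above can be seen in a small standalone sketch. It assumes plain kotlinx.coroutines with runBlocking rather than Android's lifecycleScope, and the function name observeConflation is made up for this illustration:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a collector actually observes.
fun observeConflation(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // The collector runs on runBlocking's single thread, so it only gets
    // a turn when the code below suspends.
    val collector = launch { state.collect { seen += it } }

    yield()          // let the collector start and receive the initial value 0

    state.value = 1  // the collector cannot run between these assignments
    state.value = 2
    state.value = 3

    yield()          // the collector runs now and finds only the latest value
    collector.cancel()
    seen
}

fun main() {
    println(observeConflation()) // prints [0, 3]
}
```

The intermediate values 1 and 2 are never delivered, because the collector only reads the current value when it finally gets to run.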

So, looking at your code, at first glance I would expect you to see only:

test - 3
test - 6

This is because you're emitting and collecting on the main thread. When several StateFlow value changes happen in a row, I would expect only the last one made in a method to "stick", since the collector has to wait for the main thread to be relinquished before it can collect its next value.

So why do 1 and 2 appear?

Well, actually, lifecycleScope doesn't use Dispatchers.Main. It uses Dispatchers.Main.immediate, which behaves a little differently. The immediate version runs suspending code immediately in place if you are already on the Main thread, instead of yielding to other coroutines first.

So my theory is this: you change the value on the main thread while flowB's onEach is collecting on Dispatchers.Main.immediate, so the collector gets a chance to run its onEach right in place each time you set a value on flowB.

But, a reminder, I haven't actually tested flows with immediate to test this hypothesis. It's just the only reason I can think of that would explain the behavior. To test this theory yourself, you can change to using launchIn(lifecycleScope + Dispatchers.Main) in both places and see if 1 and 2 disappear.

Mohamed Galal On

Note that the value property belongs to StateFlow, not to SharedFlow. In addition, you can't set a value on a StateFlow after collecting it if the collector and the emitter are in the same coroutine, because collect on a StateFlow never returns. So the right code would be:

val flowA = MutableSharedFlow<Int>()

val flowB = MutableStateFlow("")

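// Note: this chain has no terminal operator (such as launchIn or collect),
// so it does nothing as written.
flowA.map { "test $it" }
    .onEach { flowB.value = it }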

flowB.value = "1"
flowB.value = "2"
flowB.value = "3"

flowB.collect { println(it) }

The result will be 3, because a StateFlow keeps only the most recently set value.

If you want to receive all values, you can use a SharedFlow like this:

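import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flowA = MutableSharedFlow<Int>()

    // Collector: never completes, because a SharedFlow never ends.
    launch {
        flowA.collect {
            println(it)
        }
    }

    // Emitter: with no buffer, emit() suspends until the subscriber has
    // received the value. Anything emitted before the collector has
    // subscribed is dropped, since replay is 0.
    launch {
        flowA.emit(1)
        flowA.emit(2)
        flowA.emit(3)
    }
}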

For more information, see the SharedFlow and StateFlow documentation.
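
If the goal is for a subscriber to also see values that were emitted before it subscribed, one option is a SharedFlow with a replay cache. This is only a sketch: the replay size of 6 is an arbitrary choice matching the six values in the question, and the function name replayedValues is made up for this example.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits six values before anyone subscribes,
// then returns what a late subscriber receives.
fun replayedValues(): List<String> = runBlocking {
    // replay = 6 keeps the six most recent values for new subscribers.
    val flowB = MutableSharedFlow<String>(replay = 6)

    // tryEmit never suspends; with no subscribers the values simply
    // go into the replay cache.
    for (i in 1..6) flowB.tryEmit(i.toString())

    // A SharedFlow never completes on its own, so take(6) ends the collection.
    flowB.take(6).toList()
}

fun main() {
    println(replayedValues()) // prints [1, 2, 3, 4, 5, 6]
}
```

Unlike StateFlow, a SharedFlow does not conflate equal consecutive values and has no value property, so this changes the semantics and may not fit every use of flowB.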