Using flows to return a list as a terminal operator


I have the following code, which returns a Flow<List<Events>>. I want to get the List<Events> out of the flow instead of a Flow<List<Events>>.

val events = eventDao.getEventsFromTimeStamp(startTimeStamp, endTimeStamp)
    .map { listOfEventEntity ->
        listOfEventEntity.map { eventEntity ->
            eventEntity.toEvent()
        }
    }

I have tried using the following operators

val listOfEvents = events.single()
val listOfEvent = events.toList().flatten()

This is my eventDao method, which returns a Flow:

fun getEventsFromTimeStamp(startTimeStamp: Long, endTimeStamp: Long): Flow<List<EventEntity>>

Answer by nomisRev:

If neither single() nor toList().flatten() works for you, then the Flow is most likely not emitting any items, or it never completes.

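single() should result in one of the following:

  • The single item of the Flow.
  • NoSuchElementException for an empty flow.
  • IllegalArgumentException for a flow that contains more than one element (older kotlinx.coroutines releases threw IllegalStateException).
  • No result at all for a stream that never completes, for example an infinite stream that never emits a value.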
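
The three terminating outcomes listed above can be reproduced with a small sketch. It assumes kotlinx.coroutines is on the classpath; describeSingle is a hypothetical helper name used only for this illustration, and the multiple-element case is caught broadly because the exact exception type has differed between library versions.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Hypothetical helper: describes what single() did for the given finite flow.
suspend fun describeSingle(flow: Flow<List<Int>>): String =
    try {
        "value: ${flow.single()}"
    } catch (e: NoSuchElementException) {
        // The flow completed without emitting anything.
        "empty flow"
    } catch (e: RuntimeException) {
        // The flow emitted a second element. Caught broadly on purpose,
        // since the concrete exception type depends on the library version.
        "more than one element"
    }

fun main(): Unit = runBlocking {
    println(describeSingle(flowOf(listOf(1, 2, 3))))      // value: [1, 2, 3]
    println(describeSingle(emptyFlow()))                  // empty flow
    println(describeSingle(flowOf(listOf(1), listOf(2)))) // more than one element
}
```

All three flows here complete on their own. The fourth outcome, a flow that never completes, cannot be shown this way because the call would simply never return.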

This last case is possible if, for example, you are streaming a query from Android Room, SqlDelight, or Kafka and the stream stays empty forever because the table remains empty or no events are published to Kafka.

toList().flatten() should result in:

  • The flattened List of all List values in the Flow.
  • Hanging forever when streaming from Room, SqlDelight, Kafka, or anything else that provides an infinite stream, because toList() only returns once the flow completes.

To make an infinite stream finite you can use the take operator.

Let's take an example:

val infinite: Flow<List<Int>> =
  flow {
    while(true) {
      emit(listOf(1, 2, 3))
    }
  }

val res: List<Int> =
  infinite
    .take(3)
    .toList()
    .flatten() // listOf(1, 2, 3, 1, 2, 3, 1, 2, 3)
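
When only the first emitted list is wanted, which seems to be the case in the question, taking a single element is enough. The sketch below uses first() from kotlinx.coroutines, which returns the first emitted value and then cancels collection. observeRows and currentRows are hypothetical names, and the flow is only a stand-in for a database-backed stream that emits once and then stays open.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a database-backed flow: emits the current rows,
// then stays open as if waiting for changes that never arrive.
fun observeRows(): Flow<List<Int>> = flow {
    emit(listOf(1, 2, 3))
    awaitCancellation()
}

// first() returns the first emitted list and cancels the upstream flow,
// so the never-completing tail is not a problem.
suspend fun currentRows(): List<Int> = observeRows().first()

fun main(): Unit = runBlocking {
    println(currentRows()) // [1, 2, 3]
}
```

Calling single() or toList() directly on observeRows() would never return, because both wait for the flow to complete.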

If the stream is infinite but never emits a value, this will still hang, just as single() does. A timeout prevents it from hanging forever. That is not ideal, since getting a result always takes the full timeout duration, but at least the call terminates.

val empty: Flow<List<Int>> =
  callbackFlow {
    // suspend forever: never emit, never complete
    awaitClose { }
  }

val res =
  withTimeoutOrNull(1.seconds) {
    empty
      .take(3)
      .toList()
      .flatten() // the same applies to single()
  } ?: emptyList() // withTimeoutOrNull returned null, so fall back to an empty list
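
If only the first emission matters, a possible variation is to combine the timeout with first(), so that the timeout is paid only when nothing is emitted at all. This is a sketch under assumptions rather than part of the original answer: firstOrEmpty is a hypothetical extension name, and the two flows are stand-ins for real never-completing sources.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical helper: the first emitted list, or an empty list
// if nothing is emitted before the timeout elapses.
suspend fun <T> Flow<List<T>>.firstOrEmpty(timeout: Duration): List<T> =
    withTimeoutOrNull(timeout) { this@firstOrEmpty.first() } ?: emptyList()

fun main(): Unit = runBlocking {
    // Emits once, then never completes.
    val emitsOnce = flow<List<Int>> {
        emit(listOf(1, 2, 3))
        awaitCancellation()
    }
    // Never emits and never completes.
    val neverEmits = flow<List<Int>> { awaitCancellation() }

    println(emitsOnce.firstOrEmpty(200.milliseconds))  // [1, 2, 3]
    println(neverEmits.firstOrEmpty(200.milliseconds)) // []
}
```

The first call returns as soon as the list arrives; only the second call waits for the full timeout.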