I'm trying to implement a timeout mechanism on a suspend function which returns a SharedFlow. I was launching a coroutine using CoroutineScope(coroutineContext).launch { }, but I've read that it's not good practice. The suggestion is to make the function a CoroutineScope extension instead, but I need the function to be a suspend function. How can I implement a timeout mechanism for a suspend function which needs to return the flow?
Here is a simplified version of my implementation:
    class Client {
        sealed interface State {
            object Loading: State
            data class Success(val message: String): State
            object Error: State
        }

        interface MyCallListener {
            suspend fun onResult(result: State)
        }

        val methodListeners: MutableMap<String, MyCallListener> = mutableMapOf()

        suspend inline fun call(): SharedFlow<State> {
            val id = randomUUID()
            val flow = MutableSharedFlow<State>()
            //onResult is called elsewhere. this way, it's possible to emit a result to the flow
            methodListeners[id] = object : MyCallListener {
                override suspend fun onResult(result: State) {
                    flow.emit(result)
                }
            }
            flow.tryEmit(State.Loading)
            send(methodMessage) //this is a suspend function which uses a suspend function of ktors websocket session
            CoroutineScope(coroutineContext).launch {
                var latestValue: State? = null
                launch {
                    flow.collect {
                        latestValue = it
                    }
                }
                delay(5000)
                if (latestValue is State.Loading) {
                    flow.emit(State.Error)
                }
            }
            return flow
        }

        fun onReceiveMessage(id: String, message: String) {
            methodListeners[id]?.onResult(State.Success(message))
        }
    }
I need to call that function from another suspend function:
    class MyUseCase(
        val client: Client
    ) {
        suspend fun foo() = client.call()
    }
This currently works but since it's not recommended to use CoroutineScope(coroutineContext) and I can't make it a CoroutineScope extension, is there a better way to do this?
Edit: Here is more context on what I'm trying to accomplish. The client is a WebSocket message handler. I'm sending WebSocket messages called "method" with an id, and keep a listener instance with that id. When a result message is received with the same id, I get the listener instance with it and call listener.onResult. I implemented this call function so I can receive the method result with a flow. But sometimes, the server never sends a result message with that id. So the state is stuck as loading state. I wanted to implement a timeout mechanism for that matter.
I’m guessing a little at what you’re trying to do. I think the SharedFlow should be a val property since it’s shared. You don’t want multiple calls to this function silently killing previous flows that it returned (which your code does by replacing the listener each time). And then the listener should not be publicly changeable either.

The behavior here is that when you start collecting the flow, it emits states that were set with updateState(), and if one becomes stale (not updated within 5 seconds) it emits an error state.
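
One way that behavior could be sketched is below. This is only an illustration under some assumptions, not a definitive implementation: the name updateState() comes from the description above, while the state property, the injected scope, the timeoutMillis parameter, and the demo() function are hypothetical names chosen for the example. It also assumes, following the question, that only a Loading state can go stale. Note that transformLatest is still marked experimental in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

sealed interface State {
    object Loading : State
    data class Success(val message: String) : State
    object Error : State
}

@OptIn(ExperimentalCoroutinesApi::class)
class Client(
    scope: CoroutineScope,                    // assumption: the owner supplies the scope
    private val timeoutMillis: Long = 5_000   // assumption: configurable, 5 seconds by default
) {
    // Private, so nothing outside the class can replace the source of states.
    private val latestState = MutableStateFlow<State>(State.Loading)

    // A single shared flow exposed as a val property, created once.
    val state: SharedFlow<State> = latestState
        .transformLatest<State, State> { current ->
            emit(current)
            if (current is State.Loading) {
                // A newer state cancels this block, which also cancels the delay.
                delay(timeoutMillis)
                emit(State.Error)
            }
        }
        .shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)

    // Call this when a result message arrives. MutableStateFlow ignores a value
    // equal to the current one, so setting Loading twice does not restart the timer.
    fun updateState(newState: State) {
        latestState.value = newState
    }
}

// Usage sketch: nothing calls updateState(), so Loading goes stale and Error follows.
fun demo(): State? = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val client = Client(scope, timeoutMillis = 100)
    val result = withTimeoutOrNull(2_000) { client.state.first { it is State.Error } }
    scope.cancel()
    result
}
```

Because the timeout lives inside the flow pipeline, no coroutine has to be launched from a suspend function, so the CoroutineScope(coroutineContext) question goes away. With WhileSubscribed(), the timer only runs while someone is collecting.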