I have a gRPC service implemented with tonic that returns a stream of values. The stream is produced inside a tokio task and sent to the client through a tokio mpsc channel.
The problem is that the spawned task sending the partial results is not aborted after the client disconnects and the Receiver is dropped, which causes errors when sending to the channel.
Simplified code:
    #[tonic::async_trait]
    impl ServiceTrait for MyService {
        type MyStream = BoxStream<'static, Result<MyResponse, tonic::Status>>;

        async fn get_stream(
            &self,
            _request: tonic::Request<()>,
        ) -> Result<tonic::Response<Self::MyStream>, tonic::Status> {
            let (tx, rx) = mpsc::channel::<Result<MyResponse, tonic::Status>>(1);

            // I need this task to be aborted when rx is dropped
            let producer_task_handle = tokio::spawn(async move {
                // spawn many parallel tasks with ratelimiting
                ...
                // each task sends its result to tx
                tx.send(response).await.unwrap() // panics when rx is dropped after client disconnects
            });

            Ok(tonic::Response::new(ReceiverStream::new(rx).boxed()))
        }
    }
How can I abort the producer task when the channel is closed? Or is there a better way to do this? I had a working version which returned streams, but that is no longer an option.
After some consideration I decided to use CancellationToken with DropGuard. Wrapping the receiver stream in a structure with the DropGuard embedded ensures that once the stream is dropped, the cancellation token is cancelled and the task can be aborted.
Usage: