I'm trying to write a notification system between a server and multiple clients using gRPC server streaming in protobuf-net.grpc (.NET Framework 4.8).
I based my service off of this example. However, if I understand the example correctly, it is only able to handle a single subscriber (as _subscriber
is a member variable of the StockTickerService
class).
My test service looks like this:
private readonly INotificationService _notificationService;
private readonly Channel<Notification> _channel;
public ClientNotificationService(INotificationService notificationService)
{
_notificationService = notificationService;
_notificationService.OnNotification += OnNotification;
_channel = Channel.CreateUnbounded<Notification>();
}
private async void OnNotification(object sender, Notification notification)
{
await _channel.Writer.WriteAsync(notification);
}
public IAsyncEnumerable<Notification> SubscribeAsync(CallContext context = default)
{
return _channel.AsAsyncEnumerable(context.CancellationToken);
}
INotificationService
just has an event OnNotification
, which is fired when calling its Notify
method.
I then realized that System.Threading.Channels implements the Producer/Consumer pattern, but I need the Publisher/Subscriber pattern. When trying it out, indeed only one of the clients gets notified, instead of all of them.
It would also be nice if the server knew when a client disconnects, which seems impossible when returning _channel.AsAsyncEnumerable
.
So how can I modify this in order to
- serve multiple clients, with all of them being notified when
OnNotification
is called - and log when a client disconnects?
For 1, you'd need an implementation of a publisher/subscriber API; each call to
SubscribeAsync
will always represent a single conversation between gRPC endpoints, so you'll need your own mechanism for broadcasting that to multiple consumers. Maybe RX is worth investigating thereFor 2,
context.CancellationToken
should be triggered by client-disconnect