How to use async func inside Nethereum StreamingWebSocketClient subscription

76 Views Asked by At

After getting log inside subscribe method I want to call async function, but Subscribe function only takes Action<FilterLog>, so using async-await keyword is not possible.

How can I use await keyword inside this subscription ?

code example:

public static async Task GetLogsTokenTransfer_Observable_Subscription()
    {
        using(var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws"))
        { 
            var filterTransfers = Event<TransferEventDTO>.GetEventABI().CreateFilterInput();
            var subscription = new EthLogsObservableSubscription(client);

            subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
            {

                var decoded = Event<TransferEventDTO>.DecodeEvent(log);
                if (decoded != null)
                {
                    MyAsyncMethodHere(); // Can not use await keyword !!!!
                }
            });

            await client.StartAsync();
            await subscription.SubscribeAsync(filterTransfers);
            await Task.Delay(TimeSpan.FromMinutes(1));
            await subscription.UnsubscribeAsync();
            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }
2

There are 2 best solutions below

0
Kamil Kiełbasa On BEST ANSWER

I'm not sure that this is the best approach but in my applications, i'm using Observable.FromAsync. Smth like this:

subscription.GetSubscriptionDataResponsesAsObservable()
   .Select(log => Observable.FromAsync(async () => {
       await ProcessEvent(log);
   }))
   .Concat()
   .Subscribe();

The Concat method is pretty important here, because it's ensures that the will be no overlapping in task execution

0
Auri On

Nothing is preventing you from using async operations in the observer.

        observable.Subscribe(async data =>
        {
            Console.WriteLine("Data received");

            await Task.Delay(1000);

            Console.WriteLine("Data processed");
        });