Is it in general dubious to call Task.Factory.StartNew(async () => {}) in Subscribe?


I have a situation where I need to use a custom scheduler to run tasks (they must be tasks), and the scheduler does not set a synchronization context (so, as far as I can tell, ObserveOn, SubscribeOn, SynchronizationContextScheduler and the like are out). The following is how I ended up doing it. I'm not sure this is the best way to make asynchronous calls and await their results. Is this all right, or is there a more robust or idiomatic way?

var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
    Task.Factory.StartNew(async () =>
    {
        return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);          
});

What if the result didn't need to be awaited?

Edit: I found a concrete, simplified example of what I'm doing here. Basically, I'm using Rx in Orleans, and the code above is a bare-bones illustration of what I'm up to. I'm also interested in the general case, though.

The final code

It turns out this was a bit tricky in the Orleans context. I don't see how I could use ObserveOn, which would be just the thing I'd like to use. The problem is that when I use it, the Subscribe callback never gets called. The code:

var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look useful since...
.SelectMany(i =>
{
    //... we need to set the custom scheduler here explicitly anyway.
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
    //in turn always runs on the .NET ThreadPool and not on the Orleans scheduler, and hence
    //the following .Subscribe wouldn't be called.
    return Task.Factory.StartNew(async () =>
    { 
       //In reality this is an asynchronous grain call. Doing the "shorthand way"
       //(and optionally using ObserveOn) would get the grain called, but not the
       //following .Subscribe. 
       return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
    Trace.WriteLine(i);
});

Also, a link to a related thread at Codeplex Orleans forums.


There are 2 best solutions below

1
On BEST ANSWER

I strongly recommend against StartNew for any modern code. It does have a use case, but it's very rare.

If you have to use a custom task scheduler, I recommend using ObserveOn with a TaskPoolScheduler constructed from a TaskFactory wrapper around your scheduler. That's a mouthful, so here's the general idea:

var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

Then you could use SelectMany to start an asynchronous operation for each event in a source stream as they arrive.
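To make the ObserveOn plus SelectMany idea concrete, here is a minimal sketch. It assumes the System.Reactive package; the class name ObserveOnSketch, the method RunAsync and the stand-in AsynchronousOperation are hypothetical, and the source stream is just Observable.Range. It has not been tried against the Orleans scheduler, so treat it as an illustration of the shape rather than a verified fix for that environment.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ObserveOnSketch
{
    // Hypothetical stand-in for the real asynchronous operation (e.g. a grain call).
    private static async Task<int> AsynchronousOperation(int i)
    {
        await Task.Delay(10);
        return i * 2;
    }

    // Runs a small stream through ObserveOn + SelectMany and collects the results.
    public static Task<IList<int>> RunAsync(TaskScheduler customScheduler)
    {
        // Wrap the custom TaskScheduler so Rx can schedule notifications on it.
        var factory = new TaskFactory(customScheduler);
        var rxScheduler = new TaskPoolScheduler(factory);

        return Observable.Range(1, 3)
            .ObserveOn(rxScheduler)                     // deliver each element via the custom scheduler
            .SelectMany(i => AsynchronousOperation(i))  // start one async operation per element
            .ToList()                                   // gather all results into a single list
            .ToTask();                                  // expose the list as a Task for the caller
    }
}
```

Calling `ObserveOnSketch.RunAsync(TaskScheduler.Default)` exercises the pipeline on the thread pool; in the question's setting the argument would be the captured custom scheduler instead. Results are collected as the operations finish, so their order is not guaranteed to match the source order.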

An alternative, less ideal solution is to use async void for your subscription "events". This is acceptable, but you have to watch your error handling. As a general rule, don't allow exceptions to propagate out of an async void method.
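A sketch of the async void variant with the error handling kept inside the handler might look like the following. It assumes the System.Reactive package; AsyncVoidSketch, Run, the two callbacks and the failing stand-in AsynchronousOperation are all hypothetical names introduced for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncVoidSketch
{
    // Hypothetical stand-in for the real asynchronous operation; fails for one input.
    private static async Task<int> AsynchronousOperation(int i)
    {
        await Task.Delay(10);
        if (i == 2) throw new InvalidOperationException("simulated failure");
        return i * 2;
    }

    // Subscribes with an async lambda; because Subscribe expects an Action<int>,
    // the lambda is compiled as an async void method.
    public static IDisposable Run(Action<int> onResult, Action<Exception> onError)
    {
        return Observable.Range(1, 3).Subscribe(async i =>
        {
            try
            {
                onResult(await AsynchronousOperation(i));
            }
            catch (Exception ex)
            {
                // Handle the failure here so nothing escapes the async void method.
                onError(ex);
            }
        });
    }
}
```

Note that Run returns as soon as the operations have been started, not when they finish, so anything that depends on the results has to be driven from the callbacks.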

There is a third alternative, where you hook an observable into a TPL Dataflow block. A block like ActionBlock can specify its task scheduler, and Dataflow naturally understands asynchronous handlers. Note that by default, Dataflow blocks will throttle the processing to a single element at a time.
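The Dataflow route could be sketched roughly as follows, assuming the System.Reactive and System.Threading.Tasks.Dataflow packages. DataflowSketch, RunAsync and the stand-in AsynchronousOperation are hypothetical; the source stream is again just Observable.Range.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical stand-in for the real asynchronous operation.
    private static async Task<int> AsynchronousOperation(int i)
    {
        await Task.Delay(10);
        return i * 2;
    }

    // Feeds an observable into an ActionBlock that runs on the given scheduler.
    public static async Task<int[]> RunAsync(TaskScheduler customScheduler)
    {
        var results = new ConcurrentQueue<int>();

        // The block's handler is asynchronous and is scheduled on customScheduler.
        var block = new ActionBlock<int>(
            async i => results.Enqueue(await AsynchronousOperation(i)),
            new ExecutionDataflowBlockOptions { TaskScheduler = customScheduler });

        // AsObserver forwards OnNext to the block and completes the block on OnCompleted.
        using (Observable.Range(1, 3).Subscribe(block.AsObserver()))
        {
            // Completion finishes once the source has completed and every element is processed.
            await block.Completion;
        }

        return results.ToArray();
    }
}
```

With the default options the block handles one element at a time, as noted above; raising MaxDegreeOfParallelism in the options is the usual way to allow more concurrency if that is wanted.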

5
On

Generally speaking, instead of subscribing to execute, it's better/more idiomatic to project the task parameters into the task execution and subscribe just for the results. That way you can compose with further Rx downstream.

For example, given an arbitrary task like:

static async Task<int> DoubleAsync(int i, Random random)
{
    Console.WriteLine("Started");
    await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1));
    return i * 2;
}

Then you might do:

void Main()
{
    var random = new Random();

    // stream of task parameters
    var source = Observable.Range(1, 5);

    // project the task parameters into the task execution, collect and flatten results
    source.SelectMany(i => DoubleAsync(i, random))

          // subscribe just for results, which turn up as they are done
          // gives you flexibility to continue the rx chain here
          .Subscribe(result => Console.WriteLine(result),
                    () => Console.WriteLine("All done."));
}