I need to use a custom scheduler to run tasks (they have to be tasks), and the scheduler does not set a synchronization context (so no ObserveOn, SubscribeOn, SynchronizationContextScheduler, etc., I gather). The following is how I ended up doing it. I'm not sure this is the best way to make asynchronous calls and await their results. Is this all right, or is there a more robust or idiomatic way?
    var orleansScheduler = TaskScheduler.Current;
    var someObservable = ...;
    someObservable.Subscribe(i =>
    {
        Task.Factory.StartNew(async () =>
        {
            return await AsynchronousOperation(i);
        }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);
    });
What if awaiting wouldn't be needed?
Edit: I found a concrete, simplified example of what I'm doing here. Basically, I'm using Rx in Orleans, and the code above is a bare-bones illustration of what I'm up to. I'm also interested in this situation in general, though.
The final code
It turns out this was a bit tricky in the Orleans context. I don't see how I could use ObserveOn, which would be just the thing I'd like to use. The problem is that when I use it, the Subscribe callback never gets called. The code:
    var orleansScheduler = TaskScheduler.Current;
    var factory = new TaskFactory(orleansScheduler);
    var rxScheduler = new TaskPoolScheduler(factory);
    var someObservable = ...;
    someObservable
        //.ObserveOn(rxScheduler) This doesn't look useful, since...
        .SelectMany(i =>
        {
            //... we need to set the custom scheduler here explicitly anyway.
            //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
            //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
            //in turn always runs on the .NET ThreadPool and not on the Orleans scheduler, and hence
            //the following .Subscribe wouldn't be called.
            return Task.Factory.StartNew(async () =>
            {
                //In reality this is an asynchronous grain call. Doing it the "shorthand way"
                //(and optionally using ObserveOn) would get the grain called, but not the
                //following .Subscribe.
                return await AsynchronousOperation(i);
            }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
        })
        .Subscribe(i =>
        {
            Trace.WriteLine(i);
        });
Also, a link to a related thread at Codeplex Orleans forums.
I strongly recommend against StartNew for any modern code. It does have a use case, but it's very rare.
If you have to use a custom task scheduler, I recommend using ObserveOn with a TaskPoolScheduler constructed from a TaskFactory wrapper around your scheduler. That's a mouthful, so here's the general idea: wrap your scheduler in a TaskFactory, pass that factory to the TaskPoolScheduler constructor, and pass the resulting Rx scheduler to ObserveOn.
Then you could use SelectMany to start an asynchronous operation for each event in a source stream as they arrive.
An alternative, less ideal solution is to use async void for your subscription "events". This is acceptable, but you have to watch your error handling. As a general rule, don't allow exceptions to propagate out of an async void method.
There is a third alternative, where you hook an observable into a TPL Dataflow block. A block like ActionBlock can specify its task scheduler, and Dataflow naturally understands asynchronous handlers. Note that by default, Dataflow blocks will throttle the processing to a single element at a time.
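For the first recommendation (ObserveOn with a TaskPoolScheduler built from a TaskFactory, followed by SelectMany), the wiring might look roughly like the sketch below. It assumes the System.Reactive package is referenced. TaskScheduler.Default is only a stand-in for the custom scheduler, and AsynchronousOperation is a hypothetical placeholder for the real asynchronous call, so this illustrates the shape of the pipeline rather than confirming how it behaves under Orleans.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical placeholder for the real asynchronous call (e.g. a grain call).
    private static async Task<int> AsynchronousOperation(int i)
    {
        await Task.Delay(10);
        return i * 2;
    }

    public static void Main()
    {
        // Assumption: stand-in for the custom scheduler; in the question this
        // would be the scheduler captured via TaskScheduler.Current.
        TaskScheduler customScheduler = TaskScheduler.Default;

        // Wrap the TaskScheduler so that Rx can schedule work on it.
        var factory = new TaskFactory(customScheduler);
        var rxScheduler = new TaskPoolScheduler(factory);

        using var done = new ManualResetEventSlim();

        var subscription = Observable.Range(1, 3)
            // Deliver source notifications through the wrapped scheduler.
            .ObserveOn(rxScheduler)
            // Start one asynchronous operation per element as it arrives.
            .SelectMany(i => AsynchronousOperation(i))
            .Subscribe(
                result => Console.WriteLine(result),
                error => { Console.WriteLine(error); done.Set(); },
                () => done.Set());

        // Block until the pipeline completes or fails, then clean up.
        done.Wait();
        subscription.Dispose();
    }
}
```

SelectMany merges results as the individual operations finish, so the order of the printed values is not guaranteed to match the source order.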