I am using a Task for a long-running asynchronous processing operation that I want to be able to pause and resume at arbitrary moments. Luckily for me one of the TPL's own authors at Microsoft already came up with a solution to this problem. The only trouble is his solution doesn't work properly.
When you take out the await Task.Delay(100) in the code below, the code will stop honoring pause requests after the very first one. It appears the SomeMethodAsync code resumes execution on the same thread as the other task if the value of Thread.CurrentThread.ManagedThreadId is to be believed. Also the output of SomeMethodAsync suggests that it is running on several threads.
I have always found the TPL rather confusing and difficult to work with and async/await even more so, so I am having a hard time understanding what's even going on here. I'd be very grateful if anyone could explain.
Minimalist example code:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PauseTokenTest {
class Program {
static void Main() {
var pts = new PauseTokenSource();
Task.Run(() =>
{
while (true) {
Console.ReadLine();
Console.WriteLine(
$"{Thread.CurrentThread.ManagedThreadId}: Pausing task");
pts.IsPaused = !pts.IsPaused;
}
});
SomeMethodAsync(pts.Token).Wait();
}
public static async Task SomeMethodAsync(PauseToken pause) {
for (int i = 0; ; i++) {
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: {i}");
// Comment this out and repeatedly pausing and resuming will no longer work.
await Task.Delay(100);
await pause.WaitWhilePausedAsync();
}
}
}
public class PauseTokenSource {
internal static readonly Task s_completedTask =
Task.FromResult(true);
volatile TaskCompletionSource<bool> m_paused;
public bool IsPaused {
get { return m_paused != null; }
set {
if (value) {
Interlocked.CompareExchange(
ref m_paused, new TaskCompletionSource<bool>(), null);
} else {
while (true) {
var tcs = m_paused;
if (tcs == null) return;
if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
tcs.SetResult(true);
break;
}
}
}
}
}
public PauseToken Token { get { return new PauseToken(this); } }
internal Task WaitWhilePausedAsync() {
var cur = m_paused;
return cur != null ? cur.Task : s_completedTask;
}
}
public struct PauseToken {
readonly PauseTokenSource m_source;
internal PauseToken(PauseTokenSource source) {
m_source = source;
}
public bool IsPaused {
get { return m_source != null && m_source.IsPaused; }
}
public Task WaitWhilePausedAsync() {
return IsPaused ? m_source.WaitWhilePausedAsync() :
PauseTokenSource.s_completedTask;
}
}
}
This happens because of the way task continuations are executed. In many places in the framework, task continuations are executed synchronously when possible. It's not always possible, but one place where it does become common is on thread pool threads, which are often treated as though they're exchangeable.
awaitis one of those places where synchronous continuations are used; this is not officially documented anywhere AFAIK but I describe the behavior on my blog.So, what is essentially happening is this:
SomeMethodAsyncis paused (awaiting the task returned fromWaitWhilePausedAsync- without a context), which attaches the rest ofSomeMethodAsyncas a continuation on that task. Then when theTask.Run(thread pool) thread togglesIsPaused, it completes the task that was returned fromWaitWhilePausedAsync. The runtime then looks at the context associated with that continuation (none), so that continuation is considered compatible with any thread pool thread. And hey, the current thread is a thread pool thread! So it just runs the continuation directly.The fun part about that is that without another
await, theSomeMethodAsyncmethod becomes entirely synchronous: it always retrieves an already-completed task fromWaitWhilePausedAsync, so it just continues executing its infinite loop forever. As a reminder,awaitbehaves synchronously with already-completed tasks (as I describe on my blog). This infinite loop is running within theIsPausedsetter, so theTask.Runcode never continues its loop to callReadLineagain.If
SomeMethodAsyncdoes have anawait, then it can behave asynchronously, returning back to its caller (completing the continuation), and allowing theTask.Runloop to continue executing.As suggested by Theodor Zoulias, passing the
TaskCreationOptions.RunContinuationsAsynchronouslyflag to theTaskCompletionSource<bool>will also work. In that case, the task continuation is run asynchronously (on a separate thread pool thread), rather than executed directly on the calling thread.IIRC, the blog post you referenced predates the
RunContinuationsAsynchronouslyflag. It also predates the (non-generic)TaskCompletionSource.Also as suggested by Theodor Zoulias, I have a
PauseTokenSourcein my Nito.AsyncEx library that avoids this issue. It also usesRunContinuationsAsynchronously.