I have some library (socket networking) code that provides a Task
-based API for pending responses to requests, based on TaskCompletionSource<T>
. However, there's an annoyance in the TPL in that it seems to be impossible to prevent synchronous continuations. What I would like to be able to do is either:
TaskCompletionSource<T>
that is should not allow callers to attach with TaskContinuationOptions.ExecuteSynchronously
, orSetResult
/ TrySetResult
) in a way that specifies that TaskContinuationOptions.ExecuteSynchronously
should be ignored, using the pool insteadSpecifically, the issue I have is that the incoming data is being processed by a dedicated reader, and if a caller can attach with TaskContinuationOptions.ExecuteSynchronously
they can stall the reader (which affects more than just them). Previously, I have worked around this by some hackery that detects whether any continuations are present, and if they are it pushes the completion onto the ThreadPool
, however this has significant impact if the caller has saturated their work queue, as the completion will not get processed in a timely fashion. If they are using Task.Wait()
(or similar), they will then essentially deadlock themselves. Likewise, this is why the reader is on a dedicated thread rather than using workers.
So; before I try and nag the TPL team: am I missing an option?
Key points:
ThreadPool
as an implementation, as it needs to work when the pool is saturatedThe example below produces output (ordering may vary based on timing):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
The problem is the fact that a random caller managed to get a continuation on "Main thread". In the real code, this would be interrupting the primary reader; bad things!
Code:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
New in .NET 4.6:
.NET 4.6 contains a new TaskCreationOptions
: RunContinuationsAsynchronously
.
Since you're willing to use Reflection to access private fields...
You can mark the TCS's Task with the TASK_STATE_THREAD_WAS_ABORTED
flag, which would cause all continuations not to be inlined.
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Edit:
Instead of using Reflection emit, I suggest you use expressions. This is much more readable and has the advantage of being PCL-compatible:
var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
Expression.Lambda<Action<Task>>(
Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Without using Reflection:
If anyone's interested, I've figured out a way to do this without Reflection, but it is a bit "dirty" as well, and of course carries a non-negligible perf penalty:
try
{
Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
source.TrySetResult(123);
Thread.ResetAbort();
}