TL;DR: A deadlock inside a task run by StaTaskScheduler. Long version:
I'm using StaTaskScheduler from ParallelExtensionsExtras by the Parallel Team to host some legacy STA COM objects supplied by a third party. The description of the StaTaskScheduler implementation details says the following:
The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).
I thought that would mean the blocking parts of TPL would use a wait API that pumps messages, like CoWaitForMultipleHandles, to avoid deadlock situations when called on an STA thread.
In my situation, I believe the following is happening: in-proc STA COM object A makes a call to out-of-proc object B, then expects a callback from B as part of the outgoing call.
In a simplified form:
var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A();
    // out-of-proc object B
    var b = new B();
    // A calls B and B calls back A during the Method call
    return a.Method(b);
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
The problem is, a.Method(b) never returns. As far as I can tell, this happens because a blocking wait somewhere inside BlockingCollection<Task> does not pump messages, so my assumption about the quoted statement is probably wrong.
EDITED: The same code works when it is executed on the UI thread of the test WinForms application (that is, when passing TaskScheduler.FromCurrentSynchronizationContext() instead of staTaskScheduler to Task.Factory.StartNew).
What is the right way to solve this? Should I implement a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?
If so, will the underlying implementation of BlockingCollection call my SynchronizationContext.Wait method? Can I use SynchronizationContext.WaitHelper to implement SynchronizationContext.Wait?
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();
            var thread = new Thread(() =>
            {
                // Create a simple Win32 window
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);
                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;
                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });
                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();
                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);
                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }
                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
                // the native thunk calls into newWndProc, so keep the delegate alive until pumping is over
                GC.KeepAlive(newWndProc);
            });
            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();
            Thread.Sleep(2000);
            // this causes the STA thread to end
            tasks.CompleteAdding();
            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);
            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);
            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32", EntryPoint = "SetWindowLong")]
        static extern IntPtr SetWindowLong32(IntPtr hwnd, int nIndex, WndProc newProc);
        // SetWindowLongPtr is only exported by the 64-bit user32.dll
        [DllImport("user32", EntryPoint = "SetWindowLongPtr")]
        static extern IntPtr SetWindowLongPtr64(IntPtr hwnd, int nIndex, WndProc newProc);
        // replaces the window procedure, using the export that matches the process bitness
        public static IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc)
        {
            return IntPtr.Size == 8
                ? SetWindowLongPtr64(hwnd, nIndex, newProc)
                : SetWindowLong32(hwnd, nIndex, newProc);
        }
        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, IntPtr wParam, IntPtr lParam);
        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);
        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);
        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);
        // WPARAM and LPARAM are pointer-sized, so IntPtr (not int) is correct on 64-bit too
        public delegate IntPtr WndProc(IntPtr hwnd, int msg, IntPtr wParam, IntPtr lParam);
        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;
        public const int WM_TEST = WM_USER + 1;
    }
}
This produces the output:
Testing without pumping...
The collection argument is empty and has been marked as complete with regards to additions.

Test with pumping...
The collection argument is empty and has been marked as complete with regards to additions.
Now start pumping...
WM_TEST processed
Press Enter to exit
My understanding of your problem: you are using StaTaskScheduler only to organize the classic COM STA apartment for your legacy COM objects. You're not running a WinForms or WPF core message loop on the STA thread of StaTaskScheduler. That is, you're not using anything like Application.Run, Application.DoEvents or Dispatcher.PushFrame inside that thread. Correct me if this is a wrong assumption.
By itself, StaTaskScheduler doesn't install any synchronization context on the STA threads it creates. Thus, you're relying on the CLR to pump messages for you. I've only found an implicit confirmation that the CLR pumps on STA threads, in Apartments and Pumping in the CLR by Chris Brumme:
I keep saying that managed blocking will perform “some pumping” when called on an STA thread. Wouldn’t it be great to know exactly what will get pumped? Unfortunately, pumping is a black art which is beyond mortal comprehension. On Win2000 and up, we simply delegate to OLE32’s CoWaitForMultipleHandles service.
This indicates the CLR uses CoWaitForMultipleHandles internally for STA threads. Further, the MSDN docs for the COWAIT_DISPATCH_WINDOW_MESSAGES flag mention this:
... in STA is only a small set of special-cased messages dispatched.
I did some research on that, but could not get CoWaitForMultipleHandles to pump the WM_TEST message from your sample code; we discussed that in the comments to your question. My understanding is that the aforementioned small set of special-cased messages is really limited to some COM marshaller-specific messages, and doesn't include any regular general-purpose messages like your WM_TEST.
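For reference, this is roughly what calling CoWaitForMultipleHandles directly with COWAIT_DISPATCH_WINDOW_MESSAGES looks like from C#. It's a sketch under stated assumptions: the flag and HRESULT values are taken from the Windows SDK headers (the two DISPATCH flags need Windows 8 or later), and the ComWait class and its WaitAny helper are hypothetical names. Per the documentation quoted above, on a classic STA thread it still dispatches only the special-cased messages, so I wouldn't expect it to deliver WM_TEST.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;

// Hypothetical wrapper around ole32's CoWaitForMultipleHandles (Windows only)
static class ComWait
{
    // COWAIT_FLAGS values from the Windows SDK; the DISPATCH flags need Windows 8+
    public const uint COWAIT_DEFAULT = 0x0;
    public const uint COWAIT_WAITALL = 0x1;
    public const uint COWAIT_ALERTABLE = 0x2;
    public const uint COWAIT_INPUTAVAILABLE = 0x4;
    public const uint COWAIT_DISPATCH_CALLS = 0x8;
    public const uint COWAIT_DISPATCH_WINDOW_MESSAGES = 0x10;

    // HRESULT returned when the timeout elapses before any handle is signaled
    public const int RPC_S_CALLPENDING = unchecked((int)0x80010115);

    [DllImport("ole32.dll")]
    static extern int CoWaitForMultipleHandles(
        uint dwFlags, uint dwTimeout, uint cHandles, IntPtr[] pHandles, out uint lpdwindex);

    // Waits for any of the handles; returns its index or WaitHandle.WaitTimeout
    public static int WaitAny(WaitHandle[] handles, int millisecondsTimeout)
    {
        var raw = new IntPtr[handles.Length];
        for (int i = 0; i < handles.Length; i++)
            raw[i] = handles[i].SafeWaitHandle.DangerousGetHandle();

        uint index;
        int hr = CoWaitForMultipleHandles(
            COWAIT_DISPATCH_WINDOW_MESSAGES,
            unchecked((uint)millisecondsTimeout), // Timeout.Infinite (-1) becomes INFINITE
            (uint)raw.Length, raw, out index);
        GC.KeepAlive(handles); // keep the WaitHandle objects reachable during the wait

        if (hr == RPC_S_CALLPENDING)
            return WaitHandle.WaitTimeout;
        Marshal.ThrowExceptionForHR(hr); // does nothing for success codes
        return (int)index;
    }
}
```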
So, to answer your question:
... Should I implement a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?
Yes, I believe that creating a custom synchronization context and overriding SynchronizationContext.Wait is indeed the right solution.
However, you should avoid using CoWaitForMultipleHandles and use MsgWaitForMultipleObjectsEx instead. If MsgWaitForMultipleObjectsEx indicates there's a pending message in the queue, you should manually pump it with PeekMessage(PM_REMOVE) and DispatchMessage. Then you should continue waiting for the handles, all inside the same SynchronizationContext.Wait call.
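A minimal sketch of such a context follows, assuming the .NET Framework on Windows. It is not the ThreadAffinityTaskScheduler code mentioned further down; PumpingSyncContext is a hypothetical name. It calls SetWaitNotificationRequired in its constructor, because the CLR only routes managed blocking waits to SynchronizationContext.Wait once that has been done, and it returns the raw Win32 wait result, the same encoding WaitHelper returns. For brevity, wait-all requests and more than 63 handles (the MsgWaitForMultipleObjectsEx limit) fall back to WaitHelper without pumping.

```csharp
using System;
using System.ComponentModel;
using System.Runtime.InteropServices;
using System.Threading;

// Hypothetical: install on each STA thread with
// SynchronizationContext.SetSynchronizationContext(new PumpingSyncContext())
public sealed class PumpingSyncContext : SynchronizationContext
{
    const uint QS_ALLINPUT = 0x04FF;
    const uint MWMO_INPUTAVAILABLE = 0x0004;
    const uint PM_REMOVE = 0x0001;
    const uint WAIT_ABANDONED_0 = 0x00000080;
    const uint WAIT_TIMEOUT = 0x00000102;

    public PumpingSyncContext()
    {
        // opt in: without this call the CLR never routes blocking waits to Wait below
        SetWaitNotificationRequired();
    }

    // the base CreateCopy returns a plain SynchronizationContext, losing the override
    public override SynchronizationContext CreateCopy()
    {
        return new PumpingSyncContext();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        // MWMO_WAITALL would also wait for input, so these cases are not pumped here
        if (waitAll || waitHandles.Length > 63)
            return WaitHelper(waitHandles, waitAll, millisecondsTimeout);

        uint count = (uint)waitHandles.Length;
        int start = Environment.TickCount;
        while (true)
        {
            uint timeout = uint.MaxValue; // INFINITE
            if (millisecondsTimeout != Timeout.Infinite)
            {
                int elapsed = unchecked(Environment.TickCount - start);
                timeout = (uint)Math.Max(0, millisecondsTimeout - elapsed);
            }

            // MWMO_INPUTAVAILABLE: also return for input that was seen but not removed
            uint result = MsgWaitForMultipleObjectsEx(
                count, waitHandles, timeout, QS_ALLINPUT, MWMO_INPUTAVAILABLE);

            // signaled, abandoned or timed out: return the raw Win32 code, like WaitHelper
            if (result < count || result == WAIT_TIMEOUT ||
                (result >= WAIT_ABANDONED_0 && result < WAIT_ABANDONED_0 + count))
                return (int)result;

            if (result != count) // anything but WAIT_OBJECT_0 + count is WAIT_FAILED
                throw new Win32Exception();

            // input is available: remove and dispatch one message, then wait again
            MSG msg;
            if (PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_REMOVE))
            {
                TranslateMessage(ref msg);
                DispatchMessage(ref msg);
            }
        }
    }

    [StructLayout(LayoutKind.Sequential)]
    struct MSG
    {
        public IntPtr hwnd; public uint message; public IntPtr wParam; public IntPtr lParam;
        public uint time; public int ptX; public int ptY;
    }

    [DllImport("user32.dll", SetLastError = true)]
    static extern uint MsgWaitForMultipleObjectsEx(
        uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags);

    [DllImport("user32.dll")]
    static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);

    [DllImport("user32.dll")]
    static extern bool TranslateMessage([In] ref MSG lpMsg);

    [DllImport("user32.dll")]
    static extern IntPtr DispatchMessage([In] ref MSG lpMsg);
}
```

Keep in mind that pumping inside Wait makes every blocking call on that thread re-entrant: any message handler, including incoming COM calls delivered through window messages, can run in the middle of a lock acquisition or a Task.Wait.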
Note there's a subtle but important difference between MsgWaitForMultipleObjectsEx and MsgWaitForMultipleObjects. The latter doesn't return, and keeps blocking, if there's a message in the queue that has already been seen (e.g., with PeekMessage(PM_NOREMOVE) or GetQueueStatus) but not removed. That's not good for pumping, because your COM objects might be using something like PeekMessage to inspect the message queue, which might later cause MsgWaitForMultipleObjects to block when it's not expected to.
OTOH, MsgWaitForMultipleObjectsEx with the MWMO_INPUTAVAILABLE flag doesn't have such a shortcoming, and would return in this case.
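The difference can be checked with a small experiment. The sketch below (QueueStatusDemo is a hypothetical name; Windows only) posts a message to the current thread's queue, peeks at it with PM_NOREMOVE so it counts as already seen, then waits on the queue alone (nCount = 0) with each function and a 500 ms timeout. Based on the behavior described above, I'd expect MsgWaitForMultipleObjects to time out and MsgWaitForMultipleObjectsEx to return right away with WAIT_OBJECT_0 + nCount.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical experiment: call QueueStatusDemo.Run() from a Windows console app
static class QueueStatusDemo
{
    public const uint QS_ALLINPUT = 0x04FF;
    public const uint MWMO_INPUTAVAILABLE = 0x0004;
    public const uint PM_NOREMOVE = 0x0000;
    public const uint WM_USER = 0x0400;

    [StructLayout(LayoutKind.Sequential)]
    struct MSG
    {
        public IntPtr hwnd; public uint message; public IntPtr wParam; public IntPtr lParam;
        public uint time; public int ptX; public int ptY;
    }

    [DllImport("user32.dll")]
    static extern bool PostThreadMessage(uint idThread, uint msg, IntPtr wParam, IntPtr lParam);
    [DllImport("user32.dll")]
    static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd, uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);
    [DllImport("user32.dll")]
    static extern uint MsgWaitForMultipleObjects(uint nCount, IntPtr[] pHandles, bool fWaitAll, uint dwMilliseconds, uint dwWakeMask);
    [DllImport("user32.dll")]
    static extern uint MsgWaitForMultipleObjectsEx(uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags);
    [DllImport("kernel32.dll")]
    static extern uint GetCurrentThreadId();

    public static void Run()
    {
        MSG msg;
        // the first PeekMessage call makes sure this thread has a message queue
        PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_NOREMOVE);
        PostThreadMessage(GetCurrentThreadId(), WM_USER + 1, IntPtr.Zero, IntPtr.Zero);

        // look at the message without removing it, so it is no longer "new"
        PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_NOREMOVE);

        // nCount = 0: wait on the message queue only, with a 500 ms timeout
        uint plain = MsgWaitForMultipleObjects(0, null, false, 500, QS_ALLINPUT);
        uint ex = MsgWaitForMultipleObjectsEx(0, null, 500, QS_ALLINPUT, MWMO_INPUTAVAILABLE);

        // 0x102 is WAIT_TIMEOUT; with nCount = 0, WAIT_OBJECT_0 + nCount (input available) is 0
        Console.WriteLine("MsgWaitForMultipleObjects:   0x{0:X}", plain);
        Console.WriteLine("MsgWaitForMultipleObjectsEx: 0x{0:X}", ex);
    }
}
```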
A while ago I created a custom version of StaTaskScheduler (available here as ThreadAffinityTaskScheduler) in an attempt to solve a different problem: maintaining a pool of threads with thread affinity for subsequent await continuations. Thread affinity is vital if you use STA COM objects across multiple awaits. The original StaTaskScheduler exhibits this behavior only when its pool is limited to 1 thread.
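The reason a one-thread pool gives affinity is that await, when there's no SynchronizationContext and the code runs on a non-default scheduler, captures TaskScheduler.Current and queues the continuation back to it. A minimal single-thread scheduler shows this; SingleStaThreadScheduler is a hypothetical name, and this is neither StaTaskScheduler nor ThreadAffinityTaskScheduler. Its BlockingCollection wait has the same non-pumping problem discussed above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: one dedicated (STA where supported) thread runs every task
public sealed class SingleStaThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly Thread _thread;

    public SingleStaThreadScheduler()
    {
        _thread = new Thread(() =>
        {
            // blocks in Take without pumping general window messages
            foreach (var task in _tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        });
        _thread.IsBackground = true;
        // STA on Windows; returns false on platforms without COM apartments
        _thread.TrySetApartmentState(ApartmentState.STA);
        _thread.Start();
    }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    protected override void QueueTask(Task task) { _tasks.Add(task); }

    // run inline only when already on the scheduler's own thread
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == _thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { return _tasks.ToArray(); }

    // must not be called from the scheduler's own thread (Join would deadlock)
    public void Dispose()
    {
        _tasks.CompleteAdding();
        _thread.Join();
    }
}
```

For example, an async lambda started with Task.Factory.StartNew(..., scheduler) and unwrapped with Unwrap() sees the same ManagedThreadId before and after an await Task.Delay(...), because the continuation is queued back to the single thread.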
So I went ahead and did some more experimenting with your WM_TEST case. Originally, I installed an instance of the standard SynchronizationContext class on the STA thread. The WM_TEST message didn't get pumped, which was expected.
Then I overrode SynchronizationContext.Wait to just forward it to SynchronizationContext.WaitHelper. It did get called, but still didn't pump.
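That forwarding experiment can be sketched like this; ForwardingSyncContext is a hypothetical name. The SetWaitNotificationRequired call in the constructor is essential: without it the CLR never calls the Wait override at all. The counter only shows that waits are routed through the override; the actual waiting is still left to WaitHelper, which is why WM_TEST still isn't pumped.

```csharp
using System;
using System.Threading;

// Hypothetical: observes blocking waits on the thread it is installed on,
// then defers to the default implementation
public sealed class ForwardingSyncContext : SynchronizationContext
{
    private int _waitCalls;

    public ForwardingSyncContext()
    {
        // without this call the CLR never invokes the Wait override
        SetWaitNotificationRequired();
    }

    // number of blocking waits the CLR has routed through this context
    public int WaitCalls { get { return Volatile.Read(ref _waitCalls); } }

    public override SynchronizationContext CreateCopy()
    {
        return new ForwardingSyncContext();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        Interlocked.Increment(ref _waitCalls);
        return WaitHelper(waitHandles, waitAll, millisecondsTimeout);
    }
}
```

To repeat the experiment, call SynchronizationContext.SetSynchronizationContext(new ForwardingSyncContext()) on the STA thread before tasks.Take() and inspect WaitCalls afterwards.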
Finally, I implemented a full-featured message pump loop; here's the core part of it:
// the core loop (an excerpt: count, waitHandles, remainingTimeout, QS_MASK, nativeResult,
// managedResult, IsNativeWaitSuccessful and hasTimedOut come from the enclosing Wait method, not shown)
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}
This does work; WM_TEST gets pumped. Below is an adapted version of your test:
public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}
The output:
Initial thread #9
On STA thread #10
Post some WM_TEST messages...
Press Enter to continue...
WM_TEST processed: 1
WM_TEST processed: 2
WM_TEST processed: 3
After await, thread #10
Pending messages in the queue: False
Exiting STA thread #10
Current thread #12
Press any key to exit
Note that this implementation supports both thread affinity (it stays on thread #10 after await) and message pumping. The full source code contains reusable parts (ThreadAffinityTaskScheduler and ThreadWithAffinityContext) and is available here as a self-contained console app. It hasn't been thoroughly tested, so use it at your own risk.
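RunAsync above uses CreateTestWindow and ReadLineAsync helpers from the full source, which aren't shown here. To run the snippet on its own, a plausible stand-in for ReadLineAsync (hypothetical, not the author's implementation) simply offloads the blocking read to the thread pool, so the STA thread isn't blocked while it awaits the result. CreateTestWindow can presumably be assembled from the window-creation and subclassing code in the question's test program.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class ConsoleHelpers
{
    // Hypothetical stand-in: Console.ReadLine blocks, so run the read on a pool thread;
    // the awaiting STA thread itself is never blocked by it
    public static Task<string> ReadLineAsync(TextReader reader = null)
    {
        var source = reader ?? Console.In;
        return Task.Run(() => source.ReadLine());
    }
}
```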