StaTaskScheduler and STA thread message pumping

avo · Jan 19, 2014

TL;DR: A deadlock inside a task run by StaTaskScheduler. Long version:

I'm using StaTaskScheduler from ParallelExtensionsExtras by the Parallel Team to host some legacy STA COM objects supplied by a third party. The description of the StaTaskScheduler implementation details says the following:

The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).

I thought that would mean the blocking parts of TPL would use a wait API which pumps messages, like CoWaitForMultipleHandles, to avoid deadlock situations when called on an STA thread.

In my situation, I believe the following is happening: in-proc STA COM object A makes a call to out-of-proc object B, then expects a callback from B as part of the outgoing call.

In a simplified form:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

The problem is, a.Method(b) never returns. As far as I can tell, this happens because a blocking wait somewhere inside BlockingCollection<Task> does not pump messages, so my assumption about the quoted statement is probably wrong.

EDITED: The same code works when it is executed on the UI thread of a test WinForms application (that is, when passing TaskScheduler.FromCurrentSynchronizationContext() instead of staTaskScheduler to Task.Factory.StartNew).

What is the right way to solve this? Should I implement a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?

If so, will the underlying implementation of BlockingCollection be calling my SynchronizationContext.Wait method? Can I use SynchronizationContext.WaitHelper to implement SynchronizationContext.Wait?


EDITED with some code showing that a managed STA thread doesn't pump when doing a blocking wait. The code is a complete console app ready for copy/paste/run:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        // wParam and lParam are pointer-sized (WPARAM/LPARAM), so they are declared as IntPtr
        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        // must match the native WNDPROC signature
        public delegate IntPtr WndProc(IntPtr hwnd, int msg, IntPtr wParam, IntPtr lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

This produces the output:

Testing without pumping...
The collection argument is empty and has been marked as complete with regards to additions.

Test with pumping...
The collection argument is empty and has been marked as complete with regards to additions.
Now start pumping...
WM_TEST processed
Press Enter to exit

Answer

noseratio · Jan 27, 2014

My understanding of your problem: you are using StaTaskScheduler only to organize the classic COM STA apartment for your legacy COM objects. You're not running a WinForms or WPF core message loop on the STA thread of StaTaskScheduler. That is, you're not using anything like Application.Run, Application.DoEvents or Dispatcher.PushFrame inside that thread. Correct me if this is a wrong assumption.

By itself, StaTaskScheduler doesn't install any synchronization context on the STA threads it creates. Thus, you're relying upon the CLR to pump messages for you. I've only found an implicit confirmation that the CLR pumps on STA threads, in Apartments and Pumping in the CLR by Chris Brumme:

I keep saying that managed blocking will perform “some pumping” when called on an STA thread. Wouldn’t it be great to know exactly what will get pumped? Unfortunately, pumping is a black art which is beyond mortal comprehension. On Win2000 and up, we simply delegate to OLE32’s CoWaitForMultipleHandles service.

This indicates the CLR uses CoWaitForMultipleHandles internally for STA threads. Further, the MSDN docs for COWAIT_DISPATCH_WINDOW_MESSAGES flag mention this:

... in STA is only a small set of special-cased messages dispatched.

I did some research on that, but could not get WM_TEST from your sample code pumped with CoWaitForMultipleHandles; we discussed that in the comments to your question. My understanding is that the aforementioned small set of special-cased messages is really limited to some COM marshaller-specific messages, and doesn't include any regular general-purpose messages like your WM_TEST.

So, to answer your question:

... Should I implement a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?

Yes, I believe that creating a custom synchronization context and overriding SynchronizationContext.Wait is indeed the right solution.

However, you should avoid using CoWaitForMultipleHandles, and use MsgWaitForMultipleObjectsEx instead. If MsgWaitForMultipleObjectsEx indicates there's a pending message in the queue, you should manually pump it with PeekMessage(PM_REMOVE) and DispatchMessage. Then you should continue waiting for the handles, all inside the same SynchronizationContext.Wait call.

Note there's a subtle but important difference between MsgWaitForMultipleObjectsEx and MsgWaitForMultipleObjects. The latter keeps blocking rather than returning if the queue holds a message that has already been seen (e.g., with PeekMessage(PM_NOREMOVE) or GetQueueStatus) but not removed. That's not good for pumping, because your COM objects might be using something like PeekMessage to inspect the message queue, which might later cause MsgWaitForMultipleObjects to block when not expected.

On the other hand, MsgWaitForMultipleObjectsEx with the MWMO_INPUTAVAILABLE flag doesn't have this shortcoming and returns in that case.
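The wait-and-pump approach described above can be sketched as a stand-alone, Windows-only helper. This is a minimal illustration rather than the code from the linked project: the names PumpingWait and WaitAny are hypothetical, only the wait-any case is covered, WM_QUIT is not treated specially, and the constant values are copied from the Windows SDK headers.

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

// Hypothetical helper: waits for any of the handles while pumping window messages.
static class PumpingWait
{
    [StructLayout(LayoutKind.Sequential)]
    struct MSG
    {
        public IntPtr hwnd;
        public uint message;
        public IntPtr wParam;
        public IntPtr lParam;
        public uint time;
        public int ptX; // POINT.x
        public int ptY; // POINT.y
    }

    const uint QS_ALLINPUT = 0x04FF;
    const uint MWMO_INPUTAVAILABLE = 0x0004;
    const uint PM_REMOVE = 0x0001;
    const uint WAIT_TIMEOUT = 0x00000102;
    const uint WAIT_FAILED = 0xFFFFFFFF;
    const uint INFINITE = 0xFFFFFFFF;

    [DllImport("user32.dll", SetLastError = true)]
    static extern uint MsgWaitForMultipleObjectsEx(uint nCount, IntPtr[] pHandles,
        uint dwMilliseconds, uint dwWakeMask, uint dwFlags);

    [DllImport("user32.dll", CharSet = CharSet.Auto)]
    [return: MarshalAs(UnmanagedType.Bool)]
    static extern bool PeekMessage(out MSG lpMsg, IntPtr hWnd,
        uint wMsgFilterMin, uint wMsgFilterMax, uint wRemoveMsg);

    [DllImport("user32.dll")]
    [return: MarshalAs(UnmanagedType.Bool)]
    static extern bool TranslateMessage(ref MSG lpMsg);

    [DllImport("user32.dll", CharSet = CharSet.Auto)]
    static extern IntPtr DispatchMessage(ref MSG lpMsg);

    // Returns the index of the signaled handle, or WaitHandle.WaitTimeout.
    public static int WaitAny(IntPtr[] handles, int millisecondsTimeout)
    {
        uint count = (uint)handles.Length;
        Stopwatch stopwatch = Stopwatch.StartNew();
        while (true)
        {
            // Recompute the remaining timeout on every iteration.
            uint remaining = INFINITE;
            if (millisecondsTimeout != Timeout.Infinite)
            {
                long left = millisecondsTimeout - stopwatch.ElapsedMilliseconds;
                remaining = (uint)Math.Max(left, 0L);
            }

            uint result = MsgWaitForMultipleObjectsEx(
                count, handles, remaining, QS_ALLINPUT, MWMO_INPUTAVAILABLE);

            if (result == WAIT_FAILED)
                throw new Win32Exception(); // uses the last Win32 error code
            if (result == WAIT_TIMEOUT)
                return WaitHandle.WaitTimeout;
            if (result < count)
                return (int)result; // WAIT_OBJECT_0 + handle index
            if (result != count)
                throw new InvalidOperationException("Unexpected wait result: " + result);

            // result == count: messages are available, so drain and dispatch them,
            // then go back to waiting for the handles.
            MSG msg;
            while (PeekMessage(out msg, IntPtr.Zero, 0, 0, PM_REMOVE))
            {
                TranslateMessage(ref msg);
                DispatchMessage(ref msg);
            }
        }
    }
}
```

A SynchronizationContext.Wait override could call a helper like this for the waitAll == false case. A wait-all variant needs extra care, because MWMO_WAITALL changes the semantics so that the function returns only when all handles are signaled and input is available.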

A while ago I created a custom version of StaTaskScheduler (available here as ThreadAffinityTaskScheduler) in an attempt to solve a different problem: maintaining a pool of threads with thread affinity for subsequent await continuations. The thread affinity is vital if you use STA COM objects across multiple awaits. The original StaTaskScheduler exhibits this behavior only when its pool is limited to 1 thread.

So I went ahead and did some more experimenting with your WM_TEST case. Originally, I installed an instance of the standard SynchronizationContext class on the STA thread. The WM_TEST message didn't get pumped, which was expected.

Then I overrode SynchronizationContext.Wait to just forward the call to SynchronizationContext.WaitHelper. It did get called, but still didn't pump.
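For reference, a forwarding context of that kind might look like the minimal sketch below. The names ForwardingSyncContext and ForwardingDemo and the call counter are hypothetical and only serve to show that the runtime routes blocking waits through the override. The runtime invokes Wait only after SetWaitNotificationRequired has been called on the context.

```csharp
using System;
using System.Threading;

// Hypothetical name; not part of StaTaskScheduler or the linked source.
sealed class ForwardingSyncContext : SynchronizationContext
{
    private int _waitCalls;

    public ForwardingSyncContext()
    {
        // Ask the runtime to call Wait on this context for blocking waits.
        SetWaitNotificationRequired();
    }

    // Number of times the runtime has called Wait on this context.
    public int WaitCalls
    {
        get { return Volatile.Read(ref _waitCalls); }
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        Interlocked.Increment(ref _waitCalls);
        // Plain forwarding: this adds no message pumping of its own.
        return WaitHelper(waitHandles, waitAll, millisecondsTimeout);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }
}

static class ForwardingDemo
{
    // Installs the context on the current thread, performs one blocking wait
    // and returns how many times Wait was invoked.
    public static int Run()
    {
        var context = new ForwardingSyncContext();
        SynchronizationContext previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            using (var signaled = new ManualResetEvent(true))
            {
                signaled.WaitOne();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        return context.WaitCalls;
    }
}
```

On an STA thread created by a scheduler, the context would be installed once at the start of the thread procedure with SynchronizationContext.SetSynchronizationContext, before any tasks run.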

Finally, I implemented a full-featured message pump loop. Here's the core part of it:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

This does work: WM_TEST gets pumped. Below is an adapted version of your test:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

The output:

Initial thread #9
On STA thread #10
Post some WM_TEST messages...
Press Enter to continue...
WM_TEST processed: 1
WM_TEST processed: 2
WM_TEST processed: 3

After await, thread #10
Pending messages in the queue: False
Exiting STA thread #10
Current thread #12
Press any key to exit

Note that this implementation supports both thread affinity (it stays on thread #10 after the await) and message pumping. The full source code contains reusable parts (ThreadAffinityTaskScheduler and ThreadWithAffinityContext) and is available here as a self-contained console app. It hasn't been thoroughly tested, so use it at your own risk.