Async/await as a replacement of coroutines

jakobbotsch picture jakobbotsch · Apr 4, 2014 · Viewed 9.8k times · Source

I use C# iterators as a replacement for coroutines, and it has been working great. I want to switch to async/await as I think the syntax is cleaner and it gives me type safety. In this (outdated) blog post, Jon Skeet shows a possible way to implement it.

I chose to go a slightly different way (by implementing my own SynchronizationContext and using Task.Yield). This worked fine.

Then I realized there would be a problem; currently a coroutine doesn't have to finish running. It can be stopped gracefully at any point where it yields. We might have code like this:

private IEnumerator Sleep(int milliseconds)
{
    Stopwatch timer = Stopwatch.StartNew();
    do
    {
        yield return null;
    }
    while (timer.ElapsedMilliseconds < milliseconds);
}

private IEnumerator CoroutineMain()
{
    try
    {
        // Do something that runs over several frames
        yield return Coroutine.Sleep(5000);
    }
    finally
    {
        Log("Coroutine finished, either after 5 seconds, or because it was stopped");
    }
}

The coroutine works by keeping track of all enumerators in a stack. The C# compiler generates a Dispose function which can be called to ensure that the 'finally' block is correctly invoked in CoroutineMain, even if the enumeration isn't finished. This way we can stop a coroutine gracefully, and still ensure finally blocks are invoked, by calling Dispose on all the IEnumerator objects on the stack. This is basically manually unwinding.

When I wrote my implementation with async/await I realized that we would lose this feature, unless I'm mistaken. I then looked up other coroutine solutions, and it doesn't look like Jon Skeet's version handles it in any way either.

The only way I can think of to handle this would be to have our own custom 'Yield' function, which would check if the coroutine was stopped, and then raise an exception that indicated this. This would propagate up, executing finally blocks, and then be caught somewhere near the root. I don't find this pretty though, as 3rd party code could potentially catch the exception.

Am I misunderstanding something, and is this possible to do in an easier way? Or do I need to go the exception way to do this?

EDIT: More information/code has been requested, so here's some. I can guarantee this is going to be running on only a single thread, so there's no threading involved here. Our current coroutine implementation looks a bit like this (this is simplified, but it works in this simple case):

public sealed class Coroutine : IDisposable
{
    private class RoutineState
    {
        public RoutineState(IEnumerator enumerator)
        {
            Enumerator = enumerator;
        }

        public IEnumerator Enumerator { get; private set; }
    }

    private readonly Stack<RoutineState> _enumStack = new Stack<RoutineState>();

    public Coroutine(IEnumerator enumerator)
    {
        _enumStack.Push(new RoutineState(enumerator));
    }

    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        if (IsDisposed)
            return;

        while (_enumStack.Count > 0)
        {
            DisposeEnumerator(_enumStack.Pop().Enumerator);
        }

        IsDisposed = true;
    }

    public bool Resume()
    {
        while (true)
        {
            RoutineState top = _enumStack.Peek();
            bool movedNext;

            try
            {
                movedNext = top.Enumerator.MoveNext();
            }
            catch (Exception ex)
            {
                // Handle exception thrown by coroutine
                throw;
            }

            if (!movedNext)
            {
                // We finished this (sub-)routine, so remove it from the stack
                _enumStack.Pop();

                // Clean up..
                DisposeEnumerator(top.Enumerator);


                if (_enumStack.Count <= 0)
                {
                    // This was the outer routine, so coroutine is finished.
                    return false;
                }

                // Go back and execute the parent.
                continue;
            }

            // We executed a step in this coroutine. Check if a subroutine is supposed to run..
            object value = top.Enumerator.Current;
            IEnumerator newEnum = value as IEnumerator;
            if (newEnum != null)
            {
                // Our current enumerator yielded a new enumerator, which is a subroutine.
                // Push our new subroutine and run the first iteration immediately
                RoutineState newState = new RoutineState(newEnum);
                _enumStack.Push(newState);

                continue;
            }

            // An actual result was yielded, so we've completed an iteration/step.
            return true;
        }
    }

    private static void DisposeEnumerator(IEnumerator enumerator)
    {
        IDisposable disposable = enumerator as IDisposable;
        if (disposable != null)
            disposable.Dispose();
    }
}

Assume we have code like the following:

private IEnumerator MoveToPlayer()
{
  try
  {
    while (!AtPlayer())
    {
      yield return Sleep(500); // Move towards player twice every second
      CalculatePosition();
    }
  }
  finally
  {
    Log("MoveTo Finally");
  }
}

private IEnumerator OrbLogic()
{
  try
  {
    yield return MoveToPlayer();
    yield return MakeExplosion();
  }
  finally
  {
    Log("OrbLogic Finally");
  }
}

This would be created by passing an instance of the OrbLogic enumerator to a Coroutine, and then running it. This allows us to tick the coroutine every frame. If the player kills the orb, the coroutine doesn't finish running; Dispose is simply called on the coroutine. If MoveTo was logically in the 'try' block, then calling Dispose on the top IEnumerator will, semantically, make the finally block in MoveTo execute. Then afterwards the finally block in OrbLogic will execute. Note that this is a simple case and the cases are much more complex.

I am struggling to implement similar behavior in the async/await version. The code for this version looks like this (error checking omitted):

public class Coroutine
{
    private readonly CoroutineSynchronizationContext _syncContext = new CoroutineSynchronizationContext();

    public Coroutine(Action action)
    {
        if (action == null)
            throw new ArgumentNullException("action");

        _syncContext.Next = new CoroutineSynchronizationContext.Continuation(state => action(), null);
    }

    public bool IsFinished { get { return !_syncContext.Next.HasValue; } }

    public void Tick()
    {
        if (IsFinished)
            throw new InvalidOperationException("Cannot resume Coroutine that has finished");

        SynchronizationContext curContext = SynchronizationContext.Current;
        try
        {
            SynchronizationContext.SetSynchronizationContext(_syncContext);

            // Next is guaranteed to have value because of the IsFinished check
            Debug.Assert(_syncContext.Next.HasValue);

            // Invoke next continuation
            var next = _syncContext.Next.Value;
            _syncContext.Next = null;

            next.Invoke();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(curContext);
        }
    }
}

public class CoroutineSynchronizationContext : SynchronizationContext
{
    internal struct Continuation
    {
        public Continuation(SendOrPostCallback callback, object state)
        {
            Callback = callback;
            State = state;
        }

        public SendOrPostCallback Callback;
        public object State;

        public void Invoke()
        {
            Callback(State);
        }
    }

    internal Continuation? Next { get; set; }

    public override void Post(SendOrPostCallback callback, object state)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        if (Current != this)
            throw new InvalidOperationException("Cannot Post to CoroutineSynchronizationContext from different thread!");

        Next = new Continuation(callback, state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        throw new NotSupportedException();
    }

    public override SynchronizationContext CreateCopy()
    {
        throw new NotSupportedException();
    }
}

I don't see how to implement similar behavior to the iterator version using this. Apologies in advance for the lengthy code!

EDIT 2: The new method seems to be working. It allows me to do stuff like:

private static async Task Test()
{
    // Second resume
    await Sleep(1000);
    // Unknown how many resumes
}

private static async Task Main()
{
    // First resume
    await Coroutine.Yield();
    // Second resume
    await Test();
}

Which provides a very nice way of building AI for games.

Answer

noseratio picture noseratio · Apr 4, 2014

Updated, a follow-up blog post: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable.


I use C# iterators as a replacement for coroutines, and it has been working great. I want to switch to async/await as I think the syntax is cleaner and it gives me type safety...

IMO, it's a very interesting question, although it took me awhile to fully understand it. Perhaps, you didn't provide enough sample code to illustrate the concept. A complete app would help, so I'll try to fill this gap first. The following code illustrates the usage pattern as I understood it, please correct me if I'm wrong:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303

    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        private IEnumerator Sleep(int milliseconds)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    yield return null;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
        }

        void EnumeratorTest()
        {
            var enumerator = Sleep(100);
            enumerator.MoveNext();
            Thread.Sleep(500);
            //while (e.MoveNext());
            ((IDisposable)enumerator).Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().EnumeratorTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }
    }
}

Here, Resource.Dispose gets called because of ((IDisposable)enumerator).Dispose(). If we don't call enumerator.Dispose(), then we'll have to uncomment //while (e.MoveNext()); and let the iterator finish gracefully, for proper unwinding.

Now, I think the best way to implement this with async/await is to use a custom awaiter:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303
    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        async Task SleepAsync(int milliseconds, Awaiter awaiter)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    await awaiter;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
            Console.WriteLine("Exit SleepAsync");
        }

        void AwaiterTest()
        {
            var awaiter = new Awaiter();
            var task = SleepAsync(100, awaiter);
            awaiter.MoveNext();
            Thread.Sleep(500);

            //while (awaiter.MoveNext()) ;
            awaiter.Dispose();
            task.Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().AwaiterTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion,
            IDisposable
        {
            Action _continuation;
            readonly CancellationTokenSource _cts = new CancellationTokenSource();

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            public void Cancel()
            {
                _cts.Cancel();
            }

            // let the client observe cancellation
            public CancellationToken Token { get { return _cts.Token; } }

            // resume after await, called upon external event
            public bool MoveNext()
            {
                if (_continuation == null)
                    return false;

                var continuation = _continuation;
                _continuation = null;
                continuation();
                return _continuation != null;
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
                this.Token.ThrowIfCancellationRequested();
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                _continuation = continuation;
            }

            // IDispose
            public void Dispose()
            {
                Console.WriteLine("Awaiter.Dispose()");
                if (_continuation != null)
                {
                    Cancel();
                    MoveNext();
                }
            }
        }
    }
}

When it's time to unwind, I request the cancellation inside Awaiter.Dispose and drive the state machine to the next step (if there's a pending continuation). This leads to observing the cancellation inside Awaiter.GetResult (which is called by the compiler-generated code). That throws TaskCanceledException and further unwinds the using statement. So, the Resource gets properly disposed of. Finally, the task transitions to the cancelled state (task.IsCancelled == true).

IMO, this is a more simple and direct approach than installing a custom synchronization context on the current thread. It can be easily adapted for multithreading (some more details here).

This should indeed give you more freedom than with IEnumerator/yield. You could use try/catch inside your coroutine logic, and you can observe exceptions, cancellation and the result directly via the Task object.

Updated, AFAIK there is no analogy for the iterator's generated IDispose, when it comes to async state machine. You really have to drive the state machine to an end when you want to cancel/unwind it. If you want to account for some negligent use of try/catch preventing the cancellation, I think the best you could do is to check if _continuation is non-null inside Awaiter.Cancel (after MoveNext) and throw a fatal exception out-of-the-band (using a helper async void method).