Howto call back async function from rx subscribe?

Martin Komischke picture Martin Komischke · Apr 11, 2014 · Viewed 15.2k times · Source

I would like to call back an async function within an Rx subscription.

E.g. like that:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

What needs to be done, in order to catch the exception properly?

Answer

reijerh picture reijerh · May 4, 2015

Paul Betts' answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish you need something like this:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

Or:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();