I would like to call an async function from within an Rx subscription, for example like this:
    public class Consumer
    {
        private readonly Service _service = new Service();

        public ReplaySubject<string> Results = new ReplaySubject<string>();

        public void Trigger()
        {
            Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
        }

        public Task RunAsync()
        {
            return _service.DoAsync();
        }
    }

    public class Service
    {
        public async Task<string> DoAsync()
        {
            return await Task.Run(() => Do());
        }

        private static string Do()
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            throw new ArgumentException("invalid!");
            return "foobar";
        }
    }

    [Test]
    public async Task Test()
    {
        var sut = new Consumer();
        sut.Trigger();
        var result = await sut.Results.FirstAsync();
    }
What needs to be done to catch the exception properly?
Paul Betts' answer works in most scenarios, but if you want the stream to wait for the async function to finish before it handles the next item, you need something like this:
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(l => Observable.FromAsync(asyncMethod))
        .Concat()
        .Subscribe();
Or:
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
        .Concat()
        .Subscribe();
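To tie this back to the exception in the question, here is a minimal sketch of the first variant with an `onError` handler passed to `Subscribe`. The names `ConcatErrorDemo`, `FailingAsync` and `RunAsync` are made up for illustration, and `FailingAsync` is only a stand-in for the question's `Service.DoAsync`. The point is that the task becomes part of the observable chain, so a faulted task should arrive as an `OnError` notification instead of being thrown from an `async` lambda that nobody observes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ConcatErrorDemo
{
    // Hypothetical stand-in for Service.DoAsync from the question: always fails.
    private static async Task<string> FailingAsync()
    {
        await Task.Delay(50);
        throw new ArgumentException("invalid!");
    }

    // Runs the pipeline until it terminates and returns what the handlers saw.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var done = new TaskCompletionSource<bool>();

        using (Observable.Interval(TimeSpan.FromMilliseconds(100))
            // Wrap each call so that the task is part of the sequence.
            .Select(_ => Observable.FromAsync<string>(() => FailingAsync()))
            // Concat subscribes to one inner sequence at a time, in order.
            .Concat()
            .Subscribe(
                value => log.Add("next: " + value),
                error =>
                {
                    // A faulted task is reported here.
                    log.Add("error: " + error.Message);
                    done.TrySetResult(true);
                },
                () => done.TrySetResult(true)))
        {
            await done.Task;
        }

        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

Keep in mind that an error terminates the sequence, so the interval stops after the first failure. If the stream should keep running, the error would have to be handled on the inner observable (for example with `Catch`) before `Concat`.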