I'd like to run a Task that has a "heartbeat" that keeps running at a specific time interval until the task completes.
I'm thinking an extension method like this would work well:
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken)
For example:
public class Program {
public static void Main() {
var cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken);
withHeartbeatTask.Wait();
Console.WriteLine("Long running task completed!");
Console.ReadLine()
}
private static void SomeLongRunningTask() {
Console.WriteLine("Starting long task");
Thread.Sleep(TimeSpan.FromSeconds(9.5));
}
private static int _heartbeatCount = 0;
private static void PerformHeartbeat(CancellationToken cancellationToken) {
Console.WriteLine("Heartbeat {0}", ++_heartbeatCount);
}
}
This program should output:
Starting long task
Heartbeat 1
Heartbeat 2
Heartbeat 3
Heartbeat 4
Heartbeat 5
Heartbeat 6
Heartbeat 7
Heartbeat 8
Heartbeat 9
Long running task completed!
Note that it should not (under normal circumstances) output "Heartbeat 10" since the heartbeat starts after the initial timeout (i.e. 1 second). Similarly, if the task takes less time than the heartbeat interval, the heartbeat should not occur at all.
What is a good way to implement this?
Background information: I have a service that's listening to an Azure Service Bus queue. I'd like to not Complete the message (which would permanently remove it from the queue) until I finish processing it, which could take longer than the maximum message LockDuration of 5 minutes. Thus, I need to use this heartbeat approach to call RenewLockAsync before the lock duration expires so that the message doesn't timeout while lengthy processing is occurring.
Here's my attempt:
public static class TaskExtensions {
/// <summary>
/// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
/// </summary>
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) {
return;
}
var stopHeartbeatSource = new CancellationTokenSource();
cancellationToken.Register(stopHeartbeatSource.Cancel);
await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token));
stopHeartbeatSource.Cancel();
}
private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
await Task.Delay(interval, cancellationToken);
if (!cancellationToken.IsCancellationRequested) {
heartbeatAction(cancellationToken);
}
}
catch (TaskCanceledException tce) {
if (tce.CancellationToken == cancellationToken) {
// Totally expected
break;
}
throw;
}
}
}
}
or with a slight tweak, you can even make the heartbeat async as in:
/// <summary>
/// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
/// </summary>
public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) {
return;
}
var stopHeartbeatSource = new CancellationTokenSource();
cancellationToken.Register(stopHeartbeatSource.Cancel);
await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token));
if (!stopHeartbeatSource.IsCancellationRequested) {
stopHeartbeatSource.Cancel();
}
}
public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) {
return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None);
}
private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
await Task.Delay(interval, cancellationToken);
if (!cancellationToken.IsCancellationRequested) {
await heartbeatTaskFactory(cancellationToken);
}
}
catch (TaskCanceledException tce) {
if (tce.CancellationToken == cancellationToken) {
// Totally expected
break;
}
throw;
}
}
}
which would allow you to change the sample code to something like this:
private static async Task PerformHeartbeat(CancellationToken cancellationToken) {
Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount);
await Task.Delay(1000, cancellationToken);
Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount);
}
The PerformHeartbeat could be replaced with an async call like RenewLockAsync so that you wouldn't have to waste thread time using a blocking call like RenewLock that the Action approach would require.
I'm answering my own question per SO guidelines, but I'm also open to more elegant approaches to this problem.