How to listen for Pub/Sub messages in an ASP.NET Core app continuously?

Mark Vincze · Oct 17, 2017

I would like to implement an ASP.NET Core app that does not respond to HTTP requests. Instead, it should start listening to Google Cloud Pub/Sub messages on startup and keep listening for its entire lifetime.

What is the preferred way to implement this with the official Pub/Sub SDK?

I can think of two ways:

Approach 1: Just use a SimpleSubscriber and start listening for messages in Startup.Configure:

public void Configure(IApplicationBuilder app)
{
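    // Configure cannot be async, so block until the subscriber has been created.
    var simpleSubscriber = SimpleSubscriber.CreateAsync(subscriptionName).GetAwaiter().GetResult();

    // Not awaited on purpose: the returned Task only completes when the subscriber stops.
    simpleSubscriber.StartAsync((msg, cancellationToken) =>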
    {
        // Process the message here.

        return Task.FromResult(SimpleSubscriber.Reply.Ack);
    });

    ...
}

Approach 2: Use a library designed to run jobs periodically, such as Quartz, Hangfire or FluentScheduler, and pull the new messages with a SubscriberClient each time the job is triggered.

Which one is the preferred approach? The first one seems simpler, but I'm not sure if it's really reliable.

Answer

Chris · Oct 23, 2017

The first approach is definitely how this is intended to be used.

However, see the docs for StartAsync:

Starts receiving messages. The returned Task completes when either StopAsync(CancellationToken) is called or if an unrecoverable fault occurs. This method cannot be called more than once per SubscriberClient instance.

So you do need to handle the case where StartAsync shuts down unexpectedly because of an unrecoverable error. The simplest thing to do would be to use an outer loop. Note, though, that because these errors are considered unrecoverable, it is likely that something about the call needs to change before a retry can succeed.

The code might look like this:

while (true)
{
    // Each SubscriberClient instance must only be used once.
    var subscriberClient = await SubscriberClient.CreateAsync(subscriptionName);
    try
    {
        await subscriberClient.StartAsync((msg, cancellationToken) =>
        {
            // Process the message here.
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
    }
    catch (Exception e)
    {
        // Handle the unrecoverable error somehow...
    }
}
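
One caveat with a bare while (true) loop: if StartAsync fails immediately every time, it retries in a tight loop, and there is no way to end it on application shutdown. Below is a minimal sketch of the same outer loop with a pause between attempts and a cancellation token. It uses only the base class library, not the Pub/Sub SDK. RestartLoop, RunAsync, runOnce and retryDelay are hypothetical names introduced for illustration; runOnce stands in for "create a new SubscriberClient and await StartAsync", and the delay value is an arbitrary choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RestartLoop
{
    // Runs runOnce over and over until stopToken is cancelled.
    // runOnce is a placeholder for "create a subscriber and await its StartAsync".
    // Returns how many times runOnce was started.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> runOnce,
        TimeSpan retryDelay,
        CancellationToken stopToken)
    {
        var starts = 0;
        while (!stopToken.IsCancellationRequested)
        {
            starts++;
            try
            {
                await runOnce(stopToken);
            }
            catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
            {
                // Shutdown was requested; this is not a fault.
                break;
            }
            catch (Exception e)
            {
                // Log the fault; a real app would use its own logger here.
                Console.Error.WriteLine($"Subscriber faulted: {e.Message}");
            }

            try
            {
                // Pause before restarting so repeated failures do not spin the CPU.
                await Task.Delay(retryDelay, stopToken);
            }
            catch (OperationCanceledException)
            {
                // Cancelled while waiting: stop the loop.
                break;
            }
        }
        return starts;
    }
}
```

In terms of the loop above, runOnce would create a fresh SubscriberClient and await its StartAsync. For the token to actually end a running subscriber, it would also have to trigger StopAsync on that instance, which this sketch leaves out.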

If this doesn't work as expected, please let us know.

Edit: SimpleSubscriber was renamed to SubscriberClient in the library so the answer has been edited accordingly.