graphql-java - How to use subscriptions with spring boot?

muetzerich picture muetzerich · May 30, 2017 · Viewed 9.3k times · Source

In a project I use graphql-java and spring boot with a postgreSQL Database. Now I would like to use the subscription feature published in version 3.0.0. Unfortunately, the information about the application of the subsciption function is not very mature.

How is the approach to achieve real-time functionality using graphql-java with subscriptions?

Answer

kaqqao picture kaqqao · Jan 27, 2018

As of recent graphql-java versions, subscriptions are fully supported. The DataFetcher for a subscription must return a org.reactivestreams.Publisher, and graphql-java will take care of mapping the query function over the results.

The feature is nicely documented and there's a complete example using web sockets available in the official repo.

If you have a reactive data source in place (e.g. Mongo with a reactive driver, or probably anything that R2DBC supports), you're all set. Just use @Tailable and Spring Data will already give you a Flux (which implements Publisher) and there's nothing else you need to do.

As for a more manual Spring specific implementation, I can't imagine it being too hard to use Spring's own event mechanism (a nice tutorial here as well) to underlie the Publisher.

Every time there's an incoming subscription, create and register a new listener with the application context: context.addApplicationListener(listener) that will publish to the correct Publisher. E.g. in the DataFetcher:

// Somehow create a publisher, probably using Spring's Reactor project. Or RxJava.
Publisher<ResultObject> publisher = ...; 
//The listener reacts on application events and pushes new values through the publisher
ApplicationListener listener = createListener(publisher);
context.addApplicationListener(listener);
return publisher;

When the web socket disconnects or you somehow know the event stream is finished, you must make sure to remove the listener.

I haven't actually tried any of this, mind you, I'm just thinking aloud.

Another option is to use Reactor directly (with or without Spring WebFlux). There's a sample using Reactor and WebSocket (through GraphQL SPQR Spring Boot Starter) here.

You create a Publisher like this:

//This is really just a thread-safe wrapper around Map<String, Set<FluxSink<Task>>>
private final ConcurrentMultiRegistry<String, FluxSink<Task>> subscribers = new ConcurrentMultiRegistry<>();

@GraphQLSubscription
public Publisher<Task> taskStatusChanged(String taskId) {
    return Flux.create(subscriber -> subscribers.add(taskId, subscriber.onDispose(() -> subscribers.remove(taskId, subscriber))), FluxSink.OverflowStrategy.LATEST);
}

And then push new values from elsewhere (probably a related mutation or a reactive storage) like this:

subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));

E.g.

@GraphQLMutation
public Task updateTask(@GraphQLNonNull String taskId, @GraphQLNonNull Status status) {
    Task task = repo.byId(taskId); //find the task
    task.setStatus(status); //update the task
    repo.save(task); //persist the task
    //Notify all the subscribers following this task
    subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
    return task;
}

With SPQR Spring Starter, this is all that's needed to get you an Apollo-compatible subscription implementation.