Notify only specific user(s) through WebSockets, when something is modified in the database

Tiny · Sep 6, 2015 · Viewed 6.9k times

In order to notify all users through WebSockets, when something is modified in selected JPA entities, I use the following basic approach.

@ServerEndpoint("/Push")
public class Push {

    private static final Set<Session> sessions = new LinkedHashSet<Session>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
    }

    private static JsonObject createJsonMessage(String message) {
        return JsonProvider.provider().createObjectBuilder().add("jsonMessage", message).build();
    }

    public static void sendAll(String text) {
        synchronized (sessions) {
            String message = createJsonMessage(text).toString();

            for (Session session : sessions) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }
}

When a selected JPA entity is modified, an appropriate CDI event is raised which is to be observed by the following CDI observer.

@Typed
public final class EventObserver {

    private EventObserver() {}

    public void onEntityChange(@Observes EntityChangeEvent event) {
        Push.sendAll("updateModel");
    }
}

The observer/consumer invokes the static method Push#sendAll() defined in the WebSockets endpoint which sends a JSON message as a notification to all the associated users/connections.

The logic inside the sendAll() method needs to be modified somehow, when only selected users are to be notified.

  • Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login).
  • Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).

When the initial handshake is established, the HttpSession can be accessed as stated in this answer, but that is still insufficient to accomplish the two tasks listed in the bullets above. Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available.

What is the most acceptable/canonical way to notify only selected users as mentioned above? Some conditional statement(s) in the sendAll() method, or something else somewhere, is required. It appears that this takes something more than just the user's HttpSession.

I use GlassFish Server 4.1 / Java EE 7.

Answer

BalusC · Sep 9, 2015

Session?

Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available

It seems that you got bitten by the ambiguity of the word "session". The lifetime of a session depends on the context and the client. A WebSocket (WS) session does not have the same lifetime as an HTTP session, just as an EJB session or a legacy Hibernate session does not have the same lifetime as an HTTP session, et cetera. The HTTP session, which you likely already understand, is explained in "How do servlets work? Instantiation, sessions, shared variables and multithreading". The EJB session is explained in "JSF request scoped bean keeps recreating new Stateful session beans on every request?".

WebSocket lifecycle

The WS session is tied to the context represented by the HTML document. The client is basically the JavaScript code. The WS session starts when JavaScript does new WebSocket(url). The WS session stops when JavaScript explicitly invokes the close() function on the WebSocket instance, or when the associated HTML document gets unloaded as a result of page navigation (clicking a link/bookmark or modifying the URL in the browser's address bar), a page refresh, or a browser tab/window close. Do note that you can create multiple WebSocket instances within the very same DOM, usually each with a different URL path or query string parameters.

Each time a WS session starts (i.e. each time JavaScript does var ws = new WebSocket(url);), a handshake request is fired, in which you have access to the associated HTTP session via the Configurator class below, as you already found out:

public class ServletAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        config.getUserProperties().put("httpSession", httpSession);
    }

}

This is thus not called only once per HTTP session or HTML document as you seemed to expect. This is called every time a new WebSocket(url) is created.

Then a brand new instance of the @ServerEndpoint annotated class will be created and its @OnOpen annotated method will be invoked. If you're familiar with JSF/CDI managed beans, just treat that class as if it's a @ViewScoped and the method as if it's a @PostConstruct.

@ServerEndpoint(value="/push", configurator=ServletAwareConfigurator.class)
public class PushEndpoint {

    private Session session;
    private EndpointConfig config;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        this.session = session;
        this.config = config;
    }

    @OnMessage
    public void onMessage(String message) {
        // ...
    }

    @OnError
    public void onError(Throwable exception) {
        // ...
    }

    @OnClose
    public void onClose(CloseReason reason) {
        // ...
    }

}

Note that, unlike e.g. a servlet, this class is not application scoped. It's basically WS session scoped, so each new WS session gets its own instance. That's why you can safely assign Session and EndpointConfig as instance variables. Depending on the class design (e.g. abstract template, etc.), you could if necessary add Session back as the first argument of all those other onXxx methods. This is also supported.

The @OnMessage annotated method will be invoked when JavaScript does webSocket.send("some message"). The @OnClose annotated method will be called when the WS session is closed. The exact close reason can, if necessary, be determined via the close codes available in the CloseReason.CloseCodes enum. The @OnError annotated method will be called when an exception is thrown, usually as an IO error on the WS connection (broken pipe, connection reset, etc.).

Collect WS sessions by logged-in user

Coming back to your concrete functional requirement of notifying only specific users: after the above explanation you should understand that you can safely rely on modifyHandshake() to extract the logged-in user from the associated HTTP session, every time, provided that new WebSocket(url) is created after the user has logged in.

public class UserAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
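        HttpSession httpSession = (HttpSession) request.getHttpSession();
        // There may be no HTTP session yet, and no user is stored in it before login.
        User user = (httpSession != null) ? (User) httpSession.getAttribute("user") : null;
        if (user != null) {
            config.getUserProperties().put("user", user);
        } else {
            // Avoid leaving a stale value behind in case the properties map is reused.
            config.getUserProperties().remove("user");
        }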
    }

}

Inside the WS endpoint class annotated with @ServerEndpoint(configurator=UserAwareConfigurator.class), you can get hold of it in the @OnOpen annotated method as below:

@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    User user = (User) config.getUserProperties().get("user");
    // ...
}

You should collect them in the application scope. You can collect them in a static field of the endpoint class. Or, better, if CDI support in WS endpoint is not broken in your environment (works in WildFly, not in Tomcat+Weld, not sure about GlassFish), then simply collect them in an application scoped CDI managed bean which you in turn @Inject in the endpoint class.

When the User instance is not null (i.e. when a user is logged in), remember that a user can have multiple WS sessions. So you'd basically need to collect them in a Map<User, Set<Session>> structure, or perhaps a more fine-grained mapping which maps them by user ID or group/role instead, which makes it easier to find specific users. It all depends on the final requirements. Here's at least a kickoff example using an application scoped CDI managed bean:

@ApplicationScoped
public class PushContext {

    private Map<User, Set<Session>> sessions;

    @PostConstruct
    public void init() {
        sessions = new ConcurrentHashMap<>();
    }

    void add(Session session, User user) {
        sessions.computeIfAbsent(user, v -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(Session session) {
        sessions.values().forEach(v -> v.removeIf(e -> e.equals(session)));
    }

}

@ServerEndpoint(value="/push", configurator=UserAwareConfigurator.class)
public class PushEndpoint {

    @Inject
    private PushContext pushContext;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        User user = (User) config.getUserProperties().get("user");
        // Only register logged-in users; ConcurrentHashMap rejects null keys.
        if (user != null) {
            pushContext.add(session, user);
        }
    }

    @OnClose
    public void onClose(Session session) {
        pushContext.remove(session);
    }

}
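The alternative mentioned above, mapping sessions by user ID rather than by User instance, could look roughly like the sketch below. It is not part of the original answer: the class name UserSessionRegistry, its method names, and the generic session type S (standing in for javax.websocket.Session so the sketch compiles without the WebSocket API) are all assumptions made for illustration. It also drops a user's entry once the last session is gone, which the kickoff example does not do.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry mapping a user ID to that user's open sessions. */
final class UserSessionRegistry<S> {

    private final Map<Long, Set<S>> sessionsByUserId = new ConcurrentHashMap<>();

    /** Registers a session for the user; anonymous users (null ID) are ignored. */
    void add(Long userId, S session) {
        if (userId == null) {
            return; // ConcurrentHashMap does not accept null keys
        }
        // compute() runs atomically per key, so it cannot interleave with remove().
        sessionsByUserId.compute(userId, (id, set) -> {
            Set<S> target = (set != null) ? set : ConcurrentHashMap.<S>newKeySet();
            target.add(session);
            return target;
        });
    }

    /** Removes the session and drops the user's entry once no sessions remain. */
    void remove(Long userId, S session) {
        if (userId == null) {
            return;
        }
        sessionsByUserId.computeIfPresent(userId, (id, set) -> {
            set.remove(session);
            return set.isEmpty() ? null : set; // returning null removes the mapping
        });
    }

    /** Returns a read-only view of the user's sessions, or an empty set. */
    Set<S> sessionsOf(Long userId) {
        if (userId == null) {
            return Collections.emptySet();
        }
        Set<S> set = sessionsByUserId.get(userId);
        if (set == null) {
            return Collections.emptySet();
        }
        return Collections.unmodifiableSet(set);
    }

    /** Number of users that currently have at least one session. */
    int userCount() {
        return sessionsByUserId.size();
    }
}
```

In a real endpoint the user ID would have to be remembered per WS session so that it is still known in @OnClose, for example by storing it in Session#getUserProperties() during @OnOpen.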

Finally you can send a message to specific user(s) as below in PushContext:

public void send(Set<User> users, String message) {
    // Take a snapshot of the sessions of the targeted users. No extra locking is
    // needed here because the map and its value sets are concurrent collections.
    Set<Session> userSessions = sessions.entrySet().stream()
        .filter(e -> users.contains(e.getKey()))
        .flatMap(e -> e.getValue().stream())
        .collect(Collectors.toSet());

    // Push asynchronously to every session that is still open.
    for (Session userSession : userSessions) {
        if (userSession.isOpen()) {
            userSession.getAsyncRemote().sendText(message);
        }
    }
}

The PushContext being a CDI managed bean has the additional advantage that it's injectable in any other CDI managed bean, allowing easier integration.
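As a variation on the send() logic, the sessions of the targeted users can be looked up directly by key instead of scanning every map entry. The following is only a sketch under assumptions: TargetedPush, its type parameters, and the BiConsumer "transport" are hypothetical names introduced here, with the transport standing in for a call such as session.getAsyncRemote().sendText(message) so that the example runs without the WebSocket API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical sender that delivers a message only to the sessions of given users. */
final class TargetedPush<U, S> {

    private final Map<U, Set<S>> sessions = new ConcurrentHashMap<>();
    private final BiConsumer<S, String> transport;

    /** The transport performs the actual delivery for one session. */
    TargetedPush(BiConsumer<S, String> transport) {
        this.transport = transport;
    }

    void add(U user, S session) {
        sessions.computeIfAbsent(user, u -> ConcurrentHashMap.newKeySet()).add(session);
    }

    /** Sends to every session of every listed user; returns the number of deliveries attempted. */
    int send(Set<U> users, String message) {
        int attempted = 0;
        for (U user : users) {
            if (user == null) {
                continue; // a null key would make ConcurrentHashMap.get() throw
            }
            Set<S> userSessions = sessions.get(user);
            if (userSessions == null) {
                continue; // this user has no open sessions
            }
            for (S session : userSessions) {
                transport.accept(session, message);
                attempted++;
            }
        }
        return attempted;
    }
}
```

Keeping the delivery step behind a functional interface also makes the targeting logic easy to unit test without a running server.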

Fire CDI event with associated users

In your EntityListener, where you most likely fire the CDI event as per your previous related question "Real time updates from database using JSF/Java EE", you already have the changed entity at hand, and thus you should be able to find the users associated with it via their relationships in the model.

Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login)

@PostUpdate
public void onChange(Entity entity) {
    Set<User> editors = entity.getEditors();
    beanManager.fireEvent(new EntityChangeEvent(editors));
}

Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).

@PostUpdate
public void onChange(Entity entity) {
    User owner = entity.getOwner();
    beanManager.fireEvent(new EntityChangeEvent(Collections.singleton(owner)));
}

And then in the CDI event observer, pass it forth:

public void onEntityChange(@Observes EntityChangeEvent event) {
    pushContext.send(event.getUsers(), "message");
}
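The snippets above assume an EntityChangeEvent that carries the users to notify, but its shape is not shown. One possible shape is sketched below; both classes are hypothetical stand-ins, not the asker's actual code. The User class is included mainly to illustrate that a type used as a map key or set member needs equals() and hashCode() based on a stable identifier, since the User stored in the HTTP session and the one loaded in the entity listener are usually different object instances.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for the application's User entity, compared by ID. */
final class User {

    private final Long id;

    User(Long id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    Long getId() {
        return id;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof User && id.equals(((User) other).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

/** Hypothetical event payload carrying the users that should be notified. */
final class EntityChangeEvent {

    private final Set<User> users;

    EntityChangeEvent(Set<User> users) {
        // Defensive copy that skips nulls, so later changes to the caller's
        // set do not affect an event that has already been fired.
        Set<User> copy = new LinkedHashSet<>();
        if (users != null) {
            for (User user : users) {
                if (user != null) {
                    copy.add(user);
                }
            }
        }
        this.users = Collections.unmodifiableSet(copy);
    }

    Set<User> getUsers() {
        return users;
    }
}
```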

See also: