WebSocket und der Cluster

Früher in der guten alten Zeit waren Webanwendungen verbindungslos: Man hat eine Seite angefragt, die Seite wurde ausgeliefert und die Verbindung war beendet. Man hat was in Formulare eingetragen, auf „absenden“ gedrückt und die Verbindung war beendet. Genau darauf sind Applicationserver optimiert. Ok, es gab da noch diese Sache mit den Session Daten, die unter einem Session Schlüssel gespeichert wurden. Die haben schon jede Menge Ärger gemacht, wenn es um einen Cluster ging. Dafür gab es dann Mechanismen, die Daten auf alle Instanzen des Cluster verteilt haben oder zumindest Session affine Loadbalancer, die einen Request immer wieder auf die Clusterinstanz geschickt haben, auf der die Session auch tatsächlich war. Jetzt haben wir allerdings etwas lustigeres: permanente Verbindungen und die Möglichkeit, Daten zu dem Client aktiv zu pushen. Am niegelnagelneusten macht man das mit WebSockets, die seit Java EE 7 auch Teil des Standards sind.

Das funktioniert so weit auch alles erst einmal ganz prima, solange man die Event Architektur sofort bei Ankunft im WebSocket Endpoint vergisst und ab da alles schön synchron aufruft. Was aber, wenn man im Backend einen Event erzeugt und an das Frontend via WebSocket zurückschicken möchte?

CDI Events als Callback für WebSockets

Ich bin vor einiger Zeit über diesen Artikel gestolpert. Die wesentliche Idee ist, einen WebSocket Endpoint zu einem Observer für CDI Events zu machen. Das ist so von der Sache her schon einmal ganz schön cool und funktioniert auch tatsächlich. Der folgende Beispielcode ist einfach eine Erweiterung des Beispiels. Für das angegebene Beispiel ist es nicht einmal unbedingt nötig – doch dazu später mehr. So ein Endpoint könnte also so aussehen:

@Singleton
@ServerEndpoint("/websocket")
public class WebsocketEndpoint {

    @Inject
    private SessionBean sessionBean;

    private static final Logger LOG = Logger.getLogger(WebsocketEndpoint.class.getName());

    private static final Set sessions = Collections.synchronizedSet(new HashSet());

    @OnOpen
    public void onOpen(final Session session) {
        try {
            session.getBasicRemote().sendText("session opened");
            sessions.add(session);

            if (sessionBean == null) {
                LOG.log(Level.INFO, "senderBean is null");
            }
        } catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
        }
    }

    @OnMessage
    public void onMessage(final String message, final Session client) {
        if (sessionBean != null) {
            try {
                client.getBasicRemote().sendText("sending message to SessionBean...");
            } catch (IOException ex) {
                LOG.log(Level.SEVERE, "Error on WebSocket", ex);
            }

            sessionBean.handleMessage(message);
        }
    }

    @OnClose
    public void onClose(final Session session) {
        try {
            session.getBasicRemote().sendText("WebSocket Session closed");
            sessions.remove(session);
        } catch (IOException ex) {
            LOG.log(Level.SEVERE, "Error on close.", ex);
        }
    }

    public void onCDIEvent(@Observes @WsMessage String msg) {
        LOG.log(Level.INFO, "Got Callback at WebSocket!");
        for (Session s : sessions) {
            try {
                s.getBasicRemote().sendText("message from CDI: " + msg);
            } catch (IOException ex) {
                LOG.log(Level.SEVERE, "Can's send Message: ", ex);
            }
        }
    }
}

und die SessionBean dazu:

@Stateless
public class SessionBean {

    private static final Logger LOG = Logger.getLogger(SessionBean.class.getName());

    @Inject
    @WsMessage
    Event cdiEvent;

    @Asynchronous
    public void handleMessage(String message) {
        LOG.log(Level.INFO, "handling message {0}.", message);
        cdiEvent.fire(message);
    }
}

Der Qualifier ist einfach ein Qualifier, den Netbeans selbst generieren kann. Wie man sieht, benutze ich hier keine MDB wie im Beispiel, sondern einfach einen Asynchronen Aufruf. Der Effekt ist aber der gleiche. Wir halten also fest: man kann mit CDI Events von einer EJB ins Frontend Nachrichten schicken! Sehr hübsch!

Ein weiteres Detail, das ins Auge sticht, ist, dass der WebSocket Endpoint gleichzeitig als Singleton annotiert ist. Das darf man laut Doku und war die einzige Möglichkeit, die EJB in den Endpoint hineinzubekommen. Das funktinierte mitunter auch erst nach einem Server Neustart aber immerhin. Ich bin offensichtlich nicht der einzige mit diesem Problem. Eine Singleton EJB ist dann die logische Wahl. Ich speichere dort schließlich alle offenen Verbindungen und möchte deshalb auch immer dieselbe Bean haben. Daraus ergibt sich dann eine weitere Möglichkeit für einen Callback:

Callback in den WebSocket via Singleton Bean

Da der WebSocket Endpint bereits eine EJB ist, kann ich prinzipiell diese injecten und direkt aufrufen. Wer das von der SessionBean aus versucht, wird feststellen, dass der Applicationserver unzufrieden ist: wir haben eine zirkuläre Abhängigkeit, da sich Endpoint und Bean gegenseitig referenzieren. Wenn man aber tatsächlich über eine MDB geht, kann diese dann problemlos den WebSocket Endpoint aufrufen. Die Session Bean sähe dann etwa so aus:

@Stateless
public class SessionBean {

    private static final Logger LOG = Logger.getLogger(SessionBean.class.getName());

    @Resource(mappedName = "jms/myQueue")
    private Queue myQueue;
    @Inject
    private JMSContext jmsContext;

    public void handleMessage(String message) {
        LOG.log(Level.INFO, "handling message {0}.", message);
        jmsContext.createProducer().send(myQueue, "(" + message + ")");
    }
}

und die dazugehörige MDB so:

@MessageDriven(mappedName = "jms/myQueue")
public class MessageBean implements MessageListener {

    private static final Logger LOG = Logger.getLogger(MessageBean.class.getName());

    @Inject
    WebsocketEndpoint endpoint;

    @Override
    public void onMessage(Message message) {
        try {
            // rufe die Eventmethode direkt auf
            endpoint.onCDIEvent(message.getBody(String.class));
        } catch (JMSException ex) {
            LOG.log(Level.SEVERE, "Error extractiong Message.", ex);
        }
    }
}

Ok, ganz prima. Geht. Champagner! Und jetzt denkt sich der unbedarfte Entwickler: „Juhu, jetzt mache ich einfach aus der Queue ein Topic und schon habe ich eine 1a Lösung für den Cluster!“

Nachrichten im Cluster

Wenn man sich überlegt, wie JMS im Glassfish standardmäßig aufgebaut ist, entdeckt man, dass jede Clusterinstanz eine eigene Broker-Instanz mit hochzieht. Üblicherweise landen also die Nachrichten, die von einer Instanz geschickt werden bei dem lokalen Broker und werden auch wieder an die lokale Instanz geschickt. In dem Szenario funktioniert zumindest das Beispiel mit der Queue schon einmal recht wahrscheinlich.

Cluster Instanz

Cluster Instanz

Wenn jetzt allerdings der Broker ausfällt, wird der von der Nachbarinstanz verwendet. Dass die Nachricht wieder bei uns abgeliefert wird, ist unwahrscheinlicher. Bei einer Konfiguration mit einem externen Broker ist es ohnehin zufällig, wo die Nachricht abgeliefert wird. Sie muss ja nur einmal garantiert bei einer angemeldeten MDB ankommen.

Cluster

Cluster

Das ist natürlich irgendwie doof, denn die Clients sind ja jetzt mit einer stehenden Verbindung mit einer Instanz des Clusters physikalisch verbunden. Es gibt also auch keine Möglichkeit, die Verbindungen mal eben zu replizieren. Aber das gilt ja für eine Queue. Wir wollten ja ohnehin ein Topic verwenden und damit dann alle Instanzen befüttern…

Auch hier sind wir nicht die ersten mit der Idee und lesen, dass da bereits schon einmal jemand drüber gestolpert ist. Es ist also zwar spezifiziert, dass ein Topic zu jeder Application – also zu Jeder MDB-Klasse geliefert werden muss, aber nicht zu jeder Instanz dieser Klasse. Da ich auf dem Cluster nur verschiedene Instanzen der selben MDB verteile, reicht es, wenn eine dieser Instanzen die Nachricht erhält. Die Idee eines Clusters ist ja, dass alle Instanzen identisch sind und es demzufolge gleichgültig ist, wer die Arbeit macht. Nur damit funktioniert die Verteilung leider nicht wie gehofft.

Paradigmenwechsel

Die Gleichheit der Cluster Instanzen ist durch das Vorhandensein physikalischer Verbindungen nicht mehr gegeben. Das Bedeuten zum einen, dass bei dem Ausfall einer Instanz aller darauf vorhandenen Verbindungen wegsterben (Stichwort Reconnect im Client) und zum anderen, dass es nicht egal ist, welche Instanz einen Request behandelt. So lange also JMS 2.1 noch nicht da ist, muss man sich selbst behelfen. Das großartige Atmosphere löst schon einmal einen großen Teil dieser Probleme. So kümmert es sich auch um die Verteilung von Nachrichten im Cluster. (Oh, und Primefaces hat übrigens Atmosphere bereits integriert.)

Wenn man interessante Dinge wie Message Routing braucht wie beispielsweise bei einem private Chat, um mal das Lieblingsbeispiel für WebSockets zu strapazieren, bei dem zwei Benutzer direkt miteinander kommunizieren sollen, wird es schon kniffelig. In dem Fall ist ein Broadcast von allen Nachrichten auf alle Clusterinstanzen etwas ineffizient. Da müsste man sich dann überlegen, ob man nicht lieber Referenzen zu allen Instanzen hält und die Nachricht direkt an den Empfänger schickt. Also mehr wie bei einem Switch im Gegensatz zu einem Hub.

Es bleibt also spannend.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind markiert *


× acht = 48