Access Server-sent events from Java

Published: February 26, 2018  •  java

In my previous blog posts (here, here and here) I showed you examples of server-sent events (sse) applications that send messages from a server to a browser. Server-sent events is an easy way to send messages from a server to the client in real time. Unlike a WebSocket connection only the server can send messages but for a lot of use cases this is sufficient.

In the browser you have a special object, EventSource, that handles the connection and converts the responses into events.

Under the hood server-sent events is using a HTTP connection and the client tries to keep this connection open as long as possible. As long as this connection is open, the server can send messages (http streaming). If for any reason the client loses the connection, he tries to reconnect until a new connection is established with the server.

The server can send arbitrary text events but he has to follow the rules of the sse text protocol. You can find more about the protocol on the specification page:
https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events


Because sse is using HTTP and a text protocol you can use any HTTP client to access these services.

For the following example I use the server from this project. It periodically sends memory information of the Java Virtual Machine. To start it execute this command: mvn spring-boot:run

A popular HTTP client for the command line is curl. To connect to the example server run this command in a shell.

curl.exe http://localhost:8080/memory

And you should see the raw sse protocol stream

data:{"heap":155193288,"nonHeap":63686384,"ts":1519646918675}

data:{"heap":155193288,"nonHeap":63820536,"ts":1519646919676}

data:{"heap":155193288,"nonHeap":63820568,"ts":1519646920675}

Another approach is to use one of the many HTTP client libraries. For Java, you can choose from different libraries: OkHttp, Apache HttpComponents, Google HTTP Client Library. Java 9 introduced a new HTTP client, which is currently in the incubating module.

Spring 5 shipped with a new HTTP client based on the reactive programming model. Here an example how you can access the example sse stream with this client

    Logger logger = LoggerFactory.getLogger("main");
    ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {
      /* nothing_here */};

    while (true) {
      try {
        final Flux<ServerSentEvent<String>> stream = WebClient.create("http://localhost:8080")
            .get().uri("/memory").accept(MediaType.TEXT_EVENT_STREAM).retrieve()
            .bodyToFlux(typeRef);
        stream.subscribe(sse -> logger.info("Received: {}", sse));
        TimeUnit.MINUTES.sleep(10);
      }
      catch (Exception e) {
        e.printStackTrace();
      }
      TimeUnit.SECONDS.sleep(2);
    }

Spring5WebClient.java

As output you see something similar to this

Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64349072,"ts":1519647595676}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64348752,"ts":1519647596676}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64331664,"ts":1519647597675}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]

Sse Client

HTTP clients can consume sse streams but they don't understand the sse protocol. For instance, the Spring 5 example printed out the empty line, which is part of the sse protocol and separates two data events.

Or if a server splits a message into multiple data packets, a sse client needs to handle this and join these lines together into one event.

data: line one
data: line two
data: line three

Another feature of a sse client is the automatic reconnection mechanism. A sse client automatically reconnects to the server when he loses the connection. The time between losing the connection and reconnecting is a browser dependant value (in Chrome it is 2 seconds). But you can change this value, not from the client but from the server. A server may send a special message with the retry keyword and the client has to take this value and use it as the reconnection time.

retry: 10000

A HTTP client only sees the raw messages and does not understand the sse protocol. You can certainly build your own sse client on top of these clients but it is easier to use an existing sse client library.

In Java, we can use the okhttp-eventsource library, a sse client built on top of the OkHttp HTTP client library. The library is released under the Apache License 2 and the source code is hosted on GitHub.

The library is modelled after the EventSource object in the browsers and you find many similarities.

First you need to write an implementation of the EventHandler interface.

public class SimpleEventHandler implements EventHandler {

  @Override
  public void onOpen() throws Exception {
    System.out.println("onOpen");
  }

  @Override
  public void onClosed() throws Exception {
    System.out.println("onClosed");
  }

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    System.out.println(messageEvent.getData());
  }

  @Override
  public void onComment(String comment) throws Exception {
    System.out.println("onComment");
  }

  @Override
  public void onError(Throwable t) {
    System.out.println("onError: " + t);
  }

}

SimpleEventHandler.java

onOpen and onClosed are called when the connection is established resp. closed. The onComment method is called when the server sends a sse comment. The EventSource object in the browser ignores comments but here you can handle them. A server can periodically send a comment to prevent the connection from timing out.

: a comment

data: first event
id: 1

The library calls onError when an error occurs.

The onMessage is called each time a new data: packet arrives. This behaviour is different to the browser EventSource object where only unnamed/default events trigger the onmessage handler. For named events you have to register a handler with eventSource.addEventListener('eventname', handler).

But with this Java library the onMessage method is a catch all handler that receives all data events. That's the reason the event name is given to the method as the first parameter. If the server sends an unnamed/default event (data packet without event) the parameter event has the value "message".

In the following example the server sends two data events. The method onMessage is called twice. First with the event parameter set to "message" and in the second call set to "time".

data: 11

event: time
data: 1928192192

The MessageEvent instance contains the lastEventId and the payload (getData) of the event (everything after the data: string).


To access the sse stream we have to create an instance of the SimpleEventHandler and instantiate an EventSource object.

    EventHandler eventHandler = new SimpleEventHandler();
    String url = String.format("http://localhost:8080/memory");
    EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

    try (EventSource eventSource = builder.build()) {
      eventSource.setReconnectionTimeMs(3000);
      eventSource.start();

      TimeUnit.MINUTES.sleep(10);
    }

Simple.java

When you run the example, you see an output like this.

onOpen
{"heap":175327208,"nonHeap":64556544,"ts":1519649749675}
{"heap":175327208,"nonHeap":64559880,"ts":1519649750675}
{"heap":175327208,"nonHeap":64612912,"ts":1519649751675}
{"heap":175327208,"nonHeap":64615616,"ts":1519649752676}

You can also test the reconnection feature. Start the client, stop the server and restart it with mvn spring-boot:run. You see an output similar to this. After the server stopped, the client tries to reconnect.

onOpen
{"heap":152491256,"nonHeap":63258664,"ts":1519649828300}
{"heap":154184776,"nonHeap":63967936,"ts":1519649829299}
{"heap":154184776,"nonHeap":63978136,"ts":1519649830300}
13:57:10.594 [okhttp-eventsource-stream-[]-0] WARN  c.l.eventsource.EventSource. - Connection unexpectedly closed.
onClosed
onError: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8080
13:57:12.623 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 1908 milliseconds before reconnecting...
onError: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8080
13:57:16.544 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 3607 milliseconds before reconnecting...
13:57:20.287 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Connected to Event Source stream.
onOpen
{"heap":147478720,"nonHeap":62957832,"ts":1519649841080}
{"heap":149660224,"nonHeap":63779152,"ts":1519649842080}
{"heap":149660224,"nonHeap":63791272,"ts":1519649843081}

Another difference to the browser EventSource client is that you can set the reconnection time on the client.

  eventSource.setReconnectionTimeMs(3000);

This call sets the reconnection time to 3 seconds (default is 1 second)

The library uses an exponentially back off algorithm and uses some random jitter to calculate the next reconnection time. The first reconnection attempt happens after 3 * 1 seconds (+/- a random value), then 3 * 2, 3 * 4 and so on. The reconnection time will not exceed 30 seconds, the maximum value the library uses.

Examples of the reconnection time with the start value of 3000.

14:06:47.263 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 4300 milliseconds before reconnecting...
14:06:53.578 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 11455 milliseconds before reconnecting...
14:07:07.055 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 21681 milliseconds before reconnecting...
14:07:30.761 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 21000 milliseconds before reconnecting...
14:07:53.784 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 29206 milliseconds before reconnecting...
14:08:23.111 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Connected to Event Source stream.
onOpen
{"heap":159887208,"nonHeap":64027520,"ts":1519650503899}
{"heap":161764312,"nonHeap":64517192,"ts":1519650504898}
14:08:25.864 [okhttp-eventsource-stream-[]-0] WARN  c.l.eventsource.EventSource. - Connection unexpectedly closed.
onClosed
14:08:27.878 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 4576 milliseconds before reconnecting...
14:08:34.479 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 7952 milliseconds before reconnecting...
14:08:44.456 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 20343 milliseconds before reconnecting...
14:09:06.812 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 26778 milliseconds before reconnecting...
14:09:35.615 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 23116 milliseconds before reconnecting...
14:10:00.745 [okhttp-eventsource-stream-[]-0] INFO  c.l.eventsource.EventSource. - Waiting 15215 milliseconds before reconnecting...

This behaviour is different to the EventSource sse client in the browser. The browser always uses the same reconnection time between attempts (In Chrome 2 seconds by default).


Examples

For the following examples I created a subinterface of the EventHandler interface, that implements default methods for most methods except onMessage. This simplifies the implementation of an event handler because you no longer have to implement all methods.

public interface DefaultEventHandler extends EventHandler {

  @Override
  default void onOpen() throws Exception {
    // nothing here
  }

  @Override
  default void onClosed() throws Exception {
    // nothing here
  }

  @Override
  default void onComment(String comment) throws Exception {
    // nothing here
  }

  @Override
  default void onError(Throwable t) {
    // nothing here
  }

}

DefaultEventHandler.java


The following example accesses the MediaWiki RecentChange stream. This stream publishes all changes that are happening in real time on Wikipedia. You find more information about this service here: https://wikitech.wikimedia.org/wiki/EventStreams

First we implement the handler. The response of the service is a JSON and the code extracts the title and type field and prints them out.

public class WikipediaChangeHandler implements DefaultEventHandler {

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    // System.out.println(messageEvent.getData());

    try (JsonReader jsonReader = Json
        .createReader(new StringReader(messageEvent.getData()))) {
      JsonObject jsonObject = jsonReader.readObject();
      JsonValue title = jsonObject.getValue("/title");
      JsonValue changeType = jsonObject.getValue("/type");
      System.out.println(changeType.toString() + " : " + title.toString());
    }

  }

}

WikipediaChangeHandler.java

The main class connects to the service and registers the handler.

    EventHandler eventHandler = new WikipediaChangeHandler();
    String url = "https://stream.wikimedia.org/v2/stream/recentchange";
    EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

    try (EventSource eventSource = builder.build()) {
      eventSource.start();
      
      TimeUnit.MINUTES.sleep(10);
    }

WikipediaChanges.java

You should see an output like this

"new" : "Evrenos"
"edit" : "Q47224163"
"edit" : "Q39979240"
"edit" : "Q26578292"
"edit" : "Liste over Bulgarias visestatsoverhoder"
"edit" : "File:Drumlithie station site geograph-3420917-by-Ben-Brooksbank.jpg"
"edit" : "Q45452385"

Here are other interesting examples that use the MediaWiki RecentChange stream from a browser: https://sachaysl.github.io/wikimedia-challenge/
https://esjewett.github.io/wm-eventsource-demo/


In the next example we access the sse stream from emojitracker. That's a service that tracks the usage of Emojis on Twitter in real time.

The service sends the data in this format

{"1F62D":1}
{"1F3E1":1,"1F430":1,"1F483":1,"1F49C":1,"1F4DE":1,"1F4E2":1,"1F632":1,"1F636":1}

The hexadecimal number represents the character code and the number after the colon represents the number of occurrences.

For this example I implemented the handler as an anonymous inner class. The code extracts the two numbers with a regular expression and then calls Character.getName to get a textual description of the emoji.

    final Pattern dataRegex = Pattern.compile(""([A-F0-9]+)":(\d+)");
    
    String url = "http://emojitrack-gostreamer.herokuapp.com/subscribe/eps";
    EventSource.Builder builder = new EventSource.Builder(new DefaultEventHandler() {
      
      @Override
      public void onMessage(String event, MessageEvent messageEvent) throws Exception {
        Matcher matcher = dataRegex.matcher(messageEvent.getData());
        while(matcher.find()) {          
          int characterCode = Integer.parseInt(matcher.group(1), 16);
          System.out.print(Character.getName(characterCode));
          System.out.print(" (");
          System.out.print(matcher.group(2));
          System.out.println(")");
        }
        
      }
    }, URI.create(url));

    try (EventSource eventSource = builder.build()) {
      eventSource.start();
      
      TimeUnit.MINUTES.sleep(10);
    }
  }

EmojiTracker.java

As output you should get something similar to this:

TENNIS RACQUET AND BALL (1)
SMILING FACE WITH OPEN MOUTH AND SMILING EYES (1)
FACE WITH TEARS OF JOY (1)
BIRTHDAY CAKE (1)
REVOLVING HEARTS (1)
SMILING FACE WITH OPEN MOUTH (1)
BLACK SUN WITH RAYS (1)
SOFT ICE CREAM (1)
FACE WITH TEARS OF JOY (1)
WINKING FACE (1)
SMILING FACE WITH HEART-SHAPED EYES (2)
FACE THROWING A KISS (1)

You find the source code of all examples on GitHub:
https://github.com/ralscha/blog/tree/master/sse-client