In my previous blog posts (here, here and here) I showed you examples of server-sent events (SSE) applications that send messages from a server to a browser. Server-sent events is an easy way to send messages from a server to the client in real-time.
Unlike a WebSocket connection, only the server can send messages, but for many use cases, this is sufficient.
In the browser, you have a special object, EventSource, that handles the connection and converts the responses into events.
Under the hood, server-sent events use an HTTP connection, and the client tries to keep this connection open as long as possible. As long as the connection is open, the server can send messages (HTTP streaming). If the client loses the connection for any reason, it tries to reconnect until a new connection is established with the server.
The server can send arbitrary text events, but it has to follow the rules of the SSE text protocol. You can find more about the protocol on the specification page:
https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
Because SSE uses HTTP and a text protocol, you can use any HTTP client to access these services.
For the following examples, I use the server from this project. It periodically sends memory information of the Java Virtual Machine. To start it, run this command: mvn spring-boot:run
A popular HTTP client for the command line is curl. To connect to the example server, run this command in a shell.
curl.exe http://localhost:8080/memory
And you should see the raw SSE protocol stream.
data:{"heap":155193288,"nonHeap":63686384,"ts":1519646918675}
data:{"heap":155193288,"nonHeap":63820536,"ts":1519646919676}
data:{"heap":155193288,"nonHeap":63820568,"ts":1519646920675}
Another approach is to use one of the many HTTP client libraries. For Java, you can choose from different libraries: OkHttp, Apache HttpComponents, Google HTTP Client Library.
With Java 11, you don't need an external library because this version introduced an HTTP client to the core Java platform.
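As a rough sketch of the Java 11 route, the following reads the stream line by line with java.net.http.HttpClient. The class name RawSseReader and the helper buildRequest are placeholders of mine, and the URL assumes the example server from this post is running locally.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class RawSseReader {

    // Builds a GET request that asks the server for an SSE stream.
    public static HttpRequest buildRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the example server listens on localhost:8080.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<Stream<String>> response = client.send(
                buildRequest("http://localhost:8080/memory"),
                HttpResponse.BodyHandlers.ofLines());
        // Prints the raw protocol lines as they arrive, including the
        // empty lines that separate events.
        response.body().forEach(System.out::println);
    }
}
```

Like curl, this client only shows the raw lines of the stream; it does not interpret them as events.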
Spring 5 ships with an HTTP client based on the reactive programming model. Here is an example of how you can access the example SSE stream with this client:
Logger logger = LoggerFactory.getLogger("main");
ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {
  /* nothing_here */
};

while (true) {
  try {
    final Flux<ServerSentEvent<String>> stream = WebClient
        .create("http://localhost:8080").get().uri("/memory")
        .accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(typeRef);
    stream.subscribe(sse -> logger.info("Received: {}", sse));
    TimeUnit.MINUTES.sleep(10);
  }
  catch (Exception e) {
    e.printStackTrace();
  }
  TimeUnit.SECONDS.sleep(2);
}
As output, you see something similar to this.
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64349072,"ts":1519647595676}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64348752,"ts":1519647596676}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data={"heap":160226728,"nonHeap":64331664,"ts":1519647597675}]
Received: ServerSentEvent [id = 'null', event='null', retry=null, comment='null', data=null]
SSE Client
HTTP clients can consume SSE streams, but they don't understand the SSE protocol. For instance, the Spring 5 example prints out the empty line, which is part of the SSE protocol and separates two events.
Or, if a server splits a message into multiple data lines, an SSE client needs to handle this and join these lines together into one event.
data: line one
data: line two
data: line three
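To make the joining step concrete, here is a minimal sketch of how a client could group raw lines into events. The class SseDataJoiner is hypothetical and only looks at data lines; the newline separator and the stripping of one leading space follow my reading of the SSE specification.

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataJoiner {

    // Groups raw SSE lines into events: consecutive "data:" lines are joined
    // with "\n", and an empty line completes the current event.
    public static List<String> parse(List<String> lines) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single leading space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                dataLines.add(value);
            }
            // Other fields (event, id, retry) and comments are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> raw = List.of("data: line one", "data: line two", "data: line three", "");
        // One event whose payload spans three lines.
        System.out.println(parse(raw));
    }
}
```

A plain HTTP client would hand you three separate lines here, whereas an SSE client delivers them as one event.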
Another feature of an SSE client is the automatic reconnection mechanism. An SSE client automatically reconnects to the server when it loses the connection.
The time between losing the connection and reconnecting is a browser-dependent value (in Chrome, it is 2 seconds).
You can change this value, but only from the server, not from the client. A server may send a special message with the retry keyword, and the client has to take this value and use it as the reconnection time.
retry: 10000
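If you build your own client, handling this field could look roughly like the following. RetryField is a hypothetical helper, and rejecting values that contain anything other than ASCII digits is based on my reading of the specification.

```java
import java.util.OptionalLong;

public class RetryField {

    // Returns the reconnection time in milliseconds if the line is a valid
    // "retry:" field, or an empty result for any other line.
    public static OptionalLong parse(String line) {
        if (!line.startsWith("retry:")) {
            return OptionalLong.empty();
        }
        String value = line.substring("retry:".length());
        if (value.startsWith(" ")) {
            value = value.substring(1);
        }
        // Only values that consist solely of ASCII digits are accepted.
        if (value.isEmpty() || !value.chars().allMatch(c -> c >= '0' && c <= '9')) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(value));
        } catch (NumberFormatException e) {
            // Digits only, but too large for a long: treat as invalid.
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("retry: 10000"));
        System.out.println(parse("retry: soon"));
    }
}
```

A client would store the returned value and wait that long before the next reconnection attempt.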
An HTTP client only sees the raw messages and does not understand the SSE protocol. You can certainly build your own SSE client on top of these clients, but it is easier to use an existing SSE client library.
In Java, we can use the okhttp-eventsource library, an SSE client built on top of the OkHttp HTTP client library.
The library is released under the Apache License 2.0, and the source code is hosted on GitHub.
The library is modeled after the EventSource object in the browser, and you will find many similarities.
First, you need to write an implementation of the EventHandler interface.
public class SimpleEventHandler implements EventHandler {

  @Override
  public void onOpen() throws Exception {
    System.out.println("onOpen");
  }

  @Override
  public void onClosed() throws Exception {
    System.out.println("onClosed");
  }

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    System.out.println(messageEvent.getData());
  }

  @Override
  public void onComment(String comment) throws Exception {
    System.out.println("onComment");
  }

  @Override
  public void onError(Throwable t) {
    System.out.println("onError: " + t);
  }

}
onOpen and onClosed are called when the connection is established or closed, respectively. The onComment method is called when the server sends an SSE comment.
The EventSource object in the browser ignores comments, but here you can handle them.
A server can periodically send a comment to prevent the connection from timing out.
: a comment
data: first event
id: 1
The library calls onError when an error occurs.
The onMessage method is called each time a new data: packet arrives. This behavior is different from the browser EventSource object, where only unnamed/default events trigger the onmessage handler. For named events, you have to register a handler with eventSource.addEventListener('eventname', handler).
But with this Java library, the onMessage method is a catch-all handler that receives all data events. That's the reason the event name is given to the method as the first parameter.
If the server sends an unnamed/default event (data packet without event), the parameter event has the value "message".
In the following example, the server sends two data events. The method onMessage is called twice: first with the event parameter set to "message", and in the second call set to "time".
data: 11

event: time
data: 1928192192
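If you prefer the browser style with one listener per event name, you could layer a small dispatcher on top of the catch-all handler. The following is only a sketch: EventDispatcher and its methods are my own invention and not part of the library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class EventDispatcher {

    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    // Registers a listener for one event name, similar to addEventListener.
    public void on(String eventName, Consumer<String> listener) {
        listeners.computeIfAbsent(eventName, k -> new ArrayList<>()).add(listener);
    }

    // Forwards the payload to the listeners registered for this event name.
    // Events without a registered listener are silently dropped.
    public void dispatch(String eventName, String data) {
        for (Consumer<String> listener : listeners.getOrDefault(eventName, List.of())) {
            listener.accept(data);
        }
    }

    public static void main(String[] args) {
        EventDispatcher dispatcher = new EventDispatcher();
        dispatcher.on("message", data -> System.out.println("default event: " + data));
        dispatcher.on("time", data -> System.out.println("time event: " + data));

        // The two events from the example stream.
        dispatcher.dispatch("message", "11");
        dispatcher.dispatch("time", "1928192192");
    }
}
```

Inside onMessage, you would then pass the event parameter and messageEvent.getData() to dispatch.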
The MessageEvent instance contains the lastEventId and the payload (getData) of the event (everything after the data: string).
To access the SSE stream, we have to create an instance of the SimpleEventHandler and instantiate an EventSource object.
EventHandler eventHandler = new SimpleEventHandler();
String url = "http://localhost:8080/memory";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
    .reconnectTime(3, TimeUnit.SECONDS);

// try-with-resources closes the EventSource when the block ends
try (EventSource eventSource = builder.build()) {
  eventSource.start();
  // keep the main thread alive while the handler receives events
  TimeUnit.MINUTES.sleep(10);
}
When you run the example, you see an output like this.
onOpen
{"heap":175327208,"nonHeap":64556544,"ts":1519649749675}
{"heap":175327208,"nonHeap":64559880,"ts":1519649750675}
{"heap":175327208,"nonHeap":64612912,"ts":1519649751675}
{"heap":175327208,"nonHeap":64615616,"ts":1519649752676}
You can also test the reconnection feature. Start the client, stop the server, and restart it with mvn spring-boot:run.
You see an output similar to this. After the server stops, the client tries to reconnect.
onOpen
{"heap":152491256,"nonHeap":63258664,"ts":1519649828300}
{"heap":154184776,"nonHeap":63967936,"ts":1519649829299}
{"heap":154184776,"nonHeap":63978136,"ts":1519649830300}
13:57:10.594 [okhttp-eventsource-stream-[]-0] WARN c.l.eventsource.EventSource. - Connection unexpectedly closed.
onClosed
onError: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8080
13:57:12.623 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 1908 milliseconds before reconnecting...
onError: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8080
13:57:16.544 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 3607 milliseconds before reconnecting...
13:57:20.287 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Connected to Event Source stream.
onOpen
{"heap":147478720,"nonHeap":62957832,"ts":1519649841080}
{"heap":149660224,"nonHeap":63779152,"ts":1519649842080}
{"heap":149660224,"nonHeap":63791272,"ts":1519649843081}
Another difference to the browser EventSource client is that you can set the reconnection time on the client.
eventSource.setReconnectionTimeMs(3000);
This call sets the reconnection time to 3 seconds (the default is 1 second).
The library uses an exponential backoff algorithm with some random jitter to calculate the next reconnection time.
The first reconnection attempt happens after 3 * 1 seconds (+/- a random value), then 3 * 2, 3 * 4, and so on.
The reconnection time never exceeds 30 seconds, the maximum value the library uses.
Here is an example of reconnection delays with the start value 3000.
14:06:47.263 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 4300 milliseconds before reconnecting...
14:06:53.578 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 11455 milliseconds before reconnecting...
14:07:07.055 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 21681 milliseconds before reconnecting...
14:07:30.761 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 21000 milliseconds before reconnecting...
14:07:53.784 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 29206 milliseconds before reconnecting...
14:08:23.111 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Connected to Event Source stream.
onOpen
{"heap":159887208,"nonHeap":64027520,"ts":1519650503899}
{"heap":161764312,"nonHeap":64517192,"ts":1519650504898}
14:08:25.864 [okhttp-eventsource-stream-[]-0] WARN c.l.eventsource.EventSource. - Connection unexpectedly closed.
onClosed
14:08:27.878 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 4576 milliseconds before reconnecting...
14:08:34.479 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 7952 milliseconds before reconnecting...
14:08:44.456 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 20343 milliseconds before reconnecting...
14:09:06.812 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 26778 milliseconds before reconnecting...
14:09:35.615 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 23116 milliseconds before reconnecting...
14:10:00.745 [okhttp-eventsource-stream-[]-0] INFO c.l.eventsource.EventSource. - Waiting 15215 milliseconds before reconnecting...
This behavior is different from the EventSource SSE client in the browser. The browser always uses the same reconnection time between attempts (in Chrome, 2 seconds by default).
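To illustrate the general idea of exponential backoff with jitter, here is a small sketch. It is not the library's actual implementation: the class Backoff, the 1-based attempt counter, and the jitter formula (a random value in the upper half of the capped delay) are assumptions of mine, so the numbers will not match the log output above exactly.

```java
import java.util.Random;

public class Backoff {

    // Assumption: same upper limit as mentioned in the text (30 seconds).
    static final long MAX_DELAY_MS = 30_000;

    // Delay before jitter for a 1-based attempt: base * 2^(attempt - 1), capped.
    public static long nominalDelay(long baseMs, int attempt) {
        int shift = Math.min(attempt - 1, 20); // bound the shift to avoid overflow
        return Math.min(MAX_DELAY_MS, baseMs << shift);
    }

    // Picks a random delay between half of the nominal delay and the nominal delay.
    public static long delay(long baseMs, int attempt, Random random) {
        long nominal = nominalDelay(baseMs, attempt);
        return nominal / 2 + (long) (random.nextDouble() * (nominal / 2.0));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.printf("attempt %d: nominal %d ms, with jitter %d ms%n",
                    attempt, nominalDelay(3000, attempt), delay(3000, attempt, random));
        }
    }
}
```

The jitter spreads out the reconnection attempts of many clients, so they don't all hit the server at the same moment after an outage.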
Examples
For the following examples, I created a subinterface of the EventHandler interface that provides default implementations for all methods except onMessage.
This simplifies the implementation of an event handler because you no longer have to implement all methods.
public interface DefaultEventHandler extends EventHandler {

  @Override
  default void onOpen() throws Exception {
    // nothing here
  }

  @Override
  default void onClosed() throws Exception {
    // nothing here
  }

  @Override
  default void onComment(String comment) throws Exception {
    // nothing here
  }

  @Override
  default void onError(Throwable t) {
    // nothing here
  }

}
The following example accesses the MediaWiki RecentChange stream. This stream publishes all changes that are happening on Wikipedia in real-time.
You can find more information about this service here: https://wikitech.wikimedia.org/wiki/EventStreams
First, we implement the handler. The service responds with JSON, and the code extracts the title and type fields and prints them out.
public class WikipediaChangeHandler implements DefaultEventHandler {

  @Override
  public void onMessage(String event, MessageEvent messageEvent) throws Exception {
    // System.out.println(messageEvent.getData());
    try (JsonReader jsonReader = Json
        .createReader(new StringReader(messageEvent.getData()))) {
      JsonObject jsonObject = jsonReader.readObject();
      JsonValue title = jsonObject.getValue("/title");
      JsonValue changeType = jsonObject.getValue("/type");
      System.out.println(changeType.toString() + " : " + title.toString());
    }
  }

}
The main class connects to the service and registers the handler.
EventHandler eventHandler = new WikipediaChangeHandler();
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

try (EventSource eventSource = builder.build()) {
  eventSource.start();
  TimeUnit.MINUTES.sleep(10);
}
You should see an output like this.
"new" : "Evrenos"
"edit" : "Q47224163"
"edit" : "Q39979240"
"edit" : "Q26578292"
"edit" : "Liste over Bulgarias visestatsoverhoder"
"edit" : "File:Drumlithie station site geograph-3420917-by-Ben-Brooksbank.jpg"
"edit" : "Q45452385"
Here are other interesting examples that use the MediaWiki RecentChange stream from a browser:
https://github.com/unixpi/wikimedia-challenge
https://esjewett.github.io/wm-eventsource-demo/
In the next example, we access the SSE stream from emojitracker, a service that tracks the usage of emojis on Twitter in real-time.
The service sends the data in this format.
{"1F62D":1}
{"1F3E1":1,"1F430":1,"1F483":1,"1F49C":1,"1F4DE":1,"1F4E2":1,"1F632":1,"1F636":1}
The hexadecimal number represents the character code, and the number after the colon represents the number of occurrences.
For this example, I implemented the handler as an anonymous inner class.
The code extracts the two numbers with a regular expression and then calls Character.getName to get a textual description of the emoji.
package ch.rasc.sseclient;

import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.launchdarkly.eventsource.EventSource;

public class EmojiTracker {

  public static void main(String[] args) throws InterruptedException {
    // matches pairs like "1F62D":1 (hexadecimal character code and count)
    final Pattern dataRegex = Pattern.compile("\"([A-F0-9]+)\":(\\d+)");

    String url = "http://emojitrack-gostreamer.herokuapp.com/subscribe/eps";
    EventSource.Builder builder = new EventSource.Builder(
        (DefaultEventHandler) (event, messageEvent) -> {
          Matcher matcher = dataRegex.matcher(messageEvent.getData());
          while (matcher.find()) {
            int characterCode = Integer.parseInt(matcher.group(1), 16);
            System.out.print(Character.getName(characterCode));
            System.out.print(" (");
            System.out.print(matcher.group(2));
            System.out.println(")");
          }
        }, URI.create(url));

    try (EventSource eventSource = builder.build()) {
      eventSource.start();
      TimeUnit.MINUTES.sleep(10);
    }
  }

}
As output, you should get something similar to this:
TENNIS RACQUET AND BALL (1)
SMILING FACE WITH OPEN MOUTH AND SMILING EYES (1)
FACE WITH TEARS OF JOY (1)
BIRTHDAY CAKE (1)
REVOLVING HEARTS (1)
SMILING FACE WITH OPEN MOUTH (1)
BLACK SUN WITH RAYS (1)
SOFT ICE CREAM (1)
FACE WITH TEARS OF JOY (1)
WINKING FACE (1)
SMILING FACE WITH HEART-SHAPED EYES (2)
FACE THROWING A KISS (1)
You can find the source code of all examples on GitHub:
https://github.com/ralscha/blog/tree/master/sse-client