Cettia is a web framework that consists of a JavaScript library that runs in the browser and on Node.js, and a Java library for the back end. With Cettia, you can build reliable full-duplex message channels between a JavaScript client application and a back end running on the Java Virtual Machine. Over this channel, your application exchanges messages in real time.
Under the hood, Cettia utilizes WebSocket, EventSource, and long polling HTTP connections as transport mechanisms. Cettia prefers WebSocket connections by default, but falls back to other transport protocols if it is not possible to establish a WebSocket connection. As the developer, you do not have to worry about the underlying transport mechanism. Cettia handles this transparently and provides a channel abstraction to your application. Cettia shares similarities with socket.io from the Node.js ecosystem.
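To make the fallback idea more concrete, here is a minimal sketch of how a client could walk through a list of transports until one of them connects. This is not Cettia's internal code. The function openWithFallback, the connectors object, and the order of EventSource before long polling are my own assumptions for illustration; the text only tells us that WebSocket is tried first.

```javascript
// Hypothetical model of transport fallback; NOT Cettia's internal implementation.
// Assumption: WebSocket first (as described in the text), then EventSource, then long polling.
const PREFERENCE = ['websocket', 'eventsource', 'longpolling'];

// `connectors` maps a transport name to a function that returns a connection
// object, or throws when that transport cannot be established.
function openWithFallback(connectors) {
  const errors = [];
  for (const name of PREFERENCE) {
    const connect = connectors[name];
    if (!connect) continue;
    try {
      return { transport: name, connection: connect() };
    } catch (err) {
      // Remember the failure and try the next transport in the list.
      errors.push(`${name}: ${err.message}`);
    }
  }
  throw new Error('no transport available (' + errors.join('; ') + ')');
}

// Usage: WebSocket is blocked (for example by a proxy), so the next transport wins.
const result = openWithFallback({
  websocket: () => { throw new Error('blocked'); },
  eventsource: () => ({ send: () => {} }),
  longpolling: () => ({ send: () => {} }),
});
console.log(result.transport); // eventsource
```

The application code only sees the returned connection object, which is the point of the channel abstraction: it never has to know which transport was picked.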
Cettia is not only easy to use, but it also runs on any web framework on the Java Virtual Machine. In this blog post, I focus on Spring Boot, but Cettia also runs on frameworks like Java EE, Vert.x, Grizzly, Netty, and Atmosphere.
The source code of Cettia is hosted on GitHub and released under the Apache-2.0 license.
In the following examples, you will notice a package called Asity. Asity is another framework from the same author as Cettia. Asity is an abstraction layer between reusable web handlers and a web framework. This is the glue code that allows Cettia to support so many web frameworks. Asity itself does not depend on Cettia, and you can use it for building your own framework. Cettia, on the other hand, is an application of Asity and thus depends on the Asity libraries. Visit the Asity project page to learn more about the project.
Quickstart ¶
Client ¶
The easiest way to get started on the JavaScript client-side is by importing the Cettia library from a CDN:
<script src="https://cdn.jsdelivr.net/npm/cettia-client/cettia-browser.min.js"></script>
Open a channel to your back end
const socket = cettia.open('https://.../cettia');
Listen for events with on() and send events with send()
socket.on('open', () => {
  socket.send('ping', 'message from client');
});
socket.on('pong', data => console.log('message from server: ' + data));
Server ¶
The setup on the server is a bit more involved. First, add the Cettia library and the Asity bridge to your project.
<dependency>
  <groupId>io.cettia</groupId>
  <artifactId>cettia-server</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>io.cettia.asity</groupId>
  <artifactId>asity-bridge-spring-webflux5</artifactId>
  <version>3.0.0</version>
</dependency>
The following projects all depend on Spring Boot with the reactive web module.
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Next, we need to configure the Asity bridge and Cettia.
This configuration sets up an HTTP endpoint listening on the URL /cettia and a WebSocket endpoint listening on the same URL. You can name the path anything you want.
See also the official documentation for more information: https://cettia.io/#spring-webflux-5
@SpringBootApplication
@EnableWebFlux
public class Application {

  @Bean
  public Server defaultServer() {
    return new DefaultServer();
  }

  // HTTP endpoint: serves the HTTP-based transports on /cettia
  @Bean
  public RouterFunction<ServerResponse> httpMapping(Server defaultServer) {
    HttpTransportServer httpTransportServer = new HttpTransportServer()
        .ontransport(defaultServer);
    AsityHandlerFunction asityHandlerFunction = new AsityHandlerFunction()
        .onhttp(httpTransportServer);

    RequestPredicate isNotWebSocket = RequestPredicates
        .headers(headers -> !"websocket"
            .equalsIgnoreCase(headers.asHttpHeaders().getUpgrade()));

    return RouterFunctions
        .route(RequestPredicates.path("/cettia").and(isNotWebSocket),
            asityHandlerFunction);
  }

  // WebSocket endpoint: serves the WebSocket transport on the same path
  @Bean
  public HandlerMapping wsMapping(Server defaultServer) {
    WebSocketTransportServer wsTransportServer = new WebSocketTransportServer()
        .ontransport(defaultServer);
    AsityWebSocketHandler asityWebSocketHandler = new AsityWebSocketHandler()
        .onwebsocket(wsTransportServer);

    Map<String, WebSocketHandler> map = new LinkedHashMap<>();
    map.put("/cettia", asityWebSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setUrlMap(map);
    return mapping;
  }

  @Bean
  public WebSocketHandlerAdapter webSocketHandlerAdapter() {
    return new WebSocketHandlerAdapter();
  }

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}
io.cettia.transport.websocket.WebSocketTransportServer and io.cettia.transport.http.HttpTransportServer are the two Cettia classes that handle the communication between the back end and the JavaScript clients. Instances of these two classes are passed to the Asity bridge that glues Cettia and Spring WebFlux together.
There is no need to configure CORS for the two endpoints; Cettia handles this transparently.
The io.cettia.DefaultServer is responsible for managing the channels, also called sockets. The DefaultServer provides several methods to access these sockets and send messages.
The Java application listens for the ping event that the JavaScript code above sends. The server.onsocket() handler is called whenever a new socket has been created. Inside this handler, we can set up event handlers on the socket with on() and send messages with send().
@Bean
public Server defaultServer() {
  Server server = new DefaultServer();
  server.onsocket(socket -> {
    socket.on("ping", msg -> {
      System.out.println(msg);
      socket.send("pong", "from server");
    });
  });
  return server;
}
When you start this Spring Boot example and open http://localhost:8080 in a browser, the server console prints the ping message that the client has sent, and the browser console prints the pong message that the Spring Boot application has sent in response.
You can find the source code for this example on GitHub: https://github.com/ralscha/blog2019/tree/master/cettia/simple
To start the example from the command line, run this command: mvnw spring-boot:run
Sending messages ¶
In the previous quick start section, you have seen how to send messages with socket.send() from the client to the back end and vice versa. From the client, this is the only way to send messages because the client is only connected to one back end.
From the server, you can also call socket.send() for sending a message to one particular client. But usually, you want to send messages to multiple clients. The io.cettia.Server interface and the default implementation io.cettia.DefaultServer manage all connected sockets and provide a find() method to query these sockets. An application can then send messages to the selected clients with one method call.
The find() method expects an implementation of the io.cettia.ServerSocketPredicate interface as an argument. The find() method iterates over all sockets and returns a collection with all the sockets where the test() method of the ServerSocketPredicate returns true.
Most of the time, you don't have to write your own ServerSocketPredicate implementation, and you can reuse one of the predefined predicates from the io.cettia.ServerSocketPredicates class. This class provides several factory methods that return a ServerSocketPredicate implementation.
Sending a message to all registered clients
server.find(ServerSocketPredicates.all()).send("world", "hello");
Send messages to sockets with the tag "room1".
server.find(ServerSocketPredicates.tag("room1")).send("msg", "hi there");
Send messages to sockets whose attribute "username" has the value "ralph".
server.find(ServerSocketPredicates.attr("username", "ralph")).send("msg", "hi ralph");
Cettia automatically assigns a unique ID to each socket. You can test against this ID with id(). This example sends the message to only one client.
String socketId = socket.id();
server.find(ServerSocketPredicates.id(socketId)).send("msg", "a message just for me");
To add and remove tags and attributes, the following methods are available in the socket class.
//ATTRIBUTES
//set
socket.set("username", username);
//retrieve
String username = socket.get("username");
//remove
socket.remove("username");
//get all
Map<String,Object> attributes = socket.attributes();
//TAGS
//set
socket.tag("room1", "room2", "room3");
//retrieve
Set<String> tags = socket.tags();
//remove
socket.untag("room1");
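To illustrate how tags, attributes, and find() play together, here is a small self-contained model written in JavaScript. It is only a sketch of the idea: ModelSocket, find, byTag, and byAttr are hypothetical names of mine and not part of Cettia, where the real work is done by the Java classes described above.

```javascript
// Hypothetical in-memory model of server-side sockets; the real classes live in io.cettia.
class ModelSocket {
  constructor(id) {
    this.id = id;
    this.attributes = new Map();
    this.tags = new Set();
    this.received = []; // records what would be sent over the wire
  }
  set(name, value) { this.attributes.set(name, value); return this; }
  tag(...names) { names.forEach(n => this.tags.add(n)); return this; }
  untag(...names) { names.forEach(n => this.tags.delete(n)); return this; }
  send(event, data) { this.received.push([event, data]); }
}

// find() keeps every socket for which the predicate returns true.
const find = (sockets, predicate) => sockets.filter(predicate);
const byTag = name => socket => socket.tags.has(name);
const byAttr = (name, value) => socket => socket.attributes.get(name) === value;

const a = new ModelSocket('a').set('username', 'ralph').tag('room1', 'room2');
const b = new ModelSocket('b').set('username', 'anna').tag('room1');
const sockets = [a, b];

find(sockets, byTag('room2')).forEach(s => s.send('msg', 'hi there'));
find(sockets, byAttr('username', 'anna')).forEach(s => s.send('msg', 'hi anna'));

console.log(a.received.length, b.received.length); // 1 1
```

Each socket receives exactly one message here: the first call selects by tag, the second by attribute value.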
Notice that Cettia does not provide a central subscription registry on the server-side, and the server has no knowledge about the events a client has subscribed to with on(). Therefore, you can't directly broadcast a message from a client to all other connected clients.
You always have to install a handler on the back end that relays messages coming from one client to all other clients. Usually, when you do that, you don't want to send the message back to the originating client. With id() and the negate() method, an application can select all sockets except the socket that has sent the message.
server.onsocket(socket -> {
  socket.on("update", msg -> {
    server.find(ServerSocketPredicates.id(socket).negate()).send("update", msg);
  });
});
negate(), or(), and and() are methods of the ServerSocketPredicate interface. They allow you to negate a predicate and to combine multiple predicates.
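The way such combinators compose can be sketched in a few lines of JavaScript. The predicate helper and the id and tag factories below are hypothetical stand-ins that I wrote for illustration; they only mirror the idea of negate(), and(), and or() and are not the Java implementation.

```javascript
// Hypothetical predicate wrapper that mirrors the negate()/and()/or() idea.
function predicate(test) {
  return {
    test,
    negate: () => predicate(s => !test(s)),
    and: other => predicate(s => test(s) && other.test(s)),
    or: other => predicate(s => test(s) || other.test(s)),
  };
}

// Factory functions in the spirit of ServerSocketPredicates (names are assumptions).
const id = value => predicate(s => s.id === value);
const tag = name => predicate(s => s.tags.includes(name));

const sockets = [
  { id: '1', tags: ['room1'] },
  { id: '2', tags: ['room1'] },
  { id: '3', tags: ['room2'] },
];

// Everyone in room1 except the sender with the id "1".
const targets = sockets.filter(tag('room1').and(id('1').negate()).test);
console.log(targets.map(s => s.id)); // [ '2' ]
```

Because every combinator returns a new predicate, calls can be chained freely, which is what makes a relay like "all sockets except the sender" a one-liner.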
Lifecycle ¶
Cettia sockets follow a well-defined lifecycle and are always in a specific state like open or closed. https://cettia.io/guides/cettia-tutorial/#socket-lifecycle
To determine the current socket state, call socket.state() in Java and JavaScript.
In JavaScript, the method returns a string.
if (socket.state() === "closed") {
  //...
}
In Java, the method returns an enum.
if (socket.state() == State.CLOSED) {
  //...
}
The Java and JavaScript libraries provide built-in lifecycle events. Whenever a state transition happens, the corresponding lifecycle event is emitted, and your application may listen to them.
JavaScript ¶
socket.on('connecting', arg => .....);
socket.on('open', arg => .....);
socket.on('close', arg => .....);
socket.on('waiting', (delay, attempts) => .....);
socket.on('open', arg => .....);
It is possible to call on() multiple times with the same event name and install multiple handlers for the same event. All these handlers are going to be called whenever the event is emitted.
If your application is no longer interested in listening for a specific event, it can call off() to remove the handler. Make sure that you pass the exact same handler reference from the on() call to off().
const handler = () => ......;
socket.on('open', handler);
//...
socket.off('open', handler);
The library also provides a once() method to install event handlers that are called precisely one time. Cettia automatically removes the handler after it has been called the first time.
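Why off() needs the same handler reference becomes obvious when you look at how such an event registry typically works. The following Emitter class is a minimal model that I wrote for this purpose; it is an assumption about the general technique, not the source code of the Cettia client.

```javascript
// Minimal emitter model with on(), off() and once(); NOT Cettia's implementation.
class Emitter {
  constructor() { this.handlers = new Map(); }
  on(event, handler) {
    if (!this.handlers.has(event)) this.handlers.set(event, []);
    this.handlers.get(event).push(handler);
    return this;
  }
  off(event, handler) {
    const list = this.handlers.get(event) || [];
    const index = list.indexOf(handler); // compares by identity, not by source code
    if (index !== -1) list.splice(index, 1);
    return this;
  }
  once(event, handler) {
    // The wrapper removes itself before it delegates to the real handler.
    const wrapper = (...args) => { this.off(event, wrapper); handler(...args); };
    return this.on(event, wrapper);
  }
  emit(event, ...args) {
    // Iterate over a copy so handlers can remove themselves during the loop.
    [...(this.handlers.get(event) || [])].forEach(h => h(...args));
  }
}

const log = [];
const emitter = new Emitter();
const handler = () => log.push('named');
emitter.on('open', handler);
emitter.on('open', () => log.push('anonymous'));
emitter.once('open', () => log.push('once'));

emitter.emit('open');
emitter.off('open', handler);                     // removed: same reference
emitter.off('open', () => log.push('anonymous')); // no effect: a different function object
emitter.emit('open');

console.log(log); // [ 'named', 'anonymous', 'once', 'anonymous' ]
```

The second off() call passes a function that looks identical but is a new object, so the lookup fails and the anonymous handler stays installed.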
Java ¶
In Java, we can use predefined methods like onopen() and onclose() to install a lifecycle event listener. These are just convenience methods; under the hood, they call on() with the corresponding event name.
socket.onopen(v -> {
  System.out.println("open: " + socket.id());
});
/* equivalent
socket.on("open", v -> {
  System.out.println("open: " + socket.id());
});
*/
socket.onclose(v -> {
  System.out.println("close: " + socket.id());
});
socket.onerror(t -> {
  System.out.println("error: " + socket.id() + " : " + t.getMessage());
});
socket.ondelete(v -> {
  System.out.println("delete: " + socket.id());
});
socket.oncache(obj -> {
  System.out.println("cache: " + socket.id() + " : " + Arrays.toString(obj));
});
As in JavaScript, an application can install multiple handlers for the same event. Cettia runs all the registered handlers when the socket transitions to this specific state.
socket.onopen(v -> {
  System.out.println("open: " + socket.id());
});
socket.onopen(v -> {
  System.out.println("another open handler: " + socket.id());
});
off() removes a handler. Make sure that you pass the same handler reference from the on() call to off(). The library does not provide equivalent offxxxx methods. Instead, you pass the event name as the first argument to the off() call.
Action<Void> openHandler = v -> {
  System.out.println("open: " + socket.id());
};
socket.onopen(openHandler);
socket.off("open", openHandler);
Disconnect Handling ¶
A connection between client and server can break at any time. When a disconnect occurs, Cettia always tries to reconnect within 60 seconds. By default, however, all the messages an application sends during that time are lost. You can solve this by listening for the cache event, temporarily storing these messages in any kind of storage system, and re-sending them as soon as the connection opens again.
Here is the example from the documentation: https://cettia.io/guides/cettia-tutorial/#disconnection-handling
List<Object[]> cache = new CopyOnWriteArrayList<>();
socket.oncache((Object[] args) -> cache.add(args));
socket.onopen(v -> cache.forEach(args -> {
  cache.remove(args);
  socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
}));
socket.ondelete(v -> cache.forEach(args -> System.out.println(
    socket + " missed event - name: " + args[0] + ", data: " + args[1])));
If there is no active connection when send() is called, the cache event is fired. The cache event receives the same arguments that are passed to the send() call.
The handler collects these events in a Java collection. If Cettia is able to restore the connection, the open event is emitted, and the handler re-sends all stored events and clears the storage. If Cettia is not able to restore the connection within 60 seconds, the delete event is fired.
A similar workflow exists for the client-side:
https://cettia.io/projects/cettia-javascript-client/1.0.1/reference/#offline-handling
cache is fired whenever send() is called while there is no active connection. The cache handler receives the same arguments that are passed to the send() call. There is no delete event emitted in the JavaScript library.
const socket = cettia.open('http://localhost:8080/cettia');
const cache = [];
socket.on('cache', args => cache.push(args));
socket.on('open', () => {
  while (socket.state() === 'opened' && cache.length > 0) {
    const args = cache.shift();
    socket.send(...args);
  }
});
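If you want to try the cache-and-resend pattern without a running back end, you can exercise it against a stand-in object. FakeSocket below is a hypothetical class of mine that only imitates the behavior described in this section (send() fires cache while offline, open fires after a connect); it is not part of the Cettia client.

```javascript
// Hypothetical stand-in for a socket, just enough to exercise the pattern.
class FakeSocket {
  constructor() {
    this.currentState = 'closed';
    this.handlers = {};
    this.wire = []; // messages that actually went out
  }
  state() { return this.currentState; }
  on(event, handler) { (this.handlers[event] = this.handlers[event] || []).push(handler); }
  emit(event, ...args) { (this.handlers[event] || []).forEach(h => h(...args)); }
  send(...args) {
    if (this.currentState === 'opened') this.wire.push(args);
    else this.emit('cache', args); // no connection: hand the arguments to the cache handler
  }
  connect() { this.currentState = 'opened'; this.emit('open'); }
}

const socket = new FakeSocket();
const cache = [];
socket.on('cache', args => cache.push(args));
socket.on('open', () => {
  // Same loop as above: flush in FIFO order while the connection is open.
  while (socket.state() === 'opened' && cache.length > 0) {
    socket.send(...cache.shift());
  }
});

socket.send('ping', 'first');  // offline, lands in the cache
socket.send('ping', 'second'); // offline, lands in the cache
socket.connect();              // open fires and flushes the cache

console.log(socket.wire); // [ [ 'ping', 'first' ], [ 'ping', 'second' ] ]
```

Checking the state inside the loop matters: if the connection drops again in the middle of a flush, the remaining messages stay in the cache instead of being sent into the void.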
Dashboard Example ¶
The web application in this example displays a couple of ECharts charts. The Java back end periodically pushes new data over the Cettia channel to the web application, and the charts display the received data.
This example is an npm-managed project and utilizes Parcel as the build tool. You install the Cettia JavaScript library, like any other npm package, with npm install.
npm install cettia-client
In this application, we are going to use four Cettia events: pie, gauge, line, and bar. The web application installs a handler for each of these events with socket.on().
this.eventHandlers = new Map();
this.eventHandlers.set('pie', this.handlePieResponse.bind(this));
this.eventHandlers.set('gauge', this.handleGaugeResponse.bind(this));
this.eventHandlers.set('bar', this.handleBarResponse.bind(this));
this.eventHandlers.set('line', this.handleLineResponse.bind(this));
start() {
  this.startButton.disabled = true;
  this.stopButton.disabled = false;
  this.socket = cettia.open('http://127.0.0.1:8080/cettia');
  this.socket.on("open", this.handleChartCheckboxes.bind(this));
  for (const [key, value] of this.eventHandlers.entries()) {
    this.socket.on(key, value);
  }
}
The Cettia framework on the server is configured according to the description in the quick start section.
The server periodically sends new data from @Scheduled methods.
Here is the code that pushes new values for the gauge chart.
@Scheduled(initialDelay = 1_800, fixedRate = 4_700)
public void sendGaugeData() {
  this.defaultServer.find(ServerSocketPredicates.tag("gauge")).send("gauge",
      random.nextInt(100));
}
The code for the other three events looks similar.
The user can customize the view by selecting or deselecting a checkbox, and the corresponding chart will be displayed or hidden.
That's also the reason why I stored a reference to the event handlers in the Map. If you want to unsubscribe from an event, you have to call socket.off() and pass the same handler reference that you passed to on().
Each time the user selects or deselects a checkbox, the following method is called. First, the method unsubscribes from all events and then subscribes to all events where the corresponding checkbox is selected.
handleChartCheckboxes() {
  for (const [key, value] of this.eventHandlers.entries()) {
    this.socket.off(key, value);
  }
  const charts = [];
  this.chartCheckboxes.forEach(item => {
    if (item.checked) {
      charts.push(item.value);
      this.socket.on(item.value, this.eventHandlers.get(item.value));
      document.getElementById(item.value).style.display = 'inline-block';
    }
    else {
      document.getElementById(item.value).style.display = 'none';
    }
  });
  this.socket.send('charts', charts);
}
Notice that on() and off() are not propagated to the back end. The server does not know which events the client is subscribed to. In this particular application, this would pose a problem if the back end simply sent the messages to all registered clients.
this.defaultServer.find(ServerSocketPredicates.all()).send("gauge", random.nextInt(100));
The user might disable the gauge chart, but the browser would still receive the gauge event. The application ignores the event because it is no longer subscribed to it, but the data has already been sent over the wire, wasting bandwidth.
To solve this issue, the server does not send the data to all clients. Instead, it only sends the data to clients that have a specific tag assigned to them. For instance, the gauge update method sends data only to sockets with the tag "gauge".
this.defaultServer.find(ServerSocketPredicates.tag("gauge"))
The handleChartCheckboxes method above calls this.socket.send('charts', charts); and sends a string array containing all selected chart types to the server each time the user changes one of the checkboxes.
The Java application listens for this event and assigns the tags to the socket.
this.defaultServer.onsocket(socket -> {
  socket.on("charts", msg -> {
    socket.untag("pie", "gauge", "line", "bar");
    socket.tag(((List<String>) msg).toArray(new String[0]));
  });
});
The four data providers select the sockets with the corresponding tag and send the data to them. This way, clients just receive the data they are interested in, and no bandwidth is wasted.
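The whole round trip (the client reports its selection, the server re-tags the socket, the data providers filter by tag) can be modeled in plain JavaScript. Everything in this sketch is hypothetical: createClient, onCharts, and push are names I made up, and the extra check against the list of known charts is my own addition and is not present in the Java handler above.

```javascript
const CHARTS = ['pie', 'gauge', 'line', 'bar'];

// Hypothetical model of one connected client as the server sees it.
function createClient(name) {
  return { name, tags: new Set(), received: [] };
}

// Mirrors the "charts" handler: drop all chart tags, then tag the selected ones.
// Ignoring unknown chart names is an addition of this sketch.
function onCharts(client, selected) {
  CHARTS.forEach(chart => client.tags.delete(chart));
  selected.filter(chart => CHARTS.includes(chart)).forEach(chart => client.tags.add(chart));
}

// Mirrors a data provider: only clients tagged with the event name get the data.
function push(clients, event, data) {
  clients.filter(c => c.tags.has(event)).forEach(c => c.received.push([event, data]));
}

const alice = createClient('alice');
const bob = createClient('bob');
onCharts(alice, ['gauge', 'pie']);
onCharts(bob, ['line']);

push([alice, bob], 'gauge', 42);
onCharts(alice, ['pie']);        // alice unchecks the gauge chart
push([alice, bob], 'gauge', 43); // nobody is tagged "gauge" any more

console.log(alice.received, bob.received); // [ [ 'gauge', 42 ] ] []
```

After alice deselects the gauge chart, the second push reaches nobody, so no gauge data travels to a client that would only throw it away.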
You can find the source code for the complete example on GitHub:
https://github.com/ralscha/blog2019/tree/master/cettia/one
You can start the application from the command line.
cd server
mvnw spring-boot:run
// in another terminal
cd client
npm install
npm start
Open a browser with the URL http://localhost:1234
Todo Example ¶
This Ionic / Angular application implements a simple todo list. Each connected user sees the same information, and whenever a user adds, updates, or deletes a todo entry, the change is propagated to all other connected clients.
In all the examples so far, we have only sent text-based messages. But Cettia is perfectly capable of handling binary messages too. To demonstrate that, this example exchanges binary Protocol Buffer messages between front and back end. You find the proto definitions here: ChangeEvent.proto
Client ¶
The main setup for Cettia happens in the constructor of the todo service class.
The code installs a listener for the initial event. The server sends the stored todo entries with this event to the client as soon as the channel has been established.
Next, the code installs a handler for the update event. This is where the application receives all changes coming from the other clients.
this.socket = cettia.open(environment.SERVER_URL);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.socket.on('initial', (msg: any) => {
  const initialTodos = Todos.decode(msg);
  for (const todo of initialTodos.todos) {
    if (todo.id) {
      this.todos.set(todo.id, todo);
    }
  }
  this.todosSubject.next([...this.todos.values()]);
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.socket.on('update', (msg: any) => {
  const changeEvent = ChangeEvent.decode(msg);
  const todo = changeEvent.todo;
  if (todo?.id) {
    if (changeEvent.change === ChangeEvent.ChangeType.DELETE) {
      this.todos.delete(todo.id);
    } else if (changeEvent.change === ChangeEvent.ChangeType.INSERT) {
      this.todos.set(todo.id, todo);
    } else if (changeEvent.change === ChangeEvent.ChangeType.UPDATE) {
      this.todos.set(todo.id, todo);
    }
    this.todosSubject.next([...this.todos.values()]);
  }
});
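The core of the update handler is a small reducer over a Map keyed by the todo id. Here is that logic isolated as a plain function so it can run without Angular or Protocol Buffers. Note the assumptions: applyChange is a hypothetical helper, the ChangeType numbers are placeholders that I chose (the real values come from ChangeEvent.proto), and the events are plain objects instead of decoded messages.

```javascript
// Placeholder enum; the actual numeric values are defined in ChangeEvent.proto.
const ChangeType = Object.freeze({ INSERT: 0, UPDATE: 1, DELETE: 2 });

// Applies one change event to the local Map; returns true if the Map was touched.
function applyChange(todos, changeEvent) {
  const todo = changeEvent.todo;
  if (!todo || !todo.id) return false; // nothing to apply without an id
  if (changeEvent.change === ChangeType.DELETE) {
    todos.delete(todo.id);
    return true;
  }
  if (changeEvent.change === ChangeType.INSERT || changeEvent.change === ChangeType.UPDATE) {
    // INSERT and UPDATE both store the latest version under its id.
    todos.set(todo.id, todo);
    return true;
  }
  return false; // unknown change types are ignored
}

const todos = new Map();
applyChange(todos, { change: ChangeType.INSERT, todo: { id: 'a', title: 'write post' } });
applyChange(todos, { change: ChangeType.UPDATE, todo: { id: 'a', title: 'publish post' } });
applyChange(todos, { change: ChangeType.INSERT, todo: { id: 'b', title: 'relax' } });
applyChange(todos, { change: ChangeType.DELETE, todo: { id: 'b' } });

console.log([...todos.values()]); // [ { id: 'a', title: 'publish post' } ]
```

Because INSERT and UPDATE are both a set() on the same key, applying the same event twice leaves the Map unchanged, which keeps the clients tolerant of duplicate deliveries.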
Lastly, the application installs a cache and open event handler to cover the disconnect case when our application sends messages to the back end while there is no active connection (see the section above about disconnect handling).
this.socket.on('open', () => {
  while (this.socket.state() === 'opened' && this.cache.length > 0) {
    const args = this.cache.shift();
    this.socket.send(...args);
  }
});
Each time the user inserts, updates, or deletes an entry, the application creates a ChangeEvent object and sends it as an update event to our back end with socket.send().
deleteTodo(todo: ITodo): void {
  if (todo.id) {
    const deleted = this.todos.delete(todo.id);
    if (deleted) {
      this.todosSubject.next([...this.todos.values()]);
      const changeEvent = ChangeEvent.create({change: ChangeEvent.ChangeType.DELETE, todo});
      const buffer = ChangeEvent.encode(changeEvent).finish();
      this.socket.send('update', buffer);
    }
  }
}

save(todo: ITodo): void {
  let changeType;
  if (todo.id) {
    if (this.todos.has(todo.id)) {
      changeType = ChangeEvent.ChangeType.UPDATE;
    } else {
      changeType = ChangeEvent.ChangeType.INSERT;
    }
    const changeEvent = ChangeEvent.create({change: changeType, todo});
    const buffer = ChangeEvent.encode(changeEvent).finish();
    this.socket.send('update', buffer);
    this.todos.set(todo.id, todo);
    this.todosSubject.next([...this.todos.values()]);
  }
}
Server ¶
The server is configured according to the description in the quick start section.
When the application instantiates DefaultServer, it also installs an onsocket handler that is called each time a new channel is created. Inside this handler, an update event handler is installed on the socket that relays incoming messages from the web application to all the other clients. The client that has sent the message is excluded because it has already applied the change to its local copy of the todo entries.
@Bean
public Server defaultServer() {
  Server server = new DefaultServer();
  server.onsocket(socket -> {
    socket.on("update", msg -> {
      server.find(ServerSocketPredicates.id(socket).negate()).send("update", msg);
    });
  });
  return server;
}
The last piece of the server consists of the TodoService. In this Spring-managed bean, the application stores the todo entries and sends them with the "initial" event to newly connected clients.
To maintain the list, it also has to subscribe to the "update" event and has to apply the changes from the clients to the internal todos storage.
Lastly, it installs handlers for oncache, onopen, and ondelete to handle the disconnect case. For this example, we need to make sure that all the changes are delivered to all clients. If not, the todo entries on the clients would diverge.
@Service
public class TodoService {

  private final Server defaultServer;

  private final Map<String, Todo> todos = new HashMap<>();

  public TodoService(Server defaultServer) {
    this.defaultServer = defaultServer;

    this.defaultServer.onsocket(socket -> {
      final List<Object[]> cache = new CopyOnWriteArrayList<>();

      // Send the current todo entries to a newly connected client
      socket.onopen(v -> {
        socket.send("initial",
            Todos.newBuilder().addAllTodos(this.todos.values()).build().toByteArray());
      });

      // Apply changes coming from the clients to the internal storage
      socket.<byte[]>on("update", msg -> {
        try {
          ChangeEvent ce = ChangeEvent.parseFrom(msg);
          Todo todo = ce.getTodo();
          switch (ce.getChange()) {
            case DELETE:
              this.todos.remove(todo.getId());
              break;
            case INSERT:
            case UPDATE:
              this.todos.put(todo.getId(), todo);
              break;
            default:
              break;
          }
        }
        catch (InvalidProtocolBufferException e) {
          LoggerFactory.getLogger(TodoService.class).error("send update", e);
        }
      });

      // Disconnect Handling
      // https://cettia.io/guides/cettia-tutorial/#disconnection-handling
      socket.oncache((Object[] args) -> cache.add(args));
      socket.onopen(v -> cache.forEach(args -> {
        cache.remove(args);
        socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
      }));
      socket.ondelete(v -> cache.forEach(args -> System.out
          .println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));
    });
  }
}
You can find the complete source code for this example on GitHub:
https://github.com/ralscha/blog2019/tree/master/cettia/two
You can build and run the application from the command line.
Server:
cd server
mvnw spring-boot:run
Client:
cd client
npm install
ionic serve
Open two browser windows and add/update/delete an entry. As soon as you click Save, the change is propagated to all other clients, and their view updates automatically.
This concludes this blog post about Cettia. Cettia is a powerful and easy-to-use framework for real-time messaging between a web framework running on the Java Virtual Machine and a JavaScript web application.
This blog post has not covered every aspect of Cettia. I mainly left out the horizontal scaling support that Cettia provides: https://cettia.io/guides/cettia-tutorial/#scaling-a-cettia-application
I recommend reading the documentation. It's a short read and tells you everything about Cettia you need to know.
- https://cettia.io/
- https://cettia.io/guides/getting-started/
- https://cettia.io/guides/cettia-tutorial/
- https://cettia.io/projects/cettia-javascript-client/1.0.1/reference/
In this GitHub repository, you can find more Cettia and Spring Boot examples that I wrote a couple of months ago:
https://github.com/ralscha/cettia-demo