Home | Send Feedback

Real-time messaging with Cettia and Spring Boot

Published: January 28, 2019  •  java, javascript, spring

Cettia is a web framework, which consists of a JavaScript library that runs in the browser and on Node.js and a Java library for the back end. With Cettia you can build reliable full duplex message channels between a JavaScript client application and a back end running on the Java Virtual Machine. Over this channel your application exchanges messages in real-time.

Under the hood Cettia utilizes WebSocket, EventSource and long polling HTTP connections as transport mechanism. Cettia prefers, by default, WebSocket connections but falls back to other transport protocols if it is not possible to establish a WebSocket connection. You, as the developer, does not have to care about the underlying transport mechanism. Cettia handles this transparently and provides a channel abstraction to your application. Cettia shares similarities with socket.io from the Node.js ecosystem.

Cettia is not only easy to use, it also runs on any web framework on the Java Virtual Machine. In this blog post I focus on Spring Boot, but Cettia also runs on frameworks like Java EE, Vert.x, Grizzly, Netty and Atmosphere.

The source code of Cettia is hosted on Github and released under the Apache-2.0 license:

In the following examples you will notice a package called Asity. Asity is another framework from the same author as Cettia. Asity is an abstraction layer between reusable web handlers and a web framework. This is the glue code that allows Cettia to support so many web frameworks. Asity itself does not depend on Cettia and you can use it for building your own framework. Cettia on the hand is an application of Asity and thus depends on the Asity libraries. Visit the Asity project page to learn more about the project.

Quick start

Client

The easiest way to getting started on the JavaScript client side is by importing the Cettia library from a CDN:

    <script src="https://cdn.jsdelivr.net/npm/cettia-client/cettia-browser.min.js"></script>

index.html

Open a channel to your back end

const socket = cettia.open('https://.../cettia');

Listen on events with on() and send events with send()

    socket.on('open', () => {
       socket.send('ping', 'message from client');
    });
    
    socket.on("pong", data => console.log('message from server: ' + data));

index.html


Server

The set up on the server is a bit more involved. First add the Cettia library and the Asity bridge to your project.

    <dependency>
      <groupId>io.cettia</groupId>
      <artifactId>cettia-server</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>io.cettia.asity</groupId>
      <artifactId>asity-bridge-spring-webflux5</artifactId>
      <version>3.0.0</version>
    </dependency>  

pom.xml

The following projects depend all on Spring Boot with the reactive web module.

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

pom.xml

Next, we need to configure the Asity bridge and Cettia. What this configuration does is setting up a HTTP endpoint listening on the URL /cettia and a WebSocket endpoint listening on the same URL. You can name the path any way you want.

See also the official documentation for more information: https://cettia.io/#spring-webflux-5

@SpringBootApplication
@EnableWebFlux
public class Application {

    @Bean
    public Server defaultServer() {
        return new DefaultServer();
    }

    @Bean
    public RouterFunction<ServerResponse> httpMapping(Server defaultServer) {
        HttpTransportServer httpTransportServer = new HttpTransportServer()
                .ontransport(defaultServer);
        AsityHandlerFunction asityHandlerFunction = new AsityHandlerFunction()
                .onhttp(httpTransportServer);

        RequestPredicate isNotWebSocket = RequestPredicates
                .headers(headers -> !"websocket"
                        .equalsIgnoreCase(headers.asHttpHeaders().getUpgrade()));

        return RouterFunctions
                .route(RequestPredicates.path("/cettia").and(isNotWebSocket),
                        asityHandlerFunction);
    }

    @Bean
    public HandlerMapping wsMapping(Server defaultServer) {
        WebSocketTransportServer wsTransportServer = new WebSocketTransportServer()
                .ontransport(defaultServer);
        AsityWebSocketHandler asityWebSocketHandler = new AsityWebSocketHandler()
                .onwebsocket(wsTransportServer);
        Map<String, WebSocketHandler> map = new LinkedHashMap<>();
        map.put("/cettia", asityWebSocketHandler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);

        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

io.cettia.transport.websocket.WebSocketTransportServer and io.cettia.transport.http.HttpTransportServer are the two Cettia classes that handle the communication between the back end and the JavaScript clients. Instances of these two classes are passed to the Asity bridge that glues Cettia and Spring WebFlux together.

There is no need to configure CORS for the two endpoints, Cettia handles this transparently.

The io.cettia.DefaultServer is responsible for managing the channels, also called sockets. The DefaultServer provides several methods to access these sockets and send messages.

The Java application listens for the ping event, that the JavaScript code above sends. The server.onsocket() handler is called whenever a new socket has been created. Inside this handler we can set up event handlers on the socket with on() and send messages with send().

	@Bean
	public Server defaultServer() {
		Server server = new DefaultServer();

		server.onsocket(socket -> {			
			socket.on("ping", msg -> {
				System.out.println(msg);
				socket.send("pong", "from server");
			});
		});

		return server;
	}

When you start this Spring Boot example and open a browser with http://localhost:8080 you see a console output on the server for the ping message the client has sent and in the browser console you see a pong message from the message the Spring Boot application has sent.

You find the source code for this example on GitHub:
https://github.com/ralscha/blog2019/tree/master/cettia/simple

To start the example from the command line run this command: mvnw spring-boot:run

Sending messages

In the previous quick start section you have seen how to send messages with socket.send() from the client to the back end and vice versa. From the client this is the only way to send messages, because the client is only connected to one back end.

From the server you can also call socket.send() for sending a message to one particular client. But usually you want to send messages to multiple clients. The io.cettia.Server interface and the default implementation io.cettia.DefaultServer manage all connected sockets and provide a find() method to query these sockets. An application can then send messages to the selected clients with one method call.

The find() method expects an implementation of the io.cettia.ServerSocketPredicate interface as argument. The find() method iterates over all sockets and returns a collection with all the sockets where the test() method of the ServerSocketPredicate returns true.

Most of the time you don't have to write your own ServerSocketPredicate implementation and you can reuse one of the predefined predicates from the io.cettia.ServerSocketPredicates class. This class provides several factory methods that return a ServerSocketPredicate implementation.

Sending a message to all registered clients

server.find(ServerSocketPredicates.all()).send("world", "hello");

Send messages to sockets with the tag "room1".

server.find(ServerSocketPredicates.tag("room1")).send("msg", "hi there");

Send messages to sockets that have an attribute "username" and a value "ralph".

server.find(ServerSocketPredicates.attr("username", "ralph")).send("msg", "hi ralph");

Cettia automatically assigns a unique ID to each socket. You can test against this id with id(). This example sends the message to only one client.

String socketId = socket.id();
server.find(ServerSocketPredicates.id(socketId)).send("msg", "a message just for me");

To add and remove tags and attributes the following methods are available in the socket class.

//ATTRIBUTES
//set
socket.set("username", username);

//retrieve
String username = socket.get("username");

//remove
socket.remove("username");

//get all
Map<String,Object> attributes = socket.attributes();


//TAGS
//set
socket.tag("room1", "room2", "room3");

//retrieve
Set<String> tags = socket.tags();

//remove
socket.untag("room1");

Notice that Cettia does not provide a central subscription registry on the server side and the server has no knowledge about the events a client has subscribed to with on(). Therefore, you can't directly broadcast a message from a client to all other connected clients. You always have to install a handler on the back end that relays messages coming from one client to all other clients. Usually when you do that you don't want to send the message back to the originating client. With id() and the negate() method an application can select all sockets except the socket that has sent the message.

server.onsocket(socket -> {
	socket.on("update", msg -> {
		server.find(ServerSocketPredicates.id(socket).negate()).send("update", msg);
	});
});

negate(), or() and and() are methods from the ServerSocketPredicate interface and allows you to combine multiple predicates and negate a certain predicate.

Lifecycle

Cettia sockets follow a well-defined lifecycle and are always in a specific state like open or closed.
https://cettia.io/guides/cettia-tutorial/#socket-lifecycle

To determine the current socket state, call socket.state() in Java and JavaScript. In JavaScript the method returns a string

  if (socket.state() === "closed") {
    //...
  }

and in Java an enum.

  if (socket.state() == State.CLOSED) {
    //...
  }

The Java and JavaScript libraries provide built-in lifecycle events. Whenever a state transition happens the corresponding lifecycle event is emitted and your application may listen to them.

JavaScript

	socket.on('connecting', arg => .....);
	socket.on('open', arg => .....);
	socket.on('close', arg => .....);
	socket.on('waiting', (delay, attempts) => .....);

	socket.on('open', arg => .....);

It is possible to call on() multiple times with the same event name and install multiple handlers for the same event. All these handlers are going to be called whenever the event is emitted.

If your application is no longer interested in listening for a specific event, it can call off() to remove the handler. Make sure that you pass the exact same handler reference from the on() call to off().

const handler = () => ......;
socket.on('open', handler);
//...
socket.off('open', handler);

The library also provides a once() method to install event handlers that are called exactly one time. Cettia automatically removes the handler after it has been called the first time.


Java

In Java, we can use predefined methods like onopen() and onclose() to install a lifecycle event listener. These are just convenience methods, under the hood they call on() with the corresponding event name.

    socket.onopen(v -> {
        System.out.println("open: " + socket.id());
    });
    /* equivalent
    socket.on("open", v -> {
        System.out.println("open: " + socket.id());
    });
    */
    
    socket.onclose(v -> {
        System.out.println("close: " + socket.id());
    });
    
    socket.onerror(t -> {
        System.out.println("error: " + socket.id() + " : " + t.getMessage());
    });
    
    socket.ondelete(v -> {
        System.out.println("delete: " + socket.id());
    });
    
    socket.oncache(obj -> {
        System.out.println("cache: " + socket.id() + " : " + Arrays.toString(obj));
    });

As in JavaScript, an application can install multiple handlers for the same event. Cettia runs all the registered handlers when the socket transitions to this specific state.

    socket.onopen(v -> {
        System.out.println("open: " + socket.id());
    });
    socket.onopen(v -> {
        System.out.println("another open handler: " + socket.id());
    });    

off() removes a handler. Make sure that you pass the same handler reference from the on() call to off(). The library does not provide equivalent offxxxx methods, instead you pass the event name as first argument to the off() call.

  Action<Void> openHandler = v -> {
    System.out.println("open: " + socket.id());
  };
  socket.onopen(openHandler);
  socket.off("open", openHandler);

Disconnect Handling

A connection between client and server can break at any time. When a disconnect occurs, Cettia always tries to reconnect within 60 seconds. But by default, all the messages an application sends during that time are lost. You can solve this by listening to the cache event and temporarely store these events in any kind of storage system and then re-send them the as soon as the connection opens again.

Here the example from the documentation: https://cettia.io/guides/cettia-tutorial/#disconnection-handling

List<Object[]> cache = new CopyOnWriteArrayList<>();
socket.oncache((Object[] args) -> cache.add(args));
socket.onopen(v -> cache.forEach(args -> {
  cache.remove(args);
  socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
}));
socket.ondelete(v -> cache.forEach(args -> System.out.println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));

If there is no active connection when send() is called, the cache event is fired. The cache event receives the same arguments that are passed to the send() call. The handler collects these events in a Java collection. If Cettia is able to restore the connection the open event is emitted and the handler re-sends all stored events and clears the storage. If Cettia is not able to restore the connection within 60 seconds the delete event is fired.


A similar workflow exists for the client side:
https://cettia.io/projects/cettia-javascript-client/1.0.1/reference/#offline-handling

cache is fired whenever send() is called whilst there is no active connection. The cache handler receives the same arguments that are passed to the send() call. There is no delete event emitted in the JavaScript library.

const socket = cettia.open('http://localhost:8080/cettia');
const cache = [];

socket.on('cache', args => cache.push(args));

socket.on('open', () => {
    while (socket.state() === 'opened' && cache.length > 0) {
        const args = cache.shift();
        socket.send(...args);
    }
});

Dashboard Example

The web application in this example displays a couple of EChart charts. The Java back end periodically pushes new data over the Cettia channel to the web application and the charts display the received data.

screenshot dashboard

This example is a npm managed project and utilizes Parcel as build tool. You install the Cettia JavaScript library, like any other npm package, with npm install.

npm install cettia-client

In this application we are going to use four Cettia events: pie, gauge, line, bar. The web application installs a handler for each of these events with socket.on().

    this.eventHandlers = new Map();
    this.eventHandlers.set('pie', this.handlePieResponse.bind(this));
    this.eventHandlers.set('gauge', this.handleGaugeResponse.bind(this));
    this.eventHandlers.set('bar', this.handleBarResponse.bind(this));
    this.eventHandlers.set('line', this.handleLineResponse.bind(this));

app.js

    this.socket = cettia.open('http://127.0.0.1:8080/cettia');

    this.socket.on("open", this.handleChartCheckboxes.bind(this));

    for (const [key, value] of this.eventHandlers.entries()) {
      this.socket.on(key, value);
    }

app.js

The Cettia framework on the server is configured according to the description in the quick start section.

The server periodically sends new data from @Scheduled methods. Here the code that pushes new values for the gauge chart.

  @Scheduled(initialDelay = 1_800, fixedRate = 4_700)
  public void sendGaugeData() {
    this.defaultServer.find(ServerSocketPredicates.tag("gauge")).send("gauge",
        random.nextInt(100));
  }

DataEmitterService.java

The code for the other 3 events looks similar.


The user can customize the view by selecting or deselecting a checkbox and the corresponding chart will be displayed or hidden. That's also the reason why I stored a reference to the event handlers in the Map. If you want to unsubscribe from an event you have to call socket.off() and pass the same handler reference that you passed to on().

Each time the user selects or deselects a checkbox the following method is called. First the method unsubscribes from all events and then subscribes to all events where the corresponding checkbox is selected.

  handleChartCheckboxes() {
    for (const [key, value] of this.eventHandlers.entries()) {
      this.socket.off(key, value);
    }

    const charts = [];
    this.chartCheckboxes.forEach(item => {
      if (item.checked) {
        charts.push(item.value);
        this.socket.on(item.value, this.eventHandlers.get(item.value));
        document.getElementById(item.value).style.display = 'inline-block';
      }
      else {
        document.getElementById(item.value).style.display = 'none';
      }
    });

    this.socket.send('charts', charts);
  }

app.js

Notice that on() and off() are not propagated to the back end. The server has no knowledge about the events the client is subscribed to. In this particular application this would pose a problem when we simply send the messages in the back to all registered clients.

  this.defaultServer.find(ServerSocketPredicates.all()).send("gauge", random.nextInt(100));

The user might disable the gauge chart but the browser would still receive the gauge event. The application ignores the event because it is not subscribed to the event but the data has already been sent over the wire and wasted bandwidth.

To solve this issue the server does not send the data to all clients, instead it only sends the data to clients that have a specific tag assigned to it. For instance the gauge update method sends data only to sockets with the tag "gauge".

this.defaultServer.find(ServerSocketPredicates.tag("gauge"))

The handleChartCheckboxes method above calls this.socket.send('charts', charts); and sends a string array containing all selected chart types to the server each time the user changes one of the checkboxes.

The Java application listen for this event and assigns the tags to the socket.

    this.defaultServer.onsocket(socket -> {
      socket.on("charts", msg -> {
        socket.untag("pie", "gauge", "line", "bar");
        socket.tag(((List<String>) msg).toArray(new String[0]));
      });
    });

DataEmitterService.java

The four data providers select the sockets with the corresponding tag and send the data to them. This way clients just receive the data they are interested in and no bandwidth is wasted.

You find the source code for the complete example on GitHub:
https://github.com/ralscha/blog2019/tree/master/cettia/one

You can start the application from the command line

cd server
mvnw spring-boot:run

// in another terminal
cd client
npm install
npm start

Open a browser with the URL http://localhost:1234

Todo Example

This is an Ionic 4 / Angular application and demonstrates a very trivial Todo application. Each connected user sees the exact same information and whenever a user adds, updates or deletes a todo entry, the change is propagated to all other connected clients.

screenshot todo

In all the examples so far we have only sent text based messages. But Cettia is perfectly capable of handling binary messages too. To demonstrate that, this example exchanges binary Protocol Buffer messages between front and back end. You find the proto definitions here: ChangeEvent.proto


Client

The main set up for Cettia happens in the constructor of the todo service class. The code installs a listener for the initial event. The server sends the stored todo entries with this event to the client as soon as the channel has been established.

Next, the code installs a handler for the update event. This is where the application receives all changes coming from the other clients.

  constructor() {
    this.socket = cettia.open(environment.SERVER_URL);

    this.socket.on('initial', msg => {
      const initialTodos = Todos.decode(msg);
      for (const todo of initialTodos.todos) {
        this.todos.set(todo.id, todo);
      }
      this.todosSubject.next([...this.todos.values()]);
    });

    this.socket.on('update', msg => {
      const changeEvent = ChangeEvent.decode(msg);
      const todo = changeEvent.todo;

      if (changeEvent.change === ChangeEvent.ChangeType.DELETE) {
        this.todos.delete(todo.id);
      } else if (changeEvent.change === ChangeEvent.ChangeType.INSERT) {
        this.todos.set(todo.id, todo);
      } else if (changeEvent.change === ChangeEvent.ChangeType.UPDATE) {
        this.todos.set(todo.id, todo);
      }

      this.todosSubject.next([...this.todos.values()]);
    });

todo.service.ts

Lastly the applications installs a cache and open event handler to cover the disconnect case when our application sends messages to the back end whilst there is no active connection (see the section above about disconnect handling).

    this.socket.on('cache', args => this.cache.push(args));

    this.socket.on('open', () => {
      while (this.socket.state() === 'opened' && this.cache.length > 0) {
        const args = this.cache.shift();
        this.socket.send(...args);
      }
    });

todo.service.ts

Each time the user inserts, updates or deletes an entry, the application creates a ChangeEvent object and sends it as update event to our back end with socket.send().

  deleteTodo(todo: ITodo) {
    const deleted = this.todos.delete(todo.id);
    if (deleted) {
      this.todosSubject.next([...this.todos.values()]);

      const changeEvent = ChangeEvent.create({change: ChangeEvent.ChangeType.DELETE, todo: todo});
      const buffer = ChangeEvent.encode(changeEvent).finish();
      this.socket.send('update', buffer);
    }
  }

  save(todo: ITodo) {
    let changeType;
    if (this.todos.has(todo.id)) {
      changeType = ChangeEvent.ChangeType.UPDATE;
    } else {
      changeType = ChangeEvent.ChangeType.INSERT;
    }

    const changeEvent = ChangeEvent.create({change: changeType, todo: todo});
    const buffer = ChangeEvent.encode(changeEvent).finish();
    this.socket.send('update', buffer);

    this.todos.set(todo.id, todo);
    this.todosSubject.next([...this.todos.values()]);
  }

todo.service.ts


Server

The server is configured according to the description in the quick start section. When the application instantiates DefaultServer it also installs a onsocket handler that is called each time a new channel is created.

Inside this handler an update event handler is installed on the socket, that relays incoming messages from the web application to all the other clients. The client that has sent the message is excluded because he has already applied the change to his local copy of todo entries.

  @Bean
  public Server defaultServer() {
    Server server = new DefaultServer();

    server.onsocket(socket -> {
      socket.on("update", msg -> {
        server.find(ServerSocketPredicates.id(socket).negate()).send("update", msg);
      });
    });

    return server;
  }

Application.java

The last piece of the server consists of the TodoService. In this Spring managed bean the application stores the todo entries and sends them with the "initial" event to newly connected clients.

To maintain the list it also has to subscribe to the "update" event and has to apply the changes from the clients to the internal todos storage.

Lastly it installs handlers for oncache, onopen and ondelete to handle the disconnect case. For this example we need to make sure that all the changes are delivered to all clients. If not, the todo entries on the clients would diverge.

package ch.rasc.cettiatwo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.google.protobuf.InvalidProtocolBufferException;

import ch.rasc.cettiatwo.ChangeEventOuterClass.ChangeEvent;
import ch.rasc.cettiatwo.ChangeEventOuterClass.Todo;
import ch.rasc.cettiatwo.ChangeEventOuterClass.Todos;
import io.cettia.Server;
import io.cettia.asity.action.Action;

@Service
public class TodoService {

  private final Server defaultServer;

  private final Map<String, Todo> todos = new HashMap<>();

  public TodoService(Server defaultServer) {
    this.defaultServer = defaultServer;

    this.defaultServer.onsocket(socket -> {
      final List<Object[]> cache = new CopyOnWriteArrayList<>();
      socket.onopen(v -> {
        socket.send("initial",
            Todos.newBuilder().addAllTodos(this.todos.values()).build().toByteArray());
      });

      socket.<byte[]>on("update", msg -> {
        try {
          ChangeEvent ce = ChangeEvent.parseFrom(msg);
          Todo todo = ce.getTodo();

          switch (ce.getChange()) {
          case DELETE:
            this.todos.remove(todo.getId());
            break;
          case INSERT:
          case UPDATE:
            this.todos.put(todo.getId(), todo);
            break;
          default:
            break;
          }
        }
        catch (InvalidProtocolBufferException e) {
          LoggerFactory.getLogger(TodoService.class).error("send update", e);
        }
      });

      // Disconnect Handling
      // https://cettia.io/guides/cettia-tutorial/#disconnection-handling

      socket.oncache((Object[] args) -> cache.add(args));

      socket.onopen(v -> cache.forEach(args -> {
        cache.remove(args);
        socket.send((String) args[0], args[1], (Action<?>) args[2], (Action<?>) args[3]);
      }));

      socket.ondelete(v -> cache.forEach(args -> System.out
          .println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));

    });
  }

}

TodoService.java


You find the complete source code for this example on GitHub:
https://github.com/ralscha/blog2019/tree/master/cettia/two

You can build and run the application from the command line

Server:

cd server
mvnw spring-boot:run

Client:

cd client
npm install
ionic serve

Open two browser windows and add/update/delete an entry. As soon as you click Save, the change is propagated to all other clients and their view updates automatically.


This concludes this blog post about Cettia. Cettia is a powerful and easy to use framework for real time messaging between a web framework running on the Java Virtual Machine and a JavaScript web application.

This blog post has not covered every aspect of Cettia. I especially left out the horizontal scaling support that Cettia provides: https://cettia.io/guides/cettia-tutorial/#scaling-a-cettia-application

I recommend reading the documentation. It's a short read and tells you everything about Cettia you need to know.


In this GitHub repository you find more Cettia and Spring Boot examples that I wrote a couple months ago:
https://github.com/ralscha/cettia-demo