Home | Send Feedback

RxJS webSocket, w3c-compatible WebSocket wrapper

Published: 10. April 2020  •  Updated: 20. May 2021  •  javascript

In this blog post, we are going to take a closer look at the webSocket function from the RxJS library. webSocket is a wrapper around the native WebSocket object and returns a WebSocketSubject. The subject can be used to receive and send messages over WebSocket.

WebSocketSubjectConfig

The webSocket function accepts as an argument either a string with the URL of a WebSocket endpoint or a WebSocketSubjectConfig object.

import {webSocket, WebSocketSubject} from 'rxjs/webSocket';

const webSocketSubject = webSocket('ws://localhost:8080/sensor');
// OR
const webSocketSubject = webSocket({ url: 'ws://localhost:8080/sensor' });

With the config object, you can provide additional configurations, like serializer and deserializer. Your application can also install lifecycle hooks that are called when the WebSocket connection opens or closes.

Here's an example that listens for the open and close events. The lifecycle hooks expect a NextObserver implementation. This interface is similar to an Observer, but only the next() method has to be implemented.

const closeSubject = new Subject<CloseEvent>();
closeSubject.subscribe(_ => console.log('Underlying WebSocket connection closed'));

const webSocketSubject = webSocket({
        url: 'ws://localhost:8080/sensor',
        closeObserver: closeSubject,
        openObserver: {
          next: () => console.log('Underlying WebSocket connection open')
        }
      });

Subject implements the Observer interface. Alternatively, you can assign any object with a next() method. This satisfies the NextObserver interface.

Other properties of the WebSocketSubjectConfig object I'm going to discuss in this blog post are serializer, deserializer, and binaryType. You'll see them in action in the last section of this blog post, where I demonstrate a program that exchanges binary messages.

Check out the documentation about WebSocketSubjectConfig to learn about all the provided configuration options.

WebSocketSubject

The object that webSocket returns is of type WebSocketSubject. It is like any Subject, an Observer and an Observable. Subject provides a subscribe() method where one or multiple observers can subscribe, and the subject provides next(), error() and complete() methods to send values.

When somebody calls subscribe(), it attempts to open the WebSocket connection, unless the connection is already open. Even when you call subscribe() multiple times on the same WebSocketSubject instance, the subject only opens one native WebSocket connection. The subject closes the underlying connection if the last subscriber unsubscribes, and it re-establishes the connection if after some time somebody calls subscribe() again.

const webSocketSubject = webSocket('ws://localhost:8080/sensor');

// first subscriber, open WebSocket connection
const subscription1 = webSocketSubject.subscribe(data => {

});

// second subscriber, re-use existing WebSocket connection
const subscription2 = webSocketSubject.subscribe(data => {

});

// unsubscribe subscriber, underlying connection stays open
subscription2.unsubscribe();

// unsubscribe last subscriber, close underlying connection
subscription1.unsubscribe();

When messages from the server arrive, the WebSocketSubject will deserialize them by default with JSON.parse() and emit them into the stream, and all subscribers will receive the message. When the underlying connection closes, the stream completes, and when at some point an error occurs, the stream errors out.

A WebSocket is a bidirectional message channel, so you can not only receive messages. You can also send messages. The WebSocketSubject uses the Observer methods for this purpose. next() sends a value to the server. Note that the WebSocketSubject behaves a bit differently than a regular Subject implementation. Calling next() usually multicasts the value to all subscribers, but not in this case. The value will only be sent to the server, and the subscribers don't get notified unless the server sends something back.

const webSocketSubject = webSocket('ws://localhost:8080/sensor');
const subscription = webSocketSubject.subscribe(data => {
      // subscriber does not receive message 'ping'
});
// sending 'ping' to the server
webSocketSubject.next('ping');

If at the time of calling next() there is no connection, WebSocketSubject internally buffers the messages and sends them as soon a connection is established.

complete() closes the underlying connection. error() sends an error message to the server and also closes the connection. You can't pass arbitrary objects to error(), the object has to follow the WebSocket error format, and it expects an object with a code and an optional reason property.

const webSocketSubject = webSocket('ws://localhost:8080/sensor');
webSocketSubject.subscribe(
   v => console.log('got value ' + v),
    error => console.error('something wrong occurred: ' + error),
    () => console.log('complete')
);

webSocketSubject.error({code: 3001, reason: 'App error'});

Because complete() and error() close the underlying connection, they also close the stream, and all subscribers get notified. Note that calling error() does not error the stream if the native WebSocket connection closes without an error. But when the process of connection closing throws an error, the stream errors, it does not matter if the closing was initiated by complete() or error().

The example above does not call the error callback. It just calls the complete callback and prints complete.

Heartbeat

The WebSocket API in the browser does not have a built-in reconnection manager, unlike EventSource (server-sent events). That means if you need a permanent connection between your client and server, you have to write code that handles connection breaks and implement a reconnection strategy. There are different ways to implement that. You could, for example, install a closeObserver that establishes a new connection after the old connection has been closed. Another option is to add retryWhen into your stream.

const webSocketSubject = webSocket('ws://localhost:8080/sensor');
webSocketSubject.pipe(retryWhen((errors) => errors.pipe(delay(10_000))))
   .subscribe(value => console.log(value));

This example resubscribes to the stream 10 seconds after an error occurred.

In the demo program I wrote for this blog post, I went another route and implemented a heartbeat. The client periodically sends the string "ping" and the server answers with "pong". The following example sends a ping message every 30 seconds. Then it leverages race() to check for timeouts. race() returns the first Observable to emit an item. This is either the Observable with the response from the server or the 3-second delay Observable.


  startHeartbeat(): void {
    this.stopHeartbeat();
    this.networkError = false;

    const heartbeat$ = timer(1_000, 30_000)
      .pipe(
        tap(() => this.connect().next('ping')),
        concatMap(() => {
          return race(
            of('timeout').pipe(delay(3_000)),
            this.connect().pipe(filter(m => m === 'pong'), catchError(() => of('error')))
          );
        })
      );

    this.heartbeatSubscription = heartbeat$.subscribe(msg => {
      if (msg === 'pong') {
        this.networkError = false;
      } else {
        this.networkError = true;
        this.webSocketSubject?.complete();
        this.webSocketSubject = null;
      }
    });

app.component.ts

The heartbeat subscriber checks if the response was either "pong", then everything is okay and no further action is required. In case of an error or timeout, the heartbeat service closes the WebSocket stream.

Multiplexing

The demo application I wrote for this blog post displays random values in a gauge. The user can switch the display between temperature and humidity data. The server periodically sends random messages as JSON to the connected clients.

demo

When the user selects the temperature display, the server sends temperature JSON, and when humidity is selected, the server sends humidity data.

// temperature data object
{"ts":1586530959,"name":"sensor1","temperature":31.62}

// humidity data object
{"ts":1586530978,"name":"sensor2","humidity":70}

To control what object the server has to send, the client sends the string message "subscribe-temp" or "subscribe-hum" to the server. The client can unsubscribe with the string message "unsubscribe-temp" or "unsubscribe-hum", and the server stops sending messages of this type to the client.

We can use the next() method to send these command messages, but the WebSocketSubject has an additional method, not found in other Subject implementations, that makes this process more convenient: multiplex()

This method accepts three parameters and returns an Observable. The first argument is a function returning the subscription message. That means this is the message that the subject sends to the server when somebody subscribes to this observable. The second argument is a function returning the unsubscription message, and the subject sends that message to the server when somebody unsubscribes from the stream.

The last argument is a function that returns a boolean. The purpose of this function is to filter out messages sent by the server. In the following example, we check the presence of the temperature or humidity property. Messages can only pass when this function returns true. This is the same as when you would add the filter() operator to the stream. There is not really a purpose for a filter function in this specific example because the server either sends temperature or humidity messages, never both types at the same time. But we can use it here for filtering out the "pong" messages we get from the heartbeat, which are sent over the same channel.

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private getDataObservable(): Observable<any> {
    if (!this.webSocketSubject) {
      return throwError(() => new Error('websocket subject not set'));
    }
    if (this.type === 'temp') {
      return this.webSocketSubject.multiplex(() => 'subscribe-temp', () => 'unsubscribe-temp', message => message.temperature);
    } else {
      return this.webSocketSubject.multiplex(() => 'subscribe-hum', () => 'unsubscribe-hum', message => message.humidity);
    }

app.component.ts

Each time the user changes the display type, the application unsubscribes from one stream and subscribes to the other. multiplex() automatically makes sure that the correct unsubscribe and subscribe message is sent to the server.

In the browser developer console, we can observe the flow of messages.

websocket log

Binary messages

So far, we have only exchanged JSON messages. Because this is a common use case, WebSocketSubject uses by default JSON.parse() and JSON.stringify() as deserializer and serializer, respectively. But WebSocket is also capable of sending and receiving binary messages. In the following example, we are going to send and receive binary Protocol Buffers messages.

Because the default is JSON, we have to reconfigure a few things to be able to send and receive binary messages. With the help of WebSocketSubjectConfig we can pass these options to the webSocket() function and it takes care of configuring the underlying WebSocket connection.

First, we change the type of messages coming from the server with the binaryType option. The default is blob, but this demo application requires ArrayBuffer objects. Next, we have to override the default serializer and deserializer. The serializer is called when we send a value from the client to the server. Because the demo program passes ArrayBuffer objects to the next() method, we can return the input without any change. The underlying WebSocket API can handle ArrayBuffer objects natively. When a message arrives from the server, the deserializer function is called. The response is wrapped in a MessageEvent instance, and the data property contains the data, in this case, an ArrayBuffer object. Like in the serializer, we can return it unchanged.

    const ws = webSocket({
      binaryType: 'arraybuffer',
      url: 'ws://localhost:8080/calculator',
      serializer: v => v as ArrayBuffer,
      deserializer: v => v.data,

app.component.ts

The demo application uses the protobufjs library for handling Protocol Buffers messages. As with JSON messages, next() is used for sending values to the server. encode().finish() returns a Uint8Array object, but the WebSocket API can't handle this type, so the application has to extract the underlying ArrayBuffer with buffer and pass that to next().

        next: () => {
          const calculaton = Calculation.encode({
            operation,
            value1: parseFloat(value1Element.value),
            value2: parseFloat(value2Element.value)
          }).finish();
          const offset = calculaton.byteOffset;
          const length = calculaton.byteLength;
          ws.next(calculaton.buffer.slice(offset, offset + length));

app.component.ts

The subscription handler receives the response as an ArrayBuffer object. The application converts the response because the decode method of the protobufjs library expects a Uint8Array object.

    const sub = ws.subscribe(response => {
      const result = Result.decode(new Uint8Array(response as ArrayBuffer));
      resultElement.innerText = String(result.result);
      sub.unsubscribe();
    });

app.component.ts


We can simplify serializing and deserializing a bit by moving the Uint8Array<->ArrayBuffer conversion into the webSocket serializer and deserializer.

    const ws = webSocket({
      binaryType: 'arraybuffer',
      url: 'ws://localhost:8080/calculator',
      serializer: (msg: Uint8Array) => {
        const offset = msg.byteOffset;
        const length = msg.byteLength;
        return msg.buffer.slice(offset, offset + length);
      },
      deserializer: msg => new Uint8Array(msg.data as ArrayBuffer),

app.component.ts

The application can then pass the Uint8Array object it gets from finish() directly to the next() method.

        next: () => {
          const calculaton = Calculation.encode({
            operation,
            value1: parseFloat(value1Element.value),
            value2: parseFloat(value2Element.value)
          }).finish();
          ws.next(calculaton);

app.component.ts

And when it receives the response, which is now a Uint8Array object, it can pass it directly to the decode() method.

    const sub = ws.subscribe(response => {
      const result = Result.decode(response);
      resultElement.innerText = String(result.result);
      sub.unsubscribe();
    });

app.component.ts

You could go even a step further when your application always sends and receives the same Protocol Buffers object. In this case, you could move the Uint8Array<->object conversion into the serializer and deserializer, respectively.


You've reached the end of this blog post about the webSocket() function from the RxJS library. You can find the source code I demonstrated in this blog post for the client and server application on GitHub: https://github.com/ralscha/blog2020/tree/master/rxjs-websocket