In this blog post, we are going to take a closer look at the webSocket
function from the RxJS library.
webSocket
is a wrapper around the native WebSocket
object and returns a WebSocketSubject
.
The subject can be used to receive and send messages over WebSocket.
WebSocketSubjectConfig ¶
The webSocket
function accepts as an argument either a string with the URL of a WebSocket endpoint or a WebSocketSubjectConfig
object.
import {webSocket, WebSocketSubject} from 'rxjs/webSocket';
const webSocketSubject = webSocket('ws://localhost:8080/sensor');
// OR
const webSocketSubject = webSocket({ url: 'ws://localhost:8080/sensor' });
With the config object, you can provide additional configurations, like serializer and deserializer. Your application can also install lifecycle hooks that are called when the WebSocket connection opens or closes.
Here's an example that listens for the open and close events. The lifecycle hooks expect a NextObserver
implementation.
This interface is similar to an Observer
, but only the next()
method has to be implemented.
const closeSubject = new Subject<CloseEvent>();
closeSubject.subscribe(_ => console.log('Underlying WebSocket connection closed'));
const webSocketSubject = webSocket({
url: 'ws://localhost:8080/sensor',
closeObserver: closeSubject,
openObserver: {
next: () => console.log('Underlying WebSocket connection open')
}
});
Subject
implements the Observer
interface.
Alternatively, you can assign any object with a next()
method. This satisfies the NextObserver
interface.
Other properties of the WebSocketSubjectConfig
object I'm going to discuss in this blog post are serializer
, deserializer
, and binaryType
.
You'll see them in action in the last section of this blog post, where I demonstrate a program that exchanges binary messages.
Check out the documentation about WebSocketSubjectConfig
to learn about all the provided
configuration options.
WebSocketSubject ¶
The object that webSocket
returns is of type WebSocketSubject
. It is like any Subject
,
an Observer
and an Observable
. Subject
provides a subscribe()
method where
one or multiple observers can subscribe, and the subject provides next()
, error()
and complete()
methods to send values.
When somebody calls subscribe()
, it attempts to open the WebSocket connection, unless the connection is already open. Even when you call subscribe()
multiple times on the same WebSocketSubject
instance, the subject only opens one native WebSocket connection. The subject closes the underlying connection if
the last subscriber unsubscribes, and it re-establishes the connection if after some time somebody calls subscribe()
again.
const webSocketSubject = webSocket('ws://localhost:8080/sensor');
// first subscriber, open WebSocket connection
const subscription1 = webSocketSubject.subscribe(data => {
});
// second subscriber, re-use existing WebSocket connection
const subscription2 = webSocketSubject.subscribe(data => {
});
// unsubscribe subscriber, underlying connection stays open
subscription2.unsubscribe();
// unsubscribe last subscriber, close underlying connection
subscription1.unsubscribe();
When messages from the server arrive, the WebSocketSubject
will deserialize them by default with JSON.parse()
and emit them into the stream, and all subscribers will receive the message.
When the underlying connection closes, the stream completes, and when at some point an error occurs, the stream errors out.
A WebSocket is a bidirectional message channel, so you can not only receive messages. You can also send messages. The WebSocketSubject
uses the Observer
methods for this purpose. next()
sends a value to the server. Note that the WebSocketSubject
behaves a bit differently than a
regular Subject
implementation. Calling next()
usually multicasts the value to all subscribers, but not in this case. The value will only be sent to
the server, and the subscribers don't get notified unless the server sends something back.
const webSocketSubject = webSocket('ws://localhost:8080/sensor');
const subscription = webSocketSubject.subscribe(data => {
// subscriber does not receive message 'ping'
});
// sending 'ping' to the server
webSocketSubject.next('ping');
If at the time of calling next()
there is no connection, WebSocketSubject
internally buffers the messages and sends them as soon a connection is established.
complete()
closes the underlying connection. error()
sends an error message to the server and also closes the connection.
You can't pass arbitrary objects to error()
, the object has to follow the WebSocket error format, and it expects an object with a code
and an optional reason
property.
const webSocketSubject = webSocket('ws://localhost:8080/sensor');
webSocketSubject.subscribe(
v => console.log('got value ' + v),
error => console.error('something wrong occurred: ' + error),
() => console.log('complete')
);
webSocketSubject.error({code: 3001, reason: 'App error'});
Because complete()
and error()
close the underlying connection, they also close the stream, and all subscribers get notified. Note that calling error()
does not error the stream if the native WebSocket connection closes without an error. But when the process of connection closing
throws an error, the stream errors, it does not matter if the closing was initiated by complete()
or error()
.
The example above does not call the error callback. It just calls the complete callback and prints complete
.
Heartbeat ¶
The WebSocket API in the browser does not have a built-in reconnection manager, unlike EventSource
(server-sent events).
That means if you need a permanent connection between your client and server, you have to write code that handles connection breaks and implement a reconnection strategy. There are different
ways to implement that. You could, for example, install a closeObserver
that establishes a new connection after the old connection has been closed. Another option is to add
retryWhen
into your stream.
const webSocketSubject = webSocket('ws://localhost:8080/sensor');
webSocketSubject.pipe(retryWhen((errors) => errors.pipe(delay(10_000))))
.subscribe(value => console.log(value));
This example resubscribes to the stream 10 seconds after an error occurred.
In the demo program I wrote for this blog post, I went another route and implemented a heartbeat. The client periodically sends the string "ping"
and the server answers with "pong"
.
The following example sends a ping message every 30 seconds. Then it leverages race()
to check for timeouts.
race()
returns the first Observable to emit an item. This is either the Observable with the response from the server or the 3-second delay Observable.
startHeartbeat(): void {
this.stopHeartbeat();
this.networkError = false;
const heartbeat$ = timer(1_000, 30_000)
.pipe(
tap(() => this.connect().next('ping')),
concatMap(() => {
return race(
of('timeout').pipe(delay(3_000)),
this.connect().pipe(filter(m => m === 'pong'), catchError(() => of('error')))
);
})
);
this.heartbeatSubscription = heartbeat$.subscribe(msg => {
if (msg === 'pong') {
this.networkError = false;
} else {
this.networkError = true;
this.webSocketSubject?.complete();
this.webSocketSubject = null;
}
});
The heartbeat subscriber checks if the response was either "pong"
, then everything is okay and no further action is required.
In case of an error or timeout, the heartbeat service closes the WebSocket stream.
Multiplexing ¶
The demo application I wrote for this blog post displays random values in a gauge. The user can switch the display between temperature and humidity data. The server periodically sends random messages as JSON to the connected clients.
When the user selects the temperature display, the server sends temperature JSON, and when humidity is selected, the server sends humidity data.
// temperature data object
{"ts":1586530959,"name":"sensor1","temperature":31.62}
// humidity data object
{"ts":1586530978,"name":"sensor2","humidity":70}
To control what object the server has to send, the client sends the string message "subscribe-temp"
or "subscribe-hum"
to the server.
The client can unsubscribe with the string message "unsubscribe-temp"
or "unsubscribe-hum"
, and the server stops sending messages of this type to the client.
We can use the next()
method to send these command messages, but the WebSocketSubject
has an additional method, not found in other Subject implementations,
that makes this process more convenient: multiplex()
This method accepts three parameters and returns an Observable
. The first argument is a function returning the subscription message. That means this is the message that the subject sends to the server when somebody subscribes to this observable. The second argument is a function returning the unsubscription message,
and the subject sends that message to the server when somebody unsubscribes from the stream.
The last argument is a function that returns a boolean. The purpose of this function is to filter out messages sent by the server. In the following example,
we check the presence of the temperature
or humidity
property. Messages can only pass when this function returns true. This is the same as when you would add the filter()
operator
to the stream. There is not really a purpose for a filter function in this specific example because the server either sends temperature or humidity messages, never both types at the same time. But
we can use it here for filtering out the "pong"
messages we get from the heartbeat, which are sent over the same channel.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private getDataObservable(): Observable<any> {
if (!this.webSocketSubject) {
return throwError(() => new Error('websocket subject not set'));
}
if (this.type === 'temp') {
return this.webSocketSubject.multiplex(() => 'subscribe-temp', () => 'unsubscribe-temp', message => message.temperature);
} else {
return this.webSocketSubject.multiplex(() => 'subscribe-hum', () => 'unsubscribe-hum', message => message.humidity);
}
Each time the user changes the display type, the application unsubscribes from one stream and subscribes to the other. multiplex()
automatically makes
sure that the correct unsubscribe and subscribe message is sent to the server.
In the browser developer console, we can observe the flow of messages.
Binary messages ¶
So far, we have only exchanged JSON messages. Because this is a common use case, WebSocketSubject
uses by default JSON.parse()
and JSON.stringify()
as deserializer and serializer, respectively.
But WebSocket is also capable of sending and receiving binary messages. In the following example, we are going to send and receive binary Protocol Buffers messages.
Because the default is JSON, we have to reconfigure a few things to be able to send and receive binary messages.
With the help of WebSocketSubjectConfig
we can pass these options to the webSocket()
function
and it takes care of configuring the underlying WebSocket connection.
First, we change the type of messages coming from the server with the binaryType
option. The default is blob
, but this demo application requires ArrayBuffer
objects.
Next, we have to override the default serializer and deserializer. The serializer is called when we send a value from the client to the server.
Because the demo program passes ArrayBuffer
objects to the next()
method, we can return the input without any change. The underlying WebSocket API can handle ArrayBuffer
objects natively.
When a message arrives from the server, the deserializer function is called. The response is wrapped in a MessageEvent
instance, and the data
property contains the data, in this case, an ArrayBuffer
object. Like in the serializer, we can return it unchanged.
const ws = webSocket({
binaryType: 'arraybuffer',
url: 'ws://localhost:8080/calculator',
serializer: v => v as ArrayBuffer,
deserializer: v => v.data,
The demo application uses the protobufjs library for handling Protocol Buffers messages.
As with JSON messages, next()
is used for sending values to the server.
encode().finish()
returns a Uint8Array
object, but the WebSocket API can't handle this type, so the
application has to extract the underlying ArrayBuffer
with buffer
and pass that to next()
.
next: () => {
const calculaton = Calculation.encode({
operation,
value1: parseFloat(value1Element.value),
value2: parseFloat(value2Element.value)
}).finish();
const offset = calculaton.byteOffset;
const length = calculaton.byteLength;
ws.next(calculaton.buffer.slice(offset, offset + length));
The subscription handler receives the response as an ArrayBuffer
object.
The application converts the response because the decode
method of the protobufjs library expects a Uint8Array
object.
const sub = ws.subscribe(response => {
const result = Result.decode(new Uint8Array(response as ArrayBuffer));
resultElement.innerText = String(result.result);
sub.unsubscribe();
});
We can simplify serializing and deserializing a bit by moving the Uint8Array<->ArrayBuffer
conversion into the webSocket
serializer and deserializer.
const ws = webSocket({
binaryType: 'arraybuffer',
url: 'ws://localhost:8080/calculator',
serializer: (msg: Uint8Array) => {
const offset = msg.byteOffset;
const length = msg.byteLength;
return msg.buffer.slice(offset, offset + length);
},
deserializer: msg => new Uint8Array(msg.data as ArrayBuffer),
The application can then pass the Uint8Array
object it gets from finish()
directly to the next()
method.
next: () => {
const calculaton = Calculation.encode({
operation,
value1: parseFloat(value1Element.value),
value2: parseFloat(value2Element.value)
}).finish();
ws.next(calculaton);
And when it receives the response, which is now a Uint8Array
object, it can pass it directly to the decode()
method.
const sub = ws.subscribe(response => {
const result = Result.decode(response);
resultElement.innerText = String(result.result);
sub.unsubscribe();
});
You could go even a step further when your application always sends and receives the same Protocol Buffers object.
In this case, you could move the Uint8Array<->object
conversion into the serializer and deserializer, respectively.
You've reached the end of this blog post about the webSocket()
function from the RxJS library. You can find the source code I demonstrated
in this blog post for the client and server application on GitHub: https://github.com/ralscha/blog2020/tree/master/rxjs-websocket