RSocketServer - rsocket-js
RSocketServer is the high-level abstraction used to create a server that speaks the RSocket protocol.
An RSocketServer can communicate with any RSocket client that implements the same protocol version and the same transport as the server.
To get started creating an RSocket server, first install the rsocket-core NPM package and at least one transport protocol implementation. See the server portion of WebSocket Client Server Example for an example of an implemented getRequestHandler.
Transports
A transport is the abstraction that handles the underlying network communication for the RSocket application protocol.
Available network transports for RSocketServer include:
getRequestHandler
When creating an RSocketServer instance, the constructor options require a function (getRequestHandler) that returns an object matching the Responder interface. This object maps callback/handler functions to the various RSocket message types. Each handler returns an appropriate Single or Flowable from rsocket-flowable that will produce/ingest data for the request.
const { RSocketServer } = require('rsocket-core');
const { Flowable, Single } = require('rsocket-flowable');

// Called once per client connection; returns the Responder for that client.
const getRequestHandler = (requestingRSocket, setupPayload) => {
  function handleFireAndForget() {
    ...
  }

  function handleRequestResponse(payload) {
    return new Single((subscriber) => {
      ...
    });
  }

  function handleRequestStream(payload) {
    return new Flowable((subscriber) => {
      ...
    });
  }

  function handleRequestChannel(payload) {
    return new Flowable((subscriber) => {
      ...
    });
  }

  function handleMetadataPush(payload) {
    return new Single((subscriber) => {
      ...
    });
  }

  // Map each RSocket interaction type to its handler.
  return {
    fireAndForget: handleFireAndForget,
    requestResponse: handleRequestResponse,
    requestStream: handleRequestStream,
    requestChannel: handleRequestChannel,
    metadataPush: handleMetadataPush
  };
};

const rSocketServer = new RSocketServer({
  getRequestHandler,
  ...
});
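As a rough illustration of what the body of a request/response handler might look like, the sketch below builds the function that would be passed to new Single(...). The helper name createEchoSource is hypothetical and not part of rsocket-js. It also assumes that payload.data arrives as a string, which depends on the serializers configured for the server. The subscriber is only expected to expose onSubscribe, onComplete and onError, as in the handlers above.

```javascript
// Hypothetical helper (not part of rsocket-js): returns a function with the
// shape expected by `new Single(...)`, i.e. one that receives a subscriber.
function createEchoSource(payload) {
  return (subscriber) => {
    // Signal the subscription first. No cancel callback is passed because
    // this sketch responds synchronously, so there is nothing to cancel.
    subscriber.onSubscribe();

    let text;
    try {
      // Assumption: `payload.data` is a string (depends on the serializers).
      text = String(payload.data).toUpperCase();
    } catch (e) {
      // Report failures to the client instead of completing.
      subscriber.onError(e);
      return;
    }

    // Complete the Single with the response payload.
    subscriber.onComplete({ data: text, metadata: payload.metadata });
  };
}

// Intended wiring inside getRequestHandler (requires rsocket-flowable):
//   function handleRequestResponse(payload) {
//     return new Single(createEchoSource(payload));
//   }
```

Keeping the source function separate from the Single makes it possible to exercise the handler logic with a plain object standing in for the subscriber, without opening a connection.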
Client Cancellation
An important characteristic of RSocket's protocol is the concept of cancellation.
In the context of an RSocket server, once a client request has begun, the client that initiated it may decide it no longer wishes to continue and signal to the server that it wishes to cancel.
When a client signals to an RSocket server that it wishes to cancel a request, the server should avoid calling the onComplete or onNext callbacks for the request's resulting Single or Flowable instances.
Cancellation Example
const { Single } = require('rsocket-flowable');

const statuses = {
  PENDING: 'pending',
  CANCELLED: 'cancelled',
};

const getRequestHandler = (requestingRSocket, setupPayload) => {
  function handleRequestResponse(payload) {
    const simulatedDelayMilli = 100;
    let status = statuses.PENDING;

    return new Single((subscriber) => {
      /**
       * In the event that the client cancels the request before
       * the server can respond, we will change our status to cancelled
       * and avoid calling `onComplete` on the `subscriber` instance in the
       * `setTimeout` callback.
       */
      function handleCancellation() {
        status = statuses.CANCELLED;
      }

      subscriber.onSubscribe(() => handleCancellation());

      /**
       * Leverage `setTimeout` to simulate a delay
       * in responding to the client, and thus giving it
       * time to decide to "cancel".
       */
      setTimeout(() => {
        /**
         * If the client cancelled the request we can
         * return early and avoid doing any of the work below.
         */
        if (status === statuses.CANCELLED) {
          return;
        }

        const msg = `${new Date()}`;

        try {
          subscriber.onComplete({
            data: msg,
            metadata: null,
          });
        } catch (e) {
          subscriber.onError(e);
        }
      }, simulatedDelayMilli);
    });
  }

  return {
    requestResponse: handleRequestResponse,
  };
};
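The same idea applies to streams, where the server should stop calling onNext once the client cancels. The sketch below is one possible way to do this. It builds the function that would be passed to new Flowable(...), and it assumes the subscriber accepts a subscription object with cancel and request(n) methods through onSubscribe. The helper name createListSource and the in-memory items array are hypothetical and used for illustration only.

```javascript
// Hypothetical helper (not part of rsocket-js): returns a function with the
// shape expected by `new Flowable(...)`, emitting `items` on demand.
function createListSource(items) {
  return (subscriber) => {
    let index = 0;          // position of the next item to emit
    let cancelled = false;  // set once the client cancels
    let done = false;       // set once onComplete has been called

    subscriber.onSubscribe({
      // The client no longer wants data: remember it and emit nothing more.
      cancel: () => {
        cancelled = true;
      },
      // The client asks for up to `n` more items.
      request: (n) => {
        let remaining = n;
        while (!cancelled && !done && remaining > 0 && index < items.length) {
          subscriber.onNext({ data: items[index], metadata: null });
          index += 1;
          remaining -= 1;
        }
        // Complete exactly once, and only if the stream was not cancelled.
        if (!cancelled && !done && index >= items.length) {
          done = true;
          subscriber.onComplete();
        }
      },
    });
  };
}

// Intended wiring inside getRequestHandler (requires rsocket-flowable):
//   function handleRequestStream(payload) {
//     return new Flowable(createListSource(['a', 'b', 'c']));
//   }
```

Checking the cancelled flag inside the emit loop, rather than only once per request call, means a cancellation that arrives while items are being emitted takes effect before the next onNext.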