rsocket-js
`rsocket-js` implements the 1.0 version of the RSocket protocol and is designed for use in Node.js and browsers.
Packages
The following packages are published to npm:
- rsocket-core
- rsocket-flowable
- rsocket-tcp-client
- rsocket-tcp-server
- rsocket-websocket-client
- rsocket-websocket-server
Status
The following are currently implemented:
- RSocketClient / RSocketServer
- Node.js TCP/WebSocket server/client transport
- Browser WebSocket client (binary)
- TCK client for spec compliance testing
- UTF-8 and Binary encoding at the transport layer
- Optional JSON (de)serialization at the rsocket layer (send and receive objects instead of strings)
- ReactiveStream data types
Reactive Streams
rsocket-js includes an implementation of the Reactive Streams API in JavaScript. Note that unlike standard Rx Observables, Reactive Streams are lazy, pull-based, and support back-pressure. Two types are implemented:
- `Flowable`: An implementation of the Reactive Streams `Publisher` type, providing a demand-driven stream of values over time.
- `Single`: Like `Flowable`, but resolves to a single value.
rsocket-js public API methods typically return values of these types.
WebSocket Client & Server example
Client Example
The client sends a `request/response` message to the server on an interval, and exits after a certain amount of time has elapsed.
// rsocket-client.js
const { RSocketClient } = require('rsocket-core');
const RSocketWebsocketClient = require('rsocket-websocket-client').default;
const WebSocket = require('ws');
/**
 * Returns the current wall-clock time.
 *
 * @returns {number} Milliseconds elapsed since the Unix epoch.
 */
function now() {
  return Date.now();
}
/**
 * Opens an RSocket connection to the example WebSocket server.
 *
 * @param {object} [options] - Currently unused; kept so existing callers
 *   that pass an argument continue to work.
 * @returns {Promise<*>} Settles with the result of `client.connect()`,
 *   i.e. the connected socket the caller issues requests on.
 */
async function connect(options) {
  const transportOptions = {
    url: 'ws://127.0.0.1:9898',
    // Supply the WebSocket implementation explicitly instead of relying on
    // a global one being available.
    wsCreator: (url) => new WebSocket(url),
  };
  const setup = {
    // Interval (ms) between KEEPALIVE frames. It must be shorter than
    // `lifetime`, otherwise the peer is considered dead before the next
    // keep-alive has even been sent.
    keepAlive: 60000,
    // Maximum time (ms) without a keep-alive response before the
    // connection is treated as dead.
    lifetime: 180000,
    dataMimeType: 'text/plain',
    metadataMimeType: 'text/plain',
  };
  const transport = new RSocketWebsocketClient(transportOptions);
  const client = new RSocketClient({ setup, transport });
  // Returning the thenable from an async function already waits for it;
  // an extra `await` here would be redundant.
  return client.connect();
}
/**
 * Connects to the server, sends a request/response message every 750 ms,
 * and finishes once at least 5 seconds have elapsed since connecting.
 *
 * @returns {Promise<void>} Resolves when the time budget is used up.
 *   Rejects if the connection cannot be established or if issuing a
 *   request throws synchronously.
 */
async function run() {
  // Await the connection outside of any Promise executor: an `async`
  // executor would swallow a failed connect() and leave the returned
  // promise pending forever.
  const rsocket = await connect();
  const start = now();

  return new Promise((resolve, reject) => {
    const interval = setInterval(() => {
      try {
        rsocket.requestResponse({ data: 'What is the current time?' }).subscribe({
          onComplete: (response) => {
            console.log(response);
          },
          onError: (error) => {
            console.error(error);
          },
          onSubscribe: (cancel) => {
            /* call cancel() to stop onComplete/onError */
          },
        });
      } catch (error) {
        // A synchronous failure must stop the timer and surface to the
        // caller instead of becoming an uncaught exception.
        clearInterval(interval);
        reject(error);
        return;
      }

      if (now() - start >= 5000) {
        clearInterval(interval);
        resolve();
      }
    }, 750);
  });
}
// Run the example and turn its outcome into a process exit code:
// 0 when run() resolves, 1 (after printing the stack) when it rejects.
const exitWithSuccess = () => process.exit(0);
const exitWithFailure = (error) => {
  console.error(error.stack);
  process.exit(1);
};

run().then(exitWithSuccess, exitWithFailure);
Server Example
The server responds to `request/response` messages with the current time.
// rsocket-server.js
const { RSocketServer } = require('rsocket-core');
const RSocketWebSocketServer = require('rsocket-websocket-server');
const { Single } = require('rsocket-flowable');
const WebSocketTransport = RSocketWebSocketServer.default;
// Address the example server listens on.
const host = '127.0.0.1';
const port = 9898;

// Options handed to the WebSocket server transport.
const transportOpts = { host, port };
const transport = new WebSocketTransport(transportOpts);

// States a request/response exchange can be in while the reply is pending.
const statuses = { PENDING: 'pending', CANCELLED: 'cancelled' };
/**
 * Builds the responder object the server uses for a connection.
 *
 * @param {*} requestingRSocket - Passed in by the server; unused here.
 * @param {*} setupPayload - Passed in by the server; unused here.
 * @returns {{requestResponse: function(*): Single}} Handlers keyed by
 *   interaction type; only request/response is implemented.
 */
const getRequestHandler = (requestingRSocket, setupPayload) => {
  /**
   * Replies to a request/response message with the current date string
   * after a short artificial delay.
   *
   * @param {*} payload - The incoming request payload (only logged).
   * @returns {Single} Completes with `{ data, metadata }` unless cancelled.
   */
  function handleRequestResponse(payload) {
    console.log(`requestResponse request`, payload);

    return new Single((subscriber) => {
      // Cancellation state is tracked per subscription so that cancelling
      // one subscriber never suppresses the reply to another.
      let status = statuses.PENDING;
      let timer = null;

      subscriber.onSubscribe(() => {
        status = statuses.CANCELLED;
        // Drop the pending reply instead of letting the timer fire for
        // nothing.
        if (timer !== null) {
          clearTimeout(timer);
        }
      });

      // The subscriber may cancel synchronously from within onSubscribe;
      // in that case there is nothing to schedule.
      if (status === statuses.CANCELLED) {
        return;
      }

      /**
       * Leverage `setTimeout` to simulate a delay
       * in responding to the client.
       */
      timer = setTimeout(() => {
        const msg = `${new Date()}`;
        console.log(`requestResponse response`, msg);
        try {
          subscriber.onComplete({
            data: msg,
            metadata: null, // or Buffer.from(...)
          });
        } catch (e) {
          subscriber.onError(e);
        }
      }, 100);
    });
  }

  return {
    requestResponse: handleRequestResponse,
  };
};
// Combine the transport and the request handler factory into a server,
// announce the port, and start it.
const serverConfig = { transport, getRequestHandler };
const rSocketServer = new RSocketServer(serverConfig);

console.log(`Server starting on port ${port}...`);
rSocketServer.start();
More Examples
Browse the following repositories for more `rsocket-js` examples: