Skip to main content

Server - Websocket Transport

rsocket-py supports multiple transport protocols. The Websocket transport is available by installing rsocket using one of the extra features:

  • pip3 install rsocket[aiohttp]
  • pip3 install rsocket[quart]

In order to use a Websocket transport, instantiate a TransportAioHttpWebsocket or TransportQuartWebsocket and pass it to an RSocketServer instance. There are a few helpers to ease the creation of a server or a client.

Examples

Example using aiohttp:

import logging
import sys
from asyncio import Future

from aiohttp import web

from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.transports.aiohttp_websocket import websocket_handler_factory


class Handler(BaseRequestHandler):

async def request_response(self, payload: Payload) -> Future:
return create_future(Payload(b'pong'))


if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
app = web.Application()
app.add_routes([web.get('/', websocket_handler_factory(handler_factory=Handler))])
web.run_app(app, port=port)

Example using quart:

import logging
import sys
from asyncio import Future

from quart import Quart

from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.transports.quart_websocket import websocket_handler

app = Quart(__name__)


class Handler(BaseRequestHandler):

async def request_response(self, payload: Payload) -> Future:
return create_future(Payload(b'pong'))


@app.websocket("/")
async def ws():
await websocket_handler(handler_factory=Handler)


if __name__ == "__main__":
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
app.run(port=port)