
Statistics

As a last step, we will add the ability to pass statistics between the client and the server:

  • The client will be able to send its memory usage to the server.
  • The server will report the number of users and channels. The client will be able to specify which of these statistics it wants.

See resulting code on GitHub

Shared code

We will define some data-classes to represent the payloads being sent between the client and server:

from dataclasses import dataclass, field
from typing import Optional, List

@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None

@dataclass()
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)

@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None

Lines 4-7 define the data sent to the client upon request. It contains two optional fields, the user count and the channel count.

Lines 9-12 define a request from the client which specifies which statistics it wants and how often they should be reported. The entries of the ids list ('users' and 'channels') correspond to the two fields of the ServerStatistics class.

Lines 14-16 define the statistics sent from the client to the server.
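
The snippets later in this step call two helpers from the shared module, encode_dataclass and dataclass_to_payload, whose code is not shown on this page. As a minimal sketch, assuming the payloads are JSON-encoded UTF-8 (which matches the json.loads(utf8_decode(...)) calls on the receiving side), an encoder and a matching decoder could look like the following. The function bodies and the decode_dataclass name are assumptions for illustration, not the tutorial's actual implementation.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None


def encode_dataclass(obj) -> bytes:
    # Assumed implementation: serialize every dataclass field as UTF-8 JSON.
    return json.dumps(asdict(obj)).encode('utf-8')


def decode_dataclass(data: bytes, cls):
    # Hypothetical counterpart: rebuild the dataclass from the JSON keys.
    return cls(**json.loads(data.decode('utf-8')))


encoded = encode_dataclass(ClientStatistics(memory_usage=1024))
decoded = decode_dataclass(encoded, ClientStatistics)
print(encoded)
print(decoded)
```

A dataclass_to_payload helper would then presumably wrap the encoded bytes in an rsocket Payload.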

Server side

Data-classes

First we will add a field on the UserSessionData to store the last statistics sent by the client:

from dataclasses import dataclass
from typing import Optional

from shared import ClientStatistics

@dataclass()
class UserSessionData:
    ...
    statistics: Optional[ClientStatistics] = None

Endpoints

We will add two endpoints: one for receiving statistics from the client, and one through which the client requests specific statistics from the server.

Client send statistics

import json

from shared import ClientStatistics
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter()

        ...

        @router.fire_and_forget('statistics')
        async def receive_statistics(payload: Payload):
            statistics = ClientStatistics(**json.loads(utf8_decode(payload.data)))
            self._session.statistics = statistics

Lines 14-17 define an endpoint for receiving statistics from the client. It uses the fire-and-forget request type, since this data is not critical to the application. The method does not need to return anything, and any value it does return is ignored.
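
Because a fire-and-forget request sends nothing back, the client has no way to learn that a payload it sent could not be parsed. A handler may therefore want to tolerate bad input rather than raise. The sketch below is one possible way to do that, using only the standard library; parse_client_statistics is a hypothetical helper name, and the dataclass is repeated so the example runs on its own.

```python
import json
import logging
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClientStatistics:
    memory_usage: Optional[int] = None


def parse_client_statistics(data: bytes) -> Optional[ClientStatistics]:
    """Hypothetical helper: return the decoded statistics, or None if invalid."""
    try:
        # ValueError covers invalid UTF-8 and invalid JSON;
        # TypeError covers unknown keys and non-object JSON values.
        return ClientStatistics(**json.loads(data.decode('utf-8')))
    except (ValueError, TypeError):
        logging.warning('Ignoring malformed statistics payload', exc_info=True)
        return None


print(parse_client_statistics(b'{"memory_usage": 5}'))
print(parse_client_statistics(b'not json'))
```

Inside receive_statistics, the session field would then only be updated when the helper returns a value.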

Receive requested statistics

We will add a helper method for creating a new statistics response:

def new_statistics_data(statistics_request: ServerStatisticsRequest):
    statistics_data = {}

    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    return ServerStatistics(**statistics_data)
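
To see how the requested ids shape the response, the helper can be exercised outside the server. The following is a self-contained sketch: the ChatData class and its contents are stand-ins invented for the example (the real chat_data object is defined elsewhere in the tutorial), and the dataclasses are repeated from the shared code.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ServerStatistics:
    user_count: Optional[int] = None
    channel_count: Optional[int] = None


@dataclass
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


@dataclass
class ChatData:
    # Stand-in for the server's shared state; only the two mappings
    # that new_statistics_data reads are modelled here.
    user_session_by_id: Dict[str, object] = field(default_factory=dict)
    channel_messages: Dict[str, list] = field(default_factory=dict)


chat_data = ChatData(
    user_session_by_id={'session-1': object(), 'session-2': object()},
    channel_messages={'general': []},
)


def new_statistics_data(statistics_request: ServerStatisticsRequest):
    statistics_data = {}

    if 'users' in statistics_request.ids:
        statistics_data['user_count'] = len(chat_data.user_session_by_id)

    if 'channels' in statistics_request.ids:
        statistics_data['channel_count'] = len(chat_data.channel_messages)

    # Fields that were not requested keep their default of None.
    return ServerStatistics(**statistics_data)


users_only = new_statistics_data(ServerStatisticsRequest(ids=['users']))
everything = new_statistics_data(ServerStatisticsRequest())
print(users_only)
print(everything)
```

Statistics that were not requested are simply left as None in the response.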

Next we define the endpoint for sending statistics to the client:

import asyncio
import json
import logging

from shared import ClientStatistics, ServerStatisticsRequest, ServerStatistics, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import Subscriber, DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter

class ChatUserSession:
    def router_factory(self):
        router = RequestRouter()

        @router.channel('statistics')
        async def send_statistics():

            class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

                def __init__(self, session: UserSessionData):
                    super().__init__()
                    self._session = session
                    self._requested_statistics = ServerStatisticsRequest()

                def cancel(self):
                    self._sender.cancel()

                def subscribe(self, subscriber: Subscriber):
                    super().subscribe(subscriber)
                    subscriber.on_subscribe(self)
                    self._sender = asyncio.create_task(self._statistics_sender())

                async def _statistics_sender(self):
                    while True:
                        try:
                            await asyncio.sleep(self._requested_statistics.period_seconds)
                            next_message = new_statistics_data(self._requested_statistics)

                            self._subscriber.on_next(dataclass_to_payload(next_message))
                        except Exception:
                            logging.error('Statistics', exc_info=True)

                def on_next(self, value: Payload, is_complete=False):
                    request = ServerStatisticsRequest(**json.loads(utf8_decode(value.data)))

                    logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

                    if request.ids is not None:
                        self._requested_statistics.ids = request.ids

                    if request.period_seconds is not None:
                        self._requested_statistics.period_seconds = request.period_seconds

            response = StatisticsChannel(self._session)

            return response, response

Lines 17-58 define an endpoint for sending statistics to the client. It uses the request-channel request type, which allows the client both to receive the server statistics and to tell the server which statistics it wants to receive.
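
The core of this endpoint is a background task that emits on a timer, reads its settings on every iteration so they can be changed while it runs, and stops when cancelled. The sketch below isolates that pattern using only asyncio, with no rsocket involved; Settings, PeriodicSender and demo are hypothetical names made up for this illustration, and a plain callback stands in for the subscriber.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Settings:
    ids: List[str] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: float = 0.01  # short period so the demo finishes quickly


class PeriodicSender:
    """Stand-in for the channel object: emits on a timer until cancelled."""

    def __init__(self, emit: Callable[[dict], None]):
        self._emit = emit
        self._settings = Settings()
        self._task: Optional[asyncio.Task] = None

    def start(self):
        self._task = asyncio.create_task(self._run())

    def update(self, ids=None, period_seconds=None):
        # Only overwrite the settings that were actually provided.
        if ids is not None:
            self._settings.ids = ids
        if period_seconds is not None:
            self._settings.period_seconds = period_seconds

    def cancel(self):
        if self._task is not None:
            self._task.cancel()

    async def _run(self):
        while True:
            # Settings are re-read on every iteration, so updates take
            # effect on the next tick without restarting the task.
            await asyncio.sleep(self._settings.period_seconds)
            self._emit({name: 0 for name in self._settings.ids})


async def demo():
    received = []
    sender = PeriodicSender(received.append)
    sender.start()
    await asyncio.sleep(0.05)
    count_before_update = len(received)
    sender.update(ids=['users'])
    await asyncio.sleep(0.05)
    sender.cancel()
    await asyncio.sleep(0.03)
    count_after_cancel = len(received)
    await asyncio.sleep(0.03)
    return received, count_before_update, count_after_cancel


if __name__ == '__main__':
    messages, _, _ = asyncio.run(demo())
    print(messages[0], messages[-1])
```

Cancelling the task interrupts the pending sleep with asyncio.CancelledError, which on Python 3.8 and later is not a subclass of Exception, so a broad except Exception inside the loop does not swallow the cancellation.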

Client side

On the client side we will add the methods to access the new server side functionality:

  • send_statistics
  • listen_for_statistics

import resource

from shared import ServerStatistics, ClientStatistics, encode_dataclass
from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    async def send_statistics(self):
        memory_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        payload = Payload(encode_dataclass(ClientStatistics(memory_usage=memory_usage)),
                          metadata=composite(route('statistics')))
        await self._rsocket.fire_and_forget(payload)

The send_statistics method uses a fire-and-forget request (Line 13) to send statistics to the server. This request type has no response, so the client does not wait for confirmation that the payload was delivered. That is acceptable here because the information is not critical (at least for this tutorial).
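
Note that ru_maxrss is the peak resident set size of the process rather than its current usage, and its unit depends on the platform: kilobytes on Linux, bytes on macOS. The resource module is also only available on Unix-like systems. If comparable numbers across clients matter, the value could be normalized before sending. The sketch below shows one way; peak_memory_bytes is a hypothetical helper name, and systems other than macOS are assumed to report kilobytes as Linux does.

```python
import resource
import sys


def peak_memory_bytes() -> int:
    """Hypothetical helper: peak resident set size of this process, in bytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # macOS reports bytes; Linux reports kilobytes (assumed for other
    # platforms as well).
    return peak if sys.platform == 'darwin' else peak * 1024


print(peak_memory_bytes())
```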

Next we will request statistics from the server. First we will define a handler to listen on the channel request and control it:

import json
from asyncio import Event
from datetime import timedelta
from typing import List

from examples.tutorial.step6.models import ServerStatistics, ServerStatisticsRequest, dataclass_to_payload
from reactivestreams.publisher import DefaultPublisher
from reactivestreams.subscriber import DefaultSubscriber
from reactivestreams.subscription import DefaultSubscription
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

class StatisticsHandler(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

    def __init__(self):
        super().__init__()
        self.done = Event()

    def on_next(self, value: Payload, is_complete=False):
        statistics = ServerStatistics(**json.loads(utf8_decode(value.data)))
        print(statistics)

        if is_complete:
            self.done.set()

    def cancel(self):
        self.subscription.cancel()

    def set_requested_statistics(self, ids: List[str]):
        self._subscriber.on_next(dataclass_to_payload(ServerStatisticsRequest(ids=ids)))

    def set_period(self, period: timedelta):
        self._subscriber.on_next(
            dataclass_to_payload(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))
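
One detail worth checking is what each of these requests carries on the wire. Both fields of ServerStatisticsRequest have non-None defaults, so a request built with only ids also contains the default period, and the server applies every field that is not None. The sketch below prints the decoded JSON for three requests. It assumes the encoder serializes all dataclass fields as JSON; the encode function here is a stand-in, not the tutorial's dataclass_to_payload.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import timedelta
from typing import List, Optional


@dataclass
class ServerStatisticsRequest:
    ids: Optional[List[str]] = field(default_factory=lambda: ['users', 'channels'])
    period_seconds: Optional[int] = field(default_factory=lambda: 2)


def encode(request: ServerStatisticsRequest) -> bytes:
    # Stand-in encoder (assumption): every field is written to the JSON.
    return json.dumps(asdict(request)).encode('utf-8')


# What set_requested_statistics(['users']) would send under this assumption.
ids_only = json.loads(encode(ServerStatisticsRequest(ids=['users'])))

# What set_period(timedelta(seconds=10)) would send.
period = timedelta(seconds=10)
period_only = json.loads(
    encode(ServerStatisticsRequest(period_seconds=int(period.total_seconds()))))

# Passing None explicitly leaves the other setting untouched on the server.
partial = json.loads(encode(ServerStatisticsRequest(ids=['users'], period_seconds=None)))

print(ids_only)
print(period_only)
print(partial)
```

Under that assumption, changing only the ids also resets the period to the default of 2 seconds, and changing only the period resets the ids to both statistics. Explicitly passing None for the field that should stay unchanged avoids this, since the server skips fields that are None.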

Next we will use this new handler in the ChatClient:

from rsocket.extensions.helpers import composite, route
from rsocket.payload import Payload

class ChatClient:

    def listen_for_statistics(self) -> StatisticsHandler:
        self._statistics_subscriber = StatisticsHandler()
        self._rsocket.request_channel(Payload(metadata=composite(
            route('statistics')
        )), publisher=self._statistics_subscriber).subscribe(self._statistics_subscriber)
        return self._statistics_subscriber

    def stop_listening_for_statistics(self):
        self._statistics_subscriber.cancel()

Finally, let's try out this new functionality in the client:

async def statistics_example(user1):
    await user1.send_statistics()

    statistics_control = user1.listen_for_statistics()

    await asyncio.sleep(5)

    statistics_control.set_requested_statistics(['users'])

    await asyncio.sleep(5)

    user1.stop_listening_for_statistics()

Call this new method from the client main method.