Channels
In this section we will add basic channel support:
- Joining and leaving channels
- Sending messages to channels
See resulting code on GitHub
Shared code
Let's add a channel property to the Message class. It will contain the name of the channel the message is intended for.
class Message:
    ...
    channel: Optional[str] = None
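As a rough illustration of how the new field travels between client and server, the sketch below assumes that Message is a plain dataclass serialized as JSON. The user and content fields, their order, and the encode/decode helpers are assumptions made for this example; the tutorial's own shared module may differ.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class Message:
    # Field names other than `channel` are assumed for this sketch.
    user: Optional[str] = None
    content: Optional[str] = None
    channel: Optional[str] = None


def encode(message: Message) -> bytes:
    """Hypothetical helper: serialize a message to UTF-8 JSON bytes."""
    return json.dumps(asdict(message)).encode('utf-8')


def decode(data: bytes) -> Message:
    """Hypothetical helper: rebuild a message from JSON bytes."""
    return Message(**json.loads(data))


# A channel message leaves `user` unset; a private message leaves `channel` unset.
channel_msg = decode(encode(Message(content='hello', channel='channel1')))
private_msg = decode(encode(Message(user='user2', content='hi')))
```

Because channel defaults to None, existing private messages that never set it still decode without changes.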
Server side
Data-classes
We will add functionality to store the channel state. Add the following fields to the ChatData class:
from asyncio import Queue
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Set
from weakref import WeakSet


@dataclass(frozen=True)
class ChatData:
    ...
    channel_users: Dict[str, Set[SessionId]] = field(default_factory=lambda: defaultdict(WeakSet))
    channel_messages: Dict[str, Queue] = field(default_factory=lambda: defaultdict(Queue))
In the channel_users dict, the keys are channel names, and each value is a set of user session ids. A WeakSet is used to automatically remove logged-out users.
In the channel_messages dict, the keys are the channel names, and each value is a Queue of messages sent by users to the channel.
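The automatic removal can be seen in isolation with a small sketch. It assumes SessionId is a str subclass, because plain str objects cannot be weakly referenced. That definition, and the weakset_demo function, are assumptions for illustration rather than the tutorial's code.

```python
import gc
from collections import defaultdict
from typing import Dict, Tuple
from weakref import WeakSet


class SessionId(str):
    """Assumed definition: a str subclass, so instances can be weakly referenced."""


def weakset_demo() -> Tuple[int, int]:
    channel_users: Dict[str, WeakSet] = defaultdict(WeakSet)

    session_id = SessionId('session-1')  # the only strong reference
    channel_users['channel1'].add(session_id)
    before = len(channel_users['channel1'])

    # Dropping the last strong reference stands in for the session being discarded.
    del session_id
    gc.collect()  # make the cleanup explicit instead of relying on refcounting
    after = len(channel_users['channel1'])
    return before, after


before_logout, after_logout = weakset_demo()
```

The entry disappears only once nothing else holds the session id, so this approach depends on the server dropping its own references when a user logs out.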
Helper methods
Next, we will define some helper methods for managing channel messages:
- ensure_channel_exists: initialize the data for a new channel if it doesn't exist.
- channel_message_delivery: an asyncio task which will deliver channel messages to all the users in a channel.
def ensure_channel_exists(channel_name: str):
    if channel_name not in chat_data.channel_users:
        chat_data.channel_users[channel_name] = WeakSet()
        chat_data.channel_messages[channel_name] = Queue()
        asyncio.create_task(channel_message_delivery(channel_name))
If the channel doesn't exist yet (Line 2), it will be added to the channel_users and channel_messages dictionaries.
Line 5 starts an asyncio task (described below) which will deliver messages sent to the channel to the channel's users.
async def channel_message_delivery(channel_name: str):
    while True:
        try:
            message = await chat_data.channel_messages[channel_name].get()
            for session_id in chat_data.channel_users[channel_name]:
                # Keep the channel name so the recipient can tell where the message came from.
                user_specific_message = Message(user=message.user,
                                                content=message.content,
                                                channel=channel_name)
                chat_data.user_session_by_id[session_id].messages.put_nowait(user_specific_message)
        except Exception as exception:
            logging.error(str(exception), exc_info=True)
The above method loops indefinitely, waiting on the channel_messages queue of the specified channel. Upon receiving a message, it delivers it to all the users in the channel by placing a copy on each user's message queue.
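The fan-out pattern described above can be tried without a server. The following is a minimal sketch using only asyncio, where plain strings stand in for messages and session ids, and the deliver and fan_out_demo names are made up for this example.

```python
import asyncio
from collections import defaultdict
from typing import Dict, Set


async def deliver(channel_queue: asyncio.Queue,
                  members: Set[str],
                  inboxes: Dict[str, asyncio.Queue]) -> None:
    """Forward every message from the channel queue to each member's inbox."""
    while True:
        message = await channel_queue.get()
        for member in members:
            inboxes[member].put_nowait(message)


async def fan_out_demo() -> Dict[str, str]:
    channel_queue: asyncio.Queue = asyncio.Queue()
    inboxes: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
    members = {'user1', 'user2'}

    # Keep a reference to the task so it is not garbage collected while running.
    task = asyncio.create_task(deliver(channel_queue, members, inboxes))
    await channel_queue.put('hello channel')

    received = {}
    for user in sorted(members):
        received[user] = await asyncio.wait_for(inboxes[user].get(), timeout=1)

    # The delivery loop never ends on its own, so cancel it explicitly.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received


delivered = asyncio.run(fan_out_demo())
```

Using one delivery task per channel means the sender only waits for a single queue put, regardless of how many users are in the channel.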
The final helper looks up a username by session id:
def find_username_by_session(session_id: SessionId) -> Optional[str]:
    session = chat_data.user_session_by_id.get(session_id)
    if session is None:
        return None
    return session.username
Join/Leave Channel
Now let's add the request-response endpoints that handle joining and leaving a channel.
class ChatUserSession:

    def router_factory(self):
        router = RequestRouter()

        @router.response('channel.join')
        async def join_channel(payload: Payload) -> Awaitable[Payload]:
            channel_name = payload.data.decode('utf-8')
            ensure_channel_exists(channel_name)
            chat_data.channel_users[channel_name].add(self._session.session_id)
            return create_response()

        @router.response('channel.leave')
        async def leave_channel(payload: Payload) -> Awaitable[Payload]:
            channel_name = payload.data.decode('utf-8')
            # Indexing a defaultdict creates missing keys, so check membership first.
            if channel_name in chat_data.channel_users:
                chat_data.channel_users[channel_name].discard(self._session.session_id)
            return create_response()
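One subtlety is worth keeping in mind when working with channel_users. Since it is a defaultdict, indexing a channel that does not exist creates an empty entry, while ensure_channel_exists relies on a membership test to decide whether a channel still needs its delivery task. The sketch below shows the difference using plain sets and made-up names; the guarded leave function is a hypothetical variant, not the tutorial's handler.

```python
from collections import defaultdict
from typing import Dict, Set

channel_users: Dict[str, Set[str]] = defaultdict(set)

# A membership test does not create the key...
exists_before = 'ghost' in channel_users

# ...but indexing does, even when the only intent is to remove a user.
channel_users['ghost'].discard('session-1')
exists_after = 'ghost' in channel_users


def leave(channel_name: str, session_id: str) -> None:
    """Hypothetical guarded variant: only touch channels that already exist."""
    if channel_name in channel_users:
        channel_users[channel_name].discard(session_id)


leave('other', 'session-1')
other_exists = 'other' in channel_users
```

With the guard in place, leaving an unknown channel is a no-op and does not make the channel appear to exist.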
Send channel message
Next we add the ability to send channel messages. We will modify the send_message method:
class ChatUserSession:

    def router_factory(self):
        router = RequestRouter()

        @router.response('message')
        async def send_message(payload: Payload) -> Awaitable[Payload]:
            message = Message(**json.loads(payload.data))
            logging.info('Received message for user: %s, channel: %s', message.user, message.channel)
            target_message = Message(self._session.username, message.content, message.channel)
            if message.channel is not None:
                await chat_data.channel_messages[message.channel].put(target_message)
            elif message.user is not None:
                session = find_session_by_username(message.user)
                await session.messages.put(target_message)
            return create_response()
The if/elif branch decides whether it is a channel message or a private message, and adds it to the relevant queue.
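The routing decision can be sketched as a small standalone function. This is a simplified illustration in which plain lists stand in for the asyncio queues; the dispatch function, its return values, and the Message field order are assumptions for this example.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Message:
    # Field order is assumed to be user, content, channel.
    user: Optional[str] = None
    content: Optional[str] = None
    channel: Optional[str] = None


def dispatch(sender: str, message: Message,
             channel_queues: Dict[str, List[Message]],
             private_queues: Dict[str, List[Message]]) -> str:
    """Place the message in a channel or private queue and report which one."""
    # The recipient should see who sent the message, so `user` becomes the sender.
    target = Message(sender, message.content, message.channel)
    if message.channel is not None:
        channel_queues.setdefault(message.channel, []).append(target)
        return 'channel'
    if message.user is not None:
        private_queues.setdefault(message.user, []).append(target)
        return 'private'
    return 'dropped'


channel_queues: Dict[str, List[Message]] = {}
private_queues: Dict[str, List[Message]] = {}

first = dispatch('user1', Message(content='hi all', channel='channel1'),
                 channel_queues, private_queues)
second = dispatch('user1', Message(user='user2', content='hi'),
                  channel_queues, private_queues)
```

Note that the channel check comes first, so a message that sets both channel and user is treated as a channel message.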
List channels
class ChatUserSession:

    def router_factory(self):
        router = RequestRouter()

        @router.stream('channels')
        async def get_channels() -> Publisher:
            count = len(chat_data.channel_messages)
            generator = ((Payload(ensure_bytes(channel)), index == count) for (index, channel) in
                         enumerate(chat_data.channel_messages.keys(), 1))
            return StreamFromGenerator(lambda: generator)
The get_channels handler defines an endpoint for getting a list of channels. It uses the StreamFromGenerator helper. Note that the argument to this class is a factory method for the generator, not the generator itself.
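To make the (item, is_last) pairs and the factory distinction concrete, here is a library-free sketch. The with_last_flag and consume functions are hypothetical stand-ins written for this example; consume only imitates a consumer that calls the factory and is not rsocket's StreamFromGenerator.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Pair = Tuple[str, bool]


def with_last_flag(items: Iterable[str]) -> Iterator[Pair]:
    """Yield (item, is_last) pairs, mirroring the `index == count` idiom."""
    items = list(items)
    count = len(items)
    return ((item, index == count) for index, item in enumerate(items, 1))


def consume(factory: Callable[[], Iterator[Pair]]) -> List[Pair]:
    """Stand-in consumer: call the factory, then drain the generator it returns."""
    return list(factory())


channels = ['channel1', 'channel2']

# A factory that builds a new generator on every call can be consumed repeatedly.
fresh_first = consume(lambda: with_last_flag(channels))
fresh_second = consume(lambda: with_last_flag(channels))

# A factory that hands back the same generator object yields nothing the second time.
shared = with_last_flag(channels)
shared_first = consume(lambda: shared)
shared_second = consume(lambda: shared)
```

Starting enumerate at 1 is what lets index == count mark exactly the final item as the end of the stream.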
Get channel users
class ChatUserSession:

    def router_factory(self):
        router = RequestRouter()

        @router.stream('channel.users')
        async def get_channel_users(payload: Payload) -> Publisher:
            channel_name = utf8_decode(payload.data)
            if channel_name not in chat_data.channel_users:
                return EmptyStream()
            count = len(chat_data.channel_users[channel_name])
            generator = ((Payload(ensure_bytes(find_username_by_session(session_id))), index == count) for
                         (index, session_id) in
                         enumerate(chat_data.channel_users[channel_name], 1))
            return StreamFromGenerator(lambda: generator)
The get_channel_users handler defines an endpoint for getting a list of users in a given channel. The find_username_by_session helper method is used to convert the session ids to usernames.
If the channel does not exist, the EmptyStream helper is used as the response.
Client side
We will add methods to the ChatClient to interact with the new server functionality:
from typing import List

from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.extensions.helpers import composite, route
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload

from shared import Message, encode_dataclass


class ChatClient:

    async def join(self, channel_name: str):
        request = Payload(ensure_bytes(channel_name), composite(route('channel.join')))
        await self._rsocket.request_response(request)
        return self

    async def leave(self, channel_name: str):
        request = Payload(ensure_bytes(channel_name), composite(route('channel.leave')))
        await self._rsocket.request_response(request)
        return self

    async def channel_message(self, channel: str, content: str):
        print(f'Sending {content} to channel {channel}')
        await self._rsocket.request_response(Payload(encode_dataclass(Message(channel=channel, content=content)),
                                                     composite(route('message'))))

    async def list_channels(self) -> List[str]:
        request = Payload(metadata=composite(route('channels')))
        response = await AwaitableRSocket(self._rsocket).request_stream(request)
        return [utf8_decode(payload.data) for payload in response]

    async def get_users(self, channel_name: str) -> List[str]:
        request = Payload(ensure_bytes(channel_name), composite(route('channel.users')))
        users = await AwaitableRSocket(self._rsocket).request_stream(request)
        return [utf8_decode(user.data) for user in users]
The join and leave methods are both simple routed request_response calls, with the channel name as the payload data.
The list_channels method uses the AwaitableRSocket adapter to simplify getting the response stream as a list.
The get_users method lists a channel's users in the same way.
Update the print_message method to include the channel:
def print_message(data: bytes):
    message = Message(**json.loads(data))
    print(f'{self._username}: from {message.user} ({message.channel}): {message.content}')
Let's test the new functionality using the following code:
async def messaging_example(user1: ChatClient, user2: ChatClient):
    user1.listen_for_messages()
    user2.listen_for_messages()

    await user1.join('channel1')
    await user2.join('channel1')

    print(f'Channels: {await user1.list_channels()}')

    await user1.private_message('user2', 'private message from user1')
    await user1.channel_message('channel1', 'channel message from user1')

    await asyncio.sleep(1)

    user1.stop_listening_for_messages()
    user2.stop_listening_for_messages()
Call the example method from the main method and pass it the two chat clients:
user1 = ChatClient(client1)
user2 = ChatClient(client2)
await user1.login('user1')
await user2.login('user2')
await messaging_example(user1, user2)