# Source code for textsmith.pubsub

"""
The pub/sub message passing methods and handlers needed for TextSmith.

Copyright (C) 2020 Nicholas H.Tollervey (ntoll@ntoll.org).

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>
"""
import asyncio
import structlog  # type: ignore
from asyncio_redis import Subscription  # type: ignore
from asyncio_redis.exceptions import Error, ErrorReply  # type: ignore


# Module-level structlog logger; the PubSub methods in this module emit their
# events through it via ``logger.msg(...)``.
logger = structlog.get_logger()


class PubSub:
    """
    Contains methods needed to manage listening for messages broadcast on the
    pub/sub layer of the game.

    Each connected user has an ``asyncio.Queue`` of pending messages, keyed by
    user ID. A background task (``self.listener``) reads messages published on
    the subscribed Redis channels and places them on the matching queue.
    """

    def __init__(self, subscriber: Subscription) -> None:
        """
        The subscriber object represents a connection to Redis in "subscribe"
        mode (i.e. listening for messages).

        The listener task is scheduled immediately with
        ``asyncio.create_task``, so an instance must be created while an
        event loop is running.
        """
        # Key: user_id Value: asyncio.Queue of pending messages (for all
        # users connected to this instance).
        self.connected_users = {}  # type: dict
        # The Redis connection used to subscribe to pub/sub messages.
        self.subscriber = subscriber
        # A flag to show if new messages are retrievable.
        self.listening = False
        # Schedule a task to constantly listen for new messages on
        # subscribed-to channels.
        self.listener = asyncio.create_task(self.listen())

    async def subscribe(self, user_id: int, connection_id: str) -> None:
        """
        Ensure there's an entry for the referenced user's message queue. Add
        the user ID to the list of channels this instance subscribes to via
        Redis. Log this event.

        A fresh queue always replaces any existing queue for the user.
        The connection_id is only used to annotate log entries.

        Redis errors are logged and then re-raised to the caller.
        """
        self.connected_users[user_id] = asyncio.Queue()
        try:
            await self.subscriber.subscribe([str(user_id)])
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error subscribing to channel.",
                user_id=user_id,
                connection_id=connection_id,
                exc_info=ex,
                redis_error=True,
            )
            # Bare raise: propagate the original exception and traceback.
            raise
        logger.msg("Subscribe.", user_id=user_id, connection_id=connection_id)

    async def unsubscribe(self, user_id: int, connection_id: str) -> None:
        """
        Remove the user ID from the list of channels to which this instance
        subscribes via Redis. Delete the message queue for the referenced
        user. Log this event.

        The queue is removed before Redis is contacted, so the local state is
        cleared even if the Redis call fails. Any messages still waiting in
        the queue are discarded along with it. Unknown user IDs are ignored.
        The connection_id is only used to annotate log entries.

        Redis errors are logged and then re-raised to the caller.
        """
        self.connected_users.pop(user_id, None)
        try:
            await self.subscriber.unsubscribe([str(user_id)])
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error unsubscribing from channel.",
                user_id=user_id,
                connection_id=connection_id,
                exc_info=ex,
                redis_error=True,
            )
            # Bare raise: propagate the original exception and traceback.
            raise
        logger.msg(
            "Unsubscribe.", user_id=user_id, connection_id=connection_id
        )

    async def listen(self) -> None:
        """
        Listen to the messages on subscribed channels. Each channel represents
        an object ID. If the object ID is a user connected to this
        application, then it's put into the message queue for that user, to
        be sent via the websocket connection.

        Runs until the subscriber breaks, Redis reports an error, or the task
        is cancelled (see ``stop``). However the loop ends, the ``listening``
        flag is reset to False so ``get_message`` stops accepting requests.
        """
        self.listening = True
        try:
            while self.listening:
                try:
                    message = await self.subscriber.next_published()
                except asyncio.CancelledError:
                    # Cancellation is the normal shutdown path; let it
                    # propagate untouched rather than treat it as an error.
                    raise
                except StopIteration:  # pragma: no cover
                    logger.msg(
                        "Broken subscriber.",
                    )
                    break
                except (Error, ErrorReply) as ex:  # pragma: no cover
                    logger.msg(
                        "Error listening to Redis PubSub.",
                        exc_info=ex,
                        redis_error=True,
                    )
                    break
                # Only the channel parse is guarded, so ``message`` is always
                # bound when a bad message is reported.
                try:
                    user_id = int(message.channel)
                except ValueError:
                    logger.msg(
                        "Bad Message.",
                        channel=message.channel,
                        value=message.value,
                    )
                    continue
                logger.msg("Message.", user_id=user_id, value=message.value)
                message_queue = self.connected_users.get(user_id)
                if message_queue is not None:
                    await message_queue.put(message.value)
        finally:
            # Covers cancellation and unexpected exceptions as well as the
            # handled error paths above.
            self.listening = False

    async def get_message(self, user_id: int) -> str:
        """
        Return the next message in the message queue for the referenced user,
        waiting until one is available. If the user has no message queue
        (i.e. they are not subscribed via this instance), return an empty
        string.

        Raises a ValueError if the instance is not currently listening for
        messages.
        """
        if not self.listening:
            raise ValueError(f"Cannot get messages for user {user_id}.")
        message_queue = self.connected_users.get(user_id)
        # Explicit None check: the decision must depend on whether a queue
        # exists, never on the queue object's truthiness.
        if message_queue is None:
            return ""
        return await message_queue.get()

    async def stop(self) -> None:
        """
        Cleanly stop listening to the Redis PubSub.

        Cancels the background listener task and waits for it to finish. The
        expected cancellation is logged along with the users that were still
        subscribed; a Redis error raised while stopping is logged and not
        re-raised.
        """
        self.listener.cancel()
        try:
            await self.listener
        except asyncio.CancelledError:
            logger.msg("Stop PubSub.", subscribed=self.connected_users)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error stopping listener to Redis PubSub.",
                exc_info=ex,
                redis_error=True,
            )