Source code for textsmith.datastore

Functions that CRUD state stored in a Redis datastore. Data for objects is
stored in Redis Hashes whose values are serialized as strings of JSON.

Copyright (C) 2020 Nicholas H.Tollervey

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
import binascii
import hashlib
import hmac
import json
import os
from datetime import datetime
from typing import Any, Dict, Mapping, Sequence, Union

import structlog  # type: ignore
from asyncio_redis import Pool  # type: ignore
from asyncio_redis.exceptions import Error, ErrorReply  # type: ignore

from textsmith import constants

# Module-level structlog logger; the DataStore methods below call
# ``logger.msg`` to record successful operations and Redis errors.
logger = structlog.get_logger()

# A single attribute value: a JSON friendly scalar or a flat sequence of
# such scalars.
_AttributeValue = Union[
    str, int, float, bool, Sequence[Union[str, int, float, bool]]
]
# The attributes describing one object, keyed by attribute name.
_ObjectAttributes = Dict[str, _AttributeValue]


class DataStore:
    """
    Gathers together methods to implement storage related operations via
    Redis.

    Object attributes and user metadata are kept in Redis hashes whose
    values are JSON encoded strings. Containment is tracked with a
    "location" key per object and an "inventory" set per container.

    Methods that talk to Redis log and then re-raise any asyncio_redis
    ``Error`` / ``ErrorReply`` they encounter.
    """

    # Number of PBKDF2 rounds applied when hashing a password.
    _PBKDF2_ITERATIONS = 100000
    # Length of the hex encoded salt that prefixes a stored password hash
    # (the hex digest of a SHA-256 hash is 64 characters long).
    _SALT_LENGTH = 64

    def __init__(self, redis: Pool) -> None:
        """
        The redis object is a connection pool to a Redis instance.
        """
        self.redis = redis

    def user_key(self, email: str) -> str:
        """
        Given a user's unique email address, return the key to use to
        reference the user in the Redis database.
        """
        return f"user:{email}"

    def token_key(self, token: str) -> str:
        """
        Given a token value, return the key to use to retrieve the
        associated user's details.
        """
        return f"token:{token}"

    def last_seen_key(self, user_id: int) -> str:
        """
        Given a user id, return the key to use to set the timestamp at
        which the user was last seen on the server.
        """
        return f"lastseen:{user_id}"

    def inventory_key(self, object_id: int) -> str:
        """
        Given an object id, return the key to use to record the objects
        contained within the referenced object.

        This is recording "what do I contain?"
        """
        return f"inventory:{object_id}"

    def location_key(self, object_id: int) -> str:
        """
        Given an object id, return the key used to record the id of the
        object that contains the referenced object.

        This is recording "who contains me?"
        """
        return f"location:{object_id}"

    def hash_password(self, password: str) -> str:
        """
        Hash a password for safe storage.

        The result is an ASCII string: a random hex encoded salt followed
        by the hex encoded PBKDF2-HMAC-SHA512 digest of the password.
        """
        salt = hashlib.sha256(os.urandom(60)).hexdigest().encode("ascii")
        digest = hashlib.pbkdf2_hmac(
            "sha512", password.encode("utf-8"), salt, self._PBKDF2_ITERATIONS
        )
        return (salt + binascii.hexlify(digest)).decode("ascii")

    def verify_password(
        self, stored_password: str, provided_password: str
    ) -> bool:
        """
        Verify a stored password hash against a plaintext provided
        password.

        The stored value must be in the format produced by hash_password
        (salt prefix followed by the hex digest).
        """
        salt = stored_password[: self._SALT_LENGTH]
        expected = stored_password[self._SALT_LENGTH :]
        digest = hashlib.pbkdf2_hmac(
            "sha512",
            provided_password.encode("utf-8"),
            salt.encode("ascii"),
            self._PBKDF2_ITERATIONS,
        )
        # Compare in constant time so the check doesn't leak how much of
        # the hash matched.
        return hmac.compare_digest(
            binascii.hexlify(digest), expected.encode("utf-8")
        )

    async def add_object(self, **attributes: _ObjectAttributes) -> int:
        """
        Create a new object and return its unique id.

        The id is taken from the "object_counter" key in Redis. Any keyword
        arguments are annotated onto the new object as its attributes.
        """
        # Get the new object's unique ID.
        try:
            object_id = int(await self.redis.incr("object_counter"))
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error incrementing object_counter.",
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("Created new object.", object_id=object_id)
        # Add attributes to the object.
        if attributes:
            await self.annotate_object(object_id, **attributes)
        # Return the new object's id
        return object_id

    async def annotate_object(
        self, object_id: int, **attributes: _ObjectAttributes
    ) -> None:
        """
        Annotate attributes to the object.

        Each value is JSON encoded and written, within a transaction, to
        the Redis hash whose key is the object's id.
        """
        data = {
            attribute: json.dumps(value)
            for attribute, value in attributes.items()
        }
        try:
            transaction = await self.redis.multi()
            await transaction.hmset(str(object_id), data)
            await transaction.exec()
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error annotating object.",
                object_id=object_id,
                attributes=attributes,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg(
            "Annotated attributes to object.",
            object_id=object_id,
            data=attributes,
        )

    async def get_objects(
        self, ids: Sequence[int]
    ) -> Dict[int, _ObjectAttributes]:
        """
        Given a list of object IDs, return a dictionary whose keys are
        object IDs and values are a dictionary of the related attributes of
        each object.

        Objects flagged as deleted are left out of the result. Each
        returned object also has an "id" entry holding its object ID.
        """
        try:
            pending = {}
            transaction = await self.redis.multi()
            for object_id in ids:
                # Queued commands resolve once the transaction executes.
                pending[object_id] = await transaction.hgetall_asdict(
                    str(object_id)
                )
            await transaction.exec()
            # Build result dictionary.
            object_attributes: Dict[int, _ObjectAttributes] = {}
            for object_id, result in pending.items():
                values = await result
                if constants.IS_DELETED in values:
                    # Ignore deleted objects.
                    continue
                obj = {key: json.loads(value) for key, value in values.items()}
                obj["id"] = object_id
                object_attributes[object_id] = obj
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting attributes for objects.",
                object_ids=ids,
                exc_info=ex,
                redis_error=True,
            )
            raise
        return object_attributes

    async def get_attribute(
        self, object_id: int, attribute: str
    ) -> _AttributeValue:
        """
        Given an object ID and attribute, return the associated value or
        raise a KeyError to indicate the attribute doesn't exist on the
        object.
        """
        try:
            # A single read: a missing field comes back as None, so there
            # is no window between an existence check and the fetch.
            raw = await self.redis.hget(str(object_id), attribute)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting attribute for object.",
                object_id=object_id,
                attribute=attribute,
                exc_info=ex,
                redis_error=True,
            )
            raise
        if raw is None:
            raise KeyError(
                f"Attribute '{attribute}' on #{object_id} does not exist."
            )
        return json.loads(raw)

    async def delete_attributes(
        self, object_id: int, attributes: Sequence[str]
    ) -> int:
        """
        Given an object ID and list of attributes, delete them. Returns the
        number of attributes deleted.
        """
        try:
            transaction = await self.redis.multi()
            result = await transaction.hdel(str(object_id), attributes)
            await transaction.exec()
            number_changed = await result
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error deleting attributes from object.",
                object_id=object_id,
                attributes=attributes,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg(
            f"Deleted {number_changed} attributes from object.",
            object_id=object_id,
            attributes=attributes,
        )
        return number_changed

    async def user_exists(self, email: str) -> bool:
        """
        Returns a boolean indication if a user linked to the referenced
        email address exists within the system.
        """
        try:
            result = await self.redis.exists(self.user_key(email))
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error checking if user exists.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        return result

    async def create_user(self, email: str, confirmation_token: str) -> int:
        """
        Create metadata for the new (inactive) user identified by the
        referenced email address, and link the confirmation token to that
        email address.

        Return the id of the object in the database associated with this
        user.
        """
        try:
            # Make an object in the world.
            meta_data = {constants.IS_USER: True}
            object_id = await self.add_object(**meta_data)  # type: ignore
            # Generate some simple metadata about the new user.
            user = {
                "email": email,
                "active": False,
                "object_id": object_id,
            }
            # JSON-ify for Redis.
            data = {
                attribute: json.dumps(value)
                for attribute, value in user.items()
            }
            transaction = await self.redis.multi()
            # Set the meta-data.
            await transaction.hmset(self.user_key(email), data)
            # Set the link from the emailed token to the user for password
            # creation and account confirmation.
            await transaction.set(self.token_key(confirmation_token), email)
            await transaction.exec()
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error creating user.",
                user_email=email,
                confirmation_token=confirmation_token,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("Created user.", user=data)
        # Return the object created to represent the user in the world.
        return object_id

    async def token_to_email(
        self, confirmation_token: str
    ) -> Union[str, None]:
        """
        Given a confirmation token, will return the related email address.
        If no email or token exists, returns None.
        """
        try:
            email = await self.redis.get(self.token_key(confirmation_token))
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting email from token.",
                confirmation_token=confirmation_token,
                exc_info=ex,
                redis_error=True,
            )
            raise
        # Normalise any false-y reply (missing key, empty string) to None.
        return email or None

    async def email_to_object_id(self, email: str) -> int:
        """
        Return the id of the in game object representing the player
        identified by the referenced email address.

        Returns 0 if no object id is recorded for the email address.
        """
        try:
            key = self.user_key(email)
            object_id = await self.redis.hget(key, "object_id")
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting object id from email.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        if object_id:
            return json.loads(object_id)
        # Return false-y 0 to indicate no object id found for the referenced
        # email address.
        return 0

    async def set_user_password(self, email: str, password: str) -> bool:
        """
        Given a user identified by the referenced email address, update
        their password to the one provided as an argument to this function.

        Passwords cannot be set for non-existent users, nor inactive users.
        Returns True if the password was set, otherwise False.
        """
        hashed_password = self.hash_password(password)
        # JSON-ify for Redis.
        data = {"password": json.dumps(hashed_password)}
        try:
            key = self.user_key(email)
            flag = await self.redis.hget(key, "active")
            # A missing flag means the user doesn't exist; a false flag
            # means the user is inactive.
            if flag and json.loads(flag):
                await self.redis.hmset(key, data)
                logger.msg("Set password.", user_email=email)
                return True
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error setting password.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        return False

    async def confirm_user(
        self, confirmation_token: str, password: str
    ) -> str:
        """
        Given a confirmation token sets the referenced password against the
        email address related to the token. This is the final step in user
        confirmation.

        The user is made active, the token is deleted and the confirmed
        email address is returned. Raises a ValueError if the token isn't
        linked to an email address.
        """
        email = await self.token_to_email(confirmation_token)
        if not email:
            msg = "Unable to confirm user with token."
            logger.msg(
                msg,
                confirmation_token=confirmation_token,
            )
            raise ValueError(msg)
        # The user must be active before a password can be set.
        await self.set_user_active(email, True)
        await self.set_user_password(email, password)
        try:
            await self.redis.delete([self.token_key(confirmation_token)])
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error deleting token.",
                user_email=email,
                confirmation_token=confirmation_token,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("User confirmed email address.", user_email=email)
        return email

    async def verify_user(self, email: str, password: str) -> bool:
        """
        Given an email address and password, will check that the
        credentials are valid for signing into the system.

        Unknown users, inactive users and users without a stored password
        can never be verified.
        """
        try:
            result = await self.redis.hgetall_asdict(self.user_key(email))
            if not result:
                return False
            user_data = {key: json.loads(val) for key, val in result.items()}
            # Check the user is active.
            if not user_data.get("active", False):
                # Inactive users can never log in.
                return False
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting stored password.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        stored_password = user_data.get("password")
        if not stored_password:
            # An active user without a password can't be verified (rather
            # than failing with a KeyError).
            return False
        return self.verify_password(stored_password, password)

    async def set_user_active(
        self, email: str, active_flag: bool = True
    ) -> None:
        """
        Set the "active" flag against the user identified via the email
        address to the value of "active_flag".
        """
        # JSON-ify for Redis.
        data = {"active": json.dumps(active_flag)}
        try:
            await self.redis.hmset(self.user_key(email), data)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error setting user active.",
                user_email=email,
                new_active_flag_value=active_flag,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg(
            "Set user active flag.", user_email=email, active=active_flag
        )

    async def set_last_seen(self, email: str) -> None:
        """
        Set the last_seen value, for the user identified by the referenced
        email address, to the current time.
        """
        # Stored in ISO format so get_last_seen can parse it back again.
        now = datetime.now().isoformat()
        try:
            object_id = await self.email_to_object_id(email)
            key = self.last_seen_key(object_id)
            await self.redis.set(key, now)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error setting last seen.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("Set last seen.", user_email=email, last_seen=now)

    async def get_last_seen(self, user_id: int) -> Union[datetime, None]:
        """
        Returns a datetime object representing the moment at which the
        user, whose in-game object is referenced in the arguments, was last
        seen. Returns None if no such moment has been recorded.
        """
        try:
            key = self.last_seen_key(user_id)
            val = await self.redis.get(key)
            if val:
                # The user was last seen at a certain time.
                return datetime.fromisoformat(val)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error getting last seen.",
                user_id=user_id,
                exc_info=ex,
                redis_error=True,
            )
            raise
        return None

    async def delete_user(self, email: str) -> None:
        """
        Soft delete the user whilst keeping all the objects owned by the
        user (who is identified by the referenced email address). This
        involves setting the user as inactive (so they can't log in) and
        ensuring they are not contained within another object.
        """
        await self.set_user_active(email, False)
        try:
            object_id = await self.email_to_object_id(email)
            # A negative container id means "not contained anywhere".
            await self.set_container(object_id, -1)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error deleting user.",
                user_email=email,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("Deleted user.", user_email=email)

    async def delete_object(self, object_id: int) -> None:
        """
        Soft delete an object from the database. This involves setting the
        is_deleted flag and ensuring the object isn't contained within
        another object. The current time is set for the deleted flag.
        """
        try:
            attrs: Mapping[str, _AttributeValue] = {
                constants.IS_DELETED: datetime.now().isoformat()
            }
            await self.annotate_object(object_id, **attrs)
            # A negative container id means "not contained anywhere".
            await self.set_container(object_id, -1)
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error deleting object.",
                object_id=object_id,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg("Deleted object.", object_id=object_id)

    async def set_container(self, object_id: int, container_id: int) -> None:
        """
        Ensure the referenced object is set to be contained by the object
        referenced as container_id. If the container_id < 0, then the
        referenced object_id is not contained anywhere.
        """
        try:
            location_key = self.location_key(object_id)
            id_val = json.dumps(object_id)
            # Get current container for the referenced object.
            old_container_id = await self.redis.get(location_key)
            transaction = await self.redis.multi()
            # If required, remove object from old container.
            if old_container_id:
                await transaction.srem(
                    self.inventory_key(json.loads(old_container_id)),
                    [id_val],
                )
            if container_id < 0:
                # The object is not contained.
                await transaction.delete([location_key])
            else:
                # Add object to new container.
                await transaction.sadd(
                    self.inventory_key(container_id), [id_val]
                )
                # Point object to new container.
                await transaction.set(location_key, json.dumps(container_id))
            await transaction.exec()
        except (Error, ErrorReply) as ex:  # pragma: no cover
            logger.msg(
                "Error moving object object.",
                object_id=object_id,
                container_id=container_id,
                exc_info=ex,
                redis_error=True,
            )
            raise
        logger.msg(
            "Moved object.", object_id=object_id, container_id=container_id
        )

    async def get_contents(
        self, object_id: int
    ) -> Dict[int, _ObjectAttributes]:
        """
        Return a dictionary containing all the objects contained within the
        referenced object. Keys are object ids, values are the attributes of
        the related object.
        """
        contents = await self.redis.smembers_asset(
            self.inventory_key(object_id)
        )
        return await self.get_objects([int(i) for i in contents])

    async def get_user_context(self, user_id: int) -> Dict[str, Any]:
        """
        Return the user object and a representation of the object
        containing the user. This is used to obtain the minimal context
        needed for social interactions (saying, emoting etc)::

            {
                "user": { .. object representing the user .. },
                "room": { .. object representing the room .. },
            }

        The "room" entry is only present if the user is contained within
        another object.
        """
        objects = [user_id]
        room_id = await self.get_location(user_id)
        if room_id:
            objects.append(room_id)
        raw_objects = await self.get_objects(objects)
        result = {
            "user": raw_objects[user_id],
        }
        if room_id:
            result["room"] = raw_objects[room_id]
        return result

    async def get_users_in_room(
        self, object_id: int
    ) -> Sequence[_ObjectAttributes]:
        """
        Return a list of the objects representing users who are contained
        within the room identified by the object id passed into the method.
        """
        objects = await self.get_contents(object_id)
        return [
            obj
            for obj in objects.values()
            if obj.get(constants.IS_USER, False)
        ]

    async def get_script_context(self, user_id: int) -> Dict:
        """
        Returns a complete context in order that a script can be executed.

        This is the user context ("user" and, if available, "room"). When
        there is a room, its contents are added as "exits", "users" (the
        other users in the room) and "things" (everything else).
        """
        result = await self.get_user_context(user_id)
        if result.get("room"):
            room_id = int(result["room"]["id"])  # type: ignore
            objects = await self.get_contents(room_id)
            exits = []  # To hold all objects that represent an exit.
            users = []  # To hold all objects that represent other users.
            things = []  # To hold all other objects in the current room.
            for obj in objects.values():
                if obj.get(constants.IS_EXIT, False):
                    exits.append(obj)
                elif obj.get(constants.IS_USER, False):
                    # The user making the request isn't an "other" user.
                    if obj["id"] != user_id:
                        users.append(obj)
                else:
                    things.append(obj)
            result["exits"] = exits
            result["users"] = users
            result["things"] = things
        return result

    async def get_location(self, object_id: int) -> Union[int, None]:
        """
        Given an object_id, return the id of the object that contains it.
        If the object is not contained within another object, return None.
        """
        result = await self.redis.get(self.location_key(object_id))
        if result:
            return json.loads(result)
        return None