Source code for evennia.typeclasses.tags

"""
Tags are entities that are attached to objects in the same way as
Attributes. But contrary to Attributes, which are unique to an
individual object, a single Tag can be attached to any number of
objects at the same time.

Tags are used for tagging, obviously, but the data structure is also
used for storing Aliases and Permissions. This module contains the
respective handlers.

"""

from collections import defaultdict

from django.conf import settings
from django.db import models

from evennia.locks.lockfuncs import perm as perm_lockfunc
from evennia.utils.utils import make_iter, to_str

_TYPECLASS_AGGRESSIVE_CACHE = settings.TYPECLASS_AGGRESSIVE_CACHE

# ------------------------------------------------------------
#
# Tags
#
# ------------------------------------------------------------


[docs]class Tag(models.Model): """ Tags are quick markers for objects in-game. An typeobject can have any number of tags, stored via its db_tags property. Tagging similar objects will make it easier to quickly locate the group later (such as when implementing zones). The main advantage of tagging as opposed to using tags is speed; a tag is very limited in what data it can hold, and the tag key+category is indexed for efficient lookup in the database. Tags are shared between objects - a new tag is only created if the key+category combination did not previously exist, making them unsuitable for storing object-related data (for this a regular Attribute should be used). The 'db_data' field is intended as a documentation field for the tag itself, such as to document what this tag+category stands for and display that in a web interface or similar. The main default use for Tags is to implement Aliases for objects. this uses the 'aliases' tag category, which is also checked by the default search functions of Evennia to allow quick searches by alias. """ db_key = models.CharField( "key", max_length=255, null=True, help_text="tag identifier", db_index=True ) db_category = models.CharField( "category", max_length=64, null=True, blank=True, help_text="tag category", db_index=True ) db_data = models.TextField( "data", null=True, blank=True, help_text="optional data field with extra information. This is not searched for.", ) # this is "objectdb" etc. Required behind the scenes db_model = models.CharField( "model", max_length=32, null=True, help_text="database model to Tag", db_index=True ) # this is None, alias or permission db_tagtype = models.CharField( "tagtype", max_length=16, null=True, blank=True, help_text="overall type of Tag", db_index=True, ) class Meta: "Define Django meta options" verbose_name = "Tag" unique_together = (("db_key", "db_category", "db_tagtype", "db_model"),) index_together = (("db_key", "db_category", "db_tagtype", "db_model"),) def __lt__(self, other): return str(self) < str(other) def __str__(self): return str( "<Tag: %s%s>" % (self.db_key, "(category:%s)" % self.db_category if self.db_category else "") )
# # Handlers making use of the Tags model #
[docs]class TagProperty: """ Tag Property. """ taghandler_name = "tags"
[docs] def __init__(self, category=None, data=None): """ Tag property descriptor. Allows for setting tags on an object as Django-like 'fields' on the class level. Since Tags are almost always used for querying, Tags are always created/assigned along with the object. Make sure the property/tagname does not collide with an existing method/property on the class. If it does, you must use tags.add() instead. Note that while you _can_ check e.g. `obj.tagname,this will give an AttributeError if the Tag is not set. Most often you want to use `obj.tags.get("tagname")` to check if a tag is set on an object. Example: :: class Character(DefaultCharacter): mytag = TagProperty() # category=None mytag2 = TagProperty(category="tagcategory") """ self._category = category self._data = data self._key = ""
def __set_name__(self, cls, name): """ Called when descriptor is first assigned to the class (not the instance!). It is called with the name of the field. """ self._key = name def __get__(self, instance, owner): """ Called when accessing the tag as a property on the instance. """ try: return getattr(instance, self.taghandler_name).get( key=self._key, category=self._category, return_list=False, raise_exception=True ) except AttributeError: self.__set__(instance, self._category) def __set__(self, instance, category): """ Assign a new category to the tag. It's not possible to set 'data' this way. """ self._category = category ( getattr(instance, self.taghandler_name).add( key=self._key, category=self._category, data=self._data ) ) def __delete__(self, instance): """ Called when running `del` on the property. Will disconnect the object from the Tag. Note that the tag will be readded on next fetch unless the TagProperty is also removed in code! """ getattr(instance, self.taghandler_name).remove(key=self._key, category=self._category)
[docs]class TagCategoryProperty: """ Tag Category Property. """ taghandler_name = "tags"
[docs] def __init__(self, *default_tags): """ Assign a property for a Tag Category, with any number of Tag keys. This is often more useful than the `TagProperty` since it's common to want to check which tags of a particular category the object is a member of. Args: *args (str or callable): Tag keys to assign to this property, using the category given by the name of the property. Note that, if these tags are always set on the object, if they are removed by some other means, they will be re-added when this property is accessed. Furthermore, changing this list after the object was created, will not remove any old tags (there is no way for the property to know if the new list is new or not). If a callable, it will be called without arguments to return the tag key. It is not possible to set tag `data` this way (use the Taghandler directly for that). Tag keys are not case sensitive. Raises: ValueError: If the input is not a valid tag key or tuple. Notes: It is not possible to set Tags with a `None` category using a `TagCategoryProperty` - use `obj.tags.add()` instead. Example: :: class RogueCharacter(DefaultCharacter): guild = TagCategoryProperty("thieves_guild", "merchant_guild") """ self._category = "" self._default_tags = self._parse_tag_input(*default_tags)
def _parse_tag_input(self, *args): """ Parse input to the property. Args: *args (str or callable): Tags, either as strings or `callable`, which should return the tag key when called without arguments. Keys are not case sensitive. Returns: list: A list of tag keys. """ tags = [] for tagkey in args: if callable(tagkey): tagkey = tagkey() tags.append((str(tagkey).lower())) return tags def __set_name__(self, cls, name): """ Called when descriptor is first assigned to the class (not the instance!). It is called with the name of the field. """ self._category = name def __get__(self, instance, owner): """ Called when accessing the tag as a property on the instance. Returns a list of tags under the given category. """ taghandler = getattr(instance, self.taghandler_name) default_tags = self._default_tags tags = taghandler.get(category=self._category, return_list=True) missing_default_tags = set(default_tags) - set(tags) if missing_default_tags: getattr(instance, self.taghandler_name).batch_add( *[(tag, self._category) for tag in missing_default_tags] ) tags += missing_default_tags # to avoid a second db call return tags def __set__(self, instance, *args): """ Assign a new set of tags to the category. Note that we can't know if previous tags were assigned from this property or from TagHandler, so we don't remove old tags. To refresh to only have the tags in this constructor, first use `del` on this property and re-access the property with the changed default list. """ getattr(instance, self.taghandler_name).batch_add(*[(tag, self._category) for tag in args]) def __delete__(self, instance): """ Called when running `del` on the property. Will remove all tags of this category from the object. Note that next time this desriptor is accessed, the default ones will be re-added! Note: This will remove _all_ tags of this category from the object. This is necessary in order to be able to be able to combine this with `__set__` to get a tag list where property and handler are in sync. """ getattr(instance, self.taghandler_name).remove(category=self._category)
[docs]class TagHandler(object): """ Generic tag-handler. Accessed via TypedObject.tags. """ _m2m_fieldname = "db_tags" _tagtype = None
[docs] def __init__(self, obj): """ Tags are stored internally in the TypedObject.db_tags m2m field with an tag.db_model based on the obj the taghandler is stored on and with a tagtype given by self.handlertype Args: obj (object): The object on which the handler is set. """ self.obj = obj self._objid = obj.id self._model = obj.__dbclass__.__name__.lower() self._cache = {} # store category names fully cached self._catcache = {} # full cache was run on all tags self._cache_complete = False
def _query_all(self): """ Get all tags for this object. """ query = { "%s__id" % self._model: self._objid, "tag__db_model": self._model, "tag__db_tagtype": self._tagtype, } return [ conn.tag for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query) ] def _fullcache(self): """ Cache all tags of this object. """ if not _TYPECLASS_AGGRESSIVE_CACHE: return tags = self._query_all() self._cache = dict( ( "%s-%s" % ( to_str(tag.db_key).lower(), tag.db_category.lower() if tag.db_category else None, ), tag, ) for tag in tags ) self._cache_complete = True def _getcache(self, key=None, category=None): """ Retrieve from cache or database (always caches) Args: key (str, optional): Tag key to query for category (str, optional): Tag category Returns: args (list): Returns a list of zero or more matches found from cache or database. Notes: When given a category only, a search for all objects of that category is done and a the category *name* is is stored. This tells the system on subsequent calls that the list of cached tags of this category is up-to-date and that the cache can be queried for category matches without missing any. The TYPECLASS_AGGRESSIVE_CACHE=False setting will turn off caching, causing each tag access to trigger a database lookup. """ key = str(key).strip().lower() if key else None category = category.strip().lower() if category else None if key: cachekey = "%s-%s" % (key, category) tag = _TYPECLASS_AGGRESSIVE_CACHE and self._cache.get(cachekey, None) if tag and (not hasattr(tag, "pk") and tag.pk is None): # clear out Tags deleted from elsewhere. We must search this anew. tag = None del self._cache[cachekey] if tag: return [tag] # return cached entity else: query = { "%s__id" % self._model: self._objid, "tag__db_model": self._model, "tag__db_tagtype": self._tagtype, "tag__db_key__iexact": key.lower(), "tag__db_category__iexact": category.lower() if category else None, } conn = getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query) if conn: tag = conn[0].tag if _TYPECLASS_AGGRESSIVE_CACHE: self._cache[cachekey] = tag return [tag] else: # only category given (even if it's None) - we can't # assume the cache to be complete unless we have queried # for this category before catkey = "-%s" % category if _TYPECLASS_AGGRESSIVE_CACHE and catkey in self._catcache: return [tag for key, tag in self._cache.items() if key.endswith(catkey)] else: # we have to query to make this category up-date in the cache query = { "%s__id" % self._model: self._objid, "tag__db_model": self._model, "tag__db_tagtype": self._tagtype, "tag__db_category__iexact": category.lower() if category else None, } tags = [ conn.tag for conn in getattr(self.obj, self._m2m_fieldname).through.objects.filter( **query ) ] if _TYPECLASS_AGGRESSIVE_CACHE: for tag in tags: cachekey = "%s-%s" % (tag.db_key, category) self._cache[cachekey] = tag # mark category cache as up-to-date self._catcache[catkey] = True return tags return [] def _setcache(self, key, category, tag_obj): """ Update cache. Args: key (str): A cleaned key string category (str or None): A cleaned category name tag_obj (tag): The newly saved tag """ if not _TYPECLASS_AGGRESSIVE_CACHE: return if not key: # don't allow an empty key in cache return key, category = ( str(key).strip().lower(), category.strip().lower() if category else category, ) cachekey = "%s-%s" % (key, category) catkey = "-%s" % category self._cache[cachekey] = tag_obj # mark that the category cache is no longer up-to-date self._catcache.pop(catkey, None) self._cache_complete = False def _delcache(self, key, category): """ Remove tag from cache Args: key (str): A cleaned key string category (str or None): A cleaned category name """ key, category = ( str(key).strip().lower(), category.strip().lower() if category else category, ) catkey = "-%s" % category if key: cachekey = "%s-%s" % (key, category) self._cache.pop(cachekey, None) else: [self._cache.pop(key, None) for key in self._cache if key.endswith(catkey)] # mark that the category cache is no longer up-to-date self._catcache.pop(catkey, None) self._cache_complete = False
[docs] def reset_cache(self): """ Reset the cache from the outside. """ self._cache_complete = False self._cache = {} self._catcache = {}
[docs] def add(self, key=None, category=None, data=None): """ Add a new tag to the handler. Args: key (str or list): The name of the tag to add. If a list, add several Tags. category (str, optional): Category of Tag. `None` is the default category. data (str, optional): Info text about the tag(s) added. This can not be used to store object-unique info but only eventual info about the tag itself. Notes: If the tag + category combination matches an already existing Tag object, this will be re-used and no new Tag will be created. """ if not key: return if not self._cache_complete: self._fullcache() for tagstr in make_iter(key): if not tagstr: continue tagstr = str(tagstr).strip().lower() category = str(category).strip().lower() if category else category data = str(data) if data is not None else None # this will only create tag if no matches existed beforehand (it # will overload data on an existing tag since that is not # considered part of making the tag unique) tagobj = self.obj.__class__.objects.create_tag( key=tagstr, category=category, data=data, tagtype=self._tagtype ) getattr(self.obj, self._m2m_fieldname).add(tagobj) self._setcache(tagstr, category, tagobj)
[docs] def has(self, key=None, category=None, return_list=False): """ Checks if the given Tag (or list of Tags) exists on the object. Args: key (str or iterable): The Tag key or tags to check for. If `None`, search by category. category (str, optional): Limit the check to Tags with this category (note, that `None` is the default category). Returns: has_tag (bool or list): If the Tag exists on this object or not. If `tag` was given as an iterable then the return is a list of booleans. Raises: ValueError: If neither `tag` nor `category` is given. """ ret = [] category = category.strip().lower() if category is not None else None if key: for tag_str in make_iter(key): tag_str = str(tag_str).strip().lower() ret.append(bool(self._getcache(tag_str, category))) elif category: ret.extend(bool(tag) for tag in self._getcache(category=category)) else: raise ValueError("Either tag or category must be provided.") if return_list: return ret return ret[0] if len(ret) == 1 else ret
[docs] def get( self, key=None, default=None, category=None, return_tagobj=False, return_list=False, raise_exception=False, ): """ Get the tag for the given key, category or combination of the two. Args: key (str or list, optional): The tag or tags to retrieve. default (any, optional): The value to return in case of no match. category (str, optional): The Tag category to limit the request to. Note that `None` is the valid, default category. If no `key` is given, all tags of this category will be returned. return_tagobj (bool, optional): Return the Tag object itself instead of a string representation of the Tag. return_list (bool, optional): Always return a list, regardless of number of matches. raise_exception (bool, optional): Raise AttributeError if no matches are found. Returns: tags (list): The matches, either string representations of the tags or the Tag objects themselves depending on `return_tagobj`. If 'default' is set, this will be a list with the default value as its only element. Raises: AttributeError: If finding no matches and `raise_exception` is True. """ ret = [] for keystr in make_iter(key): # note - the _getcache call removes case sensitivity for us ret.extend( [ tag if return_tagobj else to_str(tag.db_key) for tag in self._getcache(keystr, category) ] ) if not ret: if raise_exception: raise AttributeError(f"No tags found matching input {key}, {category}.") elif return_list: return [default] if default is not None else [] else: return default return ret if return_list else (ret[0] if len(ret) == 1 else ret)
[docs] def remove(self, key=None, category=None): """ Remove a tag from the handler based ond key and/or category. Args: key (str or list, optional): The tag or tags to retrieve. category (str, optional): The Tag category to limit the request to. Note that `None` is the valid, default category Notes: If neither key nor category is specified, this acts as .clear(). """ if not key: # only category self.clear(category=category) return for key in make_iter(key): if not (key or key.strip()): # we don't allow empty tags continue tagstr = str(key).strip().lower() category = category.strip().lower() if category else category # This does not delete the tag object itself. Maybe it should do # that when no objects reference the tag anymore (but how to check)? # For now, tags are never deleted, only their connection to objects. tagobj = getattr(self.obj, self._m2m_fieldname).filter( db_key=tagstr, db_category=category, db_model=self._model, db_tagtype=self._tagtype ) if tagobj: getattr(self.obj, self._m2m_fieldname).remove(tagobj[0]) self._delcache(key, category)
[docs] def clear(self, category=None): """ Remove all tags from the handler. Args: category (str, optional): The Tag category to limit the request to. Note that `None` is the valid, default category. """ if not self._cache_complete: self._fullcache() query = { "%s__id" % self._model: self._objid, "tag__db_model": self._model, "tag__db_tagtype": self._tagtype, } if category: query["tag__db_category"] = category.strip().lower() getattr(self.obj, self._m2m_fieldname).through.objects.filter(**query).delete() self._cache = {} self._catcache = {} self._cache_complete = False
[docs] def all(self, return_key_and_category=False, return_objs=False): """ Get all tags in this handler, regardless of category. Args: return_key_and_category (bool, optional): Return a list of tuples `[(key, category), ...]`. return_objs (bool, optional): Return tag objects. Returns: tags (list): A list of tag keys `[tagkey, tagkey, ...]` or a list of tuples `[(key, category), ...]` if `return_key_and_category` is set. """ if _TYPECLASS_AGGRESSIVE_CACHE: if not self._cache_complete: self._fullcache() tags = sorted(self._cache.values()) else: tags = sorted(self._query_all()) if return_key_and_category: # return tuple (key, category) return [(to_str(tag.db_key), tag.db_category) for tag in tags] elif return_objs: return tags else: return [to_str(tag.db_key) for tag in tags]
[docs] def batch_add(self, *args): """ Batch-add tags from a list of tuples. Args: *args (tuple or str): Each argument should be a `tagstr` keys or tuple `(keystr, category)` or `(keystr, category, data)`. It's possible to mix input types. Notes: This will generate a mimimal number of self.add calls, based on the number of categories involved (including `None`) (data is not unique and may be overwritten by the content of a latter tuple with the same category). """ keys = defaultdict(list) data = {} for tup in args: tup = make_iter(tup) nlen = len(tup) if nlen == 1: # just a key, no category keys[None].append(tup[0]) elif nlen == 2: keys[tup[1]].append(tup[0]) else: keys[tup[1]].append(tup[0]) data[tup[1]] = tup[2] # overwrite previous for category, key in keys.items(): self.add(key=key, category=category, data=data.get(category, None))
[docs] def batch_remove(self, *args): """ Batch-remove tags from a list of tuples. Args: *args (tuple or str): Each argument should be a `tagstr` keys or tuple `(keystr, category)` or `(keystr, category, data)` (the `data` field is ignored, only `keystr`/`category` matters). It's possible to mix input types. """ keys = defaultdict(list) for tup in args: tup = make_iter(tup) nlen = len(tup) if nlen == 1: # just a key, no category keys[None].append(tup[0]) elif nlen > 1: keys[tup[1]].append(tup[0]) for category, key in keys.items(): self.remove(key=key, category=category, data=data.get(category, None))
def __str__(self): return ",".join(self.all())
[docs]class AliasProperty(TagProperty): """ Allows for setting aliases like Django fields: :: class Character(DefaultCharacter): # note that every character will get the alias bob. Make sure # the alias property does not collide with an existing method # or property on the class. bob = AliasProperty() """ taghandler_name = "aliases"
[docs]class AliasHandler(TagHandler): """ A handler for the Alias Tag type. """ _tagtype = "alias"
[docs]class PermissionProperty(TagProperty): """ Allows for setting permissions like Django fields: :: class Character(DefaultCharacter): # note that every character will get this permission! Make # sure it doesn't collide with an existing method or property. myperm = PermissionProperty() """ taghandler_name = "permissions"
[docs]class PermissionHandler(TagHandler): """ A handler for the Permission Tag type. """ _tagtype = "permission"
[docs] def check(self, *permissions, require_all=False): """ Straight-up check the provided permission against this handler. The check will pass if - any/all given permission exists on the handler (depending on if `require_all` is set). - If handler sits on puppeted object and this is a hierarachical perm, the puppeting Account's permission will also be included in the check, prioritizing the Account's perm (this avoids escalation exploits by puppeting a too-high prio character) - a permission is also considered to exist on the handler, if it is *lower* than a permission on the handler and this is a 'hierarchical' permission given in `settings.PERMISSION_HIERARCHY`. Example: If the 'Developer' hierarchical perm perm is set on the handler, and we check for the 'Builder' perm, the check will pass. Args: *permissions (str): Any number of permissions to check. By default, the permission is passed if any of these (or higher, if a hierarchical permission defined in settings.PERMISSION_HIERARCHY) exists in the handler. Permissions are not case-sensitive. require_all (bool): If set, *all* provided permissions much pass the check for the entire check to pass. By default only one needs to pass. Returns: bool: If the provided permission(s) pass the check on this handler. Example: :: can_enter = obj.permissions.check("Blacksmith", "Builder") Notes: This works the same way as the `perms` lockfunc and could be replicated with a lock check against the lockstring "locktype: perm(perm1) OR perm(perm2) OR ..." (using AND for the `require_all` condition). """ if require_all: return all(perm_lockfunc(self.obj, None, perm) for perm in permissions) else: return any(perm_lockfunc(self.obj, None, perm) for perm in permissions)