Source code for evennia.server.inputfuncs

"""
Functions for processing input commands.

All global functions in this module whose name does not start with "_"
is considered an inputfunc. Each function must have the following
callsign (where inputfunc name is always lower-case, no matter what the
OOB input name looked like):

    inputfunc(session, *args, **kwargs)

Where "options" is always one of the kwargs, containing eventual
protocol-options.
There is one special function, the "default" function, which is called
on a no-match. It has this callsign:

    default(session, cmdname, *args, **kwargs)

Evennia knows which modules to use for inputfuncs by
settings.INPUT_FUNC_MODULES.

"""

import importlib
from codecs import lookup as codecs_lookup

from django.conf import settings

from evennia.accounts.models import AccountDB
from evennia.commands.cmdhandler import cmdhandler
from evennia.utils.logger import log_err
from evennia.utils.utils import to_str

BrowserSessionStore = importlib.import_module(settings.SESSION_ENGINE).SessionStore


# always let "idle" work since we use this in the webclient
_IDLE_COMMAND = settings.IDLE_COMMAND
_IDLE_COMMAND = (_IDLE_COMMAND,) if _IDLE_COMMAND == "idle" else (_IDLE_COMMAND, "idle")
_GA = object.__getattribute__
_SA = object.__setattr__


_STRIP_INCOMING_MXP = settings.MXP_ENABLED and settings.MXP_OUTGOING_ONLY
_STRIP_MXP = None


def _NA(o):
    return "N/A"


def _maybe_strip_incoming_mxp(txt):
    global _STRIP_MXP
    if _STRIP_INCOMING_MXP:
        if not _STRIP_MXP:
            from evennia.utils.ansi import strip_mxp as _STRIP_MXP
        return _STRIP_MXP(txt)
    return txt


_ERROR_INPUT = "Inputfunc {name}({session}): Wrong/unrecognized input: {inp}"


# All global functions are inputfuncs available to process inputs


[docs]def text(session, *args, **kwargs): """ Main text input from the client. This will execute a command string on the server. Args: session (Session): The active Session to receive the input. text (str): First arg is used as text-command input. Other arguments are ignored. """ # from evennia.server.profiling.timetrace import timetrace # text = timetrace(text, "ServerSession.data_in") txt = args[0] if args else None # explicitly check for None since text can be an empty string, which is # also valid if txt is None: return # this is treated as a command input # handle the 'idle' command if txt.strip() in _IDLE_COMMAND: session.update_session_counters(idle=True) return txt = _maybe_strip_incoming_mxp(txt) if session.account: # nick replacement puppet = session.puppet if puppet: txt = puppet.nicks.nickreplace(txt, categories=("inputline"), include_account=True) else: txt = session.account.nicks.nickreplace( txt, categories=("inputline"), include_account=False ) kwargs.pop("options", None) cmdhandler(session, txt, callertype="session", session=session, **kwargs) session.update_session_counters()
[docs]def bot_data_in(session, *args, **kwargs): """ Text input from the IRC and RSS bots. This will trigger the execute_cmd method on the bots in-game counterpart. Args: session (Session): The active Session to receive the input. text (str): First arg is text input. Other arguments are ignored. """ txt = args[0] if args else None # Explicitly check for None since text can be an empty string, which is # also valid if txt is None: return # this is treated as a command input # handle the 'idle' command if txt.strip() in _IDLE_COMMAND: session.update_session_counters(idle=True) return txt = _maybe_strip_incoming_mxp(txt) kwargs.pop("options", None) # Trigger the execute_cmd method of the corresponding bot. session.account.execute_cmd(session=session, txt=txt, **kwargs) session.update_session_counters()
[docs]def echo(session, *args, **kwargs): """ Echo test function """ if _STRIP_INCOMING_MXP: txt = strip_mxp(txt) session.data_out(text="Echo returns: %s" % args)
[docs]def default(session, cmdname, *args, **kwargs): """ Default catch-function. This is like all other input functions except it will get `cmdname` as the first argument. """ err = ( "Session {sessid}: Input command not recognized:\n" " name: '{cmdname}'\n" " args, kwargs: {args}, {kwargs}".format( sessid=session.sessid, cmdname=cmdname, args=args, kwargs=kwargs ) ) if session.protocol_flags.get("INPUTDEBUG", False): session.msg(err) log_err(err)
_CLIENT_OPTIONS = ( "ANSI", "XTERM256", "MXP", "UTF-8", "SCREENREADER", "ENCODING", "MCCP", "SCREENHEIGHT", "SCREENWIDTH", "INPUTDEBUG", "RAW", "NOCOLOR", "NOGOAHEAD", "LOCALECHO", )
[docs]def client_options(session, *args, **kwargs): """ This allows the client an OOB way to inform us about its name and capabilities. This will be integrated into the session settings Keyword Args: get (bool): If this is true, return the settings as a dict (ignore all other kwargs). client (str): A client identifier, like "mushclient". version (str): A client version ansi (bool): Supports ansi colors xterm256 (bool): Supports xterm256 colors or not mxp (bool): Supports MXP or not utf-8 (bool): Supports UTF-8 or not screenreader (bool): Screen-reader mode on/off mccp (bool): MCCP compression on/off screenheight (int): Screen height in lines screenwidth (int): Screen width in characters inputdebug (bool): Debug input functions nocolor (bool): Strip color raw (bool): Turn off parsing localecho (bool): Turn on server-side echo (for clients not supporting it) """ old_flags = session.protocol_flags if not kwargs or kwargs.get("get", False): # return current settings options = dict((key, old_flags[key]) for key in old_flags if key.upper() in _CLIENT_OPTIONS) session.msg(client_options=options) return def validate_encoding(val): # helper: change encoding try: codecs_lookup(val) except LookupError: raise RuntimeError("The encoding '|w%s|n' is invalid. " % val) return val def validate_size(val): return {0: int(val)} def validate_bool(val): if isinstance(val, str): return True if val.lower() in ("true", "on", "1") else False return bool(val) flags = {} for key, value in kwargs.items(): key = key.lower() if key == "client": flags["CLIENTNAME"] = to_str(value) elif key == "version": if "CLIENTNAME" in flags: flags["CLIENTNAME"] = "%s %s" % (flags["CLIENTNAME"], to_str(value)) elif key == "ENCODING": flags["ENCODING"] = validate_encoding(value) elif key == "ansi": flags["ANSI"] = validate_bool(value) elif key == "xterm256": flags["XTERM256"] = validate_bool(value) elif key == "mxp": flags["MXP"] = validate_bool(value) elif key == "utf-8": flags["UTF-8"] = validate_bool(value) elif key == "screenreader": flags["SCREENREADER"] = validate_bool(value) elif key == "mccp": flags["MCCP"] = validate_bool(value) elif key == "screenheight": flags["SCREENHEIGHT"] = validate_size(value) elif key == "screenwidth": flags["SCREENWIDTH"] = validate_size(value) elif key == "inputdebug": flags["INPUTDEBUG"] = validate_bool(value) elif key == "nocolor": flags["NOCOLOR"] = validate_bool(value) elif key == "raw": flags["RAW"] = validate_bool(value) elif key == "nogoahead": flags["NOGOAHEAD"] = validate_bool(value) elif key == "localecho": flags["LOCALECHO"] = validate_bool(value) elif key in ( "Char 1", "Char.Skills 1", "Char.Items 1", "Room 1", "IRE.Rift 1", "IRE.Composer 1", ): # ignore mudlet's default send (aimed at IRE games) pass elif key not in ("options", "cmdid"): err = _ERROR_INPUT.format(name="client_settings", session=session, inp=key) session.msg(text=err) session.protocol_flags.update(flags) # we must update the protocol flags on the portal session copy as well session.sessionhandler.session_portal_partial_sync({session.sessid: {"protocol_flags": flags}})
[docs]def get_client_options(session, *args, **kwargs): """ Alias wrapper for getting options. """ client_options(session, get=True)
[docs]def get_inputfuncs(session, *args, **kwargs): """ Get the keys of all available inputfuncs. Note that we don't get it from this module alone since multiple modules could be added. So we get it from the sessionhandler. """ inputfuncsdict = dict( (key, func.__doc__) for key, func in session.sessionhandler.get_inputfuncs().items() ) session.msg(get_inputfuncs=inputfuncsdict)
[docs]def login(session, *args, **kwargs): """ Peform a login. This only works if session is currently not logged in. This will also automatically throttle too quick attempts. Keyword Args: name (str): Account name password (str): Plain-text password """ if not session.logged_in and "name" in kwargs and "password" in kwargs: from evennia.commands.default.unloggedin import create_normal_account account = create_normal_account(session, kwargs["name"], kwargs["password"]) if account: session.sessionhandler.login(session, account)
_gettable = { "name": lambda obj: obj.key, "key": lambda obj: obj.key, "location": lambda obj: obj.location.key if obj.location else "None", "servername": lambda obj: settings.SERVERNAME, }
[docs]def get_value(session, *args, **kwargs): """ Return the value of a given attribute or db_property on the session's current account or character. Keyword Args: name (str): Name of info value to return. Only names in the _gettable dictionary earlier in this module are accepted. """ name = kwargs.get("name", "") obj = session.puppet or session.account if name in _gettable: session.msg(get_value={"name": name, "value": _gettable[name](obj)})
def _testrepeat(**kwargs): """ This is a test function for using with the repeat inputfunc. Keyword Args: session (Session): Session to return to. """ import time kwargs["session"].msg(repeat="Repeat called: %s" % time.time()) _repeatable = {"test1": _testrepeat, "test2": _testrepeat} # example only # "
[docs]def repeat(session, *args, **kwargs): """ Call a named function repeatedly. Note that this is meant as an example of limiting the number of possible call functions. Keyword Args: callback (str): The function to call. Only functions from the _repeatable dictionary earlier in this module are available. interval (int): How often to call function (s). Defaults to once every 60 seconds with a minimum of 5 seconds. stop (bool): Stop a previously assigned ticker with the above settings. """ from evennia.scripts.tickerhandler import TICKER_HANDLER name = kwargs.get("callback", "") interval = max(5, int(kwargs.get("interval", 60))) if name in _repeatable: if kwargs.get("stop", False): TICKER_HANDLER.remove( interval, _repeatable[name], idstring=session.sessid, persistent=False ) else: TICKER_HANDLER.add( interval, _repeatable[name], idstring=session.sessid, persistent=False, session=session, ) else: session.msg("Allowed repeating functions are: %s" % (", ".join(_repeatable)))
[docs]def unrepeat(session, *args, **kwargs): "Wrapper for OOB use" kwargs["stop"] = True repeat(session, *args, **kwargs)
_monitorable = {"name": "db_key", "location": "db_location", "desc": "desc"} def _on_monitor_change(**kwargs): fieldname = kwargs["fieldname"] obj = kwargs["obj"] name = kwargs["name"] session = kwargs["session"] outputfunc_name = kwargs["outputfunc_name"] category = None # Attributes stored in the MonitorHandler with categories are # stored as fieldname "db_value[category_name]", but we need to # separate [category_name] because the actual attribute is stored on # the object as "db_value" with a separate "category" field. if hasattr(obj, "db_category") and obj.db_category != None: category = obj.db_category fieldname = fieldname.replace("[{}]".format(obj.db_category), "") # the session may be None if the char quits and someone # else then edits the object if session: callsign = { outputfunc_name: { "name": name, **({"category": category} if category is not None else {}), "value": _GA(obj, fieldname), } } session.msg(**callsign)
[docs]def monitor(session, *args, **kwargs): """ Adds monitoring to a given property or Attribute. Keyword Args: name (str): The name of the property or Attribute to report. No db_* prefix is needed. Only names in the _monitorable dict earlier in this module are accepted. stop (bool): Stop monitoring the above name. outputfunc_name (str, optional): Change the name of the outputfunc name. This is used e.g. by MSDP which has its own specific output format. """ from evennia.scripts.monitorhandler import MONITOR_HANDLER name = kwargs.get("name", None) outputfunc_name = kwargs.get("outputfunc_name", "monitor") category = kwargs.get("category", None) if name and name in _monitorable and session.puppet: field_name = _monitorable[name] obj = session.puppet if kwargs.get("stop", False): MONITOR_HANDLER.remove(obj, field_name, idstring=session.sessid) else: # the handler will add fieldname and obj to the kwargs automatically MONITOR_HANDLER.add( obj, field_name, _on_monitor_change, idstring=session.sessid, persistent=False, name=name, session=session, outputfunc_name=outputfunc_name, category=category, )
[docs]def unmonitor(session, *args, **kwargs): """ Wrapper for turning off monitoring """ kwargs["stop"] = True monitor(session, *args, **kwargs)
[docs]def monitored(session, *args, **kwargs): """ Report on what is being monitored """ from evennia.scripts.monitorhandler import MONITOR_HANDLER obj = session.puppet monitors = MONITOR_HANDLER.all(obj=obj) session.msg(monitored=(monitors, {}))
def _on_webclient_options_change(**kwargs): """ Called when the webclient options stored on the account changes. Inform the interested clients of this change. """ session = kwargs["session"] obj = kwargs["obj"] fieldname = kwargs["fieldname"] clientoptions = _GA(obj, fieldname) # the session may be None if the char quits and someone # else then edits the object if session: session.msg(webclient_options=clientoptions)
[docs]def webclient_options(session, *args, **kwargs): """ Handles retrieving and changing of options related to the webclient. If kwargs is empty (or contains just a "cmdid"), the saved options will be sent back to the session. A monitor handler will be created to inform the client of any future options that changes. If kwargs is not empty, the key/values stored in there will be persisted to the account object. Keyword Args: <option name>: an option to save """ account = session.account clientoptions = account.db._saved_webclient_options if not clientoptions: # No saved options for this account, copy and save the default. account.db._saved_webclient_options = settings.WEBCLIENT_OPTIONS.copy() # Get the _SaverDict created by the database. clientoptions = account.db._saved_webclient_options # The webclient adds a cmdid to every kwargs, but we don't need it. try: del kwargs["cmdid"] except KeyError: pass if not kwargs: # No kwargs: we are getting the stored options # Convert clientoptions to regular dict for sending. session.msg(webclient_options=dict(clientoptions)) # Create a monitor. If a monitor already exists then it will replace # the previous one since it would use the same idstring from evennia.scripts.monitorhandler import MONITOR_HANDLER MONITOR_HANDLER.add( account, "_saved_webclient_options", _on_webclient_options_change, idstring=session.sessid, persistent=False, session=session, ) else: # kwargs provided: persist them to the account object. clientoptions.update(kwargs)
# OOB protocol-specific aliases and wrappers # GMCP aliases hello = client_options supports_set = client_options # MSDP aliases (some of the the generic MSDP commands defined in the MSDP spec are prefixed # by msdp_ at the protocol level) # See https://tintin.sourceforge.io/protocols/msdp/
[docs]def msdp_list(session, *args, **kwargs): """ MSDP LIST command """ from evennia.scripts.monitorhandler import MONITOR_HANDLER args_lower = [arg.lower() for arg in args] if "commands" in args_lower: inputfuncs = [ key[5:] if key.startswith("msdp_") else key for key in session.sessionhandler.get_inputfuncs().keys() ] session.msg(commands=(inputfuncs, {})) if "lists" in args_lower: session.msg( lists=( [ "commands", "lists", "configurable_variables", "reportable_variables", "reported_variables", "sendable_variables", ], {}, ) ) if "configurable_variables" in args_lower: session.msg(configurable_variables=(_CLIENT_OPTIONS, {})) if "reportable_variables" in args_lower: session.msg(reportable_variables=(_monitorable, {})) if "reported_variables" in args_lower: obj = session.puppet monitor_infos = MONITOR_HANDLER.all(obj=obj) fieldnames = [tup[1] for tup in monitor_infos] session.msg(reported_variables=(fieldnames, {})) if "sendable_variables" in args_lower: session.msg(sendable_variables=(_monitorable, {}))
[docs]def msdp_report(session, *args, **kwargs): """ MSDP REPORT command """ kwargs["outputfunc_name":"report"] monitor(session, *args, **kwargs)
[docs]def msdp_unreport(session, *args, **kwargs): """ MSDP UNREPORT command """ unmonitor(session, *args, **kwargs)
[docs]def msdp_send(session, *args, **kwargs): """ MSDP SEND command """ out = {} for varname in args: if varname.lower() in _monitorable: out[varname] = _monitorable[varname.lower()] session.msg(send=((), out))
# client specific def _not_implemented(session, *args, **kwargs): """ Dummy used to swallow missing-inputfunc errors for common clients. """ pass # GMCP External.Discord.Hello is sent by Mudlet as a greeting # (see https://wiki.mudlet.org/w/Manual:Technical_Manual) external_discord_hello = _not_implemented # GMCP Client.Gui is sent by Mudlet for gui setup. client_gui = _not_implemented