Source code for dragonfly.actions.action_base

#
# This file is part of Dragonfly.
# (c) Copyright 2007, 2008 by Christo Butcher
# Licensed under the LGPL.
#
#   Dragonfly is free software: you can redistribute it and/or modify it
#   under the terms of the GNU Lesser General Public License as published
#   by the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   Dragonfly is distributed in the hope that it will be useful, but
#   WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#   Lesser General Public License for more details.
#
#   You should have received a copy of the GNU Lesser General Public
#   License along with Dragonfly.  If not, see
#   <http://www.gnu.org/licenses/>.
#

"""
ActionBase base class
============================================================================

"""

from functools import reduce
from locale import getpreferredencoding
import logging

from six import PY2, integer_types, text_type


#---------------------------------------------------------------------------


class ActionError(Exception):
    """ Exception raised to signal that an action failed to execute. """
#---------------------------------------------------------------------------
class ActionBase(object):
    """
        Base class for Dragonfly's action classes.

        Provides the operator overloads used to combine actions
        (``+``, ``|`` and ``*``), data binding, and the :meth:`execute`
        wrapper which logs failures and converts them into a ``False``
        return value.  Subclasses implement :meth:`_execute`.

    """

    _log_init = logging.getLogger("action.init")
    _log_exec = logging.getLogger("action.exec")
    _log = logging.getLogger("action")

    #-----------------------------------------------------------------------
    # Initialization and aggregation methods.

    def __init__(self):
        # Argument summary shown between the parentheses of the
        # representation; subclasses overwrite it.
        self._str = ""

    def __repr__(self):
        text = self.__unicode__()
        if not PY2:
            return text
        # Python 2 requires a byte string from __repr__().
        return text.encode(getpreferredencoding())

    def __unicode__(self):
        class_name = self.__class__.__name__
        return u"%s(%s)" % (class_name, self._str)

    def __add__(self, other):
        """ ``self + other``: a new :class:`ActionSeries` of both. """
        return ActionSeries(self, other)

    def __iadd__(self, other):
        """ ``self += other``: a new :class:`ActionSeries` of both. """
        return ActionSeries(self, other)

    def __or__(self, other):
        """ ``self | other``: a new :class:`UnsafeActionSeries` of both. """
        return UnsafeActionSeries(self, other)

    def __ior__(self, other):
        """ ``self |= other``: a new :class:`UnsafeActionSeries` of both. """
        return UnsafeActionSeries(self, other)

    def __mul__(self, factor):
        """ ``self * factor``: a new :class:`ActionRepetition`. """
        return ActionRepetition(self, factor)

    def __imul__(self, factor):
        """ ``self *= factor``: a new :class:`ActionRepetition`. """
        return ActionRepetition(self, factor)

    #-----------------------------------------------------------------------
    # Execution methods.

    def bind(self, data=None):
        """ Return a :class:`BoundAction` pairing this action with *data*. """
        return BoundAction(self, data)

    def copy_bind(self, data=None):
        """ Return a :class:`BoundAction` pairing this action with *data*. """
        return BoundAction(self, data)

    def execute(self, data=None):
        """
            Run this action and report whether it succeeded.

            Calls :meth:`_execute` with *data*.  Returns ``False`` if
            that call returns ``False`` or raises any exception (the
            failure is logged on the ``action.exec`` logger); returns
            ``True`` otherwise.

        """
        self._log_exec.debug("Executing action: %s (%s)", self, data)
        try:
            outcome = self._execute(data)
            # Only an explicit False counts as failure; None (the usual
            # return value of _execute() implementations) is success.
            if outcome is False:
                raise ActionError(str(self))
        except ActionError as error:
            self._log_exec.error("Execution failed: %s", error)
            return False
        except Exception as error:
            self._log_exec.exception("Execution of %s failed due to "
                                     "exception: %s", self, error)
            return False
        return True

    def _execute(self, data=None):
        """ Virtual method. """
#---------------------------------------------------------------------------
class DynStrActionBase(ActionBase):
    """
        Base class for actions configured by a specification string.

        A spec that contains no ``%`` character, or that is explicitly
        marked *static*, is parsed once during initialization.  Any
        other spec is treated as dynamic: it is ``%``-formatted with the
        execution data and parsed each time the action is executed.

        Subclasses implement :meth:`_parse_spec` and
        :meth:`_execute_events`.

    """

    # pylint: disable=E1111,R1710
    # Suppress warnings about return statements in some methods.

    #-----------------------------------------------------------------------
    # Initialization methods.

    def __init__(self, spec=None, static=False):
        ActionBase.__init__(self)
        self.initialize(spec, static)

    def initialize(self, spec=None, static=False):
        """
            (Re)configure this action from *spec*.

            With ``spec=None`` the action is left dynamic with no events
            and its string representation is not touched.

        """
        # Start out dynamic and unparsed.
        self._spec = spec
        self._static = False
        self._events = None
        if spec is None:
            return

        # Without a "%" there is nothing to substitute at execution
        # time, so the spec can be parsed up front.
        if static or spec.find("%") == -1:
            self._static = True
            self._events = self._parse_spec(spec)

        description = "%r" % spec
        if not self._static:
            description += ", dynamic"
        self._str = description

    def _parse_spec(self, spec):
        """ Virtual method. """

    #-----------------------------------------------------------------------
    # Execution methods.

    def _execute(self, data=None):
        """
            Execute the events described by this action's spec.

            Returns ``False`` if the dynamic spec refers to a key that
            is missing from *data*; returns ``None`` otherwise.

        """
        if self._static:
            # The events were already parsed by initialize().
            self._execute_events(self._events)
            return

        # Dynamic spec: build it from the data now, then parse and
        # execute the resulting events.
        if data:
            try:
                spec = self._spec % data
            except KeyError:
                self._log_exec.error("%s: Spec %r doesn't match data "
                                     "%r.", self, self._spec, data)
                return False
        else:
            spec = self._spec

        self._log_exec.debug("%s: Parsing dynamic spec: %r", self, spec)
        self._execute_events(self._parse_spec(spec))

    def _execute_events(self, events):
        """ Virtual method. """
#---------------------------------------------------------------------------
class BoundAction(ActionBase):
    """
        Action wrapper which supplies pre-bound data to another action.

        When executed, the bound data is merged over the data passed to
        :meth:`execute` (bound values win on key collisions) and the
        wrapped action is executed with the merged result.

    """

    #-----------------------------------------------------------------------
    # Initialization methods.

    def __init__(self, action, data):
        ActionBase.__init__(self)
        self._action = action
        self._data = data
        self._str = "%r, %r" % (action, data)

    #-----------------------------------------------------------------------
    # Execution methods.

    def execute(self, data=None):
        """
            Execute the wrapped action with the merged data.

            Returns whatever the wrapped action's ``execute()`` returns,
            so that callers testing for ``False`` can see a failure.

        """
        if not data:
            data = {}
        if self._data:
            # Merge into a copy so the caller's dictionary is not
            # modified.
            data = dict(data)
            data.update(self._data)
        # Propagate the result; discarding it would hide failures from
        # callers that check ``execute(...) is False``.
        return self._action.execute(data)
#---------------------------------------------------------------------------
class ActionSeries(ActionBase):
    """
        Series of actions executed one after another.

        Execution stops at the first failing action while
        :attr:`stop_on_failures` is true.

    """

    #-----------------------------------------------------------------------
    # Initialization methods.

    #: Whether to stop executing if an action in the series fails.
    stop_on_failures = True

    def __init__(self, *actions):
        ActionBase.__init__(self)
        self._actions = list(actions)
        self._set_str()

    def _set_str(self):
        # Use a flat list of the series actions for a more readable
        # string representation.
        self._str = u", ".join(text_type(a)
                               for a in self.flat_action_list())

    def flat_action_list(self):
        """
            Return the series' actions as a flat list.

            Nested series with ``stop_on_failures`` set are expanded
            recursively; all other actions are included as they are.

        """
        result = []
        for action in self._actions:
            if isinstance(action, ActionSeries) and action.stop_on_failures:
                result.extend(action.flat_action_list())
            else:
                result.append(action)
        return result

    def append(self, other):
        """ Add the action *other* to the end of this series. """
        assert isinstance(other, ActionBase)
        self._actions.append(other)
        self._set_str()

    def __iadd__(self, other):
        # In-place addition extends this series instead of creating a
        # new one.
        self.append(other)
        return self

    def __ior__(self, other):
        self.append(other)
        return self

    #-----------------------------------------------------------------------
    # Execution methods.

    def _execute(self, data=None):
        # Use a flat list of the series actions for more sensible sequence
        # termination and logging if an error occurs during execution.
        for action in self.flat_action_list():
            if action.execute(data) is False and self.stop_on_failures:
                return False
        return True

    def execute(self, data=None):
        # Override execute() to discard the return value.
        ActionBase.execute(self, data)

    def __str__(self):
        # join() always produces a string.  Folding the actions with
        # reduce() and no initial value raised TypeError for an empty
        # series and returned the action object itself (not a string)
        # for a series holding a single action.
        return "+".join(str(action) for action in self._actions)
class UnsafeActionSeries(ActionSeries):
    """
        Series of actions which keeps executing after a failure.

        Every action in the series is executed in order, whatever the
        result of the ones before it.

    """

    stop_on_failures = False

    def execute(self, data=None):
        """ Execute every action in the series, ignoring their results. """
        for action in self._actions:
            action.execute(data)

    def __str__(self):
        # join() always produces a string.  Folding the actions with
        # reduce() and no initial value raised TypeError for an empty
        # series and returned the action object itself (not a string)
        # for a series holding a single action.
        return "|".join(str(action) for action in self._actions)
#---------------------------------------------------------------------------
class ActionRepetition(ActionBase):
    """
        Action which executes another action a number of times.

        The repeat *factor* is either an integer or a :class:`Repeat`
        object, whose value is determined from the execution data.

    """

    #-----------------------------------------------------------------------
    # Initialization methods.

    def __init__(self, action, factor):
        """
            Raises :class:`TypeError` if *factor* is neither an integer
            nor a :class:`Repeat` object.
        """
        ActionBase.__init__(self)
        self._action = action
        self._factor = factor
        self._str = "%s, %s" % (action, factor)

        # Accept exactly the integer types that execute() handles, so
        # that a Python 2 long is not rejected here.
        if not isinstance(factor, integer_types + (Repeat,)):
            # Wrap the argument in a tuple so that a tuple-valued factor
            # is reported correctly instead of breaking the formatting.
            raise TypeError("Invalid multiplier type: %r"
                            " (must be an int or a Repeat object)"
                            % (factor,))

    #-----------------------------------------------------------------------
    # Execution methods.

    def execute(self, data=None):
        """
            Execute the wrapped action the configured number of times.

            Raises :class:`ActionError` if the repeat factor is invalid
            or as soon as one execution of the wrapped action returns
            ``False``.

        """
        if isinstance(self._factor, integer_types):
            repeat = self._factor
        elif isinstance(self._factor, Repeat):
            repeat = self._factor.factor(data)
        else:
            raise ActionError("Invalid repeat factor: %r"
                              % (self._factor,))

        for _ in range(repeat):
            if self._action.execute(data) is False:
                raise ActionError(str(self))

    def __str__(self):
        return '{{{}}}{}'.format(self._action, self._factor)
#---------------------------------------------------------------------------
class Repeat(object):
    # pylint: disable=line-too-long
    """
        Action repeat factor.

        Integer Repeat factors ignore any supplied data::

            >>> integer = Repeat(count=3)
            >>> integer.factor()
            3
            >>> integer.factor({"foo": 4})  # Non-related data is ignored.
            3

        Integer Repeat factors can be specified with the ``*``
        operator::

            >>> from dragonfly import Function
            >>> def func():
            ...     print("executing 'func'")
            ...
            >>> action = Function(func) * 3
            >>> action.execute()
            executing 'func'
            executing 'func'
            executing 'func'

        Named Repeat factors retrieve their factor-value from the
        supplied data::

            >>> named = Repeat("foo")
            >>> named.factor()
            Traceback (most recent call last):
              ...
            ActionError: No extra repeat factor found for name 'foo' ('NoneType' object is unsubscriptable)
            >>> named.factor({"foo": 4})
            4

        Repeat factors with both an integer count and a named extra
        value combine (add) these together to determine their
        factor-value::

            >>> combined = Repeat(extra="foo", count=3)
            >>> combined.factor()
            Traceback (most recent call last):
              ...
            ActionError: No extra repeat factor found for name 'foo' ('NoneType' object is unsubscriptable)
            >>> combined.factor({"foo": 4})  # Combined factors 3 + 4 = 7.
            7

    """

    def __init__(self, extra=None, count=None):
        if isinstance(extra, integer_types):
            # Backward compatibility for swapped arguments (#103): an
            # integer in first position is the count.
            self._count, self._extra = extra, count
            return
        self._extra = extra
        self._count = 0 if count is None else count

    def factor(self, data=None):
        """
            Return the repeat count for the given *data*.

            The result is the fixed count plus, when an extra name is
            set, the integer value stored under that name in *data*.
            Raises :class:`ActionError` if that value is missing or is
            not a valid integer.

        """
        total = self._count
        name = self._extra
        if not name:
            return total

        try:
            additional = int(data[name])
        except ValueError:
            raise ActionError("Repeat factor %r has invalid value %r"
                              % (name, data[name]))
        except KeyError:
            raise ActionError("No extra repeat factor found for name %r"
                              % (name,))
        except Exception as error:
            # E.g. data is None and therefore cannot be subscripted.
            raise ActionError("No extra repeat factor found for name %r"
                              " (%s)" % (name, error))
        return total + additional

    def __str__(self):
        return '{}'.format(self._extra or self._count)