From ec26bbffc6985792d4502ac9f4d4cb0960b16832 Mon Sep 17 00:00:00 2001 From: Greg Hill Date: Wed, 13 Jan 2016 13:47:03 -0600 Subject: [PATCH] Refactor Atom/BaseTask/Task/Retry class hierarchy * Moved common argument mapping logic to Atom * Removed BaseTask * Removed duplicated logic from subclasses Change-Id: I275745adb80cecb0c35c1230eac76436bf3b0157 --- taskflow/atom.py | 86 +++++++++++++- taskflow/engines/action_engine/compiler.py | 2 +- taskflow/engines/worker_based/worker.py | 2 +- taskflow/listeners/logging.py | 2 +- taskflow/retry.py | 11 +- taskflow/storage.py | 2 +- taskflow/task.py | 126 +++------------------ 7 files changed, 107 insertions(+), 124 deletions(-) diff --git a/taskflow/atom.py b/taskflow/atom.py index 73350309..60a6c427 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -228,7 +228,15 @@ class Atom(object): submission order). """ - def __init__(self, name=None, provides=None, inject=None): + default_provides = None + + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, inject=None, + ignore_list=None): + + if provides is None: + provides = self.default_provides + self.name = name self.version = (1, 0) self.inject = inject @@ -238,8 +246,13 @@ class Atom(object): self.provides = sets.OrderedSet(self.save_as) self.rebind = collections.OrderedDict() + self._build_arg_mapping(self.execute, requires=requires, + rebind=rebind, auto_extract=auto_extract, + ignore_list=ignore_list) + def _build_arg_mapping(self, executor, requires=None, rebind=None, auto_extract=True, ignore_list=None): + required, optional = _build_arg_mapping(self.name, requires, rebind, executor, auto_extract, ignore_list=ignore_list) @@ -258,13 +271,76 @@ class Atom(object): self.requires -= inject_keys self.optional -= inject_keys - @abc.abstractmethod - def execute(self, *args, **kwargs): - """Executes this atom.""" + def pre_execute(self): + """Code to be run prior to executing the atom. + + A common pattern for initializing the state of the system prior to + running atoms is to define some code in a base class that all your + atoms inherit from. In that class, you can define a ``pre_execute`` + method and it will always be invoked just prior to your atoms running. + """ @abc.abstractmethod + def execute(self, *args, **kwargs): + """Activate a given atom which will perform some operation and return. + + This method can be used to perform an action on a given set of input + requirements (passed in via ``*args`` and ``**kwargs``) to accomplish + some type of operation. This operation may provide some named + outputs/results as a result of it executing for later reverting (or for + other atoms to depend on). + + NOTE(harlowja): the result (if any) that is returned should be + persistable so that it can be passed back into this atom if + reverting is triggered (especially in the case where reverting + happens in a different python process or on a remote machine) and so + that the result can be transmitted to other atoms (which may be local + or remote). + + :param args: positional arguments that atom requires to execute. + :param kwargs: any keyword arguments that atom requires to execute. + """ + + def post_execute(self): + """Code to be run after executing the atom. + + A common pattern for cleaning up global state of the system after the + execution of atoms is to define some code in a base class that all your + atoms inherit from. In that class, you can define a ``post_execute`` + method and it will always be invoked just after your atoms execute, + regardless of whether they succeeded or not. + + This pattern is useful if you have global shared database sessions + that need to be cleaned up, for example. + """ + + def pre_revert(self): + """Code to be run prior to reverting the atom. + + This works the same as :meth:`.pre_execute`, but for the revert phase. + """ + def revert(self, *args, **kwargs): - """Reverts this atom (undoing any :meth:`execute` side-effects).""" + """Revert this atom. + + This method should undo any side-effects caused by previous execution + of the atom using the result of the :py:meth:`execute` method and + information on the failure which triggered reversion of the flow the + atom is contained in (if applicable). + + :param args: positional arguments that the atom required to execute. + :param kwargs: any keyword arguments that the atom required to + execute; the special key ``'result'`` will contain + the :py:meth:`execute` result (if any) and + the ``**kwargs`` key ``'flow_failures'`` will contain + any failure information. + """ + + def post_revert(self): + """Code to be run after reverting the atom. + + This works the same as :meth:`.post_execute`, but for the revert phase. + """ def __str__(self): return "%s==%s" % (self.name, misc.get_version_string(self)) diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 9933fd0c..ab0e443a 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -326,7 +326,7 @@ class PatternCompiler(object): self._compilation = None self._matchers = [ (flow.Flow, FlowCompiler(self._compile)), - (task.BaseTask, TaskCompiler()), + (task.Task, TaskCompiler()), ] self._level = 0 diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index a462c7f3..b4b820d2 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -113,7 +113,7 @@ System details: @staticmethod def _derive_endpoints(tasks): """Derive endpoints from list of strings, classes or packages.""" - derived_tasks = misc.find_subclasses(tasks, t_task.BaseTask) + derived_tasks = misc.find_subclasses(tasks, t_task.Task) return [endpoint.Endpoint(task) for task in derived_tasks] def _generate_banner(self): diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 219f6ac8..ac1ecffc 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -63,7 +63,7 @@ def _make_matcher(task_name): def _task_matcher(node): item = node.item - return isinstance(item, task.BaseTask) and item.name == task_name + return isinstance(item, task.Task) and item.name == task_name return _task_matcher diff --git a/taskflow/retry.py b/taskflow/retry.py index aa9208e3..c0198153 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -154,15 +154,12 @@ class Retry(atom.Atom): decisions and outcomes that have occurred (if available). """ - default_provides = None - def __init__(self, name=None, provides=None, requires=None, auto_extract=True, rebind=None): - if provides is None: - provides = self.default_provides - super(Retry, self).__init__(name, provides) - self._build_arg_mapping(self.execute, requires, rebind, auto_extract, - ignore_list=[EXECUTE_REVERT_HISTORY]) + super(Retry, self).__init__(name=name, provides=provides, + requires=requires, rebind=rebind, + auto_extract=auto_extract, + ignore_list=[EXECUTE_REVERT_HISTORY]) @property def name(self): diff --git a/taskflow/storage.py b/taskflow/storage.py index 58919ee0..dde1f2cd 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -303,7 +303,7 @@ class Storage(object): self._injected_args = {} self._lock = fasteners.ReaderWriterLock() self._ensure_matchers = [ - ((task.BaseTask,), (models.TaskDetail, 'Task')), + ((task.Task,), (models.TaskDetail, 'Task')), ((retry.Retry,), (models.RetryDetail, 'Retry')), ] if scope_fetcher is None: diff --git a/taskflow/task.py b/taskflow/task.py index 935fb740..1a83afe7 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -44,7 +44,7 @@ EVENT_UPDATE_PROGRESS = 'update_progress' @six.add_metaclass(abc.ABCMeta) -class BaseTask(atom.Atom): +class Task(atom.Atom): """An abstraction that defines a potential piece of work. This potential piece of work is expected to be able to contain @@ -59,10 +59,13 @@ class BaseTask(atom.Atom): # or existing internal events... TASK_EVENTS = (EVENT_UPDATE_PROGRESS,) - def __init__(self, name, provides=None, inject=None): + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, inject=None): if name is None: name = reflection.get_class_name(self) - super(BaseTask, self).__init__(name, provides, inject=inject) + super(Task, self).__init__(name, provides=provides, requires=requires, + auto_extract=auto_extract, rebind=rebind, + inject=inject) self._notifier = notifier.RestrictedNotifier(self.TASK_EVENTS) @property @@ -76,77 +79,6 @@ class BaseTask(atom.Atom): """ return self._notifier - def pre_execute(self): - """Code to be run prior to executing the task. - - A common pattern for initializing the state of the system prior to - running tasks is to define some code in a base class that all your - tasks inherit from. In that class, you can define a ``pre_execute`` - method and it will always be invoked just prior to your tasks running. - """ - - @abc.abstractmethod - def execute(self, *args, **kwargs): - """Activate a given task which will perform some operation and return. - - This method can be used to perform an action on a given set of input - requirements (passed in via ``*args`` and ``**kwargs``) to accomplish - some type of operation. This operation may provide some named - outputs/results as a result of it executing for later reverting (or for - other tasks to depend on). - - NOTE(harlowja): the result (if any) that is returned should be - persistable so that it can be passed back into this task if - reverting is triggered (especially in the case where reverting - happens in a different python process or on a remote machine) and so - that the result can be transmitted to other tasks (which may be local - or remote). - - :param args: positional arguments that task requires to execute. - :param kwargs: any keyword arguments that task requires to execute. - """ - - def post_execute(self): - """Code to be run after executing the task. - - A common pattern for cleaning up global state of the system after the - execution of tasks is to define some code in a base class that all your - tasks inherit from. In that class, you can define a ``post_execute`` - method and it will always be invoked just after your tasks execute, - regardless of whether they succeded or not. - - This pattern is useful if you have global shared database sessions - that need to be cleaned up, for example. - """ - - def pre_revert(self): - """Code to be run prior to reverting the task. - - This works the same as :meth:`.pre_execute`, but for the revert phase. - """ - - def revert(self, *args, **kwargs): - """Revert this task. - - This method should undo any side-effects caused by previous execution - of the task using the result of the :py:meth:`execute` method and - information on the failure which triggered reversion of the flow the - task is contained in (if applicable). - - :param args: positional arguments that the task required to execute. - :param kwargs: any keyword arguments that the task required to - execute; the special key ``'result'`` will contain - the :py:meth:`execute` result (if any) and - the ``**kwargs`` key ``'flow_failures'`` will contain - any failure information. - """ - - def post_revert(self): - """Code to be run after reverting the task. - - This works the same as :meth:`.post_execute`, but for the revert phase. - """ - def copy(self, retain_listeners=True): """Clone/copy this task. @@ -177,29 +109,7 @@ class BaseTask(atom.Atom): {'progress': cleaned_progress}) -class Task(BaseTask): - """Base class for user-defined tasks (derive from it at will!). - - Adds the following features on top of the :py:class:`.BaseTask`: - - - Auto-generates a name from the class name if a name is not - explicitly provided. - - Automatically adds all :py:meth:`.BaseTask.execute` argument names to - the task requirements (items provided by the task may be also specified - via ``default_provides`` class attribute or instance property). - """ - - default_provides = None - - def __init__(self, name=None, provides=None, requires=None, - auto_extract=True, rebind=None, inject=None): - if provides is None: - provides = self.default_provides - super(Task, self).__init__(name, provides=provides, inject=inject) - self._build_arg_mapping(self.execute, requires, rebind, auto_extract) - - -class FunctorTask(BaseTask): +class FunctorTask(Task): """Adaptor to make a task from a callable. Take any callable pair and make a task from it. @@ -239,7 +149,7 @@ class FunctorTask(BaseTask): return None -class ReduceFunctorTask(BaseTask): +class ReduceFunctorTask(Task): """General purpose Task to reduce a list by applying a function. This Task mimics the behavior of Python's built-in ``reduce`` function. The @@ -269,20 +179,21 @@ class ReduceFunctorTask(BaseTask): if name is None: name = reflection.get_callable_name(functor) - super(ReduceFunctorTask, self).__init__(name=name, provides=provides, - inject=inject) + super(ReduceFunctorTask, self).__init__(name=name, + provides=provides, + inject=inject, + requires=requires, + rebind=rebind, + auto_extract=auto_extract) self._functor = functor - self._build_arg_mapping(executor=self.execute, requires=requires, - rebind=rebind, auto_extract=auto_extract) - def execute(self, *args, **kwargs): l = [kwargs[r] for r in self.requires] return compat_reduce(self._functor, l) -class MapFunctorTask(BaseTask): +class MapFunctorTask(Task): """General purpose Task to map a function to a list. This Task mimics the behavior of Python's built-in ``map`` function. The @@ -314,13 +225,12 @@ class MapFunctorTask(BaseTask): if name is None: name = reflection.get_callable_name(functor) super(MapFunctorTask, self).__init__(name=name, provides=provides, - inject=inject) + inject=inject, requires=requires, + rebind=rebind, + auto_extract=auto_extract) self._functor = functor - self._build_arg_mapping(executor=self.execute, requires=requires, - rebind=rebind, auto_extract=auto_extract) - def execute(self, *args, **kwargs): l = [kwargs[r] for r in self.requires] return list(compat_map(self._functor, l))