From a84b3240fcabd030b2c970eed46aa9f20cf0afd2 Mon Sep 17 00:00:00 2001 From: Greg Hill Date: Wed, 6 Aug 2014 08:40:42 -0500 Subject: [PATCH] add pre/post execute/retry callbacks to tasks This enables us to execute code to set up or tear down global state in running tasks. Change-Id: Ib1e5d03ab46b3ce1d03fa83b91bf437fa950b758 Implements: blueprint task-callbacks --- taskflow/engines/action_engine/executor.py | 6 ++++ taskflow/task.py | 34 ++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 816060f5..e28e863b 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -31,11 +31,14 @@ REVERTED = 'reverted' def _execute_task(task, arguments, progress_callback): with task.autobind('update_progress', progress_callback): try: + task.pre_execute() result = task.execute(**arguments) except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. result = misc.Failure() + finally: + task.post_execute() return (task, EXECUTED, result) @@ -45,11 +48,14 @@ def _revert_task(task, arguments, result, failures, progress_callback): kwargs['flow_failures'] = failures with task.autobind('update_progress', progress_callback): try: + task.pre_revert() result = task.revert(**kwargs) except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. result = misc.Failure() + finally: + task.post_revert() return (task, REVERTED, result) diff --git a/taskflow/task.py b/taskflow/task.py index 067613a2..cd470e72 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -47,6 +47,15 @@ class BaseTask(atom.Atom): # Map of events => lists of callbacks to invoke on task events. self._events_listeners = collections.defaultdict(list) + def pre_execute(self): + """Code to be run prior to executing the task. + + A common pattern for initializing the state of the system prior to + running tasks is to define some code in a base class that all your + tasks inherit from. In that class, you can define a pre_execute + method and it will always be invoked just prior to your tasks running. + """ + @abc.abstractmethod def execute(self, *args, **kwargs): """Activate a given task which will perform some operation and return. @@ -65,6 +74,25 @@ class BaseTask(atom.Atom): or remote). """ + def post_execute(self): + """Code to be run after executing the task. + + A common pattern for cleaning up global state of the system after the + execution of tasks is to define some code in a base class that all your + tasks inherit from. In that class, you can define a post_execute + method and it will always be invoked just after your tasks execute, + regardless of whether they succeded or not. + + This pattern is useful if you have global shared database sessions + that need to be cleaned up, for example. + """ + + def pre_revert(self): + """Code to be run prior to reverting the task. + + This works the same as pre_execute, but for the revert phase. + """ + def revert(self, *args, **kwargs): """Revert this task. @@ -79,6 +107,12 @@ class BaseTask(atom.Atom): contain the failure information. """ + def post_revert(self): + """Code to be run after reverting the task. + + This works the same as post_execute, but for the revert phase. + """ + def update_progress(self, progress, **kwargs): """Update task progress and notify all registered listeners.