From 135f562408e8baa81016da9ac55ea40eda174757 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 30 Dec 2013 18:04:08 -0800 Subject: [PATCH] Move autobinding to task base class The autobinding functionality is more of a general task function so it is nice to have it available for use in the root base class instead of being an auxilary method. Change-Id: I75abed25cb0bf165f61f8317e3cb1a62a3f9bf4a --- taskflow/engines/action_engine/executor.py | 30 ++++++---------------- taskflow/task.py | 21 +++++++++++++++ 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 47a195d1c..66897a664 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -17,7 +17,6 @@ # under the License. import abc -import contextlib from concurrent import futures import six @@ -31,21 +30,8 @@ EXECUTED = 'executed' REVERTED = 'reverted' -@contextlib.contextmanager -def _autobind(task, bind_name, bind_func, **kwargs): - task.bind(bind_name, bind_func, **kwargs) - try: - yield task - finally: - task.unbind(bind_name, bind_func) - - -def _noop(*args, **kwargs): - pass - - def _execute_task(task, arguments, progress_callback): - with _autobind(task, 'update_progress', progress_callback): + with task.autobind('update_progress', progress_callback): try: result = task.execute(**arguments) except Exception: @@ -59,7 +45,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): kwargs = arguments.copy() kwargs['result'] = result kwargs['flow_failures'] = failures - with _autobind(task, 'update_progress', progress_callback): + with task.autobind('update_progress', progress_callback): try: result = task.revert(**kwargs) except Exception: @@ -79,12 +65,12 @@ class TaskExecutorBase(object): """ @abc.abstractmethod - def execute_task(self, task, arguments, progress_callback=_noop): + def execute_task(self, task, arguments, progress_callback=None): """Schedules task execution.""" @abc.abstractmethod def revert_task(self, task, arguments, result, failures, - progress_callback=_noop): + progress_callback=None): """Schedules task reversion""" @abc.abstractmethod @@ -103,12 +89,12 @@ class TaskExecutorBase(object): class SerialTaskExecutor(TaskExecutorBase): """Execute task one after another.""" - def execute_task(self, task, arguments, progress_callback=_noop): + def execute_task(self, task, arguments, progress_callback=None): return async_utils.make_completed_future( _execute_task(task, arguments, progress_callback)) def revert_task(self, task, arguments, result, failures, - progress_callback=_noop): + progress_callback=None): return async_utils.make_completed_future( _revert_task(task, arguments, result, failures, progress_callback)) @@ -129,12 +115,12 @@ class ParallelTaskExecutor(TaskExecutorBase): self._executor = executor self._own_executor = executor is None - def execute_task(self, task, arguments, progress_callback=_noop): + def execute_task(self, task, arguments, progress_callback=None): return self._executor.submit( _execute_task, task, arguments, progress_callback) def revert_task(self, task, arguments, result, failures, - progress_callback=_noop): + progress_callback=None): return self._executor.submit( _revert_task, task, arguments, result, failures, progress_callback) diff --git a/taskflow/task.py b/taskflow/task.py index ccc8d95c2..88bcec04e 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -19,6 +19,7 @@ import abc import collections +import contextlib import logging import six @@ -189,6 +190,26 @@ class BaseTask(object): LOG.exception("Failed calling `%s` on event '%s'", reflection.get_callable_name(handler), event) + @contextlib.contextmanager + def autobind(self, event_name, handler_func, **kwargs): + """Binds a given function to the task for a given event name and then + unbinds that event name and associated function automatically on exit. + """ + bound = False + if handler_func is not None: + try: + self.bind(event_name, handler_func, **kwargs) + bound = True + except ValueError: + LOG.exception("Failed binding functor `%s` as a reciever of" + " event '%s' notifications emitted from task %s", + handler_func, event_name, self) + try: + yield self + finally: + if bound: + self.unbind(event_name, handler_func) + def bind(self, event, handler, **kwargs): """Attach a handler to an event for the task.