diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 78d16ff9..4c5c091a 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -30,7 +30,7 @@ REVERTED = 'reverted' def _execute_task(task, arguments, progress_callback): - with task.autobind('update_progress', progress_callback): + with task.autobind(_task.EVENT_UPDATE_PROGRESS, progress_callback): try: task.pre_execute() result = task.execute(**arguments) @@ -47,7 +47,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): kwargs = arguments.copy() kwargs[_task.REVERT_RESULT] = result kwargs[_task.REVERT_FLOW_FAILURES] = failures - with task.autobind('update_progress', progress_callback): + with task.autobind(_task.EVENT_UPDATE_PROGRESS, progress_callback): try: task.pre_revert() result = task.revert(**kwargs) diff --git a/taskflow/task.py b/taskflow/task.py index 7c8df6cd..3a0395f8 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -35,6 +35,9 @@ REVERT_RESULT = 'result' # The cause of the flow failure/s REVERT_FLOW_FAILURES = 'flow_failures' +# Common events +EVENT_UPDATE_PROGRESS = 'update_progress' + @six.add_metaclass(abc.ABCMeta) class BaseTask(atom.Atom): @@ -46,7 +49,11 @@ class BaseTask(atom.Atom): same piece of work. """ - TASK_EVENTS = ('update_progress', ) + # Known events this task can have callbacks bound to (others that are not + # in this set/tuple will not be able to be bound); this should be updated + # and/or extended in subclasses as needed to enable or disable new or + # existing events... + TASK_EVENTS = (EVENT_UPDATE_PROGRESS,) def __init__(self, name, provides=None, inject=None): if name is None: @@ -139,83 +146,128 @@ class BaseTask(atom.Atom): if progress < 0.0: LOG.warn("Progress must be >= 0.0, clamping to lower bound") progress = 0.0 - self._trigger('update_progress', progress, **kwargs) + self.trigger(EVENT_UPDATE_PROGRESS, progress, **kwargs) - def _trigger(self, event, *args, **kwargs): - """Execute all handlers for the given event type.""" - for (handler, event_data) in self._events_listeners.get(event, []): + def trigger(self, event_name, *args, **kwargs): + """Execute all callbacks registered for the given event type. + + NOTE(harlowja): if a bound callback raises an exception it will be + logged (at a ``WARNING`` level) and the exception + will be dropped. + + :param event_name: event name to trigger + :param args: arbitrary positional arguments passed to the triggered + callbacks (if any are matched), these will be in addition + to any ``kwargs`` provided on binding (these are passed + as positional arguments to the callback). + :param kwargs: arbitrary keyword arguments passed to the triggered + callbacks (if any are matched), these will be in addition + to any ``kwargs`` provided on binding (these are passed + as keyword arguments to the callback). + """ + for (cb, event_data) in self._events_listeners.get(event_name, []): try: - handler(self, event_data, *args, **kwargs) + cb(self, event_data, *args, **kwargs) except Exception: - LOG.warn("Failed calling `%s` on event '%s'", - reflection.get_callable_name(handler), event, + LOG.warn("Failed calling callback `%s` on event '%s'", + reflection.get_callable_name(cb), event_name, exc_info=True) @contextlib.contextmanager - def autobind(self, event_name, handler_func, **kwargs): - """Binds & unbinds a given event handler to the task. + def autobind(self, event_name, callback, **kwargs): + """Binds & unbinds a given callback to the task. This function binds and unbinds using the context manager protocol. When events are triggered on the task of the given event name this - handler will automatically be called with the provided keyword - arguments. + callback will automatically be called with the provided + keyword arguments as the first argument (further arguments may be + provided by the entity triggering the event). + + The arguments are interpreted as for :func:`bind() `. """ bound = False - if handler_func is not None: + if callback is not None: try: - self.bind(event_name, handler_func, **kwargs) + self.bind(event_name, callback, **kwargs) bound = True except ValueError: - LOG.warn("Failed binding functor `%s` as a receiver of" - " event '%s' notifications emitted from task %s", - handler_func, event_name, self, exc_info=True) + LOG.warn("Failed binding callback `%s` as a receiver of" + " event '%s' notifications emitted from task '%s'", + reflection.get_callable_name(callback), event_name, + self, exc_info=True) try: yield self finally: if bound: - self.unbind(event_name, handler_func) + self.unbind(event_name, callback) - def bind(self, event, handler, **kwargs): - """Attach a handler to an event for the task. + def bind(self, event_name, callback, **kwargs): + """Attach a callback to be triggered on a task event. - :param event: event type - :param handler: callback to execute each time event is triggered + Callbacks should *not* be bound, modified, or removed after execution + has commenced (they may be adjusted after execution has finished). This + is primarily due to the need to preserve the callbacks that exist at + execution time for engines which run tasks remotely or out of + process (so that those engines can correctly proxy back transmitted + events). + + Callbacks should also be *quick* to execute so that the engine calling + them can continue execution in a timely manner (if long running + callbacks need to exist, consider creating a separate pool + queue + for those that the attached callbacks put long running operations into + for execution by other entities). + + :param event_name: event type name + :param callback: callable to execute each time event is triggered :param kwargs: optional named parameters that will be passed to the - event handler - :raises ValueError: if invalid event type passed + callable object as a dictionary to the callbacks + *second* positional parameter. + :raises ValueError: if invalid event type, or callback is passed """ - if event not in self.TASK_EVENTS: + if event_name not in self.TASK_EVENTS: raise ValueError("Unknown task event '%s', can only bind" - " to events %s" % (event, self.TASK_EVENTS)) - if not six.callable(handler): - raise ValueError("Event handler callback must be callable") - self._events_listeners[event].append((handler, kwargs)) + " to events %s" % (event_name, self.TASK_EVENTS)) + if callback is not None: + if not six.callable(callback): + raise ValueError("Event handler callback must be callable") + self._events_listeners[event_name].append((callback, kwargs)) - def unbind(self, event, handler=None): - """Remove a previously-attached event handler from the task. + def unbind(self, event_name, callback=None): + """Remove a previously-attached event callback from the task. - If a handler function not passed, then this will unbind all event - handlers for the provided event. If multiple of the same handlers are - bound, then the first match is removed (and only the first match). + If a callback is not passed, then this will unbind *all* event + callbacks for the provided event. If multiple of the same callbacks + are bound, then the first match is removed (and only the first match). - :param event: event type - :param handler: handler previously bound + :param event_name: event type + :param callback: callback previously bound :rtype: boolean :return: whether anything was removed """ removed_any = False - if not handler: - removed_any = self._events_listeners.pop(event, removed_any) + if not callback: + removed_any = self._events_listeners.pop(event_name, removed_any) else: - event_listeners = self._events_listeners.get(event, []) - for i, (handler2, _event_data) in enumerate(event_listeners): - if reflection.is_same_callback(handler, handler2): + event_listeners = self._events_listeners.get(event_name, []) + for i, (cb, _event_data) in enumerate(event_listeners): + if reflection.is_same_callback(cb, callback): + # NOTE(harlowja): its safe to do this as long as we stop + # iterating after we do the removal, otherwise its not + # safe (since this could have resized the list). event_listeners.pop(i) removed_any = True break return bool(removed_any) + def listeners_iter(self): + """Return an iterator over the mapping of event => callbacks bound.""" + for event_name in list(six.iterkeys(self._events_listeners)): + # Use get() just incase it was removed while iterating... + event_listeners = self._events_listeners.get(event_name, []) + if event_listeners: + yield (event_name, event_listeners[:]) + class Task(BaseTask): """Base class for user-defined tasks (derive from it at will!). diff --git a/taskflow/tests/unit/test_task.py b/taskflow/tests/unit/test_task.py index 8c9c7eff..3a963963 100644 --- a/taskflow/tests/unit/test_task.py +++ b/taskflow/tests/unit/test_task.py @@ -201,9 +201,9 @@ class TaskTest(test.TestCase): def progress_callback(task, event_data, progress): result.append(progress) - task = ProgressTask() - with task.autobind('update_progress', progress_callback): - task.execute(values) + a_task = ProgressTask() + with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback): + a_task.execute(values) self.assertEqual(result, values) @mock.patch.object(task.LOG, 'warn') @@ -213,9 +213,9 @@ class TaskTest(test.TestCase): def progress_callback(task, event_data, progress): result.append(progress) - task = ProgressTask() - with task.autobind('update_progress', progress_callback): - task.execute([-1.0, -0.5, 0.0]) + a_task = ProgressTask() + with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback): + a_task.execute([-1.0, -0.5, 0.0]) self.assertEqual(result, [0.0, 0.0, 0.0]) self.assertEqual(mocked_warn.call_count, 2) @@ -226,9 +226,9 @@ class TaskTest(test.TestCase): def progress_callback(task, event_data, progress): result.append(progress) - task = ProgressTask() - with task.autobind('update_progress', progress_callback): - task.execute([1.0, 1.5, 2.0]) + a_task = ProgressTask() + with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback): + a_task.execute([1.0, 1.5, 2.0]) self.assertEqual(result, [1.0, 1.0, 1.0]) self.assertEqual(mocked_warn.call_count, 2) @@ -237,50 +237,40 @@ class TaskTest(test.TestCase): def progress_callback(*args, **kwargs): raise Exception('Woot!') - task = ProgressTask() - with task.autobind('update_progress', progress_callback): - task.execute([0.5]) + a_task = ProgressTask() + with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback): + a_task.execute([0.5]) mocked_warn.assert_called_once_with( mock.ANY, reflection.get_callable_name(progress_callback), - 'update_progress', exc_info=mock.ANY) - - @mock.patch.object(task.LOG, 'warn') - def test_autobind_non_existent_event(self, mocked_warn): - event = 'test-event' - handler = lambda: None - task = MyTask() - with task.autobind(event, handler): - self.assertEqual(len(task._events_listeners), 0) - mocked_warn.assert_called_once_with( - mock.ANY, handler, event, task, exc_info=mock.ANY) + task.EVENT_UPDATE_PROGRESS, exc_info=mock.ANY) def test_autobind_handler_is_none(self): - task = MyTask() - with task.autobind('update_progress', None): - self.assertEqual(len(task._events_listeners), 0) + a_task = MyTask() + with a_task.autobind(task.EVENT_UPDATE_PROGRESS, None): + self.assertEqual(len(list(a_task.listeners_iter())), 0) def test_unbind_any_handler(self): - task = MyTask() - self.assertEqual(len(task._events_listeners), 0) - task.bind('update_progress', lambda: None) - self.assertEqual(len(task._events_listeners), 1) - self.assertTrue(task.unbind('update_progress')) - self.assertEqual(len(task._events_listeners), 0) + a_task = MyTask() + self.assertEqual(len(list(a_task.listeners_iter())), 0) + a_task.bind(task.EVENT_UPDATE_PROGRESS, lambda: None) + self.assertEqual(len(list(a_task.listeners_iter())), 1) + self.assertTrue(a_task.unbind(task.EVENT_UPDATE_PROGRESS)) + self.assertEqual(len(list(a_task.listeners_iter())), 0) def test_unbind_any_handler_empty_listeners(self): - task = MyTask() - self.assertEqual(len(task._events_listeners), 0) - self.assertFalse(task.unbind('update_progress')) - self.assertEqual(len(task._events_listeners), 0) + a_task = MyTask() + self.assertEqual(len(list(a_task.listeners_iter())), 0) + self.assertFalse(a_task.unbind(task.EVENT_UPDATE_PROGRESS)) + self.assertEqual(len(list(a_task.listeners_iter())), 0) def test_unbind_non_existent_listener(self): handler1 = lambda: None handler2 = lambda: None - task = MyTask() - task.bind('update_progress', handler1) - self.assertEqual(len(task._events_listeners), 1) - self.assertFalse(task.unbind('update_progress', handler2)) - self.assertEqual(len(task._events_listeners), 1) + a_task = MyTask() + a_task.bind(task.EVENT_UPDATE_PROGRESS, handler1) + self.assertEqual(len(list(a_task.listeners_iter())), 1) + self.assertFalse(a_task.unbind(task.EVENT_UPDATE_PROGRESS, handler2)) + self.assertEqual(len(list(a_task.listeners_iter())), 1) def test_bind_not_callable(self): task = MyTask()