Rework pieces of the task callback capability

Unifies the bind, unbind, autobind parameters.

Also to make it easier to introspect what are a tasks associated
callbacks and events are provide a listeners_iter() method that
can be used to introspect the registered (event, callbacks) pairs that
are registered with a task.

Also adds more useful docstrings to the various callback associated
binding, unbinding functions to make it more understandable how they
are used and what they are provided.

Also makes the currently only default provided event 'update_progress'
a constant that can be referenced from the task module, which allows others
to easily find it and use it.

Change-Id: I14181a150b74fbd97f6ea976723f37c0ba4cec36
This commit is contained in:
Joshua Harlow
2014-10-19 23:43:01 -07:00
parent 5a0fbff76d
commit e168f44979
3 changed files with 126 additions and 84 deletions

View File

@@ -30,7 +30,7 @@ REVERTED = 'reverted'
def _execute_task(task, arguments, progress_callback):
with task.autobind('update_progress', progress_callback):
with task.autobind(_task.EVENT_UPDATE_PROGRESS, progress_callback):
try:
task.pre_execute()
result = task.execute(**arguments)
@@ -47,7 +47,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
kwargs = arguments.copy()
kwargs[_task.REVERT_RESULT] = result
kwargs[_task.REVERT_FLOW_FAILURES] = failures
with task.autobind('update_progress', progress_callback):
with task.autobind(_task.EVENT_UPDATE_PROGRESS, progress_callback):
try:
task.pre_revert()
result = task.revert(**kwargs)

View File

@@ -35,6 +35,9 @@ REVERT_RESULT = 'result'
# The cause of the flow failure/s
REVERT_FLOW_FAILURES = 'flow_failures'
# Common events
EVENT_UPDATE_PROGRESS = 'update_progress'
@six.add_metaclass(abc.ABCMeta)
class BaseTask(atom.Atom):
@@ -46,7 +49,11 @@ class BaseTask(atom.Atom):
same piece of work.
"""
TASK_EVENTS = ('update_progress', )
# Known events this task can have callbacks bound to (others that are not
# in this set/tuple will not be able to be bound); this should be updated
# and/or extended in subclasses as needed to enable or disable new or
# existing events...
TASK_EVENTS = (EVENT_UPDATE_PROGRESS,)
def __init__(self, name, provides=None, inject=None):
if name is None:
@@ -139,83 +146,128 @@ class BaseTask(atom.Atom):
if progress < 0.0:
LOG.warn("Progress must be >= 0.0, clamping to lower bound")
progress = 0.0
self._trigger('update_progress', progress, **kwargs)
self.trigger(EVENT_UPDATE_PROGRESS, progress, **kwargs)
def _trigger(self, event, *args, **kwargs):
"""Execute all handlers for the given event type."""
for (handler, event_data) in self._events_listeners.get(event, []):
def trigger(self, event_name, *args, **kwargs):
"""Execute all callbacks registered for the given event type.
NOTE(harlowja): if a bound callback raises an exception it will be
logged (at a ``WARNING`` level) and the exception
will be dropped.
:param event_name: event name to trigger
:param args: arbitrary positional arguments passed to the triggered
callbacks (if any are matched), these will be in addition
to any ``kwargs`` provided on binding (these are passed
as positional arguments to the callback).
:param kwargs: arbitrary keyword arguments passed to the triggered
callbacks (if any are matched), these will be in addition
to any ``kwargs`` provided on binding (these are passed
as keyword arguments to the callback).
"""
for (cb, event_data) in self._events_listeners.get(event_name, []):
try:
handler(self, event_data, *args, **kwargs)
cb(self, event_data, *args, **kwargs)
except Exception:
LOG.warn("Failed calling `%s` on event '%s'",
reflection.get_callable_name(handler), event,
LOG.warn("Failed calling callback `%s` on event '%s'",
reflection.get_callable_name(cb), event_name,
exc_info=True)
@contextlib.contextmanager
def autobind(self, event_name, handler_func, **kwargs):
"""Binds & unbinds a given event handler to the task.
def autobind(self, event_name, callback, **kwargs):
"""Binds & unbinds a given callback to the task.
This function binds and unbinds using the context manager protocol.
When events are triggered on the task of the given event name this
handler will automatically be called with the provided keyword
arguments.
callback will automatically be called with the provided
keyword arguments as the first argument (further arguments may be
provided by the entity triggering the event).
The arguments are interpreted as for :func:`bind() <bind>`.
"""
bound = False
if handler_func is not None:
if callback is not None:
try:
self.bind(event_name, handler_func, **kwargs)
self.bind(event_name, callback, **kwargs)
bound = True
except ValueError:
LOG.warn("Failed binding functor `%s` as a receiver of"
" event '%s' notifications emitted from task %s",
handler_func, event_name, self, exc_info=True)
LOG.warn("Failed binding callback `%s` as a receiver of"
" event '%s' notifications emitted from task '%s'",
reflection.get_callable_name(callback), event_name,
self, exc_info=True)
try:
yield self
finally:
if bound:
self.unbind(event_name, handler_func)
self.unbind(event_name, callback)
def bind(self, event, handler, **kwargs):
"""Attach a handler to an event for the task.
def bind(self, event_name, callback, **kwargs):
"""Attach a callback to be triggered on a task event.
:param event: event type
:param handler: callback to execute each time event is triggered
Callbacks should *not* be bound, modified, or removed after execution
has commenced (they may be adjusted after execution has finished). This
is primarily due to the need to preserve the callbacks that exist at
execution time for engines which run tasks remotely or out of
process (so that those engines can correctly proxy back transmitted
events).
Callbacks should also be *quick* to execute so that the engine calling
them can continue execution in a timely manner (if long running
callbacks need to exist, consider creating a separate pool + queue
for those that the attached callbacks put long running operations into
for execution by other entities).
:param event_name: event type name
:param callback: callable to execute each time event is triggered
:param kwargs: optional named parameters that will be passed to the
event handler
:raises ValueError: if invalid event type passed
callable object as a dictionary to the callbacks
*second* positional parameter.
:raises ValueError: if invalid event type, or callback is passed
"""
if event not in self.TASK_EVENTS:
if event_name not in self.TASK_EVENTS:
raise ValueError("Unknown task event '%s', can only bind"
" to events %s" % (event, self.TASK_EVENTS))
if not six.callable(handler):
raise ValueError("Event handler callback must be callable")
self._events_listeners[event].append((handler, kwargs))
" to events %s" % (event_name, self.TASK_EVENTS))
if callback is not None:
if not six.callable(callback):
raise ValueError("Event handler callback must be callable")
self._events_listeners[event_name].append((callback, kwargs))
def unbind(self, event, handler=None):
"""Remove a previously-attached event handler from the task.
def unbind(self, event_name, callback=None):
"""Remove a previously-attached event callback from the task.
If a handler function not passed, then this will unbind all event
handlers for the provided event. If multiple of the same handlers are
bound, then the first match is removed (and only the first match).
If a callback is not passed, then this will unbind *all* event
callbacks for the provided event. If multiple of the same callbacks
are bound, then the first match is removed (and only the first match).
:param event: event type
:param handler: handler previously bound
:param event_name: event type
:param callback: callback previously bound
:rtype: boolean
:return: whether anything was removed
"""
removed_any = False
if not handler:
removed_any = self._events_listeners.pop(event, removed_any)
if not callback:
removed_any = self._events_listeners.pop(event_name, removed_any)
else:
event_listeners = self._events_listeners.get(event, [])
for i, (handler2, _event_data) in enumerate(event_listeners):
if reflection.is_same_callback(handler, handler2):
event_listeners = self._events_listeners.get(event_name, [])
for i, (cb, _event_data) in enumerate(event_listeners):
if reflection.is_same_callback(cb, callback):
# NOTE(harlowja): its safe to do this as long as we stop
# iterating after we do the removal, otherwise its not
# safe (since this could have resized the list).
event_listeners.pop(i)
removed_any = True
break
return bool(removed_any)
def listeners_iter(self):
"""Return an iterator over the mapping of event => callbacks bound."""
for event_name in list(six.iterkeys(self._events_listeners)):
# Use get() just incase it was removed while iterating...
event_listeners = self._events_listeners.get(event_name, [])
if event_listeners:
yield (event_name, event_listeners[:])
class Task(BaseTask):
"""Base class for user-defined tasks (derive from it at will!).

View File

@@ -201,9 +201,9 @@ class TaskTest(test.TestCase):
def progress_callback(task, event_data, progress):
result.append(progress)
task = ProgressTask()
with task.autobind('update_progress', progress_callback):
task.execute(values)
a_task = ProgressTask()
with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback):
a_task.execute(values)
self.assertEqual(result, values)
@mock.patch.object(task.LOG, 'warn')
@@ -213,9 +213,9 @@ class TaskTest(test.TestCase):
def progress_callback(task, event_data, progress):
result.append(progress)
task = ProgressTask()
with task.autobind('update_progress', progress_callback):
task.execute([-1.0, -0.5, 0.0])
a_task = ProgressTask()
with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback):
a_task.execute([-1.0, -0.5, 0.0])
self.assertEqual(result, [0.0, 0.0, 0.0])
self.assertEqual(mocked_warn.call_count, 2)
@@ -226,9 +226,9 @@ class TaskTest(test.TestCase):
def progress_callback(task, event_data, progress):
result.append(progress)
task = ProgressTask()
with task.autobind('update_progress', progress_callback):
task.execute([1.0, 1.5, 2.0])
a_task = ProgressTask()
with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback):
a_task.execute([1.0, 1.5, 2.0])
self.assertEqual(result, [1.0, 1.0, 1.0])
self.assertEqual(mocked_warn.call_count, 2)
@@ -237,50 +237,40 @@ class TaskTest(test.TestCase):
def progress_callback(*args, **kwargs):
raise Exception('Woot!')
task = ProgressTask()
with task.autobind('update_progress', progress_callback):
task.execute([0.5])
a_task = ProgressTask()
with a_task.autobind(task.EVENT_UPDATE_PROGRESS, progress_callback):
a_task.execute([0.5])
mocked_warn.assert_called_once_with(
mock.ANY, reflection.get_callable_name(progress_callback),
'update_progress', exc_info=mock.ANY)
@mock.patch.object(task.LOG, 'warn')
def test_autobind_non_existent_event(self, mocked_warn):
event = 'test-event'
handler = lambda: None
task = MyTask()
with task.autobind(event, handler):
self.assertEqual(len(task._events_listeners), 0)
mocked_warn.assert_called_once_with(
mock.ANY, handler, event, task, exc_info=mock.ANY)
task.EVENT_UPDATE_PROGRESS, exc_info=mock.ANY)
def test_autobind_handler_is_none(self):
task = MyTask()
with task.autobind('update_progress', None):
self.assertEqual(len(task._events_listeners), 0)
a_task = MyTask()
with a_task.autobind(task.EVENT_UPDATE_PROGRESS, None):
self.assertEqual(len(list(a_task.listeners_iter())), 0)
def test_unbind_any_handler(self):
task = MyTask()
self.assertEqual(len(task._events_listeners), 0)
task.bind('update_progress', lambda: None)
self.assertEqual(len(task._events_listeners), 1)
self.assertTrue(task.unbind('update_progress'))
self.assertEqual(len(task._events_listeners), 0)
a_task = MyTask()
self.assertEqual(len(list(a_task.listeners_iter())), 0)
a_task.bind(task.EVENT_UPDATE_PROGRESS, lambda: None)
self.assertEqual(len(list(a_task.listeners_iter())), 1)
self.assertTrue(a_task.unbind(task.EVENT_UPDATE_PROGRESS))
self.assertEqual(len(list(a_task.listeners_iter())), 0)
def test_unbind_any_handler_empty_listeners(self):
task = MyTask()
self.assertEqual(len(task._events_listeners), 0)
self.assertFalse(task.unbind('update_progress'))
self.assertEqual(len(task._events_listeners), 0)
a_task = MyTask()
self.assertEqual(len(list(a_task.listeners_iter())), 0)
self.assertFalse(a_task.unbind(task.EVENT_UPDATE_PROGRESS))
self.assertEqual(len(list(a_task.listeners_iter())), 0)
def test_unbind_non_existent_listener(self):
handler1 = lambda: None
handler2 = lambda: None
task = MyTask()
task.bind('update_progress', handler1)
self.assertEqual(len(task._events_listeners), 1)
self.assertFalse(task.unbind('update_progress', handler2))
self.assertEqual(len(task._events_listeners), 1)
a_task = MyTask()
a_task.bind(task.EVENT_UPDATE_PROGRESS, handler1)
self.assertEqual(len(list(a_task.listeners_iter())), 1)
self.assertFalse(a_task.unbind(task.EVENT_UPDATE_PROGRESS, handler2))
self.assertEqual(len(list(a_task.listeners_iter())), 1)
def test_bind_not_callable(self):
task = MyTask()