diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 2bf3e908..8f92317d 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -17,7 +17,7 @@ transitions, which is useful for monitoring, logging, metrics, debugging and plenty of other tasks. To receive these notifications you should register a callback in -:py:class:`~taskflow.utils.misc.TransitionNotifier` provided by engine. +:py:class:`~taskflow.utils.misc.Notifier` provided by engine. Each engine provides two of them: one notifies about flow state changes, and another notifies about changes of tasks. @@ -30,15 +30,15 @@ Receiving Notifications with Callbacks -------------------------------------- To manage notifications instances of -:py:class:`~taskflow.utils.misc.TransitionNotifier` are used. +:py:class:`~taskflow.utils.misc.Notifier` are used. -.. autoclass:: taskflow.utils.misc.TransitionNotifier +.. autoclass:: taskflow.utils.misc.Notifier Flow Notifications ------------------ To receive notification on flow state changes use -:py:class:`~taskflow.utils.misc.TransitionNotifier` available as +:py:class:`~taskflow.utils.misc.Notifier` available as ``notifier`` property of the engine. A basic example is: .. doctest:: @@ -71,7 +71,7 @@ Task notifications ------------------ To receive notification on task state changes use -:py:class:`~taskflow.utils.misc.TransitionNotifier` available as +:py:class:`~taskflow.utils.misc.Notifier` available as ``task_notifier`` property of the engine. A basic example is: .. doctest:: diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index e015798a..8a6d42c9 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -35,8 +35,8 @@ class EngineBase(object): else: self._conf = dict(conf) self._storage = None - self.notifier = misc.TransitionNotifier() - self.task_notifier = misc.TransitionNotifier() + self.notifier = misc.Notifier() + self.task_notifier = misc.Notifier() @property def storage(self): diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6d38f77e..c7be0624 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -18,6 +18,7 @@ import contextlib import functools import logging +from concurrent import futures from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers @@ -146,8 +147,9 @@ class ZookeeperJob(base_job.Job): return self._book -class ZookeeperJobBoard(jobboard.JobBoard): - def __init__(self, name, conf, client=None, persistence=None): +class ZookeeperJobBoard(jobboard.NotifyingJobBoard): + def __init__(self, name, conf, + client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) if client is not None: self._client = client @@ -177,6 +179,13 @@ class ZookeeperJobBoard(jobboard.JobBoard): # Since we use sequenced ids this will be the path that the sequences # are prefixed with, for example, job0000000001, job0000000002, ... self._job_base = k_paths.join(path, "job") + self._worker = None + self._emit_notifications = bool(emit_notifications) + + def _emit(self, state, details): + # Submit the work to the executor to avoid blocking the kazoo queue. + if self._worker is not None: + self._worker.submit(self.notifier.notify, state, details) @property def path(self): @@ -224,7 +233,12 @@ class ZookeeperJobBoard(jobboard.JobBoard): def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) - self._known_jobs.pop(path, None) + job = self._known_jobs.pop(path, None) + if job is not None: + self._emit(jobboard.REMOVAL, + details={ + 'job': job, + }) def _process_child(self, path, request): """Receives the result of a child data fetch request.""" @@ -239,6 +253,10 @@ class ZookeeperJobBoard(jobboard.JobBoard): book_data=job_data.get("book"), details=job_data.get("details", {})) self._known_jobs[path] = job + self._emit(jobboard.POSTED, + details={ + 'job': job, + }) except (ValueError, TypeError, KeyError): LOG.warn("Incorrectly formatted job data found at path: %s", path, exc_info=True) @@ -447,6 +465,10 @@ class ZookeeperJobBoard(jobboard.JobBoard): if self._owned: LOG.debug("Stopping client") kazoo_utils.finalize_client(self._client) + if self._worker is not None: + LOG.debug("Shutting down the notifier") + self._worker.shutdown() + self._worker = None self._clear() LOG.debug("Stopped & cleared local state") @@ -472,6 +494,8 @@ class ZookeeperJobBoard(jobboard.JobBoard): raise excp.JobFailure("Failed to connect to zookeeper", e) try: kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION) + if self._worker is None and self._emit_notifications: + self._worker = futures.ThreadPoolExecutor(max_workers=1) self._client.ensure_path(self.path) self._job_watcher = watchers.ChildrenWatch( self._client, diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index a2a3d29b..737251bd 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -19,6 +19,8 @@ import abc import six +from taskflow.utils import misc + @six.add_metaclass(abc.ABCMeta) class JobBoard(object): @@ -114,3 +116,22 @@ class JobBoard(object): abandoning a unclaimed job (or a job they do not own) will cause an exception. """ + + +# Jobboard events +POSTED = 'POSTED' # new job is/has been posted +REMOVAL = 'REMOVAL' # existing job is/has been removed + + +class NotifyingJobBoard(JobBoard): + """A jobboard subclass that can notify about jobs being created + and removed, which can remove the repeated usage of iterjobs() to achieve + the same operation. + + NOTE(harlowja): notifications that are emitted *may* be emitted on a + separate dedicated thread when they occur, so ensure that all callbacks + registered are thread safe. + """ + def __init__(self, name, conf): + super(NotifyingJobBoard, self).__init__(name, conf) + self.notifier = misc.Notifier() diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index e8f1674c..352b652a 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -46,8 +46,8 @@ class ListenerBase(object): """ def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,)): + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,)): if not task_listen_for: task_listen_for = [] if not flow_listen_for: diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index bcf0cf3d..71bf83f5 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -33,8 +33,8 @@ class LoggingListener(base.LoggingBase): can also be configured, ``logging.DEBUG`` is used by default. """ def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,), + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,), log=None, level=logging.DEBUG): super(LoggingListener, self).__init__(engine, diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py index b8b2cf5d..e9359bf5 100644 --- a/taskflow/listeners/printing.py +++ b/taskflow/listeners/printing.py @@ -26,8 +26,8 @@ from taskflow.utils import misc class PrintingListener(base.LoggingBase): """Writes the task and flow notifications messages to stdout or stderr.""" def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,), + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,), stderr=False): super(PrintingListener, self).__init__(engine, task_listen_for=task_listen_for, diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 1c5c197b..d0bb0695 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -149,8 +149,8 @@ class NotifierTest(test.TestCase): def call_me(state, details): call_collector.append((state, details)) - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) notifier.notify(states.SUCCESS, {}) notifier.notify(states.SUCCESS, {}) @@ -166,14 +166,14 @@ class NotifierTest(test.TestCase): def call_me_too(self, state, details): pass - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) a = A() - notifier.register(misc.TransitionNotifier.ANY, a.call_me_too) + notifier.register(misc.Notifier.ANY, a.call_me_too) self.assertEqual(2, len(notifier)) - notifier.deregister(misc.TransitionNotifier.ANY, call_me) - notifier.deregister(misc.TransitionNotifier.ANY, a.call_me_too) + notifier.deregister(misc.Notifier.ANY, call_me) + notifier.deregister(misc.Notifier.ANY, a.call_me_too) self.assertEqual(0, len(notifier)) def test_notify_reset(self): @@ -181,8 +181,8 @@ class NotifierTest(test.TestCase): def call_me(state, details): pass - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) self.assertEqual(1, len(notifier)) notifier.reset() @@ -193,9 +193,9 @@ class NotifierTest(test.TestCase): def call_me(state, details): pass - notifier = misc.TransitionNotifier() + notifier = misc.Notifier() self.assertRaises(KeyError, notifier.register, - misc.TransitionNotifier.ANY, call_me, + misc.Notifier.ANY, call_me, kwargs={'details': 5}) def test_selective_notify(self): @@ -204,21 +204,21 @@ class NotifierTest(test.TestCase): def call_me_on(registered_state, state, details): call_counts[registered_state].append((state, details)) - notifier = misc.TransitionNotifier() + notifier = misc.Notifier() notifier.register(states.SUCCESS, functools.partial(call_me_on, states.SUCCESS)) - notifier.register(misc.TransitionNotifier.ANY, + notifier.register(misc.Notifier.ANY, functools.partial(call_me_on, - misc.TransitionNotifier.ANY)) + misc.Notifier.ANY)) self.assertEqual(2, len(notifier)) notifier.notify(states.SUCCESS, {}) - self.assertEqual(1, len(call_counts[misc.TransitionNotifier.ANY])) + self.assertEqual(1, len(call_counts[misc.Notifier.ANY])) self.assertEqual(1, len(call_counts[states.SUCCESS])) notifier.notify(states.FAILURE, {}) - self.assertEqual(2, len(call_counts[misc.TransitionNotifier.ANY])) + self.assertEqual(2, len(call_counts[misc.Notifier.ANY])) self.assertEqual(1, len(call_counts[states.SUCCESS])) self.assertEqual(2, len(call_counts)) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 1da28a4c..a2d4a48a 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -391,7 +391,7 @@ class StopWatch(object): return self -class TransitionNotifier(object): +class Notifier(object): """A utility helper class that can be used to subscribe to notifications of events occurring as well as allow a entity to post said notifications to subscribers. @@ -405,15 +405,14 @@ class TransitionNotifier(object): def __len__(self): """Returns how many callbacks are registered.""" - count = 0 - for (_s, callbacks) in six.iteritems(self._listeners): + for (_event_type, callbacks) in six.iteritems(self._listeners): count += len(callbacks) return count - def is_registered(self, state, callback): + def is_registered(self, event_type, callback): """Check if a callback is registered.""" - listeners = list(self._listeners.get(state, [])) + listeners = list(self._listeners.get(event_type, [])) for (cb, _args, _kwargs) in listeners: if reflection.is_same_callback(cb, callback): return True @@ -423,17 +422,17 @@ class TransitionNotifier(object): """Forget all previously registered callbacks.""" self._listeners.clear() - def notify(self, state, details): - """Notify about state change. + def notify(self, event_type, details): + """Notify about event occurrence. All callbacks registered to receive notifications about given - state will be called. + event type will be called. - :param state: state we moved to - :param details: addition transition details + :param event_type: event type that occured + :param details: addition event details """ listeners = list(self._listeners.get(self.ANY, [])) - for i in self._listeners[state]: + for i in self._listeners[event_type]: if i not in listeners: listeners.append(i) if not listeners: @@ -445,23 +444,23 @@ class TransitionNotifier(object): kwargs = {} kwargs['details'] = details try: - callback(state, *args, **kwargs) + callback(event_type, *args, **kwargs) except Exception: - LOG.warn("Failure calling callback %s to notify about state" - " transition %s, details: %s", - callback, state, details, exc_info=True) + LOG.warn("Failure calling callback %s to notify about event" + " %s, details: %s", callback, event_type, + details, exc_info=True) - def register(self, state, callback, args=None, kwargs=None): - """Register a callback to be called when state is changed. + def register(self, event_type, callback, args=None, kwargs=None): + """Register a callback to be called when event of a given type occurs. Callback will be called with provided ``args`` and ``kwargs`` and - when state is changed to ``state`` (or on any state change if - ``state`` equals to ``TransitionNotifier.ANY``). It will also - get additional keyword argument, ``details``, that will hold - transition details provided to :py:meth:`notify` method. + when event type occurs (or on any event if ``event_type`` equals to + ``Notifier.ANY``). It will also get additional keyword argument, + ``details``, that will hold event details provided to + :py:meth:`notify` method. """ assert six.callable(callback), "Callback must be callable" - if self.is_registered(state, callback): + if self.is_registered(event_type, callback): raise ValueError("Callback %s already registered" % (callback)) if kwargs: for k in self.RESERVED_KEYS: @@ -471,15 +470,15 @@ class TransitionNotifier(object): kwargs = copy.copy(kwargs) if args: args = copy.copy(args) - self._listeners[state].append((callback, args, kwargs)) + self._listeners[event_type].append((callback, args, kwargs)) - def deregister(self, state, callback): - """Remove callback from listening to state ``state``.""" - if state not in self._listeners: + def deregister(self, event_type, callback): + """Remove a single callback from listening to event ``event_type``.""" + if event_type not in self._listeners: return - for i, (cb, args, kwargs) in enumerate(self._listeners[state]): + for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]): if reflection.is_same_callback(cb, callback): - self._listeners[state].pop(i) + self._listeners[event_type].pop(i) break