From fa46708c0ec41db1192b2013833bc10035e7bd95 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 3 Apr 2014 17:49:56 -0700 Subject: [PATCH] Allow jobboard event notification Instead of requiring iteration to be able to list new jobs and existing jobs so that they can be claimed, allow for attaching callbacks to the jobboard where instead of requiring iteration those callbacks will be automatically called by the jobboard internally when events are recieved. Breaking change: renames transition notifier to notifier since it is really not just a notifier for transitions but is a generic notifier of events occuring (and details about those events). This is an internal api so its not expected that this will cause any issues (it's not expected for external users to be creating instances of this class). Implements bp board-notification Change-Id: I2384d5e335ed9d17e29fec9a78699e3156fa225c --- doc/source/notifications.rst | 10 ++--- taskflow/engines/base.py | 4 +- taskflow/jobs/backends/impl_zookeeper.py | 30 +++++++++++-- taskflow/jobs/jobboard.py | 21 +++++++++ taskflow/listeners/base.py | 4 +- taskflow/listeners/logging.py | 4 +- taskflow/listeners/printing.py | 4 +- taskflow/tests/unit/test_utils.py | 32 +++++++------- taskflow/utils/misc.py | 55 ++++++++++++------------ 9 files changed, 104 insertions(+), 60 deletions(-) diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 2bf3e908..8f92317d 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -17,7 +17,7 @@ transitions, which is useful for monitoring, logging, metrics, debugging and plenty of other tasks. To receive these notifications you should register a callback in -:py:class:`~taskflow.utils.misc.TransitionNotifier` provided by engine. +:py:class:`~taskflow.utils.misc.Notifier` provided by engine. Each engine provides two of them: one notifies about flow state changes, and another notifies about changes of tasks. @@ -30,15 +30,15 @@ Receiving Notifications with Callbacks -------------------------------------- To manage notifications instances of -:py:class:`~taskflow.utils.misc.TransitionNotifier` are used. +:py:class:`~taskflow.utils.misc.Notifier` are used. -.. autoclass:: taskflow.utils.misc.TransitionNotifier +.. autoclass:: taskflow.utils.misc.Notifier Flow Notifications ------------------ To receive notification on flow state changes use -:py:class:`~taskflow.utils.misc.TransitionNotifier` available as +:py:class:`~taskflow.utils.misc.Notifier` available as ``notifier`` property of the engine. A basic example is: .. doctest:: @@ -71,7 +71,7 @@ Task notifications ------------------ To receive notification on task state changes use -:py:class:`~taskflow.utils.misc.TransitionNotifier` available as +:py:class:`~taskflow.utils.misc.Notifier` available as ``task_notifier`` property of the engine. A basic example is: .. doctest:: diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index e015798a..8a6d42c9 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -35,8 +35,8 @@ class EngineBase(object): else: self._conf = dict(conf) self._storage = None - self.notifier = misc.TransitionNotifier() - self.task_notifier = misc.TransitionNotifier() + self.notifier = misc.Notifier() + self.task_notifier = misc.Notifier() @property def storage(self): diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6d38f77e..c7be0624 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -18,6 +18,7 @@ import contextlib import functools import logging +from concurrent import futures from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers @@ -146,8 +147,9 @@ class ZookeeperJob(base_job.Job): return self._book -class ZookeeperJobBoard(jobboard.JobBoard): - def __init__(self, name, conf, client=None, persistence=None): +class ZookeeperJobBoard(jobboard.NotifyingJobBoard): + def __init__(self, name, conf, + client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) if client is not None: self._client = client @@ -177,6 +179,13 @@ class ZookeeperJobBoard(jobboard.JobBoard): # Since we use sequenced ids this will be the path that the sequences # are prefixed with, for example, job0000000001, job0000000002, ... self._job_base = k_paths.join(path, "job") + self._worker = None + self._emit_notifications = bool(emit_notifications) + + def _emit(self, state, details): + # Submit the work to the executor to avoid blocking the kazoo queue. + if self._worker is not None: + self._worker.submit(self.notifier.notify, state, details) @property def path(self): @@ -224,7 +233,12 @@ class ZookeeperJobBoard(jobboard.JobBoard): def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) - self._known_jobs.pop(path, None) + job = self._known_jobs.pop(path, None) + if job is not None: + self._emit(jobboard.REMOVAL, + details={ + 'job': job, + }) def _process_child(self, path, request): """Receives the result of a child data fetch request.""" @@ -239,6 +253,10 @@ class ZookeeperJobBoard(jobboard.JobBoard): book_data=job_data.get("book"), details=job_data.get("details", {})) self._known_jobs[path] = job + self._emit(jobboard.POSTED, + details={ + 'job': job, + }) except (ValueError, TypeError, KeyError): LOG.warn("Incorrectly formatted job data found at path: %s", path, exc_info=True) @@ -447,6 +465,10 @@ class ZookeeperJobBoard(jobboard.JobBoard): if self._owned: LOG.debug("Stopping client") kazoo_utils.finalize_client(self._client) + if self._worker is not None: + LOG.debug("Shutting down the notifier") + self._worker.shutdown() + self._worker = None self._clear() LOG.debug("Stopped & cleared local state") @@ -472,6 +494,8 @@ class ZookeeperJobBoard(jobboard.JobBoard): raise excp.JobFailure("Failed to connect to zookeeper", e) try: kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION) + if self._worker is None and self._emit_notifications: + self._worker = futures.ThreadPoolExecutor(max_workers=1) self._client.ensure_path(self.path) self._job_watcher = watchers.ChildrenWatch( self._client, diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index a2a3d29b..737251bd 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -19,6 +19,8 @@ import abc import six +from taskflow.utils import misc + @six.add_metaclass(abc.ABCMeta) class JobBoard(object): @@ -114,3 +116,22 @@ class JobBoard(object): abandoning a unclaimed job (or a job they do not own) will cause an exception. """ + + +# Jobboard events +POSTED = 'POSTED' # new job is/has been posted +REMOVAL = 'REMOVAL' # existing job is/has been removed + + +class NotifyingJobBoard(JobBoard): + """A jobboard subclass that can notify about jobs being created + and removed, which can remove the repeated usage of iterjobs() to achieve + the same operation. + + NOTE(harlowja): notifications that are emitted *may* be emitted on a + separate dedicated thread when they occur, so ensure that all callbacks + registered are thread safe. + """ + def __init__(self, name, conf): + super(NotifyingJobBoard, self).__init__(name, conf) + self.notifier = misc.Notifier() diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index e8f1674c..352b652a 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -46,8 +46,8 @@ class ListenerBase(object): """ def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,)): + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,)): if not task_listen_for: task_listen_for = [] if not flow_listen_for: diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index bcf0cf3d..71bf83f5 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -33,8 +33,8 @@ class LoggingListener(base.LoggingBase): can also be configured, ``logging.DEBUG`` is used by default. """ def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,), + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,), log=None, level=logging.DEBUG): super(LoggingListener, self).__init__(engine, diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py index b8b2cf5d..e9359bf5 100644 --- a/taskflow/listeners/printing.py +++ b/taskflow/listeners/printing.py @@ -26,8 +26,8 @@ from taskflow.utils import misc class PrintingListener(base.LoggingBase): """Writes the task and flow notifications messages to stdout or stderr.""" def __init__(self, engine, - task_listen_for=(misc.TransitionNotifier.ANY,), - flow_listen_for=(misc.TransitionNotifier.ANY,), + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,), stderr=False): super(PrintingListener, self).__init__(engine, task_listen_for=task_listen_for, diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 1c5c197b..d0bb0695 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -149,8 +149,8 @@ class NotifierTest(test.TestCase): def call_me(state, details): call_collector.append((state, details)) - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) notifier.notify(states.SUCCESS, {}) notifier.notify(states.SUCCESS, {}) @@ -166,14 +166,14 @@ class NotifierTest(test.TestCase): def call_me_too(self, state, details): pass - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) a = A() - notifier.register(misc.TransitionNotifier.ANY, a.call_me_too) + notifier.register(misc.Notifier.ANY, a.call_me_too) self.assertEqual(2, len(notifier)) - notifier.deregister(misc.TransitionNotifier.ANY, call_me) - notifier.deregister(misc.TransitionNotifier.ANY, a.call_me_too) + notifier.deregister(misc.Notifier.ANY, call_me) + notifier.deregister(misc.Notifier.ANY, a.call_me_too) self.assertEqual(0, len(notifier)) def test_notify_reset(self): @@ -181,8 +181,8 @@ class NotifierTest(test.TestCase): def call_me(state, details): pass - notifier = misc.TransitionNotifier() - notifier.register(misc.TransitionNotifier.ANY, call_me) + notifier = misc.Notifier() + notifier.register(misc.Notifier.ANY, call_me) self.assertEqual(1, len(notifier)) notifier.reset() @@ -193,9 +193,9 @@ class NotifierTest(test.TestCase): def call_me(state, details): pass - notifier = misc.TransitionNotifier() + notifier = misc.Notifier() self.assertRaises(KeyError, notifier.register, - misc.TransitionNotifier.ANY, call_me, + misc.Notifier.ANY, call_me, kwargs={'details': 5}) def test_selective_notify(self): @@ -204,21 +204,21 @@ class NotifierTest(test.TestCase): def call_me_on(registered_state, state, details): call_counts[registered_state].append((state, details)) - notifier = misc.TransitionNotifier() + notifier = misc.Notifier() notifier.register(states.SUCCESS, functools.partial(call_me_on, states.SUCCESS)) - notifier.register(misc.TransitionNotifier.ANY, + notifier.register(misc.Notifier.ANY, functools.partial(call_me_on, - misc.TransitionNotifier.ANY)) + misc.Notifier.ANY)) self.assertEqual(2, len(notifier)) notifier.notify(states.SUCCESS, {}) - self.assertEqual(1, len(call_counts[misc.TransitionNotifier.ANY])) + self.assertEqual(1, len(call_counts[misc.Notifier.ANY])) self.assertEqual(1, len(call_counts[states.SUCCESS])) notifier.notify(states.FAILURE, {}) - self.assertEqual(2, len(call_counts[misc.TransitionNotifier.ANY])) + self.assertEqual(2, len(call_counts[misc.Notifier.ANY])) self.assertEqual(1, len(call_counts[states.SUCCESS])) self.assertEqual(2, len(call_counts)) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 1da28a4c..a2d4a48a 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -391,7 +391,7 @@ class StopWatch(object): return self -class TransitionNotifier(object): +class Notifier(object): """A utility helper class that can be used to subscribe to notifications of events occurring as well as allow a entity to post said notifications to subscribers. @@ -405,15 +405,14 @@ class TransitionNotifier(object): def __len__(self): """Returns how many callbacks are registered.""" - count = 0 - for (_s, callbacks) in six.iteritems(self._listeners): + for (_event_type, callbacks) in six.iteritems(self._listeners): count += len(callbacks) return count - def is_registered(self, state, callback): + def is_registered(self, event_type, callback): """Check if a callback is registered.""" - listeners = list(self._listeners.get(state, [])) + listeners = list(self._listeners.get(event_type, [])) for (cb, _args, _kwargs) in listeners: if reflection.is_same_callback(cb, callback): return True @@ -423,17 +422,17 @@ class TransitionNotifier(object): """Forget all previously registered callbacks.""" self._listeners.clear() - def notify(self, state, details): - """Notify about state change. + def notify(self, event_type, details): + """Notify about event occurrence. All callbacks registered to receive notifications about given - state will be called. + event type will be called. - :param state: state we moved to - :param details: addition transition details + :param event_type: event type that occured + :param details: addition event details """ listeners = list(self._listeners.get(self.ANY, [])) - for i in self._listeners[state]: + for i in self._listeners[event_type]: if i not in listeners: listeners.append(i) if not listeners: @@ -445,23 +444,23 @@ class TransitionNotifier(object): kwargs = {} kwargs['details'] = details try: - callback(state, *args, **kwargs) + callback(event_type, *args, **kwargs) except Exception: - LOG.warn("Failure calling callback %s to notify about state" - " transition %s, details: %s", - callback, state, details, exc_info=True) + LOG.warn("Failure calling callback %s to notify about event" + " %s, details: %s", callback, event_type, + details, exc_info=True) - def register(self, state, callback, args=None, kwargs=None): - """Register a callback to be called when state is changed. + def register(self, event_type, callback, args=None, kwargs=None): + """Register a callback to be called when event of a given type occurs. Callback will be called with provided ``args`` and ``kwargs`` and - when state is changed to ``state`` (or on any state change if - ``state`` equals to ``TransitionNotifier.ANY``). It will also - get additional keyword argument, ``details``, that will hold - transition details provided to :py:meth:`notify` method. + when event type occurs (or on any event if ``event_type`` equals to + ``Notifier.ANY``). It will also get additional keyword argument, + ``details``, that will hold event details provided to + :py:meth:`notify` method. """ assert six.callable(callback), "Callback must be callable" - if self.is_registered(state, callback): + if self.is_registered(event_type, callback): raise ValueError("Callback %s already registered" % (callback)) if kwargs: for k in self.RESERVED_KEYS: @@ -471,15 +470,15 @@ class TransitionNotifier(object): kwargs = copy.copy(kwargs) if args: args = copy.copy(args) - self._listeners[state].append((callback, args, kwargs)) + self._listeners[event_type].append((callback, args, kwargs)) - def deregister(self, state, callback): - """Remove callback from listening to state ``state``.""" - if state not in self._listeners: + def deregister(self, event_type, callback): + """Remove a single callback from listening to event ``event_type``.""" + if event_type not in self._listeners: return - for i, (cb, args, kwargs) in enumerate(self._listeners[state]): + for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]): if reflection.is_same_callback(cb, callback): - self._listeners[state].pop(i) + self._listeners[event_type].pop(i) break