Allow jobboard event notification

Instead of requiring iteration to be able to list new
jobs and existing jobs so that they can be claimed, allow
for attaching callbacks to the jobboard where instead of
requiring iteration those callbacks will be automatically
called by the jobboard internally when events are recieved.

Breaking change: renames transition notifier to notifier
since it is really not just a notifier for transitions but
is a generic notifier of events occuring (and details about
those events). This is an internal api so its not expected
that this will cause any issues (it's not expected for external
users to be creating instances of this class).

Implements bp board-notification

Change-Id: I2384d5e335ed9d17e29fec9a78699e3156fa225c
This commit is contained in:
Joshua Harlow
2014-04-03 17:49:56 -07:00
committed by Thomas Goirand
parent 791b9a96c9
commit fa46708c0e
9 changed files with 104 additions and 60 deletions

View File

@@ -17,7 +17,7 @@ transitions, which is useful for monitoring, logging, metrics, debugging
and plenty of other tasks.
To receive these notifications you should register a callback in
:py:class:`~taskflow.utils.misc.TransitionNotifier` provided by engine.
:py:class:`~taskflow.utils.misc.Notifier` provided by engine.
Each engine provides two of them: one notifies about flow state changes,
and another notifies about changes of tasks.
@@ -30,15 +30,15 @@ Receiving Notifications with Callbacks
--------------------------------------
To manage notifications instances of
:py:class:`~taskflow.utils.misc.TransitionNotifier` are used.
:py:class:`~taskflow.utils.misc.Notifier` are used.
.. autoclass:: taskflow.utils.misc.TransitionNotifier
.. autoclass:: taskflow.utils.misc.Notifier
Flow Notifications
------------------
To receive notification on flow state changes use
:py:class:`~taskflow.utils.misc.TransitionNotifier` available as
:py:class:`~taskflow.utils.misc.Notifier` available as
``notifier`` property of the engine. A basic example is:
.. doctest::
@@ -71,7 +71,7 @@ Task notifications
------------------
To receive notification on task state changes use
:py:class:`~taskflow.utils.misc.TransitionNotifier` available as
:py:class:`~taskflow.utils.misc.Notifier` available as
``task_notifier`` property of the engine. A basic example is:
.. doctest::

View File

@@ -35,8 +35,8 @@ class EngineBase(object):
else:
self._conf = dict(conf)
self._storage = None
self.notifier = misc.TransitionNotifier()
self.task_notifier = misc.TransitionNotifier()
self.notifier = misc.Notifier()
self.task_notifier = misc.Notifier()
@property
def storage(self):

View File

@@ -18,6 +18,7 @@ import contextlib
import functools
import logging
from concurrent import futures
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
from kazoo.recipe import watchers
@@ -146,8 +147,9 @@ class ZookeeperJob(base_job.Job):
return self._book
class ZookeeperJobBoard(jobboard.JobBoard):
def __init__(self, name, conf, client=None, persistence=None):
class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True):
super(ZookeeperJobBoard, self).__init__(name, conf)
if client is not None:
self._client = client
@@ -177,6 +179,13 @@ class ZookeeperJobBoard(jobboard.JobBoard):
# Since we use sequenced ids this will be the path that the sequences
# are prefixed with, for example, job0000000001, job0000000002, ...
self._job_base = k_paths.join(path, "job")
self._worker = None
self._emit_notifications = bool(emit_notifications)
def _emit(self, state, details):
# Submit the work to the executor to avoid blocking the kazoo queue.
if self._worker is not None:
self._worker.submit(self.notifier.notify, state, details)
@property
def path(self):
@@ -224,7 +233,12 @@ class ZookeeperJobBoard(jobboard.JobBoard):
def _remove_job(self, path):
LOG.debug("Removing job that was at path: %s", path)
self._known_jobs.pop(path, None)
job = self._known_jobs.pop(path, None)
if job is not None:
self._emit(jobboard.REMOVAL,
details={
'job': job,
})
def _process_child(self, path, request):
"""Receives the result of a child data fetch request."""
@@ -239,6 +253,10 @@ class ZookeeperJobBoard(jobboard.JobBoard):
book_data=job_data.get("book"),
details=job_data.get("details", {}))
self._known_jobs[path] = job
self._emit(jobboard.POSTED,
details={
'job': job,
})
except (ValueError, TypeError, KeyError):
LOG.warn("Incorrectly formatted job data found at path: %s",
path, exc_info=True)
@@ -447,6 +465,10 @@ class ZookeeperJobBoard(jobboard.JobBoard):
if self._owned:
LOG.debug("Stopping client")
kazoo_utils.finalize_client(self._client)
if self._worker is not None:
LOG.debug("Shutting down the notifier")
self._worker.shutdown()
self._worker = None
self._clear()
LOG.debug("Stopped & cleared local state")
@@ -472,6 +494,8 @@ class ZookeeperJobBoard(jobboard.JobBoard):
raise excp.JobFailure("Failed to connect to zookeeper", e)
try:
kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION)
if self._worker is None and self._emit_notifications:
self._worker = futures.ThreadPoolExecutor(max_workers=1)
self._client.ensure_path(self.path)
self._job_watcher = watchers.ChildrenWatch(
self._client,

View File

@@ -19,6 +19,8 @@ import abc
import six
from taskflow.utils import misc
@six.add_metaclass(abc.ABCMeta)
class JobBoard(object):
@@ -114,3 +116,22 @@ class JobBoard(object):
abandoning a unclaimed job (or a job they do not own) will cause an
exception.
"""
# Jobboard events
POSTED = 'POSTED' # new job is/has been posted
REMOVAL = 'REMOVAL' # existing job is/has been removed
class NotifyingJobBoard(JobBoard):
"""A jobboard subclass that can notify about jobs being created
and removed, which can remove the repeated usage of iterjobs() to achieve
the same operation.
NOTE(harlowja): notifications that are emitted *may* be emitted on a
separate dedicated thread when they occur, so ensure that all callbacks
registered are thread safe.
"""
def __init__(self, name, conf):
super(NotifyingJobBoard, self).__init__(name, conf)
self.notifier = misc.Notifier()

View File

@@ -46,8 +46,8 @@ class ListenerBase(object):
"""
def __init__(self, engine,
task_listen_for=(misc.TransitionNotifier.ANY,),
flow_listen_for=(misc.TransitionNotifier.ANY,)):
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,)):
if not task_listen_for:
task_listen_for = []
if not flow_listen_for:

View File

@@ -33,8 +33,8 @@ class LoggingListener(base.LoggingBase):
can also be configured, ``logging.DEBUG`` is used by default.
"""
def __init__(self, engine,
task_listen_for=(misc.TransitionNotifier.ANY,),
flow_listen_for=(misc.TransitionNotifier.ANY,),
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
log=None,
level=logging.DEBUG):
super(LoggingListener, self).__init__(engine,

View File

@@ -26,8 +26,8 @@ from taskflow.utils import misc
class PrintingListener(base.LoggingBase):
"""Writes the task and flow notifications messages to stdout or stderr."""
def __init__(self, engine,
task_listen_for=(misc.TransitionNotifier.ANY,),
flow_listen_for=(misc.TransitionNotifier.ANY,),
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
stderr=False):
super(PrintingListener, self).__init__(engine,
task_listen_for=task_listen_for,

View File

@@ -149,8 +149,8 @@ class NotifierTest(test.TestCase):
def call_me(state, details):
call_collector.append((state, details))
notifier = misc.TransitionNotifier()
notifier.register(misc.TransitionNotifier.ANY, call_me)
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
notifier.notify(states.SUCCESS, {})
notifier.notify(states.SUCCESS, {})
@@ -166,14 +166,14 @@ class NotifierTest(test.TestCase):
def call_me_too(self, state, details):
pass
notifier = misc.TransitionNotifier()
notifier.register(misc.TransitionNotifier.ANY, call_me)
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
a = A()
notifier.register(misc.TransitionNotifier.ANY, a.call_me_too)
notifier.register(misc.Notifier.ANY, a.call_me_too)
self.assertEqual(2, len(notifier))
notifier.deregister(misc.TransitionNotifier.ANY, call_me)
notifier.deregister(misc.TransitionNotifier.ANY, a.call_me_too)
notifier.deregister(misc.Notifier.ANY, call_me)
notifier.deregister(misc.Notifier.ANY, a.call_me_too)
self.assertEqual(0, len(notifier))
def test_notify_reset(self):
@@ -181,8 +181,8 @@ class NotifierTest(test.TestCase):
def call_me(state, details):
pass
notifier = misc.TransitionNotifier()
notifier.register(misc.TransitionNotifier.ANY, call_me)
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
self.assertEqual(1, len(notifier))
notifier.reset()
@@ -193,9 +193,9 @@ class NotifierTest(test.TestCase):
def call_me(state, details):
pass
notifier = misc.TransitionNotifier()
notifier = misc.Notifier()
self.assertRaises(KeyError, notifier.register,
misc.TransitionNotifier.ANY, call_me,
misc.Notifier.ANY, call_me,
kwargs={'details': 5})
def test_selective_notify(self):
@@ -204,21 +204,21 @@ class NotifierTest(test.TestCase):
def call_me_on(registered_state, state, details):
call_counts[registered_state].append((state, details))
notifier = misc.TransitionNotifier()
notifier = misc.Notifier()
notifier.register(states.SUCCESS,
functools.partial(call_me_on, states.SUCCESS))
notifier.register(misc.TransitionNotifier.ANY,
notifier.register(misc.Notifier.ANY,
functools.partial(call_me_on,
misc.TransitionNotifier.ANY))
misc.Notifier.ANY))
self.assertEqual(2, len(notifier))
notifier.notify(states.SUCCESS, {})
self.assertEqual(1, len(call_counts[misc.TransitionNotifier.ANY]))
self.assertEqual(1, len(call_counts[misc.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
notifier.notify(states.FAILURE, {})
self.assertEqual(2, len(call_counts[misc.TransitionNotifier.ANY]))
self.assertEqual(2, len(call_counts[misc.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
self.assertEqual(2, len(call_counts))

View File

@@ -391,7 +391,7 @@ class StopWatch(object):
return self
class TransitionNotifier(object):
class Notifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occurring as well as allow a entity to post said
notifications to subscribers.
@@ -405,15 +405,14 @@ class TransitionNotifier(object):
def __len__(self):
"""Returns how many callbacks are registered."""
count = 0
for (_s, callbacks) in six.iteritems(self._listeners):
for (_event_type, callbacks) in six.iteritems(self._listeners):
count += len(callbacks)
return count
def is_registered(self, state, callback):
def is_registered(self, event_type, callback):
"""Check if a callback is registered."""
listeners = list(self._listeners.get(state, []))
listeners = list(self._listeners.get(event_type, []))
for (cb, _args, _kwargs) in listeners:
if reflection.is_same_callback(cb, callback):
return True
@@ -423,17 +422,17 @@ class TransitionNotifier(object):
"""Forget all previously registered callbacks."""
self._listeners.clear()
def notify(self, state, details):
"""Notify about state change.
def notify(self, event_type, details):
"""Notify about event occurrence.
All callbacks registered to receive notifications about given
state will be called.
event type will be called.
:param state: state we moved to
:param details: addition transition details
:param event_type: event type that occured
:param details: addition event details
"""
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[state]:
for i in self._listeners[event_type]:
if i not in listeners:
listeners.append(i)
if not listeners:
@@ -445,23 +444,23 @@ class TransitionNotifier(object):
kwargs = {}
kwargs['details'] = details
try:
callback(state, *args, **kwargs)
callback(event_type, *args, **kwargs)
except Exception:
LOG.warn("Failure calling callback %s to notify about state"
" transition %s, details: %s",
callback, state, details, exc_info=True)
LOG.warn("Failure calling callback %s to notify about event"
" %s, details: %s", callback, event_type,
details, exc_info=True)
def register(self, state, callback, args=None, kwargs=None):
"""Register a callback to be called when state is changed.
def register(self, event_type, callback, args=None, kwargs=None):
"""Register a callback to be called when event of a given type occurs.
Callback will be called with provided ``args`` and ``kwargs`` and
when state is changed to ``state`` (or on any state change if
``state`` equals to ``TransitionNotifier.ANY``). It will also
get additional keyword argument, ``details``, that will hold
transition details provided to :py:meth:`notify` method.
when event type occurs (or on any event if ``event_type`` equals to
``Notifier.ANY``). It will also get additional keyword argument,
``details``, that will hold event details provided to
:py:meth:`notify` method.
"""
assert six.callable(callback), "Callback must be callable"
if self.is_registered(state, callback):
if self.is_registered(event_type, callback):
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
@@ -471,15 +470,15 @@ class TransitionNotifier(object):
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[state].append((callback, args, kwargs))
self._listeners[event_type].append((callback, args, kwargs))
def deregister(self, state, callback):
"""Remove callback from listening to state ``state``."""
if state not in self._listeners:
def deregister(self, event_type, callback):
"""Remove a single callback from listening to event ``event_type``."""
if event_type not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]):
if reflection.is_same_callback(cb, callback):
self._listeners[state].pop(i)
self._listeners[event_type].pop(i)
break