diff --git a/doc/source/types.rst b/doc/source/types.rst index 5c53db72..c628c1ce 100644 --- a/doc/source/types.rst +++ b/doc/source/types.rst @@ -25,7 +25,7 @@ Graph Notifier ======== -.. autoclass:: taskflow.utils.misc.Notifier +.. automodule:: taskflow.types.notifier Table ===== diff --git a/taskflow/tests/unit/test_notifier.py b/taskflow/tests/unit/test_notifier.py new file mode 100644 index 00000000..0761cb6e --- /dev/null +++ b/taskflow/tests/unit/test_notifier.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + +from taskflow import states +from taskflow import test +from taskflow.types import notifier as nt + + +class NotifierTest(test.TestCase): + + def test_notify_called(self): + call_collector = [] + + def call_me(state, details): + call_collector.append((state, details)) + + notifier = nt.Notifier() + notifier.register(nt.Notifier.ANY, call_me) + notifier.notify(states.SUCCESS, {}) + notifier.notify(states.SUCCESS, {}) + + self.assertEqual(2, len(call_collector)) + self.assertEqual(1, len(notifier)) + + def test_notify_register_deregister(self): + + def call_me(state, details): + pass + + class A(object): + def call_me_too(self, state, details): + pass + + notifier = nt.Notifier() + notifier.register(nt.Notifier.ANY, call_me) + a = A() + notifier.register(nt.Notifier.ANY, a.call_me_too) + + self.assertEqual(2, len(notifier)) + notifier.deregister(nt.Notifier.ANY, call_me) + notifier.deregister(nt.Notifier.ANY, a.call_me_too) + self.assertEqual(0, len(notifier)) + + def test_notify_reset(self): + + def call_me(state, details): + pass + + notifier = nt.Notifier() + notifier.register(nt.Notifier.ANY, call_me) + self.assertEqual(1, len(notifier)) + + notifier.reset() + self.assertEqual(0, len(notifier)) + + def test_bad_notify(self): + + def call_me(state, details): + pass + + notifier = nt.Notifier() + self.assertRaises(KeyError, notifier.register, + nt.Notifier.ANY, call_me, + kwargs={'details': 5}) + + def test_selective_notify(self): + call_counts = collections.defaultdict(list) + + def call_me_on(registered_state, state, details): + call_counts[registered_state].append((state, details)) + + notifier = nt.Notifier() + notifier.register(states.SUCCESS, + functools.partial(call_me_on, states.SUCCESS)) + notifier.register(nt.Notifier.ANY, + functools.partial(call_me_on, + nt.Notifier.ANY)) + + self.assertEqual(2, len(notifier)) + notifier.notify(states.SUCCESS, {}) + + self.assertEqual(1, len(call_counts[nt.Notifier.ANY])) + self.assertEqual(1, len(call_counts[states.SUCCESS])) + + notifier.notify(states.FAILURE, {}) + self.assertEqual(2, len(call_counts[nt.Notifier.ANY])) + self.assertEqual(1, len(call_counts[states.SUCCESS])) + self.assertEqual(2, len(call_counts)) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index c9c93f3c..e8660bd3 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -15,7 +15,6 @@ # under the License. import collections -import functools import inspect import random import threading @@ -24,7 +23,6 @@ import time import six import testtools -from taskflow import states from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure @@ -192,88 +190,6 @@ class GetCallableNameTestExtended(test.TestCase): self.assertEqual(expected_name, name) -class NotifierTest(test.TestCase): - - def test_notify_called(self): - call_collector = [] - - def call_me(state, details): - call_collector.append((state, details)) - - notifier = misc.Notifier() - notifier.register(misc.Notifier.ANY, call_me) - notifier.notify(states.SUCCESS, {}) - notifier.notify(states.SUCCESS, {}) - - self.assertEqual(2, len(call_collector)) - self.assertEqual(1, len(notifier)) - - def test_notify_register_deregister(self): - - def call_me(state, details): - pass - - class A(object): - def call_me_too(self, state, details): - pass - - notifier = misc.Notifier() - notifier.register(misc.Notifier.ANY, call_me) - a = A() - notifier.register(misc.Notifier.ANY, a.call_me_too) - - self.assertEqual(2, len(notifier)) - notifier.deregister(misc.Notifier.ANY, call_me) - notifier.deregister(misc.Notifier.ANY, a.call_me_too) - self.assertEqual(0, len(notifier)) - - def test_notify_reset(self): - - def call_me(state, details): - pass - - notifier = misc.Notifier() - notifier.register(misc.Notifier.ANY, call_me) - self.assertEqual(1, len(notifier)) - - notifier.reset() - self.assertEqual(0, len(notifier)) - - def test_bad_notify(self): - - def call_me(state, details): - pass - - notifier = misc.Notifier() - self.assertRaises(KeyError, notifier.register, - misc.Notifier.ANY, call_me, - kwargs={'details': 5}) - - def test_selective_notify(self): - call_counts = collections.defaultdict(list) - - def call_me_on(registered_state, state, details): - call_counts[registered_state].append((state, details)) - - notifier = misc.Notifier() - notifier.register(states.SUCCESS, - functools.partial(call_me_on, states.SUCCESS)) - notifier.register(misc.Notifier.ANY, - functools.partial(call_me_on, - misc.Notifier.ANY)) - - self.assertEqual(2, len(notifier)) - notifier.notify(states.SUCCESS, {}) - - self.assertEqual(1, len(call_counts[misc.Notifier.ANY])) - self.assertEqual(1, len(call_counts[states.SUCCESS])) - - notifier.notify(states.FAILURE, {}) - self.assertEqual(2, len(call_counts[misc.Notifier.ANY])) - self.assertEqual(1, len(call_counts[states.SUCCESS])) - self.assertEqual(2, len(call_counts)) - - class GetCallableArgsTest(test.TestCase): def test_mere_function(self): diff --git a/taskflow/types/notifier.py b/taskflow/types/notifier.py new file mode 100644 index 00000000..a92d6b89 --- /dev/null +++ b/taskflow/types/notifier.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import copy +import logging + +import six + +from taskflow.utils import reflection + +LOG = logging.getLogger(__name__) + + +class Notifier(object): + """A notification helper class. + + It is intended to be used to subscribe to notifications of events + occurring as well as allow a entity to post said notifications to any + associated subscribers without having either entity care about how this + notification occurs. + """ + + #: Keys that can not be used in callbacks arguments + RESERVED_KEYS = ('details',) + + #: Kleene star constant that is used to recieve all notifications + ANY = '*' + + def __init__(self): + self._listeners = collections.defaultdict(list) + + def __len__(self): + """Returns how many callbacks are registered.""" + count = 0 + for (_event_type, callbacks) in six.iteritems(self._listeners): + count += len(callbacks) + return count + + def is_registered(self, event_type, callback): + """Check if a callback is registered.""" + listeners = list(self._listeners.get(event_type, [])) + for (cb, _args, _kwargs) in listeners: + if reflection.is_same_callback(cb, callback): + return True + return False + + def reset(self): + """Forget all previously registered callbacks.""" + self._listeners.clear() + + def notify(self, event_type, details): + """Notify about event occurrence. + + All callbacks registered to receive notifications about given + event type will be called. + + :param event_type: event type that occurred + :param details: addition event details + """ + listeners = list(self._listeners.get(self.ANY, [])) + for i in self._listeners[event_type]: + if i not in listeners: + listeners.append(i) + if not listeners: + return + for (callback, args, kwargs) in listeners: + if args is None: + args = [] + if kwargs is None: + kwargs = {} + kwargs['details'] = details + try: + callback(event_type, *args, **kwargs) + except Exception: + LOG.warn("Failure calling callback %s to notify about event" + " %s, details: %s", callback, event_type, + details, exc_info=True) + + def register(self, event_type, callback, args=None, kwargs=None): + """Register a callback to be called when event of a given type occurs. + + Callback will be called with provided ``args`` and ``kwargs`` and + when event type occurs (or on any event if ``event_type`` equals to + :attr:`.ANY`). It will also get additional keyword argument, + ``details``, that will hold event details provided to the + :meth:`.notify` method. + """ + assert six.callable(callback), "Callback must be callable" + if self.is_registered(event_type, callback): + raise ValueError("Callback %s already registered" % (callback)) + if kwargs: + for k in self.RESERVED_KEYS: + if k in kwargs: + raise KeyError(("Reserved key '%s' not allowed in " + "kwargs") % k) + kwargs = copy.copy(kwargs) + if args: + args = copy.copy(args) + self._listeners[event_type].append((callback, args, kwargs)) + + def deregister(self, event_type, callback): + """Remove a single callback from listening to event ``event_type``.""" + if event_type not in self._listeners: + return + for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]): + if reflection.is_same_callback(cb, callback): + self._listeners[event_type].pop(i) + break diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 41c5cfcd..a4619881 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -15,9 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import contextlib -import copy import datetime import errno import inspect @@ -37,6 +35,7 @@ from six.moves import range as compat_range from six.moves.urllib import parse as urlparse from taskflow.types import failure +from taskflow.types import notifier from taskflow.utils import deprecation from taskflow.utils import reflection @@ -396,101 +395,8 @@ Failure = deprecation.moved_class(failure.Failure, 'Failure', __name__, version="0.5", removal_version="?") -class Notifier(object): - """A notification helper class. - - It is intended to be used to subscribe to notifications of events - occurring as well as allow a entity to post said notifications to any - associated subscribers without having either entity care about how this - notification occurs. - """ - - #: Keys that can not be used in callbacks arguments - RESERVED_KEYS = ('details',) - - #: Kleene star constant that is used to recieve all notifications - ANY = '*' - - def __init__(self): - self._listeners = collections.defaultdict(list) - - def __len__(self): - """Returns how many callbacks are registered.""" - count = 0 - for (_event_type, callbacks) in six.iteritems(self._listeners): - count += len(callbacks) - return count - - def is_registered(self, event_type, callback): - """Check if a callback is registered.""" - listeners = list(self._listeners.get(event_type, [])) - for (cb, _args, _kwargs) in listeners: - if reflection.is_same_callback(cb, callback): - return True - return False - - def reset(self): - """Forget all previously registered callbacks.""" - self._listeners.clear() - - def notify(self, event_type, details): - """Notify about event occurrence. - - All callbacks registered to receive notifications about given - event type will be called. - - :param event_type: event type that occurred - :param details: addition event details - """ - listeners = list(self._listeners.get(self.ANY, [])) - for i in self._listeners[event_type]: - if i not in listeners: - listeners.append(i) - if not listeners: - return - for (callback, args, kwargs) in listeners: - if args is None: - args = [] - if kwargs is None: - kwargs = {} - kwargs['details'] = details - try: - callback(event_type, *args, **kwargs) - except Exception: - LOG.warn("Failure calling callback %s to notify about event" - " %s, details: %s", callback, event_type, - details, exc_info=True) - - def register(self, event_type, callback, args=None, kwargs=None): - """Register a callback to be called when event of a given type occurs. - - Callback will be called with provided ``args`` and ``kwargs`` and - when event type occurs (or on any event if ``event_type`` equals to - :attr:`.ANY`). It will also get additional keyword argument, - ``details``, that will hold event details provided to the - :meth:`.notify` method. - """ - assert six.callable(callback), "Callback must be callable" - if self.is_registered(event_type, callback): - raise ValueError("Callback %s already registered" % (callback)) - if kwargs: - for k in self.RESERVED_KEYS: - if k in kwargs: - raise KeyError(("Reserved key '%s' not allowed in " - "kwargs") % k) - kwargs = copy.copy(kwargs) - if args: - args = copy.copy(args) - self._listeners[event_type].append((callback, args, kwargs)) - - def deregister(self, event_type, callback): - """Remove a single callback from listening to event ``event_type``.""" - if event_type not in self._listeners: - return - for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]): - if reflection.is_same_callback(cb, callback): - self._listeners[event_type].pop(i) - break +Notifier = deprecation.moved_class(notifier.Notifier, 'Notifier', __name__, + version="0.5", removal_version="?") @contextlib.contextmanager