Merge "Hoist the notifier to its own module"
This commit is contained in:
@@ -25,7 +25,7 @@ Graph
|
||||
Notifier
|
||||
========
|
||||
|
||||
.. autoclass:: taskflow.utils.misc.Notifier
|
||||
.. automodule:: taskflow.types.notifier
|
||||
|
||||
Table
|
||||
=====
|
||||
|
||||
104
taskflow/tests/unit/test_notifier.py
Normal file
104
taskflow/tests/unit/test_notifier.py
Normal file
@@ -0,0 +1,104 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
|
||||
from taskflow import states
|
||||
from taskflow import test
|
||||
from taskflow.types import notifier as nt
|
||||
|
||||
|
||||
class NotifierTest(test.TestCase):
|
||||
|
||||
def test_notify_called(self):
|
||||
call_collector = []
|
||||
|
||||
def call_me(state, details):
|
||||
call_collector.append((state, details))
|
||||
|
||||
notifier = nt.Notifier()
|
||||
notifier.register(nt.Notifier.ANY, call_me)
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
|
||||
self.assertEqual(2, len(call_collector))
|
||||
self.assertEqual(1, len(notifier))
|
||||
|
||||
def test_notify_register_deregister(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
class A(object):
|
||||
def call_me_too(self, state, details):
|
||||
pass
|
||||
|
||||
notifier = nt.Notifier()
|
||||
notifier.register(nt.Notifier.ANY, call_me)
|
||||
a = A()
|
||||
notifier.register(nt.Notifier.ANY, a.call_me_too)
|
||||
|
||||
self.assertEqual(2, len(notifier))
|
||||
notifier.deregister(nt.Notifier.ANY, call_me)
|
||||
notifier.deregister(nt.Notifier.ANY, a.call_me_too)
|
||||
self.assertEqual(0, len(notifier))
|
||||
|
||||
def test_notify_reset(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
notifier = nt.Notifier()
|
||||
notifier.register(nt.Notifier.ANY, call_me)
|
||||
self.assertEqual(1, len(notifier))
|
||||
|
||||
notifier.reset()
|
||||
self.assertEqual(0, len(notifier))
|
||||
|
||||
def test_bad_notify(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
notifier = nt.Notifier()
|
||||
self.assertRaises(KeyError, notifier.register,
|
||||
nt.Notifier.ANY, call_me,
|
||||
kwargs={'details': 5})
|
||||
|
||||
def test_selective_notify(self):
|
||||
call_counts = collections.defaultdict(list)
|
||||
|
||||
def call_me_on(registered_state, state, details):
|
||||
call_counts[registered_state].append((state, details))
|
||||
|
||||
notifier = nt.Notifier()
|
||||
notifier.register(states.SUCCESS,
|
||||
functools.partial(call_me_on, states.SUCCESS))
|
||||
notifier.register(nt.Notifier.ANY,
|
||||
functools.partial(call_me_on,
|
||||
nt.Notifier.ANY))
|
||||
|
||||
self.assertEqual(2, len(notifier))
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
|
||||
self.assertEqual(1, len(call_counts[nt.Notifier.ANY]))
|
||||
self.assertEqual(1, len(call_counts[states.SUCCESS]))
|
||||
|
||||
notifier.notify(states.FAILURE, {})
|
||||
self.assertEqual(2, len(call_counts[nt.Notifier.ANY]))
|
||||
self.assertEqual(1, len(call_counts[states.SUCCESS]))
|
||||
self.assertEqual(2, len(call_counts))
|
||||
@@ -15,7 +15,6 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import inspect
|
||||
import random
|
||||
import threading
|
||||
@@ -24,7 +23,6 @@ import time
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from taskflow import states
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.types import failure
|
||||
@@ -192,88 +190,6 @@ class GetCallableNameTestExtended(test.TestCase):
|
||||
self.assertEqual(expected_name, name)
|
||||
|
||||
|
||||
class NotifierTest(test.TestCase):
|
||||
|
||||
def test_notify_called(self):
|
||||
call_collector = []
|
||||
|
||||
def call_me(state, details):
|
||||
call_collector.append((state, details))
|
||||
|
||||
notifier = misc.Notifier()
|
||||
notifier.register(misc.Notifier.ANY, call_me)
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
|
||||
self.assertEqual(2, len(call_collector))
|
||||
self.assertEqual(1, len(notifier))
|
||||
|
||||
def test_notify_register_deregister(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
class A(object):
|
||||
def call_me_too(self, state, details):
|
||||
pass
|
||||
|
||||
notifier = misc.Notifier()
|
||||
notifier.register(misc.Notifier.ANY, call_me)
|
||||
a = A()
|
||||
notifier.register(misc.Notifier.ANY, a.call_me_too)
|
||||
|
||||
self.assertEqual(2, len(notifier))
|
||||
notifier.deregister(misc.Notifier.ANY, call_me)
|
||||
notifier.deregister(misc.Notifier.ANY, a.call_me_too)
|
||||
self.assertEqual(0, len(notifier))
|
||||
|
||||
def test_notify_reset(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
notifier = misc.Notifier()
|
||||
notifier.register(misc.Notifier.ANY, call_me)
|
||||
self.assertEqual(1, len(notifier))
|
||||
|
||||
notifier.reset()
|
||||
self.assertEqual(0, len(notifier))
|
||||
|
||||
def test_bad_notify(self):
|
||||
|
||||
def call_me(state, details):
|
||||
pass
|
||||
|
||||
notifier = misc.Notifier()
|
||||
self.assertRaises(KeyError, notifier.register,
|
||||
misc.Notifier.ANY, call_me,
|
||||
kwargs={'details': 5})
|
||||
|
||||
def test_selective_notify(self):
|
||||
call_counts = collections.defaultdict(list)
|
||||
|
||||
def call_me_on(registered_state, state, details):
|
||||
call_counts[registered_state].append((state, details))
|
||||
|
||||
notifier = misc.Notifier()
|
||||
notifier.register(states.SUCCESS,
|
||||
functools.partial(call_me_on, states.SUCCESS))
|
||||
notifier.register(misc.Notifier.ANY,
|
||||
functools.partial(call_me_on,
|
||||
misc.Notifier.ANY))
|
||||
|
||||
self.assertEqual(2, len(notifier))
|
||||
notifier.notify(states.SUCCESS, {})
|
||||
|
||||
self.assertEqual(1, len(call_counts[misc.Notifier.ANY]))
|
||||
self.assertEqual(1, len(call_counts[states.SUCCESS]))
|
||||
|
||||
notifier.notify(states.FAILURE, {})
|
||||
self.assertEqual(2, len(call_counts[misc.Notifier.ANY]))
|
||||
self.assertEqual(1, len(call_counts[states.SUCCESS]))
|
||||
self.assertEqual(2, len(call_counts))
|
||||
|
||||
|
||||
class GetCallableArgsTest(test.TestCase):
|
||||
|
||||
def test_mere_function(self):
|
||||
|
||||
122
taskflow/types/notifier.py
Normal file
122
taskflow/types/notifier.py
Normal file
@@ -0,0 +1,122 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.utils import reflection
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Notifier(object):
|
||||
"""A notification helper class.
|
||||
|
||||
It is intended to be used to subscribe to notifications of events
|
||||
occurring as well as allow a entity to post said notifications to any
|
||||
associated subscribers without having either entity care about how this
|
||||
notification occurs.
|
||||
"""
|
||||
|
||||
#: Keys that can not be used in callbacks arguments
|
||||
RESERVED_KEYS = ('details',)
|
||||
|
||||
#: Kleene star constant that is used to recieve all notifications
|
||||
ANY = '*'
|
||||
|
||||
def __init__(self):
|
||||
self._listeners = collections.defaultdict(list)
|
||||
|
||||
def __len__(self):
|
||||
"""Returns how many callbacks are registered."""
|
||||
count = 0
|
||||
for (_event_type, callbacks) in six.iteritems(self._listeners):
|
||||
count += len(callbacks)
|
||||
return count
|
||||
|
||||
def is_registered(self, event_type, callback):
|
||||
"""Check if a callback is registered."""
|
||||
listeners = list(self._listeners.get(event_type, []))
|
||||
for (cb, _args, _kwargs) in listeners:
|
||||
if reflection.is_same_callback(cb, callback):
|
||||
return True
|
||||
return False
|
||||
|
||||
def reset(self):
|
||||
"""Forget all previously registered callbacks."""
|
||||
self._listeners.clear()
|
||||
|
||||
def notify(self, event_type, details):
|
||||
"""Notify about event occurrence.
|
||||
|
||||
All callbacks registered to receive notifications about given
|
||||
event type will be called.
|
||||
|
||||
:param event_type: event type that occurred
|
||||
:param details: addition event details
|
||||
"""
|
||||
listeners = list(self._listeners.get(self.ANY, []))
|
||||
for i in self._listeners[event_type]:
|
||||
if i not in listeners:
|
||||
listeners.append(i)
|
||||
if not listeners:
|
||||
return
|
||||
for (callback, args, kwargs) in listeners:
|
||||
if args is None:
|
||||
args = []
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
kwargs['details'] = details
|
||||
try:
|
||||
callback(event_type, *args, **kwargs)
|
||||
except Exception:
|
||||
LOG.warn("Failure calling callback %s to notify about event"
|
||||
" %s, details: %s", callback, event_type,
|
||||
details, exc_info=True)
|
||||
|
||||
def register(self, event_type, callback, args=None, kwargs=None):
|
||||
"""Register a callback to be called when event of a given type occurs.
|
||||
|
||||
Callback will be called with provided ``args`` and ``kwargs`` and
|
||||
when event type occurs (or on any event if ``event_type`` equals to
|
||||
:attr:`.ANY`). It will also get additional keyword argument,
|
||||
``details``, that will hold event details provided to the
|
||||
:meth:`.notify` method.
|
||||
"""
|
||||
assert six.callable(callback), "Callback must be callable"
|
||||
if self.is_registered(event_type, callback):
|
||||
raise ValueError("Callback %s already registered" % (callback))
|
||||
if kwargs:
|
||||
for k in self.RESERVED_KEYS:
|
||||
if k in kwargs:
|
||||
raise KeyError(("Reserved key '%s' not allowed in "
|
||||
"kwargs") % k)
|
||||
kwargs = copy.copy(kwargs)
|
||||
if args:
|
||||
args = copy.copy(args)
|
||||
self._listeners[event_type].append((callback, args, kwargs))
|
||||
|
||||
def deregister(self, event_type, callback):
|
||||
"""Remove a single callback from listening to event ``event_type``."""
|
||||
if event_type not in self._listeners:
|
||||
return
|
||||
for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]):
|
||||
if reflection.is_same_callback(cb, callback):
|
||||
self._listeners[event_type].pop(i)
|
||||
break
|
||||
@@ -15,9 +15,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import copy
|
||||
import datetime
|
||||
import errno
|
||||
import inspect
|
||||
@@ -37,6 +35,7 @@ from six.moves import range as compat_range
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from taskflow.types import failure
|
||||
from taskflow.types import notifier
|
||||
from taskflow.utils import deprecation
|
||||
from taskflow.utils import reflection
|
||||
|
||||
@@ -396,101 +395,8 @@ Failure = deprecation.moved_class(failure.Failure, 'Failure', __name__,
|
||||
version="0.5", removal_version="?")
|
||||
|
||||
|
||||
class Notifier(object):
|
||||
"""A notification helper class.
|
||||
|
||||
It is intended to be used to subscribe to notifications of events
|
||||
occurring as well as allow a entity to post said notifications to any
|
||||
associated subscribers without having either entity care about how this
|
||||
notification occurs.
|
||||
"""
|
||||
|
||||
#: Keys that can not be used in callbacks arguments
|
||||
RESERVED_KEYS = ('details',)
|
||||
|
||||
#: Kleene star constant that is used to recieve all notifications
|
||||
ANY = '*'
|
||||
|
||||
def __init__(self):
|
||||
self._listeners = collections.defaultdict(list)
|
||||
|
||||
def __len__(self):
|
||||
"""Returns how many callbacks are registered."""
|
||||
count = 0
|
||||
for (_event_type, callbacks) in six.iteritems(self._listeners):
|
||||
count += len(callbacks)
|
||||
return count
|
||||
|
||||
def is_registered(self, event_type, callback):
|
||||
"""Check if a callback is registered."""
|
||||
listeners = list(self._listeners.get(event_type, []))
|
||||
for (cb, _args, _kwargs) in listeners:
|
||||
if reflection.is_same_callback(cb, callback):
|
||||
return True
|
||||
return False
|
||||
|
||||
def reset(self):
|
||||
"""Forget all previously registered callbacks."""
|
||||
self._listeners.clear()
|
||||
|
||||
def notify(self, event_type, details):
|
||||
"""Notify about event occurrence.
|
||||
|
||||
All callbacks registered to receive notifications about given
|
||||
event type will be called.
|
||||
|
||||
:param event_type: event type that occurred
|
||||
:param details: addition event details
|
||||
"""
|
||||
listeners = list(self._listeners.get(self.ANY, []))
|
||||
for i in self._listeners[event_type]:
|
||||
if i not in listeners:
|
||||
listeners.append(i)
|
||||
if not listeners:
|
||||
return
|
||||
for (callback, args, kwargs) in listeners:
|
||||
if args is None:
|
||||
args = []
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
kwargs['details'] = details
|
||||
try:
|
||||
callback(event_type, *args, **kwargs)
|
||||
except Exception:
|
||||
LOG.warn("Failure calling callback %s to notify about event"
|
||||
" %s, details: %s", callback, event_type,
|
||||
details, exc_info=True)
|
||||
|
||||
def register(self, event_type, callback, args=None, kwargs=None):
|
||||
"""Register a callback to be called when event of a given type occurs.
|
||||
|
||||
Callback will be called with provided ``args`` and ``kwargs`` and
|
||||
when event type occurs (or on any event if ``event_type`` equals to
|
||||
:attr:`.ANY`). It will also get additional keyword argument,
|
||||
``details``, that will hold event details provided to the
|
||||
:meth:`.notify` method.
|
||||
"""
|
||||
assert six.callable(callback), "Callback must be callable"
|
||||
if self.is_registered(event_type, callback):
|
||||
raise ValueError("Callback %s already registered" % (callback))
|
||||
if kwargs:
|
||||
for k in self.RESERVED_KEYS:
|
||||
if k in kwargs:
|
||||
raise KeyError(("Reserved key '%s' not allowed in "
|
||||
"kwargs") % k)
|
||||
kwargs = copy.copy(kwargs)
|
||||
if args:
|
||||
args = copy.copy(args)
|
||||
self._listeners[event_type].append((callback, args, kwargs))
|
||||
|
||||
def deregister(self, event_type, callback):
|
||||
"""Remove a single callback from listening to event ``event_type``."""
|
||||
if event_type not in self._listeners:
|
||||
return
|
||||
for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]):
|
||||
if reflection.is_same_callback(cb, callback):
|
||||
self._listeners[event_type].pop(i)
|
||||
break
|
||||
Notifier = deprecation.moved_class(notifier.Notifier, 'Notifier', __name__,
|
||||
version="0.5", removal_version="?")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
||||
Reference in New Issue
Block a user