Hoist the notifier to its own module

The notifier module needs to be hoisted out of the misc utility
file so that it can be depended on existing by users in a well
defined (non-utility) location.

This change does this hoisting process & creates a new module
and places the existing code there, then creates a deprecated
proxy that exists at the old location (this will be removed
in the next version + 1).

In a future change (in 0.5) we can remove this old location and
remove all references to the previous location (until then we
must keep the old location being used to ensure subclass checks
and other types checks function properly).

Part of blueprint top-level-types

Change-Id: I47fac110adf7cec5c859c2e055c1ceb1f25a7fbd
This commit is contained in:
Joshua Harlow
2014-06-24 12:47:05 -07:00
parent f2ea4f1288
commit bf84288aa0
5 changed files with 230 additions and 182 deletions

View File

@@ -25,7 +25,7 @@ Graph
Notifier
========
.. autoclass:: taskflow.utils.misc.Notifier
.. automodule:: taskflow.types.notifier
Table
=====

View File

@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
from taskflow import states
from taskflow import test
from taskflow.types import notifier as nt
class NotifierTest(test.TestCase):
def test_notify_called(self):
call_collector = []
def call_me(state, details):
call_collector.append((state, details))
notifier = nt.Notifier()
notifier.register(nt.Notifier.ANY, call_me)
notifier.notify(states.SUCCESS, {})
notifier.notify(states.SUCCESS, {})
self.assertEqual(2, len(call_collector))
self.assertEqual(1, len(notifier))
def test_notify_register_deregister(self):
def call_me(state, details):
pass
class A(object):
def call_me_too(self, state, details):
pass
notifier = nt.Notifier()
notifier.register(nt.Notifier.ANY, call_me)
a = A()
notifier.register(nt.Notifier.ANY, a.call_me_too)
self.assertEqual(2, len(notifier))
notifier.deregister(nt.Notifier.ANY, call_me)
notifier.deregister(nt.Notifier.ANY, a.call_me_too)
self.assertEqual(0, len(notifier))
def test_notify_reset(self):
def call_me(state, details):
pass
notifier = nt.Notifier()
notifier.register(nt.Notifier.ANY, call_me)
self.assertEqual(1, len(notifier))
notifier.reset()
self.assertEqual(0, len(notifier))
def test_bad_notify(self):
def call_me(state, details):
pass
notifier = nt.Notifier()
self.assertRaises(KeyError, notifier.register,
nt.Notifier.ANY, call_me,
kwargs={'details': 5})
def test_selective_notify(self):
call_counts = collections.defaultdict(list)
def call_me_on(registered_state, state, details):
call_counts[registered_state].append((state, details))
notifier = nt.Notifier()
notifier.register(states.SUCCESS,
functools.partial(call_me_on, states.SUCCESS))
notifier.register(nt.Notifier.ANY,
functools.partial(call_me_on,
nt.Notifier.ANY))
self.assertEqual(2, len(notifier))
notifier.notify(states.SUCCESS, {})
self.assertEqual(1, len(call_counts[nt.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
notifier.notify(states.FAILURE, {})
self.assertEqual(2, len(call_counts[nt.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
self.assertEqual(2, len(call_counts))

View File

@@ -15,7 +15,6 @@
# under the License.
import collections
import functools
import inspect
import random
import threading
@@ -24,7 +23,6 @@ import time
import six
import testtools
from taskflow import states
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.types import failure
@@ -192,88 +190,6 @@ class GetCallableNameTestExtended(test.TestCase):
self.assertEqual(expected_name, name)
class NotifierTest(test.TestCase):
def test_notify_called(self):
call_collector = []
def call_me(state, details):
call_collector.append((state, details))
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
notifier.notify(states.SUCCESS, {})
notifier.notify(states.SUCCESS, {})
self.assertEqual(2, len(call_collector))
self.assertEqual(1, len(notifier))
def test_notify_register_deregister(self):
def call_me(state, details):
pass
class A(object):
def call_me_too(self, state, details):
pass
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
a = A()
notifier.register(misc.Notifier.ANY, a.call_me_too)
self.assertEqual(2, len(notifier))
notifier.deregister(misc.Notifier.ANY, call_me)
notifier.deregister(misc.Notifier.ANY, a.call_me_too)
self.assertEqual(0, len(notifier))
def test_notify_reset(self):
def call_me(state, details):
pass
notifier = misc.Notifier()
notifier.register(misc.Notifier.ANY, call_me)
self.assertEqual(1, len(notifier))
notifier.reset()
self.assertEqual(0, len(notifier))
def test_bad_notify(self):
def call_me(state, details):
pass
notifier = misc.Notifier()
self.assertRaises(KeyError, notifier.register,
misc.Notifier.ANY, call_me,
kwargs={'details': 5})
def test_selective_notify(self):
call_counts = collections.defaultdict(list)
def call_me_on(registered_state, state, details):
call_counts[registered_state].append((state, details))
notifier = misc.Notifier()
notifier.register(states.SUCCESS,
functools.partial(call_me_on, states.SUCCESS))
notifier.register(misc.Notifier.ANY,
functools.partial(call_me_on,
misc.Notifier.ANY))
self.assertEqual(2, len(notifier))
notifier.notify(states.SUCCESS, {})
self.assertEqual(1, len(call_counts[misc.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
notifier.notify(states.FAILURE, {})
self.assertEqual(2, len(call_counts[misc.Notifier.ANY]))
self.assertEqual(1, len(call_counts[states.SUCCESS]))
self.assertEqual(2, len(call_counts))
class GetCallableArgsTest(test.TestCase):
def test_mere_function(self):

122
taskflow/types/notifier.py Normal file
View File

@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import logging
import six
from taskflow.utils import reflection
LOG = logging.getLogger(__name__)
class Notifier(object):
"""A notification helper class.
It is intended to be used to subscribe to notifications of events
occurring as well as allow a entity to post said notifications to any
associated subscribers without having either entity care about how this
notification occurs.
"""
#: Keys that can not be used in callbacks arguments
RESERVED_KEYS = ('details',)
#: Kleene star constant that is used to recieve all notifications
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def __len__(self):
"""Returns how many callbacks are registered."""
count = 0
for (_event_type, callbacks) in six.iteritems(self._listeners):
count += len(callbacks)
return count
def is_registered(self, event_type, callback):
"""Check if a callback is registered."""
listeners = list(self._listeners.get(event_type, []))
for (cb, _args, _kwargs) in listeners:
if reflection.is_same_callback(cb, callback):
return True
return False
def reset(self):
"""Forget all previously registered callbacks."""
self._listeners.clear()
def notify(self, event_type, details):
"""Notify about event occurrence.
All callbacks registered to receive notifications about given
event type will be called.
:param event_type: event type that occurred
:param details: addition event details
"""
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[event_type]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(event_type, *args, **kwargs)
except Exception:
LOG.warn("Failure calling callback %s to notify about event"
" %s, details: %s", callback, event_type,
details, exc_info=True)
def register(self, event_type, callback, args=None, kwargs=None):
"""Register a callback to be called when event of a given type occurs.
Callback will be called with provided ``args`` and ``kwargs`` and
when event type occurs (or on any event if ``event_type`` equals to
:attr:`.ANY`). It will also get additional keyword argument,
``details``, that will hold event details provided to the
:meth:`.notify` method.
"""
assert six.callable(callback), "Callback must be callable"
if self.is_registered(event_type, callback):
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[event_type].append((callback, args, kwargs))
def deregister(self, event_type, callback):
"""Remove a single callback from listening to event ``event_type``."""
if event_type not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]):
if reflection.is_same_callback(cb, callback):
self._listeners[event_type].pop(i)
break

View File

@@ -15,9 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import datetime
import errno
import inspect
@@ -37,6 +35,7 @@ from six.moves import range as compat_range
from six.moves.urllib import parse as urlparse
from taskflow.types import failure
from taskflow.types import notifier
from taskflow.utils import deprecation
from taskflow.utils import reflection
@@ -396,101 +395,8 @@ Failure = deprecation.moved_class(failure.Failure, 'Failure', __name__,
version="0.5", removal_version="?")
class Notifier(object):
"""A notification helper class.
It is intended to be used to subscribe to notifications of events
occurring as well as allow a entity to post said notifications to any
associated subscribers without having either entity care about how this
notification occurs.
"""
#: Keys that can not be used in callbacks arguments
RESERVED_KEYS = ('details',)
#: Kleene star constant that is used to recieve all notifications
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def __len__(self):
"""Returns how many callbacks are registered."""
count = 0
for (_event_type, callbacks) in six.iteritems(self._listeners):
count += len(callbacks)
return count
def is_registered(self, event_type, callback):
"""Check if a callback is registered."""
listeners = list(self._listeners.get(event_type, []))
for (cb, _args, _kwargs) in listeners:
if reflection.is_same_callback(cb, callback):
return True
return False
def reset(self):
"""Forget all previously registered callbacks."""
self._listeners.clear()
def notify(self, event_type, details):
"""Notify about event occurrence.
All callbacks registered to receive notifications about given
event type will be called.
:param event_type: event type that occurred
:param details: addition event details
"""
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[event_type]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(event_type, *args, **kwargs)
except Exception:
LOG.warn("Failure calling callback %s to notify about event"
" %s, details: %s", callback, event_type,
details, exc_info=True)
def register(self, event_type, callback, args=None, kwargs=None):
"""Register a callback to be called when event of a given type occurs.
Callback will be called with provided ``args`` and ``kwargs`` and
when event type occurs (or on any event if ``event_type`` equals to
:attr:`.ANY`). It will also get additional keyword argument,
``details``, that will hold event details provided to the
:meth:`.notify` method.
"""
assert six.callable(callback), "Callback must be callable"
if self.is_registered(event_type, callback):
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[event_type].append((callback, args, kwargs))
def deregister(self, event_type, callback):
"""Remove a single callback from listening to event ``event_type``."""
if event_type not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[event_type]):
if reflection.is_same_callback(cb, callback):
self._listeners[event_type].pop(i)
break
Notifier = deprecation.moved_class(notifier.Notifier, 'Notifier', __name__,
version="0.5", removal_version="?")
@contextlib.contextmanager