Refactor notifications

I'm replacing the implementation of notifications with the blinker
library. In order to do this properly I wanted to have a more defined
interface and less coupling in the tests. Test should be unaware of the
implementation of the thing they are testing so that the implementation
can be changed later.

Change-Id: Icdcbf77944187bc9b2b390420862f700baa179c9
This commit is contained in:
David Stanek 2014-03-16 14:14:54 +00:00 committed by Morgan Fainberg
parent e13641f296
commit 135708488e
3 changed files with 26 additions and 24 deletions

View File

@ -28,6 +28,7 @@ from pycadf import resource
from keystone.openstack.common.gettextutils import _
from keystone.openstack.common import log
notifier_opts = [
cfg.StrOpt('default_publisher_id',
default=None,
@ -39,7 +40,7 @@ LOG = log.getLogger(__name__)
# a new action is supported.
ACTIONS = frozenset(['created', 'deleted', 'disabled', 'updated'])
# resource types that can be notified
SUBSCRIBERS = {}
_SUBSCRIBERS = {}
_notifier = None
@ -129,8 +130,8 @@ def register_event_callback(event, resource_type, callbacks):
msg = _('Method not callable: %s') % callback
LOG.error(msg)
raise TypeError(msg)
SUBSCRIBERS.setdefault(event, {}).setdefault(resource_type, set())
SUBSCRIBERS[event][resource_type].add(callback)
_SUBSCRIBERS.setdefault(event, {}).setdefault(resource_type, set())
_SUBSCRIBERS[event][resource_type].add(callback)
if LOG.logger.getEffectiveLevel() <= logging.INFO:
# Do this only if its going to appear in the logs.
@ -144,9 +145,9 @@ def register_event_callback(event, resource_type, callbacks):
def notify_event_callbacks(service, resource_type, operation, payload):
"""Sends a notification to registered extensions."""
if operation in SUBSCRIBERS:
if resource_type in SUBSCRIBERS[operation]:
for cb in SUBSCRIBERS[operation][resource_type]:
if operation in _SUBSCRIBERS:
if resource_type in _SUBSCRIBERS[operation]:
for cb in _SUBSCRIBERS[operation][resource_type]:
subst_dict = {'cb_name': cb.__name__,
'service': service,
'resource_type': resource_type,
@ -181,7 +182,11 @@ def _get_notifier():
return _notifier
def _reset_notifier():
def clear_subscribers():
_SUBSCRIBERS.clear()
def reset_notifier():
global _notifier
_notifier = None
@ -199,19 +204,19 @@ def _send_notification(operation, resource_type, resource_id, public=True):
if False, the event will only be sent via
notify_event_callbacks to in process listeners.
"""
context = {}
payload = {'resource_info': resource_id}
service = 'identity'
event_type = '%(service)s.%(resource_type)s.%(operation)s' % {
'service': service,
'resource_type': resource_type,
'operation': operation}
notify_event_callbacks(service, resource_type, operation, payload)
if public:
notifier = _get_notifier()
if notifier:
context = {}
event_type = '%(service)s.%(resource_type)s.%(operation)s' % {
'service': service,
'resource_type': resource_type,
'operation': operation}
try:
notifier.info(context, event_type, payload)
except Exception:

View File

@ -412,8 +412,8 @@ class TestCase(BaseTestCase):
self.addCleanup(kvs.INMEMDB.clear)
# Ensure Notification subscriotions and resource types are empty
self.addCleanup(notifications.SUBSCRIBERS.clear)
self.addCleanup(notifications._reset_notifier)
self.addCleanup(notifications.clear_subscribers)
self.addCleanup(notifications.reset_notifier)
# Reset the auth-plugin registry
self.addCleanup(self.clear_auth_plugin_registry)

View File

@ -363,7 +363,6 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def setUp(self):
super(TestEventCallbacks, self).setUp()
notifications.SUBSCRIBERS = {}
self.has_been_called = False
def _project_deleted_callback(self, service, resource_type, operation,
@ -382,7 +381,6 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def test_notification_method_not_callable(self):
fake_method = None
notifications.SUBSCRIBERS = {}
self.assertRaises(TypeError,
notifications.register_event_callback,
UPDATED_OPERATION,
@ -406,11 +404,10 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
notifications.register_event_callback(DELETED_OPERATION,
resource_type,
self._project_deleted_callback)
self.assertIn(DELETED_OPERATION, notifications.SUBSCRIBERS)
self.assertIn(resource_type,
notifications.SUBSCRIBERS[DELETED_OPERATION])
def test_provider_event_callbacks_subscription(self):
callback_called = []
@dependency.provider('foo_api')
class Foo:
def __init__(self):
@ -419,11 +416,13 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def foo_callback(self, service, resource_type, operation,
payload):
pass
callback_called.append(True) # uses callback_called
# from the closure
notifications.SUBSCRIBERS = {}
Foo()
self.assertIn(CREATED_OPERATION, notifications.SUBSCRIBERS)
project_ref = self.new_project_ref(domain_id=self.domain_id)
self.assignment_api.create_project(project_ref['id'], project_ref)
self.assertEqual([True], callback_called)
def test_invalid_event_callbacks(self):
@dependency.provider('foo_api')
@ -431,7 +430,6 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def __init__(self):
self.event_callbacks = 'bogus'
notifications.SUBSCRIBERS = {}
self.assertRaises(ValueError, Foo)
def test_invalid_event_callbacks_event(self):
@ -440,7 +438,6 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def __init__(self):
self.event_callbacks = {CREATED_OPERATION: 'bogus'}
notifications.SUBSCRIBERS = {}
self.assertRaises(ValueError, Foo)