Merge "Replace notifier with oslo.messaging"
This commit is contained in:
commit
5cf7dd1344
|
@ -119,26 +119,36 @@
|
|||
|
||||
# notification_driver can be defined multiple times
|
||||
# Do nothing driver (the default)
|
||||
# notification_driver = keystone.openstack.common.notifier.no_op_notifier
|
||||
# notification_driver = noop
|
||||
# Logging driver example (not enabled by default)
|
||||
# notification_driver = keystone.openstack.common.notifier.log_notifier
|
||||
# notification_driver = log
|
||||
# RPC driver example (not enabled by default)
|
||||
# notification_driver = keystone.openstack.common.notifier.rpc_notifier
|
||||
# notification_driver = messaging
|
||||
|
||||
# Default publisher_id for outgoing notifications; included in the payload.
|
||||
# default_publisher_id =
|
||||
|
||||
# AMQP topics to publish to when using the RPC notification driver.
|
||||
# Multiple values can be specified by separating with commas.
|
||||
# AMQP topics to publish to when using the RPC notification driver. Multiple
|
||||
# values can be specified by separating with commas.
|
||||
# notification_topics = notifications
|
||||
|
||||
# A URL representing the messaging driver to use and its full configuration. If
|
||||
# not set, we fall back to the RPC option and driver specific configuration.
|
||||
# transport_url =
|
||||
|
||||
# The default exchange under which topics are scoped. May be overridden by an
|
||||
# exchange name specified in the transport_url option.
|
||||
# control_exchange = openstack
|
||||
|
||||
|
||||
# === RPC Options ===
|
||||
|
||||
# For Keystone, these options apply only when the RPC notification driver is
|
||||
# used.
|
||||
|
||||
# The messaging module to use, defaults to kombu.
|
||||
# rpc_backend = keystone.openstack.common.rpc.impl_kombu
|
||||
# The messaging driver to use, defaults to rabbit (kombu). Other
|
||||
# drivers include qpid and zmq.
|
||||
# rpc_backend = rabbit
|
||||
|
||||
# Size of RPC thread pool
|
||||
# rpc_thread_pool_size = 64
|
||||
|
|
|
@ -15,10 +15,18 @@
|
|||
"""Notifications module for OpenStack Identity Service resources"""
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
|
||||
from keystone.openstack.common import log
|
||||
from keystone.openstack.common.notifier import api as notifier_api
|
||||
|
||||
notifier_opts = [
|
||||
cfg.StrOpt('default_publisher_id',
|
||||
default=None,
|
||||
help='Default publisher_id for outgoing notifications'),
|
||||
]
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
# NOTE(gyee): actions that can be notified. One must update this list whenever
|
||||
|
@ -26,6 +34,11 @@ LOG = log.getLogger(__name__)
|
|||
ACTIONS = frozenset(['created', 'deleted', 'disabled', 'updated'])
|
||||
# resource types that can be notified
|
||||
SUBSCRIBERS = {}
|
||||
_notifier = None
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(notifier_opts)
|
||||
|
||||
|
||||
class ManagerNotificationWrapper(object):
|
||||
|
@ -57,7 +70,7 @@ class ManagerNotificationWrapper(object):
|
|||
raise
|
||||
else:
|
||||
resource_id = args[self.resource_id_arg_index]
|
||||
send_notification(
|
||||
_send_notification(
|
||||
self.operation,
|
||||
self.resource_type,
|
||||
resource_id,
|
||||
|
@ -141,8 +154,34 @@ def notify_event_callbacks(service, resource_type, operation, payload):
|
|||
cb(service, resource_type, operation, payload)
|
||||
|
||||
|
||||
def send_notification(operation, resource_type, resource_id,
|
||||
public=True):
|
||||
def _get_notifier():
|
||||
"""Return a notifier object.
|
||||
|
||||
If _notifier is None it means that a notifier object has not been set.
|
||||
If _notifier is False it means that a notifier has previously failed to
|
||||
construct.
|
||||
Otherwise it is a constructed Notifier object.
|
||||
"""
|
||||
global _notifier
|
||||
|
||||
if _notifier is None:
|
||||
host = CONF.default_publisher_id or socket.gethostname()
|
||||
try:
|
||||
transport = messaging.get_transport(CONF)
|
||||
_notifier = messaging.Notifier(transport, "identity.%s" % host)
|
||||
except Exception:
|
||||
LOG.exception("Failed to construct notifier")
|
||||
_notifier = False
|
||||
|
||||
return _notifier
|
||||
|
||||
|
||||
def _reset_notifier():
|
||||
global _notifier
|
||||
_notifier = None
|
||||
|
||||
|
||||
def _send_notification(operation, resource_type, resource_id, public=True):
|
||||
"""Send notification to inform observers about the affected resource.
|
||||
|
||||
This method doesn't raise an exception when sending the notification fails.
|
||||
|
@ -158,7 +197,6 @@ def send_notification(operation, resource_type, resource_id,
|
|||
context = {}
|
||||
payload = {'resource_info': resource_id}
|
||||
service = 'identity'
|
||||
publisher_id = notifier_api.publisher_id(service)
|
||||
event_type = '%(service)s.%(resource_type)s.%(operation)s' % {
|
||||
'service': service,
|
||||
'resource_type': resource_type,
|
||||
|
@ -167,10 +205,11 @@ def send_notification(operation, resource_type, resource_id,
|
|||
notify_event_callbacks(service, resource_type, operation, payload)
|
||||
|
||||
if public:
|
||||
try:
|
||||
notifier_api.notify(
|
||||
context, publisher_id, event_type, notifier_api.INFO, payload)
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
_('Failed to send %(res_id)s %(event_type)s notification'),
|
||||
{'res_id': resource_id, 'event_type': event_type})
|
||||
notifier = _get_notifier()
|
||||
if notifier:
|
||||
try:
|
||||
notifier.info(context, event_type, payload)
|
||||
except Exception:
|
||||
LOG.exception(_(
|
||||
'Failed to send %(res_id)s %(event_type)s notification'),
|
||||
{'res_id': resource_id, 'event_type': event_type})
|
||||
|
|
|
@ -347,6 +347,7 @@ class TestCase(testtools.TestCase):
|
|||
|
||||
# Ensure Notification subscriotions and resource types are empty
|
||||
self.addCleanup(notifications.SUBSCRIBERS.clear)
|
||||
self.addCleanup(notifications._reset_notifier)
|
||||
|
||||
# Reset the auth-plugin registry
|
||||
self.addCleanup(self.clear_auth_plugin_registry)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
from keystone.common import dependency
|
||||
from keystone import notifications
|
||||
|
@ -23,6 +24,8 @@ from keystone import tests
|
|||
from keystone.tests import test_v3
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
EXP_RESOURCE_TYPE = uuid.uuid4().hex
|
||||
|
||||
|
||||
|
@ -48,7 +51,7 @@ class NotificationsWrapperTestCase(tests.TestCase):
|
|||
fixture = self.useFixture(moxstubout.MoxStubout())
|
||||
self.stubs = fixture.stubs
|
||||
|
||||
self.stubs.Set(notifications, 'send_notification', fake_notify)
|
||||
self.stubs.Set(notifications, '_send_notification', fake_notify)
|
||||
|
||||
@notifications.created(EXP_RESOURCE_TYPE)
|
||||
def create_resource(self, resource_id, data):
|
||||
|
@ -123,11 +126,14 @@ class NotificationsTestCase(tests.TestCase):
|
|||
fixture = self.useFixture(moxstubout.MoxStubout())
|
||||
self.stubs = fixture.stubs
|
||||
|
||||
# these should use self.opt(), but they haven't been registered yet
|
||||
CONF.rpc_backend = 'fake'
|
||||
CONF.notification_driver = ['fake']
|
||||
|
||||
def test_send_notification(self):
|
||||
"""Test the private method _send_notification to ensure event_type,
|
||||
payload, and context are built and passed properly.
|
||||
"""
|
||||
|
||||
resource = uuid.uuid4().hex
|
||||
resource_type = EXP_RESOURCE_TYPE
|
||||
operation = 'created'
|
||||
|
@ -141,18 +147,19 @@ class NotificationsTestCase(tests.TestCase):
|
|||
# ensures and maintains these conditions.
|
||||
expected_args = [
|
||||
{}, # empty context
|
||||
mock.ANY, # publisher
|
||||
'identity.%s.created' % resource_type, # event_type
|
||||
{'resource_info': resource}, # payload
|
||||
'INFO', # priority is always INFO...
|
||||
{'resource_info': resource} # payload
|
||||
]
|
||||
|
||||
mod_path = 'keystone.notifications.notifier_api.notify'
|
||||
with mock.patch(mod_path) as mocked:
|
||||
notifications.send_notification(operation, resource_type,
|
||||
resource)
|
||||
with mock.patch.object(notifications._get_notifier(),
|
||||
'_notify') as mocked:
|
||||
notifications._send_notification(operation, resource_type,
|
||||
resource)
|
||||
mocked.assert_called_once_with(*expected_args)
|
||||
|
||||
notifications._send_notification(operation, resource_type, resource)
|
||||
|
||||
|
||||
class NotificationsForEntities(test_v3.RestfulTestCase):
|
||||
def setUp(self):
|
||||
|
@ -172,7 +179,7 @@ class NotificationsForEntities(test_v3.RestfulTestCase):
|
|||
fixture = self.useFixture(moxstubout.MoxStubout())
|
||||
self.stubs = fixture.stubs
|
||||
|
||||
self.stubs.Set(notifications, 'send_notification', fake_notify)
|
||||
self.stubs.Set(notifications, '_send_notification', fake_notify)
|
||||
|
||||
def _assertNotifySeen(self, resource_id, operation, resource_type):
|
||||
self.assertIn(operation, self.exp_operations)
|
||||
|
|
|
@ -16,6 +16,7 @@ lxml>=2.3
|
|||
iso8601>=0.1.8
|
||||
python-keystoneclient>=0.5.0
|
||||
oslo.config>=1.2.0
|
||||
oslo.messaging>=1.3.0a4
|
||||
Babel>=1.3
|
||||
oauthlib>=0.6
|
||||
dogpile.cache>=0.5.0
|
||||
|
|
Loading…
Reference in New Issue