OpenStack library for messaging
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

540 lines
18 KiB

# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
import sys
import uuid
import fixtures
import mock
from stevedore import dispatch
from stevedore import extension
import testscenarios
import yaml
from oslo import messaging
from oslo.messaging.notify import _impl_log
from oslo.messaging.notify import _impl_messaging
from oslo.messaging.notify import _impl_test
from oslo.messaging.notify import notifier as msg_notifier
from oslo.messaging.openstack.common import jsonutils
from oslo.messaging import serializer as msg_serializer
from oslo.utils import timeutils
from tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send_notification(self, target, ctxt, message, version, retry=None):
pass
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
"""Record logged exceptions and re-raise in cleanup.
The notifier just logs notification send errors so, for the sake of
debugging test failures, we record any exceptions logged and re-raise them
during cleanup.
"""
class FakeLogger(object):
def __init__(self):
self.exceptions = []
def exception(self, msg, *args, **kwargs):
self.exceptions.append(sys.exc_info()[1])
def setUp(self):
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
self.logger = self.FakeLogger()
def reraise_exceptions():
for ex in self.logger.exceptions:
raise ex
self.addCleanup(reraise_exceptions)
class TestMessagingNotifier(test_utils.BaseTestCase):
_v1 = [
('v1', dict(v1=True)),
('not_v1', dict(v1=False)),
]
_v2 = [
('v2', dict(v2=True)),
('not_v2', dict(v2=False)),
]
_publisher_id = [
('ctor_pub_id', dict(ctor_pub_id='test',
expected_pub_id='test')),
('prep_pub_id', dict(prep_pub_id='test.localhost',
expected_pub_id='test.localhost')),
('override', dict(ctor_pub_id='test',
prep_pub_id='test.localhost',
expected_pub_id='test.localhost')),
]
_topics = [
('no_topics', dict(topics=[])),
('single_topic', dict(topics=['notifications'])),
('multiple_topic2', dict(topics=['foo', 'bar'])),
]
_priority = [
('audit', dict(priority='audit')),
('debug', dict(priority='debug')),
('info', dict(priority='info')),
('warn', dict(priority='warn')),
('error', dict(priority='error')),
('sample', dict(priority='sample')),
('critical', dict(priority='critical')),
]
_payload = [
('payload', dict(payload={'foo': 'bar'})),
]
_context = [
('ctxt', dict(ctxt={'user': 'bob'})),
]
_retry = [
('unconfigured', dict()),
('None', dict(retry=None)),
('0', dict(retry=0)),
('5', dict(retry=5)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
cls._v2,
cls._publisher_id,
cls._topics,
cls._priority,
cls._payload,
cls._context,
cls._retry)
def setUp(self):
super(TestMessagingNotifier, self).setUp()
self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
self.stubs.Set(_impl_messaging, 'LOG', self.logger)
self.stubs.Set(msg_notifier, '_LOG', self.logger)
@mock.patch('oslo.utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
drivers = []
if self.v1:
drivers.append('messaging')
if self.v2:
drivers.append('messagingv2')
self.config(notification_driver=drivers,
notification_topics=self.topics)
transport = _FakeTransport(self.conf)
if hasattr(self, 'ctor_pub_id'):
notifier = messaging.Notifier(transport,
publisher_id=self.ctor_pub_id)
else:
notifier = messaging.Notifier(transport)
prepare_kwds = {}
if hasattr(self, 'retry'):
prepare_kwds['retry'] = self.retry
if hasattr(self, 'prep_pub_id'):
prepare_kwds['publisher_id'] = self.prep_pub_id
if prepare_kwds:
notifier = notifier.prepare(**prepare_kwds)
self.mox.StubOutWithMock(transport, '_send_notification')
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
message = {
'message_id': str(message_id),
'publisher_id': self.expected_pub_id,
'event_type': 'test.notify',
'priority': self.priority.upper(),
'payload': self.payload,
'timestamp': str(timeutils.utcnow()),
}
sends = []
if self.v1:
sends.append(dict(version=1.0))
if self.v2:
sends.append(dict(version=2.0))
for send_kwargs in sends:
for topic in self.topics:
if hasattr(self, 'retry'):
send_kwargs['retry'] = self.retry
else:
send_kwargs['retry'] = None
target = messaging.Target(topic='%s.%s' % (topic,
self.priority))
transport._send_notification(target, self.ctxt, message,
**send_kwargs).InAnyOrder()
self.mox.ReplayAll()
method = getattr(notifier, self.priority)
method(self.ctxt, 'test.notify', self.payload)
TestMessagingNotifier.generate_scenarios()
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):
super(TestSerializer, self).setUp()
self.addCleanup(_impl_test.reset)
@mock.patch('oslo.utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
transport = _FakeTransport(self.conf)
serializer = msg_serializer.NoOpSerializer()
notifier = messaging.Notifier(transport,
'test.localhost',
driver='test',
topic='test',
serializer=serializer)
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
self.mox.StubOutWithMock(serializer, 'serialize_context')
self.mox.StubOutWithMock(serializer, 'serialize_entity')
serializer.serialize_context(dict(user='bob')).\
AndReturn(dict(user='alice'))
serializer.serialize_entity(dict(user='bob'), 'bar').AndReturn('sbar')
self.mox.ReplayAll()
notifier.info(dict(user='bob'), 'test.notify', 'bar')
message = {
'message_id': str(message_id),
'publisher_id': 'test.localhost',
'event_type': 'test.notify',
'priority': 'INFO',
'payload': 'sbar',
'timestamp': str(timeutils.utcnow()),
}
self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
_impl_test.NOTIFICATIONS)
class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo.utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
self.config(notification_driver=['log'])
transport = _FakeTransport(self.conf)
notifier = messaging.Notifier(transport, 'test.localhost')
message_id = uuid.uuid4()
self.mox.StubOutWithMock(uuid, 'uuid4')
uuid.uuid4().AndReturn(message_id)
mock_utcnow.return_value = datetime.datetime.utcnow()
message = {
'message_id': str(message_id),
'publisher_id': 'test.localhost',
'event_type': 'test.notify',
'priority': 'INFO',
'payload': 'bar',
'timestamp': str(timeutils.utcnow()),
}
logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(logging, 'getLogger')
logging.getLogger('oslo.messaging.notification.test.notify').\
AndReturn(logger)
logger.info(jsonutils.dumps(message))
self.mox.ReplayAll()
notifier.info({}, 'test.notify', 'bar')
def test_sample_priority(self):
# Ensure logger drops sample-level notifications.
driver = _impl_log.LogDriver(None, None, None)
logger = self.mox.CreateMock(
logging.getLogger('oslo.messaging.notification.foo'))
logger.sample = None
self.mox.StubOutWithMock(logging, 'getLogger')
logging.getLogger('oslo.messaging.notification.foo').\
AndReturn(logger)
self.mox.ReplayAll()
msg = {'event_type': 'foo'}
driver.notify(None, msg, "sample", None)
class TestRoutingNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingNotifier, self).setUp()
self.config(notification_driver=['routing'])
transport = _FakeTransport(self.conf)
self.notifier = messaging.Notifier(transport)
self.router = self.notifier._driver_mgr['routing'].obj
def _fake_extension_manager(self, ext):
return extension.ExtensionManager.make_test_instance(
[extension.Extension('test', None, None, ext), ])
def _empty_extension_manager(self):
return extension.ExtensionManager.make_test_instance([])
def test_should_load_plugin(self):
self.router.used_drivers = set(["zoo", "blah"])
ext = mock.MagicMock()
ext.name = "foo"
self.assertFalse(self.router._should_load_plugin(ext))
ext.name = "zoo"
self.assertTrue(self.router._should_load_plugin(ext))
def test_load_notifiers_no_config(self):
# default routing_notifier_config=""
self.router._load_notifiers()
self.assertEqual({}, self.router.routing_groups)
self.assertEqual(0, len(self.router.used_drivers))
def test_load_notifiers_no_extensions(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._empty_extension_manager()):
with mock.patch('oslo.messaging.notify.'
'_impl_routing.LOG') as mylog:
self.router._load_notifiers()
self.assertFalse(mylog.debug.called)
self.assertEqual({}, self.router.routing_groups)
def test_load_notifiers_config(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r"""
group_1:
rpc : foo
group_2:
rpc : blah
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=self._fake_extension_manager(
mock.MagicMock())):
self.router._load_notifiers()
groups = list(self.router.routing_groups.keys())
groups.sort()
self.assertEqual(['group_1', 'group_2'], groups)
def test_get_drivers_for_message_accepted_events(self):
config = r"""
group_1:
rpc:
accepted_events:
- foo.*
- blah.zoo.*
- zip
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching event ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "unknown", "info"))
# Child of foo ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "foo.1", "info"))
# Foo itself ...
self.assertEqual([],
self.router._get_drivers_for_message(
group, "foo", "info"))
# Child of blah.zoo
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, "blah.zoo.zing", "info"))
def test_get_drivers_for_message_accepted_priorities(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
- error
"""
groups = yaml.load(config)
group = groups['group_1']
# No matching priority
self.assertEqual([],
self.router._get_drivers_for_message(
group, None, "unknown"))
# Info ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "info"))
# Error (to make sure the list is getting processed) ...
self.assertEqual(['rpc'],
self.router._get_drivers_for_message(
group, None, "error"))
def test_get_drivers_for_message_both(self):
config = r"""
group_1:
rpc:
accepted_priorities:
- info
accepted_events:
- foo.*
driver_1:
accepted_priorities:
- info
driver_2:
accepted_events:
- foo.*
"""
groups = yaml.load(config)
group = groups['group_1']
# Valid event, but no matching priority
self.assertEqual(['driver_2'],
self.router._get_drivers_for_message(
group, 'foo.blah', "unknown"))
# Valid priority, but no matching event
self.assertEqual(['driver_1'],
self.router._get_drivers_for_message(
group, 'unknown', "info"))
# Happy day ...
x = self.router._get_drivers_for_message(group, 'foo.blah', "info")
x.sort()
self.assertEqual(['driver_1', 'driver_2', 'rpc'], x)
def test_filter_func(self):
ext = mock.MagicMock()
ext.name = "rpc"
# Good ...
self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
None, ['foo', 'rpc']))
# Bad
self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
None, ['foo']))
def test_notify(self):
self.router.routing_groups = {'group_1': None, 'group_2': None}
drivers_mock = mock.MagicMock()
drivers_mock.side_effect = [['rpc'], ['foo']]
with mock.patch.object(self.router, 'plugin_manager') as pm:
with mock.patch.object(self.router, '_get_drivers_for_message',
drivers_mock):
self.notifier.info({}, 'my_event', {})
self.assertEqual(sorted(['rpc', 'foo']),
sorted(pm.map.call_args[0][6]))
def test_notify_filtered(self):
self.config(routing_notifier_config="routing_notifier.yaml")
routing_config = r"""
group_1:
rpc:
accepted_events:
- my_event
rpc2:
accepted_priorities:
- info
bar:
accepted_events:
- nothing
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
rpc_driver = mock.Mock()
rpc2_driver = mock.Mock()
bar_driver = mock.Mock()
pm = dispatch.DispatchExtensionManager.make_test_instance(
[extension.Extension('rpc', None, None, rpc_driver),
extension.Extension('rpc2', None, None, rpc2_driver),
extension.Extension('bar', None, None, bar_driver)],
)
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=pm):
self.notifier.info({}, 'my_event', {})
self.assertFalse(bar_driver.info.called)
rpc_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)
rpc2_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)