Merge "Fix the notify method of the routing notifier"
This commit is contained in:
commit
ebf3a2e765
@ -104,33 +104,31 @@ class RoutingDriver(notifier._Driver):
|
||||
|
||||
return list(accepted_drivers)
|
||||
|
||||
def _filter_func(self, ext, context, message, accepted_drivers):
|
||||
def _filter_func(self, ext, context, message, priority, accepted_drivers):
|
||||
"""True/False if the driver should be called for this message.
|
||||
"""
|
||||
# context is unused here, but passed in by map()
|
||||
return ext.name in accepted_drivers
|
||||
|
||||
def _call_notify(self, ext, context, message, accepted_drivers):
|
||||
def _call_notify(self, ext, context, message, priority, accepted_drivers):
|
||||
"""Emit the notification.
|
||||
"""
|
||||
# accepted_drivers is passed in as a result of the map() function
|
||||
LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
|
||||
{'event': message.get('event_type'), 'driver': ext.name})
|
||||
ext.obj.notify(context, message)
|
||||
ext.obj.notify(context, message, priority)
|
||||
|
||||
def notify(self, context, message):
|
||||
def notify(self, context, message, priority):
|
||||
if not self.plugin_manager:
|
||||
self._load_notifiers()
|
||||
|
||||
# Fail if these aren't present ...
|
||||
event_type = message['event_type']
|
||||
priority = message['priority'].lower()
|
||||
|
||||
accepted_drivers = set()
|
||||
for group in self.routing_groups.values():
|
||||
accepted_drivers.update(self._get_drivers_for_message(group,
|
||||
event_type,
|
||||
priority))
|
||||
|
||||
accepted_drivers.update(
|
||||
self._get_drivers_for_message(group, event_type,
|
||||
priority.lower()))
|
||||
self.plugin_manager.map(self._filter_func, self._call_notify, context,
|
||||
message, list(accepted_drivers))
|
||||
message, priority, list(accepted_drivers))
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import datetime
|
||||
import logging
|
||||
import sys
|
||||
@ -20,6 +21,7 @@ import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from stevedore import dispatch
|
||||
from stevedore import extension
|
||||
import testscenarios
|
||||
import yaml
|
||||
@ -27,7 +29,6 @@ import yaml
|
||||
from oslo import messaging
|
||||
from oslo.messaging.notify import _impl_log
|
||||
from oslo.messaging.notify import _impl_messaging
|
||||
from oslo.messaging.notify import _impl_routing as routing
|
||||
from oslo.messaging.notify import _impl_test
|
||||
from oslo.messaging.notify import notifier as msg_notifier
|
||||
from oslo.messaging.openstack.common import jsonutils
|
||||
@ -304,7 +305,11 @@ class TestLogNotifier(test_utils.BaseTestCase):
|
||||
class TestRoutingNotifier(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestRoutingNotifier, self).setUp()
|
||||
self.router = routing.RoutingDriver(None, None, None)
|
||||
self.config(notification_driver=['routing'])
|
||||
|
||||
transport = _FakeTransport(self.conf)
|
||||
self.notifier = messaging.Notifier(transport)
|
||||
self.router = self.notifier._driver_mgr['routing'].obj
|
||||
|
||||
def _fake_extension_manager(self, ext):
|
||||
return extension.ExtensionManager.make_test_instance(
|
||||
@ -380,22 +385,22 @@ group_1:
|
||||
# No matching event ...
|
||||
self.assertEqual([],
|
||||
self.router._get_drivers_for_message(
|
||||
group, "unknown", None))
|
||||
group, "unknown", "info"))
|
||||
|
||||
# Child of foo ...
|
||||
self.assertEqual(['rpc'],
|
||||
self.router._get_drivers_for_message(
|
||||
group, "foo.1", None))
|
||||
group, "foo.1", "info"))
|
||||
|
||||
# Foo itself ...
|
||||
self.assertEqual([],
|
||||
self.router._get_drivers_for_message(
|
||||
group, "foo", None))
|
||||
group, "foo", "info"))
|
||||
|
||||
# Child of blah.zoo
|
||||
self.assertEqual(['rpc'],
|
||||
self.router._get_drivers_for_message(
|
||||
group, "blah.zoo.zing", None))
|
||||
group, "blah.zoo.zing", "info"))
|
||||
|
||||
def test_get_drivers_for_message_accepted_priorities(self):
|
||||
config = r"""
|
||||
@ -461,21 +466,58 @@ group_1:
|
||||
ext.name = "rpc"
|
||||
|
||||
# Good ...
|
||||
self.assertTrue(self.router._filter_func(ext, {}, {},
|
||||
self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
|
||||
['foo', 'rpc']))
|
||||
|
||||
# Bad
|
||||
self.assertFalse(self.router._filter_func(ext, {}, {}, ['foo']))
|
||||
self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
|
||||
['foo']))
|
||||
|
||||
def test_notify(self):
|
||||
self.router.routing_groups = {'group_1': None, 'group_2': None}
|
||||
message = {'event_type': 'my_event', 'priority': 'my_priority'}
|
||||
|
||||
drivers_mock = mock.MagicMock()
|
||||
drivers_mock.side_effect = [['rpc'], ['foo']]
|
||||
|
||||
with mock.patch.object(self.router, 'plugin_manager') as pm:
|
||||
with mock.patch.object(self.router, '_get_drivers_for_message',
|
||||
drivers_mock):
|
||||
self.router.notify({}, message)
|
||||
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][4])
|
||||
self.notifier.info({}, 'my_event', {})
|
||||
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][5])
|
||||
|
||||
def test_notify_filtered(self):
|
||||
self.config(routing_notifier_config="routing_notifier.yaml")
|
||||
routing_config = r"""
|
||||
group_1:
|
||||
rpc:
|
||||
accepted_events:
|
||||
- my_event
|
||||
rpc2:
|
||||
accepted_priorities:
|
||||
- info
|
||||
bar:
|
||||
accepted_events:
|
||||
- nothing
|
||||
"""
|
||||
config_file = mock.MagicMock()
|
||||
config_file.return_value = routing_config
|
||||
|
||||
rpc_driver = mock.Mock()
|
||||
rpc2_driver = mock.Mock()
|
||||
bar_driver = mock.Mock()
|
||||
|
||||
pm = dispatch.DispatchExtensionManager.make_test_instance(
|
||||
[extension.Extension('rpc', None, None, rpc_driver),
|
||||
extension.Extension('rpc2', None, None, rpc2_driver),
|
||||
extension.Extension('bar', None, None, bar_driver)],
|
||||
)
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.router, '_get_notifier_config_file',
|
||||
config_file),
|
||||
mock.patch('stevedore.dispatch.DispatchExtensionManager',
|
||||
return_value=pm)):
|
||||
self.notifier.info({}, 'my_event', {})
|
||||
self.assertFalse(bar_driver.info.called)
|
||||
rpc_driver.notify.assert_called_once_with({}, mock.ANY, 'INFO')
|
||||
rpc2_driver.notify.assert_called_once_with(
|
||||
{}, mock.ANY, 'INFO')
|
||||
|
Loading…
x
Reference in New Issue
Block a user