diff --git a/oslo_messaging/notify/logger.py b/oslo_messaging/notify/logger.py index 94fae02cd..94fefa5cb 100644 --- a/oslo_messaging/notify/logger.py +++ b/oslo_messaging/notify/logger.py @@ -64,7 +64,7 @@ class LoggingNotificationHandler(logging.Handler): return method( - {}, + None, 'logrecord', { 'name': record.name, diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py index 1407e9fb0..fcf331076 100644 --- a/oslo_messaging/notify/notifier.py +++ b/oslo_messaging/notify/notifier.py @@ -172,48 +172,21 @@ def get_notification_transport(conf, url=None, allowed_remote_exmods=None): def _sanitize_context(ctxt): - # NOTE(JayF): The below values are in the same order they are in - # oslo_context.context.RequestContext.__init__() - safe_keys = ( - 'user_id', - 'project_id', - 'domain_id', - 'user_domain_id', - 'project_domain_id', - # NOTE(JayF): Without is_admin; heat will make a roundtrip to policy - # to try to set it to a sane value when instantiating the - # replacement context. Instead, just pass it on. - 'is_admin', - 'request_id', - 'roles', - 'user_name', - 'project_name', - 'domain_name', - 'user_domain_name', - 'project_domain_name', - 'service_user_id', - 'service_user_domain_id', - 'service_user_domain_name', - 'service_project_id', - 'service_project_name', - 'service_project_domain_id', - 'service_project_domain_name', - 'service_roles', - 'global_request_id', - 'system_scope', - # NOTE(JayF) These have been renamed but may show up in notifications - 'user', - 'domain', - 'user_domain', - 'project_domain', - ) - ctxt_dict = ctxt if isinstance(ctxt, dict) else ctxt.to_dict() - safe_dict = {k: v for k, v in ctxt_dict.items() - if k in safe_keys} - if ctxt_dict is ctxt: - return safe_dict - else: - return ctxt.__class__.from_dict(safe_dict) + if ctxt is None or type(ctxt) is dict: + # NOTE(JayF): Logging drivers, unit tests, and some code calls + # notifier with an emptydict or None instead of an + # actual context. In these cases, discard the passed + # value. + return {} + + try: + return ctxt.redacted_copy() + except AttributeError: + # NOTE(JayF): We'd rather send a notification without any context + # than missing sending the notification altogether. + _LOG.warning("Unable to properly redact context for " + "notification, omitting context from notification.") + return {} class Notifier(object): diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 60bbf042d..41616a8e0 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -189,13 +189,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, 1)) notifier = self._setup_notifier(transport) + cxt = test_utils.TestContext() for _ in range(12): - notifier.info({}, 'an_event.start', 'test message') + notifier.info(cxt, 'an_event.start', 'test message') self.wait_for_messages(3) self.assertFalse(listener_thread.stop()) - messages = [dict(ctxt={}, + messages = [dict(ctxt=cxt, publisher_id='testpublisher', event_type='an_event.start', payload='test message', @@ -216,13 +217,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, None)) notifier = self._setup_notifier(transport) + ctxt = test_utils.TestContext() for _ in range(10): - notifier.info({}, 'an_event.start', 'test message') + notifier.info(ctxt, 'an_event.start', 'test message') self.wait_for_messages(2) self.assertFalse(listener_thread.stop()) - messages = [dict(ctxt={}, + messages = [dict(ctxt=ctxt, publisher_id='testpublisher', event_type='an_event.start', payload='test message', @@ -242,13 +244,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): batch=(5, None)) notifier = self._setup_notifier(transport) + ctxt = test_utils.TestContext() for _ in range(10): - notifier.info({}, 'an_event.start', 'test message') + notifier.info(ctxt, 'an_event.start', 'test message') self.wait_for_messages(2) self.assertFalse(listener_thread.stop()) - messages = [dict(ctxt={}, + messages = [dict(ctxt=ctxt, publisher_id='testpublisher', event_type='an_event.start', payload='test message', @@ -266,13 +269,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) - notifier.info({}, 'an_event.start', 'test message') + cxt = test_utils.TestContext() + notifier.info(cxt, 'an_event.start', 'test message') self.wait_for_messages(1) self.assertFalse(listener_thread.stop()) endpoint.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test message', + cxt, 'testpublisher', 'an_event.start', 'test message', {'message_id': mock.ANY, 'timestamp': mock.ANY}) def test_two_topics(self): @@ -286,18 +290,20 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): listener_thread = self._setup_listener(transport, [endpoint], targets=targets) notifier = self._setup_notifier(transport, topics=['topic1']) - notifier.info({'user_name': 'bob'}, 'an_event.start1', 'test') + cxt1 = test_utils.TestContext(user_name='bob') + notifier.info(cxt1, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topics=['topic2']) - notifier.info({'user_name': 'bob2'}, 'an_event.start2', 'test') + cxt2 = test_utils.TestContext(user_name='bob2') + notifier.info(cxt2, 'an_event.start2', 'test') self.wait_for_messages(2) self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ - mock.call({'user_name': 'bob'}, 'testpublisher', + mock.call(cxt1, 'testpublisher', 'an_event.start1', 'test', {'timestamp': mock.ANY, 'message_id': mock.ANY}), - mock.call({'user_name': 'bob2'}, 'testpublisher', + mock.call(cxt2, 'testpublisher', 'an_event.start2', 'test', {'timestamp': mock.ANY, 'message_id': mock.ANY})], any_order=True) @@ -326,23 +332,23 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): transport._send_notification = mock.MagicMock( side_effect=side_effect) - notifier.info({'user_name': 'bob0'}, + notifier.info(test_utils.TestContext(user_name='bob0'), 'an_event.start', 'test message default exchange') mock_notifier_exchange('exchange1') - notifier.info({'user_name': 'bob1'}, - 'an_event.start', 'test message exchange1') + ctxt1 = test_utils.TestContext(user_name='bob1') + notifier.info(ctxt1, 'an_event.start', 'test message exchange1') mock_notifier_exchange('exchange2') - notifier.info({'user_name': 'bob2'}, - 'an_event.start', 'test message exchange2') + ctxt2 = test_utils.TestContext(user_name='bob2') + notifier.info(ctxt2, 'an_event.start', 'test message exchange2') self.wait_for_messages(2) self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ - mock.call({'user_name': 'bob1'}, 'testpublisher', 'an_event.start', + mock.call(ctxt1, 'testpublisher', 'an_event.start', 'test message exchange1', {'timestamp': mock.ANY, 'message_id': mock.ANY}), - mock.call({'user_name': 'bob2'}, 'testpublisher', 'an_event.start', + mock.call(ctxt2, 'testpublisher', 'an_event.start', 'test message exchange2', {'timestamp': mock.ANY, 'message_id': mock.ANY})], any_order=True) @@ -358,18 +364,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): listener_thread = self._setup_listener(transport, [endpoint1, endpoint2]) notifier = self._setup_notifier(transport) - notifier.info({}, 'an_event.start', 'test') + cxt = test_utils.TestContext() + notifier.info(cxt, 'an_event.start', 'test') self.wait_for_messages(1) self.assertFalse(listener_thread.stop()) endpoint1.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test', { + cxt, 'testpublisher', 'an_event.start', 'test', { 'timestamp': mock.ANY, 'message_id': mock.ANY}) endpoint2.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test', { + cxt, 'testpublisher', 'an_event.start', 'test', { 'timestamp': mock.ANY, 'message_id': mock.ANY}) @@ -387,15 +394,16 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): endpoint.info.side_effect = side_effect_requeue listener_thread = self._setup_listener(transport, [endpoint]) notifier = self._setup_notifier(transport) - notifier.info({}, 'an_event.start', 'test') + cxt = test_utils.TestContext() + notifier.info(cxt, 'an_event.start', 'test') self.wait_for_messages(2) self.assertFalse(listener_thread.stop()) endpoint.info.assert_has_calls([ - mock.call({}, 'testpublisher', 'an_event.start', 'test', + mock.call(cxt, 'testpublisher', 'an_event.start', 'test', {'timestamp': mock.ANY, 'message_id': mock.ANY}), - mock.call({}, 'testpublisher', 'an_event.start', 'test', + mock.call(cxt, 'testpublisher', 'an_event.start', 'test', {'timestamp': mock.ANY, 'message_id': mock.ANY})]) def test_two_pools(self): @@ -414,23 +422,27 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): targets=targets, pool="pool2") notifier = self._setup_notifier(transport, topics=["topic"]) - notifier.info({'user_name': 'bob0'}, 'an_event.start', 'test message0') - notifier.info({'user_name': 'bob1'}, 'an_event.start', 'test message1') + ctxts = [ + test_utils.TestContext(user_name='bob0'), + test_utils.TestContext(user_name='bob1') + ] + notifier.info(ctxts[0], 'an_event.start', 'test message0') + notifier.info(ctxts[1], 'an_event.start', 'test message1') self.wait_for_messages(2, "pool1") self.wait_for_messages(2, "pool2") self.assertFalse(listener2_thread.stop()) self.assertFalse(listener1_thread.stop()) - def mocked_endpoint_call(i): - return mock.call({'user_name': 'bob%d' % i}, 'testpublisher', + def mocked_endpoint_call(i, ctxts): + return mock.call(ctxts[i], 'testpublisher', 'an_event.start', 'test message%d' % i, {'timestamp': mock.ANY, 'message_id': mock.ANY}) - endpoint1.info.assert_has_calls([mocked_endpoint_call(0), - mocked_endpoint_call(1)]) - endpoint2.info.assert_has_calls([mocked_endpoint_call(0), - mocked_endpoint_call(1)]) + endpoint1.info.assert_has_calls([mocked_endpoint_call(0, ctxts), + mocked_endpoint_call(1, ctxts)]) + endpoint2.info.assert_has_calls([mocked_endpoint_call(0, ctxts), + mocked_endpoint_call(1, ctxts)]) def test_two_pools_three_listener(self): transport = msg_notifier.get_notification_transport( @@ -451,42 +463,42 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): listener3_thread = self._setup_listener(transport, [endpoint3], targets=targets, pool="pool2") - def mocked_endpoint_call(i): - return mock.call({'user_name': 'bob%d' % i}, 'testpublisher', + def mocked_endpoint_call(i, ctxt): + return mock.call(ctxt, 'testpublisher', 'an_event.start', 'test message%d' % i, {'timestamp': mock.ANY, 'message_id': mock.ANY}) notifier = self._setup_notifier(transport, topics=["topic"]) mocked_endpoint1_calls = [] for i in range(0, 25): - notifier.info({'user_name': 'bob%d' % i}, 'an_event.start', - 'test message%d' % i) - mocked_endpoint1_calls.append(mocked_endpoint_call(i)) + ctxt = test_utils.TestContext(user_name='bob%d' % i) + notifier.info(ctxt, 'an_event.start', 'test message%d' % i) + mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt)) self.wait_for_messages(25, 'pool2') listener2_thread.stop() for i in range(0, 25): - notifier.info({'user_name': 'bob%d' % i}, 'an_event.start', - 'test message%d' % i) - mocked_endpoint1_calls.append(mocked_endpoint_call(i)) + cxt = test_utils.TestContext(user_name='bob%d' % i) + notifier.info(cxt, 'an_event.start', 'test message%d' % i) + mocked_endpoint1_calls.append(mocked_endpoint_call(i, cxt)) self.wait_for_messages(50, 'pool2') listener2_thread.start() listener3_thread.stop() for i in range(0, 25): - notifier.info({'user_name': 'bob%d' % i}, 'an_event.start', - 'test message%d' % i) - mocked_endpoint1_calls.append(mocked_endpoint_call(i)) + ctxt = test_utils.TestContext(user_name='bob%d' % i) + notifier.info(ctxt, 'an_event.start', 'test message%d' % i) + mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt)) self.wait_for_messages(75, 'pool2') listener3_thread.start() for i in range(0, 25): - notifier.info({'user_name': 'bob%d' % i}, 'an_event.start', - 'test message%d' % i) - mocked_endpoint1_calls.append(mocked_endpoint_call(i)) + ctxt = test_utils.TestContext(user_name='bob%d' % i) + notifier.info(ctxt, 'an_event.start', 'test message%d' % i) + mocked_endpoint1_calls.append(mocked_endpoint_call(i, ctxt)) self.wait_for_messages(100, 'pool1') self.wait_for_messages(100, 'pool2') diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index 8a918fdcf..9a2f637ba 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -36,6 +36,7 @@ from oslo_messaging import serializer as msg_serializer from oslo_messaging.tests import utils as test_utils from unittest import mock + load_tests = testscenarios.load_tests_apply_scenarios @@ -122,7 +123,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase): ] _context = [ - ('ctxt', dict(ctxt={'user_name': 'bob'})), + ('ctxt', dict(ctxt=test_utils.TestContext(user_name='bob'))), ] _retry = [ @@ -229,157 +230,6 @@ class TestMessagingNotifier(test_utils.BaseTestCase): TestMessagingNotifier.generate_scenarios() -class TestMessagingNotifierContextFiltering(test_utils.BaseTestCase): - - _v1 = [ - ('v1', dict(v1=True)), - ('not_v1', dict(v1=False)), - ] - - _v2 = [ - ('v2', dict(v2=True)), - ('not_v2', dict(v2=False)), - ] - - _publisher_id = [ - ('ctor_pub_id', dict(ctor_pub_id='test', - expected_pub_id='test')), - ('prep_pub_id', dict(prep_pub_id='test.localhost', - expected_pub_id='test.localhost')), - ('override', dict(ctor_pub_id='test', - prep_pub_id='test.localhost', - expected_pub_id='test.localhost')), - ] - - _topics = [ - ('no_topics', dict(topics=[])), - ('single_topic', dict(topics=['notifications'])), - ('multiple_topic2', dict(topics=['foo', 'bar'])), - ] - - _priority = [ - ('audit', dict(priority='audit')), - ('debug', dict(priority='debug')), - ('info', dict(priority='info')), - ('warn', dict(priority='warn')), - ('error', dict(priority='error')), - ('sample', dict(priority='sample')), - ('critical', dict(priority='critical')), - ] - - _payload = [ - ('payload', dict(payload={'foo': 'bar'})), - ] - - _context = [ - ('ctxt', dict(ctxt={'user_name': 'bob'})), - ] - - _retry = [ - ('unconfigured', dict()), - ('None', dict(retry=None)), - ('0', dict(retry=0)), - ('5', dict(retry=5)), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._v1, - cls._v2, - cls._publisher_id, - cls._topics, - cls._priority, - cls._payload, - cls._retry) - - def setUp(self): - super(TestMessagingNotifierContextFiltering, self).setUp() - - self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger - self.useFixture(fixtures.MockPatchObject( - messaging, 'LOG', self.logger)) - self.useFixture(fixtures.MockPatchObject( - msg_notifier, '_LOG', self.logger)) - - @mock.patch('oslo_utils.timeutils.utcnow') - def test_notifier(self, mock_utcnow): - ctxt = {'user_name': 'bob', 'secret_data': 'redact_me'} - safe_ctxt = {'user_name': 'bob'} - drivers = [] - if self.v1: - drivers.append('messaging') - if self.v2: - drivers.append('messagingv2') - - self.config(driver=drivers, - topics=self.topics, - group='oslo_messaging_notifications') - - transport = oslo_messaging.get_notification_transport(self.conf, - url='fake:') - - if hasattr(self, 'ctor_pub_id'): - notifier = oslo_messaging.Notifier(transport, - publisher_id=self.ctor_pub_id) - else: - notifier = oslo_messaging.Notifier(transport) - - prepare_kwds = {} - if hasattr(self, 'retry'): - prepare_kwds['retry'] = self.retry - if hasattr(self, 'prep_pub_id'): - prepare_kwds['publisher_id'] = self.prep_pub_id - if prepare_kwds: - notifier = notifier.prepare(**prepare_kwds) - - transport._send_notification = mock.Mock() - - message_id = uuid.uuid4() - uuid.uuid4 = mock.Mock(return_value=message_id) - - mock_utcnow.return_value = datetime.datetime.utcnow() - - message = { - 'message_id': str(message_id), - 'publisher_id': self.expected_pub_id, - 'event_type': 'test.notify', - 'priority': self.priority.upper(), - 'payload': self.payload, - 'timestamp': str(timeutils.utcnow()), - } - - sends = [] - if self.v1: - sends.append(dict(version=1.0)) - if self.v2: - sends.append(dict(version=2.0)) - - calls = [] - for send_kwargs in sends: - for topic in self.topics: - if hasattr(self, 'retry'): - send_kwargs['retry'] = self.retry - else: - send_kwargs['retry'] = -1 - target = oslo_messaging.Target(topic='%s.%s' % (topic, - self.priority)) - calls.append(mock.call(target, - safe_ctxt, - message, - **send_kwargs)) - - method = getattr(notifier, self.priority) - method(ctxt, 'test.notify', self.payload) - - uuid.uuid4.assert_called_once_with() - transport._send_notification.assert_has_calls(calls, any_order=True) - - self.assertTrue(notifier.is_enabled()) - - -TestMessagingNotifierContextFiltering.generate_scenarios() - - class TestMessagingNotifierRetry(test_utils.BaseTestCase): class TestingException(BaseException): @@ -422,7 +272,7 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): with mock.patch( 'oslo_messaging.notify.messaging.LOG.exception' ) as mock_log: - notifier.info({}, "test", {}) + notifier.info(test_utils.TestContext(), "test", {}) # one normal call plus two retries self.assertEqual(3, len(calls)) @@ -451,7 +301,7 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase): # call simply returns without i) failing to deliver the message to # the non existent kafka bus ii) retrying the message delivery twice # as the configuration requested it. - notifier.info({}, "test", {}) + notifier.info(test_utils.TestContext(), "test", {}) class TestSerializer(test_utils.BaseTestCase): @@ -484,7 +334,8 @@ class TestSerializer(test_utils.BaseTestCase): serializer.serialize_entity = mock.Mock() serializer.serialize_entity.return_value = 'sbar' - notifier.info(dict(user_name='bob'), 'test.notify', 'bar') + ctxt = test_utils.TestContext(user_name='bob') + notifier.info(ctxt, 'test.notify', 'bar') message = { 'message_id': str(message_id), @@ -498,11 +349,10 @@ class TestSerializer(test_utils.BaseTestCase): self.assertEqual([(dict(user_name='alice'), message, 'INFO', -1)], _impl_test.NOTIFICATIONS) - uuid.uuid4.assert_called_once_with() - serializer.serialize_context.assert_called_once_with( - dict(user_name='bob')) - serializer.serialize_entity.assert_called_once_with( - dict(user_name='bob'), 'bar') + # NOTE(JayF): This is also called when we create a TestContext + uuid.uuid4.assert_has_calls([mock.call(), mock.call()]) + serializer.serialize_context.assert_called_once_with(ctxt) + serializer.serialize_entity.assert_called_once_with(ctxt, 'bar') class TestNotifierTopics(test_utils.BaseTestCase): @@ -561,9 +411,10 @@ class TestLogNotifier(test_utils.BaseTestCase): with mock.patch.object(logging, 'getLogger') as gl: gl.return_value = logger - notifier.info({}, 'test.notify', 'bar') + notifier.info(test_utils.TestContext(), 'test.notify', 'bar') - uuid.uuid4.assert_called_once_with() + # NOTE(JayF): TestContext calls this, too + uuid.uuid4.assert_has_calls([mock.call(), mock.call()]) logging.getLogger.assert_called_once_with( 'oslo.messaging.notification.test.notify') @@ -818,7 +669,7 @@ group_1: with mock.patch.object(self.router, 'plugin_manager') as pm: with mock.patch.object(self.router, '_get_drivers_for_message', drivers_mock): - self.notifier.info({}, 'my_event', {}) + self.notifier.info(test_utils.TestContext(), 'my_event', {}) self.assertEqual(sorted(['rpc', 'foo']), sorted(pm.map.call_args[0][6])) @@ -856,13 +707,13 @@ group_1: return_value=pm): with mock.patch('oslo_messaging.notify.' '_impl_routing.LOG'): - - self.notifier.info({}, 'my_event', {}) + cxt = test_utils.TestContext() + self.notifier.info(cxt, 'my_event', {}) self.assertFalse(bar_driver.info.called) rpc_driver.notify.assert_called_once_with( - {}, mock.ANY, 'INFO', -1) + cxt, mock.ANY, 'INFO', -1) rpc2_driver.notify.assert_called_once_with( - {}, mock.ANY, 'INFO', -1) + cxt, mock.ANY, 'INFO', -1) class TestNoOpNotifier(test_utils.BaseTestCase): diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py index b4c7402df..f9d4e51c1 100644 --- a/oslo_messaging/tests/test_utils.py +++ b/oslo_messaging/tests/test_utils.py @@ -12,7 +12,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - from oslo_messaging._drivers import common from oslo_messaging import _utils as utils from oslo_messaging.tests import utils as test_utils diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py index 057796678..59c05725b 100644 --- a/oslo_messaging/tests/utils.py +++ b/oslo_messaging/tests/utils.py @@ -22,6 +22,7 @@ import threading from oslo_config import cfg +from oslo_context.context import RequestContext from oslo_utils import eventletutils from oslotest import base @@ -82,3 +83,10 @@ class ServerThreadHelper(threading.Thread): def stop(self): self._stop_event.set() + + +class TestContext(RequestContext): + def redacted_copy(self): + # NOTE(JayF): By returning our self here instead of redacting, we can + # continue using equality comparisons in unit tests. + return self diff --git a/requirements.txt b/requirements.txt index e02718844..d1ad6453c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 futurist>=1.2.0 # Apache-2.0 oslo.config>=5.2.0 # Apache-2.0 +oslo.context>=5.3.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 oslo.utils>=3.37.0 # Apache-2.0 oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0