From 990d894eaf0d18941b56d89bff2bca51de375640 Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Fri, 25 Mar 2016 13:39:53 +0200 Subject: [PATCH] Move server related logic from dispatchers Dispatcher should be responsible for routing message to the callback method of endpoint object and returning result back to the server only. But now it is also responsible for sending reply, ack/reque messages etc. Also this patch makes small improvements: 1) Notification dispatcher now requeue message if endpoint raises exception 2) unstable behaviour of test_mask_passwords test is fixed Change-Id: I5f23e23644e90919cb67f81fc306ee85c5e09974 --- oslo_messaging/_drivers/impl_pika.py | 7 +- oslo_messaging/dispatcher.py | 84 +-------- oslo_messaging/notify/dispatcher.py | 178 ++++++++---------- oslo_messaging/notify/listener.py | 95 +++++++++- oslo_messaging/rpc/dispatcher.py | 50 +---- oslo_messaging/rpc/server.py | 48 ++++- oslo_messaging/server.py | 67 +++++-- .../tests/notify/test_dispatcher.py | 53 ++---- oslo_messaging/tests/notify/test_notifier.py | 3 +- oslo_messaging/tests/rpc/test_dispatcher.py | 56 +++--- oslo_messaging/tests/rpc/test_server.py | 14 +- oslo_messaging/tests/test_opts.py | 10 +- 12 files changed, 348 insertions(+), 317 deletions(-) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index a9434818b..50d448856 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -192,8 +192,11 @@ class PikaDriver(base.BaseDriver): # exchange which is not exists, we get ChannelClosed exception # and need to reconnect try: - self._declare_rpc_exchange(exchange, - expiration_time - time.time()) + self._declare_rpc_exchange( + exchange, + None if expiration_time is None else + expiration_time - time.time() + ) except pika_drv_exc.ConnectionException as e: LOG.warning("Problem during declaring exchange. %s", e) return True diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py index 9324715de..2cd12cb7e 100644 --- a/oslo_messaging/dispatcher.py +++ b/oslo_messaging/dispatcher.py @@ -16,96 +16,22 @@ import logging import six -from oslo_messaging._i18n import _ - __all__ = [ - "DispatcherBase", - "DispatcherExecutorContext" + "DispatcherBase" ] LOG = logging.getLogger(__name__) -class DispatcherExecutorContext(object): - """Dispatcher executor context helper - - A dispatcher can have work to do before and after the dispatch of the - request in the main server thread while the dispatcher itself can be - done in its own thread. - - The executor can use the helper like this: - - callback = dispatcher(incoming) - callback.prepare() - thread = MyWhateverThread() - thread.on_done(callback.done) - thread.run(callback.run) - - """ - def __init__(self, incoming, dispatch, post=None): - self._result = None - self._incoming = incoming - self._dispatch = dispatch - self._post = post - - def run(self): - """The incoming message dispath itself - - Can be run in an other thread/greenlet/corotine if the executor is - able to do it. - """ - try: - self._result = self._dispatch(self._incoming) - except Exception: - msg = _('The dispatcher method must catches all exceptions') - LOG.exception(msg) - raise RuntimeError(msg) - - def done(self): - """Callback after the incoming message have been dispathed - - Should be ran in the main executor thread/greenlet/corotine - """ - # FIXME(sileht): this is not currently true, this works only because - # the driver connection used for polling write on the wire only to - # ack/requeue message, but what if one day, the driver do something - # else - if self._post is not None: - self._post(self._incoming, self._result) - - @six.add_metaclass(abc.ABCMeta) class DispatcherBase(object): "Base class for dispatcher" - batch_size = 1 - "Number of messages to wait before calling endpoints callacks" - - batch_timeout = None - "Number of seconds to wait before calling endpoints callacks" - @abc.abstractmethod - def _listen(self, transport): - """Initiate the driver Listener + def dispatch(self, incoming): + """Dispatch incoming messages to the endpoints and return result - Usually the driver Listener is start with the transport helper methods: - - * transport._listen() - * transport._listen_for_notifications() - - :param transport: the transport object - :type transport: oslo_messaging.transport.Transport - :returns: a driver Listener object - :rtype: oslo_messaging._drivers.base.Listener - """ - - @abc.abstractmethod - def __call__(self, incoming): - """Called by the executor to get the DispatcherExecutorContext - - :param incoming: list of messages - :type incoming: oslo_messging._drivers.base.IncomingMessage - :returns: DispatcherExecutorContext - :rtype: DispatcherExecutorContext + :param incoming: incoming object for dispatching to the endpoint + :type incoming: object, depends on endpoint type """ diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 926865162..66cfac6bf 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -19,7 +19,7 @@ import logging import six -from oslo_messaging._i18n import _LE, _LW +from oslo_messaging._i18n import _LW from oslo_messaging import dispatcher from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -35,14 +35,11 @@ class NotificationResult(object): REQUEUE = 'requeue' -class _NotificationDispatcherBase(dispatcher.DispatcherBase): - def __init__(self, targets, endpoints, serializer, allow_requeue, - pool=None): - self.targets = targets +class NotificationDispatcher(dispatcher.DispatcherBase): + def __init__(self, endpoints, serializer): + self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() - self.allow_requeue = allow_requeue - self.pool = pool self._callbacks_by_priority = {} for endpoint, prio in itertools.product(endpoints, PRIORITIES): @@ -52,42 +49,77 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): self._callbacks_by_priority.setdefault(prio, []).append( (screen, method)) - priorities = self._callbacks_by_priority.keys() - self._targets_priorities = set(itertools.product(self.targets, - priorities)) + @property + def supported_priorities(self): + return self._callbacks_by_priority.keys() - def _listen(self, transport): - transport._require_driver_features(requeue=self.allow_requeue) - return transport._listen_for_notifications(self._targets_priorities, - pool=self.pool) - - def __call__(self, incoming): - return dispatcher.DispatcherExecutorContext( - incoming, self._dispatch_and_handle_error, - post=self._post_dispatch) - - def _post_dispatch(self, incoming, requeues): - for m in incoming: - try: - if requeues and m in requeues: - m.requeue() - else: - m.acknowledge() - except Exception: - LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) - - def _dispatch_and_handle_error(self, incoming): - """Dispatch a notification message to the appropriate endpoint method. - - :param incoming: the incoming notification message - :type ctxt: IncomingMessage + def dispatch(self, incoming): + """Dispatch notification messages to the appropriate endpoint method. """ - try: - return self._dispatch(incoming) - except Exception: - LOG.error(_LE('Exception during message handling'), exc_info=True) + priority, raw_message, message = self._extract_user_message(incoming) - def _dispatch(self, incoming): + if priority not in PRIORITIES: + LOG.warning(_LW('Unknown priority "%s"'), priority) + return + + for screen, callback in self._callbacks_by_priority.get(priority, + []): + if screen and not screen.match(message["ctxt"], + message["publisher_id"], + message["event_type"], + message["metadata"], + message["payload"]): + continue + + ret = self._exec_callback(callback, message) + if ret == NotificationResult.REQUEUE: + return ret + return NotificationResult.HANDLED + + def _exec_callback(self, callback, message): + localcontext._set_local_context(message["ctxt"]) + + try: + return callback(message["ctxt"], + message["publisher_id"], + message["event_type"], + message["payload"], + message["metadata"]) + except Exception: + LOG.exception("Callback raised an exception.") + return NotificationResult.REQUEUE + finally: + localcontext._clear_local_context() + + def _extract_user_message(self, incoming): + ctxt = self.serializer.deserialize_context(incoming.ctxt) + message = incoming.message + + publisher_id = message.get('publisher_id') + event_type = message.get('event_type') + metadata = { + 'message_id': message.get('message_id'), + 'timestamp': message.get('timestamp') + } + priority = message.get('priority', '').lower() + payload = self.serializer.deserialize_entity(ctxt, + message.get('payload')) + return priority, incoming, dict(ctxt=ctxt, + publisher_id=publisher_id, + event_type=event_type, + payload=payload, + metadata=metadata) + + +class BatchNotificationDispatcher(NotificationDispatcher): + """A message dispatcher which understands Notification messages. + + A MessageHandlingServer is constructed by passing a callable dispatcher + which is invoked with a list of message dictionaries each time 'batch_size' + messages are received or 'batch_timeout' seconds is reached. + """ + + def dispatch(self, incoming): """Dispatch notification messages to the appropriate endpoint method. """ @@ -120,70 +152,14 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): continue ret = self._exec_callback(callback, filtered_messages) - if self.allow_requeue and ret == NotificationResult.REQUEUE: + if ret == NotificationResult.REQUEUE: requeues.update(raw_messages) break return requeues - def _exec_callback(self, callback, *args): - ret = callback(*args) - return NotificationResult.HANDLED if ret is None else ret - - def _extract_user_message(self, incoming): - ctxt = self.serializer.deserialize_context(incoming.ctxt) - message = incoming.message - - publisher_id = message.get('publisher_id') - event_type = message.get('event_type') - metadata = { - 'message_id': message.get('message_id'), - 'timestamp': message.get('timestamp') - } - priority = message.get('priority', '').lower() - payload = self.serializer.deserialize_entity(ctxt, - message.get('payload')) - return priority, incoming, dict(ctxt=ctxt, - publisher_id=publisher_id, - event_type=event_type, - payload=payload, - metadata=metadata) - - -class NotificationDispatcher(_NotificationDispatcherBase): - """A message dispatcher which understands Notification messages. - - A MessageHandlingServer is constructed by passing a callable dispatcher - which is invoked with context and message dictionaries each time a message - is received. - """ def _exec_callback(self, callback, messages): - localcontext._set_local_context( - messages[0]["ctxt"]) try: - return super(NotificationDispatcher, self)._exec_callback( - callback, - messages[0]["ctxt"], - messages[0]["publisher_id"], - messages[0]["event_type"], - messages[0]["payload"], - messages[0]["metadata"]) - finally: - localcontext._clear_local_context() - - -class BatchNotificationDispatcher(_NotificationDispatcherBase): - """A message dispatcher which understands Notification messages. - - A MessageHandlingServer is constructed by passing a callable dispatcher - which is invoked with a list of message dictionaries each time 'batch_size' - messages are received or 'batch_timeout' seconds is reached. - """ - - def __init__(self, targets, endpoints, serializer, allow_requeue, - pool=None, batch_size=None, batch_timeout=None): - super(BatchNotificationDispatcher, self).__init__(targets, endpoints, - serializer, - allow_requeue, - pool) - self.batch_size = batch_size - self.batch_timeout = batch_timeout + return callback(messages) + except Exception: + LOG.exception("Callback raised an exception.") + return NotificationResult.REQUEUE diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 8855d5875..308e081f8 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -103,10 +103,89 @@ by passing allow_requeue=True to get_notification_listener(). If the driver does not support requeueing, it will raise NotImplementedError at this point. """ +import itertools +import logging +from oslo_messaging._i18n import _LE from oslo_messaging.notify import dispatcher as notify_dispatcher from oslo_messaging import server as msg_server +LOG = logging.getLogger(__name__) + + +class NotificationServer(msg_server.MessageHandlingServer): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None): + super(NotificationServer, self).__init__(transport, dispatcher, + executor) + self._allow_requeue = allow_requeue + self._pool = pool + self.targets = targets + self._targets_priorities = set( + itertools.product(self.targets, + self.dispatcher.supported_priorities) + ) + + def _create_listener(self): + return msg_server.SingleMessageListenerAdapter( + self.transport._listen_for_notifications( + self._targets_priorities, self._pool + ) + ) + + def _process_incoming(self, incoming): + res = notify_dispatcher.NotificationResult.REQUEUE + try: + res = self.dispatcher.dispatch(incoming) + except Exception: + LOG.error(_LE('Exception during message handling'), exc_info=True) + + try: + if (res == notify_dispatcher.NotificationResult.REQUEUE and + self._allow_requeue): + incoming.requeue() + else: + incoming.acknowledge() + except Exception: + LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + + +class BatchNotificationServer(NotificationServer): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None, batch_size=1, + batch_timeout=None): + super(BatchNotificationServer, self).__init__( + transport=transport, targets=targets, dispatcher=dispatcher, + executor=executor, allow_requeue=allow_requeue, pool=pool + ) + + self._batch_size = batch_size + self._batch_timeout = batch_timeout + + def _create_listener(self): + return msg_server.BatchMessageListenerAdapter( + self.transport._listen_for_notifications( + self._targets_priorities, self._pool + ), + timeout=self._batch_timeout, + batch_size=self._batch_size + ) + + def _process_incoming(self, incoming): + try: + not_processed_messages = self.dispatcher.dispatch(incoming) + except Exception: + not_processed_messages = set(incoming) + LOG.error(_LE('Exception during message handling'), exc_info=True) + for m in incoming: + try: + if m in not_processed_messages and self._allow_requeue: + m.requeue() + else: + m.acknowledge() + except Exception: + LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + def get_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, @@ -137,10 +216,10 @@ def get_notification_listener(transport, targets, endpoints, :type pool: str :raises: NotImplementedError """ - dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints, - serializer, - allow_requeue, pool) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + dispatcher = notify_dispatcher.NotificationDispatcher(endpoints, + serializer) + return NotificationServer(transport, targets, dispatcher, executor, + allow_requeue, pool) def get_batch_notification_listener(transport, targets, endpoints, @@ -180,6 +259,8 @@ def get_batch_notification_listener(transport, targets, endpoints, :raises: NotImplementedError """ dispatcher = notify_dispatcher.BatchNotificationDispatcher( - targets, endpoints, serializer, allow_requeue, pool, - batch_size, batch_timeout) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + endpoints, serializer) + return BatchNotificationServer( + transport, targets, dispatcher, executor, allow_requeue, pool, + batch_size, batch_timeout + ) diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index cecef2224..e94b4d461 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -29,7 +29,6 @@ import sys import six -from oslo_messaging._i18n import _LE from oslo_messaging import _utils as utils from oslo_messaging import dispatcher from oslo_messaging import localcontext @@ -94,20 +93,16 @@ class RPCDispatcher(dispatcher.DispatcherBase): """ - def __init__(self, target, endpoints, serializer): + def __init__(self, endpoints, serializer): """Construct a rpc server dispatcher. - :param target: the exchange, topic and server to listen on - :type target: Target + :param endpoints: list of endpoint objects for dispatching to + :param serializer: optional message serializer """ self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() self._default_target = msg_target.Target() - self._target = target - - def _listen(self, transport): - return transport._listen(self._target) @staticmethod def _is_namespace(target, namespace): @@ -127,43 +122,16 @@ class RPCDispatcher(dispatcher.DispatcherBase): result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) - def __call__(self, incoming): - incoming[0].acknowledge() - return dispatcher.DispatcherExecutorContext( - incoming[0], self._dispatch_and_reply) - - def _dispatch_and_reply(self, incoming): - try: - incoming.reply(self._dispatch(incoming.ctxt, - incoming.message)) - except ExpectedException as e: - LOG.debug(u'Expected exception during message handling (%s)', - e.exc_info[1]) - incoming.reply(failure=e.exc_info, log_failure=False) - except Exception as e: - # current sys.exc_info() content can be overriden - # by another exception raise by a log handler during - # LOG.exception(). So keep a copy and delete it later. - exc_info = sys.exc_info() - try: - LOG.error(_LE('Exception during message handling: %s'), e, - exc_info=exc_info) - incoming.reply(failure=exc_info) - finally: - # NOTE(dhellmann): Remove circular object reference - # between the current stack frame and the traceback in - # exc_info. - del exc_info - - def _dispatch(self, ctxt, message): + def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method. - :param ctxt: the request context - :type ctxt: dict - :param message: the message payload - :type message: dict + :param incoming: incoming message + :type incoming: IncomingMessage :raises: NoSuchMethod, UnsupportedVersion """ + message = incoming.message + ctxt = incoming.ctxt + method = message.get('method') args = message.get('args', {}) namespace = message.get('namespace') diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 74dbede44..61c0b4e1f 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -102,9 +102,53 @@ __all__ = [ 'expected_exceptions', ] +import logging +import sys + +from oslo_messaging._i18n import _LE from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_messaging import server as msg_server +LOG = logging.getLogger(__name__) + + +class RPCServer(msg_server.MessageHandlingServer): + def __init__(self, transport, target, dispatcher, executor='blocking'): + super(RPCServer, self).__init__(transport, dispatcher, executor) + self._target = target + + def _create_listener(self): + return msg_server.SingleMessageListenerAdapter( + self.transport._listen(self._target) + ) + + def _process_incoming(self, incoming): + incoming.acknowledge() + try: + res = self.dispatcher.dispatch(incoming) + except rpc_dispatcher.ExpectedException as e: + LOG.debug(u'Expected exception during message handling (%s)', + e.exc_info[1]) + incoming.reply(failure=e.exc_info) + except Exception as e: + # current sys.exc_info() content can be overriden + # by another exception raise by a log handler during + # LOG.exception(). So keep a copy and delete it later. + exc_info = sys.exc_info() + try: + LOG.exception(_LE('Exception during message handling: %s'), e) + incoming.reply(failure=exc_info) + finally: + # NOTE(dhellmann): Remove circular object reference + # between the current stack frame and the traceback in + # exc_info. + del exc_info + else: + try: + incoming.reply(res) + except Exception: + LOG.Exception("Can not send reply for message %s", incoming) + def get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None): @@ -129,8 +173,8 @@ def get_rpc_server(transport, target, endpoints, :param serializer: an optional entity serializer :type serializer: Serializer """ - dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) + return RPCServer(transport, target, dispatcher, executor) def expected_exceptions(*exceptions): diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index b702c079d..4452a9139 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -23,6 +23,7 @@ __all__ = [ 'ServerListenError', ] +import abc import functools import inspect import logging @@ -34,6 +35,7 @@ from oslo_service import service from oslo_utils import eventletutils from oslo_utils import excutils from oslo_utils import timeutils +import six from stevedore import driver from oslo_messaging._drivers import base as driver_base @@ -295,6 +297,42 @@ def ordered(after=None, reset_after=None): return _ordered +@six.add_metaclass(abc.ABCMeta) +class MessageListenerAdapter(object): + def __init__(self, driver_listener, timeout=None): + self._driver_listener = driver_listener + self._timeout = timeout + + @abc.abstractmethod + def poll(self): + """Poll incoming and return incoming request""" + + def stop(self): + self._driver_listener.stop() + + def cleanup(self): + self._driver_listener.cleanup() + + +class SingleMessageListenerAdapter(MessageListenerAdapter): + def poll(self): + msgs = self._driver_listener.poll(prefetch_size=1, + timeout=self._timeout) + return msgs[0] if msgs else None + + +class BatchMessageListenerAdapter(MessageListenerAdapter): + def __init__(self, driver_listener, timeout=None, batch_size=1): + super(BatchMessageListenerAdapter, self).__init__(driver_listener, + timeout) + self._batch_size = batch_size + + def poll(self): + return self._driver_listener.poll(prefetch_size=self._batch_size, + timeout=self._timeout) + + +@six.add_metaclass(abc.ABCMeta) class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): """Server for handling messages. @@ -345,9 +383,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): super(MessageHandlingServer, self).__init__() - def _submit_work(self, callback): - fut = self._work_executor.submit(callback.run) - fut.add_done_callback(lambda f: callback.done()) + @abc.abstractmethod + def _process_incoming(self, incoming): + """Process incoming request + + :param incoming: incoming request. + """ + + @abc.abstractmethod + def _create_listener(self): + """Creates listener object for polling requests + :return: MessageListenerAdapter + """ @ordered(reset_after='stop') def start(self, override_pool_size=None): @@ -374,7 +421,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): self._started = True try: - self.listener = self.dispatcher._listen(self.transport) + self.listener = self._create_listener() except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) @@ -412,22 +459,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): @excutils.forever_retry_uncaught_exceptions def _runner(self): while self._started: - incoming = self.listener.poll( - timeout=self.dispatcher.batch_timeout, - prefetch_size=self.dispatcher.batch_size) + incoming = self.listener.poll() if incoming: - self._submit_work(self.dispatcher(incoming)) + self._work_executor.submit(self._process_incoming, incoming) # listener is stopped but we need to process all already consumed # messages while True: - incoming = self.listener.poll( - timeout=self.dispatcher.batch_timeout, - prefetch_size=self.dispatcher.batch_size) + incoming = self.listener.poll() if incoming: - self._submit_work(self.dispatcher(incoming)) + self._work_executor.submit(self._process_incoming, incoming) else: return diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index 2442d535e..3818a7fd8 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import itertools - from oslo_utils import timeutils import testscenarios @@ -25,7 +23,6 @@ from six.moves import mock load_tests = testscenarios.load_tests_apply_scenarios - notification_msg = dict( publisher_id="publisher_id", event_type="compute.start", @@ -96,20 +93,21 @@ class TestDispatcher(test_utils.BaseTestCase): msg = notification_msg.copy() msg['priority'] = self.priority - targets = [oslo_messaging.Target(topic='notifications')] - dispatcher = notify_dispatcher.NotificationDispatcher( - targets, endpoints, None, allow_requeue=True, pool=None) - - # check it listen on wanted topics - self.assertEqual(sorted(set((targets[0], prio) - for prio in itertools.chain.from_iterable( - self.endpoints))), - sorted(dispatcher._targets_priorities)) + dispatcher = notify_dispatcher.NotificationDispatcher(endpoints, None) incoming = mock.Mock(ctxt={}, message=msg) - callback = dispatcher([incoming]) - callback.run() - callback.done() + + res = dispatcher.dispatch(incoming) + + expected_res = ( + notify_dispatcher.NotificationResult.REQUEUE + if (self.return_value == + notify_dispatcher.NotificationResult.REQUEUE or + self.ex is not None) + else notify_dispatcher.NotificationResult.HANDLED + ) + + self.assertEqual(expected_res, res) # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -127,26 +125,14 @@ class TestDispatcher(test_utils.BaseTestCase): else: self.assertEqual(0, endpoints[i].call_count) - if self.ex: - self.assertEqual(1, incoming.acknowledge.call_count) - self.assertEqual(0, incoming.requeue.call_count) - elif self.return_value == oslo_messaging.NotificationResult.HANDLED \ - or self.return_value is None: - self.assertEqual(1, incoming.acknowledge.call_count) - self.assertEqual(0, incoming.requeue.call_count) - elif self.return_value == oslo_messaging.NotificationResult.REQUEUE: - self.assertEqual(0, incoming.acknowledge.call_count) - self.assertEqual(1, incoming.requeue.call_count) - @mock.patch('oslo_messaging.notify.dispatcher.LOG') def test_dispatcher_unknown_prio(self, mylog): msg = notification_msg.copy() msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( - [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - callback = dispatcher([mock.Mock(ctxt={}, message=msg)]) - callback.run() - callback.done() + [mock.Mock()], None) + res = dispatcher.dispatch(mock.Mock(ctxt={}, message=msg)) + self.assertEqual(None, res) mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -236,9 +222,8 @@ class TestDispatcherFilter(test_utils.BaseTestCase): **self.filter_rule) endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter) - targets = [oslo_messaging.Target(topic='notifications')] dispatcher = notify_dispatcher.NotificationDispatcher( - targets, [endpoint], serializer=None, allow_requeue=True) + [endpoint], serializer=None) message = {'payload': {'state': 'active'}, 'priority': 'info', 'publisher_id': self.publisher_id, @@ -246,9 +231,7 @@ class TestDispatcherFilter(test_utils.BaseTestCase): 'timestamp': '2014-03-03 18:21:04.369234', 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} incoming = mock.Mock(ctxt=self.context, message=message) - callback = dispatcher([incoming]) - callback.run() - callback.done() + dispatcher.dispatch(incoming) if self.match: self.assertEqual(1, endpoint.info.call_count) diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index bf56288fc..92e8f6435 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -351,8 +351,7 @@ class TestLogNotifier(test_utils.BaseTestCase): logger = mock.MagicMock() logger.info = mock.MagicMock() message = {'password': 'passw0rd', 'event_type': 'foo'} - json_str = jsonutils.dumps(message) - mask_str = strutils.mask_password(json_str) + mask_str = jsonutils.dumps(strutils.mask_dict_password(message)) with mock.patch.object(logging, 'getLogger') as gl: gl.return_value = logger diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 672733a05..2a2f7b474 100644 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -109,33 +109,29 @@ class TestDispatcher(test_utils.BaseTestCase): for e in self.endpoints] serializer = None - target = oslo_messaging.Target() - dispatcher = oslo_messaging.RPCDispatcher(target, endpoints, - serializer) - - def check_reply(reply=None, failure=None, log_failure=True): - if self.ex and failure is not None: - ex = failure[1] - self.assertFalse(self.success, ex) - self.assertIsNotNone(self.ex, ex) - self.assertIsInstance(ex, self.ex, ex) - if isinstance(ex, oslo_messaging.NoSuchMethod): - self.assertEqual(self.msg.get('method'), ex.method) - elif isinstance(ex, oslo_messaging.UnsupportedVersion): - self.assertEqual(self.msg.get('version', '1.0'), - ex.version) - if ex.method: - self.assertEqual(self.msg.get('method'), ex.method) - else: - self.assertTrue(self.success, failure) - self.assertIsNone(failure) + dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer) incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) - incoming.reply.side_effect = check_reply - callback = dispatcher([incoming]) - callback.run() - callback.done() + res = None + + try: + res = dispatcher.dispatch(incoming) + except Exception as ex: + self.assertFalse(self.success, ex) + self.assertIsNotNone(self.ex, ex) + self.assertIsInstance(ex, self.ex, ex) + if isinstance(ex, oslo_messaging.NoSuchMethod): + self.assertEqual(self.msg.get('method'), ex.method) + elif isinstance(ex, oslo_messaging.UnsupportedVersion): + self.assertEqual(self.msg.get('version', '1.0'), + ex.version) + if ex.method: + self.assertEqual(self.msg.get('method'), ex.method) + else: + self.assertTrue(self.success, + "Not expected success of operation durung testing") + self.assertIsNotNone(res) for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: @@ -147,8 +143,6 @@ class TestDispatcher(test_utils.BaseTestCase): else: self.assertEqual(0, method.call_count) - self.assertEqual(1, incoming.reply.call_count) - class TestSerializer(test_utils.BaseTestCase): @@ -165,9 +159,7 @@ class TestSerializer(test_utils.BaseTestCase): def test_serializer(self): endpoint = _FakeEndpoint() serializer = msg_serializer.NoOpSerializer() - target = oslo_messaging.Target() - dispatcher = oslo_messaging.RPCDispatcher(target, [endpoint], - serializer) + dispatcher = oslo_messaging.RPCDispatcher([endpoint], serializer) self.mox.StubOutWithMock(endpoint, 'foo') args = dict([(k, 'd' + v) for k, v in self.args.items()]) @@ -187,7 +179,9 @@ class TestSerializer(test_utils.BaseTestCase): self.mox.ReplayAll() - retval = dispatcher._dispatch(self.ctxt, dict(method='foo', - args=self.args)) + incoming = mock.Mock() + incoming.ctxt = self.ctxt + incoming.message = dict(method='foo', args=self.args) + retval = dispatcher.dispatch(incoming) if self.retval is not None: self.assertEqual('s' + self.retval, retval) diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index effa640f2..3b98eaf4f 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -149,7 +149,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): serializer=serializer) # Mocking executor server._executor_cls = MagicMockIgnoreArgs - server.listener = MagicMockIgnoreArgs() + server._create_listener = MagicMockIgnoreArgs() server.dispatcher = MagicMockIgnoreArgs() # Here assigning executor's listener object to listener variable # before calling wait method, because in wait method we are @@ -551,7 +551,6 @@ class TestServerLocking(test_utils.BaseTestCase): def __init__(self, *args, **kwargs): self._lock = threading.Lock() self._calls = [] - self.listener = mock.MagicMock() executors.append(self) submit = _logmethod('submit') @@ -559,9 +558,16 @@ class TestServerLocking(test_utils.BaseTestCase): self.executors = executors - self.server = oslo_messaging.MessageHandlingServer(mock.Mock(), - mock.Mock()) + class MessageHandlingServerImpl(oslo_messaging.MessageHandlingServer): + def _create_listener(self): + pass + + def _process_incoming(self, incoming): + pass + + self.server = MessageHandlingServerImpl(mock.Mock(), mock.Mock()) self.server._executor_cls = FakeExecutor + self.server._create_listener = mock.Mock() def test_start_stop_wait(self): # Test a simple execution of start, stop, wait in order diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 1daa11d3f..2ca8f8a2e 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -61,6 +61,14 @@ class OptsTestCase(test_utils.BaseTestCase): def test_defaults(self): transport = mock.Mock() transport.conf = self.conf - server.MessageHandlingServer(transport, mock.Mock()) + + class MessageHandlingServerImpl(server.MessageHandlingServer): + def _create_listener(self): + pass + + def _process_incoming(self, incoming): + pass + + MessageHandlingServerImpl(transport, mock.Mock()) opts.set_defaults(self.conf, executor_thread_pool_size=100) self.assertEqual(100, self.conf.executor_thread_pool_size)