diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index a9434818b..50d448856 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -192,8 +192,11 @@ class PikaDriver(base.BaseDriver): # exchange which is not exists, we get ChannelClosed exception # and need to reconnect try: - self._declare_rpc_exchange(exchange, - expiration_time - time.time()) + self._declare_rpc_exchange( + exchange, + None if expiration_time is None else + expiration_time - time.time() + ) except pika_drv_exc.ConnectionException as e: LOG.warning("Problem during declaring exchange. %s", e) return True diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py index 9324715de..2cd12cb7e 100644 --- a/oslo_messaging/dispatcher.py +++ b/oslo_messaging/dispatcher.py @@ -16,96 +16,22 @@ import logging import six -from oslo_messaging._i18n import _ - __all__ = [ - "DispatcherBase", - "DispatcherExecutorContext" + "DispatcherBase" ] LOG = logging.getLogger(__name__) -class DispatcherExecutorContext(object): - """Dispatcher executor context helper - - A dispatcher can have work to do before and after the dispatch of the - request in the main server thread while the dispatcher itself can be - done in its own thread. - - The executor can use the helper like this: - - callback = dispatcher(incoming) - callback.prepare() - thread = MyWhateverThread() - thread.on_done(callback.done) - thread.run(callback.run) - - """ - def __init__(self, incoming, dispatch, post=None): - self._result = None - self._incoming = incoming - self._dispatch = dispatch - self._post = post - - def run(self): - """The incoming message dispath itself - - Can be run in an other thread/greenlet/corotine if the executor is - able to do it. - """ - try: - self._result = self._dispatch(self._incoming) - except Exception: - msg = _('The dispatcher method must catches all exceptions') - LOG.exception(msg) - raise RuntimeError(msg) - - def done(self): - """Callback after the incoming message have been dispathed - - Should be ran in the main executor thread/greenlet/corotine - """ - # FIXME(sileht): this is not currently true, this works only because - # the driver connection used for polling write on the wire only to - # ack/requeue message, but what if one day, the driver do something - # else - if self._post is not None: - self._post(self._incoming, self._result) - - @six.add_metaclass(abc.ABCMeta) class DispatcherBase(object): "Base class for dispatcher" - batch_size = 1 - "Number of messages to wait before calling endpoints callacks" - - batch_timeout = None - "Number of seconds to wait before calling endpoints callacks" - @abc.abstractmethod - def _listen(self, transport): - """Initiate the driver Listener + def dispatch(self, incoming): + """Dispatch incoming messages to the endpoints and return result - Usually the driver Listener is start with the transport helper methods: - - * transport._listen() - * transport._listen_for_notifications() - - :param transport: the transport object - :type transport: oslo_messaging.transport.Transport - :returns: a driver Listener object - :rtype: oslo_messaging._drivers.base.Listener - """ - - @abc.abstractmethod - def __call__(self, incoming): - """Called by the executor to get the DispatcherExecutorContext - - :param incoming: list of messages - :type incoming: oslo_messging._drivers.base.IncomingMessage - :returns: DispatcherExecutorContext - :rtype: DispatcherExecutorContext + :param incoming: incoming object for dispatching to the endpoint + :type incoming: object, depends on endpoint type """ diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 926865162..66cfac6bf 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -19,7 +19,7 @@ import logging import six -from oslo_messaging._i18n import _LE, _LW +from oslo_messaging._i18n import _LW from oslo_messaging import dispatcher from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -35,14 +35,11 @@ class NotificationResult(object): REQUEUE = 'requeue' -class _NotificationDispatcherBase(dispatcher.DispatcherBase): - def __init__(self, targets, endpoints, serializer, allow_requeue, - pool=None): - self.targets = targets +class NotificationDispatcher(dispatcher.DispatcherBase): + def __init__(self, endpoints, serializer): + self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() - self.allow_requeue = allow_requeue - self.pool = pool self._callbacks_by_priority = {} for endpoint, prio in itertools.product(endpoints, PRIORITIES): @@ -52,42 +49,77 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): self._callbacks_by_priority.setdefault(prio, []).append( (screen, method)) - priorities = self._callbacks_by_priority.keys() - self._targets_priorities = set(itertools.product(self.targets, - priorities)) + @property + def supported_priorities(self): + return self._callbacks_by_priority.keys() - def _listen(self, transport): - transport._require_driver_features(requeue=self.allow_requeue) - return transport._listen_for_notifications(self._targets_priorities, - pool=self.pool) - - def __call__(self, incoming): - return dispatcher.DispatcherExecutorContext( - incoming, self._dispatch_and_handle_error, - post=self._post_dispatch) - - def _post_dispatch(self, incoming, requeues): - for m in incoming: - try: - if requeues and m in requeues: - m.requeue() - else: - m.acknowledge() - except Exception: - LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) - - def _dispatch_and_handle_error(self, incoming): - """Dispatch a notification message to the appropriate endpoint method. - - :param incoming: the incoming notification message - :type ctxt: IncomingMessage + def dispatch(self, incoming): + """Dispatch notification messages to the appropriate endpoint method. """ - try: - return self._dispatch(incoming) - except Exception: - LOG.error(_LE('Exception during message handling'), exc_info=True) + priority, raw_message, message = self._extract_user_message(incoming) - def _dispatch(self, incoming): + if priority not in PRIORITIES: + LOG.warning(_LW('Unknown priority "%s"'), priority) + return + + for screen, callback in self._callbacks_by_priority.get(priority, + []): + if screen and not screen.match(message["ctxt"], + message["publisher_id"], + message["event_type"], + message["metadata"], + message["payload"]): + continue + + ret = self._exec_callback(callback, message) + if ret == NotificationResult.REQUEUE: + return ret + return NotificationResult.HANDLED + + def _exec_callback(self, callback, message): + localcontext._set_local_context(message["ctxt"]) + + try: + return callback(message["ctxt"], + message["publisher_id"], + message["event_type"], + message["payload"], + message["metadata"]) + except Exception: + LOG.exception("Callback raised an exception.") + return NotificationResult.REQUEUE + finally: + localcontext._clear_local_context() + + def _extract_user_message(self, incoming): + ctxt = self.serializer.deserialize_context(incoming.ctxt) + message = incoming.message + + publisher_id = message.get('publisher_id') + event_type = message.get('event_type') + metadata = { + 'message_id': message.get('message_id'), + 'timestamp': message.get('timestamp') + } + priority = message.get('priority', '').lower() + payload = self.serializer.deserialize_entity(ctxt, + message.get('payload')) + return priority, incoming, dict(ctxt=ctxt, + publisher_id=publisher_id, + event_type=event_type, + payload=payload, + metadata=metadata) + + +class BatchNotificationDispatcher(NotificationDispatcher): + """A message dispatcher which understands Notification messages. + + A MessageHandlingServer is constructed by passing a callable dispatcher + which is invoked with a list of message dictionaries each time 'batch_size' + messages are received or 'batch_timeout' seconds is reached. + """ + + def dispatch(self, incoming): """Dispatch notification messages to the appropriate endpoint method. """ @@ -120,70 +152,14 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase): continue ret = self._exec_callback(callback, filtered_messages) - if self.allow_requeue and ret == NotificationResult.REQUEUE: + if ret == NotificationResult.REQUEUE: requeues.update(raw_messages) break return requeues - def _exec_callback(self, callback, *args): - ret = callback(*args) - return NotificationResult.HANDLED if ret is None else ret - - def _extract_user_message(self, incoming): - ctxt = self.serializer.deserialize_context(incoming.ctxt) - message = incoming.message - - publisher_id = message.get('publisher_id') - event_type = message.get('event_type') - metadata = { - 'message_id': message.get('message_id'), - 'timestamp': message.get('timestamp') - } - priority = message.get('priority', '').lower() - payload = self.serializer.deserialize_entity(ctxt, - message.get('payload')) - return priority, incoming, dict(ctxt=ctxt, - publisher_id=publisher_id, - event_type=event_type, - payload=payload, - metadata=metadata) - - -class NotificationDispatcher(_NotificationDispatcherBase): - """A message dispatcher which understands Notification messages. - - A MessageHandlingServer is constructed by passing a callable dispatcher - which is invoked with context and message dictionaries each time a message - is received. - """ def _exec_callback(self, callback, messages): - localcontext._set_local_context( - messages[0]["ctxt"]) try: - return super(NotificationDispatcher, self)._exec_callback( - callback, - messages[0]["ctxt"], - messages[0]["publisher_id"], - messages[0]["event_type"], - messages[0]["payload"], - messages[0]["metadata"]) - finally: - localcontext._clear_local_context() - - -class BatchNotificationDispatcher(_NotificationDispatcherBase): - """A message dispatcher which understands Notification messages. - - A MessageHandlingServer is constructed by passing a callable dispatcher - which is invoked with a list of message dictionaries each time 'batch_size' - messages are received or 'batch_timeout' seconds is reached. - """ - - def __init__(self, targets, endpoints, serializer, allow_requeue, - pool=None, batch_size=None, batch_timeout=None): - super(BatchNotificationDispatcher, self).__init__(targets, endpoints, - serializer, - allow_requeue, - pool) - self.batch_size = batch_size - self.batch_timeout = batch_timeout + return callback(messages) + except Exception: + LOG.exception("Callback raised an exception.") + return NotificationResult.REQUEUE diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 8855d5875..308e081f8 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -103,10 +103,89 @@ by passing allow_requeue=True to get_notification_listener(). If the driver does not support requeueing, it will raise NotImplementedError at this point. """ +import itertools +import logging +from oslo_messaging._i18n import _LE from oslo_messaging.notify import dispatcher as notify_dispatcher from oslo_messaging import server as msg_server +LOG = logging.getLogger(__name__) + + +class NotificationServer(msg_server.MessageHandlingServer): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None): + super(NotificationServer, self).__init__(transport, dispatcher, + executor) + self._allow_requeue = allow_requeue + self._pool = pool + self.targets = targets + self._targets_priorities = set( + itertools.product(self.targets, + self.dispatcher.supported_priorities) + ) + + def _create_listener(self): + return msg_server.SingleMessageListenerAdapter( + self.transport._listen_for_notifications( + self._targets_priorities, self._pool + ) + ) + + def _process_incoming(self, incoming): + res = notify_dispatcher.NotificationResult.REQUEUE + try: + res = self.dispatcher.dispatch(incoming) + except Exception: + LOG.error(_LE('Exception during message handling'), exc_info=True) + + try: + if (res == notify_dispatcher.NotificationResult.REQUEUE and + self._allow_requeue): + incoming.requeue() + else: + incoming.acknowledge() + except Exception: + LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + + +class BatchNotificationServer(NotificationServer): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None, batch_size=1, + batch_timeout=None): + super(BatchNotificationServer, self).__init__( + transport=transport, targets=targets, dispatcher=dispatcher, + executor=executor, allow_requeue=allow_requeue, pool=pool + ) + + self._batch_size = batch_size + self._batch_timeout = batch_timeout + + def _create_listener(self): + return msg_server.BatchMessageListenerAdapter( + self.transport._listen_for_notifications( + self._targets_priorities, self._pool + ), + timeout=self._batch_timeout, + batch_size=self._batch_size + ) + + def _process_incoming(self, incoming): + try: + not_processed_messages = self.dispatcher.dispatch(incoming) + except Exception: + not_processed_messages = set(incoming) + LOG.error(_LE('Exception during message handling'), exc_info=True) + for m in incoming: + try: + if m in not_processed_messages and self._allow_requeue: + m.requeue() + else: + m.acknowledge() + except Exception: + LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) + def get_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, @@ -137,10 +216,10 @@ def get_notification_listener(transport, targets, endpoints, :type pool: str :raises: NotImplementedError """ - dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints, - serializer, - allow_requeue, pool) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + dispatcher = notify_dispatcher.NotificationDispatcher(endpoints, + serializer) + return NotificationServer(transport, targets, dispatcher, executor, + allow_requeue, pool) def get_batch_notification_listener(transport, targets, endpoints, @@ -180,6 +259,8 @@ def get_batch_notification_listener(transport, targets, endpoints, :raises: NotImplementedError """ dispatcher = notify_dispatcher.BatchNotificationDispatcher( - targets, endpoints, serializer, allow_requeue, pool, - batch_size, batch_timeout) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + endpoints, serializer) + return BatchNotificationServer( + transport, targets, dispatcher, executor, allow_requeue, pool, + batch_size, batch_timeout + ) diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index cecef2224..e94b4d461 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -29,7 +29,6 @@ import sys import six -from oslo_messaging._i18n import _LE from oslo_messaging import _utils as utils from oslo_messaging import dispatcher from oslo_messaging import localcontext @@ -94,20 +93,16 @@ class RPCDispatcher(dispatcher.DispatcherBase): """ - def __init__(self, target, endpoints, serializer): + def __init__(self, endpoints, serializer): """Construct a rpc server dispatcher. - :param target: the exchange, topic and server to listen on - :type target: Target + :param endpoints: list of endpoint objects for dispatching to + :param serializer: optional message serializer """ self.endpoints = endpoints self.serializer = serializer or msg_serializer.NoOpSerializer() self._default_target = msg_target.Target() - self._target = target - - def _listen(self, transport): - return transport._listen(self._target) @staticmethod def _is_namespace(target, namespace): @@ -127,43 +122,16 @@ class RPCDispatcher(dispatcher.DispatcherBase): result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) - def __call__(self, incoming): - incoming[0].acknowledge() - return dispatcher.DispatcherExecutorContext( - incoming[0], self._dispatch_and_reply) - - def _dispatch_and_reply(self, incoming): - try: - incoming.reply(self._dispatch(incoming.ctxt, - incoming.message)) - except ExpectedException as e: - LOG.debug(u'Expected exception during message handling (%s)', - e.exc_info[1]) - incoming.reply(failure=e.exc_info, log_failure=False) - except Exception as e: - # current sys.exc_info() content can be overriden - # by another exception raise by a log handler during - # LOG.exception(). So keep a copy and delete it later. - exc_info = sys.exc_info() - try: - LOG.error(_LE('Exception during message handling: %s'), e, - exc_info=exc_info) - incoming.reply(failure=exc_info) - finally: - # NOTE(dhellmann): Remove circular object reference - # between the current stack frame and the traceback in - # exc_info. - del exc_info - - def _dispatch(self, ctxt, message): + def dispatch(self, incoming): """Dispatch an RPC message to the appropriate endpoint method. - :param ctxt: the request context - :type ctxt: dict - :param message: the message payload - :type message: dict + :param incoming: incoming message + :type incoming: IncomingMessage :raises: NoSuchMethod, UnsupportedVersion """ + message = incoming.message + ctxt = incoming.ctxt + method = message.get('method') args = message.get('args', {}) namespace = message.get('namespace') diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 74dbede44..61c0b4e1f 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -102,9 +102,53 @@ __all__ = [ 'expected_exceptions', ] +import logging +import sys + +from oslo_messaging._i18n import _LE from oslo_messaging.rpc import dispatcher as rpc_dispatcher from oslo_messaging import server as msg_server +LOG = logging.getLogger(__name__) + + +class RPCServer(msg_server.MessageHandlingServer): + def __init__(self, transport, target, dispatcher, executor='blocking'): + super(RPCServer, self).__init__(transport, dispatcher, executor) + self._target = target + + def _create_listener(self): + return msg_server.SingleMessageListenerAdapter( + self.transport._listen(self._target) + ) + + def _process_incoming(self, incoming): + incoming.acknowledge() + try: + res = self.dispatcher.dispatch(incoming) + except rpc_dispatcher.ExpectedException as e: + LOG.debug(u'Expected exception during message handling (%s)', + e.exc_info[1]) + incoming.reply(failure=e.exc_info) + except Exception as e: + # current sys.exc_info() content can be overriden + # by another exception raise by a log handler during + # LOG.exception(). So keep a copy and delete it later. + exc_info = sys.exc_info() + try: + LOG.exception(_LE('Exception during message handling: %s'), e) + incoming.reply(failure=exc_info) + finally: + # NOTE(dhellmann): Remove circular object reference + # between the current stack frame and the traceback in + # exc_info. + del exc_info + else: + try: + incoming.reply(res) + except Exception: + LOG.Exception("Can not send reply for message %s", incoming) + def get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None): @@ -129,8 +173,8 @@ def get_rpc_server(transport, target, endpoints, :param serializer: an optional entity serializer :type serializer: Serializer """ - dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer) - return msg_server.MessageHandlingServer(transport, dispatcher, executor) + dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) + return RPCServer(transport, target, dispatcher, executor) def expected_exceptions(*exceptions): diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index b702c079d..4452a9139 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -23,6 +23,7 @@ __all__ = [ 'ServerListenError', ] +import abc import functools import inspect import logging @@ -34,6 +35,7 @@ from oslo_service import service from oslo_utils import eventletutils from oslo_utils import excutils from oslo_utils import timeutils +import six from stevedore import driver from oslo_messaging._drivers import base as driver_base @@ -295,6 +297,42 @@ def ordered(after=None, reset_after=None): return _ordered +@six.add_metaclass(abc.ABCMeta) +class MessageListenerAdapter(object): + def __init__(self, driver_listener, timeout=None): + self._driver_listener = driver_listener + self._timeout = timeout + + @abc.abstractmethod + def poll(self): + """Poll incoming and return incoming request""" + + def stop(self): + self._driver_listener.stop() + + def cleanup(self): + self._driver_listener.cleanup() + + +class SingleMessageListenerAdapter(MessageListenerAdapter): + def poll(self): + msgs = self._driver_listener.poll(prefetch_size=1, + timeout=self._timeout) + return msgs[0] if msgs else None + + +class BatchMessageListenerAdapter(MessageListenerAdapter): + def __init__(self, driver_listener, timeout=None, batch_size=1): + super(BatchMessageListenerAdapter, self).__init__(driver_listener, + timeout) + self._batch_size = batch_size + + def poll(self): + return self._driver_listener.poll(prefetch_size=self._batch_size, + timeout=self._timeout) + + +@six.add_metaclass(abc.ABCMeta) class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): """Server for handling messages. @@ -345,9 +383,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): super(MessageHandlingServer, self).__init__() - def _submit_work(self, callback): - fut = self._work_executor.submit(callback.run) - fut.add_done_callback(lambda f: callback.done()) + @abc.abstractmethod + def _process_incoming(self, incoming): + """Process incoming request + + :param incoming: incoming request. + """ + + @abc.abstractmethod + def _create_listener(self): + """Creates listener object for polling requests + :return: MessageListenerAdapter + """ @ordered(reset_after='stop') def start(self, override_pool_size=None): @@ -374,7 +421,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): self._started = True try: - self.listener = self.dispatcher._listen(self.transport) + self.listener = self._create_listener() except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) @@ -412,22 +459,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): @excutils.forever_retry_uncaught_exceptions def _runner(self): while self._started: - incoming = self.listener.poll( - timeout=self.dispatcher.batch_timeout, - prefetch_size=self.dispatcher.batch_size) + incoming = self.listener.poll() if incoming: - self._submit_work(self.dispatcher(incoming)) + self._work_executor.submit(self._process_incoming, incoming) # listener is stopped but we need to process all already consumed # messages while True: - incoming = self.listener.poll( - timeout=self.dispatcher.batch_timeout, - prefetch_size=self.dispatcher.batch_size) + incoming = self.listener.poll() if incoming: - self._submit_work(self.dispatcher(incoming)) + self._work_executor.submit(self._process_incoming, incoming) else: return diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index 2442d535e..3818a7fd8 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import itertools - from oslo_utils import timeutils import testscenarios @@ -25,7 +23,6 @@ from six.moves import mock load_tests = testscenarios.load_tests_apply_scenarios - notification_msg = dict( publisher_id="publisher_id", event_type="compute.start", @@ -96,20 +93,21 @@ class TestDispatcher(test_utils.BaseTestCase): msg = notification_msg.copy() msg['priority'] = self.priority - targets = [oslo_messaging.Target(topic='notifications')] - dispatcher = notify_dispatcher.NotificationDispatcher( - targets, endpoints, None, allow_requeue=True, pool=None) - - # check it listen on wanted topics - self.assertEqual(sorted(set((targets[0], prio) - for prio in itertools.chain.from_iterable( - self.endpoints))), - sorted(dispatcher._targets_priorities)) + dispatcher = notify_dispatcher.NotificationDispatcher(endpoints, None) incoming = mock.Mock(ctxt={}, message=msg) - callback = dispatcher([incoming]) - callback.run() - callback.done() + + res = dispatcher.dispatch(incoming) + + expected_res = ( + notify_dispatcher.NotificationResult.REQUEUE + if (self.return_value == + notify_dispatcher.NotificationResult.REQUEUE or + self.ex is not None) + else notify_dispatcher.NotificationResult.HANDLED + ) + + self.assertEqual(expected_res, res) # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -127,26 +125,14 @@ class TestDispatcher(test_utils.BaseTestCase): else: self.assertEqual(0, endpoints[i].call_count) - if self.ex: - self.assertEqual(1, incoming.acknowledge.call_count) - self.assertEqual(0, incoming.requeue.call_count) - elif self.return_value == oslo_messaging.NotificationResult.HANDLED \ - or self.return_value is None: - self.assertEqual(1, incoming.acknowledge.call_count) - self.assertEqual(0, incoming.requeue.call_count) - elif self.return_value == oslo_messaging.NotificationResult.REQUEUE: - self.assertEqual(0, incoming.acknowledge.call_count) - self.assertEqual(1, incoming.requeue.call_count) - @mock.patch('oslo_messaging.notify.dispatcher.LOG') def test_dispatcher_unknown_prio(self, mylog): msg = notification_msg.copy() msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( - [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - callback = dispatcher([mock.Mock(ctxt={}, message=msg)]) - callback.run() - callback.done() + [mock.Mock()], None) + res = dispatcher.dispatch(mock.Mock(ctxt={}, message=msg)) + self.assertEqual(None, res) mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -236,9 +222,8 @@ class TestDispatcherFilter(test_utils.BaseTestCase): **self.filter_rule) endpoint = mock.Mock(spec=['info'], filter_rule=notification_filter) - targets = [oslo_messaging.Target(topic='notifications')] dispatcher = notify_dispatcher.NotificationDispatcher( - targets, [endpoint], serializer=None, allow_requeue=True) + [endpoint], serializer=None) message = {'payload': {'state': 'active'}, 'priority': 'info', 'publisher_id': self.publisher_id, @@ -246,9 +231,7 @@ class TestDispatcherFilter(test_utils.BaseTestCase): 'timestamp': '2014-03-03 18:21:04.369234', 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} incoming = mock.Mock(ctxt=self.context, message=message) - callback = dispatcher([incoming]) - callback.run() - callback.done() + dispatcher.dispatch(incoming) if self.match: self.assertEqual(1, endpoint.info.call_count) diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index bf56288fc..92e8f6435 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -351,8 +351,7 @@ class TestLogNotifier(test_utils.BaseTestCase): logger = mock.MagicMock() logger.info = mock.MagicMock() message = {'password': 'passw0rd', 'event_type': 'foo'} - json_str = jsonutils.dumps(message) - mask_str = strutils.mask_password(json_str) + mask_str = jsonutils.dumps(strutils.mask_dict_password(message)) with mock.patch.object(logging, 'getLogger') as gl: gl.return_value = logger diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 672733a05..2a2f7b474 100644 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -109,33 +109,29 @@ class TestDispatcher(test_utils.BaseTestCase): for e in self.endpoints] serializer = None - target = oslo_messaging.Target() - dispatcher = oslo_messaging.RPCDispatcher(target, endpoints, - serializer) - - def check_reply(reply=None, failure=None, log_failure=True): - if self.ex and failure is not None: - ex = failure[1] - self.assertFalse(self.success, ex) - self.assertIsNotNone(self.ex, ex) - self.assertIsInstance(ex, self.ex, ex) - if isinstance(ex, oslo_messaging.NoSuchMethod): - self.assertEqual(self.msg.get('method'), ex.method) - elif isinstance(ex, oslo_messaging.UnsupportedVersion): - self.assertEqual(self.msg.get('version', '1.0'), - ex.version) - if ex.method: - self.assertEqual(self.msg.get('method'), ex.method) - else: - self.assertTrue(self.success, failure) - self.assertIsNone(failure) + dispatcher = oslo_messaging.RPCDispatcher(endpoints, serializer) incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) - incoming.reply.side_effect = check_reply - callback = dispatcher([incoming]) - callback.run() - callback.done() + res = None + + try: + res = dispatcher.dispatch(incoming) + except Exception as ex: + self.assertFalse(self.success, ex) + self.assertIsNotNone(self.ex, ex) + self.assertIsInstance(ex, self.ex, ex) + if isinstance(ex, oslo_messaging.NoSuchMethod): + self.assertEqual(self.msg.get('method'), ex.method) + elif isinstance(ex, oslo_messaging.UnsupportedVersion): + self.assertEqual(self.msg.get('version', '1.0'), + ex.version) + if ex.method: + self.assertEqual(self.msg.get('method'), ex.method) + else: + self.assertTrue(self.success, + "Not expected success of operation durung testing") + self.assertIsNotNone(res) for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: @@ -147,8 +143,6 @@ class TestDispatcher(test_utils.BaseTestCase): else: self.assertEqual(0, method.call_count) - self.assertEqual(1, incoming.reply.call_count) - class TestSerializer(test_utils.BaseTestCase): @@ -165,9 +159,7 @@ class TestSerializer(test_utils.BaseTestCase): def test_serializer(self): endpoint = _FakeEndpoint() serializer = msg_serializer.NoOpSerializer() - target = oslo_messaging.Target() - dispatcher = oslo_messaging.RPCDispatcher(target, [endpoint], - serializer) + dispatcher = oslo_messaging.RPCDispatcher([endpoint], serializer) self.mox.StubOutWithMock(endpoint, 'foo') args = dict([(k, 'd' + v) for k, v in self.args.items()]) @@ -187,7 +179,9 @@ class TestSerializer(test_utils.BaseTestCase): self.mox.ReplayAll() - retval = dispatcher._dispatch(self.ctxt, dict(method='foo', - args=self.args)) + incoming = mock.Mock() + incoming.ctxt = self.ctxt + incoming.message = dict(method='foo', args=self.args) + retval = dispatcher.dispatch(incoming) if self.retval is not None: self.assertEqual('s' + self.retval, retval) diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index effa640f2..3b98eaf4f 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -149,7 +149,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): serializer=serializer) # Mocking executor server._executor_cls = MagicMockIgnoreArgs - server.listener = MagicMockIgnoreArgs() + server._create_listener = MagicMockIgnoreArgs() server.dispatcher = MagicMockIgnoreArgs() # Here assigning executor's listener object to listener variable # before calling wait method, because in wait method we are @@ -551,7 +551,6 @@ class TestServerLocking(test_utils.BaseTestCase): def __init__(self, *args, **kwargs): self._lock = threading.Lock() self._calls = [] - self.listener = mock.MagicMock() executors.append(self) submit = _logmethod('submit') @@ -559,9 +558,16 @@ class TestServerLocking(test_utils.BaseTestCase): self.executors = executors - self.server = oslo_messaging.MessageHandlingServer(mock.Mock(), - mock.Mock()) + class MessageHandlingServerImpl(oslo_messaging.MessageHandlingServer): + def _create_listener(self): + pass + + def _process_incoming(self, incoming): + pass + + self.server = MessageHandlingServerImpl(mock.Mock(), mock.Mock()) self.server._executor_cls = FakeExecutor + self.server._create_listener = mock.Mock() def test_start_stop_wait(self): # Test a simple execution of start, stop, wait in order diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 1daa11d3f..2ca8f8a2e 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -61,6 +61,14 @@ class OptsTestCase(test_utils.BaseTestCase): def test_defaults(self): transport = mock.Mock() transport.conf = self.conf - server.MessageHandlingServer(transport, mock.Mock()) + + class MessageHandlingServerImpl(server.MessageHandlingServer): + def _create_listener(self): + pass + + def _process_incoming(self, incoming): + pass + + MessageHandlingServerImpl(transport, mock.Mock()) opts.set_defaults(self.conf, executor_thread_pool_size=100) self.assertEqual(100, self.conf.executor_thread_pool_size)