Don't reply to notification message

The notification listener doesn't have anything to send to the notifier
and the notifier doesn't attend to receive something.

So this patch remove the message reply when the listener is a
notification
listener.

Partial implements blueprint notification-subscriber-server

Change-Id: Ic989947ba3b6894cde788422842fca19159ea261
This commit is contained in:
Mehdi Abaakouk 2014-02-17 12:46:17 +01:00
parent 7473d18ebe
commit 8a644c1166
9 changed files with 275 additions and 77 deletions

View File

@ -13,41 +13,17 @@
# under the License. # under the License.
import abc import abc
import logging
import sys
import six import six
from oslo import messaging
_LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class ExecutorBase(object): class ExecutorBase(object):
def __init__(self, conf, listener, callback): def __init__(self, conf, listener, dispatcher):
self.conf = conf self.conf = conf
self.listener = listener self.listener = listener
self.callback = callback self.dispatcher = dispatcher
def _dispatch(self, incoming):
try:
incoming.reply(self.callback(incoming.ctxt, incoming.message))
except messaging.ExpectedException as e:
_LOG.debug('Expected exception during message handling (%s)' %
e.exc_info[1])
incoming.reply(failure=e.exc_info, log_failure=False)
except Exception as e:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
_LOG.error('Exception during message handling: %s', e,
exc_info=exc_info)
incoming.reply(failure=exc_info)
# NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in
# exc_info.
del exc_info
@abc.abstractmethod @abc.abstractmethod
def start(self): def start(self):

View File

@ -29,14 +29,15 @@ class BlockingExecutor(base.ExecutorBase):
for simple demo programs. for simple demo programs.
""" """
def __init__(self, conf, listener, callback): def __init__(self, conf, listener, dispatcher):
super(BlockingExecutor, self).__init__(conf, listener, callback) super(BlockingExecutor, self).__init__(conf, listener, dispatcher)
self._running = False self._running = False
def start(self): def start(self):
self._running = True self._running = True
while self._running: while self._running:
self._dispatch(self.listener.poll()) with self.dispatcher(self.listener.poll()) as callback:
callback()
def stop(self): def stop(self):
self._running = False self._running = False

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import sys
import eventlet import eventlet
from eventlet import greenpool from eventlet import greenpool
import greenlet import greenlet
@ -29,6 +31,33 @@ _eventlet_opts = [
] ]
def spawn_with(ctxt, pool):
"""This is the equivalent of a with statement
but with the content of the BLOCK statement executed
into a greenthread
exception path grab from:
http://www.python.org/dev/peps/pep-0343/
"""
def complete(thread, exit):
exc = True
try:
try:
thread.wait()
except Exception:
exc = False
if not exit(*sys.exc_info()):
raise
finally:
if exc:
exit(None, None, None)
callback = ctxt.__enter__()
thread = pool.spawn(callback)
thread.link(complete, ctxt.__exit__)
class EventletExecutor(base.ExecutorBase): class EventletExecutor(base.ExecutorBase):
"""A message executor which integrates with eventlet. """A message executor which integrates with eventlet.
@ -40,8 +69,8 @@ class EventletExecutor(base.ExecutorBase):
method waits for all message dispatch greenthreads to complete. method waits for all message dispatch greenthreads to complete.
""" """
def __init__(self, conf, listener, callback): def __init__(self, conf, listener, dispatcher):
super(EventletExecutor, self).__init__(conf, listener, callback) super(EventletExecutor, self).__init__(conf, listener, dispatcher)
self.conf.register_opts(_eventlet_opts) self.conf.register_opts(_eventlet_opts)
self._thread = None self._thread = None
self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
@ -55,7 +84,8 @@ class EventletExecutor(base.ExecutorBase):
try: try:
while True: while True:
incoming = self.listener.poll() incoming = self.listener.poll()
self._greenpool.spawn_n(self._dispatch, incoming) spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit: except greenlet.GreenletExit:
return return

View File

@ -14,8 +14,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib
import itertools import itertools
import logging import logging
import sys
from oslo.messaging import localcontext from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer from oslo.messaging import serializer as msg_serializer
@ -55,7 +57,25 @@ class NotificationDispatcher(object):
def _listen(self, transport): def _listen(self, transport):
return transport._listen_for_notifications(self._targets_priorities) return transport._listen_for_notifications(self._targets_priorities)
def __call__(self, ctxt, message): @contextlib.contextmanager
def __call__(self, incoming):
yield lambda: self._dispatch_and_handle_error(incoming)
def _dispatch_and_handle_error(self, incoming):
"""Dispatch a notification message to the appropriate endpoint method.
:param incoming: the incoming notification message
:type ctxt: IncomingMessage
"""
try:
self._dispatch(incoming.ctxt, incoming.message)
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error('Exception during message handling',
exc_info=exc_info)
def _dispatch(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method. """Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context :param ctxt: the request context

View File

@ -21,8 +21,13 @@ __all__ = [
'RPCDispatcher', 'RPCDispatcher',
'RPCDispatcherError', 'RPCDispatcherError',
'UnsupportedVersion', 'UnsupportedVersion',
'ExpectedException',
] ]
import contextlib
import logging
import sys
import six import six
from oslo.messaging import _utils as utils from oslo.messaging import _utils as utils
@ -31,6 +36,19 @@ from oslo.messaging import serializer as msg_serializer
from oslo.messaging import server as msg_server from oslo.messaging import server as msg_server
from oslo.messaging import target as msg_target from oslo.messaging import target as msg_target
LOG = logging.getLogger(__name__)
class ExpectedException(Exception):
"""Encapsulates an expected exception raised by an RPC endpoint
Merely instantiating this exception records the current exception
information, which will be passed back to the RPC client without
exceptional logging.
"""
def __init__(self):
self.exc_info = sys.exc_info()
class RPCDispatcherError(msg_server.MessagingServerError): class RPCDispatcherError(msg_server.MessagingServerError):
"A base class for all RPC dispatcher exceptions." "A base class for all RPC dispatcher exceptions."
@ -96,7 +114,7 @@ class RPCDispatcher(object):
endpoint_version = target.version or '1.0' endpoint_version = target.version or '1.0'
return utils.version_is_compatible(endpoint_version, version) return utils.version_is_compatible(endpoint_version, version)
def _dispatch(self, endpoint, method, ctxt, args): def _do_dispatch(self, endpoint, method, ctxt, args):
ctxt = self.serializer.deserialize_context(ctxt) ctxt = self.serializer.deserialize_context(ctxt)
new_args = dict() new_args = dict()
for argname, arg in six.iteritems(args): for argname, arg in six.iteritems(args):
@ -104,7 +122,30 @@ class RPCDispatcher(object):
result = getattr(endpoint, method)(ctxt, **new_args) result = getattr(endpoint, method)(ctxt, **new_args)
return self.serializer.serialize_entity(ctxt, result) return self.serializer.serialize_entity(ctxt, result)
def __call__(self, ctxt, message): @contextlib.contextmanager
def __call__(self, incoming):
yield lambda: self._dispatch_and_reply(incoming)
def _dispatch_and_reply(self, incoming):
try:
incoming.reply(self._dispatch(incoming.ctxt,
incoming.message))
except ExpectedException as e:
LOG.debug('Expected exception during message handling (%s)' %
e.exc_info[1])
incoming.reply(failure=e.exc_info, log_failure=False)
except Exception as e:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error('Exception during message handling: %s', e,
exc_info=exc_info)
incoming.reply(failure=exc_info)
# NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in
# exc_info.
del exc_info
def _dispatch(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method. """Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context :param ctxt: the request context
@ -131,7 +172,7 @@ class RPCDispatcher(object):
if hasattr(endpoint, method): if hasattr(endpoint, method):
localcontext.set_local_context(ctxt) localcontext.set_local_context(ctxt)
try: try:
return self._dispatch(endpoint, method, ctxt, args) return self._do_dispatch(endpoint, method, ctxt, args)
finally: finally:
localcontext.clear_local_context() localcontext.clear_local_context()

View File

@ -92,12 +92,9 @@ to - primitive types.
__all__ = [ __all__ = [
'get_rpc_server', 'get_rpc_server',
'ExpectedException',
'expected_exceptions', 'expected_exceptions',
] ]
import sys
from oslo.messaging.rpc import dispatcher as rpc_dispatcher from oslo.messaging.rpc import dispatcher as rpc_dispatcher
from oslo.messaging import server as msg_server from oslo.messaging import server as msg_server
@ -125,17 +122,6 @@ def get_rpc_server(transport, target, endpoints,
return msg_server.MessageHandlingServer(transport, dispatcher, executor) return msg_server.MessageHandlingServer(transport, dispatcher, executor)
class ExpectedException(Exception):
"""Encapsulates an expected exception raised by an RPC endpoint
Merely instantiating this exception records the current exception
information, which will be passed back to the RPC client without
exceptional logging.
"""
def __init__(self):
self.exc_info = sys.exc_info()
def expected_exceptions(*exceptions): def expected_exceptions(*exceptions):
"""Decorator for RPC endpoint methods that raise expected exceptions. """Decorator for RPC endpoint methods that raise expected exceptions.
@ -158,6 +144,6 @@ def expected_exceptions(*exceptions):
# derived from the args passed to us will be # derived from the args passed to us will be
# ignored and thrown as normal. # ignored and thrown as normal.
except exceptions: except exceptions:
raise ExpectedException() raise rpc_dispatcher.ExpectedException()
return inner return inner
return outer return outer

131
tests/test_executor.py Normal file
View File

@ -0,0 +1,131 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet
import threading
import mock
import testscenarios
from oslo.messaging._executors import impl_blocking
from oslo.messaging._executors import impl_eventlet
from tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestExecutor(test_utils.BaseTestCase):
_impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
stop_before_return=True)),
('eventlet', dict(executor=impl_eventlet.EventletExecutor,
stop_before_return=False))]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._impl)
@staticmethod
def _run_in_thread(executor):
def thread():
executor.start()
executor.wait()
thread = threading.Thread(target=thread)
thread.daemon = True
thread.start()
thread.join(timeout=30)
def test_executor_dispatch(self):
callback = mock.MagicMock(return_value='result')
class Dispatcher(object):
@contextlib.contextmanager
def __call__(self, incoming):
yield lambda: callback(incoming.ctxt, incoming.message)
listener = mock.Mock(spec=['poll'])
executor = self.executor(self.conf, listener, Dispatcher())
incoming_message = mock.MagicMock(ctxt={},
message={'payload': 'data'})
def fake_poll():
if self.stop_before_return:
executor.stop()
return incoming_message
else:
if listener.poll.call_count == 1:
return incoming_message
executor.stop()
listener.poll.side_effect = fake_poll
self._run_in_thread(executor)
callback.assert_called_once_with({}, {'payload': 'data'})
TestExecutor.generate_scenarios()
class ExceptedException(Exception):
pass
class EventletContextManagerSpawnTest(test_utils.BaseTestCase):
def setUp(self):
super(EventletContextManagerSpawnTest, self).setUp()
self.before = mock.Mock()
self.callback = mock.Mock()
self.after = mock.Mock()
self.exception_call = mock.Mock()
@contextlib.contextmanager
def context_mgr():
self.before()
try:
yield lambda: self.callback()
except ExceptedException:
self.exception_call()
self.after()
self.mgr = context_mgr()
def test_normal_run(self):
impl_eventlet.spawn_with(self.mgr, pool=eventlet)
eventlet.sleep(0)
self.assertEqual(self.before.call_count, 1)
self.assertEqual(self.callback.call_count, 1)
self.assertEqual(self.after.call_count, 1)
self.assertEqual(self.exception_call.call_count, 0)
def test_excepted_exception(self):
self.callback.side_effect = ExceptedException
impl_eventlet.spawn_with(self.mgr, pool=eventlet)
eventlet.sleep(0)
self.assertEqual(self.before.call_count, 1)
self.assertEqual(self.callback.call_count, 1)
self.assertEqual(self.after.call_count, 1)
self.assertEqual(self.exception_call.call_count, 1)
def test_unexcepted_exception(self):
self.callback.side_effect = Exception
impl_eventlet.spawn_with(self.mgr, pool=eventlet)
eventlet.sleep(0)
self.assertEqual(self.before.call_count, 1)
self.assertEqual(self.callback.call_count, 1)
self.assertEqual(self.after.call_count, 0)
self.assertEqual(self.exception_call.call_count, 0)

View File

@ -73,7 +73,9 @@ class TestDispatcher(test_utils.BaseTestCase):
for prio in itertools.chain.from_iterable( for prio in itertools.chain.from_iterable(
self.endpoints)))) self.endpoints))))
dispatcher({}, msg) incoming = mock.Mock(ctxt={}, message=msg)
with dispatcher(incoming) as callback:
callback()
# check endpoint callbacks are called or not # check endpoint callbacks are called or not
for i, endpoint_methods in enumerate(self.endpoints): for i, endpoint_methods in enumerate(self.endpoints):
@ -94,5 +96,6 @@ class TestDispatcher(test_utils.BaseTestCase):
dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()], dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
[mock.Mock()], [mock.Mock()],
None) None)
dispatcher({}, msg) with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
callback()
mylog.warning.assert_called_once_with('Unknown priority "what???"') mylog.warning.assert_called_once_with('Unknown priority "what???"')

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock
import testscenarios import testscenarios
from oslo import messaging from oslo import messaging
@ -91,38 +92,46 @@ class TestDispatcher(test_utils.BaseTestCase):
] ]
def test_dispatcher(self): def test_dispatcher(self):
endpoints = [] endpoints = [mock.Mock(spec=_FakeEndpoint,
for e in self.endpoints: target=messaging.Target(**e))
target = messaging.Target(**e) if e else None for e in self.endpoints]
endpoints.append(_FakeEndpoint(target))
serializer = None serializer = None
target = messaging.Target() target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, endpoints, serializer) dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
if self.dispatch_to is not None: def check_reply(reply=None, failure=None, log_failure=True):
endpoint = endpoints[self.dispatch_to['endpoint']] if self.ex and failure is not None:
method = self.dispatch_to['method'] ex = failure[1]
self.assertFalse(self.success, ex)
self.assertIsNotNone(self.ex, ex)
self.assertIsInstance(ex, self.ex, ex)
if isinstance(ex, messaging.NoSuchMethod):
self.assertEqual(ex.method, self.msg.get('method'))
elif isinstance(ex, messaging.UnsupportedVersion):
self.assertEqual(ex.version,
self.msg.get('version', '1.0'))
else:
self.assertTrue(self.success, failure)
self.assertIsNone(failure)
self.mox.StubOutWithMock(endpoint, method) incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
incoming.reply.side_effect = check_reply
method = getattr(endpoint, method) with dispatcher(incoming) as callback:
method(self.ctxt, **self.msg.get('args', {})) callback()
self.mox.ReplayAll() for n, endpoint in enumerate(endpoints):
for method_name in ['foo', 'bar']:
method = getattr(endpoint, method_name)
if self.dispatch_to and n == self.dispatch_to['endpoint'] and \
method_name == self.dispatch_to['method']:
method.assert_called_once_with(
self.ctxt, **self.msg.get('args', {}))
else:
self.assertEqual(method.call_count, 0)
try: self.assertEqual(incoming.reply.call_count, 1)
dispatcher(self.ctxt, self.msg)
except Exception as ex:
self.assertFalse(self.success, ex)
self.assertIsNotNone(self.ex, ex)
self.assertIsInstance(ex, self.ex, ex)
if isinstance(ex, messaging.NoSuchMethod):
self.assertEqual(ex.method, self.msg.get('method'))
elif isinstance(ex, messaging.UnsupportedVersion):
self.assertEqual(ex.version, self.msg.get('version', '1.0'))
else:
self.assertTrue(self.success)
class TestSerializer(test_utils.BaseTestCase): class TestSerializer(test_utils.BaseTestCase):
@ -161,6 +170,7 @@ class TestSerializer(test_utils.BaseTestCase):
self.mox.ReplayAll() self.mox.ReplayAll()
retval = dispatcher(self.ctxt, dict(method='foo', args=self.args)) retval = dispatcher._dispatch(self.ctxt, dict(method='foo',
args=self.args))
if self.retval is not None: if self.retval is not None:
self.assertEqual(retval, 's' + self.retval) self.assertEqual(retval, 's' + self.retval)