Merge "Make the dispatcher responsible to listen()"

This commit is contained in:
Jenkins 2014-02-04 21:01:15 +00:00 committed by Gerrit Code Review
commit 84298bd041
5 changed files with 23 additions and 15 deletions

View File

@ -29,7 +29,7 @@ from oslo.messaging import _utils as utils
from oslo.messaging import localcontext from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer from oslo.messaging import serializer as msg_serializer
from oslo.messaging import server as msg_server from oslo.messaging import server as msg_server
from oslo.messaging import target from oslo.messaging import target as msg_target
class RPCDispatcherError(msg_server.MessagingServerError): class RPCDispatcherError(msg_server.MessagingServerError):
@ -68,12 +68,24 @@ class RPCDispatcher(object):
Endpoints may have a target attribute describing the namespace and version Endpoints may have a target attribute describing the namespace and version
of the methods exposed by that object. All public methods on an endpoint of the methods exposed by that object. All public methods on an endpoint
object are remotely invokable by clients. object are remotely invokable by clients.
""" """
def __init__(self, endpoints, serializer): def __init__(self, target, endpoints, serializer):
"""Construct a rpc server dispatcher.
:param target: the exchange, topic and server to listen on
:type target: Target
"""
self.endpoints = endpoints self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer() self.serializer = serializer or msg_serializer.NoOpSerializer()
self._default_target = target.Target() self._default_target = msg_target.Target()
self._target = target
def _listen(self, transport):
return transport._listen(self._target)
@staticmethod @staticmethod
def _is_namespace(target, namespace): def _is_namespace(target, namespace):

View File

@ -121,9 +121,8 @@ def get_rpc_server(transport, target, endpoints,
:param serializer: an optional entity serializer :param serializer: an optional entity serializer
:type serializer: Serializer :type serializer: Serializer
""" """
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer) dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
return msg_server.MessageHandlingServer(transport, target, return msg_server.MessageHandlingServer(transport, dispatcher, executor)
dispatcher, executor)
class ExpectedException(Exception): class ExpectedException(Exception):

View File

@ -61,7 +61,7 @@ class MessageHandlingServer(object):
new tasks. new tasks.
""" """
def __init__(self, transport, target, dispatcher, executor='blocking'): def __init__(self, transport, dispatcher, executor='blocking'):
"""Construct a message handling server. """Construct a message handling server.
The dispatcher parameter is a callable which is invoked with context The dispatcher parameter is a callable which is invoked with context
@ -73,8 +73,6 @@ class MessageHandlingServer(object):
:param transport: the messaging transport :param transport: the messaging transport
:type transport: Transport :type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param dispatcher: a callable which is invoked for each method :param dispatcher: a callable which is invoked for each method
:type dispatcher: callable :type dispatcher: callable
:param executor: name of message executor - e.g. 'eventlet', 'blocking' :param executor: name of message executor - e.g. 'eventlet', 'blocking'
@ -83,7 +81,6 @@ class MessageHandlingServer(object):
self.conf = transport.conf self.conf = transport.conf
self.transport = transport self.transport = transport
self.target = target
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.executor = executor self.executor = executor
@ -116,9 +113,8 @@ class MessageHandlingServer(object):
""" """
if self._executor is not None: if self._executor is not None:
return return
try: try:
listener = self.transport._listen(self.target) listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex: except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex) raise ServerListenError(self.target, ex)

View File

@ -97,7 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
endpoints.append(_FakeEndpoint(target)) endpoints.append(_FakeEndpoint(target))
serializer = None serializer = None
dispatcher = messaging.RPCDispatcher(endpoints, serializer) target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
if self.dispatch_to is not None: if self.dispatch_to is not None:
endpoint = endpoints[self.dispatch_to['endpoint']] endpoint = endpoints[self.dispatch_to['endpoint']]
@ -139,7 +140,8 @@ class TestSerializer(test_utils.BaseTestCase):
def test_serializer(self): def test_serializer(self):
endpoint = _FakeEndpoint() endpoint = _FakeEndpoint()
serializer = msg_serializer.NoOpSerializer() serializer = msg_serializer.NoOpSerializer()
dispatcher = messaging.RPCDispatcher([endpoint], serializer) target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer)
self.mox.StubOutWithMock(endpoint, 'foo') self.mox.StubOutWithMock(endpoint, 'foo')
args = dict([(k, 'd' + v) for k, v in self.args.items()]) args = dict([(k, 'd' + v) for k, v in self.args.items()])

View File

@ -105,7 +105,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertIs(server.conf, self.conf) self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport) self.assertIs(server.transport, transport)
self.assertIs(server.target, target)
self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher) self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints) self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer) self.assertIs(server.dispatcher.serializer, serializer)