Make the dispatcher responsible to listen()
The dispatcher is now responsible to configure and to get the listener from the transport. The server just ask to the dispatcher to build and return a configured listener for a provider transport. Partial implements blueprint notification-subscriber-server Change-Id: I4a6d9620b8239f6d377bc5788b8a90a860b2f02c
This commit is contained in:
parent
11a90eabc9
commit
86e5737bf6
@ -29,7 +29,7 @@ from oslo.messaging import _utils as utils
|
||||
from oslo.messaging import localcontext
|
||||
from oslo.messaging import serializer as msg_serializer
|
||||
from oslo.messaging import server as msg_server
|
||||
from oslo.messaging import target
|
||||
from oslo.messaging import target as msg_target
|
||||
|
||||
|
||||
class RPCDispatcherError(msg_server.MessagingServerError):
|
||||
@ -68,12 +68,24 @@ class RPCDispatcher(object):
|
||||
Endpoints may have a target attribute describing the namespace and version
|
||||
of the methods exposed by that object. All public methods on an endpoint
|
||||
object are remotely invokable by clients.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, endpoints, serializer):
|
||||
def __init__(self, target, endpoints, serializer):
|
||||
"""Construct a rpc server dispatcher.
|
||||
|
||||
:param target: the exchange, topic and server to listen on
|
||||
:type target: Target
|
||||
"""
|
||||
|
||||
self.endpoints = endpoints
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
self._default_target = target.Target()
|
||||
self._default_target = msg_target.Target()
|
||||
self._target = target
|
||||
|
||||
def _listen(self, transport):
|
||||
return transport._listen(self._target)
|
||||
|
||||
@staticmethod
|
||||
def _is_namespace(target, namespace):
|
||||
|
@ -121,9 +121,8 @@ def get_rpc_server(transport, target, endpoints,
|
||||
:param serializer: an optional entity serializer
|
||||
:type serializer: Serializer
|
||||
"""
|
||||
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer)
|
||||
return msg_server.MessageHandlingServer(transport, target,
|
||||
dispatcher, executor)
|
||||
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
|
||||
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
|
||||
|
||||
|
||||
class ExpectedException(Exception):
|
||||
|
@ -61,7 +61,7 @@ class MessageHandlingServer(object):
|
||||
new tasks.
|
||||
"""
|
||||
|
||||
def __init__(self, transport, target, dispatcher, executor='blocking'):
|
||||
def __init__(self, transport, dispatcher, executor='blocking'):
|
||||
"""Construct a message handling server.
|
||||
|
||||
The dispatcher parameter is a callable which is invoked with context
|
||||
@ -73,8 +73,6 @@ class MessageHandlingServer(object):
|
||||
|
||||
:param transport: the messaging transport
|
||||
:type transport: Transport
|
||||
:param target: the exchange, topic and server to listen on
|
||||
:type target: Target
|
||||
:param dispatcher: a callable which is invoked for each method
|
||||
:type dispatcher: callable
|
||||
:param executor: name of message executor - e.g. 'eventlet', 'blocking'
|
||||
@ -83,7 +81,6 @@ class MessageHandlingServer(object):
|
||||
self.conf = transport.conf
|
||||
|
||||
self.transport = transport
|
||||
self.target = target
|
||||
self.dispatcher = dispatcher
|
||||
self.executor = executor
|
||||
|
||||
@ -116,9 +113,8 @@ class MessageHandlingServer(object):
|
||||
"""
|
||||
if self._executor is not None:
|
||||
return
|
||||
|
||||
try:
|
||||
listener = self.transport._listen(self.target)
|
||||
listener = self.dispatcher._listen(self.transport)
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ServerListenError(self.target, ex)
|
||||
|
||||
|
@ -97,7 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
|
||||
endpoints.append(_FakeEndpoint(target))
|
||||
|
||||
serializer = None
|
||||
dispatcher = messaging.RPCDispatcher(endpoints, serializer)
|
||||
target = messaging.Target()
|
||||
dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
|
||||
|
||||
if self.dispatch_to is not None:
|
||||
endpoint = endpoints[self.dispatch_to['endpoint']]
|
||||
@ -139,7 +140,8 @@ class TestSerializer(test_utils.BaseTestCase):
|
||||
def test_serializer(self):
|
||||
endpoint = _FakeEndpoint()
|
||||
serializer = msg_serializer.NoOpSerializer()
|
||||
dispatcher = messaging.RPCDispatcher([endpoint], serializer)
|
||||
target = messaging.Target()
|
||||
dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer)
|
||||
|
||||
self.mox.StubOutWithMock(endpoint, 'foo')
|
||||
args = dict([(k, 'd' + v) for k, v in self.args.items()])
|
||||
|
@ -105,7 +105,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
self.assertIs(server.conf, self.conf)
|
||||
self.assertIs(server.transport, transport)
|
||||
self.assertIs(server.target, target)
|
||||
self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher)
|
||||
self.assertIs(server.dispatcher.endpoints, endpoints)
|
||||
self.assertIs(server.dispatcher.serializer, serializer)
|
||||
|
Loading…
Reference in New Issue
Block a user