From cbd6672452655a2f0f872200f2bdaef760e3e61c Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Thu, 7 Apr 2016 08:16:03 +0300 Subject: [PATCH] [zmq] Fix cast message loss in simulator In this change rely on eventlet socket locks and don't use queue to sync green threads access to socket. Change-Id: I1c712ecc1ac4ec995e2b7652866a8b2d691543ce Closes-Bug: #1561048 --- oslo_messaging/_drivers/impl_zmq.py | 5 +- .../publishers/dealer/zmq_dealer_publisher.py | 83 +++++++++++++------ .../_drivers/zmq_driver/client/zmq_client.py | 7 ++ .../_drivers/zmq_driver/zmq_async.py | 5 ++ .../tests/drivers/zmq/test_impl_zmq.py | 6 +- 5 files changed, 76 insertions(+), 30 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 568df14da..c86e643c4 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -32,6 +32,8 @@ from oslo_messaging import server RPCException = rpc_common.RPCException _MATCHMAKER_BACKENDS = ('redis', 'dummy') _MATCHMAKER_DEFAULT = 'redis' +_CONCURRENCY_CHOICES = ('eventlet', 'native') +_CONCURRENCY_DEFAULT = 'eventlet' LOG = logging.getLogger(__name__) @@ -46,7 +48,8 @@ zmq_opts = [ choices=_MATCHMAKER_BACKENDS, help='MatchMaker driver.'), - cfg.StrOpt('rpc_zmq_concurrency', default='eventlet', + cfg.StrOpt('rpc_zmq_concurrency', default=_CONCURRENCY_DEFAULT, + choices=_CONCURRENCY_CHOICES, help='Type of concurrency used. Either "native" or "eventlet"'), cfg.IntOpt('rpc_zmq_contexts', default=1, diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 74fbee2ca..caec2d0ab 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -55,37 +55,66 @@ class DealerPublisher(zmq_publisher_base.QueuedSender): super(DealerPublisher, self).send_request(request) -class DealerPublisherLight(zmq_publisher_base.QueuedSender): - """Used when publishing to proxy. """ +class DealerPublisherAsync(object): + """This simplified publisher is to be used with eventlet only. + Eventlet takes care about zmq sockets sharing between green threads + using queued lock. + Use DealerPublisher for other concurrency models. + """ def __init__(self, conf, matchmaker): - - def _do_send_request(socket, request): - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern( - request.msg_type) - - envelope = request.create_envelope() - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " - "a target %(target)s", - {"message": request.message_id, - "target": request.target, - "addr": list(socket.connections)}) - - sockets_manager = zmq_publisher_base.SocketsManager( + self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) - super(DealerPublisherLight, self).__init__( - sockets_manager, _do_send_request) - self.socket = self.outbound_sockets.get_socket_to_publishers() - def _connect_socket(self, target): - return self.socket + @staticmethod + def _send_message_data(socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request.create_envelope(), zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sent message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + + def send_request(self, request): + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + socket = self.sockets_manager.get_socket(request.target) + + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(socket.connections_count()): + self._send_message_data(socket, request) + else: + self._send_message_data(socket, request) + + def cleanup(self): + self.sockets_manager.cleanup() + + +class DealerPublisherLight(object): + """Used when publishing to a proxy. """ + + def __init__(self, conf, matchmaker): + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.socket = self.sockets_manager.get_socket_to_publishers() + + def send_request(self, request): + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern( + request.msg_type) + + envelope = request.create_envelope() + + self.socket.send(b'', zmq.SNDMORE) + self.socket.send_pyobj(envelope, zmq.SNDMORE) + self.socket.send_pyobj(request) + + LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " + "a target %(target)s", + {"message": request.message_id, + "target": request.target, + "addr": list(self.socket.connections)}) def cleanup(self): self.socket.close() - super(DealerPublisherLight, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 7183c6114..a413d19b1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -31,6 +31,11 @@ class ZmqClient(zmq_client_base.ZmqClientBase): default_publisher = zmq_dealer_publisher.DealerPublisher( conf, matchmaker) + cast_publisher = zmq_dealer_publisher.DealerPublisherAsync( + conf, matchmaker) \ + if zmq_async.is_eventlet_concurrency(conf) \ + else default_publisher + fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( conf, matchmaker) if conf.use_pub_sub else default_publisher @@ -41,6 +46,8 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_call_publisher.DealerCallPublisher( conf, matchmaker), + zmq_names.CAST_TYPE: cast_publisher, + # Here use DealerPublisherLight for sending request to proxy # which finally uses PubPublisher to send fanout in case of # 'use_pub_sub' option configured. diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 741a08b1e..259f51248 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -54,6 +54,11 @@ def get_executor(method, zmq_concurrency='eventlet'): return threading_poller.ThreadingExecutor(method) +def is_eventlet_concurrency(conf): + return _is_eventlet_zmq_available() and conf.rpc_zmq_concurrency == \ + 'eventlet' + + def _is_eventlet_zmq_available(): return importutils.try_import('eventlet.green.zmq') diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 621204d33..ed8c366bd 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -13,6 +13,7 @@ # under the License. import testtools +import time import oslo_messaging from oslo_messaging._drivers import impl_zmq @@ -95,12 +96,13 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase): target = oslo_messaging.Target(topic='testtopic', server="my@server") self.listener.listen(target) + time.sleep(0.01) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=False) - self.listener._received.wait() + self.listener._received.wait(5) self.assertIsNone(result) self.assertTrue(self.listener._received.isSet()) @@ -117,7 +119,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase): {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=False) - self.listener._received.wait() + self.listener._received.wait(5) self.assertIsNone(result) self.assertTrue(self.listener._received.isSet())