[zmq] Fix cast message loss in simulator

In this change rely on eventlet socket locks and
don't use queue to sync green threads access to socket.

Change-Id: I1c712ecc1ac4ec995e2b7652866a8b2d691543ce
Closes-Bug: #1561048
This commit is contained in:
ozamiatin 2016-04-07 08:16:03 +03:00 committed by Oleksii Zamiatin
parent ee394d3c5b
commit cbd6672452
5 changed files with 76 additions and 30 deletions

View File

@ -32,6 +32,8 @@ from oslo_messaging import server
RPCException = rpc_common.RPCException
_MATCHMAKER_BACKENDS = ('redis', 'dummy')
_MATCHMAKER_DEFAULT = 'redis'
_CONCURRENCY_CHOICES = ('eventlet', 'native')
_CONCURRENCY_DEFAULT = 'eventlet'
LOG = logging.getLogger(__name__)
@ -46,7 +48,8 @@ zmq_opts = [
choices=_MATCHMAKER_BACKENDS,
help='MatchMaker driver.'),
cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
cfg.StrOpt('rpc_zmq_concurrency', default=_CONCURRENCY_DEFAULT,
choices=_CONCURRENCY_CHOICES,
help='Type of concurrency used. Either "native" or "eventlet"'),
cfg.IntOpt('rpc_zmq_contexts', default=1,

View File

@ -55,37 +55,66 @@ class DealerPublisher(zmq_publisher_base.QueuedSender):
super(DealerPublisher, self).send_request(request)
class DealerPublisherLight(zmq_publisher_base.QueuedSender):
"""Used when publishing to proxy. """
class DealerPublisherAsync(object):
"""This simplified publisher is to be used with eventlet only.
Eventlet takes care about zmq sockets sharing between green threads
using queued lock.
Use DealerPublisher for other concurrency models.
"""
def __init__(self, conf, matchmaker):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
def _do_send_request(socket, request):
@staticmethod
def _send_message_data(socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self.sockets_manager.get_socket(request.target)
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self._send_message_data(socket, request)
else:
self._send_message_data(socket, request)
def cleanup(self):
self.sockets_manager.cleanup()
class DealerPublisherLight(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_publishers()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
envelope = request.create_envelope()
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(socket.connections)})
sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
super(DealerPublisherLight, self).__init__(
sockets_manager, _do_send_request)
self.socket = self.outbound_sockets.get_socket_to_publishers()
def _connect_socket(self, target):
return self.socket
"addr": list(self.socket.connections)})
def cleanup(self):
self.socket.close()
super(DealerPublisherLight, self).cleanup()

View File

@ -31,6 +31,11 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
default_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
cast_publisher = zmq_dealer_publisher.DealerPublisherAsync(
conf, matchmaker) \
if zmq_async.is_eventlet_concurrency(conf) \
else default_publisher
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, matchmaker) if conf.use_pub_sub else default_publisher
@ -41,6 +46,8 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
zmq_names.CAST_TYPE: cast_publisher,
# Here use DealerPublisherLight for sending request to proxy
# which finally uses PubPublisher to send fanout in case of
# 'use_pub_sub' option configured.

View File

@ -54,6 +54,11 @@ def get_executor(method, zmq_concurrency='eventlet'):
return threading_poller.ThreadingExecutor(method)
def is_eventlet_concurrency(conf):
return _is_eventlet_zmq_available() and conf.rpc_zmq_concurrency == \
'eventlet'
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')

View File

@ -13,6 +13,7 @@
# under the License.
import testtools
import time
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
@ -95,12 +96,13 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
time.sleep(0.01)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
@ -117,7 +119,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())