Merge "[zmq] Fix cast message loss in simulator"

This commit is contained in:
Jenkins 2016-04-13 13:44:15 +00:00 committed by Gerrit Code Review
commit 0b286754e2
5 changed files with 76 additions and 30 deletions

View File

@ -32,6 +32,8 @@ from oslo_messaging import server
RPCException = rpc_common.RPCException
_MATCHMAKER_BACKENDS = ('redis', 'dummy')
_MATCHMAKER_DEFAULT = 'redis'
_CONCURRENCY_CHOICES = ('eventlet', 'native')
_CONCURRENCY_DEFAULT = 'eventlet'
LOG = logging.getLogger(__name__)
@ -46,7 +48,8 @@ zmq_opts = [
choices=_MATCHMAKER_BACKENDS,
help='MatchMaker driver.'),
cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
cfg.StrOpt('rpc_zmq_concurrency', default=_CONCURRENCY_DEFAULT,
choices=_CONCURRENCY_CHOICES,
help='Type of concurrency used. Either "native" or "eventlet"'),
cfg.IntOpt('rpc_zmq_contexts', default=1,

View File

@ -55,37 +55,66 @@ class DealerPublisher(zmq_publisher_base.QueuedSender):
super(DealerPublisher, self).send_request(request)
class DealerPublisherLight(zmq_publisher_base.QueuedSender):
"""Used when publishing to proxy. """
class DealerPublisherAsync(object):
"""This simplified publisher is to be used with eventlet only.
Eventlet takes care about zmq sockets sharing between green threads
using queued lock.
Use DealerPublisher for other concurrency models.
"""
def __init__(self, conf, matchmaker):
def _do_send_request(socket, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
envelope = request.create_envelope()
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(socket.connections)})
sockets_manager = zmq_publisher_base.SocketsManager(
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
super(DealerPublisherLight, self).__init__(
sockets_manager, _do_send_request)
self.socket = self.outbound_sockets.get_socket_to_publishers()
def _connect_socket(self, target):
return self.socket
@staticmethod
def _send_message_data(socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self.sockets_manager.get_socket(request.target)
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self._send_message_data(socket, request)
else:
self._send_message_data(socket, request)
def cleanup(self):
self.sockets_manager.cleanup()
class DealerPublisherLight(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = self.sockets_manager.get_socket_to_publishers()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
envelope = request.create_envelope()
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(self.socket.connections)})
def cleanup(self):
self.socket.close()
super(DealerPublisherLight, self).cleanup()

View File

@ -31,6 +31,11 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
default_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
cast_publisher = zmq_dealer_publisher.DealerPublisherAsync(
conf, matchmaker) \
if zmq_async.is_eventlet_concurrency(conf) \
else default_publisher
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, matchmaker) if conf.use_pub_sub else default_publisher
@ -41,6 +46,8 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
zmq_names.CAST_TYPE: cast_publisher,
# Here use DealerPublisherLight for sending request to proxy
# which finally uses PubPublisher to send fanout in case of
# 'use_pub_sub' option configured.

View File

@ -54,6 +54,11 @@ def get_executor(method, zmq_concurrency='eventlet'):
return threading_poller.ThreadingExecutor(method)
def is_eventlet_concurrency(conf):
return _is_eventlet_zmq_available() and conf.rpc_zmq_concurrency == \
'eventlet'
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')

View File

@ -13,6 +13,7 @@
# under the License.
import testtools
import time
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
@ -95,12 +96,13 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
time.sleep(0.01)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())
@ -117,7 +119,7 @@ class TestZmqBasics(zmq_common.ZmqBaseTestCase):
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.listener._received.wait(5)
self.assertIsNone(result)
self.assertTrue(self.listener._received.isSet())