Merge "[zmq] Make zmq_immediate configurable"

This commit is contained in:
Jenkins 2016-08-24 23:27:17 +00:00 committed by Gerrit Code Review
commit c3500cc4c7
4 changed files with 15 additions and 5 deletions
oslo_messaging/_drivers/zmq_driver

@ -76,7 +76,8 @@ class SocketsManager(object):
if self.socket_to_publishers is not None: if self.socket_to_publishers is not None:
return self.socket_to_publishers return self.socket_to_publishers
self.socket_to_publishers = zmq_socket.ZmqSocket( self.socket_to_publishers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type) self.conf, self.zmq_context, self.socket_type,
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
publishers = self.matchmaker.get_publishers() publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers: for pub_address, router_address in publishers:
self.socket_to_publishers.connect_to_host(router_address) self.socket_to_publishers.connect_to_host(router_address)
@ -86,7 +87,8 @@ class SocketsManager(object):
if self.socket_to_routers is not None: if self.socket_to_routers is not None:
return self.socket_to_routers return self.socket_to_routers
self.socket_to_routers = zmq_socket.ZmqSocket( self.socket_to_routers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type) self.conf, self.zmq_context, self.socket_type,
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
routers = self.matchmaker.get_routers() routers = self.matchmaker.get_routers()
for router_address in routers: for router_address in routers:
self.socket_to_routers.connect_to_host(router_address) self.socket_to_routers.connect_to_host(router_address)

@ -36,7 +36,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
super(SubConsumer, self).__init__(conf, poller, server) super(SubConsumer, self).__init__(conf, poller, server)
self.matchmaker = server.matchmaker self.matchmaker = server.matchmaker
self.target = server.target self.target = server.target
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB) self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
immediate=False)
self.sockets.append(self.socket) self.sockets.append(self.socket)
self._subscribe_on_target(self.target) self._subscribe_on_target(self.target)
self.connection_updater = SubscriberConnectionUpdater( self.connection_updater = SubscriberConnectionUpdater(

@ -110,7 +110,14 @@ zmq_opts = [
choices=('json', 'msgpack'), choices=('json', 'msgpack'),
deprecated_group='DEFAULT', deprecated_group='DEFAULT',
help='Default serialization mechanism for ' help='Default serialization mechanism for '
'serializing/deserializing outgoing/incoming messages') 'serializing/deserializing outgoing/incoming messages'),
cfg.BoolOpt('zmq_immediate', default=False,
help='This option configures round-robin mode in zmq socket. '
'True means not keeping a queue when server side '
'disconnects. False means to keep queue and messages '
'even if server is disconnected, when the server '
'appears we send all accumulated messages to it.')
] ]
zmq_ack_retry_opts = [ zmq_ack_retry_opts = [

@ -38,7 +38,7 @@ class ZmqSocket(object):
'msgpack': msgpack_serializer.MessagePackSerializer() 'msgpack': msgpack_serializer.MessagePackSerializer()
} }
def __init__(self, conf, context, socket_type, immediate=True, def __init__(self, conf, context, socket_type, immediate,
high_watermark=0): high_watermark=0):
self.conf = conf self.conf = conf
self.context = context self.context = context