[zmq] Make zmq_immediate configurable
Add flexibility by making socket options values configurable. Change-Id: I184561fe5e9648fb039b3349aafd9d97fa3a75c8
This commit is contained in:
parent
f61f0c1c1b
commit
83a07b14fd
oslo_messaging/_drivers/zmq_driver
@ -76,7 +76,8 @@ class SocketsManager(object):
|
||||
if self.socket_to_publishers is not None:
|
||||
return self.socket_to_publishers
|
||||
self.socket_to_publishers = zmq_socket.ZmqSocket(
|
||||
self.conf, self.zmq_context, self.socket_type)
|
||||
self.conf, self.zmq_context, self.socket_type,
|
||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, router_address in publishers:
|
||||
self.socket_to_publishers.connect_to_host(router_address)
|
||||
@ -86,7 +87,8 @@ class SocketsManager(object):
|
||||
if self.socket_to_routers is not None:
|
||||
return self.socket_to_routers
|
||||
self.socket_to_routers = zmq_socket.ZmqSocket(
|
||||
self.conf, self.zmq_context, self.socket_type)
|
||||
self.conf, self.zmq_context, self.socket_type,
|
||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
|
||||
routers = self.matchmaker.get_routers()
|
||||
for router_address in routers:
|
||||
self.socket_to_routers.connect_to_host(router_address)
|
||||
|
@ -35,7 +35,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
||||
super(SubConsumer, self).__init__(conf, poller, server)
|
||||
self.matchmaker = server.matchmaker
|
||||
self.target = server.target
|
||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB)
|
||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
|
||||
immediate=False)
|
||||
self.sockets.append(self.socket)
|
||||
self._subscribe_on_target(self.target)
|
||||
self.on_publishers(self.matchmaker.get_publishers())
|
||||
|
@ -110,7 +110,14 @@ zmq_opts = [
|
||||
choices=('json', 'msgpack'),
|
||||
deprecated_group='DEFAULT',
|
||||
help='Default serialization mechanism for '
|
||||
'serializing/deserializing outgoing/incoming messages')
|
||||
'serializing/deserializing outgoing/incoming messages'),
|
||||
|
||||
cfg.BoolOpt('zmq_immediate', default=False,
|
||||
help='This option configures round-robin mode in zmq socket. '
|
||||
'True means not keeping a queue when server side '
|
||||
'disconnects. False means to keep queue and messages '
|
||||
'even if server is disconnected, when the server '
|
||||
'appears we send all accumulated messages to it.')
|
||||
]
|
||||
|
||||
zmq_ack_retry_opts = [
|
||||
|
@ -38,7 +38,7 @@ class ZmqSocket(object):
|
||||
'msgpack': msgpack_serializer.MessagePackSerializer()
|
||||
}
|
||||
|
||||
def __init__(self, conf, context, socket_type, immediate=True,
|
||||
def __init__(self, conf, context, socket_type, immediate,
|
||||
high_watermark=0):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
|
Loading…
x
Reference in New Issue
Block a user