[zmq] Distinguish Round-Robin/Fanout socket sending mode

For zmq.DEALER socket there are two round-robin modes exist,
controlled by zmq.IMMEDIATE option. Immediate True means sending
only to running servers and False means to send to all connected
servers (may be not up and running at the moment).

If we do all messaging over DEALER socket as we do for direct
connections, immediate=True better suits for direct CALL/CAST
message types and immediate=False is better for fanout.

This patch fixes things for dynamic connections as for static
we already used the approach.

Change-Id: I70c7aeb3c7a2c63c128bec394827577cab580def
This commit is contained in:
ozamiatin 2017-01-11 13:19:45 +02:00
parent fe540187d5
commit 492ffe9774
2 changed files with 6 additions and 4 deletions

View File

@ -72,12 +72,14 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
socket.connect_to_host(host)
def acquire_connection(self, request):
socket = self.sockets_manager.get_socket()
if request.msg_type in zmq_names.DIRECT_TYPES:
socket = self.sockets_manager.get_socket()
self._get_round_robin_host_connection(request.target, socket)
return socket
elif request.msg_type in zmq_names.MULTISEND_TYPES:
socket = self.sockets_manager.get_socket(immediate=False)
self._get_fanout_connection(request.target, socket)
return socket
return socket
def _finally_unregister(self, socket, request):
super(DealerPublisherDirect, self)._finally_unregister(socket, request)

View File

@ -33,9 +33,9 @@ class SocketsManager(object):
self.socket_to_routers = None
self.sockets = {}
def get_socket(self):
def get_socket(self, immediate=True):
return zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type, immediate=False)
self.socket_type, immediate=immediate)
def get_cached_socket(self, target_key, hosts=None, immediate=True):
hosts = [] if hosts is None else hosts