From 8424dbc7cb3f102e4a0841684c4dc14267cd3da9 Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Thu, 12 Jan 2017 16:08:53 +0200 Subject: [PATCH] [zmq] Dynamic connections failover ZeroMQ manages failover automatically when socket is being connected to multiple hosts. Dynamic connections mode assumes to connect once per message so we connect only to a single host. However we cannot say for sure if this host is still alive. For failover reasons we can connect to a primary host and then add some more hosts as failover and let ZeroMQ to decide where it finally sends the message. Change-Id: I19cbb75aaea8a0b725dd6a8ff665392738e164a1 --- .../client/publishers/dealer/zmq_dealer_publisher_direct.py | 4 ++++ .../client/publishers/dealer/zmq_dealer_publisher_proxy.py | 4 +++- oslo_messaging/_drivers/zmq_driver/zmq_options.py | 5 +++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 3fcada48c..44d5b6ae3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -66,6 +66,10 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): def _get_round_robin_host_connection(self, target, socket): host = self.routing_table.get_round_robin_host(target) socket.connect_to_host(host) + failover_hosts = self.routing_table.get_all_round_robin_hosts(target) + upper_bound = self.conf.oslo_messaging_zmq.zmq_failover_connections + for host in failover_hosts[:upper_bound]: + socket.connect_to_host(host) def _get_fanout_connection(self, target, socket): for host in self.routing_table.get_fanout_hosts(target): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 8b07fcbb9..a0235856f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -105,7 +105,9 @@ class DealerPublisherProxyDynamic( if not self.publishers: raise zmq_matchmaker_base.MatchmakerUnavailable() socket = self.sockets_manager.get_socket() - socket.connect_to_host(random.choice(tuple(self.publishers))) + random.shuffle(self.publishers) + for publisher in self.publishers: + socket.connect_to_host(publisher) return socket def send_request(self, socket, request): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 98dd65a40..6ced79a62 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -97,6 +97,11 @@ zmq_opts = [ 'means to use direct connections for direct message ' 'types (ignored otherwise).'), + cfg.IntOpt('zmq_failover_connections', default=2, + help='How many additional connections to a host will be made ' + 'for failover reasons. This option is actual only in ' + 'dynamic connections mode.'), + cfg.PortOpt('rpc_zmq_min_port', default=49153, deprecated_group='DEFAULT',