[zmq] Dynamic connections failover
ZeroMQ manages failover automatically when socket is being connected to multiple hosts. Dynamic connections mode assumes to connect once per message so we connect only to a single host. However we cannot say for sure if this host is still alive. For failover reasons we can connect to a primary host and then add some more hosts as failover and let ZeroMQ to decide where it finally sends the message. Change-Id: I19cbb75aaea8a0b725dd6a8ff665392738e164a1
This commit is contained in:
parent
f07652ae2d
commit
8424dbc7cb
@ -66,6 +66,10 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
|||||||
def _get_round_robin_host_connection(self, target, socket):
|
def _get_round_robin_host_connection(self, target, socket):
|
||||||
host = self.routing_table.get_round_robin_host(target)
|
host = self.routing_table.get_round_robin_host(target)
|
||||||
socket.connect_to_host(host)
|
socket.connect_to_host(host)
|
||||||
|
failover_hosts = self.routing_table.get_all_round_robin_hosts(target)
|
||||||
|
upper_bound = self.conf.oslo_messaging_zmq.zmq_failover_connections
|
||||||
|
for host in failover_hosts[:upper_bound]:
|
||||||
|
socket.connect_to_host(host)
|
||||||
|
|
||||||
def _get_fanout_connection(self, target, socket):
|
def _get_fanout_connection(self, target, socket):
|
||||||
for host in self.routing_table.get_fanout_hosts(target):
|
for host in self.routing_table.get_fanout_hosts(target):
|
||||||
|
@ -105,7 +105,9 @@ class DealerPublisherProxyDynamic(
|
|||||||
if not self.publishers:
|
if not self.publishers:
|
||||||
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
raise zmq_matchmaker_base.MatchmakerUnavailable()
|
||||||
socket = self.sockets_manager.get_socket()
|
socket = self.sockets_manager.get_socket()
|
||||||
socket.connect_to_host(random.choice(tuple(self.publishers)))
|
random.shuffle(self.publishers)
|
||||||
|
for publisher in self.publishers:
|
||||||
|
socket.connect_to_host(publisher)
|
||||||
return socket
|
return socket
|
||||||
|
|
||||||
def send_request(self, socket, request):
|
def send_request(self, socket, request):
|
||||||
|
@ -97,6 +97,11 @@ zmq_opts = [
|
|||||||
'means to use direct connections for direct message '
|
'means to use direct connections for direct message '
|
||||||
'types (ignored otherwise).'),
|
'types (ignored otherwise).'),
|
||||||
|
|
||||||
|
cfg.IntOpt('zmq_failover_connections', default=2,
|
||||||
|
help='How many additional connections to a host will be made '
|
||||||
|
'for failover reasons. This option is actual only in '
|
||||||
|
'dynamic connections mode.'),
|
||||||
|
|
||||||
cfg.PortOpt('rpc_zmq_min_port',
|
cfg.PortOpt('rpc_zmq_min_port',
|
||||||
default=49153,
|
default=49153,
|
||||||
deprecated_group='DEFAULT',
|
deprecated_group='DEFAULT',
|
||||||
|
Loading…
Reference in New Issue
Block a user