Refactor the failover flows
This patch refactors the failover flows to improve the performance and reliability of failovers in Octavia. Specific improvements are: * More tasks and flows will retry when other OpenStack services are failing. * Failover can now succeed even when all of the amphora are missing for a given load balancer. * It will check and repair the load balancer VIP should the VIP port(s) become corrupted in neutron. * It will cleanup extra resources that may be associated with a load balancer in the event of a cloud service failure. This patch also removes some dead code. Change-Id: I04cb2f1f10ec566298834f81df0cf8b100ca916c Story: 2003084 Task: 23166 Story: 2004440 Task: 28108
This commit is contained in:
parent
f26ab8b97b
commit
955bb88406
@ -165,6 +165,19 @@
|
||||
# Endpoint type to use for communication with the Barbican service.
|
||||
# endpoint_type = publicURL
|
||||
|
||||
[compute]
|
||||
# The maximum attempts to retry an action with the compute service.
|
||||
# max_retries = 15
|
||||
|
||||
# Seconds to wait before retrying an action with the compute service.
|
||||
# retry_interval = 1
|
||||
|
||||
# The seconds to backoff retry attempts
|
||||
# retry_backoff = 1
|
||||
|
||||
# The maximum interval in seconds between retry attempts
|
||||
# retry_max = 10
|
||||
|
||||
[networking]
|
||||
# The maximum attempts to retry an action with the networking service.
|
||||
# max_retries = 15
|
||||
@ -172,6 +185,12 @@
|
||||
# Seconds to wait before retrying an action with the networking service.
|
||||
# retry_interval = 1
|
||||
|
||||
# The seconds to backoff retry attempts
|
||||
# retry_backoff = 1
|
||||
|
||||
# The maximum interval in seconds between retry attempts
|
||||
# retry_max = 10
|
||||
|
||||
# The maximum time to wait, in seconds, for a port to detach from an amphora
|
||||
# port_detach_timeout = 300
|
||||
|
||||
@ -236,11 +255,26 @@
|
||||
# active_connection_max_retries = 15
|
||||
# active_connection_rety_interval = 2
|
||||
|
||||
# These "failover" timeouts are used during the failover process to probe
|
||||
# amphorae that are part of the load balancer being failed over.
|
||||
# These values are very low to facilitate "fail fast" should an amphora
|
||||
# not respond in a failure situation.
|
||||
# failover_connection_max_retries = 2
|
||||
# failover_connection_retry_interval = 5
|
||||
|
||||
# The user flow log format for HAProxy.
|
||||
# {{ project_id }} and {{ lb_id }} will be automatically substituted by the
|
||||
# controller when configuring HAProxy if they are present in the string.
|
||||
# user_log_format = '{{ project_id }} {{ lb_id }} %f %ci %cp %t %{+Q}r %ST %B %U %[ssl_c_verify] %{+Q}[ssl_c_s_dn] %b %s %Tt %tsc'
|
||||
|
||||
# API messaging / database commit retries
|
||||
# This is many times the controller worker retries waiting for the API to
|
||||
# complete a database commit for a message received over the queue.
|
||||
# api_db_commit_retry_attempts = 15
|
||||
# api_db_commit_retry_initial_delay = 1
|
||||
# api_db_commit_retry_backoff = 1
|
||||
# api_db_commit_retry_max = 5
|
||||
|
||||
[controller_worker]
|
||||
# workers = 1
|
||||
# amp_active_retries = 30
|
||||
@ -297,6 +331,9 @@
|
||||
# loadbalancer_topology = SINGLE
|
||||
# user_data_config_drive = False
|
||||
|
||||
# amphora_delete_retries = 5
|
||||
# amphora_delete_retry_interval = 5
|
||||
|
||||
[task_flow]
|
||||
# TaskFlow engine options are:
|
||||
# - serial: Runs all tasks on a single thread.
|
||||
|
@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ipaddress
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
@ -21,10 +20,11 @@ import subprocess
|
||||
import pyroute2
|
||||
import webob
|
||||
|
||||
import netifaces
|
||||
from octavia.amphorae.backends.agent import api_server
|
||||
from octavia.amphorae.backends.agent.api_server import util
|
||||
from octavia.amphorae.backends.utils import network_utils
|
||||
from octavia.common import constants as consts
|
||||
from octavia.common import exceptions
|
||||
|
||||
|
||||
class AmphoraInfo(object):
|
||||
@ -175,65 +175,15 @@ class AmphoraInfo(object):
|
||||
return networks
|
||||
|
||||
def get_interface(self, ip_addr):
|
||||
|
||||
try:
|
||||
ip_version = ipaddress.ip_address(ip_addr).version
|
||||
except Exception:
|
||||
interface = network_utils.get_interface_name(
|
||||
ip_addr, net_ns=consts.AMPHORA_NAMESPACE)
|
||||
except exceptions.InvalidIPAddress:
|
||||
return webob.Response(json=dict(message="Invalid IP address"),
|
||||
status=400)
|
||||
except exceptions.NotFound:
|
||||
return webob.Response(
|
||||
json=dict(message="Invalid IP address"), status=400)
|
||||
|
||||
if ip_version == 4:
|
||||
address_format = netifaces.AF_INET
|
||||
elif ip_version == 6:
|
||||
address_format = netifaces.AF_INET6
|
||||
else:
|
||||
return webob.Response(
|
||||
json=dict(message="Bad IP address version"), status=400)
|
||||
|
||||
# We need to normalize the address as IPv6 has multiple representations
|
||||
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
|
||||
normalized_addr = socket.inet_ntop(address_format,
|
||||
socket.inet_pton(address_format,
|
||||
ip_addr))
|
||||
|
||||
with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns:
|
||||
for addr in netns.get_addr():
|
||||
# Save the interface index as IPv6 records don't list a
|
||||
# textual interface
|
||||
interface_idx = addr['index']
|
||||
# Save the address family (IPv4/IPv6) for use normalizing
|
||||
# the IP address for comparison
|
||||
interface_af = addr['family']
|
||||
# Search through the attributes of each address record
|
||||
for attr in addr['attrs']:
|
||||
# Look for the attribute name/value pair for the address
|
||||
if attr[0] == 'IFA_ADDRESS':
|
||||
# Compare the normalized address with the address we
|
||||
# we are looking for. Since we have matched the name
|
||||
# above, attr[1] is the address value
|
||||
if normalized_addr == socket.inet_ntop(
|
||||
interface_af,
|
||||
socket.inet_pton(interface_af, attr[1])):
|
||||
|
||||
# Lookup the matching interface name by
|
||||
# getting the interface with the index we found
|
||||
# in the above address search
|
||||
lookup_int = netns.get_links(interface_idx)
|
||||
# Search through the attributes of the matching
|
||||
# interface record
|
||||
for int_attr in lookup_int[0]['attrs']:
|
||||
# Look for the attribute name/value pair
|
||||
# that includes the interface name
|
||||
if int_attr[0] == 'IFLA_IFNAME':
|
||||
# Return the response with the matching
|
||||
# interface name that is in int_attr[1]
|
||||
# for the matching interface attribute
|
||||
# name
|
||||
return webob.Response(
|
||||
json=dict(message='OK',
|
||||
interface=int_attr[1]),
|
||||
status=200)
|
||||
|
||||
return webob.Response(
|
||||
json=dict(message="Error interface not found for IP address"),
|
||||
status=404)
|
||||
json=dict(message="Error interface not found for IP address"),
|
||||
status=404)
|
||||
return webob.Response(json=dict(message='OK', interface=interface),
|
||||
status=200)
|
||||
|
@ -47,6 +47,7 @@ class Keepalived(object):
|
||||
|
||||
if not os.path.exists(util.keepalived_dir()):
|
||||
os.makedirs(util.keepalived_dir())
|
||||
if not os.path.exists(util.keepalived_check_scripts_dir()):
|
||||
os.makedirs(util.keepalived_check_scripts_dir())
|
||||
|
||||
conf_file = util.keepalived_cfg_path()
|
||||
@ -112,6 +113,9 @@ class Keepalived(object):
|
||||
)
|
||||
text_file.write(text)
|
||||
|
||||
# Configure the monitoring of haproxy
|
||||
util.vrrp_check_script_update(None, consts.AMP_ACTION_START)
|
||||
|
||||
# Make sure the new service is enabled on boot
|
||||
if init_system != consts.INIT_UPSTART:
|
||||
try:
|
||||
|
@ -78,7 +78,8 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
|
||||
# Active-Standby topology will create the directory below. So for
|
||||
# Single topology, it should not create the directory and the check
|
||||
# scripts for status change.
|
||||
if not os.path.exists(util.keepalived_check_scripts_dir()):
|
||||
if (CONF.controller_worker.loadbalancer_topology !=
|
||||
consts.TOPOLOGY_ACTIVE_STANDBY):
|
||||
NEED_CHECK = False
|
||||
|
||||
conf_file = util.keepalived_lvs_cfg_path(listener_id)
|
||||
@ -157,6 +158,9 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
|
||||
script_path = os.path.join(util.keepalived_check_scripts_dir(),
|
||||
KEEPALIVED_CHECK_SCRIPT_NAME)
|
||||
if not os.path.exists(script_path):
|
||||
if not os.path.exists(util.keepalived_check_scripts_dir()):
|
||||
os.makedirs(util.keepalived_check_scripts_dir())
|
||||
|
||||
with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),
|
||||
'w') as script_file:
|
||||
text = check_script_file_template.render(
|
||||
|
@ -235,12 +235,11 @@ class Loadbalancer(object):
|
||||
details="Unknown action: {0}".format(action)), status=400)
|
||||
|
||||
self._check_lb_exists(lb_id)
|
||||
is_vrrp = (CONF.controller_worker.loadbalancer_topology ==
|
||||
consts.TOPOLOGY_ACTIVE_STANDBY)
|
||||
|
||||
# Since this script should be created at LB create time
|
||||
# we can check for this path to see if VRRP is enabled
|
||||
# on this amphora and not write the file if VRRP is not in use
|
||||
if os.path.exists(util.keepalived_check_script_path()):
|
||||
self.vrrp_check_script_update(lb_id, action)
|
||||
if is_vrrp:
|
||||
util.vrrp_check_script_update(lb_id, action)
|
||||
|
||||
# HAProxy does not start the process when given a reload
|
||||
# so start it if haproxy is not already running
|
||||
@ -262,6 +261,14 @@ class Loadbalancer(object):
|
||||
return webob.Response(json=dict(
|
||||
message="Error {0}ing haproxy".format(action),
|
||||
details=e.output), status=500)
|
||||
|
||||
# If we are not in active/standby we need to send an IP
|
||||
# advertisement (GARP or NA). Keepalived handles this for
|
||||
# active/standby load balancers.
|
||||
if not is_vrrp and action in [consts.AMP_ACTION_START,
|
||||
consts.AMP_ACTION_RELOAD]:
|
||||
util.send_vip_advertisements(lb_id)
|
||||
|
||||
if action in [consts.AMP_ACTION_STOP,
|
||||
consts.AMP_ACTION_RELOAD]:
|
||||
return webob.Response(json=dict(
|
||||
@ -307,7 +314,7 @@ class Loadbalancer(object):
|
||||
# we can check for this path to see if VRRP is enabled
|
||||
# on this amphora and not write the file if VRRP is not in use
|
||||
if os.path.exists(util.keepalived_check_script_path()):
|
||||
self.vrrp_check_script_update(
|
||||
util.vrrp_check_script_update(
|
||||
lb_id, action=consts.AMP_ACTION_STOP)
|
||||
|
||||
# delete the ssl files
|
||||
@ -455,22 +462,6 @@ class Loadbalancer(object):
|
||||
def _cert_file_path(self, lb_id, filename):
|
||||
return os.path.join(self._cert_dir(lb_id), filename)
|
||||
|
||||
def vrrp_check_script_update(self, lb_id, action):
|
||||
lb_ids = util.get_loadbalancers()
|
||||
if action == consts.AMP_ACTION_STOP:
|
||||
lb_ids.remove(lb_id)
|
||||
args = []
|
||||
for lbid in lb_ids:
|
||||
args.append(util.haproxy_sock_path(lbid))
|
||||
|
||||
if not os.path.exists(util.keepalived_dir()):
|
||||
os.makedirs(util.keepalived_dir())
|
||||
os.makedirs(util.keepalived_check_scripts_dir())
|
||||
|
||||
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
|
||||
with open(util.haproxy_check_script_path(), 'w') as text_file:
|
||||
text_file.write(cmd)
|
||||
|
||||
def _check_haproxy_status(self, lb_id):
|
||||
if os.path.exists(util.pid_path(lb_id)):
|
||||
if os.path.exists(
|
||||
|
@ -23,6 +23,8 @@ from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from octavia.amphorae.backends.agent.api_server import osutils
|
||||
from octavia.amphorae.backends.utils import ip_advertisement
|
||||
from octavia.amphorae.backends.utils import network_utils
|
||||
from octavia.common import constants as consts
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -188,7 +190,7 @@ def get_listeners():
|
||||
def get_loadbalancers():
|
||||
"""Get Load balancers
|
||||
|
||||
:returns: An array with the ids of all load balancers,
|
||||
:returns: An array with the uuids of all load balancers,
|
||||
e.g. ['123', '456', ...] or [] if no loadbalancers exist
|
||||
"""
|
||||
if os.path.exists(CONF.haproxy_amphora.base_path):
|
||||
@ -332,3 +334,71 @@ def parse_haproxy_file(lb_id):
|
||||
stats_socket = m.group(1)
|
||||
|
||||
return stats_socket, listeners
|
||||
|
||||
|
||||
def vrrp_check_script_update(lb_id, action):
|
||||
os.makedirs(keepalived_dir(), exist_ok=True)
|
||||
os.makedirs(keepalived_check_scripts_dir(), exist_ok=True)
|
||||
|
||||
lb_ids = get_loadbalancers()
|
||||
udp_ids = get_udp_listeners()
|
||||
# If no LBs are found, so make sure keepalived thinks haproxy is down.
|
||||
if not lb_ids:
|
||||
if not udp_ids:
|
||||
with open(haproxy_check_script_path(), 'w') as text_file:
|
||||
text_file.write('exit 1')
|
||||
return
|
||||
if action == consts.AMP_ACTION_STOP:
|
||||
lb_ids.remove(lb_id)
|
||||
args = []
|
||||
for lbid in lb_ids:
|
||||
args.append(haproxy_sock_path(lbid))
|
||||
|
||||
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
|
||||
with open(haproxy_check_script_path(), 'w') as text_file:
|
||||
text_file.write(cmd)
|
||||
|
||||
|
||||
def get_haproxy_vip_addresses(lb_id):
|
||||
"""Get the VIP addresses for a load balancer.
|
||||
|
||||
:param lb_id: The load balancer ID to get VIP addresses from.
|
||||
:returns: List of VIP addresses (IPv4 and IPv6)
|
||||
"""
|
||||
vips = []
|
||||
with open(config_path(lb_id), 'r') as file:
|
||||
for line in file:
|
||||
current_line = line.strip()
|
||||
if current_line.startswith('bind'):
|
||||
for section in current_line.split(' '):
|
||||
# We will always have a port assigned per the template.
|
||||
if ':' in section:
|
||||
if ',' in section:
|
||||
addr_port = section.rstrip(',')
|
||||
vips.append(addr_port.rpartition(':')[0])
|
||||
else:
|
||||
vips.append(section.rpartition(':')[0])
|
||||
break
|
||||
return vips
|
||||
|
||||
|
||||
def send_vip_advertisements(lb_id):
|
||||
"""Sends address advertisements for each load balancer VIP.
|
||||
|
||||
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
|
||||
for the VIP addresses on a load balancer.
|
||||
|
||||
:param lb_id: The load balancer ID to send advertisements for.
|
||||
:returns: None
|
||||
"""
|
||||
try:
|
||||
vips = get_haproxy_vip_addresses(lb_id)
|
||||
|
||||
for vip in vips:
|
||||
interface = network_utils.get_interface_name(
|
||||
vip, net_ns=consts.AMPHORA_NAMESPACE)
|
||||
ip_advertisement.send_ip_advertisement(
|
||||
interface, vip, net_ns=consts.AMPHORA_NAMESPACE)
|
||||
except Exception as e:
|
||||
LOG.debug('Send VIP advertisement failed due to :%s. '
|
||||
'This amphora may not be the MASTER. Ignoring.', str(e))
|
||||
|
183
octavia/amphorae/backends/utils/ip_advertisement.py
Normal file
183
octavia/amphorae/backends/utils/ip_advertisement.py
Normal file
@ -0,0 +1,183 @@
|
||||
# Copyright 2020 Red Hat, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import fcntl
|
||||
import socket
|
||||
from struct import pack
|
||||
from struct import unpack
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from octavia.amphorae.backends.utils import network_namespace
|
||||
from octavia.common import constants
|
||||
from octavia.common import utils as common_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def garp(interface, ip_address, net_ns=None):
|
||||
"""Sends a gratuitous ARP for ip_address on the interface.
|
||||
|
||||
:param interface: The interface name to send the GARP on.
|
||||
:param ip_address: The IP address to advertise in the GARP.
|
||||
:param net_ns: The network namespace to send the GARP from.
|
||||
:returns: None
|
||||
"""
|
||||
ARP_ETHERTYPE = 0x0806
|
||||
BROADCAST_MAC = b'\xff\xff\xff\xff\xff\xff'
|
||||
|
||||
# Get a socket, optionally inside a network namespace
|
||||
garp_socket = None
|
||||
if net_ns:
|
||||
with network_namespace.NetworkNamespace(net_ns):
|
||||
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
|
||||
else:
|
||||
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
|
||||
|
||||
# Bind the socket with the ARP ethertype protocol
|
||||
garp_socket.bind((interface, ARP_ETHERTYPE))
|
||||
|
||||
# Get the MAC address of the interface
|
||||
source_mac = garp_socket.getsockname()[4]
|
||||
|
||||
garp_msg = [
|
||||
pack('!h', 1), # Hardware type ethernet
|
||||
pack('!h', 0x0800), # Protocol type IPv4
|
||||
pack('!B', 6), # Hardware size
|
||||
pack('!B', 4), # Protocol size
|
||||
pack('!h', 1), # Opcode request
|
||||
source_mac, # Sender MAC address
|
||||
socket.inet_aton(ip_address), # Sender IP address
|
||||
BROADCAST_MAC, # Target MAC address
|
||||
socket.inet_aton(ip_address)] # Target IP address
|
||||
|
||||
garp_ethernet = [
|
||||
BROADCAST_MAC, # Ethernet destination
|
||||
source_mac, # Ethernet source
|
||||
pack('!h', ARP_ETHERTYPE), # Ethernet type
|
||||
b''.join(garp_msg)] # The GARP message
|
||||
|
||||
garp_socket.send(b''.join(garp_ethernet))
|
||||
garp_socket.close()
|
||||
|
||||
|
||||
def calculate_icmpv6_checksum(packet):
|
||||
"""Calculate the ICMPv6 checksum for a packet.
|
||||
|
||||
:param packet: The packet bytes to checksum.
|
||||
:returns: The checksum integer.
|
||||
"""
|
||||
total = 0
|
||||
|
||||
# Add up 16-bit words
|
||||
num_words = len(packet) // 2
|
||||
for chunk in unpack("!%sH" % num_words, packet[0:num_words * 2]):
|
||||
total += chunk
|
||||
|
||||
# Add any left over byte
|
||||
if len(packet) % 2:
|
||||
total += packet[-1] << 8
|
||||
|
||||
# Fold 32-bits into 16-bits
|
||||
total = (total >> 16) + (total & 0xffff)
|
||||
total += total >> 16
|
||||
return ~total + 0x10000 & 0xffff
|
||||
|
||||
|
||||
def neighbor_advertisement(interface, ip_address, net_ns=None):
|
||||
"""Sends a unsolicited neighbor advertisement for an ip on the interface.
|
||||
|
||||
:param interface: The interface name to send the GARP on.
|
||||
:param ip_address: The IP address to advertise in the GARP.
|
||||
:param net_ns: The network namespace to send the GARP from.
|
||||
:returns: None
|
||||
"""
|
||||
ALL_NODES_ADDR = 'ff02::1'
|
||||
SIOCGIFHWADDR = 0x8927
|
||||
|
||||
# Get a socket, optionally inside a network namespace
|
||||
na_socket = None
|
||||
if net_ns:
|
||||
with network_namespace.NetworkNamespace(net_ns):
|
||||
na_socket = socket.socket(
|
||||
socket.AF_INET6, socket.SOCK_RAW,
|
||||
socket.getprotobyname(constants.IPV6_ICMP))
|
||||
else:
|
||||
na_socket = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
|
||||
socket.getprotobyname(constants.IPV6_ICMP))
|
||||
|
||||
# Per RFC 4861 section 4.4, the hop limit should be 255
|
||||
na_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 255)
|
||||
|
||||
# Bind the socket with the source address
|
||||
na_socket.bind((ip_address, 0))
|
||||
|
||||
# Get the byte representation of the MAC address of the interface
|
||||
# Note: You can't use getsockname() to get the MAC on this type of socket
|
||||
source_mac = fcntl.ioctl(na_socket.fileno(), SIOCGIFHWADDR, pack('256s',
|
||||
bytes(interface, 'utf-8')))[18:24]
|
||||
|
||||
# Get the byte representation of the source IP address
|
||||
source_ip_bytes = socket.inet_pton(socket.AF_INET6, ip_address)
|
||||
|
||||
icmpv6_na_msg_prefix = [
|
||||
pack('!B', 136), # ICMP Type Neighbor Advertisement
|
||||
pack('!B', 0)] # ICMP Code
|
||||
icmpv6_na_msg_postfix = [
|
||||
pack('!I', 0xa0000000), # Flags (Router, Override)
|
||||
source_ip_bytes, # Target address
|
||||
pack('!B', 2), # ICMPv6 option type target link-layer address
|
||||
pack('!B', 1), # ICMPv6 option length
|
||||
source_mac] # ICMPv6 option link-layer address
|
||||
|
||||
# Calculate the ICMPv6 checksum
|
||||
icmpv6_pseudo_header = [
|
||||
source_ip_bytes, # Source IP address
|
||||
socket.inet_pton(socket.AF_INET6, ALL_NODES_ADDR), # Destination IP
|
||||
pack('!I', 58), # IPv6 next header (ICMPv6)
|
||||
pack('!h', 32)] # IPv6 payload length
|
||||
icmpv6_tmp_chksum = pack('!H', 0) # Checksum are zeros for calculation
|
||||
tmp_chksum_msg = b''.join(icmpv6_pseudo_header + icmpv6_na_msg_prefix +
|
||||
[icmpv6_tmp_chksum] + icmpv6_pseudo_header)
|
||||
checksum = pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))
|
||||
|
||||
# Build the ICMPv6 unsolicitated neighbor advertisement
|
||||
icmpv6_msg = b''.join(icmpv6_na_msg_prefix + [checksum] +
|
||||
icmpv6_na_msg_postfix)
|
||||
|
||||
na_socket.sendto(icmpv6_msg, (ALL_NODES_ADDR, 0, 0, 0))
|
||||
na_socket.close()
|
||||
|
||||
|
||||
def send_ip_advertisement(interface, ip_address, net_ns=None):
|
||||
"""Send an address advertisement.
|
||||
|
||||
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
|
||||
for the ip address specified.
|
||||
|
||||
:param interface: The interface name to send the advertisement on.
|
||||
:param ip_address: The IP address to advertise.
|
||||
:param net_ns: The network namespace to send the advertisement from.
|
||||
:returns: None
|
||||
"""
|
||||
try:
|
||||
if common_utils.is_ipv4(ip_address):
|
||||
garp(interface, ip_address, net_ns)
|
||||
elif common_utils.is_ipv6(ip_address):
|
||||
neighbor_advertisement(interface, ip_address, net_ns)
|
||||
else:
|
||||
LOG.error('Unknown IP version for address: "%s". Skipping',
|
||||
ip_address)
|
||||
except Exception as e:
|
||||
LOG.warning('Unable to send address advertisement for address: "%s", '
|
||||
'error: %s. Skipping', ip_address, str(e))
|
50
octavia/amphorae/backends/utils/network_namespace.py
Normal file
50
octavia/amphorae/backends/utils/network_namespace.py
Normal file
@ -0,0 +1,50 @@
|
||||
# Copyright 2020 Red Hat, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import ctypes
|
||||
import os
|
||||
|
||||
|
||||
class NetworkNamespace(object):
|
||||
"""A network namespace context manager.
|
||||
|
||||
Runs wrapped code inside the specified network namespace.
|
||||
|
||||
:param netns: The network namespace name to enter.
|
||||
"""
|
||||
# from linux/sched.h - We want to enter a network namespace
|
||||
CLONE_NEWNET = 0x40000000
|
||||
|
||||
@staticmethod
|
||||
def _error_handler(result, func, arguments):
|
||||
if result == -1:
|
||||
errno = ctypes.get_errno()
|
||||
raise OSError(errno, os.strerror(errno))
|
||||
|
||||
def __init__(self, netns):
|
||||
self.current_netns = '/proc/{pid}/ns/net'.format(pid=os.getpid())
|
||||
self.target_netns = '/var/run/netns/{netns}'.format(netns=netns)
|
||||
# reference: man setns(2)
|
||||
self.set_netns = ctypes.CDLL('libc.so.6', use_errno=True).setns
|
||||
self.set_netns.errcheck = self._error_handler
|
||||
|
||||
def __enter__(self):
|
||||
# Save the current network namespace
|
||||
self.current_netns_fd = open(self.current_netns)
|
||||
with open(self.target_netns) as fd:
|
||||
self.set_netns(fd.fileno(), self.CLONE_NEWNET)
|
||||
|
||||
def __exit__(self, *args):
|
||||
# Return to the previous network namespace
|
||||
self.set_netns(self.current_netns_fd.fileno(), self.CLONE_NEWNET)
|
||||
self.current_netns_fd.close()
|
83
octavia/amphorae/backends/utils/network_utils.py
Normal file
83
octavia/amphorae/backends/utils/network_utils.py
Normal file
@ -0,0 +1,83 @@
|
||||
# Copyright 2020 Red Hat, Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import ipaddress
|
||||
|
||||
import pyroute2
|
||||
|
||||
from octavia.common import exceptions
|
||||
|
||||
|
||||
def _find_interface(ip_address, rtnl_api, normalized_addr):
|
||||
"""Find the interface using a routing netlink API.
|
||||
|
||||
:param ip_address: The IP address to search with.
|
||||
:param rtnl_api: A pyroute2 rtnl_api instance. (IPRoute, NetNS, etc.)
|
||||
:returns: The interface name if found, None if not found.
|
||||
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
|
||||
"""
|
||||
for addr in rtnl_api.get_addr(address=ip_address):
|
||||
# Save the interface index as IPv6 records don't list a textual
|
||||
# interface
|
||||
interface_idx = addr['index']
|
||||
# Search through the attributes of each address record
|
||||
for attr in addr['attrs']:
|
||||
# Look for the attribute name/value pair for the address
|
||||
if attr[0] == 'IFA_ADDRESS':
|
||||
# Compare the normalized address with the address we are
|
||||
# looking for. Since we have matched the name above, attr[1]
|
||||
# is the address value
|
||||
if normalized_addr == ipaddress.ip_address(attr[1]).compressed:
|
||||
# Lookup the matching interface name by getting the
|
||||
# interface with the index we found in the above address
|
||||
# search
|
||||
lookup_int = rtnl_api.get_links(interface_idx)
|
||||
# Search through the attributes of the matching interface
|
||||
# record
|
||||
for int_attr in lookup_int[0]['attrs']:
|
||||
# Look for the attribute name/value pair that includes
|
||||
# the interface name
|
||||
if int_attr[0] == 'IFLA_IFNAME':
|
||||
# Return the matching interface name that is in
|
||||
# int_attr[1] for the matching interface attribute
|
||||
# name
|
||||
return int_attr[1]
|
||||
# We didn't find an interface with that IP address.
|
||||
return None
|
||||
|
||||
|
||||
def get_interface_name(ip_address, net_ns=None):
|
||||
"""Gets the interface name from an IP address.
|
||||
|
||||
:param ip_address: The IP address to lookup.
|
||||
:param net_ns: The network namespace to find the interface in.
|
||||
:returns: The interface name.
|
||||
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
|
||||
:raises octavia.common.exceptions.NotFound: No interface was found.
|
||||
"""
|
||||
# We need to normalize the address as IPv6 has multiple representations
|
||||
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
|
||||
try:
|
||||
normalized_addr = ipaddress.ip_address(ip_address).compressed
|
||||
except ValueError:
|
||||
raise exceptions.InvalidIPAddress(ip_addr=ip_address)
|
||||
|
||||
if net_ns:
|
||||
with pyroute2.NetNS(net_ns) as rtnl_api:
|
||||
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
|
||||
else:
|
||||
with pyroute2.IPRoute() as rtnl_api:
|
||||
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
|
||||
if interface is not None:
|
||||
return interface
|
||||
raise exceptions.NotFound(resource='IP address', id=ip_address)
|
@ -202,6 +202,21 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
|
||||
:type agent_config: string
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
|
||||
"""Get the interface name from an IP address.
|
||||
|
||||
:param amphora: The amphora to query.
|
||||
:type amphora: octavia.db.models.Amphora
|
||||
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
|
||||
:type ip_address: string
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
:type timeout_dict: dict
|
||||
"""
|
||||
|
||||
|
||||
class HealthMixin(object, metaclass=abc.ABCMeta):
|
||||
@abc.abstractmethod
|
||||
@ -252,10 +267,17 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
||||
class XYZ: ...
|
||||
"""
|
||||
@abc.abstractmethod
|
||||
def update_vrrp_conf(self, loadbalancer):
|
||||
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
|
||||
timeout_dict=None):
|
||||
"""Update amphorae of the loadbalancer with a new VRRP configuration
|
||||
|
||||
:param loadbalancer: loadbalancer object
|
||||
:param amphorae_network_config: amphorae network configurations
|
||||
:param amphora: The amphora object to update.
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -266,10 +288,14 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def start_vrrp_service(self, loadbalancer):
|
||||
"""Start the VRRP services of all amphorae of the loadbalancer
|
||||
def start_vrrp_service(self, amphora, timeout_dict=None):
|
||||
"""Start the VRRP services on the amphora
|
||||
|
||||
:param loadbalancer: loadbalancer object
|
||||
:param amphora: The amphora object to start the service on.
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -278,10 +304,3 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
|
||||
|
||||
:param loadbalancer: loadbalancer object
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_vrrp_interface(self, amphora):
|
||||
"""Get the VRRP interface object for a specific amphora
|
||||
|
||||
:param amphora: amphora object
|
||||
"""
|
||||
|
@ -20,7 +20,7 @@ from oslo_log import log as logging
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def check_exception(response, ignore=tuple()):
|
||||
def check_exception(response, ignore=tuple(), log_error=True):
|
||||
status_code = response.status_code
|
||||
responses = {
|
||||
400: InvalidRequest,
|
||||
@ -34,8 +34,9 @@ def check_exception(response, ignore=tuple()):
|
||||
}
|
||||
if (status_code not in ignore) and (status_code in responses):
|
||||
try:
|
||||
LOG.error('Amphora agent returned unexpected result code %s with '
|
||||
'response %s', status_code, response.json())
|
||||
if log_error:
|
||||
LOG.error('Amphora agent returned unexpected result code %s '
|
||||
'with response %s', status_code, response.json())
|
||||
except Exception:
|
||||
# Handle the odd case where there is no response body
|
||||
# like when using requests_mock which doesn't support has_body
|
||||
|
@ -90,7 +90,7 @@ class HaproxyAmphoraLoadBalancerDriver(
|
||||
|
||||
return haproxy_version_string.split('.')[:2]
|
||||
|
||||
def _populate_amphora_api_version(self, amphora,
|
||||
def _populate_amphora_api_version(self, amphora, timeout_dict=None,
|
||||
raise_retry_exception=False):
|
||||
"""Populate the amphora object with the api_version
|
||||
|
||||
@ -102,7 +102,7 @@ class HaproxyAmphoraLoadBalancerDriver(
|
||||
if not getattr(amphora, 'api_version', None):
|
||||
try:
|
||||
amphora.api_version = self.clients['base'].get_api_version(
|
||||
amphora,
|
||||
amphora, timeout_dict=timeout_dict,
|
||||
raise_retry_exception=raise_retry_exception)['api_version']
|
||||
except exc.NotFound:
|
||||
# Amphora is too old for version discovery, default to 0.5
|
||||
@ -291,8 +291,11 @@ class HaproxyAmphoraLoadBalancerDriver(
|
||||
getattr(self.clients[amp.api_version], func_name)(
|
||||
amp, loadbalancer.id, *args)
|
||||
|
||||
def start(self, loadbalancer, amphora=None):
|
||||
self._apply('start_listener', loadbalancer, amphora)
|
||||
def reload(self, loadbalancer, amphora=None, timeout_dict=None):
|
||||
self._apply('reload_listener', loadbalancer, amphora, timeout_dict)
|
||||
|
||||
def start(self, loadbalancer, amphora=None, timeout_dict=None):
|
||||
self._apply('start_listener', loadbalancer, amphora, timeout_dict)
|
||||
|
||||
def delete(self, listener):
|
||||
# Delete any UDP listeners the old way (we didn't update the way they
|
||||
@ -588,6 +591,28 @@ class HaproxyAmphoraLoadBalancerDriver(
|
||||
'API.'.format(amphora.id))
|
||||
raise driver_except.AmpDriverNotImplementedError()
|
||||
|
||||
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
|
||||
"""Get the interface name for an IP address.
|
||||
|
||||
:param amphora: The amphora to query.
|
||||
:type amphora: octavia.db.models.Amphora
|
||||
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
|
||||
:type ip_address: string
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
:type timeout_dict: dict
|
||||
:returns: None if not found, the interface name string if found.
|
||||
"""
|
||||
try:
|
||||
self._populate_amphora_api_version(amphora, timeout_dict)
|
||||
response_json = self.clients[amphora.api_version].get_interface(
|
||||
amphora, ip_address, timeout_dict, log_error=False)
|
||||
return response_json.get('interface', None)
|
||||
except (exc.NotFound, driver_except.TimeOutException):
|
||||
return None
|
||||
|
||||
|
||||
# Check a custom hostname
|
||||
class CustomHostNameCheckingAdapter(requests.adapters.HTTPAdapter):
|
||||
@ -713,9 +738,10 @@ class AmphoraAPIClientBase(object):
|
||||
'exception': exception})
|
||||
raise driver_except.TimeOutException()
|
||||
|
||||
def get_api_version(self, amp, raise_retry_exception=False):
|
||||
def get_api_version(self, amp, timeout_dict=None,
|
||||
raise_retry_exception=False):
|
||||
amp.api_version = None
|
||||
r = self.get(amp, retry_404=False,
|
||||
r = self.get(amp, retry_404=False, timeout_dict=timeout_dict,
|
||||
raise_retry_exception=raise_retry_exception)
|
||||
# Handle 404 special as we don't want to log an ERROR on 404
|
||||
exc.check_exception(r, (404,))
|
||||
@ -816,16 +842,15 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
|
||||
r = self.put(amp, 'vrrp/upload', data=config)
|
||||
return exc.check_exception(r)
|
||||
|
||||
def _vrrp_action(self, action, amp):
|
||||
r = self.put(amp, 'vrrp/{action}'.format(action=action))
|
||||
def _vrrp_action(self, action, amp, timeout_dict=None):
|
||||
r = self.put(amp, 'vrrp/{action}'.format(action=action),
|
||||
timeout_dict=timeout_dict)
|
||||
return exc.check_exception(r)
|
||||
|
||||
def get_interface(self, amp, ip_addr, timeout_dict=None):
|
||||
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
|
||||
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
|
||||
timeout_dict=timeout_dict)
|
||||
if exc.check_exception(r):
|
||||
return r.json()
|
||||
return None
|
||||
return exc.check_exception(r, log_error=log_error).json()
|
||||
|
||||
def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
|
||||
r = self.put(
|
||||
@ -946,16 +971,15 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
|
||||
r = self.put(amp, 'vrrp/upload', data=config)
|
||||
return exc.check_exception(r)
|
||||
|
||||
def _vrrp_action(self, action, amp):
|
||||
r = self.put(amp, 'vrrp/{action}'.format(action=action))
|
||||
def _vrrp_action(self, action, amp, timeout_dict=None):
|
||||
r = self.put(amp, 'vrrp/{action}'.format(action=action),
|
||||
timeout_dict=timeout_dict)
|
||||
return exc.check_exception(r)
|
||||
|
||||
def get_interface(self, amp, ip_addr, timeout_dict=None):
|
||||
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
|
||||
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
|
||||
timeout_dict=timeout_dict)
|
||||
if exc.check_exception(r):
|
||||
return r.json()
|
||||
return None
|
||||
return exc.check_exception(r, log_error=log_error).json()
|
||||
|
||||
def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
|
||||
r = self.put(
|
||||
|
@ -29,34 +29,40 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
|
||||
# The Mixed class must define a self.client object for the
|
||||
# AmphoraApiClient
|
||||
|
||||
def update_vrrp_conf(self, loadbalancer, amphorae_network_config):
|
||||
"""Update amphorae of the loadbalancer with a new VRRP configuration
|
||||
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
|
||||
timeout_dict=None):
|
||||
"""Update amphora of the loadbalancer with a new VRRP configuration
|
||||
|
||||
:param loadbalancer: loadbalancer object
|
||||
:param amphorae_network_config: amphorae network configurations
|
||||
:param amphora: The amphora object to update.
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
"""
|
||||
if amphora.status != constants.AMPHORA_ALLOCATED:
|
||||
LOG.debug('update_vrrp_conf called for un-allocated amphora %s. '
|
||||
'Ignoring.', amphora.id)
|
||||
return
|
||||
|
||||
templater = jinja_cfg.KeepalivedJinjaTemplater()
|
||||
|
||||
LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
|
||||
loadbalancer.id)
|
||||
LOG.debug("Update amphora %s VRRP configuration.", amphora.id)
|
||||
|
||||
for amp in filter(
|
||||
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
||||
loadbalancer.amphorae):
|
||||
self._populate_amphora_api_version(amphora)
|
||||
# Get the VIP subnet prefix for the amphora
|
||||
# For amphorav2 amphorae_network_config will be list of dicts
|
||||
try:
|
||||
vip_cidr = amphorae_network_config[amphora.id].vip_subnet.cidr
|
||||
except AttributeError:
|
||||
vip_cidr = amphorae_network_config[amphora.id][
|
||||
constants.VIP_SUBNET][constants.CIDR]
|
||||
|
||||
self._populate_amphora_api_version(amp)
|
||||
# Get the VIP subnet prefix for the amphora
|
||||
# For amphorav2 amphorae_network_config will be list of dicts
|
||||
try:
|
||||
vip_cidr = amphorae_network_config[amp.id].vip_subnet.cidr
|
||||
except AttributeError:
|
||||
vip_cidr = amphorae_network_config[amp.id][
|
||||
constants.VIP_SUBNET][constants.CIDR]
|
||||
|
||||
# Generate Keepalived configuration from loadbalancer object
|
||||
config = templater.build_keepalived_config(
|
||||
loadbalancer, amp, vip_cidr)
|
||||
self.clients[amp.api_version].upload_vrrp_config(amp, config)
|
||||
# Generate Keepalived configuration from loadbalancer object
|
||||
config = templater.build_keepalived_config(
|
||||
loadbalancer, amphora, vip_cidr)
|
||||
self.clients[amphora.api_version].upload_vrrp_config(amphora, config)
|
||||
|
||||
def stop_vrrp_service(self, loadbalancer):
|
||||
"""Stop the vrrp services running on the loadbalancer's amphorae
|
||||
@ -73,21 +79,25 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
|
||||
self._populate_amphora_api_version(amp)
|
||||
self.clients[amp.api_version].stop_vrrp(amp)
|
||||
|
||||
def start_vrrp_service(self, loadbalancer):
|
||||
"""Start the VRRP services of all amphorae of the loadbalancer
|
||||
def start_vrrp_service(self, amphora, timeout_dict=None):
|
||||
"""Start the VRRP services on an amphorae.
|
||||
|
||||
:param loadbalancer: loadbalancer object
|
||||
:param amphora: amphora object
|
||||
:param timeout_dict: Dictionary of timeout values for calls to the
|
||||
amphora. May contain: req_conn_timeout,
|
||||
req_read_timeout, conn_max_retries,
|
||||
conn_retry_interval
|
||||
"""
|
||||
LOG.info("Start loadbalancer %s amphora VRRP Service.",
|
||||
loadbalancer.id)
|
||||
if amphora.status != constants.AMPHORA_ALLOCATED:
|
||||
LOG.debug('start_vrrp_service called for un-allocated amphora %s. '
|
||||
'Ignoring.', amphora.id)
|
||||
return
|
||||
|
||||
for amp in filter(
|
||||
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
|
||||
loadbalancer.amphorae):
|
||||
LOG.info("Start amphora %s VRRP Service.", amphora.id)
|
||||
|
||||
LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
|
||||
self._populate_amphora_api_version(amp)
|
||||
self.clients[amp.api_version].start_vrrp(amp)
|
||||
self._populate_amphora_api_version(amphora)
|
||||
self.clients[amphora.api_version].start_vrrp(amphora,
|
||||
timeout_dict=timeout_dict)
|
||||
|
||||
def reload_vrrp_service(self, loadbalancer):
|
||||
"""Reload the VRRP services of all amphorae of the loadbalancer
|
||||
@ -103,8 +113,3 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
|
||||
|
||||
self._populate_amphora_api_version(amp)
|
||||
self.clients[amp.api_version].reload_vrrp(amp)
|
||||
|
||||
def get_vrrp_interface(self, amphora, timeout_dict=None):
|
||||
self._populate_amphora_api_version(amphora)
|
||||
return self.clients[amphora.api_version].get_interface(
|
||||
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']
|
||||
|
@ -114,6 +114,13 @@ class NoopManager(object):
|
||||
self.amphoraconfig[amphora.id, agent_config] = (
|
||||
amphora.id, agent_config, 'update_amphora_agent_config')
|
||||
|
||||
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
|
||||
LOG.debug("Amphora %s no-op, get interface from amphora %s for IP %s",
|
||||
self.__class__.__name__, amphora.id, ip_address)
|
||||
if ip_address == '198.51.100.99':
|
||||
return "noop0"
|
||||
return None
|
||||
|
||||
|
||||
class NoopAmphoraLoadBalancerDriver(
|
||||
driver_base.AmphoraLoadBalancerDriver,
|
||||
@ -170,17 +177,19 @@ class NoopAmphoraLoadBalancerDriver(
|
||||
def update_amphora_agent_config(self, amphora, agent_config):
|
||||
self.driver.update_amphora_agent_config(amphora, agent_config)
|
||||
|
||||
def update_vrrp_conf(self, loadbalancer):
|
||||
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
|
||||
return self.driver.get_interface_from_ip(amphora, ip_address,
|
||||
timeout_dict)
|
||||
|
||||
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
|
||||
timeout_dict=None):
|
||||
pass
|
||||
|
||||
def stop_vrrp_service(self, loadbalancer):
|
||||
pass
|
||||
|
||||
def start_vrrp_service(self, loadbalancer):
|
||||
def start_vrrp_service(self, amphora, timeout_dict=None):
|
||||
pass
|
||||
|
||||
def reload_vrrp_service(self, loadbalancer):
|
||||
pass
|
||||
|
||||
def get_vrrp_interface(self, amphora):
|
||||
pass
|
||||
|
@ -72,8 +72,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
try:
|
||||
vip = network_driver.allocate_vip(lb_obj)
|
||||
except network_base.AllocateVIPException as e:
|
||||
raise exceptions.DriverError(user_fault_string=e.orig_msg,
|
||||
operator_fault_string=e.orig_msg)
|
||||
message = str(e)
|
||||
if getattr(e, 'orig_msg', None) is not None:
|
||||
message = e.orig_msg
|
||||
raise exceptions.DriverError(user_fault_string=message,
|
||||
operator_fault_string=message)
|
||||
|
||||
LOG.info('Amphora provider created VIP port %s for load balancer %s.',
|
||||
vip.port_id, loadbalancer_id)
|
||||
|
@ -22,6 +22,7 @@ from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.common import data_models
|
||||
from octavia.common import exceptions
|
||||
from octavia.common.tls_utils import cert_parser
|
||||
@ -544,6 +545,9 @@ def vip_dict_to_provider_dict(vip_dict):
|
||||
new_vip_dict['vip_subnet_id'] = vip_dict['subnet_id']
|
||||
if 'qos_policy_id' in vip_dict:
|
||||
new_vip_dict['vip_qos_policy_id'] = vip_dict['qos_policy_id']
|
||||
if constants.OCTAVIA_OWNED in vip_dict:
|
||||
new_vip_dict[constants.OCTAVIA_OWNED] = vip_dict[
|
||||
constants.OCTAVIA_OWNED]
|
||||
return new_vip_dict
|
||||
|
||||
|
||||
@ -559,4 +563,6 @@ def provider_vip_dict_to_vip_obj(vip_dictionary):
|
||||
vip_obj.subnet_id = vip_dictionary['vip_subnet_id']
|
||||
if 'vip_qos_policy_id' in vip_dictionary:
|
||||
vip_obj.qos_policy_id = vip_dictionary['vip_qos_policy_id']
|
||||
if constants.OCTAVIA_OWNED in vip_dictionary:
|
||||
vip_obj.octavia_owned = vip_dictionary[constants.OCTAVIA_OWNED]
|
||||
return vip_obj
|
||||
|
@ -12,6 +12,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import ipaddress
|
||||
|
||||
from octavia_lib.api.drivers import data_models as driver_dm
|
||||
from oslo_config import cfg
|
||||
@ -187,26 +188,40 @@ class LoadBalancersController(base.BaseController):
|
||||
isinstance(load_balancer.vip_qos_policy_id, wtypes.UnsetType)):
|
||||
load_balancer.vip_qos_policy_id = port_qos_policy_id
|
||||
|
||||
# Identify the subnet for this port
|
||||
if load_balancer.vip_subnet_id:
|
||||
# If we were provided a subnet_id, validate it exists and that
|
||||
# there is a fixed_ip on the port that matches the provided subnet
|
||||
validate.subnet_exists(subnet_id=load_balancer.vip_subnet_id,
|
||||
context=context)
|
||||
else:
|
||||
if load_balancer.vip_address:
|
||||
for port_fixed_ip in port.fixed_ips:
|
||||
if port_fixed_ip.ip_address == load_balancer.vip_address:
|
||||
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
|
||||
break
|
||||
if not load_balancer.vip_subnet_id:
|
||||
raise exceptions.ValidationException(detail=_(
|
||||
"Specified VIP address not found on the "
|
||||
"specified VIP port."))
|
||||
elif len(port.fixed_ips) == 1:
|
||||
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
|
||||
else:
|
||||
for port_fixed_ip in port.fixed_ips:
|
||||
if port_fixed_ip.subnet_id == load_balancer.vip_subnet_id:
|
||||
load_balancer.vip_address = port_fixed_ip.ip_address
|
||||
break # Just pick the first address found in the subnet
|
||||
if not load_balancer.vip_address:
|
||||
raise exceptions.ValidationException(detail=_(
|
||||
"VIP port's subnet could not be determined. Please "
|
||||
"specify either a VIP subnet or address."))
|
||||
"No VIP address found on the specified VIP port within "
|
||||
"the specified subnet."))
|
||||
elif load_balancer.vip_address:
|
||||
normalized_lb_ip = ipaddress.ip_address(
|
||||
load_balancer.vip_address).compressed
|
||||
for port_fixed_ip in port.fixed_ips:
|
||||
normalized_port_ip = ipaddress.ip_address(
|
||||
port_fixed_ip.ip_address).compressed
|
||||
if normalized_port_ip == normalized_lb_ip:
|
||||
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
|
||||
break
|
||||
if not load_balancer.vip_subnet_id:
|
||||
raise exceptions.ValidationException(detail=_(
|
||||
"Specified VIP address not found on the "
|
||||
"specified VIP port."))
|
||||
elif len(port.fixed_ips) == 1:
|
||||
# User provided only a port, get the subnet and address from it
|
||||
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
|
||||
load_balancer.vip_address = port.fixed_ips[0].ip_address
|
||||
else:
|
||||
raise exceptions.ValidationException(detail=_(
|
||||
"VIP port's subnet could not be determined. Please "
|
||||
"specify either a VIP subnet or address."))
|
||||
|
||||
def _validate_vip_request_object(self, load_balancer, context=None):
|
||||
allowed_network_objects = []
|
||||
@ -450,7 +465,10 @@ class LoadBalancersController(base.BaseController):
|
||||
# Do the same with the availability_zone dict
|
||||
lb_dict['availability_zone'] = az_dict
|
||||
|
||||
# See if the provider driver wants to create the VIP port
|
||||
# See if the provider driver wants to manage the VIP port
|
||||
# This will still be called if the user provided a port to
|
||||
# allow drivers to collect any required information about the
|
||||
# VIP port.
|
||||
octavia_owned = False
|
||||
try:
|
||||
provider_vip_dict = driver_utils.vip_dict_to_provider_dict(
|
||||
@ -470,6 +488,10 @@ class LoadBalancersController(base.BaseController):
|
||||
if 'port_id' not in vip_dict or not vip_dict['port_id']:
|
||||
octavia_owned = True
|
||||
|
||||
# Check if the driver claims octavia owns the VIP port.
|
||||
if vip.octavia_owned:
|
||||
octavia_owned = True
|
||||
|
||||
self.repositories.vip.update(
|
||||
lock_session, db_lb.id, ip_address=vip.ip_address,
|
||||
port_id=vip.port_id, network_id=vip.network_id,
|
||||
|
@ -198,6 +198,20 @@ amphora_agent_opts = [
|
||||
help='The UDP API backend for amphora agent.'),
|
||||
]
|
||||
|
||||
compute_opts = [
|
||||
cfg.IntOpt('max_retries', default=15,
|
||||
help=_('The maximum attempts to retry an action with the '
|
||||
'compute service.')),
|
||||
cfg.IntOpt('retry_interval', default=1,
|
||||
help=_('Seconds to wait before retrying an action with the '
|
||||
'compute service.')),
|
||||
cfg.IntOpt('retry_backoff', default=1,
|
||||
help=_('The seconds to backoff retry attempts.')),
|
||||
cfg.IntOpt('retry_max', default=10,
|
||||
help=_('The maximum interval in seconds between retry '
|
||||
'attempts.')),
|
||||
]
|
||||
|
||||
networking_opts = [
|
||||
cfg.IntOpt('max_retries', default=15,
|
||||
help=_('The maximum attempts to retry an action with the '
|
||||
@ -205,6 +219,11 @@ networking_opts = [
|
||||
cfg.IntOpt('retry_interval', default=1,
|
||||
help=_('Seconds to wait before retrying an action with the '
|
||||
'networking service.')),
|
||||
cfg.IntOpt('retry_backoff', default=1,
|
||||
help=_('The seconds to backoff retry attempts.')),
|
||||
cfg.IntOpt('retry_max', default=10,
|
||||
help=_('The maximum interval in seconds between retry '
|
||||
'attempts.')),
|
||||
cfg.IntOpt('port_detach_timeout', default=300,
|
||||
help=_('Seconds to wait for a port to detach from an '
|
||||
'amphora.')),
|
||||
@ -317,6 +336,14 @@ haproxy_amphora_opts = [
|
||||
default=2,
|
||||
help=_('Retry timeout between connection attempts in '
|
||||
'seconds for active amphora.')),
|
||||
cfg.IntOpt('failover_connection_max_retries',
|
||||
default=2,
|
||||
help=_('Retry threshold for connecting to an amphora in '
|
||||
'failover.')),
|
||||
cfg.IntOpt('failover_connection_retry_interval',
|
||||
default=5,
|
||||
help=_('Retry timeout between connection attempts in '
|
||||
'seconds for amphora in failover.')),
|
||||
cfg.IntOpt('build_rate_limit',
|
||||
default=-1,
|
||||
help=_('Number of amphorae that could be built per controller '
|
||||
@ -380,6 +407,16 @@ haproxy_amphora_opts = [
|
||||
deprecated_reason='This is now automatically discovered '
|
||||
' and configured.',
|
||||
help=_("If False, use sysvinit.")),
|
||||
cfg.IntOpt('api_db_commit_retry_attempts', default=15,
|
||||
help=_('The number of times the database action will be '
|
||||
'attempted.')),
|
||||
cfg.IntOpt('api_db_commit_retry_initial_delay', default=1,
|
||||
help=_('The initial delay before a retry attempt.')),
|
||||
cfg.IntOpt('api_db_commit_retry_backoff', default=1,
|
||||
help=_('The time to backoff retry attempts.')),
|
||||
cfg.IntOpt('api_db_commit_retry_max', default=5,
|
||||
help=_('The maximum amount of time to wait between retry '
|
||||
'attempts.')),
|
||||
]
|
||||
|
||||
controller_worker_opts = [
|
||||
@ -462,7 +499,11 @@ controller_worker_opts = [
|
||||
help=_('If True, build cloud-init user-data that is passed '
|
||||
'to the config drive on Amphora boot instead of '
|
||||
'personality files. If False, utilize personality '
|
||||
'files.'))
|
||||
'files.')),
|
||||
cfg.IntOpt('amphora_delete_retries', default=5,
|
||||
help=_('Number of times an amphora delete should be retried.')),
|
||||
cfg.IntOpt('amphora_delete_retry_interval', default=5,
|
||||
help=_('Time, in seconds, between amphora delete retries.')),
|
||||
]
|
||||
|
||||
task_flow_opts = [
|
||||
@ -790,6 +831,7 @@ driver_agent_opts = [
|
||||
cfg.CONF.register_opts(core_opts)
|
||||
cfg.CONF.register_opts(api_opts, group='api_settings')
|
||||
cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent')
|
||||
cfg.CONF.register_opts(compute_opts, group='compute')
|
||||
cfg.CONF.register_opts(networking_opts, group='networking')
|
||||
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
|
||||
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')
|
||||
|
@ -296,7 +296,9 @@ ACTIVE_CONNECTIONS = 'active_connections'
|
||||