Merge "Refactor the failover flows"

This commit is contained in:
Zuul 2020-06-19 21:58:32 +00:00 committed by Gerrit Code Review
commit 77786595e5
73 changed files with 6374 additions and 1353 deletions

View File

@ -165,6 +165,19 @@
# Endpoint type to use for communication with the Barbican service.
# endpoint_type = publicURL
[compute]
# The maximum attempts to retry an action with the compute service.
# max_retries = 15
# Seconds to wait before retrying an action with the compute service.
# retry_interval = 1
# The seconds to backoff retry attempts
# retry_backoff = 1
# The maximum interval in seconds between retry attempts
# retry_max = 10
[networking]
# The maximum attempts to retry an action with the networking service.
# max_retries = 15
@ -172,6 +185,12 @@
# Seconds to wait before retrying an action with the networking service.
# retry_interval = 1
# The seconds to backoff retry attempts
# retry_backoff = 1
# The maximum interval in seconds between retry attempts
# retry_max = 10
# The maximum time to wait, in seconds, for a port to detach from an amphora
# port_detach_timeout = 300
@ -236,11 +255,26 @@
# active_connection_max_retries = 15
# active_connection_rety_interval = 2
# These "failover" timeouts are used during the failover process to probe
# amphorae that are part of the load balancer being failed over.
# These values are very low to facilitate "fail fast" should an amphora
# not respond in a failure situation.
# failover_connection_max_retries = 2
# failover_connection_retry_interval = 5
# The user flow log format for HAProxy.
# {{ project_id }} and {{ lb_id }} will be automatically substituted by the
# controller when configuring HAProxy if they are present in the string.
# user_log_format = '{{ project_id }} {{ lb_id }} %f %ci %cp %t %{+Q}r %ST %B %U %[ssl_c_verify] %{+Q}[ssl_c_s_dn] %b %s %Tt %tsc'
# API messaging / database commit retries
# This is many times the controller worker retries waiting for the API to
# complete a database commit for a message received over the queue.
# api_db_commit_retry_attempts = 15
# api_db_commit_retry_initial_delay = 1
# api_db_commit_retry_backoff = 1
# api_db_commit_retry_max = 5
[controller_worker]
# workers = 1
# amp_active_retries = 30
@ -297,6 +331,9 @@
# loadbalancer_topology = SINGLE
# user_data_config_drive = False
# amphora_delete_retries = 5
# amphora_delete_retry_interval = 5
[task_flow]
# TaskFlow engine options are:
# - serial: Runs all tasks on a single thread.

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import os
import re
import socket
@ -21,10 +20,11 @@ import subprocess
import pyroute2
import webob
import netifaces
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions
class AmphoraInfo(object):
@ -175,65 +175,15 @@ class AmphoraInfo(object):
return networks
def get_interface(self, ip_addr):
try:
ip_version = ipaddress.ip_address(ip_addr).version
except Exception:
interface = network_utils.get_interface_name(
ip_addr, net_ns=consts.AMPHORA_NAMESPACE)
except exceptions.InvalidIPAddress:
return webob.Response(json=dict(message="Invalid IP address"),
status=400)
except exceptions.NotFound:
return webob.Response(
json=dict(message="Invalid IP address"), status=400)
if ip_version == 4:
address_format = netifaces.AF_INET
elif ip_version == 6:
address_format = netifaces.AF_INET6
else:
return webob.Response(
json=dict(message="Bad IP address version"), status=400)
# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
normalized_addr = socket.inet_ntop(address_format,
socket.inet_pton(address_format,
ip_addr))
with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns:
for addr in netns.get_addr():
# Save the interface index as IPv6 records don't list a
# textual interface
interface_idx = addr['index']
# Save the address family (IPv4/IPv6) for use normalizing
# the IP address for comparison
interface_af = addr['family']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we
# we are looking for. Since we have matched the name
# above, attr[1] is the address value
if normalized_addr == socket.inet_ntop(
interface_af,
socket.inet_pton(interface_af, attr[1])):
# Lookup the matching interface name by
# getting the interface with the index we found
# in the above address search
lookup_int = netns.get_links(interface_idx)
# Search through the attributes of the matching
# interface record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair
# that includes the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the response with the matching
# interface name that is in int_attr[1]
# for the matching interface attribute
# name
return webob.Response(
json=dict(message='OK',
interface=int_attr[1]),
status=200)
return webob.Response(
json=dict(message="Error interface not found for IP address"),
status=404)
json=dict(message="Error interface not found for IP address"),
status=404)
return webob.Response(json=dict(message='OK', interface=interface),
status=200)

View File

@ -47,6 +47,7 @@ class Keepalived(object):
if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())
conf_file = util.keepalived_cfg_path()
@ -112,6 +113,9 @@ class Keepalived(object):
)
text_file.write(text)
# Configure the monitoring of haproxy
util.vrrp_check_script_update(None, consts.AMP_ACTION_START)
# Make sure the new service is enabled on boot
if init_system != consts.INIT_UPSTART:
try:

View File

@ -78,7 +78,8 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
# Active-Standby topology will create the directory below. So for
# Single topology, it should not create the directory and the check
# scripts for status change.
if not os.path.exists(util.keepalived_check_scripts_dir()):
if (CONF.controller_worker.loadbalancer_topology !=
consts.TOPOLOGY_ACTIVE_STANDBY):
NEED_CHECK = False
conf_file = util.keepalived_lvs_cfg_path(listener_id)
@ -157,6 +158,9 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
script_path = os.path.join(util.keepalived_check_scripts_dir(),
KEEPALIVED_CHECK_SCRIPT_NAME)
if not os.path.exists(script_path):
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())
with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),
'w') as script_file:
text = check_script_file_template.render(

View File

@ -235,12 +235,11 @@ class Loadbalancer(object):
details="Unknown action: {0}".format(action)), status=400)
self._check_lb_exists(lb_id)
is_vrrp = (CONF.controller_worker.loadbalancer_topology ==
consts.TOPOLOGY_ACTIVE_STANDBY)
# Since this script should be created at LB create time
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(lb_id, action)
if is_vrrp:
util.vrrp_check_script_update(lb_id, action)
# HAProxy does not start the process when given a reload
# so start it if haproxy is not already running
@ -262,6 +261,14 @@ class Loadbalancer(object):
return webob.Response(json=dict(
message="Error {0}ing haproxy".format(action),
details=e.output), status=500)
# If we are not in active/standby we need to send an IP
# advertisement (GARP or NA). Keepalived handles this for
# active/standby load balancers.
if not is_vrrp and action in [consts.AMP_ACTION_START,
consts.AMP_ACTION_RELOAD]:
util.send_vip_advertisements(lb_id)
if action in [consts.AMP_ACTION_STOP,
consts.AMP_ACTION_RELOAD]:
return webob.Response(json=dict(
@ -307,7 +314,7 @@ class Loadbalancer(object):
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(
util.vrrp_check_script_update(
lb_id, action=consts.AMP_ACTION_STOP)
# delete the ssl files
@ -455,22 +462,6 @@ class Loadbalancer(object):
def _cert_file_path(self, lb_id, filename):
return os.path.join(self._cert_dir(lb_id), filename)
def vrrp_check_script_update(self, lb_id, action):
lb_ids = util.get_loadbalancers()
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(util.haproxy_sock_path(lbid))
if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
os.makedirs(util.keepalived_check_scripts_dir())
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(util.haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)
def _check_haproxy_status(self, lb_id):
if os.path.exists(util.pid_path(lb_id)):
if os.path.exists(

View File

@ -23,6 +23,8 @@ from oslo_config import cfg
from oslo_log import log as logging
from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.utils import ip_advertisement
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
CONF = cfg.CONF
@ -188,7 +190,7 @@ def get_listeners():
def get_loadbalancers():
"""Get Load balancers
:returns: An array with the ids of all load balancers,
:returns: An array with the uuids of all load balancers,
e.g. ['123', '456', ...] or [] if no loadbalancers exist
"""
if os.path.exists(CONF.haproxy_amphora.base_path):
@ -332,3 +334,71 @@ def parse_haproxy_file(lb_id):
stats_socket = m.group(1)
return stats_socket, listeners
def vrrp_check_script_update(lb_id, action):
os.makedirs(keepalived_dir(), exist_ok=True)
os.makedirs(keepalived_check_scripts_dir(), exist_ok=True)
lb_ids = get_loadbalancers()
udp_ids = get_udp_listeners()
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not udp_ids:
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write('exit 1')
return
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(haproxy_sock_path(lbid))
cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)
def get_haproxy_vip_addresses(lb_id):
"""Get the VIP addresses for a load balancer.
:param lb_id: The load balancer ID to get VIP addresses from.
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
with open(config_path(lb_id), 'r') as file:
for line in file:
current_line = line.strip()
if current_line.startswith('bind'):
for section in current_line.split(' '):
# We will always have a port assigned per the template.
if ':' in section:
if ',' in section:
addr_port = section.rstrip(',')
vips.append(addr_port.rpartition(':')[0])
else:
vips.append(section.rpartition(':')[0])
break
return vips
def send_vip_advertisements(lb_id):
"""Sends address advertisements for each load balancer VIP.
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the VIP addresses on a load balancer.
:param lb_id: The load balancer ID to send advertisements for.
:returns: None
"""
try:
vips = get_haproxy_vip_addresses(lb_id)
for vip in vips:
interface = network_utils.get_interface_name(
vip, net_ns=consts.AMPHORA_NAMESPACE)
ip_advertisement.send_ip_advertisement(
interface, vip, net_ns=consts.AMPHORA_NAMESPACE)
except Exception as e:
LOG.debug('Send VIP advertisement failed due to :%s. '
'This amphora may not be the MASTER. Ignoring.', str(e))

View File

@ -0,0 +1,183 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fcntl
import socket
from struct import pack
from struct import unpack
from oslo_log import log as logging
from octavia.amphorae.backends.utils import network_namespace
from octavia.common import constants
from octavia.common import utils as common_utils
LOG = logging.getLogger(__name__)
def garp(interface, ip_address, net_ns=None):
"""Sends a gratuitous ARP for ip_address on the interface.
:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ARP_ETHERTYPE = 0x0806
BROADCAST_MAC = b'\xff\xff\xff\xff\xff\xff'
# Get a socket, optionally inside a network namespace
garp_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
else:
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
# Bind the socket with the ARP ethertype protocol
garp_socket.bind((interface, ARP_ETHERTYPE))
# Get the MAC address of the interface
source_mac = garp_socket.getsockname()[4]
garp_msg = [
pack('!h', 1), # Hardware type ethernet
pack('!h', 0x0800), # Protocol type IPv4
pack('!B', 6), # Hardware size
pack('!B', 4), # Protocol size
pack('!h', 1), # Opcode request
source_mac, # Sender MAC address
socket.inet_aton(ip_address), # Sender IP address
BROADCAST_MAC, # Target MAC address
socket.inet_aton(ip_address)] # Target IP address
garp_ethernet = [
BROADCAST_MAC, # Ethernet destination
source_mac, # Ethernet source
pack('!h', ARP_ETHERTYPE), # Ethernet type
b''.join(garp_msg)] # The GARP message
garp_socket.send(b''.join(garp_ethernet))
garp_socket.close()
def calculate_icmpv6_checksum(packet):
"""Calculate the ICMPv6 checksum for a packet.
:param packet: The packet bytes to checksum.
:returns: The checksum integer.
"""
total = 0
# Add up 16-bit words
num_words = len(packet) // 2
for chunk in unpack("!%sH" % num_words, packet[0:num_words * 2]):
total += chunk
# Add any left over byte
if len(packet) % 2:
total += packet[-1] << 8
# Fold 32-bits into 16-bits
total = (total >> 16) + (total & 0xffff)
total += total >> 16
return ~total + 0x10000 & 0xffff
def neighbor_advertisement(interface, ip_address, net_ns=None):
"""Sends a unsolicited neighbor advertisement for an ip on the interface.
:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ALL_NODES_ADDR = 'ff02::1'
SIOCGIFHWADDR = 0x8927
# Get a socket, optionally inside a network namespace
na_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
na_socket = socket.socket(
socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))
else:
na_socket = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))
# Per RFC 4861 section 4.4, the hop limit should be 255
na_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 255)
# Bind the socket with the source address
na_socket.bind((ip_address, 0))
# Get the byte representation of the MAC address of the interface
# Note: You can't use getsockname() to get the MAC on this type of socket
source_mac = fcntl.ioctl(na_socket.fileno(), SIOCGIFHWADDR, pack('256s',
bytes(interface, 'utf-8')))[18:24]
# Get the byte representation of the source IP address
source_ip_bytes = socket.inet_pton(socket.AF_INET6, ip_address)
icmpv6_na_msg_prefix = [
pack('!B', 136), # ICMP Type Neighbor Advertisement
pack('!B', 0)] # ICMP Code
icmpv6_na_msg_postfix = [
pack('!I', 0xa0000000), # Flags (Router, Override)
source_ip_bytes, # Target address
pack('!B', 2), # ICMPv6 option type target link-layer address
pack('!B', 1), # ICMPv6 option length
source_mac] # ICMPv6 option link-layer address
# Calculate the ICMPv6 checksum
icmpv6_pseudo_header = [
source_ip_bytes, # Source IP address
socket.inet_pton(socket.AF_INET6, ALL_NODES_ADDR), # Destination IP
pack('!I', 58), # IPv6 next header (ICMPv6)
pack('!h', 32)] # IPv6 payload length
icmpv6_tmp_chksum = pack('!H', 0) # Checksum are zeros for calculation
tmp_chksum_msg = b''.join(icmpv6_pseudo_header + icmpv6_na_msg_prefix +
[icmpv6_tmp_chksum] + icmpv6_pseudo_header)
checksum = pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))
# Build the ICMPv6 unsolicitated neighbor advertisement
icmpv6_msg = b''.join(icmpv6_na_msg_prefix + [checksum] +
icmpv6_na_msg_postfix)
na_socket.sendto(icmpv6_msg, (ALL_NODES_ADDR, 0, 0, 0))
na_socket.close()
def send_ip_advertisement(interface, ip_address, net_ns=None):
"""Send an address advertisement.
This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the ip address specified.
:param interface: The interface name to send the advertisement on.
:param ip_address: The IP address to advertise.
:param net_ns: The network namespace to send the advertisement from.
:returns: None
"""
try:
if common_utils.is_ipv4(ip_address):
garp(interface, ip_address, net_ns)
elif common_utils.is_ipv6(ip_address):
neighbor_advertisement(interface, ip_address, net_ns)
else:
LOG.error('Unknown IP version for address: "%s". Skipping',
ip_address)
except Exception as e:
LOG.warning('Unable to send address advertisement for address: "%s", '
'error: %s. Skipping', ip_address, str(e))

View File

@ -0,0 +1,50 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os
class NetworkNamespace(object):
"""A network namespace context manager.
Runs wrapped code inside the specified network namespace.
:param netns: The network namespace name to enter.
"""
# from linux/sched.h - We want to enter a network namespace
CLONE_NEWNET = 0x40000000
@staticmethod
def _error_handler(result, func, arguments):
if result == -1:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
def __init__(self, netns):
self.current_netns = '/proc/{pid}/ns/net'.format(pid=os.getpid())
self.target_netns = '/var/run/netns/{netns}'.format(netns=netns)
# reference: man setns(2)
self.set_netns = ctypes.CDLL('libc.so.6', use_errno=True).setns
self.set_netns.errcheck = self._error_handler
def __enter__(self):
# Save the current network namespace
self.current_netns_fd = open(self.current_netns)
with open(self.target_netns) as fd:
self.set_netns(fd.fileno(), self.CLONE_NEWNET)
def __exit__(self, *args):
# Return to the previous network namespace
self.set_netns(self.current_netns_fd.fileno(), self.CLONE_NEWNET)
self.current_netns_fd.close()

View File

@ -0,0 +1,83 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import pyroute2
from octavia.common import exceptions
def _find_interface(ip_address, rtnl_api, normalized_addr):
"""Find the interface using a routing netlink API.
:param ip_address: The IP address to search with.
:param rtnl_api: A pyroute2 rtnl_api instance. (IPRoute, NetNS, etc.)
:returns: The interface name if found, None if not found.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
"""
for addr in rtnl_api.get_addr(address=ip_address):
# Save the interface index as IPv6 records don't list a textual
# interface
interface_idx = addr['index']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we are
# looking for. Since we have matched the name above, attr[1]
# is the address value
if normalized_addr == ipaddress.ip_address(attr[1]).compressed:
# Lookup the matching interface name by getting the
# interface with the index we found in the above address
# search
lookup_int = rtnl_api.get_links(interface_idx)
# Search through the attributes of the matching interface
# record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair that includes
# the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the matching interface name that is in
# int_attr[1] for the matching interface attribute
# name
return int_attr[1]
# We didn't find an interface with that IP address.
return None
def get_interface_name(ip_address, net_ns=None):
"""Gets the interface name from an IP address.
:param ip_address: The IP address to lookup.
:param net_ns: The network namespace to find the interface in.
:returns: The interface name.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
:raises octavia.common.exceptions.NotFound: No interface was found.
"""
# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
try:
normalized_addr = ipaddress.ip_address(ip_address).compressed
except ValueError:
raise exceptions.InvalidIPAddress(ip_addr=ip_address)
if net_ns:
with pyroute2.NetNS(net_ns) as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
else:
with pyroute2.IPRoute() as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
if interface is not None:
return interface
raise exceptions.NotFound(resource='IP address', id=ip_address)

View File

@ -202,6 +202,21 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
:type agent_config: string
"""
@abc.abstractmethod
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name from an IP address.
:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
"""
class HealthMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
@ -252,10 +267,17 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
class XYZ: ...
"""
@abc.abstractmethod
def update_vrrp_conf(self, loadbalancer):
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphorae of the loadbalancer with a new VRRP configuration
:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
@abc.abstractmethod
@ -266,10 +288,14 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on the amphora
:param loadbalancer: loadbalancer object
:param amphora: The amphora object to start the service on.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
@abc.abstractmethod
@ -278,10 +304,3 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
:param loadbalancer: loadbalancer object
"""
@abc.abstractmethod
def get_vrrp_interface(self, amphora):
"""Get the VRRP interface object for a specific amphora
:param amphora: amphora object
"""

View File

@ -20,7 +20,7 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def check_exception(response, ignore=tuple()):
def check_exception(response, ignore=tuple(), log_error=True):
status_code = response.status_code
responses = {
400: InvalidRequest,
@ -34,8 +34,9 @@ def check_exception(response, ignore=tuple()):
}
if (status_code not in ignore) and (status_code in responses):
try:
LOG.error('Amphora agent returned unexpected result code %s with '
'response %s', status_code, response.json())
if log_error:
LOG.error('Amphora agent returned unexpected result code %s '
'with response %s', status_code, response.json())
except Exception:
# Handle the odd case where there is no response body
# like when using requests_mock which doesn't support has_body

View File

@ -90,7 +90,7 @@ class HaproxyAmphoraLoadBalancerDriver(
return haproxy_version_string.split('.')[:2]
def _populate_amphora_api_version(self, amphora,
def _populate_amphora_api_version(self, amphora, timeout_dict=None,
raise_retry_exception=False):
"""Populate the amphora object with the api_version
@ -102,7 +102,7 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora,
amphora, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
@ -291,8 +291,11 @@ class HaproxyAmphoraLoadBalancerDriver(
getattr(self.clients[amp.api_version], func_name)(
amp, loadbalancer.id, *args)
def start(self, loadbalancer, amphora=None):
self._apply('start_listener', loadbalancer, amphora)
def reload(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('reload_listener', loadbalancer, amphora, timeout_dict)
def start(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('start_listener', loadbalancer, amphora, timeout_dict)
def delete(self, listener):
# Delete any UDP listeners the old way (we didn't update the way they
@ -588,6 +591,28 @@ class HaproxyAmphoraLoadBalancerDriver(
'API.'.format(amphora.id))
raise driver_except.AmpDriverNotImplementedError()
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name for an IP address.
:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
:returns: None if not found, the interface name string if found.
"""
try:
self._populate_amphora_api_version(amphora, timeout_dict)
response_json = self.clients[amphora.api_version].get_interface(
amphora, ip_address, timeout_dict, log_error=False)
return response_json.get('interface', None)
except (exc.NotFound, driver_except.TimeOutException):
return None
# Check a custom hostname
class CustomHostNameCheckingAdapter(requests.adapters.HTTPAdapter):
@ -713,9 +738,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()
def get_api_version(self, amp, raise_retry_exception=False):
def get_api_version(self, amp, timeout_dict=None,
raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False,
r = self.get(amp, retry_404=False, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
@ -816,16 +842,15 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)
def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)
def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()
def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
@ -946,16 +971,15 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)
def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)
def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()
def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(

View File

@ -29,34 +29,40 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
# The Mixed class must define a self.client object for the
# AmphoraApiClient
def update_vrrp_conf(self, loadbalancer, amphorae_network_config):
"""Update amphorae of the loadbalancer with a new VRRP configuration
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphora of the loadbalancer with a new VRRP configuration
:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('update_vrrp_conf called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return
templater = jinja_cfg.KeepalivedJinjaTemplater()
LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
LOG.debug("Update amphora %s VRRP configuration.", amphora.id)
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self._populate_amphora_api_version(amphora)
# Get the VIP subnet prefix for the amphora
# For amphorav2 amphorae_network_config will be list of dicts
try:
vip_cidr = amphorae_network_config[amphora.id].vip_subnet.cidr
except AttributeError:
vip_cidr = amphorae_network_config[amphora.id][
constants.VIP_SUBNET][constants.CIDR]
self._populate_amphora_api_version(amp)
# Get the VIP subnet prefix for the amphora
# For amphorav2 amphorae_network_config will be list of dicts
try:
vip_cidr = amphorae_network_config[amp.id].vip_subnet.cidr
except AttributeError:
vip_cidr = amphorae_network_config[amp.id][
constants.VIP_SUBNET][constants.CIDR]
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amp, vip_cidr)
self.clients[amp.api_version].upload_vrrp_config(amp, config)
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amphora, vip_cidr)
self.clients[amphora.api_version].upload_vrrp_config(amphora, config)
def stop_vrrp_service(self, loadbalancer):
"""Stop the vrrp services running on the loadbalancer's amphorae
@ -73,21 +79,25 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].stop_vrrp(amp)
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on an amphorae.
:param loadbalancer: loadbalancer object
:param amphora: amphora object
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
LOG.info("Start loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('start_vrrp_service called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return
for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.info("Start amphora %s VRRP Service.", amphora.id)
LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].start_vrrp(amp)
self._populate_amphora_api_version(amphora)
self.clients[amphora.api_version].start_vrrp(amphora,
timeout_dict=timeout_dict)
def reload_vrrp_service(self, loadbalancer):
"""Reload the VRRP services of all amphorae of the loadbalancer
@ -103,8 +113,3 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].reload_vrrp(amp)
def get_vrrp_interface(self, amphora, timeout_dict=None):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_interface(
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']

View File

@ -114,6 +114,13 @@ class NoopManager(object):
self.amphoraconfig[amphora.id, agent_config] = (
amphora.id, agent_config, 'update_amphora_agent_config')
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
LOG.debug("Amphora %s no-op, get interface from amphora %s for IP %s",
self.__class__.__name__, amphora.id, ip_address)
if ip_address == '198.51.100.99':
return "noop0"
return None
class NoopAmphoraLoadBalancerDriver(
driver_base.AmphoraLoadBalancerDriver,
@ -170,17 +177,19 @@ class NoopAmphoraLoadBalancerDriver(
def update_amphora_agent_config(self, amphora, agent_config):
self.driver.update_amphora_agent_config(amphora, agent_config)
def update_vrrp_conf(self, loadbalancer):
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
return self.driver.get_interface_from_ip(amphora, ip_address,
timeout_dict)
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
pass
def stop_vrrp_service(self, loadbalancer):
pass
def start_vrrp_service(self, loadbalancer):
def start_vrrp_service(self, amphora, timeout_dict=None):
pass
def reload_vrrp_service(self, loadbalancer):
pass
def get_vrrp_interface(self, amphora):
pass

View File

@ -72,8 +72,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
try:
vip = network_driver.allocate_vip(lb_obj)
except network_base.AllocateVIPException as e:
raise exceptions.DriverError(user_fault_string=e.orig_msg,
operator_fault_string=e.orig_msg)
message = str(e)
if getattr(e, 'orig_msg', None) is not None:
message = e.orig_msg
raise exceptions.DriverError(user_fault_string=message,
operator_fault_string=message)
LOG.info('Amphora provider created VIP port %s for load balancer %s.',
vip.port_id, loadbalancer_id)

View File

@ -22,6 +22,7 @@ from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.common.tls_utils import cert_parser
@ -544,6 +545,9 @@ def vip_dict_to_provider_dict(vip_dict):
new_vip_dict['vip_subnet_id'] = vip_dict['subnet_id']
if 'qos_policy_id' in vip_dict:
new_vip_dict['vip_qos_policy_id'] = vip_dict['qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dict:
new_vip_dict[constants.OCTAVIA_OWNED] = vip_dict[
constants.OCTAVIA_OWNED]
return new_vip_dict
@ -559,4 +563,6 @@ def provider_vip_dict_to_vip_obj(vip_dictionary):
vip_obj.subnet_id = vip_dictionary['vip_subnet_id']
if 'vip_qos_policy_id' in vip_dictionary:
vip_obj.qos_policy_id = vip_dictionary['vip_qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dictionary:
vip_obj.octavia_owned = vip_dictionary[constants.OCTAVIA_OWNED]
return vip_obj

View File

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
from octavia_lib.api.drivers import data_models as driver_dm
from oslo_config import cfg
@ -187,26 +188,40 @@ class LoadBalancersController(base.BaseController):
isinstance(load_balancer.vip_qos_policy_id, wtypes.UnsetType)):
load_balancer.vip_qos_policy_id = port_qos_policy_id
# Identify the subnet for this port
if load_balancer.vip_subnet_id:
# If we were provided a subnet_id, validate it exists and that
# there is a fixed_ip on the port that matches the provided subnet
validate.subnet_exists(subnet_id=load_balancer.vip_subnet_id,
context=context)
else:
if load_balancer.vip_address:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.ip_address == load_balancer.vip_address:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
else:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.subnet_id == load_balancer.vip_subnet_id:
load_balancer.vip_address = port_fixed_ip.ip_address
break # Just pick the first address found in the subnet
if not load_balancer.vip_address:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))
"No VIP address found on the specified VIP port within "
"the specified subnet."))
elif load_balancer.vip_address:
normalized_lb_ip = ipaddress.ip_address(
load_balancer.vip_address).compressed
for port_fixed_ip in port.fixed_ips:
normalized_port_ip = ipaddress.ip_address(
port_fixed_ip.ip_address).compressed
if normalized_port_ip == normalized_lb_ip:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
# User provided only a port, get the subnet and address from it
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
load_balancer.vip_address = port.fixed_ips[0].ip_address
else:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))
def _validate_vip_request_object(self, load_balancer, context=None):
allowed_network_objects = []
@ -450,7 +465,10 @@ class LoadBalancersController(base.BaseController):
# Do the same with the availability_zone dict
lb_dict['availability_zone'] = az_dict
# See if the provider driver wants to create the VIP port
# See if the provider driver wants to manage the VIP port
# This will still be called if the user provided a port to
# allow drivers to collect any required information about the
# VIP port.
octavia_owned = False
try:
provider_vip_dict = driver_utils.vip_dict_to_provider_dict(
@ -470,6 +488,10 @@ class LoadBalancersController(base.BaseController):
if 'port_id' not in vip_dict or not vip_dict['port_id']:
octavia_owned = True
# Check if the driver claims octavia owns the VIP port.
if vip.octavia_owned:
octavia_owned = True
self.repositories.vip.update(
lock_session, db_lb.id, ip_address=vip.ip_address,
port_id=vip.port_id, network_id=vip.network_id,

View File

@ -198,6 +198,20 @@ amphora_agent_opts = [
help='The UDP API backend for amphora agent.'),
]
compute_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
'compute service.')),
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'compute service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
]
networking_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
@ -205,6 +219,11 @@ networking_opts = [
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'networking service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
cfg.IntOpt('port_detach_timeout', default=300,
help=_('Seconds to wait for a port to detach from an '
'amphora.')),
@ -317,6 +336,14 @@ haproxy_amphora_opts = [
default=2,
help=_('Retry timeout between connection attempts in '
'seconds for active amphora.')),
cfg.IntOpt('failover_connection_max_retries',
default=2,
help=_('Retry threshold for connecting to an amphora in '
'failover.')),
cfg.IntOpt('failover_connection_retry_interval',
default=5,
help=_('Retry timeout between connection attempts in '
'seconds for amphora in failover.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller '
@ -380,6 +407,16 @@ haproxy_amphora_opts = [
deprecated_reason='This is now automatically discovered '
' and configured.',
help=_("If False, use sysvinit.")),
cfg.IntOpt('api_db_commit_retry_attempts', default=15,
help=_('The number of times the database action will be '
'attempted.')),
cfg.IntOpt('api_db_commit_retry_initial_delay', default=1,
help=_('The initial delay before a retry attempt.')),
cfg.IntOpt('api_db_commit_retry_backoff', default=1,
help=_('The time to backoff retry attempts.')),
cfg.IntOpt('api_db_commit_retry_max', default=5,
help=_('The maximum amount of time to wait between retry '
'attempts.')),
]
controller_worker_opts = [
@ -462,7 +499,11 @@ controller_worker_opts = [
help=_('If True, build cloud-init user-data that is passed '
'to the config drive on Amphora boot instead of '
'personality files. If False, utilize personality '
'files.'))
'files.')),
cfg.IntOpt('amphora_delete_retries', default=5,
help=_('Number of times an amphora delete should be retried.')),
cfg.IntOpt('amphora_delete_retry_interval', default=5,
help=_('Time, in seconds, between amphora delete retries.')),
]
task_flow_opts = [
@ -790,6 +831,7 @@ driver_agent_opts = [
cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(api_opts, group='api_settings')
cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent')
cfg.CONF.register_opts(compute_opts, group='compute')
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')

View File

@ -296,7 +296,9 @@ ACTIVE_CONNECTIONS = 'active_connections'
ADD_NICS = 'add_nics'