
Refactor the failover flows

This patch refactors the failover flows to improve the performance
and reliability of failovers in Octavia.

Specific improvements are:
* More tasks and flows will retry when other OpenStack services are
  failing.
* Failover can now succeed even when all of the amphorae are missing
  for a given load balancer.
* It will check and repair the load balancer VIP should the VIP
  port(s) become corrupted in neutron.
* It will clean up extra resources that may be associated with a
  load balancer in the event of a cloud service failure.

This patch also removes some dead code.

Conflicts:
    octavia/common/constants.py

Change-Id: I04cb2f1f10ec566298834f81df0cf8b100ca916c
Story: 2003084
Task: 23166
Story: 2004440
Task: 28108
(cherry picked from commit 955bb8840616d96ed74de3086f8959ad4190a472)
changes/02/739002/4
Michael Johnson authored 8 months ago; committed by Ann Taraday
commit bfa44b7576
73 changed files with 6372 additions and 1352 deletions
  1. etc/octavia.conf (+37, -0)
  2. octavia/amphorae/backends/agent/api_server/amphora_info.py (+12, -62)
  3. octavia/amphorae/backends/agent/api_server/keepalived.py (+4, -0)
  4. octavia/amphorae/backends/agent/api_server/keepalivedlvs.py (+5, -1)
  5. octavia/amphorae/backends/agent/api_server/loadbalancer.py (+13, -22)
  6. octavia/amphorae/backends/agent/api_server/util.py (+71, -1)
  7. octavia/amphorae/backends/utils/ip_advertisement.py (+183, -0)
  8. octavia/amphorae/backends/utils/network_namespace.py (+50, -0)
  9. octavia/amphorae/backends/utils/network_utils.py (+83, -0)
  10. octavia/amphorae/drivers/driver_base.py (+30, -11)
  11. octavia/amphorae/drivers/haproxy/exceptions.py (+4, -3)
  12. octavia/amphorae/drivers/haproxy/rest_api_driver.py (+42, -18)
  13. octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py (+42, -37)
  14. octavia/amphorae/drivers/noop_driver/driver.py (+14, -5)
  15. octavia/api/drivers/amphora_driver/v1/driver.py (+5, -2)
  16. octavia/api/drivers/utils.py (+6, -0)
  17. octavia/api/v2/controllers/load_balancer.py (+39, -17)
  18. octavia/common/config.py (+43, -1)
  19. octavia/common/constants.py (+50, -3)
  20. octavia/common/exceptions.py (+19, -1)
  21. octavia/common/utils.py (+23, -0)
  22. octavia/compute/drivers/noop_driver/driver.py (+4, -4)
  23. octavia/compute/drivers/nova_driver.py (+33, -7)
  24. octavia/controller/worker/v1/controller_worker.py (+271, -166)
  25. octavia/controller/worker/v1/flows/amphora_flows.py (+421, -345)
  26. octavia/controller/worker/v1/flows/load_balancer_flows.py (+399, -51)
  27. octavia/controller/worker/v1/tasks/amphora_driver_tasks.py (+150, -61)
  28. octavia/controller/worker/v1/tasks/compute_tasks.py (+73, -11)
  29. octavia/controller/worker/v1/tasks/database_tasks.py (+34, -17)
  30. octavia/controller/worker/v1/tasks/network_tasks.py (+180, -39)
  31. octavia/controller/worker/v1/tasks/retry_tasks.py (+74, -0)
  32. octavia/controller/worker/v2/tasks/amphora_driver_tasks.py (+2, -2)
  33. octavia/network/base.py (+52, -0)
  34. octavia/network/data_models.py (+15, -1)
  35. octavia/network/drivers/neutron/allowed_address_pairs.py (+225, -55)
  36. octavia/network/drivers/neutron/base.py (+13, -4)
  37. octavia/network/drivers/neutron/utils.py (+32, -13)
  38. octavia/network/drivers/noop_driver/driver.py (+69, -0)
  39. octavia/opts.py (+1, -0)
  40. octavia/tests/common/constants.py (+46, -0)
  41. octavia/tests/common/data_model_helpers.py (+3, -1)
  42. octavia/tests/common/sample_data_models.py (+4, -2)
  43. octavia/tests/common/sample_network_data.py (+198, -0)
  44. octavia/tests/functional/amphorae/backend/agent/api_server/test_keepalivedlvs.py (+9, -3)
  45. octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py (+15, -7)
  46. octavia/tests/unit/amphorae/backends/agent/api_server/test_loadbalancer.py (+13, -35)
  47. octavia/tests/unit/amphorae/backends/agent/api_server/test_util.py (+128, -0)
  48. octavia/tests/unit/amphorae/backends/utils/test_ip_advertisement.py (+212, -0)
  49. octavia/tests/unit/amphorae/backends/utils/test_network_namespace.py (+116, -0)
  50. octavia/tests/unit/amphorae/backends/utils/test_network_utils.py (+140, -0)
  51. octavia/tests/unit/amphorae/drivers/haproxy/test_exceptions.py (+52, -0)
  52. octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_0_5.py (+26, -8)
  53. octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_1_0.py (+26, -8)
  54. octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_common.py (+83, -0)
  55. octavia/tests/unit/amphorae/drivers/keepalived/test_vrrp_rest_driver.py (+44, -4)
  56. octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py (+10, -1)
  57. octavia/tests/unit/common/test_utils.py (+31, -0)
  58. octavia/tests/unit/compute/drivers/test_nova_driver.py (+64, -2)
  59. octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py (+164, -65)
  60. octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py (+231, -5)
  61. octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py (+210, -45)
  62. octavia/tests/unit/controller/worker/v1/tasks/test_compute_tasks.py (+69, -1)
  63. octavia/tests/unit/controller/worker/v1/tasks/test_database_tasks.py (+31, -3)
  64. octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py (+379, -37)
  65. octavia/tests/unit/controller/worker/v1/tasks/test_retry_tasks.py (+47, -0)
  66. octavia/tests/unit/controller/worker/v1/test_controller_worker.py (+693, -141)
  67. octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py (+3, -2)
  68. octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py (+421, -18)
  69. octavia/tests/unit/network/drivers/neutron/test_utils.py (+1, -0)
  70. octavia/tests/unit/network/drivers/test_network_noop_driver.py (+56, -0)
  71. releasenotes/notes/refactor_failover_flow-9efcd854240f71ad.yaml (+11, -0)
  72. tools/create_flow_docs.py (+11, -3)
  73. tools/flow-list.txt (+2, -1)

etc/octavia.conf (+37, -0)

@@ -153,6 +153,19 @@
# Endpoint type to use for communication with the Barbican service.
# endpoint_type = publicURL

[compute]
# The maximum attempts to retry an action with the compute service.
# max_retries = 15

# Seconds to wait before retrying an action with the compute service.
# retry_interval = 1

# The seconds to backoff retry attempts
# retry_backoff = 1

# The maximum interval in seconds between retry attempts
# retry_max = 10

[networking]
# The maximum attempts to retry an action with the networking service.
# max_retries = 15
@@ -160,6 +173,12 @@
# Seconds to wait before retrying an action with the networking service.
# retry_interval = 1

# The seconds to backoff retry attempts
# retry_backoff = 1

# The maximum interval in seconds between retry attempts
# retry_max = 10

# The maximum time to wait, in seconds, for a port to detach from an amphora
# port_detach_timeout = 300

@@ -224,11 +243,26 @@
# active_connection_max_retries = 15
# active_connection_rety_interval = 2

# These "failover" timeouts are used during the failover process to probe
# amphorae that are part of the load balancer being failed over.
# These values are very low to facilitate "fail fast" should an amphora
# not respond in a failure situation.
# failover_connection_max_retries = 2
# failover_connection_retry_interval = 5

# The user flow log format for HAProxy.
# {{ project_id }} and {{ lb_id }} will be automatically substituted by the
# controller when configuring HAProxy if they are present in the string.
# user_log_format = '{{ project_id }} {{ lb_id }} %f %ci %cp %t %{+Q}r %ST %B %U %[ssl_c_verify] %{+Q}[ssl_c_s_dn] %b %s %Tt %tsc'

# API messaging / database commit retries
# This is many times the controller worker retries waiting for the API to
# complete a database commit for a message received over the queue.
# api_db_commit_retry_attempts = 15
# api_db_commit_retry_initial_delay = 1
# api_db_commit_retry_backoff = 1
# api_db_commit_retry_max = 5

[controller_worker]
# workers = 1
# amp_active_retries = 30
@@ -285,6 +319,9 @@
# loadbalancer_topology = SINGLE
# user_data_config_drive = False

# amphora_delete_retries = 5
# amphora_delete_retry_interval = 5

[task_flow]
# TaskFlow engine options are:
# - serial: Runs all tasks on a single thread.


octavia/amphorae/backends/agent/api_server/amphora_info.py (+12, -62)

@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import ipaddress
import os
import re
import socket
@@ -21,10 +20,11 @@ import subprocess
import pyroute2
import webob

import netifaces
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions


class AmphoraInfo(object):
@@ -175,65 +175,15 @@ class AmphoraInfo(object):
return networks

def get_interface(self, ip_addr):

try:
ip_version = ipaddress.ip_address(ip_addr).version
except Exception:
return webob.Response(
json=dict(message="Invalid IP address"), status=400)

if ip_version == 4:
address_format = netifaces.AF_INET
elif ip_version == 6:
address_format = netifaces.AF_INET6
else:
interface = network_utils.get_interface_name(
ip_addr, net_ns=consts.AMPHORA_NAMESPACE)
except exceptions.InvalidIPAddress:
return webob.Response(json=dict(message="Invalid IP address"),
status=400)
except exceptions.NotFound:
return webob.Response(
json=dict(message="Bad IP address version"), status=400)

# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
normalized_addr = socket.inet_ntop(address_format,
socket.inet_pton(address_format,
ip_addr))

with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns:
for addr in netns.get_addr():
# Save the interface index as IPv6 records don't list a
# textual interface
interface_idx = addr['index']
# Save the address family (IPv4/IPv6) for use normalizing
# the IP address for comparison
interface_af = addr['family']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we
# we are looking for. Since we have matched the name
# above, attr[1] is the address value
if normalized_addr == socket.inet_ntop(
interface_af,
socket.inet_pton(interface_af, attr[1])):

# Lookup the matching interface name by
# getting the interface with the index we found
# in the above address search
lookup_int = netns.get_links(interface_idx)
# Search through the attributes of the matching
# interface record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair
# that includes the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the response with the matching
# interface name that is in int_attr[1]
# for the matching interface attribute
# name
return webob.Response(
json=dict(message='OK',
interface=int_attr[1]),
status=200)

return webob.Response(
json=dict(message="Error interface not found for IP address"),
status=404)
json=dict(message="Error interface not found for IP address"),
status=404)
return webob.Response(json=dict(message='OK', interface=interface),
status=200)

octavia/amphorae/backends/agent/api_server/keepalived.py (+4, -0)

@@ -47,6 +47,7 @@ class Keepalived(object):

if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())

conf_file = util.keepalived_cfg_path()
@@ -112,6 +113,9 @@ class Keepalived(object):
)
text_file.write(text)

# Configure the monitoring of haproxy
util.vrrp_check_script_update(None, consts.AMP_ACTION_START)

# Make sure the new service is enabled on boot
if init_system != consts.INIT_UPSTART:
try:


octavia/amphorae/backends/agent/api_server/keepalivedlvs.py (+5, -1)

@@ -78,7 +78,8 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
# Active-Standby topology will create the directory below. So for
# Single topology, it should not create the directory and the check
# scripts for status change.
if not os.path.exists(util.keepalived_check_scripts_dir()):
if (CONF.controller_worker.loadbalancer_topology !=
consts.TOPOLOGY_ACTIVE_STANDBY):
NEED_CHECK = False

conf_file = util.keepalived_lvs_cfg_path(listener_id)
@@ -157,6 +158,9 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
script_path = os.path.join(util.keepalived_check_scripts_dir(),
KEEPALIVED_CHECK_SCRIPT_NAME)
if not os.path.exists(script_path):
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())

with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),
'w') as script_file:
text = check_script_file_template.render(


octavia/amphorae/backends/agent/api_server/loadbalancer.py (+13, -22)

@@ -235,12 +235,11 @@ class Loadbalancer(object):
details="Unknown action: {0}".format(action)), status=400)

self._check_lb_exists(lb_id)
is_vrrp = (CONF.controller_worker.loadbalancer_topology ==
consts.TOPOLOGY_ACTIVE_STANDBY)

# Since this script should be created at LB create time
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(lb_id, action)
if is_vrrp:
util.vrrp_check_script_update(lb_id, action)

# HAProxy does not start the process when given a reload
# so start it if haproxy is not already running
@@ -262,6 +261,14 @@ class Loadbalancer(object):
return webob.Response(json=dict(
message="Error {0}ing haproxy".format(action),
details=e.output), status=500)

# If we are not in active/standby we need to send an IP
# advertisement (GARP or NA). Keepalived handles this for
# active/standby load balancers.
if not is_vrrp and action in [consts.AMP_ACTION_START,
consts.AMP_ACTION_RELOAD]:
util.send_vip_advertisements(lb_id)

if action in [consts.AMP_ACTION_STOP,
consts.AMP_ACTION_RELOAD]:
return webob.Response(json=dict(
@@ -307,7 +314,7 @@ class Loadbalancer(object):
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(
util.vrrp_check_script_update(
lb_id, action=consts.AMP_ACTION_STOP)

# delete the ssl files
@@ -455,22 +462,6 @@ class Loadbalancer(object):
def _cert_file_path(self, lb_id, filename):
return os.path.join(self._cert_dir(lb_id), filename)

def vrrp_check_script_update(self, lb_id, action):
lb_ids = util.get_loadbalancers()
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(util.haproxy_sock_path(lbid))

if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
os.makedirs(util.keepalived_check_scripts_dir())

cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(util.haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)

def _check_haproxy_status(self, lb_id):
if os.path.exists(util.pid_path(lb_id)):
if os.path.exists(


octavia/amphorae/backends/agent/api_server/util.py (+71, -1)

@@ -23,6 +23,8 @@ from oslo_config import cfg
from oslo_log import log as logging

from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.utils import ip_advertisement
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts

CONF = cfg.CONF
@@ -188,7 +190,7 @@ def get_listeners():
def get_loadbalancers():
"""Get Load balancers

:returns: An array with the ids of all load balancers,
:returns: An array with the uuids of all load balancers,
e.g. ['123', '456', ...] or [] if no loadbalancers exist
"""
if os.path.exists(CONF.haproxy_amphora.base_path):
@@ -332,3 +334,71 @@ def parse_haproxy_file(lb_id):
stats_socket = m.group(1)

return stats_socket, listeners


def vrrp_check_script_update(lb_id, action):
os.makedirs(keepalived_dir(), exist_ok=True)
os.makedirs(keepalived_check_scripts_dir(), exist_ok=True)

lb_ids = get_loadbalancers()
udp_ids = get_udp_listeners()
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not udp_ids:
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write('exit 1')
return
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(haproxy_sock_path(lbid))

cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)


def get_haproxy_vip_addresses(lb_id):
"""Get the VIP addresses for a load balancer.

:param lb_id: The load balancer ID to get VIP addresses from.
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
with open(config_path(lb_id), 'r') as file:
for line in file:
current_line = line.strip()
if current_line.startswith('bind'):
for section in current_line.split(' '):
# We will always have a port assigned per the template.
if ':' in section:
if ',' in section:
addr_port = section.rstrip(',')
vips.append(addr_port.rpartition(':')[0])
else:
vips.append(section.rpartition(':')[0])
break
return vips


def send_vip_advertisements(lb_id):
"""Sends address advertisements for each load balancer VIP.

This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the VIP addresses on a load balancer.

:param lb_id: The load balancer ID to send advertisements for.
:returns: None
"""
try:
vips = get_haproxy_vip_addresses(lb_id)

for vip in vips:
interface = network_utils.get_interface_name(
vip, net_ns=consts.AMPHORA_NAMESPACE)
ip_advertisement.send_ip_advertisement(
interface, vip, net_ns=consts.AMPHORA_NAMESPACE)
except Exception as e:
LOG.debug('Send VIP advertisement failed due to :%s. '
'This amphora may not be the MASTER. Ignoring.', str(e))
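For context, get_haproxy_vip_addresses() above works by scanning the rendered haproxy configuration for 'bind' lines and stripping the port from each bind target. The snippet below is a rough, self-contained illustration of that parsing; the sample bind line is made up for illustration and is not taken from a real amphora config.

# Hedged sketch of the bind-line parsing performed by
# get_haproxy_vip_addresses() above. The sample line is illustrative only.
sample_line = 'bind 203.0.113.10:443 ssl crt /var/lib/octavia/certs/tls.pem'

vips = []
for section in sample_line.strip().split(' '):
    # Bind targets always carry a port, so an address appears as "ip:port".
    if ':' in section:
        addr_port = section.rstrip(',')
        vips.append(addr_port.rpartition(':')[0])
        break

print(vips)  # ['203.0.113.10']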

octavia/amphorae/backends/utils/ip_advertisement.py (+183, -0)

@@ -0,0 +1,183 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fcntl
import socket
from struct import pack
from struct import unpack

from oslo_log import log as logging

from octavia.amphorae.backends.utils import network_namespace
from octavia.common import constants
from octavia.common import utils as common_utils

LOG = logging.getLogger(__name__)


def garp(interface, ip_address, net_ns=None):
"""Sends a gratuitous ARP for ip_address on the interface.

:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ARP_ETHERTYPE = 0x0806
BROADCAST_MAC = b'\xff\xff\xff\xff\xff\xff'

# Get a socket, optionally inside a network namespace
garp_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
else:
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)

# Bind the socket with the ARP ethertype protocol
garp_socket.bind((interface, ARP_ETHERTYPE))

# Get the MAC address of the interface
source_mac = garp_socket.getsockname()[4]

garp_msg = [
pack('!h', 1), # Hardware type ethernet
pack('!h', 0x0800), # Protocol type IPv4
pack('!B', 6), # Hardware size
pack('!B', 4), # Protocol size
pack('!h', 1), # Opcode request
source_mac, # Sender MAC address
socket.inet_aton(ip_address), # Sender IP address
BROADCAST_MAC, # Target MAC address
socket.inet_aton(ip_address)] # Target IP address

garp_ethernet = [
BROADCAST_MAC, # Ethernet destination
source_mac, # Ethernet source
pack('!h', ARP_ETHERTYPE), # Ethernet type
b''.join(garp_msg)] # The GARP message

garp_socket.send(b''.join(garp_ethernet))
garp_socket.close()


def calculate_icmpv6_checksum(packet):
"""Calculate the ICMPv6 checksum for a packet.

:param packet: The packet bytes to checksum.
:returns: The checksum integer.
"""
total = 0

# Add up 16-bit words
num_words = len(packet) // 2
for chunk in unpack("!%sH" % num_words, packet[0:num_words * 2]):
total += chunk

# Add any left over byte
if len(packet) % 2:
total += packet[-1] << 8

# Fold 32-bits into 16-bits
total = (total >> 16) + (total & 0xffff)
total += total >> 16
return ~total + 0x10000 & 0xffff


def neighbor_advertisement(interface, ip_address, net_ns=None):
"""Sends a unsolicited neighbor advertisement for an ip on the interface.

:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ALL_NODES_ADDR = 'ff02::1'
SIOCGIFHWADDR = 0x8927

# Get a socket, optionally inside a network namespace
na_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
na_socket = socket.socket(
socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))
else:
na_socket = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))

# Per RFC 4861 section 4.4, the hop limit should be 255
na_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 255)

# Bind the socket with the source address
na_socket.bind((ip_address, 0))

# Get the byte representation of the MAC address of the interface
# Note: You can't use getsockname() to get the MAC on this type of socket
source_mac = fcntl.ioctl(na_socket.fileno(), SIOCGIFHWADDR, pack('256s',
bytes(interface, 'utf-8')))[18:24]

# Get the byte representation of the source IP address
source_ip_bytes = socket.inet_pton(socket.AF_INET6, ip_address)

icmpv6_na_msg_prefix = [
pack('!B', 136), # ICMP Type Neighbor Advertisement
pack('!B', 0)] # ICMP Code
icmpv6_na_msg_postfix = [
pack('!I', 0xa0000000), # Flags (Router, Override)
source_ip_bytes, # Target address
pack('!B', 2), # ICMPv6 option type target link-layer address
pack('!B', 1), # ICMPv6 option length
source_mac] # ICMPv6 option link-layer address

# Calculate the ICMPv6 checksum
icmpv6_pseudo_header = [
source_ip_bytes, # Source IP address
socket.inet_pton(socket.AF_INET6, ALL_NODES_ADDR), # Destination IP
pack('!I', 58), # IPv6 next header (ICMPv6)
pack('!h', 32)] # IPv6 payload length
icmpv6_tmp_chksum = pack('!H', 0) # Checksum are zeros for calculation
tmp_chksum_msg = b''.join(icmpv6_pseudo_header + icmpv6_na_msg_prefix +
[icmpv6_tmp_chksum] + icmpv6_pseudo_header)
checksum = pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))

# Build the ICMPv6 unsolicitated neighbor advertisement
icmpv6_msg = b''.join(icmpv6_na_msg_prefix + [checksum] +
icmpv6_na_msg_postfix)

na_socket.sendto(icmpv6_msg, (ALL_NODES_ADDR, 0, 0, 0))
na_socket.close()


def send_ip_advertisement(interface, ip_address, net_ns=None):
"""Send an address advertisement.

This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the ip address specified.

:param interface: The interface name to send the advertisement on.
:param ip_address: The IP address to advertise.
:param net_ns: The network namespace to send the advertisement from.
:returns: None
"""
try:
if common_utils.is_ipv4(ip_address):
garp(interface, ip_address, net_ns)
elif common_utils.is_ipv6(ip_address):
neighbor_advertisement(interface, ip_address, net_ns)
else:
LOG.error('Unknown IP version for address: "%s". Skipping',
ip_address)
except Exception as e:
LOG.warning('Unable to send address advertisement for address: "%s", '
'error: %s. Skipping', ip_address, str(e))

octavia/amphorae/backends/utils/network_namespace.py (+50, -0)

@@ -0,0 +1,50 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os


class NetworkNamespace(object):
"""A network namespace context manager.

Runs wrapped code inside the specified network namespace.

:param netns: The network namespace name to enter.
"""
# from linux/sched.h - We want to enter a network namespace
CLONE_NEWNET = 0x40000000

@staticmethod
def _error_handler(result, func, arguments):
if result == -1:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))

def __init__(self, netns):
self.current_netns = '/proc/{pid}/ns/net'.format(pid=os.getpid())
self.target_netns = '/var/run/netns/{netns}'.format(netns=netns)
# reference: man setns(2)
self.set_netns = ctypes.CDLL('libc.so.6', use_errno=True).setns
self.set_netns.errcheck = self._error_handler

def __enter__(self):
# Save the current network namespace
self.current_netns_fd = open(self.current_netns)
with open(self.target_netns) as fd:
self.set_netns(fd.fileno(), self.CLONE_NEWNET)

def __exit__(self, *args):
# Return to the previous network namespace
self.set_netns(self.current_netns_fd.fileno(), self.CLONE_NEWNET)
self.current_netns_fd.close()
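As a usage sketch: the GARP/NA helpers in this patch enter the namespace only long enough to create a raw socket, and the socket remains usable after the context manager exits. The namespace name below is an illustrative parameter (the agent code passes consts.AMPHORA_NAMESPACE).

# Hedged sketch of using the NetworkNamespace context manager above,
# mirroring how ip_advertisement.garp() opens its raw socket.
import socket

from octavia.amphorae.backends.utils import network_namespace


def open_packet_socket(netns_name):
    # The process joins netns_name only for the duration of the "with"
    # block; the descriptor created inside keeps pointing at that
    # namespace afterwards.
    with network_namespace.NetworkNamespace(netns_name):
        return socket.socket(socket.AF_PACKET, socket.SOCK_RAW)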

octavia/amphorae/backends/utils/network_utils.py (+83, -0)

@@ -0,0 +1,83 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress

import pyroute2

from octavia.common import exceptions


def _find_interface(ip_address, rtnl_api, normalized_addr):
"""Find the interface using a routing netlink API.

:param ip_address: The IP address to search with.
:param rtnl_api: A pyroute2 rtnl_api instance. (IPRoute, NetNS, etc.)
:returns: The interface name if found, None if not found.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
"""
for addr in rtnl_api.get_addr(address=ip_address):
# Save the interface index as IPv6 records don't list a textual
# interface
interface_idx = addr['index']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we are
# looking for. Since we have matched the name above, attr[1]
# is the address value
if normalized_addr == ipaddress.ip_address(attr[1]).compressed:
# Lookup the matching interface name by getting the
# interface with the index we found in the above address
# search
lookup_int = rtnl_api.get_links(interface_idx)
# Search through the attributes of the matching interface
# record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair that includes
# the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the matching interface name that is in
# int_attr[1] for the matching interface attribute
# name
return int_attr[1]
# We didn't find an interface with that IP address.
return None


def get_interface_name(ip_address, net_ns=None):
"""Gets the interface name from an IP address.

:param ip_address: The IP address to lookup.
:param net_ns: The network namespace to find the interface in.
:returns: The interface name.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
:raises octavia.common.exceptions.NotFound: No interface was found.
"""
# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
try:
normalized_addr = ipaddress.ip_address(ip_address).compressed
except ValueError:
raise exceptions.InvalidIPAddress(ip_addr=ip_address)

if net_ns:
with pyroute2.NetNS(net_ns) as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
else:
with pyroute2.IPRoute() as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
if interface is not None:
return interface
raise exceptions.NotFound(resource='IP address', id=ip_address)
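A minimal usage sketch of the helper above, along the lines of how the amphora agent's get_interface API shown earlier calls it; the exception handling follows the docstring (InvalidIPAddress and NotFound come from octavia.common.exceptions).

# Hedged sketch: resolve the interface carrying an address inside the
# amphora namespace, returning None when the lookup fails.
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions


def interface_for_address(ip_addr):
    try:
        return network_utils.get_interface_name(
            ip_addr, net_ns=consts.AMPHORA_NAMESPACE)
    except exceptions.InvalidIPAddress:
        return None  # malformed address supplied by the caller
    except exceptions.NotFound:
        return None  # no interface in the namespace carries this address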

octavia/amphorae/drivers/driver_base.py (+30, -11)

@@ -202,6 +202,21 @@ class AmphoraLoadBalancerDriver(object, metaclass=abc.ABCMeta):
:type agent_config: string
"""

@abc.abstractmethod
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name from an IP address.

:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
"""


class HealthMixin(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
@@ -252,10 +267,17 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
class XYZ: ...
"""
@abc.abstractmethod
def update_vrrp_conf(self, loadbalancer):
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphorae of the loadbalancer with a new VRRP configuration

:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""

@abc.abstractmethod
@@ -266,10 +288,14 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):
"""

@abc.abstractmethod
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on the amphora

:param loadbalancer: loadbalancer object
:param amphora: The amphora object to start the service on.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""

@abc.abstractmethod
@@ -278,10 +304,3 @@ class VRRPDriverMixin(object, metaclass=abc.ABCMeta):

:param loadbalancer: loadbalancer object
"""

@abc.abstractmethod
def get_vrrp_interface(self, amphora):
"""Get the VRRP interface object for a specific amphora

:param amphora: amphora object
"""

octavia/amphorae/drivers/haproxy/exceptions.py (+4, -3)

@@ -20,7 +20,7 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)


def check_exception(response, ignore=tuple()):
def check_exception(response, ignore=tuple(), log_error=True):
status_code = response.status_code
responses = {
400: InvalidRequest,
@@ -34,8 +34,9 @@ def check_exception(response, ignore=tuple()):
}
if (status_code not in ignore) and (status_code in responses):
try:
LOG.error('Amphora agent returned unexpected result code %s with '
'response %s', status_code, response.json())
if log_error:
LOG.error('Amphora agent returned unexpected result code %s '
'with response %s', status_code, response.json())
except Exception:
# Handle the odd case where there is no response body
# like when using requests_mock which doesn't support has_body


octavia/amphorae/drivers/haproxy/rest_api_driver.py (+42, -18)

@@ -90,7 +90,7 @@ class HaproxyAmphoraLoadBalancerDriver(

return haproxy_version_string.split('.')[:2]

def _populate_amphora_api_version(self, amphora,
def _populate_amphora_api_version(self, amphora, timeout_dict=None,
raise_retry_exception=False):
"""Populate the amphora object with the api_version

@@ -102,7 +102,7 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora,
amphora, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
@@ -291,8 +291,11 @@ class HaproxyAmphoraLoadBalancerDriver(
getattr(self.clients[amp.api_version], func_name)(
amp, loadbalancer.id, *args)

def start(self, loadbalancer, amphora=None):
self._apply('start_listener', loadbalancer, amphora)
def reload(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('reload_listener', loadbalancer, amphora, timeout_dict)

def start(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('start_listener', loadbalancer, amphora, timeout_dict)

def delete(self, listener):
# Delete any UDP listeners the old way (we didn't update the way they
@@ -588,6 +591,28 @@ class HaproxyAmphoraLoadBalancerDriver(
'API.'.format(amphora.id))
raise driver_except.AmpDriverNotImplementedError()

def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name for an IP address.

:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
:returns: None if not found, the interface name string if found.
"""
try:
self._populate_amphora_api_version(amphora, timeout_dict)
response_json = self.clients[amphora.api_version].get_interface(
amphora, ip_address, timeout_dict, log_error=False)
return response_json.get('interface', None)
except (exc.NotFound, driver_except.TimeOutException):
return None


# Check a custom hostname
class CustomHostNameCheckingAdapter(requests.adapters.HTTPAdapter):
@@ -713,9 +738,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()

def get_api_version(self, amp, raise_retry_exception=False):
def get_api_version(self, amp, timeout_dict=None,
raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False,
r = self.get(amp, retry_404=False, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
@@ -816,16 +842,15 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)

def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)

def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()

def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
@@ -946,16 +971,15 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)

def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)

def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()

def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(


octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py (+42, -37)

@@ -29,34 +29,40 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
# The Mixed class must define a self.client object for the
# AmphoraApiClient

def update_vrrp_conf(self, loadbalancer, amphorae_network_config):
"""Update amphorae of the loadbalancer with a new VRRP configuration
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphora of the loadbalancer with a new VRRP configuration

:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
templater = jinja_cfg.KeepalivedJinjaTemplater()
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('update_vrrp_conf called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return

LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
templater = jinja_cfg.KeepalivedJinjaTemplater()

for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.debug("Update amphora %s VRRP configuration.", amphora.id)

self._populate_amphora_api_version(amp)
# Get the VIP subnet prefix for the amphora
# For amphorav2 amphorae_network_config will be list of dicts
try:
vip_cidr = amphorae_network_config[amp.id].vip_subnet.cidr
except AttributeError:
vip_cidr = amphorae_network_config[amp.id][
constants.VIP_SUBNET][constants.CIDR]
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amp, vip_cidr)
self.clients[amp.api_version].upload_vrrp_config(amp, config)
self._populate_amphora_api_version(amphora)
# Get the VIP subnet prefix for the amphora
# For amphorav2 amphorae_network_config will be list of dicts
try:
vip_cidr = amphorae_network_config[amphora.id].vip_subnet.cidr
except AttributeError:
vip_cidr = amphorae_network_config[amphora.id][
constants.VIP_SUBNET][constants.CIDR]
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amphora, vip_cidr)
self.clients[amphora.api_version].upload_vrrp_config(amphora, config)

def stop_vrrp_service(self, loadbalancer):
"""Stop the vrrp services running on the loadbalancer's amphorae
@@ -73,21 +79,25 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].stop_vrrp(amp)

def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on an amphorae.

:param loadbalancer: loadbalancer object
:param amphora: amphora object
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
LOG.info("Start loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('start_vrrp_service called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return

for amp in filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.info("Start amphora %s VRRP Service.", amphora.id)

LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].start_vrrp(amp)
self._populate_amphora_api_version(amphora)
self.clients[amphora.api_version].start_vrrp(amphora,
timeout_dict=timeout_dict)

def reload_vrrp_service(self, loadbalancer):
"""Reload the VRRP services of all amphorae of the loadbalancer
@@ -103,8 +113,3 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):

self._populate_amphora_api_version(amp)
self.clients[amp.api_version].reload_vrrp(amp)

def get_vrrp_interface(self, amphora, timeout_dict=None):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_interface(
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']

octavia/amphorae/drivers/noop_driver/driver.py (+14, -5)

@@ -114,6 +114,13 @@ class NoopManager(object):
self.amphoraconfig[amphora.id, agent_config] = (
amphora.id, agent_config, 'update_amphora_agent_config')

def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
LOG.debug("Amphora %s no-op, get interface from amphora %s for IP %s",
self.__class__.__name__, amphora.id, ip_address)
if ip_address == '198.51.100.99':
return "noop0"
return None


class NoopAmphoraLoadBalancerDriver(
driver_base.AmphoraLoadBalancerDriver,
@@ -170,17 +177,19 @@ class NoopAmphoraLoadBalancerDriver(
def update_amphora_agent_config(self, amphora, agent_config):
self.driver.update_amphora_agent_config(amphora, agent_config)

def update_vrrp_conf(self, loadbalancer):
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
return self.driver.get_interface_from_ip(amphora, ip_address,
timeout_dict)

def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
pass

def stop_vrrp_service(self, loadbalancer):
pass

def start_vrrp_service(self, loadbalancer):
def start_vrrp_service(self, amphora, timeout_dict=None):
pass

def reload_vrrp_service(self, loadbalancer):
pass

def get_vrrp_interface(self, amphora):
pass

octavia/api/drivers/amphora_driver/v1/driver.py (+5, -2)

@@ -72,8 +72,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
try:
vip = network_driver.allocate_vip(lb_obj)
except network_base.AllocateVIPException as e:
raise exceptions.DriverError(user_fault_string=e.orig_msg,
operator_fault_string=e.orig_msg)
message = str(e)
if getattr(e, 'orig_msg', None) is not None:
message = e.orig_msg
raise exceptions.DriverError(user_fault_string=message,
operator_fault_string=message)

LOG.info('Amphora provider created VIP port %s for load balancer %s.',
vip.port_id, loadbalancer_id)


octavia/api/drivers/utils.py (+6, -0)

@@ -22,6 +22,7 @@ from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver

from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.common.tls_utils import cert_parser
@@ -543,6 +544,9 @@ def vip_dict_to_provider_dict(vip_dict):
new_vip_dict['vip_subnet_id'] = vip_dict['subnet_id']
if 'qos_policy_id' in vip_dict:
new_vip_dict['vip_qos_policy_id'] = vip_dict['qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dict:
new_vip_dict[constants.OCTAVIA_OWNED] = vip_dict[
constants.OCTAVIA_OWNED]
return new_vip_dict


@@ -558,4 +562,6 @@ def provider_vip_dict_to_vip_obj(vip_dictionary):
vip_obj.subnet_id = vip_dictionary['vip_subnet_id']
if 'vip_qos_policy_id' in vip_dictionary:
vip_obj.qos_policy_id = vip_dictionary['vip_qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dictionary:
vip_obj.octavia_owned = vip_dictionary[constants.OCTAVIA_OWNED]
return vip_obj

octavia/api/v2/controllers/load_balancer.py (+39, -17)

@@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress

from oslo_config import cfg
from oslo_db import exception as odb_exceptions
@@ -187,26 +188,40 @@ class LoadBalancersController(base.BaseController):
isinstance(load_balancer.vip_qos_policy_id, wtypes.UnsetType)):
load_balancer.vip_qos_policy_id = port_qos_policy_id

# Identify the subnet for this port
if load_balancer.vip_subnet_id:
# If we were provided a subnet_id, validate it exists and that
# there is a fixed_ip on the port that matches the provided subnet
validate.subnet_exists(subnet_id=load_balancer.vip_subnet_id,
context=context)
else:
if load_balancer.vip_address:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.ip_address == load_balancer.vip_address:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
else:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.subnet_id == load_balancer.vip_subnet_id:
load_balancer.vip_address = port_fixed_ip.ip_address
break # Just pick the first address found in the subnet
if not load_balancer.vip_address:
raise exceptions.ValidationException(detail=_(
"No VIP address found on the specified VIP port within "
"the specified subnet."))
elif load_balancer.vip_address:
normalized_lb_ip = ipaddress.ip_address(
load_balancer.vip_address).compressed
for port_fixed_ip in port.fixed_ips:
normalized_port_ip = ipaddress.ip_address(
port_fixed_ip.ip_address).compressed
if normalized_port_ip == normalized_lb_ip:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
# User provided only a port, get the subnet and address from it
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
load_balancer.vip_address = port.fixed_ips[0].ip_address
else:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))

def _validate_vip_request_object(self, load_balancer, context=None):
allowed_network_objects = []
@@ -450,7 +465,10 @@ class LoadBalancersController(base.BaseController):
# Do the same with the availability_zone dict
lb_dict['availability_zone'] = az_dict

# See if the provider driver wants to create the VIP port
# See if the provider driver wants to manage the VIP port
# This will still be called if the user provided a port to
# allow drivers to collect any required information about the
# VIP port.
octavia_owned = False
try:
provider_vip_dict = driver_utils.vip_dict_to_provider_dict(
@@ -470,6 +488,10 @@ class LoadBalancersController(base.BaseController):
if 'port_id' not in vip_dict or not vip_dict['port_id']:
octavia_owned = True

# Check if the driver claims octavia owns the VIP port.
if vip.octavia_owned:
octavia_owned = True

self.repositories.vip.update(
lock_session, db_lb.id, ip_address=vip.ip_address,
port_id=vip.port_id, network_id=vip.network_id,
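The new subnet/address resolution in the hunk above is easier to follow as a plain summary. The sketch below restates the decision order against a bare fixed_ips list; it is a simplification for illustration, not the controller code itself.

# Hedged summary of the VIP resolution order added in this patch:
# explicit subnet -> explicit address -> single fixed_ip -> reject.
import ipaddress


def resolve_vip(fixed_ips, vip_subnet_id=None, vip_address=None):
    if vip_subnet_id:
        for fip in fixed_ips:
            if fip['subnet_id'] == vip_subnet_id:
                # Just pick the first address found in the subnet.
                return vip_subnet_id, fip['ip_address']
        raise ValueError('No VIP address found on the specified VIP port '
                         'within the specified subnet.')
    if vip_address:
        # Normalize because IPv6 has multiple textual representations.
        wanted = ipaddress.ip_address(vip_address).compressed
        for fip in fixed_ips:
            if ipaddress.ip_address(fip['ip_address']).compressed == wanted:
                return fip['subnet_id'], vip_address
        raise ValueError('Specified VIP address not found on the specified '
                         'VIP port.')
    if len(fixed_ips) == 1:
        # User provided only a port; take its subnet and address.
        return fixed_ips[0]['subnet_id'], fixed_ips[0]['ip_address']
    raise ValueError("VIP port's subnet could not be determined. Please "
                     "specify either a VIP subnet or address.")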


octavia/common/config.py (+43, -1)

@@ -186,6 +186,20 @@ amphora_agent_opts = [
help='The UDP API backend for amphora agent.'),
]

compute_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
'compute service.')),
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'compute service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
]

networking_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
@@ -193,6 +207,11 @@ networking_opts = [
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'networking service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
cfg.IntOpt('port_detach_timeout', default=300,
help=_('Seconds to wait for a port to detach from an '
'amphora.')),
@@ -305,6 +324,14 @@ haproxy_amphora_opts = [
default=2,
help=_('Retry timeout between connection attempts in '
'seconds for active amphora.')),
cfg.IntOpt('failover_connection_max_retries',
default=2,
help=_('Retry threshold for connecting to an amphora in '
'failover.')),
cfg.IntOpt('failover_connection_retry_interval',
default=5,
help=_('Retry timeout between connection attempts in '
'seconds for amphora in failover.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller '
@@ -369,6 +396,16 @@ haproxy_amphora_opts = [
deprecated_reason='This is now automatically discovered '
' and configured.',
help=_("If False, use sysvinit.")),
cfg.IntOpt('api_db_commit_retry_attempts', default=15,
help=_('The number of times the database action will be '
'attempted.')),
cfg.IntOpt('api_db_commit_retry_initial_delay', default=1,
help=_('The initial delay before a retry attempt.')),
cfg.IntOpt('api_db_commit_retry_backoff', default=1,
help=_('The time to backoff retry attempts.')),
cfg.IntOpt('api_db_commit_retry_max', default=5,
help=_('The maximum amount of time to wait between retry '
'attempts.')),
]

controller_worker_opts = [
@@ -451,7 +488,11 @@ controller_worker_opts = [
help=_('If True, build cloud-init user-data that is passed '
'to the config drive on Amphora boot instead of '
'personality files. If False, utilize personality '
'files.'))
'files.')),
cfg.IntOpt('amphora_delete_retries', default=5,
help=_('Number of times an amphora delete should be retried.')),
cfg.IntOpt('amphora_delete_retry_interval', default=5,
help=_('Time, in seconds, between amphora delete retries.')),
]

task_flow_opts = [
@@ -779,6 +820,7 @@ driver_agent_opts = [
cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(api_opts, group='api_settings')
cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent')
cfg.CONF.register_opts(compute_opts, group='compute')
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')


octavia/common/constants.py (+50, -3)

@@ -296,7 +296,9 @@ ACTIVE_CONNECTIONS = 'active_connections'
ADD_NICS = 'add_nics'
ADDED_PORTS = 'added_ports'
ADMIN_STATE_UP = 'admin_state_up'
ALLOWED_ADDRESS_PAIRS = 'allowed_address_pairs'
AMP_DATA = 'amp_data'
AMP_VRRP_INT = 'amp_vrrp_int'
AMPHORA = 'amphora'
AMPHORA_ID = 'amphora_id'
AMPHORA_INDEX = 'amphora_index'
@@ -305,6 +307,8 @@ AMPHORAE = 'amphorae'
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
AMPS_DATA = 'amps_data'
ANTI_AFFINITY = 'anti-affinity'
ATTEMPT_NUMBER = 'attempt_number'
BASE_PORT = 'base_port'
BYTES_IN = 'bytes_in'
BYTES_OUT = 'bytes_out'
CACHED_ZONE = 'cached_zone'
@@ -324,7 +328,9 @@ DELETE_NICS = 'delete_nics'
DELTA = 'delta'
DELTAS = 'deltas'
DESCRIPTION = 'description'
DEVICE_OWNER = 'device_owner'
ENABLED = 'enabled'
FAILED_AMP_VRRP_PORT_ID = 'failed_amp_vrrp_port_id'
FAILED_AMPHORA = 'failed_amphora'
FAILOVER_AMPHORA = 'failover_amphora'
FAILOVER_AMPHORA_ID = 'failover_amphora_id'
@@ -341,6 +347,7 @@ HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
ID = 'id'
IMAGE_ID = 'image_id'
IP_ADDRESS = 'ip_address'
IPV6_ICMP = 'ipv6-icmp'
LB_NETWORK_IP = 'lb_network_ip'
L7POLICY = 'l7policy'
L7POLICY_ID = 'l7policy_id'
@@ -360,6 +367,7 @@ MEMBER = 'member'
MEMBER_ID = 'member_id'
MEMBER_PORTS = 'member_ports'
MEMBER_UPDATES = 'member_updates'
MESSAGE = 'message'
NAME = 'name'
NETWORK = 'network'
NETWORK_ID = 'network_id'
@@ -372,14 +380,16 @@ ORIGINAL_LISTENER = 'original_listener'
ORIGINAL_LOADBALANCER = 'original_load_balancer'
ORIGINAL_MEMBER = 'original_member'
ORIGINAL_POOL = 'original_pool'
PASSIVE_FAILURE = 'passive_failure'
PEER_PORT = 'peer_port'
POOL = 'pool'
POOL_CHILD_COUNT = 'pool_child_count'
POOL_ID = 'pool_id'
PROJECT_ID = 'project_id'
POOL_UPDATES = 'pool_updates'
PORT = 'port'
PORT_ID = 'port_id'
PORTS = 'ports'
PROJECT_ID = 'project_id'
PROVIDER = 'provider'
PROVIDER_NAME = 'provider_name'
QOS_POLICY_ID = 'qos_policy_id'
@@ -388,15 +398,19 @@ REQ_CONN_TIMEOUT = 'req_conn_timeout'
REQ_READ_TIMEOUT = 'req_read_timeout'
REQUEST_ERRORS = 'request_errors'
ROLE = 'role'
SECURITY_GROUPS = 'security_groups'
SECURITY_GROUP_RULES = 'security_group_rules'
SERVER_GROUP_ID = 'server_group_id'
SERVER_PEM = 'server_pem'
SNI_CONTAINER_DATA = 'sni_container_data'
SNI_CONTAINERS = 'sni_containers'
SOFT_ANTI_AFFINITY = 'soft-anti-affinity'
STATUS = 'status'
STATUS_CODE = 'status_code'
SUBNET = 'subnet'
SUBNET_ID = 'subnet_id'
TAGS = 'tags'
TENANT_ID = 'tenant_id'
TIMEOUT_DICT = 'timeout_dict'
TLS_CERTIFICATE_ID = 'tls_certificate_id'
TLS_CONTAINER_ID = 'tls_container_id'
@@ -410,6 +424,7 @@ VIP_ADDRESS = 'vip_address'
VIP_NETWORK = 'vip_network'
VIP_PORT_ID = 'vip_port_id'
VIP_QOS_POLICY_ID = 'vip_qos_policy_id'
VIP_SG_ID = 'vip_sg_id'
VIP_SUBNET = 'vip_subnet'
VIP_SUBNET_ID = 'vip_subnet_id'
VRRP_ID = 'vrrp_id'
@@ -437,6 +452,7 @@ CREATE_POOL_FLOW = 'octavia-create-pool-flow'
CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow'
CREATE_L7RULE_FLOW = 'octavia-create-l7rule-flow'
DELETE_AMPHORA_FLOW = 'octavia-delete-amphora-flow'
DELETE_EXTRA_AMPHORAE_FLOW = 'octavia-delete-extra-amphorae-flow'
DELETE_HEALTH_MONITOR_FLOW = 'octavia-delete-health-monitor-flow'
DELETE_LISTENER_FLOW = 'octavia-delete-listener_flow'
DELETE_LOADBALANCER_FLOW = 'octavia-delete-loadbalancer-flow'
@@ -445,6 +461,7 @@ DELETE_POOL_FLOW = 'octavia-delete-pool-flow'
DELETE_L7POLICY_FLOW = 'octavia-delete-l7policy-flow'
DELETE_L7RULE_FLOW = 'octavia-delete-l7policy-flow'
FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow'
FAILOVER_LOADBALANCER_FLOW = 'octavia-failover-loadbalancer-flow'
FINALIZE_AMPHORA_FLOW = 'octavia-finalize-amphora-flow'
LOADBALANCER_NETWORKING_SUBFLOW = 'octavia-new-loadbalancer-net-subflow'
UPDATE_HEALTH_MONITOR_FLOW = 'octavia-update-health-monitor-flow'
@@ -459,10 +476,13 @@ UPDATE_AMPHORA_CONFIG_FLOW = 'octavia-update-amp-config-flow'

POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow'
CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow'
CREATE_AMP_FOR_FAILOVER_SUBFLOW = 'octavia-create-amp-for-failover-subflow'
AMP_PLUG_NET_SUBFLOW = 'octavia-plug-net-subflow'
GET_AMPHORA_FOR_LB_SUBFLOW = 'octavia-get-amphora-for-lb-subflow'
POST_LB_AMP_ASSOCIATION_SUBFLOW = (
'octavia-post-loadbalancer-amp_association-subflow')
AMPHORA_LISTENER_START_SUBFLOW = 'amphora-listener-start-subflow'
AMPHORA_LISTENER_RELOAD_SUBFLOW = 'amphora-listener-start-subflow'

MAP_LOADBALANCER_TO_AMPHORA = 'octavia-mapload-balancer-to-amphora'
RELOAD_AMPHORA = 'octavia-reload-amphora'
@@ -478,7 +498,7 @@ COMPUTE_WAIT = 'octavia-compute-wait'
UPDATE_AMPHORA_INFO = 'octavia-update-amphora-info'
AMPHORA_FINALIZE = 'octavia-amphora-finalize'
MARK_AMPHORA_ALLOCATED_INDB = 'octavia-mark-amphora-allocated-indb'
RELOADLOAD_BALANCER = 'octavia-reloadload-balancer'
MARK_AMPHORA_READY_INDB = 'octavia-mark-amphora-ready-indb'
MARK_LB_ACTIVE_INDB = 'octavia-mark-lb-active-indb'
MARK_AMP_MASTER_INDB = 'octavia-mark-amp-master-indb'
MARK_AMP_BACKUP_INDB = 'octavia-mark-amp-backup-indb'
@@ -492,6 +512,7 @@ CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb'
CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait'
AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update'
AMP_LISTENER_START = 'octavia-amp-listeners-start'
PLUG_VIP_AMPHORA = 'octavia-amp-plug-vip'
APPLY_QOS_AMP = 'octavia-amp-apply-qos'
UPDATE_AMPHORA_VIP_DATA = 'ocatvia-amp-update-vip-data'
@@ -499,6 +520,8 @@ GET_AMP_NETWORK_CONFIG = 'octavia-amp-get-network-config'
AMP_POST_VIP_PLUG = 'octavia-amp-post-vip-plug'
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
AMPHORA_CONFIG_UPDATE_TASK = 'AmphoraConfigUpdateTask'
FIRST_AMP_NETWORK_CONFIGS = 'first-amp-network-configs'
FIRST_AMP_VRRP_INTERFACE = 'first-amp-vrrp_interface'

# Batch Member Update constants
UNORDERED_MEMBER_UPDATES_FLOW = 'octavia-unordered-member-updates-flow'
@@ -513,11 +536,30 @@ UPDATE_MEMBER_INDB = 'octavia-update-member-indb'
DELETE_MEMBER_INDB = 'octavia-delete-member-indb'

# Task Names
ADMIN_DOWN_PORT = 'admin-down-port'
AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug'
AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener'
AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert'
AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug'
ATTACH_PORT = 'attach-port'
CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta'
CREATE_VIP_BASE_PORT = 'create-vip-base-port'
DELETE_AMPHORA = 'delete-amphora'
DELETE_PORT = 'delete-port'
DISABLE_AMP_HEALTH_MONITORING = 'disable-amphora-health-monitoring'
GET_AMPHORA_NETWORK_CONFIGS_BY_ID = 'get-amphora-network-configs-by-id'
GET_AMPHORAE_FROM_LB = 'get-amphorae-from-lb'
HANDLE_NETWORK_DELTA = 'handle-network-delta'
MARK_AMPHORA_DELETED = 'mark-amphora-deleted'
MARK_AMPHORA_PENDING_DELETE = 'mark-amphora-pending-delete'
MARK_AMPHORA_HEALTH_BUSY = 'mark-amphora-health-busy'
RELOAD_AMP_AFTER_PLUG_VIP = 'reload-amp-after-plug-vip'
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip'
RELOAD_LB_BEFOR_ALLOCATE_VIP = "reload-lb-before-allocate-vip"
RELOAD_LB_BEFOR_ALLOCATE_VIP = 'reload-lb-before-allocate-vip'
UPDATE_AMP_FAILOVER_DETAILS = 'update-amp-failover-details'


NOVA_1 = '1.1'
NOVA_21 = '2.1'
@@ -786,6 +828,11 @@ CIPHERS_OWASP_SUITE_B = ('TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:'
'DHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384:'
'ECDHE-RSA-AES128-SHA256')

VIP_SECURITY_GROUP_PREFIX = 'lb-'

AMP_BASE_PORT_PREFIX = 'octavia-lb-vrrp-'
OCTAVIA_OWNED = 'octavia_owned'
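
(Illustration, not part of the patch.) These prefixes are combined with resource IDs to build predictable names. A minimal sketch; the helper name is made up, and only the 'lb-' security-group form is confirmed elsewhere in this change:

def _prefixed_name(prefix, resource_id):
    # VIP_SECURITY_GROUP_PREFIX + lb_id  -> 'lb-<load_balancer_id>'
    # AMP_BASE_PORT_PREFIX + amphora_id  -> 'octavia-lb-vrrp-<amphora_id>' (assumed)
    return prefix + resource_id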

# Sadly in the LBaaS v2 API, header insertions are on the listener objects
# but they should be on the pool. Dealing with it until v3.
LISTENER_PROTOCOLS_SUPPORTING_HEADER_INSERTION = [PROTOCOL_HTTP,


+ 19
- 1
octavia/common/exceptions.py View File

@@ -202,7 +202,8 @@ class ComputeBuildQueueTimeoutException(OctaviaException):


class ComputeDeleteException(OctaviaException):
message = _('Failed to delete compute instance.')
message = _('Failed to delete compute instance. The compute service '
'reports: %(compute_msg)s')


class ComputeGetException(OctaviaException):
@@ -243,6 +244,14 @@ class ComputeWaitTimeoutException(OctaviaException):
message = _('Waiting for compute id %(id)s to go active timeout.')


class ComputePortInUseException(OctaviaException):
message = _('Compute driver reports port %(port)s is already in use.')


class ComputeUnknownException(OctaviaException):
message = _('Unknown exception from the compute driver: %(exc)s.')


class InvalidTopology(OctaviaException):
message = _('Invalid topology specified: %(topology)s')

@@ -396,3 +405,12 @@ class VolumeDeleteException(OctaviaException):

class VolumeGetException(OctaviaException):
message = _('Failed to retrieve volume instance.')


class NetworkServiceError(OctaviaException):
message = _('The networking service had a failure: %(net_error)s')


class InvalidIPAddress(APIException):
msg = _('The IP Address %(ip_addr)s is invalid.')
code = 400
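
(Sketch, not part of the patch.) A minimal example of raising the new exception classes, assuming the octavia.common.exceptions import path; the caller code is hypothetical and only the exception names and keyword arguments come from the definitions above:

from octavia.common import exceptions

def _delete_vm(nova_client, compute_id):
    try:
        nova_client.servers.delete(compute_id)
    except Exception as e:
        # The updated message interpolates the backend error as %(compute_msg)s.
        raise exceptions.ComputeDeleteException(compute_msg=str(e))

def _validate_vip(ip_addr, is_valid_fn):
    if not is_valid_fn(ip_addr):
        # APIException subclass: surfaces to the API caller with code 400.
        raise exceptions.InvalidIPAddress(ip_addr=ip_addr)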

+ 23
- 0
octavia/common/utils.py View File

@@ -29,6 +29,8 @@ from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver

from octavia.common import constants

CONF = cfg.CONF

LOG = logging.getLogger(__name__)
@@ -50,6 +52,15 @@ def base64_sha1_string(string_to_hash):
return re.sub(r"^-", "x", b64_sha1)


def get_amphora_driver():
amphora_driver = stevedore_driver.DriverManager(
namespace='octavia.amphora.drivers',
name=CONF.controller_worker.amphora_driver,
invoke_on_load=True
).driver
return amphora_driver


def get_network_driver():
CONF.import_group('controller_worker', 'octavia.common.config')
network_driver = stevedore_driver.DriverManager(
@@ -60,6 +71,12 @@ def get_network_driver():
return network_driver


def is_ipv4(ip_address):
"""Check if ip address is IPv4 address."""
ip = netaddr.IPAddress(ip_address)
return ip.version == 4


def is_ipv6(ip_address):
"""Check if ip address is IPv6 address."""
ip = netaddr.IPAddress(ip_address)
@@ -99,6 +116,12 @@ def ip_netmask_to_cidr(ip, netmask):
return "{ip}/{netmask}".format(ip=net.network, netmask=net.prefixlen)


def get_vip_security_group_name(loadbalancer_id):
if loadbalancer_id:
return constants.VIP_SECURITY_GROUP_PREFIX + loadbalancer_id
return None


def get_compatible_value(value):
if isinstance(value, str):
value = value.encode('utf-8')
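
(Usage sketch, not part of the patch; all values are made up.) The helpers added above can be exercised like this:

from octavia.common import utils

utils.is_ipv4('192.0.2.10')     # True
utils.is_ipv4('2001:db8::10')   # False (an IPv6 address)
utils.get_vip_security_group_name('3a7b1c9e-0000-4000-8000-000000000000')
# -> 'lb-3a7b1c9e-0000-4000-8000-000000000000'
utils.get_vip_security_group_name(None)   # -> None
utils.get_amphora_driver()  # loads the driver named by CONF.controller_worker.amphora_driver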


+ 4
- 4
octavia/compute/drivers/noop_driver/driver.py View File

@@ -84,8 +84,8 @@ class NoopManager(object):
self.__class__.__name__, server_group_id)
self.computeconfig[server_group_id] = (server_group_id, 'delete')

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
LOG.debug("Compute %s no-op, attach_network_or_port compute_id %s,"
"network_id %s, ip_address %s, port_id %s",
self.__class__.__name__, compute_id,
@@ -153,8 +153,8 @@ class NoopComputeDriver(driver_base.ComputeBase):
def delete_server_group(self, server_group_id):
self.driver.delete_server_group(server_group_id)

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
self.driver.attach_network_or_port(compute_id, network_id, ip_address,
port_id)
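
(Illustrative call, not from the patch.) With network_id now defaulting to None here in the no-op driver and in the nova driver below, a caller can attach an existing port by ID alone; compute_driver, amphora and base_port are assumed names:

compute_driver.attach_network_or_port(compute_id=amphora.compute_id,
                                      port_id=base_port.id)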



+ 33
- 7
octavia/compute/drivers/nova_driver.py View File

@@ -199,9 +199,9 @@ class VirtualMachineManager(compute_base.ComputeBase):
except nova_exceptions.NotFound:
LOG.warning("Nova instance with id: %s not found. "
"Assuming already deleted.", compute_id)
except Exception:
except Exception as e:
LOG.exception("Error deleting nova virtual machine.")
raise exceptions.ComputeDeleteException()
raise exceptions.ComputeDeleteException(compute_msg=str(e))

def status(self, compute_id):
'''Retrieve the status of a virtual machine.
@@ -339,8 +339,8 @@ class VirtualMachineManager(compute_base.ComputeBase):
LOG.exception("Error delete server group instance.")
raise exceptions.ServerGroupObjectDeleteException()

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
"""Attaching a port or a network to an existing amphora

:param compute_id: id of an amphora in the compute service
@@ -348,13 +348,39 @@ class VirtualMachineManager(compute_base.ComputeBase):
:param ip_address: ip address to attempt to be assigned to interface
:param port_id: id of the neutron port
:return: nova interface instance
:raises: Exception
:raises ComputePortInUseException: The port is in use somewhere else
:raises ComputeUnknownException: Unknown nova error
"""
try:
interface = self.manager.interface_attach(
server=compute_id, net_id=network_id, fixed_ip=ip_address,
port_id=port_id)
except Exception:
except nova_exceptions.Conflict as e:
# The port is already in use.
if port_id:
# Check if the port we want is already attached
try:
interfaces = self.manager.interface_list(compute_id)
for interface in interfaces:
if interface.id == port_id:
return interface
except Exception as e:
raise exceptions.ComputeUnknownException(exc=str(e))

raise exceptions.ComputePortInUseException(port=port_id)

# Nova should have created the port, so something is really
# wrong in nova if we get here.
raise exceptions.ComputeUnknownException(exc=str(e))
except nova_exceptions.NotFound as e:
if 'Instance' in str(e):
raise exceptions.NotFound(resource='Instance', id=compute_id)
if 'Network' in str(e):
raise exceptions.NotFound(resource='Network', id=network_id)
if 'Port' in str(e):
raise exceptions.NotFound(resource='Port', id=port_id)
raise exceptions.NotFound(resource=str(e), id=compute_id)
except Exception as e:
LOG.error('Error attaching network %(network_id)s with ip '
'%(ip_address)s and port %(port)s to amphora '
'(compute_id: %(compute_id)s) ',
@@ -364,7 +390,7 @@ class VirtualMachineManager(compute_base.ComputeBase):
'ip_address': ip_address,
'port': port_id
})
raise
raise exceptions.ComputeUnknownException(exc=str(e))
return interface

def detach_port(self, compute_id, port_id):


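(Sketch only; the calling code and names are hypothetical, only the exception types come from this change.) A caller can now distinguish the failure modes of attach_network_or_port instead of catching a bare Exception:

from octavia.common import exceptions

def _attach_base_port(compute, amphora, base_port):
    try:
        return compute.attach_network_or_port(
            compute_id=amphora.compute_id, port_id=base_port.id)
    except exceptions.ComputePortInUseException:
        # The port is bound to a different instance; retrying will not help.
        raise
    except exceptions.ComputeUnknownException:
        # Unclassified nova failure; a retry is reasonable because the driver
        # now returns the already-attached interface rather than raising on a
        # Conflict for this amphora's own port.
        raise
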
+ 271
- 166
octavia/controller/worker/v1/controller_worker.py View File

@@ -23,6 +23,8 @@ import tenacity

from octavia.common import base_taskflow
from octavia.common import constants
from octavia.common import exceptions
from octavia.common import utils
from octavia.controller.worker.v1.flows import amphora_flows
from octavia.controller.worker.v1.flows import health_monitor_flows
from octavia.controller.worker.v1.flows import l7policy_flows
@@ -37,11 +39,6 @@ from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

RETRY_ATTEMPTS = 15
RETRY_INITIAL_DELAY = 1
RETRY_BACKOFF = 1
RETRY_MAX = 5


def _is_provisioning_status_pending_update(lb_obj):
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
@@ -79,8 +76,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
tenacity.retry_if_exception_type()),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def _get_db_obj_until_pending_update(self, repo, id):

return repo.get(db_apis.get_session(), id=id)
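
(Illustrative configuration, not part of the patch text shown here.) The hard-coded RETRY_* module constants removed above are replaced by operator-tunable [haproxy_amphora] options referenced in the decorator. The option names are taken from that decorator; the values below simply mirror the old constants and are not necessarily the shipped defaults:

[haproxy_amphora]
api_db_commit_retry_attempts = 15
api_db_commit_retry_initial_delay = 1
api_db_commit_retry_backoff = 1
api_db_commit_retry_max = 5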
@@ -96,6 +96,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store = {constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}
if availability_zone:
store[constants.AVAILABILITY_ZONE] = (
@@ -111,27 +112,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
except Exception as e:
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))

def delete_amphora(self, amphora_id):
"""Deletes an existing Amphora.

:param amphora_id: ID of the amphora to delete
:returns: None
:raises AmphoraNotFound: The referenced Amphora was not found
"""
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
delete_amp_tf = self._taskflow_load(self._amphora_flows.
get_delete_amphora_flow(),
store={constants.AMPHORA: amphora})
with tf_logging.DynamicLoggingListener(delete_amp_tf,
log=LOG):
delete_amp_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_health_monitor(self, health_monitor_id):
"""Creates a health monitor.

@@ -224,8 +212,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_listener(self, listener_id):
"""Creates a listener.

@@ -310,8 +301,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_load_balancer(self, load_balancer_id, flavor=None,
availability_zone=None):
"""Creates a load balancer by allocating Amphorae.
@@ -338,6 +332,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.FLAVOR: flavor,
constants.AVAILABILITY_ZONE: availability_zone}

if not CONF.nova.enable_anti_affinity:
store[constants.SERVER_GROUP_ID] = None

topology = lb.topology

store[constants.UPDATE_DICT] = {
@@ -411,8 +408,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_member(self, member_id):
"""Creates a pool member.

@@ -486,8 +486,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def batch_update_members(self, old_member_ids, new_member_ids,
updated_members):
new_members = [self._member_repo.get(db_apis.get_session(), id=mid)
@@ -577,8 +580,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_pool(self, pool_id):
"""Creates a node pool.

@@ -667,8 +673,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_l7policy(self, l7policy_id):
"""Creates an L7 Policy.

@@ -753,8 +762,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_l7rule(self, l7rule_id):
"""Creates an L7 Rule.

@@ -841,155 +853,248 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_l7rule_tf.run()

def _perform_amphora_failover(self, amp, priority):
"""Internal method to perform failover operations for an amphora.

:param amp: The amphora to failover