Browse Source

Merge "Refactor the failover flows" into stable/train

changes/74/741974/1
Zuul 3 weeks ago
committed by Gerrit Code Review
parent
commit
3f01b05932
74 changed files with 6220 additions and 1341 deletions
  1. +37
    -0
      etc/octavia.conf
  2. +12
    -63
      octavia/amphorae/backends/agent/api_server/amphora_info.py
  3. +4
    -0
      octavia/amphorae/backends/agent/api_server/keepalived.py
  4. +5
    -1
      octavia/amphorae/backends/agent/api_server/keepalivedlvs.py
  5. +13
    -22
      octavia/amphorae/backends/agent/api_server/loadbalancer.py
  6. +76
    -2
      octavia/amphorae/backends/agent/api_server/util.py
  7. +183
    -0
      octavia/amphorae/backends/utils/ip_advertisement.py
  8. +50
    -0
      octavia/amphorae/backends/utils/network_namespace.py
  9. +86
    -0
      octavia/amphorae/backends/utils/network_utils.py
  10. +30
    -11
      octavia/amphorae/drivers/driver_base.py
  11. +4
    -3
      octavia/amphorae/drivers/haproxy/exceptions.py
  12. +42
    -18
      octavia/amphorae/drivers/haproxy/rest_api_driver.py
  13. +36
    -31
      octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py
  14. +14
    -5
      octavia/amphorae/drivers/noop_driver/driver.py
  15. +5
    -2
      octavia/api/drivers/amphora_driver/v1/driver.py
  16. +6
    -0
      octavia/api/drivers/utils.py
  17. +40
    -17
      octavia/api/v2/controllers/load_balancer.py
  18. +43
    -1
      octavia/common/config.py
  19. +58
    -2
      octavia/common/constants.py
  20. +19
    -1
      octavia/common/exceptions.py
  21. +23
    -0
      octavia/common/utils.py
  22. +4
    -4
      octavia/compute/drivers/noop_driver/driver.py
  23. +33
    -7
      octavia/compute/drivers/nova_driver.py
  24. +262
    -165
      octavia/controller/worker/v1/controller_worker.py
  25. +421
    -339
      octavia/controller/worker/v1/flows/amphora_flows.py
  26. +399
    -51
      octavia/controller/worker/v1/flows/load_balancer_flows.py
  27. +150
    -62
      octavia/controller/worker/v1/tasks/amphora_driver_tasks.py
  28. +73
    -9
      octavia/controller/worker/v1/tasks/compute_tasks.py
  29. +34
    -17
      octavia/controller/worker/v1/tasks/database_tasks.py
  30. +183
    -41
      octavia/controller/worker/v1/tasks/network_tasks.py
  31. +74
    -0
      octavia/controller/worker/v1/tasks/retry_tasks.py
  32. +2
    -2
      octavia/controller/worker/v2/tasks/amphora_driver_tasks.py
  33. +52
    -0
      octavia/network/base.py
  34. +15
    -1
      octavia/network/data_models.py
  35. +226
    -55
      octavia/network/drivers/neutron/allowed_address_pairs.py
  36. +13
    -4
      octavia/network/drivers/neutron/base.py
  37. +32
    -13
      octavia/network/drivers/neutron/utils.py
  38. +69
    -0
      octavia/network/drivers/noop_driver/driver.py
  39. +1
    -0
      octavia/opts.py
  40. +46
    -0
      octavia/tests/common/constants.py
  41. +3
    -1
      octavia/tests/common/data_model_helpers.py
  42. +4
    -2
      octavia/tests/common/sample_data_models.py
  43. +198
    -0
      octavia/tests/common/sample_network_data.py
  44. +9
    -3
      octavia/tests/functional/amphorae/backend/agent/api_server/test_keepalivedlvs.py
  45. +15
    -7
      octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py
  46. +13
    -35
      octavia/tests/unit/amphorae/backends/agent/api_server/test_loadbalancer.py
  47. +3
    -2
      octavia/tests/unit/amphorae/backends/agent/api_server/test_osutils.py
  48. +128
    -0
      octavia/tests/unit/amphorae/backends/agent/api_server/test_util.py
  49. +213
    -0
      octavia/tests/unit/amphorae/backends/utils/test_ip_advertisement.py
  50. +117
    -0
      octavia/tests/unit/amphorae/backends/utils/test_network_namespace.py
  51. +140
    -0
      octavia/tests/unit/amphorae/backends/utils/test_network_utils.py
  52. +52
    -0
      octavia/tests/unit/amphorae/drivers/haproxy/test_exceptions.py
  53. +26
    -8
      octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_0_5.py
  54. +26
    -8
      octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_1_0.py
  55. +83
    -0
      octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver_common.py
  56. +29
    -4
      octavia/tests/unit/amphorae/drivers/keepalived/test_vrrp_rest_driver.py
  57. +10
    -1
      octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py
  58. +31
    -0
      octavia/tests/unit/common/test_utils.py
  59. +64
    -2
      octavia/tests/unit/compute/drivers/test_nova_driver.py
  60. +158
    -65
      octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py
  61. +229
    -5
      octavia/tests/unit/controller/worker/v1/flows/test_load_balancer_flows.py
  62. +210
    -45
      octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py
  63. +69
    -1
      octavia/tests/unit/controller/worker/v1/tasks/test_compute_tasks.py
  64. +31
    -3
      octavia/tests/unit/controller/worker/v1/tasks/test_database_tasks.py
  65. +379
    -36
      octavia/tests/unit/controller/worker/v1/tasks/test_network_tasks.py
  66. +47
    -0
      octavia/tests/unit/controller/worker/v1/tasks/test_retry_tasks.py
  67. +553
    -140
      octavia/tests/unit/controller/worker/v1/test_controller_worker.py
  68. +3
    -2
      octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py
  69. +421
    -18
      octavia/tests/unit/network/drivers/neutron/test_allowed_address_pairs.py
  70. +1
    -0
      octavia/tests/unit/network/drivers/neutron/test_utils.py
  71. +56
    -0
      octavia/tests/unit/network/drivers/test_network_noop_driver.py
  72. +11
    -0
      releasenotes/notes/refactor_failover_flow-9efcd854240f71ad.yaml
  73. +11
    -3
      tools/create_flow_docs.py
  74. +2
    -1
      tools/flow-list.txt

+ 37
- 0
etc/octavia.conf View File

@@ -138,6 +138,19 @@
# Endpoint type to use for communication with the Barbican service.
# endpoint_type = publicURL

[compute]
# The maximum attempts to retry an action with the compute service.
# max_retries = 15

# Seconds to wait before retrying an action with the compute service.
# retry_interval = 1

# The seconds to backoff retry attempts
# retry_backoff = 1

# The maximum interval in seconds between retry attempts
# retry_max = 10

[networking]
# The maximum attempts to retry an action with the networking service.
# max_retries = 15
@@ -145,6 +158,12 @@
# Seconds to wait before retrying an action with the networking service.
# retry_interval = 1

# The seconds to backoff retry attempts
# retry_backoff = 1

# The maximum interval in seconds between retry attempts
# retry_max = 10

# The maximum time to wait, in seconds, for a port to detach from an amphora
# port_detach_timeout = 300

@@ -209,11 +228,26 @@
# active_connection_max_retries = 15
# active_connection_rety_interval = 2

# These "failover" timeouts are used during the failover process to probe
# amphorae that are part of the load balancer being failed over.
# These values are very low to facilitate "fail fast" should an amphora
# not respond in a failure situation.
# failover_connection_max_retries = 2
# failover_connection_retry_interval = 5

# The user flow log format for HAProxy.
# {{ project_id }} and {{ lb_id }} will be automatically substituted by the
# controller when configuring HAProxy if they are present in the string.
# user_log_format = '{{ project_id }} {{ lb_id }} %f %ci %cp %t %{+Q}r %ST %B %U %[ssl_c_verify] %{+Q}[ssl_c_s_dn] %b %s %Tt %tsc'

# API messaging / database commit retries
# This is many times the controller worker retries waiting for the API to
# complete a database commit for a message received over the queue.
# api_db_commit_retry_attempts = 15
# api_db_commit_retry_initial_delay = 1
# api_db_commit_retry_backoff = 1
# api_db_commit_retry_max = 5

[controller_worker]
# workers = 1
# amp_active_retries = 30
@@ -270,6 +304,9 @@
# loadbalancer_topology = SINGLE
# user_data_config_drive = False

# amphora_delete_retries = 5
# amphora_delete_retry_interval = 5

[task_flow]
# TaskFlow engine options are:
# - serial: Runs all tasks on a single thread.


+ 12
- 63
octavia/amphorae/backends/agent/api_server/amphora_info.py View File

@@ -12,20 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.

import ipaddress
import os
import re
import socket
import subprocess

import pyroute2
import six
import webob

import netifaces
from octavia.amphorae.backends.agent import api_server
from octavia.amphorae.backends.agent.api_server import util
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts
from octavia.common import exceptions


class AmphoraInfo(object):
@@ -176,65 +175,15 @@ class AmphoraInfo(object):
return networks

def get_interface(self, ip_addr):

try:
ip_version = ipaddress.ip_address(six.text_type(ip_addr)).version
except Exception:
return webob.Response(
json=dict(message="Invalid IP address"), status=400)

if ip_version == 4:
address_format = netifaces.AF_INET
elif ip_version == 6:
address_format = netifaces.AF_INET6
else:
interface = network_utils.get_interface_name(
ip_addr, net_ns=consts.AMPHORA_NAMESPACE)
except exceptions.InvalidIPAddress:
return webob.Response(json=dict(message="Invalid IP address"),
status=400)
except exceptions.NotFound:
return webob.Response(
json=dict(message="Bad IP address version"), status=400)

# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
normalized_addr = socket.inet_ntop(address_format,
socket.inet_pton(address_format,
ip_addr))

with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as netns:
for addr in netns.get_addr():
# Save the interface index as IPv6 records don't list a
# textual interface
interface_idx = addr['index']
# Save the address family (IPv4/IPv6) for use normalizing
# the IP address for comparison
interface_af = addr['family']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we
# we are looking for. Since we have matched the name
# above, attr[1] is the address value
if normalized_addr == socket.inet_ntop(
interface_af,
socket.inet_pton(interface_af, attr[1])):

# Lookup the matching interface name by
# getting the interface with the index we found
# in the above address search
lookup_int = netns.get_links(interface_idx)
# Search through the attributes of the matching
# interface record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair
# that includes the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the response with the matching
# interface name that is in int_attr[1]
# for the matching interface attribute
# name
return webob.Response(
json=dict(message='OK',
interface=int_attr[1]),
status=200)

return webob.Response(
json=dict(message="Error interface not found for IP address"),
status=404)
json=dict(message="Error interface not found for IP address"),
status=404)
return webob.Response(json=dict(message='OK', interface=interface),
status=200)

+ 4
- 0
octavia/amphorae/backends/agent/api_server/keepalived.py View File

@@ -47,6 +47,7 @@ class Keepalived(object):

if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())

conf_file = util.keepalived_cfg_path()
@@ -112,6 +113,9 @@ class Keepalived(object):
)
text_file.write(text)

# Configure the monitoring of haproxy
util.vrrp_check_script_update(None, consts.AMP_ACTION_START)

# Make sure the new service is enabled on boot
if init_system != consts.INIT_UPSTART:
try:


+ 5
- 1
octavia/amphorae/backends/agent/api_server/keepalivedlvs.py View File

@@ -78,7 +78,8 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
# Active-Standby topology will create the directory below. So for
# Single topology, it should not create the directory and the check
# scripts for status change.
if not os.path.exists(util.keepalived_check_scripts_dir()):
if (CONF.controller_worker.loadbalancer_topology !=
consts.TOPOLOGY_ACTIVE_STANDBY):
NEED_CHECK = False

conf_file = util.keepalived_lvs_cfg_path(listener_id)
@@ -157,6 +158,9 @@ class KeepalivedLvs(udp_listener_base.UdpListenerApiServerBase):
script_path = os.path.join(util.keepalived_check_scripts_dir(),
KEEPALIVED_CHECK_SCRIPT_NAME)
if not os.path.exists(script_path):
if not os.path.exists(util.keepalived_check_scripts_dir()):
os.makedirs(util.keepalived_check_scripts_dir())

with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),
'w') as script_file:
text = check_script_file_template.render(


+ 13
- 22
octavia/amphorae/backends/agent/api_server/loadbalancer.py View File

@@ -235,12 +235,11 @@ class Loadbalancer(object):
details="Unknown action: {0}".format(action)), status=400)

self._check_lb_exists(lb_id)
is_vrrp = (CONF.controller_worker.loadbalancer_topology ==
consts.TOPOLOGY_ACTIVE_STANDBY)

# Since this script should be created at LB create time
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(lb_id, action)
if is_vrrp:
util.vrrp_check_script_update(lb_id, action)

# HAProxy does not start the process when given a reload
# so start it if haproxy is not already running
@@ -262,6 +261,14 @@ class Loadbalancer(object):
return webob.Response(json=dict(
message="Error {0}ing haproxy".format(action),
details=e.output), status=500)

# If we are not in active/standby we need to send an IP
# advertisement (GARP or NA). Keepalived handles this for
# active/standby load balancers.
if not is_vrrp and action in [consts.AMP_ACTION_START,
consts.AMP_ACTION_RELOAD]:
util.send_vip_advertisements(lb_id)

if action in [consts.AMP_ACTION_STOP,
consts.AMP_ACTION_RELOAD]:
return webob.Response(json=dict(
@@ -307,7 +314,7 @@ class Loadbalancer(object):
# we can check for this path to see if VRRP is enabled
# on this amphora and not write the file if VRRP is not in use
if os.path.exists(util.keepalived_check_script_path()):
self.vrrp_check_script_update(
util.vrrp_check_script_update(
lb_id, action=consts.AMP_ACTION_STOP)

# delete the ssl files
@@ -455,22 +462,6 @@ class Loadbalancer(object):
def _cert_file_path(self, lb_id, filename):
return os.path.join(self._cert_dir(lb_id), filename)

def vrrp_check_script_update(self, lb_id, action):
lb_ids = util.get_loadbalancers()
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(util.haproxy_sock_path(lbid))

if not os.path.exists(util.keepalived_dir()):
os.makedirs(util.keepalived_dir())
os.makedirs(util.keepalived_check_scripts_dir())

cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(util.haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)

def _check_haproxy_status(self, lb_id):
if os.path.exists(util.pid_path(lb_id)):
if os.path.exists(


+ 76
- 2
octavia/amphorae/backends/agent/api_server/util.py View File

@@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import errno
import os
import re
import stat
@@ -23,6 +23,8 @@ from oslo_config import cfg
from oslo_log import log as logging

from octavia.amphorae.backends.agent.api_server import osutils
from octavia.amphorae.backends.utils import ip_advertisement
from octavia.amphorae.backends.utils import network_utils
from octavia.common import constants as consts

CONF = cfg.CONF
@@ -188,7 +190,7 @@ def get_listeners():
def get_loadbalancers():
"""Get Load balancers

:returns: An array with the ids of all load balancers,
:returns: An array with the uuids of all load balancers,
e.g. ['123', '456', ...] or [] if no loadbalancers exist
"""
if os.path.exists(CONF.haproxy_amphora.base_path):
@@ -332,3 +334,75 @@ def parse_haproxy_file(lb_id):
stats_socket = m.group(1)

return stats_socket, listeners


def vrrp_check_script_update(lb_id, action):
try:
os.makedirs(keepalived_dir())
os.makedirs(keepalived_check_scripts_dir())
except OSError as e:
if e.errno != errno.EEXIST:
raise

lb_ids = get_loadbalancers()
udp_ids = get_udp_listeners()
# If no LBs are found, so make sure keepalived thinks haproxy is down.
if not lb_ids:
if not udp_ids:
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write('exit 1')
return
if action == consts.AMP_ACTION_STOP:
lb_ids.remove(lb_id)
args = []
for lbid in lb_ids:
args.append(haproxy_sock_path(lbid))

cmd = 'haproxy-vrrp-check {args}; exit $?'.format(args=' '.join(args))
with open(haproxy_check_script_path(), 'w') as text_file:
text_file.write(cmd)


def get_haproxy_vip_addresses(lb_id):
"""Get the VIP addresses for a load balancer.

:param lb_id: The load balancer ID to get VIP addresses from.
:returns: List of VIP addresses (IPv4 and IPv6)
"""
vips = []
with open(config_path(lb_id), 'r') as file:
for line in file:
current_line = line.strip()
if current_line.startswith('bind'):
for section in current_line.split(' '):
# We will always have a port assigned per the template.
if ':' in section:
if ',' in section:
addr_port = section.rstrip(',')
vips.append(addr_port.rpartition(':')[0])
else:
vips.append(section.rpartition(':')[0])
break
return vips


def send_vip_advertisements(lb_id):
"""Sends address advertisements for each load balancer VIP.

This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the VIP addresses on a load balancer.

:param lb_id: The load balancer ID to send advertisements for.
:returns: None
"""
try:
vips = get_haproxy_vip_addresses(lb_id)

for vip in vips:
interface = network_utils.get_interface_name(
vip, net_ns=consts.AMPHORA_NAMESPACE)
ip_advertisement.send_ip_advertisement(
interface, vip, net_ns=consts.AMPHORA_NAMESPACE)
except Exception as e:
LOG.debug('Send VIP advertisement failed due to :%s. '
'This amphora may not be the MASTER. Ignoring.', str(e))

+ 183
- 0
octavia/amphorae/backends/utils/ip_advertisement.py View File

@@ -0,0 +1,183 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fcntl
import socket
from struct import pack
from struct import unpack

from oslo_log import log as logging

from octavia.amphorae.backends.utils import network_namespace
from octavia.common import constants
from octavia.common import utils as common_utils

LOG = logging.getLogger(__name__)


def garp(interface, ip_address, net_ns=None):
"""Sends a gratuitous ARP for ip_address on the interface.

:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ARP_ETHERTYPE = 0x0806
BROADCAST_MAC = b'\xff\xff\xff\xff\xff\xff'

# Get a socket, optionally inside a network namespace
garp_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)
else:
garp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW)

# Bind the socket with the ARP ethertype protocol
garp_socket.bind((interface, ARP_ETHERTYPE))

# Get the MAC address of the interface
source_mac = garp_socket.getsockname()[4]

garp_msg = [
pack('!h', 1), # Hardware type ethernet
pack('!h', 0x0800), # Protocol type IPv4
pack('!B', 6), # Hardware size
pack('!B', 4), # Protocol size
pack('!h', 1), # Opcode request
source_mac, # Sender MAC address
socket.inet_aton(ip_address), # Sender IP address
BROADCAST_MAC, # Target MAC address
socket.inet_aton(ip_address)] # Target IP address

garp_ethernet = [
BROADCAST_MAC, # Ethernet destination
source_mac, # Ethernet source
pack('!h', ARP_ETHERTYPE), # Ethernet type
b''.join(garp_msg)] # The GARP message

garp_socket.send(b''.join(garp_ethernet))
garp_socket.close()


def calculate_icmpv6_checksum(packet):
"""Calculate the ICMPv6 checksum for a packet.

:param packet: The packet bytes to checksum.
:returns: The checksum integer.
"""
total = 0

# Add up 16-bit words
num_words = len(packet) // 2
for chunk in unpack("!%sH" % num_words, packet[0:num_words * 2]):
total += chunk

# Add any left over byte
if len(packet) % 2:
total += bytearray(packet)[-1] << 8

# Fold 32-bits into 16-bits
total = (total >> 16) + (total & 0xffff)
total += total >> 16
return ~total + 0x10000 & 0xffff


def neighbor_advertisement(interface, ip_address, net_ns=None):
"""Sends a unsolicited neighbor advertisement for an ip on the interface.

:param interface: The interface name to send the GARP on.
:param ip_address: The IP address to advertise in the GARP.
:param net_ns: The network namespace to send the GARP from.
:returns: None
"""
ALL_NODES_ADDR = 'ff02::1'
SIOCGIFHWADDR = 0x8927

# Get a socket, optionally inside a network namespace
na_socket = None
if net_ns:
with network_namespace.NetworkNamespace(net_ns):
na_socket = socket.socket(
socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))
else:
na_socket = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
socket.getprotobyname(constants.IPV6_ICMP))

# Per RFC 4861 section 4.4, the hop limit should be 255
na_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 255)

# Bind the socket with the source address
na_socket.bind((ip_address, 0))

# Get the byte representation of the MAC address of the interface
# Note: You can't use getsockname() to get the MAC on this type of socket
source_mac = fcntl.ioctl(na_socket.fileno(), SIOCGIFHWADDR, pack('256s',
bytes(interface.encode('utf-8'))))[18:24]

# Get the byte representation of the source IP address
source_ip_bytes = socket.inet_pton(socket.AF_INET6, ip_address)

icmpv6_na_msg_prefix = [
pack('!B', 136), # ICMP Type Neighbor Advertisement
pack('!B', 0)] # ICMP Code
icmpv6_na_msg_postfix = [
pack('!I', 0xa0000000), # Flags (Router, Override)
source_ip_bytes, # Target address
pack('!B', 2), # ICMPv6 option type target link-layer address
pack('!B', 1), # ICMPv6 option length
source_mac] # ICMPv6 option link-layer address

# Calculate the ICMPv6 checksum
icmpv6_pseudo_header = [
source_ip_bytes, # Source IP address
socket.inet_pton(socket.AF_INET6, ALL_NODES_ADDR), # Destination IP
pack('!I', 58), # IPv6 next header (ICMPv6)
pack('!h', 32)] # IPv6 payload length
icmpv6_tmp_chksum = pack('!H', 0) # Checksum are zeros for calculation
tmp_chksum_msg = b''.join(icmpv6_pseudo_header + icmpv6_na_msg_prefix +
[icmpv6_tmp_chksum] + icmpv6_pseudo_header)
checksum = pack('!H', calculate_icmpv6_checksum(tmp_chksum_msg))

# Build the ICMPv6 unsolicitated neighbor advertisement
icmpv6_msg = b''.join(icmpv6_na_msg_prefix + [checksum] +
icmpv6_na_msg_postfix)

na_socket.sendto(icmpv6_msg, (ALL_NODES_ADDR, 0, 0, 0))
na_socket.close()


def send_ip_advertisement(interface, ip_address, net_ns=None):
"""Send an address advertisement.

This method will send either GARP (IPv4) or neighbor advertisements (IPv6)
for the ip address specified.

:param interface: The interface name to send the advertisement on.
:param ip_address: The IP address to advertise.
:param net_ns: The network namespace to send the advertisement from.
:returns: None
"""
try:
if common_utils.is_ipv4(ip_address):
garp(interface, ip_address, net_ns)
elif common_utils.is_ipv6(ip_address):
neighbor_advertisement(interface, ip_address, net_ns)
else:
LOG.error('Unknown IP version for address: "%s". Skipping',
ip_address)
except Exception as e:
LOG.warning('Unable to send address advertisement for address: "%s", '
'error: %s. Skipping', ip_address, str(e))

+ 50
- 0
octavia/amphorae/backends/utils/network_namespace.py View File

@@ -0,0 +1,50 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os


class NetworkNamespace(object):
"""A network namespace context manager.

Runs wrapped code inside the specified network namespace.

:param netns: The network namespace name to enter.
"""
# from linux/sched.h - We want to enter a network namespace
CLONE_NEWNET = 0x40000000

@staticmethod
def _error_handler(result, func, arguments):
if result == -1:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))

def __init__(self, netns):
self.current_netns = '/proc/{pid}/ns/net'.format(pid=os.getpid())
self.target_netns = '/var/run/netns/{netns}'.format(netns=netns)
# reference: man setns(2)
self.set_netns = ctypes.CDLL('libc.so.6', use_errno=True).setns
self.set_netns.errcheck = self._error_handler

def __enter__(self):
# Save the current network namespace
self.current_netns_fd = open(self.current_netns)
with open(self.target_netns) as fd:
self.set_netns(fd.fileno(), self.CLONE_NEWNET)

def __exit__(self, *args):
# Return to the previous network namespace
self.set_netns(self.current_netns_fd.fileno(), self.CLONE_NEWNET)
self.current_netns_fd.close()

+ 86
- 0
octavia/amphorae/backends/utils/network_utils.py View File

@@ -0,0 +1,86 @@
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress

import pyroute2
import six

from octavia.common import exceptions


def _find_interface(ip_address, rtnl_api, normalized_addr):
"""Find the interface using a routing netlink API.

:param ip_address: The IP address to search with.
:param rtnl_api: A pyroute2 rtnl_api instance. (IPRoute, NetNS, etc.)
:returns: The interface name if found, None if not found.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
"""
for addr in rtnl_api.get_addr(address=ip_address):
# Save the interface index as IPv6 records don't list a textual
# interface
interface_idx = addr['index']
# Search through the attributes of each address record
for attr in addr['attrs']:
# Look for the attribute name/value pair for the address
if attr[0] == 'IFA_ADDRESS':
# Compare the normalized address with the address we are
# looking for. Since we have matched the name above, attr[1]
# is the address value
if normalized_addr == ipaddress.ip_address(
six.text_type(attr[1])).compressed:
# Lookup the matching interface name by getting the
# interface with the index we found in the above address
# search
lookup_int = rtnl_api.get_links(interface_idx)
# Search through the attributes of the matching interface
# record
for int_attr in lookup_int[0]['attrs']:
# Look for the attribute name/value pair that includes
# the interface name
if int_attr[0] == 'IFLA_IFNAME':
# Return the matching interface name that is in
# int_attr[1] for the matching interface attribute
# name
return int_attr[1]
# We didn't find an interface with that IP address.
return None


def get_interface_name(ip_address, net_ns=None):
"""Gets the interface name from an IP address.

:param ip_address: The IP address to lookup.
:param net_ns: The network namespace to find the interface in.
:returns: The interface name.
:raises exceptions.InvalidIPAddress: Invalid IP address provided.
:raises octavia.common.exceptions.NotFound: No interface was found.
"""
# We need to normalize the address as IPv6 has multiple representations
# fe80:0000:0000:0000:f816:3eff:fef2:2058 == fe80::f816:3eff:fef2:2058
try:
normalized_addr = ipaddress.ip_address(
six.text_type(ip_address)).compressed
except ValueError:
raise exceptions.InvalidIPAddress(ip_addr=ip_address)

if net_ns:
with pyroute2.NetNS(net_ns) as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
else:
with pyroute2.IPRoute() as rtnl_api:
interface = _find_interface(ip_address, rtnl_api, normalized_addr)
if interface is not None:
return interface
raise exceptions.NotFound(resource='IP address', id=ip_address)

+ 30
- 11
octavia/amphorae/drivers/driver_base.py View File

@@ -199,6 +199,21 @@ class AmphoraLoadBalancerDriver(object):
:type agent_config: string
"""

@abc.abstractmethod
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name from an IP address.

:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
"""


@six.add_metaclass(abc.ABCMeta)
class HealthMixin(object):
@@ -252,10 +267,17 @@ class VRRPDriverMixin(object):
class XYZ: ...
"""
@abc.abstractmethod
def update_vrrp_conf(self, loadbalancer):
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphorae of the loadbalancer with a new VRRP configuration

:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""

@abc.abstractmethod
@@ -266,10 +288,14 @@ class VRRPDriverMixin(object):
"""

@abc.abstractmethod
def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on the amphora

:param loadbalancer: loadbalancer object
:param amphora: The amphora object to start the service on.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""

@abc.abstractmethod
@@ -278,10 +304,3 @@ class VRRPDriverMixin(object):

:param loadbalancer: loadbalancer object
"""

@abc.abstractmethod
def get_vrrp_interface(self, amphora):
"""Get the VRRP interface object for a specific amphora

:param amphora: amphora object
"""

+ 4
- 3
octavia/amphorae/drivers/haproxy/exceptions.py View File

@@ -20,7 +20,7 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)


def check_exception(response, ignore=tuple()):
def check_exception(response, ignore=tuple(), log_error=True):
status_code = response.status_code
responses = {
400: InvalidRequest,
@@ -34,8 +34,9 @@ def check_exception(response, ignore=tuple()):
}
if (status_code not in ignore) and (status_code in responses):
try:
LOG.error('Amphora agent returned unexpected result code %s with '
'response %s', status_code, response.json())
if log_error:
LOG.error('Amphora agent returned unexpected result code %s '
'with response %s', status_code, response.json())
except Exception:
# Handle the odd case where there is no response body
# like when using requests_mock which doesn't support has_body


+ 42
- 18
octavia/amphorae/drivers/haproxy/rest_api_driver.py View File

@@ -91,7 +91,7 @@ class HaproxyAmphoraLoadBalancerDriver(

return haproxy_version_string.split('.')[:2]

def _populate_amphora_api_version(self, amphora,
def _populate_amphora_api_version(self, amphora, timeout_dict=None,
raise_retry_exception=False):
"""Populate the amphora object with the api_version

@@ -103,7 +103,7 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora,
amphora, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
@@ -292,8 +292,11 @@ class HaproxyAmphoraLoadBalancerDriver(
getattr(self.clients[amp.api_version], func_name)(
amp, loadbalancer.id, *args)

def start(self, loadbalancer, amphora=None):
self._apply('start_listener', loadbalancer, amphora)
def reload(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('reload_listener', loadbalancer, amphora, timeout_dict)

def start(self, loadbalancer, amphora=None, timeout_dict=None):
self._apply('start_listener', loadbalancer, amphora, timeout_dict)

def delete(self, listener):
# Delete any UDP listeners the old way (we didn't update the way they
@@ -581,6 +584,28 @@ class HaproxyAmphoraLoadBalancerDriver(
'API.'.format(amphora.id))
raise driver_except.AmpDriverNotImplementedError()

def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
"""Get the interface name for an IP address.

:param amphora: The amphora to query.
:type amphora: octavia.db.models.Amphora
:param ip_address: The IP address to lookup. (IPv4 or IPv6)
:type ip_address: string
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
:type timeout_dict: dict
:returns: None if not found, the interface name string if found.
"""
try:
self._populate_amphora_api_version(amphora, timeout_dict)
response_json = self.clients[amphora.api_version].get_interface(
amphora, ip_address, timeout_dict, log_error=False)
return response_json.get('interface', None)
except (exc.NotFound, driver_except.TimeOutException):
return None


# Check a custom hostname
class CustomHostNameCheckingAdapter(requests.adapters.HTTPAdapter):
@@ -707,9 +732,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()

def get_api_version(self, amp, raise_retry_exception=False):
def get_api_version(self, amp, timeout_dict=None,
raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False,
r = self.get(amp, retry_404=False, timeout_dict=timeout_dict,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
@@ -810,16 +836,15 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)

def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)

def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()

def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(
@@ -940,16 +965,15 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
r = self.put(amp, 'vrrp/upload', data=config)
return exc.check_exception(r)

def _vrrp_action(self, action, amp):
r = self.put(amp, 'vrrp/{action}'.format(action=action))
def _vrrp_action(self, action, amp, timeout_dict=None):
r = self.put(amp, 'vrrp/{action}'.format(action=action),
timeout_dict=timeout_dict)
return exc.check_exception(r)

def get_interface(self, amp, ip_addr, timeout_dict=None):
def get_interface(self, amp, ip_addr, timeout_dict=None, log_error=True):
r = self.get(amp, 'interface/{ip_addr}'.format(ip_addr=ip_addr),
timeout_dict=timeout_dict)
if exc.check_exception(r):
return r.json()
return None
return exc.check_exception(r, log_error=log_error).json()

def upload_udp_config(self, amp, listener_id, config, timeout_dict=None):
r = self.put(


+ 36
- 31
octavia/amphorae/drivers/keepalived/vrrp_rest_driver.py View File

@@ -30,29 +30,35 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
# The Mixed class must define a self.client object for the
# AmphoraApiClient

def update_vrrp_conf(self, loadbalancer, amphorae_network_config):
"""Update amphorae of the loadbalancer with a new VRRP configuration
def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
"""Update amphora of the loadbalancer with a new VRRP configuration

:param loadbalancer: loadbalancer object
:param amphorae_network_config: amphorae network configurations
:param amphora: The amphora object to update.
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
templater = jinja_cfg.KeepalivedJinjaTemplater()
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('update_vrrp_conf called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return

LOG.debug("Update loadbalancer %s amphora VRRP configuration.",
loadbalancer.id)
templater = jinja_cfg.KeepalivedJinjaTemplater()

for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.debug("Update amphora %s VRRP configuration.", amphora.id)

self._populate_amphora_api_version(amp)
# Get the VIP subnet prefix for the amphora
vip_cidr = amphorae_network_config[amp.id].vip_subnet.cidr
self._populate_amphora_api_version(amphora)
# Get the VIP subnet prefix for the amphora
vip_cidr = amphorae_network_config[amphora.id].vip_subnet.cidr

# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amp, vip_cidr)
self.clients[amp.api_version].upload_vrrp_config(amp, config)
# Generate Keepalived configuration from loadbalancer object
config = templater.build_keepalived_config(
loadbalancer, amphora, vip_cidr)
self.clients[amphora.api_version].upload_vrrp_config(amphora, config)

def stop_vrrp_service(self, loadbalancer):
"""Stop the vrrp services running on the loadbalancer's amphorae
@@ -69,21 +75,25 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].stop_vrrp(amp)

def start_vrrp_service(self, loadbalancer):
"""Start the VRRP services of all amphorae of the loadbalancer
def start_vrrp_service(self, amphora, timeout_dict=None):
"""Start the VRRP services on an amphorae.

:param loadbalancer: loadbalancer object
:param amphora: amphora object
:param timeout_dict: Dictionary of timeout values for calls to the
amphora. May contain: req_conn_timeout,
req_read_timeout, conn_max_retries,
conn_retry_interval
"""
LOG.info("Start loadbalancer %s amphora VRRP Service.",
loadbalancer.id)
if amphora.status != constants.AMPHORA_ALLOCATED:
LOG.debug('start_vrrp_service called for un-allocated amphora %s. '
'Ignoring.', amphora.id)
return

for amp in six.moves.filter(
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
LOG.info("Start amphora %s VRRP Service.", amphora.id)

LOG.debug("Start VRRP Service on amphora %s .", amp.lb_network_ip)
self._populate_amphora_api_version(amp)
self.clients[amp.api_version].start_vrrp(amp)
self._populate_amphora_api_version(amphora)
self.clients[amphora.api_version].start_vrrp(amphora,
timeout_dict=timeout_dict)

def reload_vrrp_service(self, loadbalancer):
"""Reload the VRRP services of all amphorae of the loadbalancer
@@ -99,8 +109,3 @@ class KeepalivedAmphoraDriverMixin(driver_base.VRRPDriverMixin):

self._populate_amphora_api_version(amp)
self.clients[amp.api_version].reload_vrrp(amp)

def get_vrrp_interface(self, amphora, timeout_dict=None):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_interface(
amphora, amphora.vrrp_ip, timeout_dict=timeout_dict)['interface']

+ 14
- 5
octavia/amphorae/drivers/noop_driver/driver.py View File

@@ -113,6 +113,13 @@ class NoopManager(object):
self.amphoraconfig[amphora.id, agent_config] = (
amphora.id, agent_config, 'update_amphora_agent_config')

def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
LOG.debug("Amphora %s no-op, get interface from amphora %s for IP %s",
self.__class__.__name__, amphora.id, ip_address)
if ip_address == '198.51.100.99':
return "noop0"
return None


class NoopAmphoraLoadBalancerDriver(
driver_base.AmphoraLoadBalancerDriver,
@@ -167,17 +174,19 @@ class NoopAmphoraLoadBalancerDriver(
def update_amphora_agent_config(self, amphora, agent_config):
self.driver.update_amphora_agent_config(amphora, agent_config)

def update_vrrp_conf(self, loadbalancer):
def get_interface_from_ip(self, amphora, ip_address, timeout_dict=None):
return self.driver.get_interface_from_ip(amphora, ip_address,
timeout_dict)

def update_vrrp_conf(self, loadbalancer, amphorae_network_config, amphora,
timeout_dict=None):
pass

def stop_vrrp_service(self, loadbalancer):
pass

def start_vrrp_service(self, loadbalancer):
def start_vrrp_service(self, amphora, timeout_dict=None):
pass

def reload_vrrp_service(self, loadbalancer):
pass

def get_vrrp_interface(self, amphora):
pass

+ 5
- 2
octavia/api/drivers/amphora_driver/v1/driver.py View File

@@ -71,8 +71,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
try:
vip = network_driver.allocate_vip(lb_obj)
except network_base.AllocateVIPException as e:
raise exceptions.DriverError(user_fault_string=e.orig_msg,
operator_fault_string=e.orig_msg)
message = str(e)
if getattr(e, 'orig_msg', None) is not None:
message = e.orig_msg
raise exceptions.DriverError(user_fault_string=message,
operator_fault_string=message)

LOG.info('Amphora provider created VIP port %s for load balancer %s.',
vip.port_id, loadbalancer_id)


+ 6
- 0
octavia/api/drivers/utils.py View File

@@ -25,6 +25,7 @@ from oslo_utils import excutils
from stevedore import driver as stevedore_driver

from octavia.api.drivers import exceptions as driver_exceptions
from octavia.common import constants
from octavia.common import data_models
from octavia.common import exceptions
from octavia.common.tls_utils import cert_parser
@@ -542,6 +543,9 @@ def vip_dict_to_provider_dict(vip_dict):
new_vip_dict['vip_subnet_id'] = vip_dict['subnet_id']
if 'qos_policy_id' in vip_dict:
new_vip_dict['vip_qos_policy_id'] = vip_dict['qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dict:
new_vip_dict[constants.OCTAVIA_OWNED] = vip_dict[
constants.OCTAVIA_OWNED]
return new_vip_dict


@@ -557,4 +561,6 @@ def provider_vip_dict_to_vip_obj(vip_dictionary):
vip_obj.subnet_id = vip_dictionary['vip_subnet_id']
if 'vip_qos_policy_id' in vip_dictionary:
vip_obj.qos_policy_id = vip_dictionary['vip_qos_policy_id']
if constants.OCTAVIA_OWNED in vip_dictionary:
vip_obj.octavia_owned = vip_dictionary[constants.OCTAVIA_OWNED]
return vip_obj

+ 40
- 17
octavia/api/v2/controllers/load_balancer.py View File

@@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress

from oslo_config import cfg
from oslo_db import exception as odb_exceptions
@@ -19,6 +20,7 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import strutils
import pecan
import six
from sqlalchemy.orm import exc as sa_exception
from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
@@ -186,26 +188,40 @@ class LoadBalancersController(base.BaseController):
isinstance(load_balancer.vip_qos_policy_id, wtypes.UnsetType)):
load_balancer.vip_qos_policy_id = port_qos_policy_id

# Identify the subnet for this port
if load_balancer.vip_subnet_id:
# If we were provided a subnet_id, validate it exists and that
# there is a fixed_ip on the port that matches the provided subnet
validate.subnet_exists(subnet_id=load_balancer.vip_subnet_id,
context=context)
else:
if load_balancer.vip_address:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.ip_address == load_balancer.vip_address:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
else:
for port_fixed_ip in port.fixed_ips:
if port_fixed_ip.subnet_id == load_balancer.vip_subnet_id:
load_balancer.vip_address = port_fixed_ip.ip_address
break # Just pick the first address found in the subnet
if not load_balancer.vip_address:
raise exceptions.ValidationException(detail=_(
"No VIP address found on the specified VIP port within "
"the specified subnet."))
elif load_balancer.vip_address:
normalized_lb_ip = ipaddress.ip_address(
six.text_type(load_balancer.vip_address)).compressed
for port_fixed_ip in port.fixed_ips:
normalized_port_ip = ipaddress.ip_address(
six.text_type(port_fixed_ip.ip_address)).compressed
if normalized_port_ip == normalized_lb_ip:
load_balancer.vip_subnet_id = port_fixed_ip.subnet_id
break
if not load_balancer.vip_subnet_id:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))
"Specified VIP address not found on the "
"specified VIP port."))
elif len(port.fixed_ips) == 1:
# User provided only a port, get the subnet and address from it
load_balancer.vip_subnet_id = port.fixed_ips[0].subnet_id
load_balancer.vip_address = port.fixed_ips[0].ip_address
else:
raise exceptions.ValidationException(detail=_(
"VIP port's subnet could not be determined. Please "
"specify either a VIP subnet or address."))

def _validate_vip_request_object(self, load_balancer, context=None):
allowed_network_objects = []
@@ -398,7 +414,10 @@ class LoadBalancersController(base.BaseController):
# flavor dict instead of just the flavor_id we store in the DB.
lb_dict['flavor'] = flavor_dict

# See if the provider driver wants to create the VIP port
# See if the provider driver wants to manage the VIP port
# This will still be called if the user provided a port to
# allow drivers to collect any required information about the
# VIP port.
octavia_owned = False
try:
provider_vip_dict = driver_utils.vip_dict_to_provider_dict(
@@ -418,6 +437,10 @@ class LoadBalancersController(base.BaseController):
if 'port_id' not in vip_dict or not vip_dict['port_id']:
octavia_owned = True

# Check if the driver claims octavia owns the VIP port.
if vip.octavia_owned:
octavia_owned = True

self.repositories.vip.update(
lock_session, db_lb.id, ip_address=vip.ip_address,
port_id=vip.port_id, network_id=vip.network_id,


+ 43
- 1
octavia/common/config.py View File

@@ -170,6 +170,20 @@ amphora_agent_opts = [
help='The UDP API backend for amphora agent.'),
]

compute_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
'compute service.')),
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'compute service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
]

networking_opts = [
cfg.IntOpt('max_retries', default=15,
help=_('The maximum attempts to retry an action with the '
@@ -177,6 +191,11 @@ networking_opts = [
cfg.IntOpt('retry_interval', default=1,
help=_('Seconds to wait before retrying an action with the '
'networking service.')),
cfg.IntOpt('retry_backoff', default=1,
help=_('The seconds to backoff retry attempts.')),
cfg.IntOpt('retry_max', default=10,
help=_('The maximum interval in seconds between retry '
'attempts.')),
cfg.IntOpt('port_detach_timeout', default=300,
help=_('Seconds to wait for a port to detach from an '
'amphora.')),
@@ -289,6 +308,14 @@ haproxy_amphora_opts = [
default=2,
help=_('Retry timeout between connection attempts in '
'seconds for active amphora.')),
cfg.IntOpt('failover_connection_max_retries',
default=2,
help=_('Retry threshold for connecting to an amphora in '
'failover.')),
cfg.IntOpt('failover_connection_retry_interval',
default=5,
help=_('Retry timeout between connection attempts in '
'seconds for amphora in failover.')),
cfg.IntOpt('build_rate_limit',
default=-1,
help=_('Number of amphorae that could be built per controller '
@@ -352,6 +379,16 @@ haproxy_amphora_opts = [
deprecated_reason='This is now automatically discovered '
' and configured.',
help=_("If False, use sysvinit.")),
cfg.IntOpt('api_db_commit_retry_attempts', default=15,
help=_('The number of times the database action will be '
'attempted.')),
cfg.IntOpt('api_db_commit_retry_initial_delay', default=1,
help=_('The initial delay before a retry attempt.')),
cfg.IntOpt('api_db_commit_retry_backoff', default=1,
help=_('The time to backoff retry attempts.')),
cfg.IntOpt('api_db_commit_retry_max', default=5,
help=_('The maximum amount of time to wait between retry '
'attempts.')),
]

controller_worker_opts = [
@@ -434,7 +471,11 @@ controller_worker_opts = [
help=_('If True, build cloud-init user-data that is passed '
'to the config drive on Amphora boot instead of '
'personality files. If False, utilize personality '
'files.'))
'files.')),
cfg.IntOpt('amphora_delete_retries', default=5,
help=_('Number of times an amphora delete should be retried.')),
cfg.IntOpt('amphora_delete_retry_interval', default=5,
help=_('Time, in seconds, between amphora delete retries.')),
]

task_flow_opts = [
@@ -723,6 +764,7 @@ driver_agent_opts = [
cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(api_opts, group='api_settings')
cfg.CONF.register_opts(amphora_agent_opts, group='amphora_agent')
cfg.CONF.register_opts(compute_opts, group='compute')
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_opts(haproxy_amphora_opts, group='haproxy_amphora')


+ 58
- 2
octavia/common/constants.py View File

@@ -294,7 +294,10 @@ SUPPORTED_TASKFLOW_ENGINE_TYPES = ['serial', 'parallel']
# Task/Flow constants
ACTIVE_CONNECTIONS = 'active_connections'
ADDED_PORTS = 'added_ports'
ADMIN_STATE_UP = 'admin_state_up'
ALLOWED_ADDRESS_PAIRS = 'allowed_address_pairs'
AMP_DATA = 'amp_data'
AMP_VRRP_INT = 'amp_vrrp_int'
AMPHORA = 'amphora'
AMPHORA_ID = 'amphora_id'
AMPHORA_INDEX = 'amphora_index'
@@ -303,9 +306,12 @@ AMPHORAE = 'amphorae'
AMPHORAE_NETWORK_CONFIG = 'amphorae_network_config'
AMPS_DATA = 'amps_data'
ANTI_AFFINITY = 'anti-affinity'
ATTEMPT_NUMBER = 'attempt_number'
BASE_PORT = 'base_port'
BYTES_IN = 'bytes_in'
BYTES_OUT = 'bytes_out'
CA_TLS_CERTIFICATE_ID = 'ca_tls_certificate_id'
CIDR = 'cidr'
CLIENT_CA_TLS_CERTIFICATE_ID = 'client_ca_tls_certificate_id'
CLIENT_CRL_CONTAINER_ID = 'client_crl_container_id'
COMPUTE_ID = 'compute_id'
@@ -317,17 +323,22 @@ CRL_CONTAINER_ID = 'crl_container_id'
DELTA = 'delta'
DELTAS = 'deltas'
DESCRIPTION = 'description'
DEVICE_OWNER = 'device_owner'
ENABLED = 'enabled'
FAILED_AMP_VRRP_PORT_ID = 'failed_amp_vrrp_port_id'
FAILED_AMPHORA = 'failed_amphora'
FAILOVER_AMPHORA = 'failover_amphora'
FAILOVER_AMPHORA_ID = 'failover_amphora_id'
FIELDS = 'fields'
FIXED_IPS = 'fixed_ips'
FLAVOR_ID = 'flavor_id'
HEALTH_MON = 'health_mon'
HEALTH_MONITOR = 'health_monitor'
HEALTH_MONITOR_ID = 'health_monitor_id'
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
ID = 'id'
IP_ADDRESS = 'ip_address'
IPV6_ICMP = 'ipv6-icmp'
L7POLICY = 'l7policy'
L7POLICY_ID = 'l7policy_id'
L7POLICY_UPDATES = 'l7policy_updates'
@@ -345,17 +356,21 @@ MEMBER = 'member'
MEMBER_ID = 'member_id'
MEMBER_PORTS = 'member_ports'
MEMBER_UPDATES = 'member_updates'
MESSAGE = 'message'
NAME = 'name'
NETWORK_ID = 'network_id'
NICS = 'nics'
OBJECT = 'object'
PASSIVE_FAILURE = 'passive_failure'
PEER_PORT = 'peer_port'
POOL = 'pool'
POOL_CHILD_COUNT = 'pool_child_count'
POOL_ID = 'pool_id'
POOL_UPDATES = 'pool_updates'
PORT = 'port'
PORT_ID = 'port_id'
PORTS = 'ports'
PROJECT_ID = 'project_id'
PROVIDER = 'provider'
PROVIDER_NAME = 'provider_name'
QOS_POLICY_ID = 'qos_policy_id'
@@ -363,12 +378,18 @@ REDIRECT_POOL = 'redirect_pool'
REQ_CONN_TIMEOUT = 'req_conn_timeout'
REQ_READ_TIMEOUT = 'req_read_timeout'
REQUEST_ERRORS = 'request_errors'
SECURITY_GROUPS = 'security_groups'
SECURITY_GROUP_RULES = 'security_group_rules'
SERVER_GROUP_ID = 'server_group_id'
SERVER_PEM = 'server_pem'
SNI_CONTAINERS = 'sni_containers'
SOFT_ANTI_AFFINITY = 'soft-anti-affinity'
STATUS_CODE = 'status_code'
SUBNET = 'subnet'
SUBNET_ID = 'subnet_id'
SUBNET = 'subnet'
TAGS = 'tags'
TENANT_ID = 'tenant_id'
TIMEOUT_DICT = 'timeout_dict'
TLS_CERTIFICATE_ID = 'tls_certificate_id'
TLS_CONTAINER_ID = 'tls_container_id'
@@ -378,7 +399,10 @@ UPDATED_AT = 'updated_at'
UPDATE_DICT = 'update_dict'
VIP = 'vip'
VIP_NETWORK = 'vip_network'
VIP_SG_ID = 'vip_sg_id'
VIP_SUBNET = 'vip_subnet'
VRRP_GROUP = 'vrrp_group'
VRRP_PORT = 'vrrp_port'

# Taskflow flow and task names
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
@@ -398,6 +422,7 @@ CREATE_POOL_FLOW = 'octavia-create-pool-flow'
CREATE_L7POLICY_FLOW = 'octavia-create-l7policy-flow'
CREATE_L7RULE_FLOW = 'octavia-create-l7rule-flow'
DELETE_AMPHORA_FLOW = 'octavia-delete-amphora-flow'
DELETE_EXTRA_AMPHORAE_FLOW = 'octavia-delete-extra-amphorae-flow'
DELETE_HEALTH_MONITOR_FLOW = 'octavia-delete-health-monitor-flow'
DELETE_LISTENER_FLOW = 'octavia-delete-listener_flow'
DELETE_LOADBALANCER_FLOW = 'octavia-delete-loadbalancer-flow'
@@ -406,6 +431,7 @@ DELETE_POOL_FLOW = 'octavia-delete-pool-flow'
DELETE_L7POLICY_FLOW = 'octavia-delete-l7policy-flow'
DELETE_L7RULE_FLOW = 'octavia-delete-l7policy-flow'
FAILOVER_AMPHORA_FLOW = 'octavia-failover-amphora-flow'
FAILOVER_LOADBALANCER_FLOW = 'octavia-failover-loadbalancer-flow'
LOADBALANCER_NETWORKING_SUBFLOW = 'octavia-new-loadbalancer-net-subflow'
UPDATE_HEALTH_MONITOR_FLOW = 'octavia-update-health-monitor-flow'
UPDATE_LISTENER_FLOW = 'octavia-update-listener-flow'
@@ -419,10 +445,13 @@ UPDATE_AMPHORA_CONFIG_FLOW = 'octavia-update-amp-config-flow'

POST_MAP_AMP_TO_LB_SUBFLOW = 'octavia-post-map-amp-to-lb-subflow'
CREATE_AMP_FOR_LB_SUBFLOW = 'octavia-create-amp-for-lb-subflow'
CREATE_AMP_FOR_FAILOVER_SUBFLOW = 'octavia-create-amp-for-failover-subflow'
AMP_PLUG_NET_SUBFLOW = 'octavia-plug-net-subflow'
GET_AMPHORA_FOR_LB_SUBFLOW = 'octavia-get-amphora-for-lb-subflow'
POST_LB_AMP_ASSOCIATION_SUBFLOW = (
'octavia-post-loadbalancer-amp_association-subflow')
AMPHORA_LISTENER_START_SUBFLOW = 'amphora-listener-start-subflow'
AMPHORA_LISTENER_RELOAD_SUBFLOW = 'amphora-listener-start-subflow'

MAP_LOADBALANCER_TO_AMPHORA = 'octavia-mapload-balancer-to-amphora'
RELOAD_AMPHORA = 'octavia-reload-amphora'
@@ -438,7 +467,7 @@ COMPUTE_WAIT = 'octavia-compute-wait'
UPDATE_AMPHORA_INFO = 'octavia-update-amphora-info'
AMPHORA_FINALIZE = 'octavia-amphora-finalize'
MARK_AMPHORA_ALLOCATED_INDB = 'octavia-mark-amphora-allocated-indb'
RELOADLOAD_BALANCER = 'octavia-reloadload-balancer'
MARK_AMPHORA_READY_INDB = 'octavia-mark-amphora-ready-indb'
MARK_LB_ACTIVE_INDB = 'octavia-mark-lb-active-indb'
MARK_AMP_MASTER_INDB = 'octavia-mark-amp-master-indb'
MARK_AMP_BACKUP_INDB = 'octavia-mark-amp-backup-indb'
@@ -452,6 +481,7 @@ CREATE_VRRP_GROUP_FOR_LB = 'octavia-create-vrrp-group-for-lb'
CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait'
AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update'
AMP_LISTENER_START = 'octavia-amp-listeners-start'
PLUG_VIP_AMPHORA = 'octavia-amp-plug-vip'
APPLY_QOS_AMP = 'octavia-amp-apply-qos'
UPDATE_AMPHORA_VIP_DATA = 'ocatvia-amp-update-vip-data'
@@ -459,6 +489,8 @@ GET_AMP_NETWORK_CONFIG = 'octavia-amp-get-network-config'
AMP_POST_VIP_PLUG = 'octavia-amp-post-vip-plug'
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
AMPHORA_CONFIG_UPDATE_TASK = 'AmphoraConfigUpdateTask'
FIRST_AMP_NETWORK_CONFIGS = 'first-amp-network-configs'
FIRST_AMP_VRRP_INTERFACE = 'first-amp-vrrp_interface'

# Batch Member Update constants
UNORDERED_MEMBER_UPDATES_FLOW = 'octavia-unordered-member-updates-flow'
@@ -473,11 +505,30 @@ UPDATE_MEMBER_INDB = 'octavia-update-member-indb'
DELETE_MEMBER_INDB = 'octavia-delete-member-indb'

# Task Names
ADMIN_DOWN_PORT = 'admin-down-port'
AMPHORA_POST_VIP_PLUG = 'amphora-post-vip-plug'
AMPHORA_RELOAD_LISTENER = 'amphora-reload-listener'
AMPHORA_TO_ERROR_ON_REVERT = 'amphora-to-error-on-revert'
AMPHORAE_POST_NETWORK_PLUG = 'amphorae-post-network-plug'
ATTACH_PORT = 'attach-port'
CALCULATE_AMPHORA_DELTA = 'calculate-amphora-delta'
CREATE_VIP_BASE_PORT = 'create-vip-base-port'
DELETE_AMPHORA = 'delete-amphora'
DELETE_PORT = 'delete-port'
DISABLE_AMP_HEALTH_MONITORING = 'disable-amphora-health-monitoring'
GET_AMPHORA_NETWORK_CONFIGS_BY_ID = 'get-amphora-network-configs-by-id'
GET_AMPHORAE_FROM_LB = 'get-amphorae-from-lb'
HANDLE_NETWORK_DELTA = 'handle-network-delta'
MARK_AMPHORA_DELETED = 'mark-amphora-deleted'
MARK_AMPHORA_PENDING_DELETE = 'mark-amphora-pending-delete'
MARK_AMPHORA_HEALTH_BUSY = 'mark-amphora-health-busy'
RELOAD_AMP_AFTER_PLUG_VIP = 'reload-amp-after-plug-vip'
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
RELOAD_LB_AFTER_PLUG_VIP = 'reload-lb-after-plug-vip'
RELOAD_LB_BEFOR_ALLOCATE_VIP = "reload-lb-before-allocate-vip"
RELOAD_LB_BEFOR_ALLOCATE_VIP = 'reload-lb-before-allocate-vip'
UPDATE_AMP_FAILOVER_DETAILS = 'update-amp-failover-details'


NOVA_1 = '1.1'
NOVA_21 = '2.1'
@@ -725,3 +776,8 @@ SUPPORTED_VOLUME_DRIVERS = [VOLUME_NOOP_DRIVER,
CINDER_STATUS_AVAILABLE = 'available'
CINDER_STATUS_ERROR = 'error'
CINDER_ACTION_CREATE_VOLUME = 'create volume'

VIP_SECURITY_GROUP_PREFIX = 'lb-'

AMP_BASE_PORT_PREFIX = 'octavia-lb-vrrp-'
OCTAVIA_OWNED = 'octavia_owned'

+ 19
- 1
octavia/common/exceptions.py View File

@@ -204,7 +204,8 @@ class ComputeBuildQueueTimeoutException(OctaviaException):


class ComputeDeleteException(OctaviaException):
message = _('Failed to delete compute instance.')
message = _('Failed to delete compute instance. The compute service '
'reports: %(compute_msg)s')


class ComputeGetException(OctaviaException):
@@ -245,6 +246,14 @@ class ComputeWaitTimeoutException(OctaviaException):
message = _('Waiting for compute id %(id)s to go active timeout.')


class ComputePortInUseException(OctaviaException):
message = _('Compute driver reports port %(port)s is already in use.')


class ComputeUnknownException(OctaviaException):
message = _('Unknown exception from the compute driver: %(exc)s.')


class InvalidTopology(OctaviaException):
message = _('Invalid topology specified: %(topology)s')

@@ -398,3 +407,12 @@ class VolumeDeleteException(OctaviaException):

class VolumeGetException(OctaviaException):
message = _('Failed to retrieve volume instance.')


class NetworkServiceError(OctaviaException):
message = _('The networking service had a failure: %(net_error)s')


class InvalidIPAddress(APIException):
msg = _('The IP Address %(ip_addr)s is invalid.')
code = 400

+ 23
- 0
octavia/common/utils.py View File

@@ -30,6 +30,8 @@ from oslo_utils import excutils
import six
from stevedore import driver as stevedore_driver

from octavia.common import constants

CONF = cfg.CONF

LOG = logging.getLogger(__name__)
@@ -51,6 +53,15 @@ def base64_sha1_string(string_to_hash):
return re.sub(r"^-", "x", b64_sha1)


def get_amphora_driver():
amphora_driver = stevedore_driver.DriverManager(
namespace='octavia.amphora.drivers',
name=CONF.controller_worker.amphora_driver,
invoke_on_load=True
).driver
return amphora_driver


def get_network_driver():
CONF.import_group('controller_worker', 'octavia.common.config')
network_driver = stevedore_driver.DriverManager(
@@ -61,6 +72,12 @@ def get_network_driver():
return network_driver


def is_ipv4(ip_address):
"""Check if ip address is IPv4 address."""
ip = netaddr.IPAddress(ip_address)
return ip.version == 4


def is_ipv6(ip_address):
"""Check if ip address is IPv6 address."""
ip = netaddr.IPAddress(ip_address)
@@ -100,6 +117,12 @@ def ip_netmask_to_cidr(ip, netmask):
return "{ip}/{netmask}".format(ip=net.network, netmask=net.prefixlen)


def get_vip_security_group_name(loadbalancer_id):
if loadbalancer_id:
return constants.VIP_SECURITY_GROUP_PREFIX + loadbalancer_id
return None


def get_six_compatible_value(value, six_type=six.string_types):
if six.PY3 and isinstance(value, six_type):
value = value.encode('utf-8')


+ 4
- 4
octavia/compute/drivers/noop_driver/driver.py View File

@@ -82,8 +82,8 @@ class NoopManager(object):
self.__class__.__name__, server_group_id)
self.computeconfig[server_group_id] = (server_group_id, 'delete')

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
LOG.debug("Compute %s no-op, attach_network_or_port compute_id %s,"
"network_id %s, ip_address %s, port_id %s",
self.__class__.__name__, compute_id,
@@ -145,8 +145,8 @@ class NoopComputeDriver(driver_base.ComputeBase):
def delete_server_group(self, server_group_id):
self.driver.delete_server_group(server_group_id)

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
self.driver.attach_network_or_port(compute_id, network_id, ip_address,
port_id)



+ 33
- 7
octavia/compute/drivers/nova_driver.py View File

@@ -196,9 +196,9 @@ class VirtualMachineManager(compute_base.ComputeBase):
except nova_exceptions.NotFound:
LOG.warning("Nova instance with id: %s not found. "
"Assuming already deleted.", compute_id)
except Exception:
except Exception as e:
LOG.exception("Error deleting nova virtual machine.")
raise exceptions.ComputeDeleteException()
raise exceptions.ComputeDeleteException(compute_msg=str(e))

def status(self, compute_id):
'''Retrieve the status of a virtual machine.
@@ -334,8 +334,8 @@ class VirtualMachineManager(compute_base.ComputeBase):
LOG.exception("Error delete server group instance.")
raise exceptions.ServerGroupObjectDeleteException()

def attach_network_or_port(self, compute_id, network_id, ip_address=None,
port_id=None):
def attach_network_or_port(self, compute_id, network_id=None,
ip_address=None, port_id=None):
"""Attaching a port or a network to an existing amphora

:param compute_id: id of an amphora in the compute service
@@ -343,13 +343,39 @@ class VirtualMachineManager(compute_base.ComputeBase):
:param ip_address: ip address to attempt to be assigned to interface
:param port_id: id of the neutron port
:return: nova interface instance
:raises: Exception
:raises ComputePortInUseException: The port is in use somewhere else
:raises ComputeUnknownException: Unknown nova error
"""
try:
interface = self.manager.interface_attach(
server=compute_id, net_id=network_id, fixed_ip=ip_address,
port_id=port_id)
except Exception:
except nova_exceptions.Conflict as e:
# The port is already in use.
if port_id:
# Check if the port we want is already attached
try:
interfaces = self.manager.interface_list(compute_id)
for interface in interfaces:
if interface.id == port_id:
return interface
except Exception as e:
raise exceptions.ComputeUnknownException(exc=str(e))

raise exceptions.ComputePortInUseException(port=port_id)

# Nova should have created the port, so something is really
# wrong in nova if we get here.
raise exceptions.ComputeUnknownException(exc=str(e))
except nova_exceptions.NotFound as e:
if 'Instance' in str(e):
raise exceptions.NotFound(resource='Instance', id=compute_id)
if 'Network' in str(e):
raise exceptions.NotFound(resource='Network', id=network_id)
if 'Port' in str(e):
raise exceptions.NotFound(resource='Port', id=port_id)
raise exceptions.NotFound(resource=str(e), id=compute_id)
except Exception as e:
LOG.error('Error attaching network %(network_id)s with ip '
'%(ip_address)s and port %(port)s to amphora '
'(compute_id: %(compute_id)s) ',
@@ -359,7 +385,7 @@ class VirtualMachineManager(compute_base.ComputeBase):
'ip_address': ip_address,
'port': port_id
})
raise
raise exceptions.ComputeUnknownException(exc=str(e))
return interface

def detach_port(self, compute_id, port_id):


+ 262
- 165
octavia/controller/worker/v1/controller_worker.py View File

@@ -23,6 +23,8 @@ import tenacity

from octavia.common import base_taskflow
from octavia.common import constants
from octavia.common import exceptions
from octavia.common import utils
from octavia.controller.worker.v1.flows import amphora_flows
from octavia.controller.worker.v1.flows import health_monitor_flows
from octavia.controller.worker.v1.flows import l7policy_flows
@@ -37,11 +39,6 @@ from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

RETRY_ATTEMPTS = 15
RETRY_INITIAL_DELAY = 1
RETRY_BACKOFF = 1
RETRY_MAX = 5


def _is_provisioning_status_pending_update(lb_obj):
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
@@ -78,8 +75,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
tenacity.retry_if_exception_type()),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def _get_db_obj_until_pending_update(self, repo, id):

return repo.get(db_apis.get_session(), id=id)
@@ -92,12 +92,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:returns: amphora_id
"""
try:
store = {constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None}
create_amp_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None}
)
store=store)
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
create_amp_tf.run()

@@ -105,27 +106,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
except Exception as e:
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))

def delete_amphora(self, amphora_id):
"""Deletes an existing Amphora.

:param amphora_id: ID of the amphora to delete
:returns: None
:raises AmphoraNotFound: The referenced Amphora was not found
"""
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
delete_amp_tf = self._taskflow_load(self._amphora_flows.
get_delete_amphora_flow(),
store={constants.AMPHORA: amphora})
with tf_logging.DynamicLoggingListener(delete_amp_tf,
log=LOG):
delete_amp_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_health_monitor(self, health_monitor_id):
"""Creates a health monitor.

@@ -218,8 +206,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_listener(self, listener_id):
"""Creates a listener.

@@ -304,8 +295,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_load_balancer(self, load_balancer_id, flavor=None):
"""Creates a load balancer by allocating Amphorae.

@@ -330,6 +324,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.LB_CREATE_NORMAL_PRIORITY,
constants.FLAVOR: flavor}

if not CONF.nova.enable_anti_affinity:
store[constants.SERVER_GROUP_ID] = None

topology = lb.topology

store[constants.UPDATE_DICT] = {
@@ -403,8 +400,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_member(self, member_id):
"""Creates a pool member.

@@ -460,8 +460,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def batch_update_members(self, old_member_ids, new_member_ids,
updated_members):
new_members = [self._member_repo.get(db_apis.get_session(), id=mid)
@@ -538,8 +541,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_pool(self, pool_id):
"""Creates a node pool.

@@ -628,8 +634,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_l7policy(self, l7policy_id):
"""Creates an L7 Policy.