Update amphora v2 for the failover refactor

This patch updates the amphora v2 code for the failover refactor [1].

[1] https://review.opendev.org/705317

Change-Id: I43803d0b750e8ca4722ababe296f2725148da405
changes/92/738292/14
Michael Johnson authored 1 year ago, committed by Ann Taraday
commit d5fe092a3e
32 changed files (lines changed, path):

   1  .gitignore
   2  doc/source/conf.py
  14  doc/source/contributor/devref/flows.rst
   7  octavia/api/drivers/amphora_driver/v2/driver.py
   1  octavia/common/constants.py
  13  octavia/controller/worker/v1/tasks/amphora_driver_tasks.py
 428  octavia/controller/worker/v2/controller_worker.py
 786  octavia/controller/worker/v2/flows/amphora_flows.py
  47  octavia/controller/worker/v2/flows/flow_utils.py
  14  octavia/controller/worker/v2/flows/listener_flows.py
 503  octavia/controller/worker/v2/flows/load_balancer_flows.py
   9  octavia/controller/worker/v2/flows/pool_flows.py
 264  octavia/controller/worker/v2/tasks/amphora_driver_tasks.py
  92  octavia/controller/worker/v2/tasks/compute_tasks.py
 124  octavia/controller/worker/v2/tasks/database_tasks.py
 254  octavia/controller/worker/v2/tasks/network_tasks.py
  73  octavia/controller/worker/v2/tasks/retry_tasks.py
  22  octavia/tests/unit/controller/worker/v1/flows/test_amphora_flows.py
  21  octavia/tests/unit/controller/worker/v1/tasks/test_amphora_driver_tasks.py
  48  octavia/tests/unit/controller/worker/v1/test_controller_worker.py
 167  octavia/tests/unit/controller/worker/v2/flows/test_amphora_flows.py
   8  octavia/tests/unit/controller/worker/v2/flows/test_listener_flows.py
 274  octavia/tests/unit/controller/worker/v2/flows/test_load_balancer_flows.py
   3  octavia/tests/unit/controller/worker/v2/flows/test_pool_flows.py
 288  octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py
  67  octavia/tests/unit/controller/worker/v2/tasks/test_compute_tasks.py
  37  octavia/tests/unit/controller/worker/v2/tasks/test_database_tasks.py
 436  octavia/tests/unit/controller/worker/v2/tasks/test_network_tasks.py
  47  octavia/tests/unit/controller/worker/v2/tasks/test_retry_tasks.py
1011  octavia/tests/unit/controller/worker/v2/test_controller_worker.py
  31  tools/create_flow_docs.py
  32  tools/flow-list-v2.txt

1  .gitignore

@@ -10,6 +10,7 @@ doc/build
doc/source/configuration/_static/octavia.policy.yaml.sample
doc/source/contributor/devref/erd.svg
doc/source/contributor/devref/flow_diagrams/
doc/source/contributor/devref/flow_diagrams_v2/
doc/source/contributor/modules
api-ref/build
.idea/*

2  doc/source/conf.py

@@ -29,6 +29,8 @@ from tools import create_flow_docs
# Generate our flow diagrams
create_flow_docs.generate(
'tools/flow-list.txt', 'doc/source/contributor/devref/flow_diagrams')
create_flow_docs.generate(
'tools/flow-list-v2.txt', 'doc/source/contributor/devref/flow_diagrams_v2')
# Generate entity relationship diagram
desc = sadisplay.describe(

14  doc/source/contributor/devref/flows.rst

@@ -19,3 +19,17 @@ Octavia controller.
flow_diagrams/LoadBalancerFlows.rst
flow_diagrams/MemberFlows.rst
flow_diagrams/PoolFlows.rst
The following are flow diagrams for the **amphora V2** driver.
.. toctree::
:maxdepth: 1
flow_diagrams_v2/AmphoraFlows.rst
flow_diagrams_v2/HealthMonitorFlows.rst
flow_diagrams_v2/L7PolicyFlows.rst
flow_diagrams_v2/L7RuleFlows.rst
flow_diagrams_v2/ListenerFlows.rst
flow_diagrams_v2/LoadBalancerFlows.rst
flow_diagrams_v2/MemberFlows.rst
flow_diagrams_v2/PoolFlows.rst

7  octavia/api/drivers/amphora_driver/v2/driver.py

@@ -87,8 +87,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
try:
vip = network_driver.allocate_vip(lb_obj)
except network_base.AllocateVIPException as e:
raise exceptions.DriverError(user_fault_string=e.orig_msg,
operator_fault_string=e.orig_msg)
message = str(e)
if getattr(e, 'orig_msg', None) is not None:
message = e.orig_msg
raise exceptions.DriverError(user_fault_string=message,
operator_fault_string=message)
LOG.info('Amphora provider created VIP port %s for load balancer %s.',
vip.port_id, loadbalancer_id)
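
Note: the hunk above replaces a direct e.orig_msg access with a guarded
fallback to str(e). A minimal, self-contained sketch of that pattern,
using a hypothetical FakeAllocateVIPException in place of
network_base.AllocateVIPException:

    class FakeAllocateVIPException(Exception):
        """Hypothetical stand-in for network_base.AllocateVIPException."""
        def __init__(self, msg, orig_msg=None):
            super().__init__(msg)
            self.orig_msg = orig_msg

    def fault_message(exc):
        # Prefer the driver-supplied original message when present,
        # otherwise fall back to the generic exception text.
        message = str(exc)
        if getattr(exc, 'orig_msg', None) is not None:
            message = exc.orig_msg
        return message

    assert fault_message(FakeAllocateVIPException('boom')) == 'boom'
    assert fault_message(
        FakeAllocateVIPException('boom', orig_msg='quota hit')) == 'quota hit'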

1  octavia/common/constants.py

@@ -300,6 +300,7 @@ ALLOWED_ADDRESS_PAIRS = 'allowed_address_pairs'
AMP_DATA = 'amp_data'
AMP_VRRP_INT = 'amp_vrrp_int'
AMPHORA = 'amphora'
AMPHORA_DICT = 'amphora_dict'
AMPHORA_ID = 'amphora_id'
AMPHORA_INDEX = 'amphora_index'
AMPHORA_NETWORK_CONFIG = 'amphora_network_config'

13  octavia/controller/worker/v1/tasks/amphora_driver_tasks.py

@@ -122,7 +122,7 @@ class ListenersStart(BaseAmphoraTask):
class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphorae, amphora_index,
def execute(self, loadbalancer, amphora_index, amphorae,
timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora."""
if loadbalancer.listeners:
@@ -304,7 +304,7 @@ class AmphoraUpdateVRRPInterface(BaseAmphoraTask):
class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphorae, amphora_index, timeout_dict=None):
def execute(self, amphora_index, amphorae, timeout_dict=None):
amphora_id = amphorae[amphora_index].id
try:
interface = self.amphora_driver.get_interface_from_ip(
@@ -376,15 +376,6 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id)
class AmphoraVRRPStop(BaseAmphoraTask):
"""Task to stop keepalived of all amphorae of a LB."""
def execute(self, loadbalancer):
self.amphora_driver.stop_vrrp_service(loadbalancer)
LOG.debug("Stopped VRRP of loadbalancer %s amphorae",
loadbalancer.id)
class AmphoraVRRPStart(BaseAmphoraTask):
"""Task to start keepalived on an amphora.

428  octavia/controller/worker/v2/controller_worker.py

@@ -13,6 +13,7 @@
# under the License.
#
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
@@ -21,10 +22,12 @@ from stevedore import driver as stevedore_driver
from taskflow.listeners import logging as tf_logging
import tenacity
from octavia.amphorae.driver_exceptions import exceptions
from octavia.amphorae.driver_exceptions import exceptions as driver_exc
from octavia.api.drivers import utils as provider_utils
from octavia.common import base_taskflow
from octavia.common import constants
from octavia.common import exceptions
from octavia.common import utils
from octavia.controller.worker.v2.flows import flow_utils
from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver
from octavia.db import api as db_apis
@@ -33,17 +36,12 @@ from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
RETRY_ATTEMPTS = 15
RETRY_INITIAL_DELAY = 1
RETRY_BACKOFF = 1
RETRY_MAX = 5
# We do not need to log retry exception information. Warning "Could not connect
# to instance" will be logged as usual.
def retryMaskFilter(record):
if record.exc_info is not None and isinstance(
record.exc_info[1], exceptions.AmpConnectionRetry):
record.exc_info[1], driver_exc.AmpConnectionRetry):
return False
return True
@@ -87,8 +85,11 @@ class ControllerWorker(object):
tenacity.retry_if_result(_is_provisioning_status_pending_update) |
tenacity.retry_if_exception_type()),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def _get_db_obj_until_pending_update(self, repo, id):
return repo.get(db_apis.get_session(), id=id)
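
Note: the retry decorators in this file move hard-coded retry constants
into [haproxy_amphora] configuration options. A minimal sketch of the
same tenacity pattern, with literal values standing in for the
api_db_commit_retry_* options and a hypothetical NoResultFound class:

    import tenacity

    RETRY_INITIAL_DELAY = 1   # api_db_commit_retry_initial_delay
    RETRY_BACKOFF = 1         # api_db_commit_retry_backoff
    RETRY_MAX = 5             # api_db_commit_retry_max
    RETRY_ATTEMPTS = 15       # api_db_commit_retry_attempts

    class NoResultFound(Exception):
        """Hypothetical stand-in for the SQLAlchemy exception."""

    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(NoResultFound),
        wait=tenacity.wait_incrementing(
            RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
        stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
    def get_db_obj(repo, obj_id):
        # Re-query until the API's DB commit becomes visible to the worker.
        return repo.get(obj_id)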
@@ -117,6 +118,7 @@ class ControllerWorker(object):
store = {constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}
if availability_zone:
store[constants.AVAILABILITY_ZONE] = (
@@ -145,8 +147,11 @@
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_health_monitor(self, health_monitor):
"""Creates a health monitor.
@@ -251,8 +256,11 @@
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_listener(self, listener):
"""Creates a listener.
@@ -292,14 +300,10 @@
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
# TODO(johnsom) Remove once the provider data model includes
# the project ID
lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
store = {constants.LISTENER: listener,
constants.LOADBALANCER_ID:
listener[constants.LOADBALANCER_ID],
constants.PROJECT_ID: lb.project_id}
constants.PROJECT_ID: listener[constants.PROJECT_ID]}
self.run_flow(
flow_utils.get_delete_listener_flow,
store=store)
@@ -325,8 +329,11 @@
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_load_balancer(self, loadbalancer, flavor=None,
availability_zone=None):
"""Creates a load balancer by allocating Amphorae.
@@ -347,16 +354,18 @@
loadbalancer[constants.LOADBALANCER_ID])
raise db_exceptions.NoResultFound
# TODO(johnsom) convert this to octavia_lib constant flavor
# once octavia is transitioned to use octavia_lib
store = {constants.LOADBALANCER_ID:
loadbalancer[constants.LOADBALANCER_ID],
store = {lib_consts.LOADBALANCER_ID:
loadbalancer[lib_consts.LOADBALANCER_ID],
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_NORMAL_PRIORITY,
constants.FLAVOR: flavor,
constants.AVAILABILITY_ZONE: availability_zone}
lib_consts.FLAVOR: flavor,
lib_consts.AVAILABILITY_ZONE: availability_zone}
topology = lb.topology
if (not CONF.nova.enable_anti_affinity or
topology == constants.TOPOLOGY_SINGLE):
store[constants.SERVER_GROUP_ID] = None
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
lb.listeners)
@@ -377,17 +386,19 @@
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
db_lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer[constants.LOADBALANCER_ID])
loadbalancer_id = load_balancer[constants.LOADBALANCER_ID]
db_lb = self._lb_repo.get(db_apis.get_session(), id=loadbalancer_id)
store = {constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: loadbalancer_id,
constants.SERVER_GROUP_ID: db_lb.server_group_id,
constants.PROJECT_ID: db_lb.project_id}
if cascade:
store.update(flow_utils.get_delete_pools_store(db_lb))
store.update(flow_utils.get_delete_listeners_store(db_lb))
listeners = flow_utils.get_listeners_on_lb(db_lb)
pools = flow_utils.get_pools_on_lb(db_lb)
self.run_flow(
flow_utils.get_cascade_delete_load_balancer_flow,
load_balancer, store=store)
load_balancer, listeners, pools, store=store)
else:
self.run_flow(
flow_utils.get_delete_load_balancer_flow,
@@ -548,7 +559,6 @@
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
store = {
constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
@@ -570,8 +580,11 @@
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def create_pool(self, pool):
"""Creates a node pool.
@@ -816,153 +829,250 @@ class ControllerWorker(object):
flow_utils.get_update_l7rule_flow,
store=store)
def _perform_amphora_failover(self, amp, priority):
"""Internal method to perform failover operations for an amphora.
:param amp: The amphora to failover
:param priority: The create priority
:returns: None
"""
stored_params = {constants.FAILED_AMPHORA: amp.to_dict(),
constants.LOADBALANCER_ID: amp.load_balancer_id,
constants.BUILD_TYPE_PRIORITY: priority, }
if amp.role in (constants.ROLE_MASTER, constants.ROLE_BACKUP):
amp_role = 'master_or_backup'
elif amp.role == constants.ROLE_STANDALONE:
amp_role = 'standalone'
elif amp.role is None:
amp_role = 'spare'
else:
amp_role = 'undefined'
LOG.info("Perform failover for an amphora: %s",
{"id": amp.id,
"load_balancer_id": amp.load_balancer_id,
"lb_network_ip": amp.lb_network_ip,
"compute_id": amp.compute_id,
"role": amp_role})
if amp.status == constants.DELETED:
LOG.warning('Amphora %s is marked DELETED in the database but '
'was submitted for failover. Deleting it from the '
'amphora health table to exclude it from health '
'checks and skipping the failover.', amp.id)
self._amphora_health_repo.delete(db_apis.get_session(),
amphora_id=amp.id)
return
if (CONF.house_keeping.spare_amphora_pool_size == 0) and (
CONF.nova.enable_anti_affinity is False):
LOG.warning("Failing over amphora with no spares pool may "
"cause delays in failover times while a new "
"amphora instance boots.")
# if we run with anti-affinity we need to set the server group
# as well
lb = self._amphora_repo.get_lb_for_amphora(
db_apis.get_session(), amp.id)
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
lb).to_dict() if lb else lb
if CONF.nova.enable_anti_affinity and lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
if lb is not None and lb.flavor_id:
stored_params[constants.FLAVOR] = (
self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), lb.flavor_id))
else:
stored_params[constants.FLAVOR] = {}
if lb and lb.availability_zone:
stored_params[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), lb.availability_zone))
else:
stored_params[constants.AVAILABILITY_ZONE] = {}
self.run_flow(
flow_utils.get_failover_flow,
role=amp.role, load_balancer=provider_lb,
store=stored_params, wait=True)
LOG.info("Successfully completed the failover for an amphora: %s",
{"id": amp.id,
"load_balancer_id": amp.load_balancer_id,
"lb_network_ip": amp.lb_network_ip,
"compute_id": amp.compute_id,
"role": amp_role})
def failover_amphora(self, amphora_id):
"""Perform failover operations for an amphora.
Note: This expects the load balancer to already be in
provisioning_status=PENDING_UPDATE state.
:param amphora_id: ID for amphora to failover
:returns: None
:raises AmphoraNotFound: The referenced amphora was not found
:raises octavia.common.exceptions.NotFound: The referenced amphora was
not found
"""
amphora = None
try:
amp = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
if not amp:
LOG.warning("Could not fetch Amphora %s from DB, ignoring "
"failover request.", amphora_id)
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
if amphora is None:
LOG.error('Amphora failover for amphora %s failed because '
'there is no record of this amphora in the '
'database. Check that the [house_keeping] '
'amphora_expiry_age configuration setting is not '
'too short. Skipping failover.', amphora_id)
raise exceptions.NotFound(resource=constants.AMPHORA,
id=amphora_id)
if amphora.status == constants.DELETED:
LOG.warning('Amphora %s is marked DELETED in the database but '
'was submitted for failover. Deleting it from the '
'amphora health table to exclude it from health '
'checks and skipping the failover.', amphora.id)
self._amphora_health_repo.delete(db_apis.get_session(),
amphora_id=amphora.id)
return
self._perform_amphora_failover(
amp, constants.LB_CREATE_FAILOVER_PRIORITY)
if amp.load_balancer_id:
LOG.info("Mark ACTIVE in DB for load balancer id: %s",
amp.load_balancer_id)
self._lb_repo.update(
db_apis.get_session(), amp.load_balancer_id,
provisioning_status=constants.ACTIVE)
loadbalancer = None
if amphora.load_balancer_id:
loadbalancer = self._lb_repo.get(db_apis.get_session(),
id=amphora.load_balancer_id)
lb_amp_count = None
if loadbalancer:
if loadbalancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
lb_amp_count = 2
elif loadbalancer.topology == constants.TOPOLOGY_SINGLE:
lb_amp_count = 1
az_metadata = {}
flavor_dict = {}
lb_id = None
vip_dict = {}
server_group_id = None
if loadbalancer:
lb_id = loadbalancer.id
# Even if the LB doesn't have a flavor, create one and
# pass through the topology.
if loadbalancer.flavor_id:
flavor_dict = self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), loadbalancer.flavor_id)
flavor_dict[constants.LOADBALANCER_TOPOLOGY] = (
loadbalancer.topology)
else:
flavor_dict = {constants.LOADBALANCER_TOPOLOGY:
loadbalancer.topology}
if loadbalancer.availability_zone:
az_metadata = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(),
loadbalancer.availability_zone))
vip_dict = loadbalancer.vip.to_dict()
server_group_id = loadbalancer.server_group_id
provider_lb_dict = (provider_utils.
db_loadbalancer_to_provider_loadbalancer)(
loadbalancer).to_dict() if loadbalancer else loadbalancer
stored_params = {constants.AVAILABILITY_ZONE: az_metadata,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY,
constants.FLAVOR: flavor_dict,
constants.LOADBALANCER: provider_lb_dict,
constants.SERVER_GROUP_ID: server_group_id,
constants.LOADBALANCER_ID: lb_id,
constants.VIP: vip_dict}
self.services_controller.run_poster(
flow_utils.get_failover_amphora_flow,
amphora.to_dict(), lb_amp_count,
store=stored_params, wait=True)
LOG.info("Successfully completed the failover for an amphora: %s",
{"id": amphora_id,
"load_balancer_id": lb_id,
"lb_network_ip": amphora.lb_network_ip,
"compute_id": amphora.compute_id,
"role": amphora.role})
except Exception as e:
try:
self._lb_repo.update(
db_apis.get_session(), amp.load_balancer_id,
provisioning_status=constants.ERROR)
except Exception:
LOG.error("Unable to revert LB status to ERROR.")
with excutils.save_and_reraise_exception():
LOG.error("Amphora %(id)s failover exception: %(exc)s",
{'id': amphora_id, 'exc': e})
with excutils.save_and_reraise_exception(reraise=False):
LOG.exception("Amphora %s failover exception: %s",
amphora_id, str(e))
self._amphora_repo.update(db_apis.get_session(),
amphora_id, status=constants.ERROR)
if amphora and amphora.load_balancer_id:
self._lb_repo.update(
db_apis.get_session(), amphora.load_balancer_id,
provisioning_status=constants.ERROR)
@staticmethod
def _get_amphorae_for_failover(load_balancer):
"""Returns an ordered list of amphora to failover.
:param load_balancer: The load balancer being failed over.
:returns: An ordered list of amphora to failover,
first amp to failover is last in the list
:raises octavia.common.exceptions.InvalidTopology: LB has an unknown
topology.
"""
if load_balancer.topology == constants.TOPOLOGY_SINGLE:
# In SINGLE topology, amp failover order does not matter
return [a.to_dict() for a in load_balancer.amphorae
if a.status != constants.DELETED]
if load_balancer.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
# In Active/Standby we should prefer the standby amp
# for failover first in case the Active is still able to pass
# traffic.
# Note: The active amp can switch at any time and in less than a
# second, so this is "best effort".
amphora_driver = utils.get_amphora_driver()
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.failover_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.failover_connection_retry_interval}
amps = []
selected_amp = None
for amp in load_balancer.amphorae:
if amp.status == constants.DELETED:
continue
if selected_amp is None:
try:
if amphora_driver.get_interface_from_ip(
amp, load_balancer.vip.ip_address,
timeout_dict):
# This is a potential ACTIVE, add it to the list
amps.append(amp.to_dict())
else:
# This one doesn't have the VIP IP, so start
# failovers here.
selected_amp = amp
LOG.debug("Selected amphora %s as the initial "
"failover amphora.", amp.id)
except Exception:
# This amphora is broken, so start failovers here.
selected_amp = amp
else:
# We have already found a STANDBY, so add the rest to the
# list without querying them.
amps.append(amp.to_dict())
# Put the selected amphora at the end of the list so it is
# first to failover.
if selected_amp:
amps.append(selected_amp.to_dict())
return amps
LOG.error('Unknown load balancer topology found: %s, aborting '
'failover.', load_balancer.topology)
raise exceptions.InvalidTopology(topology=load_balancer.topology)
def failover_loadbalancer(self, load_balancer_id):
"""Perform failover operations for a load balancer.
Note: This expects the load balancer to already be in
provisioning_status=PENDING_UPDATE state.
:param load_balancer_id: ID for load balancer to failover
:returns: None
:raises LBNotFound: The referenced load balancer was not found
:raises octavia.common.exceptions.NotFound: The load balancer was not
found.
"""
# Note: This expects that the load balancer is already in
# provisioning_status=PENDING_UPDATE state
try:
lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer_id)
if lb is None:
raise exceptions.NotFound(resource=constants.LOADBALANCER,
id=load_balancer_id)
# Get the ordered list of amphorae to failover for this LB.
amps = self._get_amphorae_for_failover(lb)
if lb.topology == constants.TOPOLOGY_SINGLE:
if len(amps) != 1:
LOG.warning('%d amphorae found on load balancer %s where '
'one should exist. Repairing.', len(amps),
load_balancer_id)
elif lb.topology == constants.TOPOLOGY_ACTIVE_STANDBY:
if len(amps) != 2:
LOG.warning('%d amphorae found on load balancer %s where '
'two should exist. Repairing.', len(amps),
load_balancer_id)
else:
LOG.error('Unknown load balancer topology found: %s, aborting '
'failover!', lb.topology)
raise exceptions.InvalidTopology(topology=lb.topology)
# We must provide a topology in the flavor definition
# here for the amphora to be created with the correct
# configuration.
if lb.flavor_id:
flavor = self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), lb.flavor_id)
flavor[constants.LOADBALANCER_TOPOLOGY] = lb.topology
else:
flavor = {constants.LOADBALANCER_TOPOLOGY: lb.topology}
provider_lb_dict = (
provider_utils.db_loadbalancer_to_provider_loadbalancer(
lb).to_dict() if lb else lb)
provider_lb_dict[constants.FLAVOR] = flavor
stored_params = {constants.LOADBALANCER: provider_lb_dict,
constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_FAILOVER_PRIORITY,
constants.SERVER_GROUP_ID: lb.server_group_id,
constants.LOADBALANCER_ID: lb.id,
constants.FLAVOR: flavor}
if lb.availability_zone:
stored_params[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), lb.availability_zone))
else:
stored_params[constants.AVAILABILITY_ZONE] = {}
self.services_controller.run_poster(
flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
store=stored_params, wait=True)
# Exclude amphora already deleted
amps = [a for a in lb.amphorae if a.status != constants.DELETED]
for amp in amps:
# failover amphora in backup role
# Note: this amp may not currently be the backup
# TODO(johnsom) Change this to query the amp state
# once the amp API supports it.
if amp.role == constants.ROLE_BACKUP:
self._perform_amphora_failover(
amp, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
for amp in amps:
# failover everything else
if amp.role != constants.ROLE_BACKUP:
self._perform_amphora_failover(
amp, constants.LB_CREATE_ADMIN_FAILOVER_PRIORITY)
self._lb_repo.update(
db_apis.get_session(), load_balancer_id,
provisioning_status=constants.ACTIVE)
LOG.info('Failover of load balancer %s completed successfully.',
lb.id)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("LB %(lbid)s failover exception: %(exc)s",
{'lbid': load_balancer_id, 'exc': e})
with excutils.save_and_reraise_exception(reraise=False):
LOG.exception("LB %(lbid)s failover exception: %(exc)s",
{'lbid': load_balancer_id, 'exc': e})
self._lb_repo.update(
db_apis.get_session(), load_balancer_id,
provisioning_status=constants.ERROR)
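
Note: _get_amphorae_for_failover above orders the amphorae so the
suspected STANDBY (or an unreachable amphora) is failed over first. A
simplified, self-contained sketch of that ordering logic; has_vip is a
hypothetical probe standing in for amphora_driver.get_interface_from_ip:

    def order_amphorae_for_failover(amphorae, has_vip):
        """Return amphorae ordered so the last one is failed over first."""
        amps = []
        selected_amp = None
        for amp in amphorae:
            if selected_amp is None:
                try:
                    if has_vip(amp):
                        # Potential ACTIVE amphora: fail it over last.
                        amps.append(amp)
                    else:
                        # No VIP interface: suspected STANDBY, start here.
                        selected_amp = amp
                except Exception:
                    # Unreachable amphora: also a safe place to start.
                    selected_amp = amp
            else:
                # A starting amphora was already selected; take the rest
                # without probing them.
                amps.append(amp)
        if selected_amp is not None:
            amps.append(selected_amp)
        return amps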

786  octavia/controller/worker/v2/flows/amphora_flows.py

@@ -1,4 +1,5 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright 2020 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -14,28 +15,27 @@
#
from oslo_config import cfg
from oslo_log import log as logging
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from octavia.common import constants
from octavia.common import utils
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import cert_task
from octavia.controller.worker.v2.tasks import compute_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import network_tasks
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.controller.worker.v2.tasks import retry_tasks
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class AmphoraFlows(object):
def __init__(self):
self.lb_repo = repo.LoadBalancerRepository()
def get_create_amphora_flow(self):
"""Creates a flow to create an amphora.
@@ -46,24 +46,16 @@ class AmphoraFlows(object):
provides=constants.AMPHORA_ID))
create_amphora_flow.add(lifecycle_tasks.AmphoraIDToErrorOnRevertTask(
requires=constants.AMPHORA_ID))
if (CONF.controller_worker.amphora_driver ==
'amphora_haproxy_rest_driver'):
create_amphora_flow.add(cert_task.GenerateServerPEMTask(
provides=constants.SERVER_PEM))
create_amphora_flow.add(
database_tasks.UpdateAmphoraDBCertExpiration(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
create_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY, constants.FLAVOR),
provides=constants.COMPUTE_ID))
else:
create_amphora_flow.add(compute_tasks.ComputeCreate(
requires=(constants.AMPHORA_ID, constants.BUILD_TYPE_PRIORITY,
constants.FLAVOR),
provides=constants.COMPUTE_ID))
create_amphora_flow.add(cert_task.GenerateServerPEMTask(
provides=constants.SERVER_PEM))
create_amphora_flow.add(
database_tasks.UpdateAmphoraDBCertExpiration(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
create_amphora_flow.add(compute_tasks.CertComputeCreate(
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.SERVER_GROUP_ID,
constants.BUILD_TYPE_PRIORITY, constants.FLAVOR),
provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeActiveWait(
@@ -97,11 +89,6 @@ class AmphoraFlows(object):
post_map_amp_to_lb = linear_flow.Flow(
sf_name)
post_map_amp_to_lb.add(database_tasks.ReloadAmphora(
name=sf_name + '-' + constants.RELOAD_AMPHORA,
requires=constants.AMPHORA,
provides=constants.AMPHORA))
post_map_amp_to_lb.add(amphora_driver_tasks.AmphoraConfigUpdate(
name=sf_name + '-' + constants.AMPHORA_CONFIG_UPDATE_TASK,
requires=(constants.AMPHORA, constants.FLAVOR)))
@@ -121,7 +108,7 @@
return post_map_amp_to_lb
def _get_create_amp_for_lb_subflow(self, prefix, role):
def _get_create_amp_for_lb_subflow(self, prefix, role, is_spare=False):
"""Create a new amphora for lb."""
sf_name = prefix + '-' + constants.CREATE_AMP_FOR_LB_SUBFLOW
@@ -131,67 +118,22 @@
requires=constants.LOADBALANCER_ID,
provides=constants.AMPHORA_ID))
require_server_group_id_condition = (
role in (constants.ROLE_BACKUP, constants.ROLE_MASTER) and
CONF.nova.enable_anti_affinity)
if (CONF.controller_worker.amphora_driver ==
'amphora_haproxy_rest_driver'):
create_amp_for_lb_subflow.add(cert_task.GenerateServerPEMTask(
name=sf_name + '-' + constants.GENERATE_SERVER_PEM,
provides=constants.SERVER_PEM))
create_amp_for_lb_subflow.add(
database_tasks.UpdateAmphoraDBCertExpiration(
name=sf_name + '-' + constants.UPDATE_CERT_EXPIRATION,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
if require_server_group_id_condition:
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(
constants.AMPHORA_ID,
constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY,
constants.SERVER_GROUP_ID,
constants.FLAVOR,
constants.AVAILABILITY_ZONE,
),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(
constants.AMPHORA_ID,
constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY,
constants.FLAVOR,
constants.AVAILABILITY_ZONE,
),
provides=constants.COMPUTE_ID))
else:
if require_server_group_id_condition:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=(
constants.AMPHORA_ID,
constants.BUILD_TYPE_PRIORITY,
constants.SERVER_GROUP_ID,
constants.FLAVOR,
constants.AVAILABILITY_ZONE,
),
provides=constants.COMPUTE_ID))
else:
create_amp_for_lb_subflow.add(compute_tasks.ComputeCreate(
name=sf_name + '-' + constants.COMPUTE_CREATE,
requires=(
constants.AMPHORA_ID,
constants.BUILD_TYPE_PRIORITY,
constants.FLAVOR,
constants.AVAILABILITY_ZONE,
),
provides=constants.COMPUTE_ID))
create_amp_for_lb_subflow.add(cert_task.GenerateServerPEMTask(
name=sf_name + '-' + constants.GENERATE_SERVER_PEM,
provides=constants.SERVER_PEM))
create_amp_for_lb_subflow.add(
database_tasks.UpdateAmphoraDBCertExpiration(
name=sf_name + '-' + constants.UPDATE_CERT_EXPIRATION,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM)))
create_amp_for_lb_subflow.add(compute_tasks.CertComputeCreate(
name=sf_name + '-' + constants.CERT_COMPUTE_CREATE,
requires=(constants.AMPHORA_ID, constants.SERVER_PEM,
constants.BUILD_TYPE_PRIORITY,
constants.SERVER_GROUP_ID,
constants.FLAVOR, constants.AVAILABILITY_ZONE),
provides=constants.COMPUTE_ID))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraComputeId(
name=sf_name + '-' + constants.UPDATE_AMPHORA_COMPUTEID,
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
@@ -207,6 +149,33 @@
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(self._retry_flow(sf_name))
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))
if is_spare:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraReadyInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_READY_INDB,
requires=constants.AMPHORA))
else:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraAllocatedInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
if role == constants.ROLE_MASTER:
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraMasterInDB(
name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_BACKUP:
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBackupInDB(
name=sf_name + '-' + constants.MARK_AMP_BACKUP_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_STANDALONE:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraStandAloneInDB(
name=sf_name + '-' + constants.MARK_AMP_STANDALONE_INDB,
requires=constants.AMPHORA))
return create_amp_for_lb_subflow
@@ -268,7 +237,7 @@
return create_amp_for_lb_subflow
def get_amphora_for_lb_subflow(
self, prefix, role=constants.ROLE_STANDALONE):
self, prefix, role=constants.ROLE_STANDALONE, is_spare=False):
"""Tries to allocate a spare amphora to a loadbalancer if none
exists, create a new amphora.
@@ -276,6 +245,14 @@
sf_name = prefix + '-' + constants.GET_AMPHORA_FOR_LB_SUBFLOW
# Don't replace a spare with another spare, just build a fresh one.
if is_spare:
get_spare_amp_flow = linear_flow.Flow(sf_name)
get_spare_amp_flow.add(self._get_create_amp_for_lb_subflow(
prefix, role, is_spare=is_spare))
return get_spare_amp_flow
# We need a graph flow here for a conditional flow
amp_for_lb_flow = graph_flow.Flow(sf_name)
@@ -318,289 +295,145 @@
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
# Plug the network
# todo(xgerman): Rework failover flow
if prefix != constants.FAILOVER_AMPHORA_FLOW:
sf_name = prefix + '-' + constants.AMP_PLUG_NET_SUBFLOW
amp_for_lb_net_flow = linear_flow.Flow(sf_name)
amp_for_lb_net_flow.add(amp_for_lb_flow)
amp_for_lb_net_flow.add(*self._get_amp_net_subflow(sf_name))
return amp_for_lb_net_flow
return amp_for_lb_flow
def _get_amp_net_subflow(self, sf_name):
flows = []
flows.append(network_tasks.PlugVIPAmpphora(
name=sf_name + '-' + constants.PLUG_VIP_AMPHORA,
requires=(constants.LOADBALANCER, constants.AMPHORA,
constants.SUBNET),
provides=constants.AMP_DATA))
flows.append(network_tasks.ApplyQosAmphora(
name=sf_name + '-' + constants.APPLY_QOS_AMP,
requires=(constants.LOADBALANCER, constants.AMP_DATA,
constants.UPDATE_DICT)))
flows.append(database_tasks.UpdateAmphoraVIPData(
name=sf_name + '-' + constants.UPDATE_AMPHORA_VIP_DATA,
requires=constants.AMP_DATA))
flows.append(database_tasks.ReloadAmphora(
name=sf_name + '-' + constants.RELOAD_AMP_AFTER_PLUG_VIP,
requires=constants.AMPHORA,
provides=constants.AMPHORA))
flows.append(database_tasks.ReloadLoadBalancer(
name=sf_name + '-' + constants.RELOAD_LB_AFTER_PLUG_VIP,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
flows.append(network_tasks.GetAmphoraNetworkConfigs(
name=sf_name + '-' + constants.GET_AMP_NETWORK_CONFIG,
requires=(constants.LOADBALANCER, constants.AMPHORA),
provides=constants.AMPHORA_NETWORK_CONFIG))
flows.append(amphora_driver_tasks.AmphoraPostVIPPlug(
name=sf_name + '-' + constants.AMP_POST_VIP_PLUG,
rebind={constants.AMPHORAE_NETWORK_CONFIG:
constants.AMPHORA_NETWORK_CONFIG},
requires=(constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
return flows
def get_delete_amphora_flow(
self, amphora,
retry_attempts=CONF.controller_worker.amphora_delete_retries,
retry_interval=(
CONF.controller_worker.amphora_delete_retry_interval)):
"""Creates a subflow to delete an amphora and it's port.
def get_delete_amphora_flow(self):
"""Creates a flow to delete an amphora.
This flow is idempotent and safe to retry.
This should be configurable in the config file
:returns: The flow for deleting the amphora
:raises AmphoraNotFound: The referenced Amphora was not found
:param amphora: An amphora dict object.
:param retry_attempts: The number of times the flow is retried.
:param retry_interval: The time to wait, in seconds, between retries.
:returns: The subflow for deleting the amphora.
:raises AmphoraNotFound: The referenced Amphora was not found.
"""
delete_amphora_flow = linear_flow.Flow(constants.DELETE_AMPHORA_FLOW)
amphora_id = amphora[constants.ID]
delete_amphora_flow = linear_flow.Flow(
name=constants.DELETE_AMPHORA_FLOW + '-' + amphora_id,
retry=retry_tasks.SleepingRetryTimesController(
name='retry-' + constants.DELETE_AMPHORA_FLOW + '-' +
amphora_id,
attempts=retry_attempts, interval=retry_interval))
delete_amphora_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
requires=constants.AMPHORA))
delete_amphora_flow.add(database_tasks.
MarkAmphoraPendingDeleteInDB(
requires=constants.AMPHORA))
delete_amphora_flow.add(database_tasks.
MarkAmphoraHealthBusy(
requires=constants.AMPHORA))
delete_amphora_flow.add(compute_tasks.ComputeDelete(
requires=constants.AMPHORA))
delete_amphora_flow.add(database_tasks.
DisableAmphoraHealthMonitoring(
requires=constants.AMPHORA))
delete_amphora_flow.add(database_tasks.
MarkAmphoraDeletedInDB(
requires=constants.AMPHORA))
return delete_amphora_flow
def get_failover_flow(self, role=constants.ROLE_STANDALONE,
load_balancer=None):
"""Creates a flow to failover a stale amphora
:returns: The flow for amphora failover
"""
failover_amphora_flow = linear_flow.Flow(
constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(network_tasks.FailoverPreparationForAmphora(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# Note: It seems intuitive to boot an amphora prior to deleting
# the old amphora, however this is a complicated issue.
# If the target host (due to anti-affinity) is resource
# constrained, this will fail where a post-delete will
# succeed. Since this is async with the API it would result
# in the LB ending in ERROR though the amps are still alive.
# Consider in the future making this a complicated
# try-on-failure-retry flow, or move upgrade failovers to be
# synchronous with the API. For now spares pool and act/stdby
# will mitigate most of this delay.
# Delete the old amphora
failover_amphora_flow.add(
name=constants.AMPHORA_TO_ERROR_ON_REVERT + '-' + amphora_id,
inject={constants.AMPHORA: amphora}))
delete_amphora_flow.add(
database_tasks.MarkAmphoraPendingDeleteInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(
database_tasks.MarkAmphoraHealthBusy(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(compute_tasks.ComputeDelete(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(network_tasks.WaitForPortDetach(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
failover_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# If this is an unallocated amp (spares pool), we're done
if not load_balancer:
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow
name=constants.MARK_AMPHORA_PENDING_DELETE + '-' + amphora_id,
inject={constants.AMPHORA: amphora}))
delete_amphora_flow.add(database_tasks.MarkAmphoraHealthBusy(
name=constants.MARK_AMPHORA_HEALTH_BUSY + '-' + amphora_id,
inject={constants.AMPHORA: amphora}))
delete_amphora_flow.add(compute_tasks.ComputeDelete(
name=constants.DELETE_AMPHORA + '-' + amphora_id,
inject={constants.AMPHORA: amphora,
constants.PASSIVE_FAILURE: True}))
delete_amphora_flow.add(database_tasks.DisableAmphoraHealthMonitoring(
name=constants.DISABLE_AMP_HEALTH_MONITORING + '-' + amphora_id,
inject={constants.AMPHORA: amphora}))
delete_amphora_flow.add(database_tasks.MarkAmphoraDeletedInDB(
name=constants.MARK_AMPHORA_DELETED + '-' + amphora_id,
inject={constants.AMPHORA: amphora}))
if amphora.get(constants.VRRP_PORT_ID):
delete_amphora_flow.add(network_tasks.DeletePort(
name=(constants.DELETE_PORT + '-' + str(amphora_id) + '-' +
str(amphora[constants.VRRP_PORT_ID])),
inject={constants.PORT_ID: amphora[constants.VRRP_PORT_ID],
constants.PASSIVE_FAILURE: True}))
# TODO(johnsom) What about cleaning up any member ports?
# maybe we should get the list of attached ports prior to delete
# and call delete on them here. Fix this as part of
# https://storyboard.openstack.org/#!/story/2007077
# Save failed amphora details for later
failover_amphora_flow.add(
database_tasks.GetAmphoraDetails(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA,
provides=constants.AMP_DATA))
return delete_amphora_flow
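
Note: the new delete flow above is wrapped in Octavia's
SleepingRetryTimesController so the whole subflow can be retried. A
minimal sketch of a retry-wrapped taskflow flow, using taskflow's stock
retry.Times controller instead of the Octavia class:

    from taskflow import engines
    from taskflow import retry
    from taskflow import task
    from taskflow.patterns import linear_flow

    class DeleteCompute(task.Task):
        def execute(self):
            # Idempotent delete step; safe to run again on retry.
            print('deleting amphora compute resources')

    flow = linear_flow.Flow(
        'delete-amphora-flow',
        retry=retry.Times(attempts=3, name='retry-delete-amphora-flow'))
    flow.add(DeleteCompute())
    engines.run(flow)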
# Get a new amphora
# Note: Role doesn't matter here. We will update it later.
get_amp_subflow = self.get_amphora_for_lb_subflow(
prefix=constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(get_amp_subflow)
def get_vrrp_subflow(self, prefix, timeout_dict=None,
create_vrrp_group=True):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name)
# Update the new amphora with the failed amphora details
failover_amphora_flow.add(database_tasks.UpdateAmpFailoverDetails(
requires=(constants.AMPHORA, constants.AMP_DATA)))
# Optimization for failover flow. No reason to call this
# when configuring the secondary amphora.
if create_vrrp_group:
vrrp_subflow.add(database_tasks.CreateVRRPGroupForLB(
name=sf_name + '-' + constants.CREATE_VRRP_GROUP_FOR_LB,
requires=constants.LOADBALANCER_ID))
# Update the data stored in the flow from the database
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
vrrp_subflow.add(network_tasks.GetAmphoraeNetworkConfigs(
name=sf_name + '-' + constants.GET_AMP_NETWORK_CONFIG,
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
failover_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA,
provides=constants.AMPHORA))
# Prepare to reconnect the network interface(s)
failover_amphora_flow.add(network_tasks.GetAmphoraeNetworkConfigs(
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
failover_amphora_flow.add(database_tasks.GetListenersFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.LISTENERS))
failover_amphora_flow.add(database_tasks.GetAmphoraeFromLoadbalancer(
requires=constants.LOADBALANCER, provides=constants.AMPHORAE))
# Plug the VIP ports into the new amphora
# The reason for moving these steps here is that the UDP listeners
# need to do some kernel configuration before the listener update,
# to avoid failures while rebuilding the amphora.
failover_amphora_flow.add(network_tasks.PlugVIPPort(
requires=(constants.AMPHORA, constants.AMPHORAE_NETWORK_CONFIG)))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
requires=(constants.AMPHORA, constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
# Listeners update needs to be run on all amphora to update
# VRRP update needs to be run on all amphora to update
# their peer configurations. So parallelize this with an
# unordered subflow.
update_amps_subflow = unordered_flow.Flow(
constants.UPDATE_AMPS_SUBFLOW)
update_amps_subflow = unordered_flow.Flow('VRRP-update-subflow')
# We have three tasks to run in order, per amphora
amp_0_subflow = linear_flow.Flow('VRRP-amp-0-update-subflow')
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE,
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPUpdate(
name=sf_name + '-0-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE,
inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow = linear_flow.Flow('VRRP-amp-1-update-subflow')
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.AMPHORAE,
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPUpdate(
name=sf_name + '-1-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMP_VRRP_INT),
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=constants.AMPHORAE,
inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add(amp_0_subflow)
update_amps_subflow.add(amp_1_subflow)
vrrp_subflow.add(update_amps_subflow)
timeout_dict = {
constants.CONN_MAX_RETRIES:
CONF.haproxy_amphora.active_connection_max_retries,
constants.CONN_RETRY_INTERVAL:
CONF.haproxy_amphora.active_connection_rety_interval}
# Setup parallel flows for each amp. We don't know the new amp
# details at flow creation time, so setup a subflow for each
# amp on the LB, they let the task index into a list of amps
# to find the amphora it should work on.
amp_index = 0
db_lb = self.lb_repo.get(db_apis.get_session(),
id=load_balancer[constants.LOADBALANCER_ID])
for amp in db_lb.amphorae:
if amp.status == constants.DELETED:
continue
update_amps_subflow.add(
amphora_driver_tasks.AmpListenersUpdate(
name=constants.AMP_LISTENER_UPDATE + '-' + str(amp_index),
requires=(constants.LOADBALANCER, constants.AMPHORAE),
inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict}))
amp_index += 1
failover_amphora_flow.add(update_amps_subflow)
# Plug the member networks into the new amphora
failover_amphora_flow.add(network_tasks.CalculateAmphoraDelta(
requires=(constants.LOADBALANCER, constants.AMPHORA,
constants.AVAILABILITY_ZONE),
provides=constants.DELTA))
failover_amphora_flow.add(network_tasks.HandleNetworkDelta(
requires=(constants.AMPHORA, constants.DELTA),
provides=constants.ADDED_PORTS))
failover_amphora_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
failover_amphora_flow.add(database_tasks.ReloadLoadBalancer(
name='octavia-failover-LB-reload-2',
requires=constants.LOADBALANCER_ID,
provides=constants.LOADBALANCER))
# Handle the amphora role and VRRP if necessary
if role == constants.ROLE_MASTER:
failover_amphora_flow.add(database_tasks.MarkAmphoraMasterInDB(
name=constants.MARK_AMP_MASTER_INDB,
requires=constants.AMPHORA))
vrrp_subflow = self.get_vrrp_subflow(role)
failover_amphora_flow.add(vrrp_subflow)
elif role == constants.ROLE_BACKUP:
failover_amphora_flow.add(database_tasks.MarkAmphoraBackupInDB(
name=constants.MARK_AMP_BACKUP_INDB,
requires=constants.AMPHORA))
vrrp_subflow = self.get_vrrp_subflow(role)
failover_amphora_flow.add(vrrp_subflow)
elif role == constants.ROLE_STANDALONE:
failover_amphora_flow.add(
database_tasks.MarkAmphoraStandAloneInDB(
name=constants.MARK_AMP_STANDALONE_INDB,
requires=constants.AMPHORA))
failover_amphora_flow.add(amphora_driver_tasks.ListenersStart(
requires=(constants.LOADBALANCER, constants.AMPHORA)))
failover_amphora_flow.add(
database_tasks.DisableAmphoraHealthMonitoring(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
return failover_amphora_flow
def get_vrrp_subflow(self, prefix):
sf_name = prefix + '-' + constants.GET_VRRP_SUBFLOW
vrrp_subflow = linear_flow.Flow(sf_name)
vrrp_subflow.add(network_tasks.GetAmphoraeNetworkConfigs(
name=sf_name + '-' + constants.GET_AMP_NETWORK_CONFIG,
requires=constants.LOADBALANCER,
provides=constants.AMPHORAE_NETWORK_CONFIG))
vrrp_subflow.add(amphora_driver_tasks.AmphoraUpdateVRRPInterface(
name=sf_name + '-' + constants.AMP_UPDATE_VRRP_INTF,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(database_tasks.CreateVRRPGroupForLB(
name=sf_name + '-' + constants.CREATE_VRRP_GROUP_FOR_LB,
requires=constants.LOADBALANCER,
provides=constants.LOADBALANCER))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPUpdate(
name=sf_name + '-' + constants.AMP_VRRP_UPDATE,
requires=(constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
vrrp_subflow.add(amphora_driver_tasks.AmphoraVRRPStart(
name=sf_name + '-' + constants.AMP_VRRP_START,
requires=constants.LOADBALANCER))
return vrrp_subflow
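
Note: the new get_vrrp_subflow above nests one linear flow per amphora
inside an unordered flow, so both amphorae are updated in parallel
while each amphora's own steps stay ordered. A minimal taskflow sketch
of that shape, with placeholder Step tasks:

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow
    from taskflow.patterns import unordered_flow

    class Step(task.Task):
        def execute(self):
            print('running %s' % self.name)

    update_amps = unordered_flow.Flow('VRRP-update-subflow')
    for index in (0, 1):
        amp_flow = linear_flow.Flow('VRRP-amp-%d-update-subflow' % index)
        amp_flow.add(Step(name='update-vrrp-interface-%d' % index))
        amp_flow.add(Step(name='upload-vrrp-config-%d' % index))
        amp_flow.add(Step(name='start-vrrp-%d' % index))
        update_amps.add(amp_flow)
    engines.run(update_amps)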
def cert_rotate_amphora_flow(self):
"""Implement rotation for amphora's cert.
1. Create a new certificate
2. Upload the cert to amphora
3. update the newly created certificate info to amphora
4. update the cert_busy flag to be false after rotation
1. Create a new certificate
2. Upload the cert to amphora
3. update the newly created certificate info to amphora
4. update the cert_busy flag to be false after rotation
:returns: The flow for updating an amphora
"""
@@ -644,3 +477,254 @@ class AmphoraFlows(object):
requires=(constants.AMPHORA, constants.FLAVOR)))
return update_amphora_flow
def get_amphora_for_lb_failover_subflow(
self, prefix, role=constants.ROLE_STANDALONE,
failed_amp_vrrp_port_id=None, is_vrrp_ipv6=False, is_spare=False):
"""Creates a new amphora that will be used in a failover flow.
:requires: loadbalancer_id, flavor, vip, vip_sg_id, loadbalancer
:provides: amphora_id, amphora
:param prefix: The flow name prefix to use on the flow and tasks.
:param role: The role this amphora will have in the topology.
:param failed_amp_vrrp_port_id: The base port ID of the failed amp.
:param is_vrrp_ipv6: True if the base port IP is IPv6.
:param is_spare: True if we are getting a spare amphora.
:return: A Taskflow sub-flow that will create the amphora.
"""
sf_name = prefix + '-' + constants.CREATE_AMP_FOR_FAILOVER_SUBFLOW
amp_for_failover_flow = linear_flow.Flow(sf_name)
# Try to allocate or boot an amphora instance (unconfigured)
amp_for_failover_flow.add(self.get_amphora_for_lb_subflow(
prefix=prefix + '-' + constants.FAILOVER_LOADBALANCER_FLOW,
role=role, is_spare=is_spare))
# If we are getting a spare amphora, this is all we need to do.
if is_spare:
return amp_for_failover_flow
# Create the VIP base (aka VRRP) port for the amphora.
amp_for_failover_flow.add(network_tasks.CreateVIPBasePort(
name=prefix + '-' + constants.CREATE_VIP_BASE_PORT,
requires=(constants.VIP, constants.VIP_SG_ID,
constants.AMPHORA_ID),
provides=constants.BASE_PORT))
# Attach the VIP base (aka VRRP) port to the amphora.
amp_for_failover_flow.add(compute_tasks.AttachPort(
name=prefix + '-' + constants.ATTACH_PORT,
requires=(constants.AMPHORA, constants.PORT),
rebind={constants.PORT: constants.BASE_PORT}))
# Update the amphora database record with the VIP base port info.
amp_for_failover_flow.add(database_tasks.UpdateAmpFailoverDetails(
name=prefix + '-' + constants.UPDATE_AMP_FAILOVER_DETAILS,
requires=(constants.AMPHORA, constants.VIP, constants.BASE_PORT)))
# Update the amphora networking for the plugged VIP port
amp_for_failover_flow.add(network_tasks.GetAmphoraNetworkConfigsByID(
name=prefix + '-' + constants.GET_AMPHORA_NETWORK_CONFIGS_BY_ID,
requires=(constants.LOADBALANCER_ID, constants.AMPHORA_ID),
provides=constants.AMPHORAE_NETWORK_CONFIG))
# Disable the base (vrrp) port on the failed amphora
# This prevents a DAD failure when bringing up the new amphora.
# Keepalived will handle this for act/stdby.
if (role == constants.ROLE_STANDALONE and failed_amp_vrrp_port_id and
is_vrrp_ipv6):
amp_for_failover_flow.add(network_tasks.AdminDownPort(
name=prefix + '-' + constants.ADMIN_DOWN_PORT,
inject={constants.PORT_ID: failed_amp_vrrp_port_id}))
amp_for_failover_flow.add(amphora_driver_tasks.AmphoraPostVIPPlug(
name=prefix + '-' + constants.AMPHORA_POST_VIP_PLUG,
requires=(constants.AMPHORA, constants.LOADBALANCER,
constants.AMPHORAE_NETWORK_CONFIG)))
# Plug member ports
amp_for_failover_flow.add(network_tasks.CalculateAmphoraDelta(
name=prefix + '-' + constants.CALCULATE_AMPHORA_DELTA,
requires=(constants.LOADBALANCER, constants.AMPHORA,
constants.AVAILABILITY_ZONE, constants.VRRP_PORT),
rebind={constants.VRRP_PORT: constants.BASE_PORT},
provides=constants.DELTA))
amp_for_failover_flow.add(network_tasks.HandleNetworkDelta(
name=prefix + '-' + constants.HANDLE_NETWORK_DELTA,
requires=(constants.AMPHORA, constants.DELTA),
provides=constants.ADDED_PORTS))
amp_for_failover_flow.add(amphora_driver_tasks.AmphoraePostNetworkPlug(
name=prefix + '-' + constants.AMPHORAE_POST_NETWORK_PLUG,
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
return amp_for_failover_flow
def get_failover_amphora_flow(self, failed_amphora, lb_amp_count):
"""Get a Taskflow flow to failover an amphora.
1. Build a replacement amphora.
2. Delete the old amphora.
3. Update the amphorae listener configurations.
4. Update the VRRP configurations if needed.
:param failed_amphora: The amphora dict to failover.
:param lb_amp_count: The number of amphora on this load balancer.
:returns: The flow that will provide the failover.
"""
failover_amp_flow = linear_flow.Flow(
constants.FAILOVER_AMPHORA_FLOW)
# Revert amphora to status ERROR if this flow goes wrong
failover_amp_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
requires=constants.AMPHORA,
inject={constants.AMPHORA: failed_amphora}))
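
Note: tasks in the failover flow receive the failed amphora via
inject=, which seeds task-local inputs rather than the shared flow
storage. A minimal taskflow sketch of that mechanism, with a
hypothetical MarkErrorOnRevert task:

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow

    class MarkErrorOnRevert(task.Task):
        def execute(self, amphora):
            print('watching amphora %s' % amphora['id'])

        def revert(self, amphora, **kwargs):
            # On flow failure, this task would mark the amphora ERROR.
            print('marking amphora %s ERROR' % amphora['id'])

    flow = linear_flow.Flow('failover-amphora-flow')
    flow.add(MarkErrorOnRevert(
        inject={'amphora': {'id': 'failed-amp-id'}}))
    engines.run(flow)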