Spare pool removal

Spare pool feature was deprecated in Victoria, we decided to remove it
during the Xena release cycle.

Change-Id: I830c6a4c49fa47105f788cf99a0f775e5dbdcaea
This commit is contained in:
Gregory Thiemonge 2021-04-22 18:43:42 +02:00
parent 042a6ea925
commit 815a283823
36 changed files with 83 additions and 1488 deletions

View File

@ -36,11 +36,6 @@ enable_service o-cw
enable_service o-hm enable_service o-hm
enable_service o-hk enable_service o-hk
#NOTE(mangelajo): there are possibly bugs in the housekeeper that needs to be
# addressed to make it fully stateless. Now as per @lingxian
# the house keeper could create more spare amphoras than needed
# in parallel nodes.
OCTAVIA_USE_PREGENERATED_CERTS=True OCTAVIA_USE_PREGENERATED_CERTS=True
OCTAVIA_USE_PREGENERATED_SSH_KEY=True OCTAVIA_USE_PREGENERATED_SSH_KEY=True
OCTAVIA_CONTROLLER_IP_PORT_LIST=192.168.0.3:5555,192.168.0.4:5555 OCTAVIA_CONTROLLER_IP_PORT_LIST=192.168.0.3:5555,192.168.0.4:5555

View File

@ -186,33 +186,6 @@ If you didn't configure image tags and instead configured an image id, you
will need to update the Octavia configuration file with the new id and restart will need to update the Octavia configuration file with the new id and restart
the Octavia services (except octavia-api). the Octavia services (except octavia-api).
Rotating spare Amphorae
-----------------------
.. warning::
Spares pool support is deprecated as of the Victoria release.
If the spare pool is enabled in Octavia, spare amphorae must be rotated
first, so a new load balancer will use the new amphora image from a newly
spawned spare amphora.
To rotate spare amphorae, list the IDs of all amphorae in ``READY`` status:
.. code-block:: bash
openstack loadbalancer amphora list -c id -f value --status READY
Then, for each ID, perform the failover on the amphora:
.. code-block:: bash
openstack loadbalancer amphora failover <amphora id>
Spare amphorae now use the new amphora image, and those spare amphorae will be
used when creating a new load balancer or when performing a failover of a load
balancer.
Generating a List of Load Balancers to Rotate Generating a List of Load Balancers to Rotate
--------------------------------------------- ---------------------------------------------
@ -242,32 +215,6 @@ show <loadbalancer id>`` until the load balancer goes ``ACTIVE`` again.
Best Practices/Optimizations Best Practices/Optimizations
---------------------------- ----------------------------
To speed up the failovers, the spare pool can be temporarily increased to
accommodate the rapid failover of the amphora. In this case after the
new image has been loaded into glance, shut down or initiate a failover of the
amphora in the spare pool. They can be found by listing amphorae in ``READY``
status:
.. code-block:: bash
openstack loadbalancer amphora list --status READY
After you have increased the spare pool size and restarted all Octavia
services, failovers will be greatly accelerated. To preserve resources,
restore the old settings and restart the Octavia services. Since Octavia won't
terminate superfluous spare amphora on its own, they can be left in the system
and will automatically be used up as new load balancers are created and/or
load balancers in error state are failed over.
.. warning::
If you are using the anti-affinity feature please be aware that it is
not compatible with spare pools and you are risking both the ACTIVE and
BACKUP amphora being scheduled on the same host. It is recommended to
not increase the spare pool during fail overs in this case (and not to use
the spare pool at all).
Since a failover puts significant load on the OpenStack installation by Since a failover puts significant load on the OpenStack installation by
creating new virtual machines and ports, it should either be done at a very creating new virtual machines and ports, it should either be done at a very
slow pace, during a time with little load, or with the right throttling slow pace, during a time with little load, or with the right throttling
@ -392,10 +339,7 @@ Once this is changed Octavia can't read any heartbeats and will assume
all amphora are in an error state and initiate an immediate failover. all amphora are in an error state and initiate an immediate failover.
In preparation, read the chapter on :ref:`best_practice` in In preparation, read the chapter on :ref:`best_practice` in
the Failover section. In particular, it is advisable if the throttling the Failover section.
enhancement (available in Pike) doesn't exist to create a sufficient
number of spare amphorae to mitigate the stress on the OpenStack installation
when Octavia starts to replace all amphora immediately.
Given the risks involved with changing this key it should not be changed Given the risks involved with changing this key it should not be changed
during routine maintenance but only when a compromise is strongly suspected. during routine maintenance but only when a compromise is strongly suspected.

View File

@ -346,8 +346,6 @@ defaults. Your specific environment may require more than this:
+-----------------------+-------------------------------+ +-----------------------+-------------------------------+
| health_manager | heartbeat_key | | health_manager | heartbeat_key |
+-----------------------+-------------------------------+ +-----------------------+-------------------------------+
| house_keeping | spare_amphora_pool_size |
+-----------------------+-------------------------------+
| keystone_authtoken | admin_password | | keystone_authtoken | admin_password |
+-----------------------+-------------------------------+ +-----------------------+-------------------------------+
| keystone_authtoken | admin_tenant_name | | keystone_authtoken | admin_tenant_name |
@ -374,30 +372,6 @@ You must:
* Create or update ``/etc/octavia/octavia.conf`` appropriately. * Create or update ``/etc/octavia/octavia.conf`` appropriately.
Spares pool considerations
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. warning::
Spares pool support is deprecated as of the Victoria release.
One configuration directive deserves some extra consideration in this document:
Depending on the specifics of your production environment, you may decide to
run Octavia with a non-empty "spares pool." Since the time it takes to spin up
a new amphora can be non-trivial in some cloud environments (and the
reliability of such operations can sometimes be less than ideal), this
directive instructs Octavia to attempt to maintain a certain number of amphorae
running in an idle, unconfigured state. These amphora will run base amphora
health checks and wait for configuration from the Octavia controller. The
overall effect of this is to greatly reduce the time it takes and increase the
reliability of deploying a new load balancing service on demand. This comes at
the cost of having a number of deployed amphorae which consume resources but
are not actively providing load balancing services, and at the cost of not
being able to use Nova anti-affinity features for ACTIVE-STANDBY load
balancer topologies.
Initialize Octavia Database Initialize Octavia Database
___________________________ ___________________________
This is controlled through alembic migrations under the octavia/db directory in This is controlled through alembic migrations under the octavia/db directory in

View File

@ -143,8 +143,7 @@ Octavia version 4.0 consists of the following major components:
failover events if amphorae fail unexpectedly. failover events if amphorae fail unexpectedly.
* **Housekeeping Manager** - This subcomponent cleans up stale (deleted) * **Housekeeping Manager** - This subcomponent cleans up stale (deleted)
database records, manages the spares pool, and manages amphora certificate database records and manages amphora certificate rotation.
rotation.
* **Driver Agent** - The driver agent receives status and statistics updates * **Driver Agent** - The driver agent receives status and statistics updates
from provider drivers. from provider drivers.

View File

@ -431,11 +431,6 @@
[house_keeping] [house_keeping]
# Note: Spares pools support is deprecated as of the Victoria release.
# Interval in seconds to initiate spare amphora checks
# spare_check_interval = 30
# spare_amphora_pool_size = 0
# Cleanup interval for Deleted amphora # Cleanup interval for Deleted amphora
# cleanup_interval = 30 # cleanup_interval = 30
# Amphora expiry age in seconds. Default is 1 week # Amphora expiry age in seconds. Default is 1 week

View File

@ -357,35 +357,33 @@ class UpdateHealthDb:
self._update_listener_count_for_UDP( self._update_listener_count_for_UDP(
session, db_lb, expected_listener_count)) session, db_lb, expected_listener_count))
else: else:
# If this is not a spare amp, log and skip it.
amp = self.amphora_repo.get(session, id=health['id']) amp = self.amphora_repo.get(session, id=health['id'])
if not amp or amp.load_balancer_id: # This is debug and not warning because this can happen under
# This is debug and not warning because this can happen under # normal deleting operations.
# normal deleting operations. LOG.debug('Received a health heartbeat from amphora %s with '
LOG.debug('Received a health heartbeat from amphora %s with ' 'IP %s that should not exist. This amphora may be '
'IP %s that should not exist. This amphora may be ' 'in the process of being deleted, in which case you '
'in the process of being deleted, in which case you ' 'will only see this message a few '
'will only see this message a few ' 'times', health['id'], srcaddr)
'times', health['id'], srcaddr) if not amp:
if not amp: LOG.warning('The amphora %s with IP %s is missing from '
LOG.warning('The amphora %s with IP %s is missing from ' 'the DB, so it cannot be automatically '
'the DB, so it cannot be automatically ' 'deleted (the compute_id is unknown). An '
'deleted (the compute_id is unknown). An ' 'operator must manually delete it from the '
'operator must manually delete it from the ' 'compute service.', health['id'], srcaddr)
'compute service.', health['id'], srcaddr) return
return # delete the amp right there
# delete the amp right there try:
try: compute = stevedore_driver.DriverManager(
compute = stevedore_driver.DriverManager( namespace='octavia.compute.drivers',
namespace='octavia.compute.drivers', name=CONF.controller_worker.compute_driver,
name=CONF.controller_worker.compute_driver, invoke_on_load=True
invoke_on_load=True ).driver
).driver compute.delete(amp.compute_id)
compute.delete(amp.compute_id) return
return except Exception as e:
except Exception as e: LOG.info("Error deleting amp %s with IP %s Error: %s",
LOG.info("Error deleting amp %s with IP %s Error: %s", health['id'], srcaddr, str(e))
health['id'], srcaddr, str(e))
expected_listener_count = 0 expected_listener_count = 0
listeners = health['listeners'] listeners = health['listeners']
@ -426,7 +424,7 @@ class UpdateHealthDb:
{'id': health['id'], 'found': len(listeners), {'id': health['id'], 'found': len(listeners),
'expected': expected_listener_count}) 'expected': expected_listener_count})
# Don't try to update status for spares pool amphora # Don't try to update status for bogus or old spares pool amphora
if not db_lb: if not db_lb:
return return

View File

@ -144,18 +144,13 @@ class FailoverController(base.BaseController):
db_amp = self._get_db_amp(context.session, self.amp_id, db_amp = self._get_db_amp(context.session, self.amp_id,
show_deleted=False) show_deleted=False)
# Check to see if the amphora is a spare (not associated with an LB) self._auth_validate_action(
if db_amp.load_balancer: context, db_amp.load_balancer.project_id,
self._auth_validate_action( constants.RBAC_PUT_FAILOVER)
context, db_amp.load_balancer.project_id,
constants.RBAC_PUT_FAILOVER)
self.repositories.load_balancer.test_and_set_provisioning_status( self.repositories.load_balancer.test_and_set_provisioning_status(
context.session, db_amp.load_balancer_id, context.session, db_amp.load_balancer_id,
status=constants.PENDING_UPDATE, raise_exception=True) status=constants.PENDING_UPDATE, raise_exception=True)
else:
self._auth_validate_action(
context, context.project_id, constants.RBAC_PUT_FAILOVER)
try: try:
LOG.info("Sending failover request for amphora %s to the queue", LOG.info("Sending failover request for amphora %s to the queue",
@ -196,14 +191,9 @@ class AmphoraUpdateController(base.BaseController):
db_amp = self._get_db_amp(context.session, self.amp_id, db_amp = self._get_db_amp(context.session, self.amp_id,
show_deleted=False) show_deleted=False)
# Check to see if the amphora is a spare (not associated with an LB) self._auth_validate_action(
if db_amp.load_balancer: context, db_amp.load_balancer.project_id,
self._auth_validate_action( constants.RBAC_PUT_CONFIG)
context, db_amp.load_balancer.project_id,
constants.RBAC_PUT_CONFIG)
else:
self._auth_validate_action(
context, context.project_id, constants.RBAC_PUT_CONFIG)
try: try:
LOG.info("Sending amphora agent update request for amphora %s to " LOG.info("Sending amphora agent update request for amphora %s to "

View File

@ -29,29 +29,10 @@ from octavia import version
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
spare_amp_thread_event = threading.Event()
db_cleanup_thread_event = threading.Event() db_cleanup_thread_event = threading.Event()
cert_rotate_thread_event = threading.Event() cert_rotate_thread_event = threading.Event()
def spare_amphora_check():
"""Initiates spare amp check with respect to configured interval."""
# Read the interval from CONF
interval = CONF.house_keeping.spare_check_interval
LOG.info("Spare check interval is set to %d sec", interval)
spare_amp = house_keeping.SpareAmphora()
while not spare_amp_thread_event.is_set():
LOG.debug("Initiating spare amphora check...")
try:
spare_amp.spare_check()
except Exception as e:
LOG.debug('spare_amphora caught the following exception and '
'is restarting: %s', str(e))
spare_amp_thread_event.wait(interval)
def db_cleanup(): def db_cleanup():
"""Perform db cleanup for old resources.""" """Perform db cleanup for old resources."""
# Read the interval from CONF # Read the interval from CONF
@ -105,12 +86,6 @@ def main():
threads = [] threads = []
# Thread to perform spare amphora check
spare_amp_thread = threading.Thread(target=spare_amphora_check)
spare_amp_thread.daemon = True
spare_amp_thread.start()
threads.append(spare_amp_thread)
# Thread to perform db cleanup # Thread to perform db cleanup
db_cleanup_thread = threading.Thread(target=db_cleanup) db_cleanup_thread = threading.Thread(target=db_cleanup)
db_cleanup_thread.daemon = True db_cleanup_thread.daemon = True
@ -125,10 +100,8 @@ def main():
def process_cleanup(*args, **kwargs): def process_cleanup(*args, **kwargs):
LOG.info("Attempting to gracefully terminate House-Keeping") LOG.info("Attempting to gracefully terminate House-Keeping")
spare_amp_thread_event.set()
db_cleanup_thread_event.set() db_cleanup_thread_event.set()
cert_rotate_thread_event.set() cert_rotate_thread_event.set()
spare_amp_thread.join()
db_cleanup_thread.join() db_cleanup_thread.join()
cert_rotate_thread.join() cert_rotate_thread.join()
LOG.info("House-Keeping process terminated") LOG.info("House-Keeping process terminated")

View File

@ -616,20 +616,6 @@ certificate_opts = [
] ]
house_keeping_opts = [ house_keeping_opts = [
cfg.IntOpt('spare_check_interval',
deprecated_for_removal=True,
deprecated_since='Victoria',
deprecated_reason='Spares Pool support will be removed in the '
'X release.',
default=30,
help=_('Spare check interval in seconds')),
cfg.IntOpt('spare_amphora_pool_size',
deprecated_for_removal=True,
deprecated_since='Victoria',
deprecated_reason='Spares Pool support will be removed in the '
'X release.',
default=0,
help=_('Number of spare amphorae')),
cfg.IntOpt('cleanup_interval', cfg.IntOpt('cleanup_interval',
default=30, default=30,
help=_('DB cleanup interval in seconds')), help=_('DB cleanup interval in seconds')),

View File

@ -579,7 +579,6 @@ RPC_NAMESPACE_CONTROLLER_AGENT = 'controller'
# Build Type Priority # Build Type Priority
LB_CREATE_FAILOVER_PRIORITY = 20 LB_CREATE_FAILOVER_PRIORITY = 20
LB_CREATE_NORMAL_PRIORITY = 40 LB_CREATE_NORMAL_PRIORITY = 40
LB_CREATE_SPARES_POOL_PRIORITY = 60
LB_CREATE_ADMIN_FAILOVER_PRIORITY = 80 LB_CREATE_ADMIN_FAILOVER_PRIORITY = 80
BUILD_TYPE_PRIORITY = 'build_type_priority' BUILD_TYPE_PRIORITY = 'build_type_priority'

View File

@ -429,14 +429,6 @@ def ip_not_reserved(ip_address):
option='member address') option='member address')
def is_flavor_spares_compatible(flavor):
if flavor:
# If a compute flavor is specified, the flavor is not spares compatible
if flavor.get(constants.COMPUTE_FLAVOR, None):
return False
return True
def check_cipher_prohibit_list(cipherstring): def check_cipher_prohibit_list(cipherstring):
ciphers = cipherstring.split(':') ciphers = cipherstring.split(':')
prohibit_list = CONF.api_settings.tls_cipher_prohibit_list.split(':') prohibit_list = CONF.api_settings.tls_cipher_prohibit_list.split(':')

View File

@ -17,7 +17,6 @@ import datetime
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils
from sqlalchemy.orm import exc as sqlalchemy_exceptions from sqlalchemy.orm import exc as sqlalchemy_exceptions
from octavia.common import constants from octavia.common import constants
@ -30,85 +29,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
class SpareAmphora(object):
def __init__(self):
self.amp_repo = repo.AmphoraRepository()
self.spares_repo = repo.SparesPoolRepository()
self.az_repo = repo.AvailabilityZoneRepository()
if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
self.cw = cw2.ControllerWorker()
self.check_booting_amphora = True
else:
self.cw = cw1.ControllerWorker()
self.check_booting_amphora = False
def spare_check(self):
"""Checks the DB for the Spare amphora count.
If it's less than the requirement, starts new amphora.
"""
lock_session = db_api.get_session(autocommit=False)
session = db_api.get_session()
try:
# Lock the spares_pool record for read and write
spare_amp_row = self.spares_repo.get_for_update(lock_session)
conf_spare_cnt = CONF.house_keeping.spare_amphora_pool_size
LOG.debug("Required Spare Amphora count : %d", conf_spare_cnt)
availability_zones, links = self.az_repo.get_all(session,
enabled=True)
compute_zones = set()
for az in availability_zones:
az_meta = self.az_repo.get_availability_zone_metadata_dict(
session, az.name)
compute_zones.add(az_meta.get(constants.COMPUTE_ZONE))
# If no AZs objects then build in the configured AZ (even if None)
# Also if configured AZ is not None then also build in there
# as could be different to the current AZs objects.
if CONF.nova.availability_zone or not compute_zones:
compute_zones.add(CONF.nova.availability_zone)
amp_booting = []
for az_name in compute_zones:
# TODO(rm_work): If az_name is None, this will get ALL spares
# across all AZs. This is the safest/most backwards compatible
# way I can think of, as cached_zone on the amphora records
# won't ever match. This should not impact any existing deploys
# with no AZ configured, as the behavior should be identical
# in that case. In the case of multiple AZs configured, it will
# simply ensure there are at least <N> spares *somewhere*, but
# will function more accurately if the operator actually
# configures the AZ setting properly.
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(
session, availability_zone=az_name,
check_booting_amphora=self.check_booting_amphora)
LOG.debug("Current Spare Amphora count for AZ %s: %d",
az_name, curr_spare_cnt)
diff_count = conf_spare_cnt - curr_spare_cnt
# When the current spare amphora is less than required
if diff_count > 0:
LOG.info("Initiating creation of %d spare amphora "
"for az %s.", diff_count, az_name)
# Call Amphora Create Flow diff_count times
with futures.ThreadPoolExecutor(
max_workers=conf_spare_cnt) as executor:
for i in range(1, diff_count + 1):
LOG.debug("Starting amphorae number %d ...", i)
amp_booting.append(executor.submit(
self.cw.create_amphora, az_name))
else:
LOG.debug("Current spare amphora count for AZ %s "
"satisfies the requirement", az_name)
# Wait for the amphora boot threads to finish
futures.wait(amp_booting)
spare_amp_row.updated_at = timeutils.utcnow()
lock_session.commit()
except Exception:
lock_session.rollback()
class DatabaseCleanup(object): class DatabaseCleanup(object):
def __init__(self): def __init__(self):
self.amp_repo = repo.AmphoraRepository() self.amp_repo = repo.AmphoraRepository()

View File

@ -85,34 +85,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
return repo.get(db_apis.get_session(), id=id) return repo.get(db_apis.get_session(), id=id)
def create_amphora(self, availability_zone=None):
"""Creates an Amphora.
This is used to create spare amphora.
:returns: amphora_id
"""
try:
store = {constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}
if availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), availability_zone))
create_amp_tf = self.taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store=store)
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
create_amp_tf.run()
return create_amp_tf.storage.fetch('amphora')
except Exception as e:
LOG.error('Failed to create an amphora due to: %s', str(e))
return None
def delete_amphora(self, amphora_id): def delete_amphora(self, amphora_id):
"""Deletes an existing Amphora. """Deletes an existing Amphora.

View File

@ -16,7 +16,6 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow from taskflow.patterns import unordered_flow
@ -108,7 +107,7 @@ class AmphoraFlows(object):
return post_map_amp_to_lb return post_map_amp_to_lb
def _get_create_amp_for_lb_subflow(self, prefix, role, is_spare=False): def _get_create_amp_for_lb_subflow(self, prefix, role):
"""Create a new amphora for lb.""" """Create a new amphora for lb."""
sf_name = prefix + '-' + constants.CREATE_AMP_FOR_LB_SUBFLOW sf_name = prefix + '-' + constants.CREATE_AMP_FOR_LB_SUBFLOW
@ -156,16 +155,10 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize( create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE, name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA)) requires=constants.AMPHORA))
if is_spare: create_amp_for_lb_subflow.add(
create_amp_for_lb_subflow.add( database_tasks.MarkAmphoraAllocatedInDB(
database_tasks.MarkAmphoraReadyInDB( name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
name=sf_name + '-' + constants.MARK_AMPHORA_READY_INDB, requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
requires=constants.AMPHORA))
else:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraAllocatedInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
create_amp_for_lb_subflow.add(database_tasks.ReloadAmphora( create_amp_for_lb_subflow.add(database_tasks.ReloadAmphora(
name=sf_name + '-' + constants.RELOAD_AMPHORA, name=sf_name + '-' + constants.RELOAD_AMPHORA,
requires=constants.AMPHORA_ID, requires=constants.AMPHORA_ID,
@ -187,68 +180,9 @@ class AmphoraFlows(object):
return create_amp_for_lb_subflow return create_amp_for_lb_subflow
def _allocate_amp_to_lb_decider(self, history):
"""decides if the lb shall be mapped to a spare amphora
:return: True if a spare amphora exists in DB
"""
return list(history.values())[0] is not None
def _create_new_amp_for_lb_decider(self, history):
"""decides if a new amphora must be created for the lb
:return: True if there is no spare amphora
"""
return list(history.values())[0] is None
def get_amphora_for_lb_subflow( def get_amphora_for_lb_subflow(
self, prefix, role=constants.ROLE_STANDALONE, is_spare=False): self, prefix, role=constants.ROLE_STANDALONE):
"""Tries to allocate a spare amphora to a loadbalancer if none return self._get_create_amp_for_lb_subflow(prefix, role)
exists, create a new amphora.
"""
sf_name = prefix + '-' + constants.GET_AMPHORA_FOR_LB_SUBFLOW
# Don't replace a spare with another spare, just build a fresh one.
if is_spare:
get_spare_amp_flow = linear_flow.Flow(sf_name)
get_spare_amp_flow.add(self._get_create_amp_for_lb_subflow(
prefix, role, is_spare=is_spare))
return get_spare_amp_flow
# We need a graph flow here for a conditional flow
amp_for_lb_flow = graph_flow.Flow(sf_name)
# Setup the task that maps an amphora to a load balancer
allocate_and_associate_amp = database_tasks.MapLoadbalancerToAmphora(
name=sf_name + '-' + constants.MAP_LOADBALANCER_TO_AMPHORA,
requires=(constants.LOADBALANCER_ID, constants.FLAVOR,
constants.AVAILABILITY_ZONE),
provides=constants.AMPHORA_ID)
# Define a subflow for if we successfully map an amphora
map_lb_to_amp = self._get_post_map_lb_subflow(prefix, role)
# Define a subflow for if we can't map an amphora
create_amp = self._get_create_amp_for_lb_subflow(prefix, role)
# Add them to the graph flow
amp_for_lb_flow.add(allocate_and_associate_amp,
map_lb_to_amp, create_amp)
# Setup the decider for the path if we can map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, map_lb_to_amp,
decider=self._allocate_amp_to_lb_decider,
decider_depth='flow')
# Setup the decider for the path if we can't map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, create_amp,
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
return amp_for_lb_flow
def get_delete_amphora_flow( def get_delete_amphora_flow(
self, amphora, self, amphora,
@ -433,7 +367,7 @@ class AmphoraFlows(object):
def get_amphora_for_lb_failover_subflow( def get_amphora_for_lb_failover_subflow(
self, prefix, role=constants.ROLE_STANDALONE, self, prefix, role=constants.ROLE_STANDALONE,
failed_amp_vrrp_port_id=None, is_vrrp_ipv6=False, is_spare=False): failed_amp_vrrp_port_id=None, is_vrrp_ipv6=False):
"""Creates a new amphora that will be used in a failover flow. """Creates a new amphora that will be used in a failover flow.
:requires: loadbalancer_id, flavor, vip, vip_sg_id, loadbalancer :requires: loadbalancer_id, flavor, vip, vip_sg_id, loadbalancer
@ -442,7 +376,6 @@ class AmphoraFlows(object):
:param role: The role this amphora will have in the topology. :param role: The role this amphora will have in the topology.
:param failed_amp_vrrp_port_id: The base port ID of the failed amp. :param failed_amp_vrrp_port_id: The base port ID of the failed amp.
:param is_vrrp_ipv6: True if the base port IP is IPv6. :param is_vrrp_ipv6: True if the base port IP is IPv6.
:param is_spare: True if we are getting a spare amphroa.
:return: A Taskflow sub-flow that will create the amphora. :return: A Taskflow sub-flow that will create the amphora.
""" """
@ -453,11 +386,7 @@ class AmphoraFlows(object):
# Try to allocate or boot an amphora instance (unconfigured) # Try to allocate or boot an amphora instance (unconfigured)
amp_for_failover_flow.add(self.get_amphora_for_lb_subflow( amp_for_failover_flow.add(self.get_amphora_for_lb_subflow(
prefix=prefix + '-' + constants.FAILOVER_LOADBALANCER_FLOW, prefix=prefix + '-' + constants.FAILOVER_LOADBALANCER_FLOW,
role=role, is_spare=is_spare)) role=role))
# If we are getting a spare amphora, this is all we need to do.
if is_spare:
return amp_for_failover_flow
# Create the VIP base (aka VRRP) port for the amphora. # Create the VIP base (aka VRRP) port for the amphora.
amp_for_failover_flow.add(network_tasks.CreateVIPBasePort( amp_for_failover_flow.add(network_tasks.CreateVIPBasePort(
@ -547,8 +476,6 @@ class AmphoraFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amphora.role == constants.ROLE_STANDALONE: elif failed_amphora.role == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amphora.role is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",
@ -570,10 +497,8 @@ class AmphoraFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.VIP_SG_ID)) provides=constants.VIP_SG_ID))
is_spare = True
is_vrrp_ipv6 = False is_vrrp_ipv6 = False
if failed_amphora.load_balancer_id: if failed_amphora.load_balancer_id:
is_spare = False
if failed_amphora.vrrp_ip: if failed_amphora.vrrp_ip:
is_vrrp_ipv6 = utils.is_ipv6(failed_amphora.vrrp_ip) is_vrrp_ipv6 = utils.is_ipv6(failed_amphora.vrrp_ip)
@ -590,8 +515,7 @@ class AmphoraFlows(object):
prefix=constants.FAILOVER_LOADBALANCER_FLOW, prefix=constants.FAILOVER_LOADBALANCER_FLOW,
role=failed_amphora.role, role=failed_amphora.role,
failed_amp_vrrp_port_id=failed_amphora.vrrp_port_id, failed_amp_vrrp_port_id=failed_amphora.vrrp_port_id,
is_vrrp_ipv6=is_vrrp_ipv6, is_vrrp_ipv6=is_vrrp_ipv6))
is_spare=is_spare))
failover_amp_flow.add( failover_amp_flow.add(
self.get_delete_amphora_flow( self.get_delete_amphora_flow(
@ -605,7 +529,7 @@ class AmphoraFlows(object):
inject={constants.AMPHORA: failed_amphora})) inject={constants.AMPHORA: failed_amphora}))
if not failed_amphora.load_balancer_id: if not failed_amphora.load_balancer_id:
# This is an unallocated amphora (spares pool), we are done. # This is an unallocated amphora (bogus), we are done.
return failover_amp_flow return failover_amp_flow
failover_amp_flow.add(database_tasks.GetLoadBalancer( failover_amp_flow.add(database_tasks.GetLoadBalancer(

View File

@ -45,9 +45,8 @@ class LoadBalancerFlows(object):
self.member_flows = member_flows.MemberFlows() self.member_flows = member_flows.MemberFlows()
def get_create_load_balancer_flow(self, topology, listeners=None): def get_create_load_balancer_flow(self, topology, listeners=None):
"""Creates a conditional graph flow that allocates a loadbalancer to """Creates a conditional graph flow that allocates a loadbalancer.
two spare amphorae.
:raises InvalidTopology: Invalid topology specified :raises InvalidTopology: Invalid topology specified
:return: The graph flow for creating a loadbalancer. :return: The graph flow for creating a loadbalancer.
""" """
@ -400,8 +399,6 @@ class LoadBalancerFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amp.role == constants.ROLE_STANDALONE: elif failed_amp.role == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amp.role is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",
@ -547,8 +544,6 @@ class LoadBalancerFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amp.role == constants.ROLE_STANDALONE: elif failed_amp.role == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amp.role is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",

View File

@ -28,7 +28,6 @@ from octavia.common import constants
from octavia.common import data_models from octavia.common import data_models
import octavia.common.tls_utils.cert_parser as cert_parser import octavia.common.tls_utils.cert_parser as cert_parser
from octavia.common import utils from octavia.common import utils
from octavia.common import validate
from octavia.controller.worker import task_utils as task_utilities from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis from octavia.db import api as db_apis
from octavia.db import repositories as repo from octavia.db import repositories as repo
@ -497,66 +496,6 @@ class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask):
"%(except)s", {'amp': amphora_id, 'except': str(e)}) "%(except)s", {'amp': amphora_id, 'except': str(e)})
class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""Maps and assigns a load balancer to an amphora in the database."""
def execute(self, loadbalancer_id, server_group_id=None, flavor=None,
availability_zone=None):
"""Allocates an Amphora for the load balancer in the database.
:param loadbalancer_id: The load balancer id to map to an amphora
:returns: Amphora ID if one was allocated, None if it was
unable to allocate an Amphora
"""
LOG.debug("Allocating an Amphora for load balancer with id %s",
loadbalancer_id)
if server_group_id is not None:
LOG.debug("Load balancer is using anti-affinity. Skipping spares "
"pool allocation.")
return None
# Validate the flavor is spares compatible
if not validate.is_flavor_spares_compatible(flavor):
LOG.debug("Load balancer has a flavor that is not compatible with "
"using spares pool amphora. Skipping spares pool "
"allocation.")
return None
if availability_zone:
amp_az = availability_zone.get(constants.COMPUTE_ZONE)
else:
amp_az = CONF.nova.availability_zone
try:
amp = self.amphora_repo.allocate_and_associate(
db_apis.get_session(),
loadbalancer_id,
amp_az)
except Exception as e:
LOG.error("Failed to get a spare amphora (AZ: %(amp_az)s) for "
"loadbalancer %(lb_id)s due to: %(except)s",
{'amp_az': amp_az, 'lb_id': loadbalancer_id,
'except': str(e)})
return None
if amp is None:
LOG.debug("No Amphora available for load balancer with id %s",
loadbalancer_id)
return None
LOG.debug("Allocated Amphora with id %(amp)s for load balancer "
"with id %(lb)s", {'amp': amp.id, 'lb': loadbalancer_id})
return amp.id
def revert(self, result, loadbalancer_id, *args, **kwargs):
LOG.warning("Reverting Amphora allocation for the load "
"balancer %s in the database.", loadbalancer_id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer_id)
class _MarkAmphoraRoleAndPriorityInDB(BaseDatabaseTask): class _MarkAmphoraRoleAndPriorityInDB(BaseDatabaseTask):
"""Alter the amphora role and priority in DB.""" """Alter the amphora role and priority in DB."""

View File

@ -107,29 +107,6 @@ class ControllerWorker(object):
with tf_logging.DynamicLoggingListener(tf, log=LOG): with tf_logging.DynamicLoggingListener(tf, log=LOG):
tf.run() tf.run()
def create_amphora(self, availability_zone=None):
"""Creates an Amphora.
This is used to create spare amphora.
:returns: uuid
"""
try:
store = {constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}
if availability_zone:
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), availability_zone))
self.run_flow(
flow_utils.get_create_amphora_flow,
store=store, wait=True)
except Exception as e:
LOG.error('Failed to create an amphora due to: %s', str(e))
def delete_amphora(self, amphora_id): def delete_amphora(self, amphora_id):
"""Deletes an existing Amphora. """Deletes an existing Amphora.

View File

@ -16,7 +16,6 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow from taskflow.patterns import unordered_flow
@ -82,33 +81,7 @@ class AmphoraFlows(object):
return create_amphora_flow return create_amphora_flow
def _get_post_map_lb_subflow(self, prefix, role): def get_amphora_for_lb_subflow(self, prefix, role):
"""Set amphora type after mapped to lb."""
sf_name = prefix + '-' + constants.POST_MAP_AMP_TO_LB_SUBFLOW
post_map_amp_to_lb = linear_flow.Flow(
sf_name)
post_map_amp_to_lb.add(amphora_driver_tasks.AmphoraConfigUpdate(
name=sf_name + '-' + constants.AMPHORA_CONFIG_UPDATE_TASK,
requires=(constants.AMPHORA, constants.FLAVOR)))
if role == constants.ROLE_MASTER:
post_map_amp_to_lb.add(database_tasks.MarkAmphoraMasterInDB(
name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_BACKUP:
post_map_amp_to_lb.add(database_tasks.MarkAmphoraBackupInDB(
name=sf_name + '-' + constants.MARK_AMP_BACKUP_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_STANDALONE:
post_map_amp_to_lb.add(database_tasks.MarkAmphoraStandAloneInDB(
name=sf_name + '-' + constants.MARK_AMP_STANDALONE_INDB,
requires=constants.AMPHORA))
return post_map_amp_to_lb
def _get_create_amp_for_lb_subflow(self, prefix, role, is_spare=False):
"""Create a new amphora for lb.""" """Create a new amphora for lb."""
sf_name = prefix + '-' + constants.CREATE_AMP_FOR_LB_SUBFLOW sf_name = prefix + '-' + constants.CREATE_AMP_FOR_LB_SUBFLOW
@ -153,16 +126,10 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize( create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE, name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA)) requires=constants.AMPHORA))
if is_spare: create_amp_for_lb_subflow.add(
create_amp_for_lb_subflow.add( database_tasks.MarkAmphoraAllocatedInDB(
database_tasks.MarkAmphoraReadyInDB( name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
name=sf_name + '-' + constants.MARK_AMPHORA_READY_INDB, requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
requires=constants.AMPHORA))
else:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraAllocatedInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
if role == constants.ROLE_MASTER: if role == constants.ROLE_MASTER:
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraMasterInDB( create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraMasterInDB(
name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB, name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB,
@ -179,22 +146,6 @@ class AmphoraFlows(object):
return create_amp_for_lb_subflow return create_amp_for_lb_subflow
def _allocate_amp_to_lb_decider(self, history):
"""decides if the lb shall be mapped to a spare amphora
:return: True if a spare amphora exists in DB
"""
return list(history.values())[0] is not None
def _create_new_amp_for_lb_decider(self, history):
"""decides if a new amphora must be created for the lb
:return: True if there is no spare amphora
"""
values = history.values()
return not values or list(values)[0] is None
def _retry_flow(self, sf_name): def _retry_flow(self, sf_name):
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
retry_subflow = linear_flow.Flow( retry_subflow = linear_flow.Flow(
@ -206,97 +157,6 @@ class AmphoraFlows(object):
inject={'raise_retry_exception': True})) inject={'raise_retry_exception': True}))
return retry_subflow return retry_subflow
def _finalize_flow(self, sf_name, role):
sf_name = sf_name + constants.FINALIZE_AMPHORA_FLOW
create_amp_for_lb_subflow = linear_flow.Flow(sf_name)
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraAllocatedInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_ALLOCATED_INDB,
requires=(constants.AMPHORA, constants.LOADBALANCER_ID)))
create_amp_for_lb_subflow.add(database_tasks.ReloadAmphora(
name=sf_name + '-' + constants.RELOAD_AMPHORA,
requires=constants.AMPHORA,
provides=constants.AMPHORA))
if role == constants.ROLE_MASTER:
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraMasterInDB(
name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_BACKUP:
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBackupInDB(
name=sf_name + '-' + constants.MARK_AMP_BACKUP_INDB,
requires=constants.AMPHORA))
elif role == constants.ROLE_STANDALONE:
create_amp_for_lb_subflow.add(
database_tasks.MarkAmphoraStandAloneInDB(
name=sf_name + '-' + constants.MARK_AMP_STANDALONE_INDB,
requires=constants.AMPHORA))
return create_amp_for_lb_subflow
def get_amphora_for_lb_subflow(
self, prefix, role=constants.ROLE_STANDALONE, is_spare=False):
"""Tries to allocate a spare amphora to a loadbalancer if none
exists, create a new amphora.
"""
sf_name = prefix + '-' + constants.GET_AMPHORA_FOR_LB_SUBFLOW
# Don't replace a spare with another spare, just build a fresh one.
if is_spare:
get_spare_amp_flow = linear_flow.Flow(sf_name)
get_spare_amp_flow.add(self._get_create_amp_for_lb_subflow(
prefix, role, is_spare=is_spare))
return get_spare_amp_flow
# We need a graph flow here for a conditional flow
amp_for_lb_flow = graph_flow.Flow(sf_name)
# Setup the task that maps an amphora to a load balancer
allocate_and_associate_amp = database_tasks.MapLoadbalancerToAmphora(
name=sf_name + '-' + constants.MAP_LOADBALANCER_TO_AMPHORA,
requires=(constants.LOADBALANCER_ID, constants.FLAVOR,
constants.AVAILABILITY_ZONE),
provides=constants.AMPHORA)
# Define a subflow for if we successfully map an amphora
map_lb_to_amp = self._get_post_map_lb_subflow(prefix, role)
# Define a subflow for if we can't map an amphora
create_amp = self._get_create_amp_for_lb_subflow(prefix, role)
# TODO(ataraday): Have to split create flow due lack of functionality
# in taskflow: related https://bugs.launchpad.net/taskflow/+bug/1480907
retry_flow = self._retry_flow(sf_name)
finalize_flow = self._finalize_flow(sf_name, role)
# Add them to the graph flow
amp_for_lb_flow.add(allocate_and_associate_amp,
map_lb_to_amp, create_amp,
retry_flow, finalize_flow, resolve_requires=False)
# Setup the decider for the path if we can map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, map_lb_to_amp,
decider=self._allocate_amp_to_lb_decider,
decider_depth='flow')
# Setup the decider for the path if we can't map an amphora
amp_for_lb_flow.link(allocate_and_associate_amp, create_amp,
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
# TODO(ataraday): setup separate deciders as we need retry flow
# properly ignored
amp_for_lb_flow.link(create_amp, retry_flow,
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
amp_for_lb_flow.link(retry_flow, finalize_flow,
decider=self._create_new_amp_for_lb_decider,
decider_depth='flow')
return amp_for_lb_flow
def get_delete_amphora_flow( def get_delete_amphora_flow(
self, amphora, self, amphora,
retry_attempts=CONF.controller_worker.amphora_delete_retries, retry_attempts=CONF.controller_worker.amphora_delete_retries,
@ -480,7 +340,7 @@ class AmphoraFlows(object):
def get_amphora_for_lb_failover_subflow( def get_amphora_for_lb_failover_subflow(
self, prefix, role=constants.ROLE_STANDALONE, self, prefix, role=constants.ROLE_STANDALONE,
failed_amp_vrrp_port_id=None, is_vrrp_ipv6=False, is_spare=False): failed_amp_vrrp_port_id=None, is_vrrp_ipv6=False):
"""Creates a new amphora that will be used in a failover flow. """Creates a new amphora that will be used in a failover flow.
:requires: loadbalancer_id, flavor, vip, vip_sg_id, loadbalancer :requires: loadbalancer_id, flavor, vip, vip_sg_id, loadbalancer
@ -489,7 +349,6 @@ class AmphoraFlows(object):
:param role: The role this amphora will have in the topology. :param role: The role this amphora will have in the topology.
:param failed_amp_vrrp_port_id: The base port ID of the failed amp. :param failed_amp_vrrp_port_id: The base port ID of the failed amp.
:param is_vrrp_ipv6: True if the base port IP is IPv6. :param is_vrrp_ipv6: True if the base port IP is IPv6.
:param is_spare: True if we are getting a spare amphroa.
:return: A Taskflow sub-flow that will create the amphora. :return: A Taskflow sub-flow that will create the amphora.
""" """
@ -500,11 +359,7 @@ class AmphoraFlows(object):
# Try to allocate or boot an amphora instance (unconfigured) # Try to allocate or boot an amphora instance (unconfigured)
amp_for_failover_flow.add(self.get_amphora_for_lb_subflow( amp_for_failover_flow.add(self.get_amphora_for_lb_subflow(
prefix=prefix + '-' + constants.FAILOVER_LOADBALANCER_FLOW, prefix=prefix + '-' + constants.FAILOVER_LOADBALANCER_FLOW,
role=role, is_spare=is_spare)) role=role))
# If we are getting a spare amphora, this is all we need to do.
if is_spare:
return amp_for_failover_flow
# Create the VIP base (aka VRRP) port for the amphora. # Create the VIP base (aka VRRP) port for the amphora.
amp_for_failover_flow.add(network_tasks.CreateVIPBasePort( amp_for_failover_flow.add(network_tasks.CreateVIPBasePort(
@ -588,8 +443,6 @@ class AmphoraFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amphora[constants.ROLE] == constants.ROLE_STANDALONE: elif failed_amphora[constants.ROLE] == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amphora[constants.ROLE] is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",
@ -612,10 +465,8 @@ class AmphoraFlows(object):
requires=constants.LOADBALANCER_ID, requires=constants.LOADBALANCER_ID,
provides=constants.VIP_SG_ID)) provides=constants.VIP_SG_ID))
is_spare = True
is_vrrp_ipv6 = False is_vrrp_ipv6 = False
if failed_amphora.get(constants.LOAD_BALANCER_ID): if failed_amphora.get(constants.LOAD_BALANCER_ID):
is_spare = False
if failed_amphora.get(constants.VRRP_IP): if failed_amphora.get(constants.VRRP_IP):
is_vrrp_ipv6 = utils.is_ipv6(failed_amphora[constants.VRRP_IP]) is_vrrp_ipv6 = utils.is_ipv6(failed_amphora[constants.VRRP_IP])
@ -633,8 +484,7 @@ class AmphoraFlows(object):
role=failed_amphora[constants.ROLE], role=failed_amphora[constants.ROLE],
failed_amp_vrrp_port_id=failed_amphora.get( failed_amp_vrrp_port_id=failed_amphora.get(
constants.VRRP_PORT_ID), constants.VRRP_PORT_ID),
is_vrrp_ipv6=is_vrrp_ipv6, is_vrrp_ipv6=is_vrrp_ipv6))
is_spare=is_spare))
failover_amp_flow.add( failover_amp_flow.add(
self.get_delete_amphora_flow( self.get_delete_amphora_flow(
@ -648,7 +498,7 @@ class AmphoraFlows(object):
inject={constants.AMPHORA: failed_amphora})) inject={constants.AMPHORA: failed_amphora}))
if not failed_amphora.get(constants.LOAD_BALANCER_ID): if not failed_amphora.get(constants.LOAD_BALANCER_ID):
# This is an unallocated amphora (spares pool), we are done. # This is an unallocated amphora (bogus), we are done.
return failover_amp_flow return failover_amp_flow
failover_amp_flow.add(database_tasks.GetLoadBalancer( failover_amp_flow.add(database_tasks.GetLoadBalancer(

View File

@ -47,9 +47,8 @@ class LoadBalancerFlows(object):
self.lb_repo = repo.LoadBalancerRepository() self.lb_repo = repo.LoadBalancerRepository()
def get_create_load_balancer_flow(self, topology, listeners=None): def get_create_load_balancer_flow(self, topology, listeners=None):
"""Creates a conditional graph flow that allocates a loadbalancer to """Creates a conditional graph flow that allocates a loadbalancer.
two spare amphorae.
:raises InvalidTopology: Invalid topology specified :raises InvalidTopology: Invalid topology specified
:return: The graph flow for creating a loadbalancer. :return: The graph flow for creating a loadbalancer.
""" """
@ -385,8 +384,6 @@ class LoadBalancerFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amp_role == constants.ROLE_STANDALONE: elif failed_amp_role == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amp_role is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",
@ -535,8 +532,6 @@ class LoadBalancerFlows(object):
amp_role = 'master_or_backup' amp_role = 'master_or_backup'
elif failed_amp_role == constants.ROLE_STANDALONE: elif failed_amp_role == constants.ROLE_STANDALONE:
amp_role = 'standalone' amp_role = 'standalone'
elif failed_amp_role is None:
amp_role = 'spare'
else: else:
amp_role = 'undefined' amp_role = 'undefined'
LOG.info("Performing failover for amphora: %s", LOG.info("Performing failover for amphora: %s",

View File

@ -29,7 +29,6 @@ from octavia.common import constants
from octavia.common import data_models from octavia.common import data_models
import octavia.common.tls_utils.cert_parser as cert_parser import octavia.common.tls_utils.cert_parser as cert_parser
from octavia.common import utils from octavia.common import utils
from octavia.common import validate
from octavia.controller.worker import task_utils as task_utilities from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis from octavia.db import api as db_apis
from octavia.db import repositories as repo from octavia.db import repositories as repo
@ -545,59 +544,6 @@ class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask):
"%(except)s", {'amp': amphora_id, 'except': str(e)}) "%(except)s", {'amp': amphora_id, 'except': str(e)})
class MapLoadbalancerToAmphora(BaseDatabaseTask):
"""Maps and assigns a load balancer to an amphora in the database."""
def execute(self, loadbalancer_id, server_group_id=None, flavor=None,
availability_zone=None):
"""Allocates an Amphora for the load balancer in the database.
:param loadbalancer_id: The load balancer id to map to an amphora
:returns: Amphora ID if one was allocated, None if it was
unable to allocate an Amphora
"""
LOG.debug("Allocating an Amphora for load balancer with id %s",
loadbalancer_id)
if server_group_id is not None:
LOG.debug("Load balancer is using anti-affinity. Skipping spares "
"pool allocation.")
return None
# Validate the flavor is spares compatible
if not validate.is_flavor_spares_compatible(flavor):
LOG.debug("Load balancer has a flavor that is not compatible with "
"using spares pool amphora. Skipping spares pool "
"allocation.")
return None
if availability_zone:
amp_az = availability_zone.get(constants.COMPUTE_ZONE)
else:
amp_az = CONF.nova.availability_zone
amp = self.amphora_repo.allocate_and_associate(
db_apis.get_session(),
loadbalancer_id,
amp_az)
if amp is None:
LOG.debug("No Amphora available for load balancer with id %s",
loadbalancer_id)
return None
LOG.debug("Allocated Amphora with id %(amp)s for load balancer "
"with id %(lb)s", {'amp': amp.id, 'lb': loadbalancer_id})
# TODO(ataraday): return AMP here so refactored spit of create amp for
# loadbalancer flow can executed properly
return amp.to_dict()
def revert(self, result, loadbalancer_id, *args, **kwargs):
LOG.warning("Reverting Amphora allocation for the load "
"balancer %s in the database.", loadbalancer_id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer_id)
class _MarkAmphoraRoleAndPriorityInDB(BaseDatabaseTask): class _MarkAmphoraRoleAndPriorityInDB(BaseDatabaseTask):
"""Alter the amphora role and priority in DB.""" """Alter the amphora role and priority in DB."""

View File

@ -856,13 +856,6 @@ class ClientAuthenticationMode(base_models.BASE):
name = sa.Column(sa.String(10), primary_key=True, nullable=False) name = sa.Column(sa.String(10), primary_key=True, nullable=False)
class SparesPool(base_models.BASE):
__tablename__ = "spares_pool"
updated_at = sa.Column(sa.DateTime, primary_key=True, nullable=True)
class ListenerCidr(base_models.BASE): class ListenerCidr(base_models.BASE):
__data_model__ = data_models.ListenerCidr __data_model__ = data_models.ListenerCidr

View File

@ -228,7 +228,6 @@ class Repositories(object):
self.quotas = QuotasRepository() self.quotas = QuotasRepository()
self.flavor = FlavorRepository() self.flavor = FlavorRepository()
self.flavor_profile = FlavorProfileRepository() self.flavor_profile = FlavorProfileRepository()
self.spares_pool = SparesPoolRepository()
self.availability_zone = AvailabilityZoneRepository() self.availability_zone = AvailabilityZoneRepository()
self.availability_zone_profile = AvailabilityZoneProfileRepository() self.availability_zone_profile = AvailabilityZoneProfileRepository()
@ -1376,34 +1375,6 @@ class AmphoraRepository(BaseRepository):
return db_lb.to_data_model() return db_lb.to_data_model()
return None return None
def get_spare_amphora_count(self, session, availability_zone=None,
check_booting_amphora=False):
"""Get the count of the spare amphora.
:returns: Number of current spare amphora.
"""
filters = {
'load_balancer_id': None
}
# For jobboard based controller amphora in booting/pending create state
# can reach READY state after restart of housekeeping/worker service,
# so include amphora in these state to query
if check_booting_amphora:
status = [consts.AMPHORA_READY,
consts.AMPHORA_BOOTING,
consts.PENDING_CREATE]
else:
status = [consts.AMPHORA_READY]
if availability_zone is not None:
filters['cached_zone'] = availability_zone
with session.begin(subtransactions=True):
count = session.query(self.model_class).filter_by(
**filters).filter(self.model_class.status.in_(status)).count()
return count
def get_cert_expiring_amphora(self, session): def get_cert_expiring_amphora(self, session):
"""Retrieves an amphora whose cert is close to expiring.. """Retrieves an amphora whose cert is close to expiring..
@ -1549,8 +1520,7 @@ class AmphoraBuildReqRepository(BaseRepository):
"""Fetches build request with highest priority and least created_time. """Fetches build request with highest priority and least created_time.
priority 20 = failover (highest) priority 20 = failover (highest)
priority 40 = create_loadbalancer priority 40 = create_loadbalancer (lowest)
priority 60 = sparespool (least)
:param session: A Sql Alchemy database session. :param session: A Sql Alchemy database session.
:returns amphora_id corresponding to highest priority and least created :returns amphora_id corresponding to highest priority and least created
time in 'WAITING' status. time in 'WAITING' status.
@ -2084,20 +2054,6 @@ class FlavorProfileRepository(_GetALLExceptDELETEDIdMixin, BaseRepository):
model_class = models.FlavorProfile model_class = models.FlavorProfile
class SparesPoolRepository(BaseRepository):
model_class = models.SparesPool
def get_for_update(self, lock_session):
"""Queries and locks the SparesPool record.
This call will query for the SparesPool table record and lock it
so that other processes cannot read or write it.
:returns: expected_spares_count, updated_at
"""
row = lock_session.query(models.SparesPool).with_for_update().one()
return row
class AvailabilityZoneRepository(_GetALLExceptDELETEDIdMixin, BaseRepository): class AvailabilityZoneRepository(_GetALLExceptDELETEDIdMixin, BaseRepository):
model_class = models.AvailabilityZone model_class = models.AvailabilityZone

View File

@ -230,25 +230,6 @@ class TestAmphora(base.BaseAPITest):
payload = {constants.AMPHORA_ID: self.amp_id} payload = {constants.AMPHORA_ID: self.amp_id}
mock_cast.assert_called_with({}, 'failover_amphora', **payload) mock_cast.assert_called_with({}, 'failover_amphora', **payload)
@mock.patch('oslo_messaging.RPCClient.cast')
def test_failover_spare(self, mock_cast):
amp_args = {
'compute_id': uuidutils.generate_uuid(),
'status': constants.AMPHORA_READY,
'lb_network_ip': '192.168.1.2',
'cert_expiration': datetime.datetime.now(),
'cert_busy': False,
'cached_zone': 'zone1',
'created_at': datetime.datetime.now(),
'updated_at': datetime.datetime.now(),
'image_id': uuidutils.generate_uuid(),
}
amp = self.amphora_repo.create(self.session, **amp_args)
self.put(self.AMPHORA_FAILOVER_PATH.format(
amphora_id=amp.id), body={}, status=202)
payload = {constants.AMPHORA_ID: amp.id}
mock_cast.assert_called_once_with({}, 'failover_amphora', **payload)
def test_failover_deleted(self): def test_failover_deleted(self):
new_amp = self._create_additional_amp() new_amp = self._create_additional_amp()
self.amphora_repo.update(self.session, new_amp.id, self.amphora_repo.update(self.session, new_amp.id,
@ -629,26 +610,6 @@ class TestAmphora(base.BaseAPITest):
self.put(self.AMPHORA_CONFIG_PATH.format( self.put(self.AMPHORA_CONFIG_PATH.format(
amphora_id=self.amp_id), body={}, status=500) amphora_id=self.amp_id), body={}, status=500)
@mock.patch('oslo_messaging.RPCClient.cast')
def test_config_spare_amp(self, mock_cast):
amp_args = {
'compute_id': uuidutils.generate_uuid(),
'status': constants.AMPHORA_READY,
'lb_network_ip': '192.168.1.2',
'cert_expiration': datetime.datetime.now(),
'cert_busy': False,
'cached_zone': 'zone1',
'created_at': datetime.datetime.now(),
'updated_at': datetime.datetime.now(),
'image_id': uuidutils.generate_uuid(),
}
amp = self.amphora_repo.create(self.session, **amp_args)
self.put(self.AMPHORA_CONFIG_PATH.format(
amphora_id=amp.id), body={}, status=202)
payload = {constants.AMPHORA_ID: amp.id}
mock_cast.assert_called_with({}, 'update_amphora_agent_config',
**payload)
@mock.patch('oslo_messaging.RPCClient.cast') @mock.patch('oslo_messaging.RPCClient.cast')
def test_config_authorized(self, mock_cast): def test_config_authorized(self, mock_cast):
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))

View File

@ -120,9 +120,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'listener_stats', 'amphora', 'sni', 'listener_stats', 'amphora', 'sni',
'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy', 'amphorahealth', 'vrrpgroup', 'l7rule', 'l7policy',
'amp_build_slots', 'amp_build_req', 'quotas', 'amp_build_slots', 'amp_build_req', 'quotas',
'flavor', 'flavor_profile', 'spares_pool', 'flavor', 'flavor_profile', 'listener_cidr',
'listener_cidr', 'availability_zone', 'availability_zone', 'availability_zone_profile')
'availability_zone_profile')
for repo_attr in repo_attr_names: for repo_attr in repo_attr_names:
single_repo = getattr(self.repos, repo_attr, None) single_repo = getattr(self.repos, repo_attr, None)
message = ("Class Repositories should have %s instance" message = ("Class Repositories should have %s instance"
@ -3949,34 +3948,6 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
self.assertIn(amphora1.id, expiring_ids) self.assertIn(amphora1.id, expiring_ids)
self.assertNotIn(amphora2.id, expiring_ids) self.assertNotIn(amphora2.id, expiring_ids)
def test_get_spare_amphora_count(self):
count = self.amphora_repo.get_spare_amphora_count(self.session)
self.assertEqual(0, count)
amphora1 = self.create_amphora(self.FAKE_UUID_1)
self.amphora_repo.update(self.session, amphora1.id,
status=constants.AMPHORA_READY)
amphora2 = self.create_amphora(self.FAKE_UUID_2)
self.amphora_repo.update(self.session, amphora2.id,
status=constants.AMPHORA_READY)
count = self.amphora_repo.get_spare_amphora_count(self.session)
self.assertEqual(2, count)
def test_get_spare_amphora_count_check_booting_amphora_true(self):
count = self.amphora_repo.get_spare_amphora_count(
self.session, check_booting_amphora=True)
self.assertEqual(0, count)
amphora1 = self.create_amphora(self.FAKE_UUID_1)
self.amphora_repo.update(self.session, amphora1.id,
status=constants.AMPHORA_READY,)
amphora2 = self.create_amphora(self.FAKE_UUID_2)
self.amphora_repo.update(self.session, amphora2.id,
status=constants.AMPHORA_BOOTING)
count = self.amphora_repo.get_spare_amphora_count(
self.session, check_booting_amphora=True)
self.assertEqual(2, count)
def test_get_none_cert_expired_amphora(self): def test_get_none_cert_expired_amphora(self):
# test with no expired amphora # test with no expired amphora
amp = self.amphora_repo.get_cert_expiring_amphora(self.session) amp = self.amphora_repo.get_cert_expiring_amphora(self.session)

View File

@ -22,27 +22,6 @@ class TestHouseKeepingCMD(base.TestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
@mock.patch('octavia.cmd.house_keeping.spare_amp_thread_event')
@mock.patch('octavia.controller.housekeeping.'
'house_keeping.SpareAmphora')
def test_spare_amphora_check(self, mock_SpareAmphora,
spare_amp_thread_event_mock):
spare_amp_mock = mock.MagicMock()
spare_check_mock = mock.MagicMock()
spare_amp_mock.spare_check = spare_check_mock
mock_SpareAmphora.return_value = spare_amp_mock
# mock spare_amp_thread_event.is_set() in the while loop
spare_amp_thread_event_mock.is_set = mock.MagicMock()
spare_amp_thread_event_mock.is_set.side_effect = [False,
Exception('break')]
self.assertRaisesRegex(Exception, 'break',
house_keeping.spare_amphora_check)
mock_SpareAmphora.assert_called_once_with()
self.assertEqual(1, spare_amp_mock.spare_check.call_count)
@mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event') @mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event')
@mock.patch('octavia.controller.housekeeping.' @mock.patch('octavia.controller.housekeeping.'
'house_keeping.DatabaseCleanup') 'house_keeping.DatabaseCleanup')
@ -111,78 +90,61 @@ class TestHouseKeepingCMD(base.TestCase):
@mock.patch('octavia.cmd.house_keeping.cert_rotate_thread_event') @mock.patch('octavia.cmd.house_keeping.cert_rotate_thread_event')
@mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event') @mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event')
@mock.patch('octavia.cmd.house_keeping.spare_amp_thread_event')
@mock.patch('threading.Thread') @mock.patch('threading.Thread')
@mock.patch('octavia.common.service.prepare_service') @mock.patch('octavia.common.service.prepare_service')
def test_main(self, mock_service, mock_thread, def test_main(self, mock_service, mock_thread,
spare_amp_thread_event_mock,
db_cleanup_thread_event_mock, db_cleanup_thread_event_mock,
cert_rotate_thread_event_mock): cert_rotate_thread_event_mock):
spare_amp_thread_mock = mock.MagicMock()
db_cleanup_thread_mock = mock.MagicMock() db_cleanup_thread_mock = mock.MagicMock()
cert_rotate_thread_mock = mock.MagicMock() cert_rotate_thread_mock = mock.MagicMock()
mock_thread.side_effect = [spare_amp_thread_mock, mock_thread.side_effect = [db_cleanup_thread_mock,
db_cleanup_thread_mock,
cert_rotate_thread_mock] cert_rotate_thread_mock]
spare_amp_thread_mock.daemon.return_value = True
db_cleanup_thread_mock.daemon.return_value = True db_cleanup_thread_mock.daemon.return_value = True
cert_rotate_thread_mock.daemon.return_value = True cert_rotate_thread_mock.daemon.return_value = True
house_keeping.main() house_keeping.main()
spare_amp_thread_mock.start.assert_called_once_with()
db_cleanup_thread_mock.start.assert_called_once_with() db_cleanup_thread_mock.start.assert_called_once_with()
cert_rotate_thread_mock.start.assert_called_once_with() cert_rotate_thread_mock.start.assert_called_once_with()
self.assertTrue(spare_amp_thread_mock.daemon)
self.assertTrue(db_cleanup_thread_mock.daemon) self.assertTrue(db_cleanup_thread_mock.daemon)
self.assertTrue(cert_rotate_thread_mock.daemon) self.assertTrue(cert_rotate_thread_mock.daemon)
@mock.patch('octavia.cmd.house_keeping.cert_rotate_thread_event') @mock.patch('octavia.cmd.house_keeping.cert_rotate_thread_event')
@mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event') @mock.patch('octavia.cmd.house_keeping.db_cleanup_thread_event')
@mock.patch('octavia.cmd.house_keeping.spare_amp_thread_event')
@mock.patch('threading.Thread') @mock.patch('threading.Thread')
@mock.patch('octavia.common.service.prepare_service') @mock.patch('octavia.common.service.prepare_service')
def test_main_keyboard_interrupt(self, mock_service, mock_thread, def test_main_keyboard_interrupt(self, mock_service, mock_thread,
spare_amp_thread_event_mock,
db_cleanup_thread_event_mock, db_cleanup_thread_event_mock,
cert_rotate_thread_event_mock): cert_rotate_thread_event_mock):
spare_amp_thread_mock = mock.MagicMock()
db_cleanup_thread_mock = mock.MagicMock() db_cleanup_thread_mock = mock.MagicMock()
cert_rotate_thread_mock = mock.MagicMock() cert_rotate_thread_mock = mock.MagicMock()
mock_thread.side_effect = [spare_amp_thread_mock, mock_thread.side_effect = [db_cleanup_thread_mock,
db_cleanup_thread_mock,
cert_rotate_thread_mock] cert_rotate_thread_mock]
spare_amp_thread_mock.daemon.return_value = True
db_cleanup_thread_mock.daemon.return_value = True db_cleanup_thread_mock.daemon.return_value = True
cert_rotate_thread_mock.daemon.return_value = True cert_rotate_thread_mock.daemon.return_value = True
mock_join = mock.MagicMock() mock_join = mock.MagicMock()
mock_join.side_effect = [KeyboardInterrupt, None] mock_join.side_effect = [KeyboardInterrupt, None]
spare_amp_thread_mock.join = mock_join db_cleanup_thread_mock.join = mock_join
house_keeping.main() house_keeping.main()
spare_amp_thread_event_mock.set.assert_called_once_with()
db_cleanup_thread_event_mock.set.assert_called_once_with() db_cleanup_thread_event_mock.set.assert_called_once_with()
cert_rotate_thread_event_mock.set.assert_called_once_with() cert_rotate_thread_event_mock.set.assert_called_once_with()
spare_amp_thread_mock.start.assert_called_once_with()
db_cleanup_thread_mock.start.assert_called_once_with() db_cleanup_thread_mock.start.assert_called_once_with()
cert_rotate_thread_mock.start.assert_called_once_with() cert_rotate_thread_mock.start.assert_called_once_with()
self.assertTrue(spare_amp_thread_mock.daemon)
self.assertTrue(db_cleanup_thread_mock.daemon) self.assertTrue(db_cleanup_thread_mock.daemon)
self.assertTrue(cert_rotate_thread_mock.daemon) self.assertTrue(cert_rotate_thread_mock.daemon)
self.assertEqual(2, spare_amp_thread_mock.join.call_count) self.assertEqual(2, db_cleanup_thread_mock.join.call_count)
db_cleanup_thread_mock.join.assert_called_once_with()
cert_rotate_thread_mock.join.assert_called_once_with() cert_rotate_thread_mock.join.assert_called_once_with()
@mock.patch('oslo_config.cfg.CONF.mutate_config_files') @mock.patch('oslo_config.cfg.CONF.mutate_config_files')

View File

@ -451,16 +451,6 @@ class TestValidations(base.TestCase):
validate.ip_not_reserved, validate.ip_not_reserved,
'2001:0DB8::5') '2001:0DB8::5')
def test_is_flavor_spares_compatible(self):
not_compat_flavor = {constants.COMPUTE_FLAVOR: 'chocolate'}
compat_flavor = {constants.LOADBALANCER_TOPOLOGY:
constants.TOPOLOGY_SINGLE}
self.assertTrue(validate.is_flavor_spares_compatible(None))
self.assertTrue(validate.is_flavor_spares_compatible(compat_flavor))
self.assertFalse(
validate.is_flavor_spares_compatible(not_compat_flavor))
def test_check_default_ciphers_prohibit_list_conflict(self): def test_check_default_ciphers_prohibit_list_conflict(self):
self.conf.config(group='api_settings', self.conf.config(group='api_settings',
tls_cipher_prohibit_list='PSK-AES128-CBC-SHA') tls_cipher_prohibit_list='PSK-AES128-CBC-SHA')

View File

@ -37,105 +37,6 @@ class TestException(Exception):
return repr(self.value) return repr(self.value)
class TestSpareCheck(base.TestCase):
FAKE_CNF_SPAR1 = 5
FAKE_CUR_SPAR1 = 2
FAKE_CNF_SPAR2 = 3
FAKE_CUR_SPAR2 = 3
def setUp(self):
super().setUp()
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
self.CONF.config(group="api_settings",
default_provider_driver='amphora')
self.spare_amp = house_keeping.SpareAmphora()
self.amp_repo = mock.MagicMock()
self.az_repo = mock.MagicMock()
self.cw = mock.MagicMock()
self.spare_amp.amp_repo = self.amp_repo
self.spare_amp.az_repo = self.az_repo
self.spare_amp.cw = self.cw
@mock.patch('octavia.db.api.get_session')
def test_spare_check_diff_count(self, session):
"""When spare amphora count does not meet the requirement."""
session.return_value = session
self.CONF.config(group="house_keeping",
spare_amphora_pool_size=self.FAKE_CNF_SPAR1)
self.amp_repo.get_spare_amphora_count.return_value = (
self.FAKE_CUR_SPAR1)
self.az_repo.get_all.return_value = [], None
self.spare_amp.spare_check()
self.assertTrue(self.amp_repo.get_spare_amphora_count.called)
DIFF_CNT = self.FAKE_CNF_SPAR1 - self.FAKE_CUR_SPAR1
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
@mock.patch('octavia.db.api.get_session')
def test_spare_check_diff_count_multi_az(self, session):
"""When spare amphora count does not meet the requirement.
Tests when multiple availabilty zones active
"""
session.return_value = session
self.CONF.config(group="house_keeping",
spare_amphora_pool_size=self.FAKE_CNF_SPAR1)
az1 = mock.Mock()
az1.name = 'az1'
az2 = mock.Mock()
az2.name = 'az2'
self.az_repo.get_all.return_value = [az1, az2], None
self.amp_repo.get_spare_amphora_count.return_value = (
self.FAKE_CUR_SPAR1)
self.az_repo.get_availability_zone_metadata_dict().get.side_effect = (
az1.name, az2.name)
self.spare_amp.spare_check()
calls = [mock.call(session, availability_zone=az1.name,
check_booting_amphora=False),
mock.call(session, availability_zone=az2.name,
check_booting_amphora=False)]
self.amp_repo.get_spare_amphora_count.assert_has_calls(calls,
any_order=True)
# 2 AZs so twice as many calls
DIFF_CNT = (self.FAKE_CNF_SPAR1 - self.FAKE_CUR_SPAR1) * 2
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
@mock.patch('octavia.db.api.get_session')
def test_spare_check_no_diff_count(self, session):
"""When spare amphora count meets the requirement."""
session.return_value = session
self.CONF.config(group="house_keeping",
spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
self.amp_repo.get_spare_amphora_count.return_value = (
self.FAKE_CUR_SPAR2)
self.az_repo.get_all.return_value = [], None
self.spare_amp.spare_check()
self.assertTrue(self.amp_repo.get_spare_amphora_count.called)
DIFF_CNT = self.FAKE_CNF_SPAR2 - self.FAKE_CUR_SPAR2
self.assertEqual(0, DIFF_CNT)
self.assertEqual(DIFF_CNT, self.cw.create_amphora.call_count)
@mock.patch('octavia.db.repositories.SparesPoolRepository.get_for_update')
@mock.patch('octavia.db.api.get_session')
def test_spare_check_rollback(self, mock_session, mock_update):
"""When spare amphora count meets the requirement."""
lock_session = mock.MagicMock()
session = mock.MagicMock()
mock_session.side_effect = [lock_session, session]
mock_update.side_effect = [Exception('boom')]
# self.CONF.config(group="house_keeping",
# spare_amphora_pool_size=self.FAKE_CNF_SPAR2)
# self.amp_repo.get_spare_amphora_count.return_value = (
# self.FAKE_CUR_SPAR2)
self.spare_amp.spare_check()
lock_session.rollback.assert_called_once()
class TestDatabaseCleanup(base.TestCase): class TestDatabaseCleanup(base.TestCase):
FAKE_IP = "10.0.0.1" FAKE_IP = "10.0.0.1"
FAKE_UUID_1 = uuidutils.generate_uuid() FAKE_UUID_1 = uuidutils.generate_uuid()

View File

@ -221,24 +221,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(0, len(amp_flow.provides)) self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires)) self.assertEqual(0, len(amp_flow.requires))
def test_allocate_amp_to_lb_decider(self, mock_get_net_driver):
history = mock.MagicMock()
values = mock.MagicMock(side_effect=[['TEST'], [None]])
history.values = values
result = self.AmpFlow._allocate_amp_to_lb_decider(history)
self.assertTrue(result)
result = self.AmpFlow._allocate_amp_to_lb_decider(history)
self.assertFalse(result)
def test_create_new_amp_for_lb_decider(self, mock_get_net_driver):
history = mock.MagicMock()
values = mock.MagicMock(side_effect=[[None], ['TEST']])
history.values = values
result = self.AmpFlow._create_new_amp_for_lb_decider(history)
self.assertTrue(result)
result = self.AmpFlow._create_new_amp_for_lb_decider(history)
self.assertFalse(result)
def test_get_failover_flow_act_stdby(self, mock_get_net_driver): def test_get_failover_flow_act_stdby(self, mock_get_net_driver):
failed_amphora = data_models.Amphora( failed_amphora = data_models.Amphora(
id=uuidutils.generate_uuid(), role=constants.ROLE_MASTER, id=uuidutils.generate_uuid(), role=constants.ROLE_MASTER,
@ -319,19 +301,7 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.VIP_SG_ID, amp_flow.provides) self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(1, len(amp_flow.requires)) print(amp_flow.requires)
self.assertEqual(1, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_amphora_flow(self.amp4, 0)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(1, len(amp_flow.requires)) self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.provides)) self.assertEqual(1, len(amp_flow.provides))
@ -478,8 +448,7 @@ class TestAmphoraFlows(base.TestCase):
TEST_PREFIX = 'test_prefix' TEST_PREFIX = 'test_prefix'
get_amp_flow = self.AmpFlow.get_amphora_for_lb_failover_subflow( get_amp_flow = self.AmpFlow.get_amphora_for_lb_failover_subflow(
TEST_PREFIX, role=constants.ROLE_MASTER, TEST_PREFIX, role=constants.ROLE_MASTER)
is_spare=False)
self.assertIsInstance(get_amp_flow, flow.Flow) self.assertIsInstance(get_amp_flow, flow.Flow)

View File

@ -275,17 +275,6 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_single([amphora_mock]) self._test_get_failover_LB_flow_single([amphora_mock])
def test_get_failover_LB_flow_one_spare_amp_single(self,
mock_get_net_driver):
amphora_mock = mock.MagicMock()
amphora_mock.role = None
amphora_mock.lb_network_id = uuidutils.generate_uuid()
amphora_mock.compute_id = uuidutils.generate_uuid()
amphora_mock.vrrp_port_id = None
amphora_mock.vrrp_ip = None
self._test_get_failover_LB_flow_single([amphora_mock])
def test_get_failover_LB_flow_one_bogus_amp_single(self, def test_get_failover_LB_flow_one_bogus_amp_single(self,
mock_get_net_driver): mock_get_net_driver):
amphora_mock = mock.MagicMock() amphora_mock = mock.MagicMock()
@ -417,24 +406,6 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_mock, self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_mock,
amphora2_mock]) amphora2_mock])
def test_get_failover_LB_flow_two_amps_spare_act_stdby(
self, mock_get_net_driver):
amphora_mock = mock.MagicMock()
amphora_mock.role = None
amphora_mock.lb_network_id = uuidutils.generate_uuid()
amphora_mock.compute_id = uuidutils.generate_uuid()
amphora_mock.vrrp_port_id = uuidutils.generate_uuid()
amphora_mock.vrrp_ip = '192.0.2.46'
amphora2_mock = mock.MagicMock()
amphora2_mock.role = constants.ROLE_MASTER
amphora2_mock.lb_network_id = uuidutils.generate_uuid()
amphora2_mock.compute_id = uuidutils.generate_uuid()
amphora2_mock.vrrp_port_id = uuidutils.generate_uuid()
amphora2_mock.vrrp_ip = '2001:db8::46'
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_mock,
amphora2_mock])
def test_get_failover_LB_flow_two_amps_standalone_act_stdby( def test_get_failover_LB_flow_two_amps_standalone_act_stdby(
self, mock_get_net_driver): self, mock_get_net_driver):
amphora_mock = mock.MagicMock() amphora_mock = mock.MagicMock()

View File

@ -546,94 +546,6 @@ class TestDatabaseTasks(base.TestCase):
AMP_ID, AMP_ID,
loadbalancer_id=None) loadbalancer_id=None)
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'allocate_and_associate',
side_effect=[_amphora_mock, None])
def test_map_loadbalancer_to_amphora(self,
mock_allocate_and_associate,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
map_lb_to_amp = database_tasks.MapLoadbalancerToAmphora()
amp_id = map_lb_to_amp.execute(self.loadbalancer_mock.id)
repo.AmphoraRepository.allocate_and_associate.assert_called_once_with(
'TEST',
LB_ID,
None)
self.assertEqual(_amphora_mock.id, amp_id)
amp_id = map_lb_to_amp.execute(self.loadbalancer_mock.id)
self.assertIsNone(amp_id)
# Test revert
map_lb_to_amp.revert(None, self.loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
# Test revert with exception
repo.LoadBalancerRepository.update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
map_lb_to_amp.revert(None, self.loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'allocate_and_associate',
side_effect=[_amphora_mock, None])
def test_map_loadbalancer_to_amphora_with_az(self,
mock_allocate_and_associate,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
map_lb_to_amp = database_tasks.MapLoadbalancerToAmphora()
amp_id = map_lb_to_amp.execute(
self.loadbalancer_mock.id, availability_zone={
constants.COMPUTE_ZONE: 'fakeaz'})
repo.AmphoraRepository.allocate_and_associate.assert_called_once_with(
'TEST',
LB_ID,
'fakeaz')
self.assertEqual(_amphora_mock.id, amp_id)
amp_id = map_lb_to_amp.execute(self.loadbalancer_mock.id)
self.assertIsNone(amp_id)
# Test revert
map_lb_to_amp.revert(None, self.loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
# Test revert with exception
repo.LoadBalancerRepository.update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
map_lb_to_amp.revert(None, self.loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.AmphoraRepository.get', @mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_amphora_mock) return_value=_amphora_mock)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get', @mock.patch('octavia.db.repositories.LoadBalancerRepository.get',

View File

@ -120,43 +120,6 @@ class TestControllerWorker(base.TestCase):
super().setUp() super().setUp()
@mock.patch('octavia.controller.worker.v1.flows.'
'amphora_flows.AmphoraFlows.get_create_amphora_flow',
return_value='TEST')
def test_create_amphora(self,
mock_get_create_amp_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
amp = cw.create_amphora()
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}))
_flow_mock.run.assert_called_once_with()
_flow_mock.storage.fetch.assert_called_once_with('amphora')
self.assertEqual(AMP_ID, amp)
@mock.patch('octavia.controller.worker.v1.flows.' @mock.patch('octavia.controller.worker.v1.flows.'
'amphora_flows.AmphoraFlows.get_delete_amphora_flow', 'amphora_flows.AmphoraFlows.get_delete_amphora_flow',
return_value='TEST') return_value='TEST')
@ -185,48 +148,6 @@ class TestControllerWorker(base.TestCase):
mock_get_delete_amp_flow.assert_called_once_with(_amphora_mock) mock_get_delete_amp_flow.assert_called_once_with(_amphora_mock)
_flow_mock.run.assert_called_once_with() _flow_mock.run.assert_called_once_with()
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
@mock.patch('octavia.controller.worker.v1.flows.'
'amphora_flows.AmphoraFlows.get_create_amphora_flow',
return_value='TEST')
def test_create_amphora_with_az(self,
mock_get_create_amp_flow,
mock_get_az_metadata,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
az = 'fake_az'
az_data = {constants.COMPUTE_ZONE: az}
mock_get_az_metadata.return_value = az_data
cw = controller_worker.ControllerWorker()
amp = cw.create_amphora(availability_zone=az)
mock_get_az_metadata.assert_called_once_with(_db_session, az)
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
assert_called_once_with(
'TEST',
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: az_data}))
_flow_mock.run.assert_called_once_with()
_flow_mock.storage.fetch.assert_called_once_with('amphora')
self.assertEqual(AMP_ID, amp)
@mock.patch('octavia.controller.worker.v1.flows.' @mock.patch('octavia.controller.worker.v1.flows.'
'health_monitor_flows.HealthMonitorFlows.' 'health_monitor_flows.HealthMonitorFlows.'
'get_create_health_monitor_flow', 'get_create_health_monitor_flow',

View File

@ -86,9 +86,9 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(5, len(amp_flow.provides)) self.assertEqual(5, len(amp_flow.provides))
self.assertEqual(4, len(amp_flow.requires)) self.assertEqual(4, len(amp_flow.requires))
def test_get_create_amphora_for_lb_flow(self, mock_get_net_driver): def test_get_amphora_for_lb_flow(self, mock_get_net_driver):
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_STANDALONE) 'SOMEPREFIX', constants.ROLE_STANDALONE)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -111,7 +111,7 @@ class TestAmphoraFlows(base.TestCase):
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_STANDALONE) 'SOMEPREFIX', constants.ROLE_STANDALONE)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -135,7 +135,7 @@ class TestAmphoraFlows(base.TestCase):
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_MASTER) 'SOMEPREFIX', constants.ROLE_MASTER)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -160,7 +160,7 @@ class TestAmphoraFlows(base.TestCase):
self.conf.config(group="nova", enable_anti_affinity=True) self.conf.config(group="nova", enable_anti_affinity=True)
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_MASTER) 'SOMEPREFIX', constants.ROLE_MASTER)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -185,7 +185,7 @@ class TestAmphoraFlows(base.TestCase):
self, mock_get_net_driver): self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_BACKUP) 'SOMEPREFIX', constants.ROLE_BACKUP)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -208,7 +208,7 @@ class TestAmphoraFlows(base.TestCase):
self, mock_get_net_driver): self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', 'BOGUS_ROLE') 'SOMEPREFIX', 'BOGUS_ROLE')
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -232,7 +232,7 @@ class TestAmphoraFlows(base.TestCase):
self.conf.config(group="nova", enable_anti_affinity=True) self.conf.config(group="nova", enable_anti_affinity=True)
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_create_amp_for_lb_subflow( amp_flow = self.AmpFlow.get_amphora_for_lb_subflow(
'SOMEPREFIX', constants.ROLE_BACKUP) 'SOMEPREFIX', constants.ROLE_BACKUP)
self.assertIsInstance(amp_flow, flow.Flow) self.assertIsInstance(amp_flow, flow.Flow)
@ -264,26 +264,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(0, len(amp_flow.provides)) self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(0, len(amp_flow.requires)) self.assertEqual(0, len(amp_flow.requires))
def test_allocate_amp_to_lb_decider(self, mock_get_net_driver):
history = mock.MagicMock()
values = mock.MagicMock(side_effect=[['TEST'], [None]])
history.values = values
result = self.AmpFlow._allocate_amp_to_lb_decider(history)
self.assertTrue(result)
result = self.AmpFlow._allocate_amp_to_lb_decider(history)
self.assertFalse(result)
def test_create_new_amp_for_lb_decider(self, mock_get_net_driver):
history = mock.MagicMock()
values = mock.MagicMock(side_effect=[[], [None], ['TEST']])
history.values = values
result = self.AmpFlow._create_new_amp_for_lb_decider(history)
self.assertTrue(result)
result = self.AmpFlow._create_new_amp_for_lb_decider(history)
self.assertTrue(result)
result = self.AmpFlow._create_new_amp_for_lb_decider(history)
self.assertFalse(result)
def test_get_failover_flow_act_stdby(self, mock_get_net_driver): def test_get_failover_flow_act_stdby(self, mock_get_net_driver):
failed_amphora = data_models.Amphora( failed_amphora = data_models.Amphora(
id=uuidutils.generate_uuid(), role=constants.ROLE_MASTER, id=uuidutils.generate_uuid(), role=constants.ROLE_MASTER,
@ -368,20 +348,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(1, len(amp_flow.requires)) self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.provides)) self.assertEqual(1, len(amp_flow.provides))
def test_get_failover_flow_spare(self, mock_get_net_driver):
amp_flow = self.AmpFlow.get_failover_amphora_flow(
self.amp4.to_dict(), 0)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.VIP_SG_ID, amp_flow.provides)
self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.provides))
def test_cert_rotate_amphora_flow(self, mock_get_net_driver): def test_cert_rotate_amphora_flow(self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows() self.AmpFlow = amphora_flows.AmphoraFlows()
@ -424,54 +390,6 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(2, len(vrrp_subflow.requires)) self.assertEqual(2, len(vrrp_subflow.requires))
def test_get_post_map_lb_subflow(self, mock_get_net_driver):
self.AmpFlow = amphora_flows.AmphoraFlows()
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_MASTER)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FLAVOR, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_BACKUP)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FLAVOR, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', constants.ROLE_STANDALONE)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FLAVOR, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
amp_flow = self.AmpFlow._get_post_map_lb_subflow(
'SOMEPREFIX', 'BOGUS_ROLE')
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.FLAVOR, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertEqual(0, len(amp_flow.provides))
self.assertEqual(2, len(amp_flow.requires))
def test_update_amphora_config_flow(self, mock_get_net_driver): def test_update_amphora_config_flow(self, mock_get_net_driver):
amp_flow = self.AmpFlow.update_amphora_config_flow() amp_flow = self.AmpFlow.update_amphora_config_flow()
@ -493,20 +411,3 @@ class TestAmphoraFlows(base.TestCase):
self.assertEqual(1, len(amp_flow.requires)) self.assertEqual(1, len(amp_flow.requires))
self.assertEqual(0, len(amp_flow.provides)) self.assertEqual(0, len(amp_flow.provides))
def test__finalize_flow(self, mock_get_net_driver):
for role in [constants.ROLE_STANDALONE,
constants.ROLE_BACKUP,
constants.ROLE_MASTER]:
amp_flow = self.AmpFlow._finalize_flow('test',
role)
self.assertIsInstance(amp_flow, flow.Flow)
self.assertIn(constants.AMPHORA, amp_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, amp_flow.requires)
self.assertIn(constants.AMPHORA, amp_flow.provides)
self.assertEqual(2, len(amp_flow.requires))
self.assertEqual(1, len(amp_flow.provides))

View File

@ -310,15 +310,6 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_single([amphora_dict]) self._test_get_failover_LB_flow_single([amphora_dict])
def test_get_failover_LB_flow_one_spare_amp_single(self,
mock_get_net_driver):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: 'bogus',
constants.COMPUTE_ID: uuidutils.generate_uuid(),
constants.VRRP_PORT_ID: None, constants.VRRP_IP: None}
self._test_get_failover_LB_flow_single([amphora_dict])
def test_get_failover_LB_flow_one_bogus_amp_single(self, def test_get_failover_LB_flow_one_bogus_amp_single(self,
mock_get_net_driver): mock_get_net_driver):
amphora_dict = {constants.ID: uuidutils.generate_uuid(), amphora_dict = {constants.ID: uuidutils.generate_uuid(),
@ -434,22 +425,6 @@ class TestLoadBalancerFlows(base.TestCase):
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict, self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict,
amphora2_dict]) amphora2_dict])
def test_get_failover_LB_flow_two_amps_spare_act_stdby(
self, mock_get_net_driver):
amphora_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: None,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
constants.VRRP_PORT_ID: uuidutils.generate_uuid(),
constants.VRRP_IP: '192.0.2.46'}
amphora2_dict = {constants.ID: uuidutils.generate_uuid(),
constants.ROLE: constants.ROLE_MASTER,
constants.COMPUTE_ID: uuidutils.generate_uuid(),
constants.VRRP_PORT_ID: uuidutils.generate_uuid(),
constants.VRRP_IP: '2001:db8::46'}
self._test_get_failover_LB_flow_no_amps_act_stdby([amphora_dict,
amphora2_dict])
def test_get_failover_LB_flow_two_amps_standalone_act_stdby( def test_get_failover_LB_flow_two_amps_standalone_act_stdby(
self, mock_get_net_driver): self, mock_get_net_driver):
amphora_dict = {constants.ID: uuidutils.generate_uuid(), amphora_dict = {constants.ID: uuidutils.generate_uuid(),

View File

@ -597,94 +597,6 @@ class TestDatabaseTasks(base.TestCase):
AMP_ID, AMP_ID,
loadbalancer_id=None) loadbalancer_id=None)
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'allocate_and_associate',
side_effect=[_db_amphora_mock, None])
def test_map_loadbalancer_to_amphora(self,
mock_allocate_and_associate,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
map_lb_to_amp = database_tasks.MapLoadbalancerToAmphora()
amp = map_lb_to_amp.execute(LB_ID)
repo.AmphoraRepository.allocate_and_associate.assert_called_once_with(
'TEST',
LB_ID,
None)
self.assertEqual(self.amphora, amp)
amp_id = map_lb_to_amp.execute(LB_ID)
self.assertIsNone(amp_id)
# Test revert
map_lb_to_amp.revert(None, LB_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
# Test revert with exception
repo.LoadBalancerRepository.update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
map_lb_to_amp.revert(None, LB_ID)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.AmphoraRepository.'
'allocate_and_associate',
side_effect=[_db_amphora_mock, None])
def test_map_loadbalancer_to_amphora_with_az(self,
mock_allocate_and_associate,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
map_lb_to_amp = database_tasks.MapLoadbalancerToAmphora()
amp = map_lb_to_amp.execute(
_db_loadbalancer_mock.id, availability_zone={
constants.COMPUTE_ZONE: 'fakeaz'})
repo.AmphoraRepository.allocate_and_associate.assert_called_once_with(
'TEST',
LB_ID,
'fakeaz')
self.assertEqual(self.amphora, amp)
amp = map_lb_to_amp.execute(_db_loadbalancer_mock.id)
self.assertIsNone(amp)
# Test revert
map_lb_to_amp.revert(None, _db_loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
# Test revert with exception
repo.LoadBalancerRepository.update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
map_lb_to_amp.revert(None, _db_loadbalancer_mock.id)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.AmphoraRepository.get', @mock.patch('octavia.db.repositories.AmphoraRepository.get',
return_value=_db_amphora_mock) return_value=_db_amphora_mock)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get', @mock.patch('octavia.db.repositories.LoadBalancerRepository.get',

View File

@ -165,38 +165,6 @@ class TestControllerWorker(base.TestCase):
super().setUp() super().setUp()
@mock.patch('octavia.controller.worker.v2.flows.'
'amphora_flows.AmphoraFlows.get_create_amphora_flow',
return_value='TEST')
def test_create_amphora(self,
mock_get_create_amp_flow,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
cw = controller_worker.ControllerWorker()
cw.create_amphora()
(cw.services_controller.run_poster.
assert_called_once_with(
flow_utils.get_create_amphora_flow,
wait=True,
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: None}))
@mock.patch('octavia.controller.worker.v2.flows.' @mock.patch('octavia.controller.worker.v2.flows.'
'amphora_flows.AmphoraFlows.get_delete_amphora_flow', 'amphora_flows.AmphoraFlows.get_delete_amphora_flow',
return_value='TEST') return_value='TEST')
@ -224,43 +192,6 @@ class TestControllerWorker(base.TestCase):
flow_utils.get_delete_amphora_flow, flow_utils.get_delete_amphora_flow,
store={constants.AMPHORA: _db_amphora_mock.to_dict()})) store={constants.AMPHORA: _db_amphora_mock.to_dict()}))
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict')
@mock.patch('octavia.controller.worker.v2.flows.'
'amphora_flows.AmphoraFlows.get_create_amphora_flow',
return_value='TEST')
def test_create_amphora_with_az(self,
mock_get_create_amp_flow,
mock_get_az_metadata,
mock_api_get_session,
mock_dyn_log_listener,
mock_taskflow_load,
mock_pool_repo_get,
mock_member_repo_get,
mock_l7rule_repo_get,
mock_l7policy_repo_get,
mock_listener_repo_get,
mock_lb_repo_get,
mock_health_mon_repo_get,
mock_amp_repo_get):
_flow_mock.reset_mock()
az = 'fake_az'
az_data = {constants.COMPUTE_ZONE: az}
mock_get_az_metadata.return_value = az_data
cw = controller_worker.ControllerWorker()
cw.create_amphora(availability_zone=az)
mock_get_az_metadata.assert_called_once_with(_db_session, az)
(cw.services_controller.run_poster.
assert_called_once_with(
flow_utils.get_create_amphora_flow,
wait=True,
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
constants.SERVER_GROUP_ID: None,
constants.AVAILABILITY_ZONE: az_data}))
@mock.patch('octavia.controller.worker.v2.flows.' @mock.patch('octavia.controller.worker.v2.flows.'
'health_monitor_flows.HealthMonitorFlows.' 'health_monitor_flows.HealthMonitorFlows.'
'get_create_health_monitor_flow', 'get_create_health_monitor_flow',

View File

@ -0,0 +1,6 @@
---
upgrade:
- |
The spare pool feature was removed after being deprecated in the Victoria
release. After an upgrade of the controllers, spare amphorae will be
automatically deleted by the Octavia health-manager service.