Retry to set loadbalancer prov status on failures
If a DB outage occurs while a flow is running, an exception is caught and the flow is reverted. In most of the flows, the revert function of the first task (the last one to be reverted) unlocks the load balancer by setting its provisioning status (to ERROR or ACTIVE, depending on the flow), but it fails if the DB is not reachable, leaving the LB in a PENDING_* state. This commit adds tenacity.retry to those functions, so Octavia keeps retrying to set the status for ~2h45 (2000 attempts, 1 sec initial delay, 5 sec max delay). Note: for stable/2023.1 and older, the patch also includes modifications for v1/tasks/lifecycle_tasks.py. Conflicts: octavia/common/config.py octavia/tests/unit/controller/worker/test_task_utils.py octavia/controller/worker/v1/tasks/lifecycle_tasks.py Closes-Bug: #2036952 Change-Id: I458dd6d6f5383edc24116ea0fa27e3a593044146 (cherry picked from commit be91493332786365b8e997fcf88779a12d1ae130) (cherry picked from commit 96782e2c543bfb488c7629e9ba2cf009b2a6b033) (cherry picked from commit 57833dbdad964f8a7861a0ab1d2847159f483577)
This commit is contained in:
parent
4c97b585ce
commit
27060603db
@ -73,6 +73,14 @@ configuration from successfully completing. In this case the load balancer is
|
|||||||
continuing to process traffic through the load balancer, but might not have
|
continuing to process traffic through the load balancer, but might not have
|
||||||
applied the latest configuration updates yet.
|
applied the latest configuration updates yet.
|
||||||
|
|
||||||
|
A load balancer in a PENDING provisioning status is immutable: it cannot be
|
||||||
|
updated or deleted by another process. This PENDING status acts as a lock on
|
||||||
|
the resource.
|
||||||
|
If a database outage occurs while a load balancer is deleted, created or
|
||||||
|
updated, the Octavia control plane will try to remove the PENDING status and
|
||||||
|
set it to ERROR for a long period of time (around 2h45min with the default
|
||||||
|
settings), to prevent the resource from remaining immutable.
|
||||||
|
|
||||||
Monitoring load balancer functionality
|
Monitoring load balancer functionality
|
||||||
--------------------------------------
|
--------------------------------------
|
||||||
|
|
||||||
|
@ -532,6 +532,17 @@ controller_worker_opts = [
|
|||||||
cfg.IntOpt('amphora_delete_retry_interval', default=5,
|
cfg.IntOpt('amphora_delete_retry_interval', default=5,
|
||||||
help=_('Time, in seconds, between amphora delete retries.')),
|
help=_('Time, in seconds, between amphora delete retries.')),
|
||||||
cfg.BoolOpt('event_notifications', default=True),
|
cfg.BoolOpt('event_notifications', default=True),
|
||||||
|
# 2000 attempts is around 2h45 with the default settings
|
||||||
|
cfg.IntOpt('db_commit_retry_attempts', default=2000,
|
||||||
|
help=_('The number of times the database action will be '
|
||||||
|
'attempted.')),
|
||||||
|
cfg.IntOpt('db_commit_retry_initial_delay', default=1,
|
||||||
|
help=_('The initial delay before a retry attempt.')),
|
||||||
|
cfg.IntOpt('db_commit_retry_backoff', default=1,
|
||||||
|
help=_('The time to backoff retry attempts.')),
|
||||||
|
cfg.IntOpt('db_commit_retry_max', default=5,
|
||||||
|
help=_('The maximum amount of time to wait between retry '
|
||||||
|
'attempts.')),
|
||||||
]
|
]
|
||||||
|
|
||||||
task_flow_opts = [
|
task_flow_opts = [
|
||||||
|
@ -14,18 +14,32 @@
|
|||||||
|
|
||||||
""" Methods common to the controller work tasks."""
|
""" Methods common to the controller work tasks."""
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import excutils
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from octavia.common import constants
|
from octavia.common import constants
|
||||||
from octavia.db import api as db_apis
|
from octavia.db import api as db_apis
|
||||||
from octavia.db import repositories as repo
|
from octavia.db import repositories as repo
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TaskUtils(object):
|
class TaskUtils(object):
|
||||||
"""Class of helper/utility methods used by tasks."""
|
"""Class of helper/utility methods used by tasks."""
|
||||||
|
|
||||||
|
status_update_retry = tenacity.retry(
|
||||||
|
retry=tenacity.retry_if_exception_type(Exception),
|
||||||
|
wait=tenacity.wait_incrementing(
|
||||||
|
CONF.controller_worker.db_commit_retry_initial_delay,
|
||||||
|
CONF.controller_worker.db_commit_retry_backoff,
|
||||||
|
CONF.controller_worker.db_commit_retry_max),
|
||||||
|
stop=tenacity.stop_after_attempt(
|
||||||
|
CONF.controller_worker.db_commit_retry_attempts),
|
||||||
|
after=tenacity.after_log(LOG, logging.DEBUG))
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
self.amphora_repo = repo.AmphoraRepository()
|
self.amphora_repo = repo.AmphoraRepository()
|
||||||
self.health_mon_repo = repo.HealthMonitorRepository()
|
self.health_mon_repo = repo.HealthMonitorRepository()
|
||||||
@ -153,6 +167,7 @@ class TaskUtils(object):
|
|||||||
"provisioning status to ERROR due to: "
|
"provisioning status to ERROR due to: "
|
||||||
"%(except)s", {'list': listener_id, 'except': str(e)})
|
"%(except)s", {'list': listener_id, 'except': str(e)})
|
||||||
|
|
||||||
|
@status_update_retry
|
||||||
def mark_loadbalancer_prov_status_error(self, loadbalancer_id):
|
def mark_loadbalancer_prov_status_error(self, loadbalancer_id):
|
||||||
"""Sets a load balancer provisioning status to ERROR.
|
"""Sets a load balancer provisioning status to ERROR.
|
||||||
|
|
||||||
@ -166,9 +181,12 @@ class TaskUtils(object):
|
|||||||
id=loadbalancer_id,
|
id=loadbalancer_id,
|
||||||
provisioning_status=constants.ERROR)
|
provisioning_status=constants.ERROR)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# Reraise for tenacity
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error("Failed to update load balancer %(lb)s "
|
LOG.error("Failed to update load balancer %(lb)s "
|
||||||
"provisioning status to ERROR due to: "
|
"provisioning status to ERROR due to: "
|
||||||
"%(except)s", {'lb': loadbalancer_id, 'except': str(e)})
|
"%(except)s", {'lb': loadbalancer_id,
|
||||||
|
'except': str(e)})
|
||||||
|
|
||||||
def mark_listener_prov_status_active(self, listener_id):
|
def mark_listener_prov_status_active(self, listener_id):
|
||||||
"""Sets a listener provisioning status to ACTIVE.
|
"""Sets a listener provisioning status to ACTIVE.
|
||||||
@ -203,6 +221,7 @@ class TaskUtils(object):
|
|||||||
"to ACTIVE due to: %(except)s", {'pool': pool_id,
|
"to ACTIVE due to: %(except)s", {'pool': pool_id,
|
||||||
'except': str(e)})
|
'except': str(e)})
|
||||||
|
|
||||||
|
@status_update_retry
|
||||||
def mark_loadbalancer_prov_status_active(self, loadbalancer_id):
|
def mark_loadbalancer_prov_status_active(self, loadbalancer_id):
|
||||||
"""Sets a load balancer provisioning status to ACTIVE.
|
"""Sets a load balancer provisioning status to ACTIVE.
|
||||||
|
|
||||||
@ -216,9 +235,12 @@ class TaskUtils(object):
|
|||||||
id=loadbalancer_id,
|
id=loadbalancer_id,
|
||||||
provisioning_status=constants.ACTIVE)
|
provisioning_status=constants.ACTIVE)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# Reraise for tenacity
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
LOG.error("Failed to update load balancer %(lb)s "
|
LOG.error("Failed to update load balancer %(lb)s "
|
||||||
"provisioning status to ACTIVE due to: "
|
"provisioning status to ACTIVE due to: "
|
||||||
"%(except)s", {'lb': loadbalancer_id, 'except': str(e)})
|
"%(except)s", {'lb': loadbalancer_id,
|
||||||
|
'except': str(e)})
|
||||||
|
|
||||||
def mark_member_prov_status_error(self, member_id):
|
def mark_member_prov_status_error(self, member_id):
|
||||||
"""Sets a member provisioning status to ERROR.
|
"""Sets a member provisioning status to ERROR.
|
||||||
|
@ -52,11 +52,18 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
||||||
self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
|
self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -66,10 +73,17 @@ class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, l7policy, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, l7policy, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
|
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -79,11 +93,19 @@ class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, l7rule, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, l7rule, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
|
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
|
||||||
self.task_utils.mark_l7policy_prov_status_active(l7rule.l7policy_id)
|
self.task_utils.mark_l7policy_prov_status_active(
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
l7rule.l7policy_id)
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -93,7 +115,14 @@ class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, listener, *args, **kwargs):
|
def revert(self, listener, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_listener_prov_status_error(listener.id)
|
self.task_utils.mark_listener_prov_status_error(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
listener.load_balancer.id)
|
listener.load_balancer.id)
|
||||||
|
|
||||||
@ -105,10 +134,17 @@ class ListenersToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, listeners, loadbalancer, *args, **kwargs):
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
try:
|
||||||
loadbalancer.id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_error(listener.id)
|
self.task_utils.mark_listener_prov_status_error(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
|
loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
class LoadBalancerIDToErrorOnRevertTask(BaseLifecycleTask):
|
class LoadBalancerIDToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -138,10 +174,17 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
|
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_member_prov_status_error(member.id)
|
self.task_utils.mark_member_prov_status_error(member.id)
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
self.task_utils.mark_pool_prov_status_active(pool.id)
|
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
@ -152,11 +195,18 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, members, listeners, loadbalancer, pool, *args, **kwargs):
|
def revert(self, members, listeners, loadbalancer, pool, *args, **kwargs):
|
||||||
|
try:
|
||||||
for m in members:
|
for m in members:
|
||||||
self.task_utils.mark_member_prov_status_error(m.id)
|
self.task_utils.mark_member_prov_status_error(m.id)
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
self.task_utils.mark_pool_prov_status_active(pool.id)
|
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
|
||||||
|
|
||||||
@ -167,7 +217,14 @@ class PoolToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, pool, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, pool, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_pool_prov_status_error(pool.id)
|
self.task_utils.mark_pool_prov_status_error(pool.id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||||
|
@ -54,15 +54,22 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_health_mon_prov_status_error(
|
self.task_utils.mark_health_mon_prov_status_error(
|
||||||
health_mon[constants.HEALTHMONITOR_ID])
|
health_mon[constants.HEALTHMONITOR_ID])
|
||||||
self.task_utils.mark_pool_prov_status_active(
|
self.task_utils.mark_pool_prov_status_active(
|
||||||
health_mon[constants.POOL_ID])
|
health_mon[constants.POOL_ID])
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
|
||||||
loadbalancer[constants.LOADBALANCER_ID])
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
|
loadbalancer[constants.LOADBALANCER_ID])
|
||||||
|
|
||||||
|
|
||||||
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -72,12 +79,19 @@ class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, l7policy, listeners, loadbalancer_id, *args, **kwargs):
|
def revert(self, l7policy, listeners, loadbalancer_id, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_l7policy_prov_status_error(
|
self.task_utils.mark_l7policy_prov_status_error(
|
||||||
l7policy[constants.L7POLICY_ID])
|
l7policy[constants.L7POLICY_ID])
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer_id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer_id)
|
||||||
|
|
||||||
|
|
||||||
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -88,14 +102,21 @@ class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
|
|
||||||
def revert(self, l7rule, l7policy_id, listeners, loadbalancer_id, *args,
|
def revert(self, l7rule, l7policy_id, listeners, loadbalancer_id, *args,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_l7rule_prov_status_error(
|
self.task_utils.mark_l7rule_prov_status_error(
|
||||||
l7rule[constants.L7RULE_ID])
|
l7rule[constants.L7RULE_ID])
|
||||||
self.task_utils.mark_l7policy_prov_status_active(l7policy_id)
|
self.task_utils.mark_l7policy_prov_status_active(l7policy_id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
|
||||||
loadbalancer_id)
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
|
loadbalancer_id)
|
||||||
|
|
||||||
|
|
||||||
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
||||||
@ -105,8 +126,15 @@ class ListenerToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, listener, *args, **kwargs):
|
def revert(self, listener, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_listener_prov_status_error(
|
self.task_utils.mark_listener_prov_status_error(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
listener[constants.LOADBALANCER_ID])
|
listener[constants.LOADBALANCER_ID])
|
||||||
|
|
||||||
@ -118,9 +146,16 @@ class ListenersToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, listeners, *args, **kwargs):
|
def revert(self, listeners, *args, **kwargs):
|
||||||
|
try:
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_error(
|
self.task_utils.mark_listener_prov_status_error(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
listeners[0][constants.LOADBALANCER_ID])
|
listeners[0][constants.LOADBALANCER_ID])
|
||||||
|
|
||||||
@ -154,12 +189,19 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
|
|
||||||
def revert(self, member, listeners, loadbalancer, pool_id, *args,
|
def revert(self, member, listeners, loadbalancer, pool_id, *args,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_member_prov_status_error(
|
self.task_utils.mark_member_prov_status_error(
|
||||||
member[constants.MEMBER_ID])
|
member[constants.MEMBER_ID])
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
self.task_utils.mark_pool_prov_status_active(pool_id)
|
self.task_utils.mark_pool_prov_status_active(pool_id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
loadbalancer[constants.LOADBALANCER_ID])
|
loadbalancer[constants.LOADBALANCER_ID])
|
||||||
|
|
||||||
@ -172,6 +214,7 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
|
|
||||||
def revert(self, members, listeners, loadbalancer, pool_id, *args,
|
def revert(self, members, listeners, loadbalancer, pool_id, *args,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
|
try:
|
||||||
for m in members:
|
for m in members:
|
||||||
self.task_utils.mark_member_prov_status_error(
|
self.task_utils.mark_member_prov_status_error(
|
||||||
m[constants.MEMBER_ID])
|
m[constants.MEMBER_ID])
|
||||||
@ -179,6 +222,12 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
self.task_utils.mark_pool_prov_status_active(pool_id)
|
self.task_utils.mark_pool_prov_status_active(pool_id)
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
loadbalancer[constants.LOADBALANCER_ID])
|
loadbalancer[constants.LOADBALANCER_ID])
|
||||||
|
|
||||||
@ -190,9 +239,16 @@ class PoolToErrorOnRevertTask(BaseLifecycleTask):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def revert(self, pool_id, listeners, loadbalancer, *args, **kwargs):
|
def revert(self, pool_id, listeners, loadbalancer, *args, **kwargs):
|
||||||
|
try:
|
||||||
self.task_utils.mark_pool_prov_status_error(pool_id)
|
self.task_utils.mark_pool_prov_status_error(pool_id)
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
|
||||||
loadbalancer[constants.LOADBALANCER_ID])
|
|
||||||
for listener in listeners:
|
for listener in listeners:
|
||||||
self.task_utils.mark_listener_prov_status_active(
|
self.task_utils.mark_listener_prov_status_active(
|
||||||
listener[constants.LISTENER_ID])
|
listener[constants.LISTENER_ID])
|
||||||
|
except Exception:
|
||||||
|
# Catching and skipping, errors are already reported by task_utils
|
||||||
|
# and we want to ensure that mark_loadbalancer_prov_status_active
|
||||||
|
# is called to unlock the LB (it will pass or it will fail after a
|
||||||
|
# very long timeout)
|
||||||
|
pass
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
|
loadbalancer[constants.LOADBALANCER_ID])
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from octavia.common import constants
|
from octavia.common import constants
|
||||||
from octavia.controller.worker import task_utils as task_utilities
|
from octavia.controller.worker import task_utils as task_utilities
|
||||||
@ -173,7 +174,13 @@ class TestTaskUtils(base.TestCase):
|
|||||||
|
|
||||||
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
||||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
||||||
|
@mock.patch('tenacity.nap.time')
|
||||||
|
# mock LOG so we don't fill the console with log messages from
|
||||||
|
# tenacity.retry
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.LOG')
|
||||||
def test_mark_loadbalancer_prov_status_active(self,
|
def test_mark_loadbalancer_prov_status_active(self,
|
||||||
|
mock_LOG,
|
||||||
|
mock_time,
|
||||||
mock_lb_repo_update,
|
mock_lb_repo_update,
|
||||||
mock_get_session):
|
mock_get_session):
|
||||||
|
|
||||||
@ -190,14 +197,41 @@ class TestTaskUtils(base.TestCase):
|
|||||||
mock_lb_repo_update.reset_mock()
|
mock_lb_repo_update.reset_mock()
|
||||||
mock_get_session.side_effect = Exception('fail')
|
mock_get_session.side_effect = Exception('fail')
|
||||||
|
|
||||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
self.assertRaises(
|
||||||
|
tenacity.RetryError,
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active,
|
||||||
self.LOADBALANCER_ID)
|
self.LOADBALANCER_ID)
|
||||||
|
|
||||||
self.assertFalse(mock_lb_repo_update.called)
|
self.assertFalse(mock_lb_repo_update.called)
|
||||||
|
|
||||||
|
# Exceptions then happy path
|
||||||
|
mock_get_session.reset_mock(side_effect=True)
|
||||||
|
mock_lb_repo_update.reset_mock()
|
||||||
|
|
||||||
|
mock_session = mock_get_session()
|
||||||
|
mock_get_session.side_effect = [
|
||||||
|
Exception('fail'),
|
||||||
|
Exception('fail'),
|
||||||
|
Exception('fail'),
|
||||||
|
mock_session]
|
||||||
|
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||||
|
self.LOADBALANCER_ID)
|
||||||
|
|
||||||
|
mock_lb_repo_update.assert_called_once_with(
|
||||||
|
mock_session,
|
||||||
|
id=self.LOADBALANCER_ID,
|
||||||
|
provisioning_status=constants.ACTIVE)
|
||||||
|
|
||||||
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
||||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
||||||
|
@mock.patch('tenacity.nap.time')
|
||||||
|
# mock LOG so we don't fill the console with log messages from
|
||||||
|
# tenacity.retry
|
||||||
|
@mock.patch('octavia.controller.worker.task_utils.LOG')
|
||||||
def test_mark_loadbalancer_prov_status_error(self,
|
def test_mark_loadbalancer_prov_status_error(self,
|
||||||
|
mock_LOG,
|
||||||
|
mock_time,
|
||||||
mock_lb_repo_update,
|
mock_lb_repo_update,
|
||||||
mock_get_session):
|
mock_get_session):
|
||||||
|
|
||||||
@ -214,11 +248,31 @@ class TestTaskUtils(base.TestCase):
|
|||||||
mock_lb_repo_update.reset_mock()
|
mock_lb_repo_update.reset_mock()
|
||||||
mock_get_session.side_effect = Exception('fail')
|
mock_get_session.side_effect = Exception('fail')
|
||||||
|
|
||||||
self.task_utils.mark_loadbalancer_prov_status_error(
|
self.assertRaises(tenacity.RetryError,
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_error,
|
||||||
self.LOADBALANCER_ID)
|
self.LOADBALANCER_ID)
|
||||||
|
|
||||||
self.assertFalse(mock_lb_repo_update.called)
|
self.assertFalse(mock_lb_repo_update.called)
|
||||||
|
|
||||||
|
# Exceptions then happy path
|
||||||
|
mock_get_session.reset_mock(side_effect=True)
|
||||||
|
mock_lb_repo_update.reset_mock()
|
||||||
|
|
||||||
|
mock_session = mock_get_session()
|
||||||
|
mock_get_session.side_effect = [
|
||||||
|
Exception('fail'),
|
||||||
|
Exception('fail'),
|
||||||
|
Exception('fail'),
|
||||||
|
mock_session]
|
||||||
|
|
||||||
|
self.task_utils.mark_loadbalancer_prov_status_error(
|
||||||
|
self.LOADBALANCER_ID)
|
||||||
|
|
||||||
|
mock_lb_repo_update.assert_called_once_with(
|
||||||
|
mock_session,
|
||||||
|
id=self.LOADBALANCER_ID,
|
||||||
|
provisioning_status=constants.ERROR)
|
||||||
|
|
||||||
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
|
||||||
@mock.patch('octavia.db.repositories.MemberRepository.update')
|
@mock.patch('octavia.db.repositories.MemberRepository.update')
|
||||||
def test_mark_member_prov_status_error(self,
|
def test_mark_member_prov_status_error(self,
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixed an issue with load balancers stuck in a ``PENDING_*`` state during
|
||||||
|
database outages. Now, when a task fails in Octavia, it retries updating
|
||||||
|
the ``provisioning_status`` of the load balancer until the database is back
|
||||||
|
(or it gives up after a really long timeout, around 2h45).
|
Loading…
x
Reference in New Issue
Block a user