Merge "Retry to set loadbalancer prov status on failures" into stable/xena

Zuul 2023-10-12 19:24:26 +00:00 committed by Gerrit Code Review
commit 942b8e1553
7 changed files with 289 additions and 74 deletions

View File

@ -73,6 +73,14 @@ configuration from successfully completing. In this case the load balancer is
 continuing to process traffic through the load balancer, but might not have
 applied the latest configuration updates yet.
 
+A load balancer in a PENDING provisioning status is immutable; it cannot be
+updated or deleted by another process, as this PENDING status acts as a lock
+on the resource.
+
+If a database outage occurs while a load balancer is being created, updated
+or deleted, the Octavia control plane keeps trying to remove the PENDING
+status and set it to ERROR for a long period of time (around 2h45min with
+the default settings), to prevent the resource from remaining immutable.
+
 Monitoring load balancer functionality
 --------------------------------------

View File

@ -526,6 +526,17 @@ controller_worker_opts = [
                help=_('Number of times an amphora delete should be retried.')),
     cfg.IntOpt('amphora_delete_retry_interval', default=5,
                help=_('Time, in seconds, between amphora delete retries.')),
+    # 2000 attempts take around 2h45min with the default settings
+    cfg.IntOpt('db_commit_retry_attempts', default=2000,
+               help=_('The number of times the database action will be '
+                      'attempted.')),
+    cfg.IntOpt('db_commit_retry_initial_delay', default=1,
+               help=_('The initial delay before a retry attempt.')),
+    cfg.IntOpt('db_commit_retry_backoff', default=1,
+               help=_('The time to backoff retry attempts.')),
+    cfg.IntOpt('db_commit_retry_max', default=5,
+               help=_('The maximum amount of time to wait between retry '
+                      'attempts.')),
 ]
 
 task_flow_opts = [

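The "around 2h45" figure can be checked against the defaults above: tenacity's incrementing wait starts at db_commit_retry_initial_delay, grows by db_commit_retry_backoff per attempt, and is capped at db_commit_retry_max. A quick back-of-the-envelope sketch in Python (not part of the diff):

# Waits assuming tenacity.wait_incrementing(start=1, increment=1, max=5):
# 1, 2, 3 and 4 seconds, then 5 seconds between every later attempt.
waits = [min(1 + i, 5) for i in range(1999)]  # 1999 waits for 2000 attempts
print(sum(waits))         # 9985 seconds
print(sum(waits) / 3600)  # ~2.77 hours, i.e. roughly 2h46min
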
View File

@ -14,18 +14,32 @@
 """ Methods common to the controller work tasks."""
 
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_utils import excutils
+import tenacity
 
 from octavia.common import constants
 from octavia.db import api as db_apis
 from octavia.db import repositories as repo
 
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
 
 class TaskUtils(object):
     """Class of helper/utility methods used by tasks."""
 
+    status_update_retry = tenacity.retry(
+        retry=tenacity.retry_if_exception_type(Exception),
+        wait=tenacity.wait_incrementing(
+            CONF.controller_worker.db_commit_retry_initial_delay,
+            CONF.controller_worker.db_commit_retry_backoff,
+            CONF.controller_worker.db_commit_retry_max),
+        stop=tenacity.stop_after_attempt(
+            CONF.controller_worker.db_commit_retry_attempts),
+        after=tenacity.after_log(LOG, logging.DEBUG))
+
     def __init__(self, **kwargs):
         self.amphora_repo = repo.AmphoraRepository()
         self.health_mon_repo = repo.HealthMonitorRepository()
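
The retry envelope is plain tenacity applied as a method decorator. A minimal standalone sketch of the same construction, using made-up literal values in place of the CONF options (illustrative only, not the module's code):

import logging

import tenacity

LOG = logging.getLogger(__name__)

# Same shape as status_update_retry above, with hypothetical small values
demo_retry = tenacity.retry(
    retry=tenacity.retry_if_exception_type(Exception),
    wait=tenacity.wait_incrementing(1, 1, 5),  # initial delay, backoff, max
    stop=tenacity.stop_after_attempt(5),
    after=tenacity.after_log(LOG, logging.DEBUG))

@demo_retry
def update_status():
    # Raise until the database is reachable again; tenacity re-invokes the
    # function and raises tenacity.RetryError once the stop condition hits.
    raise RuntimeError('db unavailable')

Note that the decorator is built while the class body executes, so the CONF values are read once at import time.
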
@ -153,6 +167,7 @@ class TaskUtils(object):
                       "provisioning status to ERROR due to: "
                       "%(except)s", {'list': listener_id, 'except': str(e)})
 
+    @status_update_retry
     def mark_loadbalancer_prov_status_error(self, loadbalancer_id):
         """Sets a load balancer provisioning status to ERROR.
@ -166,9 +181,12 @@
                                           id=loadbalancer_id,
                                           provisioning_status=constants.ERROR)
         except Exception as e:
-            LOG.error("Failed to update load balancer %(lb)s "
-                      "provisioning status to ERROR due to: "
-                      "%(except)s", {'lb': loadbalancer_id, 'except': str(e)})
+            # Reraise for tenacity
+            with excutils.save_and_reraise_exception():
+                LOG.error("Failed to update load balancer %(lb)s "
+                          "provisioning status to ERROR due to: "
+                          "%(except)s", {'lb': loadbalancer_id,
+                                         'except': str(e)})
 
     def mark_listener_prov_status_active(self, listener_id):
         """Sets a listener provisioning status to ACTIVE.
@ -203,6 +221,7 @@
                       "to ACTIVE due to: %(except)s", {'pool': pool_id,
                                                        'except': str(e)})
 
+    @status_update_retry
     def mark_loadbalancer_prov_status_active(self, loadbalancer_id):
         """Sets a load balancer provisioning status to ACTIVE.
@ -216,9 +235,12 @@
                                           id=loadbalancer_id,
                                           provisioning_status=constants.ACTIVE)
         except Exception as e:
-            LOG.error("Failed to update load balancer %(lb)s "
-                      "provisioning status to ACTIVE due to: "
-                      "%(except)s", {'lb': loadbalancer_id, 'except': str(e)})
+            # Reraise for tenacity
+            with excutils.save_and_reraise_exception():
+                LOG.error("Failed to update load balancer %(lb)s "
+                          "provisioning status to ACTIVE due to: "
+                          "%(except)s", {'lb': loadbalancer_id,
+                                         'except': str(e)})
 
     def mark_member_prov_status_error(self, member_id):
         """Sets a member provisioning status to ERROR.

View File

@ -52,11 +52,18 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
-        self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
+        try:
+            self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
+            self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)
 
 
 class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
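
Every revert below follows the same shape: marking sub-resources is best-effort, while the final load balancer unlock is always attempted and, thanks to @status_update_retry, retried. A schematic sketch of the pattern, with a hypothetical resource list standing in for the task-specific arguments:

def revert_pattern(task_utils, listener_ids, loadbalancer_id):
    # Schematic of every revert() in this file, not actual Octavia code
    try:
        for listener_id in listener_ids:
            # Best-effort: failures are already logged inside task_utils
            task_utils.mark_listener_prov_status_active(listener_id)
    except Exception:
        pass  # never let a sub-resource failure skip the unlock below
    # Must always run: clears the PENDING_* lock; @status_update_retry keeps
    # retrying it for up to ~2h45min before giving up
    task_utils.mark_loadbalancer_prov_status_active(loadbalancer_id)
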
@ -66,10 +73,17 @@ class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, l7policy, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
+        try:
+            self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)
 
 
 class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
@ -79,11 +93,19 @@ class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, l7rule, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
-        self.task_utils.mark_l7policy_prov_status_active(l7rule.l7policy_id)
+        try:
+            self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
+            self.task_utils.mark_l7policy_prov_status_active(
+                l7rule.l7policy_id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)
 
 
 class ListenerToErrorOnRevertTask(BaseLifecycleTask):
@ -93,7 +115,14 @@ class ListenerToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, listener, *args, **kwargs):
-        self.task_utils.mark_listener_prov_status_error(listener.id)
+        try:
+            self.task_utils.mark_listener_prov_status_error(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             listener.load_balancer.id)
@ -105,10 +134,17 @@ class ListenersToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, listeners, loadbalancer, *args, **kwargs):
+        try:
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_error(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_error(listener.id)
 
 
 class LoadBalancerIDToErrorOnRevertTask(BaseLifecycleTask):
@ -138,10 +174,17 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
-        self.task_utils.mark_member_prov_status_error(member.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)
-        self.task_utils.mark_pool_prov_status_active(pool.id)
+        try:
+            self.task_utils.mark_member_prov_status_error(member.id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+            self.task_utils.mark_pool_prov_status_active(pool.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
@ -152,11 +195,18 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, members, listeners, loadbalancer, pool, *args, **kwargs):
-        for m in members:
-            self.task_utils.mark_member_prov_status_error(m.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)
-        self.task_utils.mark_pool_prov_status_active(pool.id)
+        try:
+            for m in members:
+                self.task_utils.mark_member_prov_status_error(m.id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+            self.task_utils.mark_pool_prov_status_active(pool.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
@ -167,7 +217,14 @@ class PoolToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, pool, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_pool_prov_status_error(pool.id)
+        try:
+            self.task_utils.mark_pool_prov_status_error(pool.id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(listener.id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(listener.id)

View File

@ -54,15 +54,22 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_health_mon_prov_status_error(
-            health_mon[constants.HEALTHMONITOR_ID])
-        self.task_utils.mark_pool_prov_status_active(
-            health_mon[constants.POOL_ID])
+        try:
+            self.task_utils.mark_health_mon_prov_status_error(
+                health_mon[constants.HEALTHMONITOR_ID])
+            self.task_utils.mark_pool_prov_status_active(
+                health_mon[constants.POOL_ID])
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer[constants.LOADBALANCER_ID])
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])
 
 
 class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
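
The v2 flows apply exactly the same try/unlock pattern; they differ only in their data model. v2 tasks receive plain dicts (the payloads passed across the provider-driver boundary) instead of SQLAlchemy objects, so IDs are read by constant key. For example, with a hypothetical payload:

from octavia.common import constants

# v1 tasks use ORM attributes:   health_mon.pool_id
# v2 tasks use dict payloads:    health_mon[constants.POOL_ID]
health_mon = {constants.HEALTHMONITOR_ID: 'hm-uuid',  # hypothetical values
              constants.POOL_ID: 'pool-uuid'}
pool_id = health_mon[constants.POOL_ID]
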
@ -72,12 +79,19 @@ class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, l7policy, listeners, loadbalancer_id, *args, **kwargs):
-        self.task_utils.mark_l7policy_prov_status_error(
-            l7policy[constants.L7POLICY_ID])
+        try:
+            self.task_utils.mark_l7policy_prov_status_error(
+                l7policy[constants.L7POLICY_ID])
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer_id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])
 
 
 class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
@ -88,14 +102,21 @@ class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
 
     def revert(self, l7rule, l7policy_id, listeners, loadbalancer_id, *args,
                **kwargs):
-        self.task_utils.mark_l7rule_prov_status_error(
-            l7rule[constants.L7RULE_ID])
-        self.task_utils.mark_l7policy_prov_status_active(l7policy_id)
+        try:
+            self.task_utils.mark_l7rule_prov_status_error(
+                l7rule[constants.L7RULE_ID])
+            self.task_utils.mark_l7policy_prov_status_active(l7policy_id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer_id)
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])
 
 
 class ListenerToErrorOnRevertTask(BaseLifecycleTask):
@ -105,8 +126,15 @@ class ListenerToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, listener, *args, **kwargs):
-        self.task_utils.mark_listener_prov_status_error(
-            listener[constants.LISTENER_ID])
+        try:
+            self.task_utils.mark_listener_prov_status_error(
+                listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             listener[constants.LOADBALANCER_ID])
@ -118,9 +146,16 @@ class ListenersToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, listeners, *args, **kwargs):
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_error(
-                listener[constants.LISTENER_ID])
+        try:
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_error(
+                    listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             listeners[0][constants.LOADBALANCER_ID])
@ -154,12 +189,19 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
 
     def revert(self, member, listeners, loadbalancer, pool_id, *args,
                **kwargs):
-        self.task_utils.mark_member_prov_status_error(
-            member[constants.MEMBER_ID])
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])
-        self.task_utils.mark_pool_prov_status_active(pool_id)
+        try:
+            self.task_utils.mark_member_prov_status_error(
+                member[constants.MEMBER_ID])
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+            self.task_utils.mark_pool_prov_status_active(pool_id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer[constants.LOADBALANCER_ID])
@ -172,13 +214,20 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
 
     def revert(self, members, listeners, loadbalancer, pool_id, *args,
                **kwargs):
-        for m in members:
-            self.task_utils.mark_member_prov_status_error(
-                m[constants.MEMBER_ID])
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])
-        self.task_utils.mark_pool_prov_status_active(pool_id)
+        try:
+            for m in members:
+                self.task_utils.mark_member_prov_status_error(
+                    m[constants.MEMBER_ID])
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+            self.task_utils.mark_pool_prov_status_active(pool_id)
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer[constants.LOADBALANCER_ID])
@ -190,9 +239,16 @@ class PoolToErrorOnRevertTask(BaseLifecycleTask):
         pass
 
     def revert(self, pool_id, listeners, loadbalancer, *args, **kwargs):
-        self.task_utils.mark_pool_prov_status_error(pool_id)
+        try:
+            self.task_utils.mark_pool_prov_status_error(pool_id)
+            for listener in listeners:
+                self.task_utils.mark_listener_prov_status_active(
+                    listener[constants.LISTENER_ID])
+        except Exception:
+            # Catching and skipping, errors are already reported by task_utils
+            # and we want to ensure that mark_loadbalancer_prov_status_active
+            # is called to unlock the LB (it will pass or it will fail after a
+            # very long timeout)
+            pass
         self.task_utils.mark_loadbalancer_prov_status_active(
             loadbalancer[constants.LOADBALANCER_ID])
-        for listener in listeners:
-            self.task_utils.mark_listener_prov_status_active(
-                listener[constants.LISTENER_ID])

View File

@ -14,6 +14,7 @@
 from unittest import mock
 
 from oslo_utils import uuidutils
+import tenacity
 
 from octavia.common import constants
 from octavia.controller.worker import task_utils as task_utilities
@ -173,7 +174,13 @@ class TestTaskUtils(base.TestCase):
 
     @mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
     @mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
+    @mock.patch('tenacity.nap.time')
+    # mock LOG so we don't fill the console with log messages from
+    # tenacity.retry
+    @mock.patch('octavia.controller.worker.task_utils.LOG')
     def test_mark_loadbalancer_prov_status_active(self,
+                                                  mock_LOG,
+                                                  mock_time,
                                                   mock_lb_repo_update,
                                                   mock_get_session):
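
Patching tenacity.nap.time is what keeps these tests fast: tenacity sleeps through the time module imported in its nap submodule, so replacing that reference with a mock turns every retry delay into a no-op. A standalone sketch of the trick:

from unittest import mock

import tenacity

@tenacity.retry(wait=tenacity.wait_fixed(60),
                stop=tenacity.stop_after_attempt(3))
def always_fails():
    raise RuntimeError('boom')

with mock.patch('tenacity.nap.time'):  # two 60s waits become instant
    try:
        always_fails()
    except tenacity.RetryError:        # raised after the third attempt
        print('gave up without actually sleeping')
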
@ -190,14 +197,41 @@
         mock_lb_repo_update.reset_mock()
         mock_get_session.side_effect = Exception('fail')
 
-        self.task_utils.mark_loadbalancer_prov_status_active(
+        self.assertRaises(
+            tenacity.RetryError,
+            self.task_utils.mark_loadbalancer_prov_status_active,
             self.LOADBALANCER_ID)
 
         self.assertFalse(mock_lb_repo_update.called)
 
+        # Exceptions then happy path
+        mock_get_session.reset_mock(side_effect=True)
+        mock_lb_repo_update.reset_mock()
+
+        mock_session = mock_get_session()
+        mock_get_session.side_effect = [
+            Exception('fail'),
+            Exception('fail'),
+            Exception('fail'),
+            mock_session]
+
+        self.task_utils.mark_loadbalancer_prov_status_active(
+            self.LOADBALANCER_ID)
+
+        mock_lb_repo_update.assert_called_once_with(
+            mock_session,
+            id=self.LOADBALANCER_ID,
+            provisioning_status=constants.ACTIVE)
+
     @mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
     @mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
+    @mock.patch('tenacity.nap.time')
+    # mock LOG so we don't fill the console with log messages from
+    # tenacity.retry
+    @mock.patch('octavia.controller.worker.task_utils.LOG')
     def test_mark_loadbalancer_prov_status_error(self,
+                                                 mock_LOG,
+                                                 mock_time,
                                                  mock_lb_repo_update,
                                                  mock_get_session):
@ -214,10 +248,30 @@
         mock_lb_repo_update.reset_mock()
         mock_get_session.side_effect = Exception('fail')
 
+        self.assertRaises(tenacity.RetryError,
+                          self.task_utils.mark_loadbalancer_prov_status_error,
+                          self.LOADBALANCER_ID)
+
+        self.assertFalse(mock_lb_repo_update.called)
+
+        # Exceptions then happy path
+        mock_get_session.reset_mock(side_effect=True)
+        mock_lb_repo_update.reset_mock()
+
+        mock_session = mock_get_session()
+        mock_get_session.side_effect = [
+            Exception('fail'),
+            Exception('fail'),
+            Exception('fail'),
+            mock_session]
+
         self.task_utils.mark_loadbalancer_prov_status_error(
             self.LOADBALANCER_ID)
 
-        self.assertFalse(mock_lb_repo_update.called)
+        mock_lb_repo_update.assert_called_once_with(
+            mock_session,
+            id=self.LOADBALANCER_ID,
+            provisioning_status=constants.ERROR)
 
     @mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
     @mock.patch('octavia.db.repositories.MemberRepository.update')

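The "exceptions then happy path" blocks lean on a unittest.mock feature worth spelling out: when side_effect is a list, each call consumes the next element, raising it if it is an exception and returning it otherwise. A minimal standalone sketch:

from unittest import mock

session = mock.Mock(side_effect=[Exception('fail'),
                                 Exception('fail'),
                                 'a-session'])
for _ in range(2):
    try:
        session()
    except Exception:
        pass                      # the first two calls raise
assert session() == 'a-session'  # the third call succeeds
assert session.call_count == 3
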
View File

@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    Fixed an issue with load balancers stuck in a ``PENDING_*`` state during
+    database outages. Now when a task fails in Octavia, it retries updating
+    the ``provisioning_status`` of the load balancer until the database is
+    back (or until it gives up after a very long timeout, around 2h45min).