Fix potential race conditions on update requests in the v2 worker

The v2 worker was not waiting for a resource to be set to PENDING_UPDATE
before updating it effectively. In some circumstances (loaded
server/slow DB for instance), the resources may have been updated with
old data from the DB, resulting in a no-op update.

This wait for a PENDING_UPDATE state exists in amphorav1 but it was not
moved to v2 when we switched the logic from the DB-based objects to
dict-based objects.

Story 2009887
Task 44645
Story 2009985
Task 45032

Change-Id: I433e074c2eac5f6875ee5dace8faf86969b24f66
This commit is contained in:
Gregory Thiemonge 2022-04-08 21:25:48 +02:00
parent bf007ec4a8
commit 6042de759d
3 changed files with 96 additions and 10 deletions

View File

@ -294,8 +294,17 @@ class ControllerWorker(object):
:returns: None :returns: None
:raises ListenerNotFound: The referenced listener was not found :raises ListenerNotFound: The referenced listener was not found
""" """
db_lb = self._lb_repo.get(db_apis.get_session(), try:
id=listener[constants.LOADBALANCER_ID]) db_lb = self._get_db_obj_until_pending_update(
self._lb_repo, listener[constants.LOADBALANCER_ID])
except tenacity.RetryError as e:
LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_lb = e.last_attempt.result()
store = {constants.LISTENER: listener, store = {constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates, constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id, constants.LOADBALANCER_ID: db_lb.id,
@ -391,6 +400,18 @@ class ControllerWorker(object):
:returns: None :returns: None
:raises LBNotFound: The referenced load balancer was not found :raises LBNotFound: The referenced load balancer was not found
""" """
try:
self._get_db_obj_until_pending_update(
self._lb_repo,
original_load_balancer[constants.LOADBALANCER_ID])
except tenacity.RetryError:
LOG.warning('Load balancer did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
store = {constants.LOADBALANCER: original_load_balancer, store = {constants.LOADBALANCER: original_load_balancer,
constants.LOADBALANCER_ID: constants.LOADBALANCER_ID:
original_load_balancer[constants.LOADBALANCER_ID], original_load_balancer[constants.LOADBALANCER_ID],
@ -479,8 +500,26 @@ class ControllerWorker(object):
flow_utils.get_delete_member_flow, flow_utils.get_delete_member_flow,
store=store) store=store)
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def batch_update_members(self, old_members, new_members, def batch_update_members(self, old_members, new_members,
updated_members): updated_members):
db_new_members = [self._member_repo.get(db_apis.get_session(),
id=member[constants.MEMBER_ID])
for member in new_members]
# The API may not have commited all of the new member records yet.
# Make sure we retry looking them up.
if None in db_new_members or len(db_new_members) != len(new_members):
LOG.warning('Failed to fetch one of the new members from DB. '
'Retrying for up to 60 seconds.')
raise db_exceptions.NoResultFound
updated_members = [ updated_members = [
(provider_utils.db_member_to_provider_member( (provider_utils.db_member_to_provider_member(
self._member_repo.get(db_apis.get_session(), self._member_repo.get(db_apis.get_session(),
@ -534,9 +573,19 @@ class ControllerWorker(object):
:returns: None :returns: None
:raises MemberNotFound: The referenced member was not found :raises MemberNotFound: The referenced member was not found
""" """
# TODO(ataraday) when other flows will use dicts - revisit this
pool = self._pool_repo.get(db_apis.get_session(), try:
id=member[constants.POOL_ID]) db_member = self._get_db_obj_until_pending_update(
self._member_repo, member[constants.MEMBER_ID])
except tenacity.RetryError as e:
LOG.warning('Member did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_member = e.last_attempt.result()
pool = db_member.pool
load_balancer = pool.load_balancer load_balancer = pool.load_balancer
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict(recurse=True) load_balancer).to_dict(recurse=True)
@ -720,8 +769,18 @@ class ControllerWorker(object):
:returns: None :returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found :raises L7PolicyNotFound: The referenced l7policy was not found
""" """
db_listener = self._listener_repo.get( try:
db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID]) db_l7policy = self._get_db_obj_until_pending_update(
self._l7policy_repo, original_l7policy[constants.L7POLICY_ID])
except tenacity.RetryError as e:
LOG.warning('L7 policy did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_l7policy = e.last_attempt.result()
db_listener = db_l7policy.listener
listeners_dicts = ( listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts( provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
@ -812,8 +871,17 @@ class ControllerWorker(object):
:returns: None :returns: None
:raises L7RuleNotFound: The referenced l7rule was not found :raises L7RuleNotFound: The referenced l7rule was not found
""" """
db_l7policy = self._l7policy_repo.get( try:
db_apis.get_session(), id=original_l7rule[constants.L7POLICY_ID]) db_l7rule = self._get_db_obj_until_pending_update(
self._l7rule_repo, original_l7rule[constants.L7RULE_ID])
except tenacity.RetryError as e:
LOG.warning('L7 rule did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_l7rule = e.last_attempt.result()
db_l7policy = db_l7rule.l7policy
load_balancer = db_l7policy.listener.load_balancer load_balancer = db_l7policy.listener.load_balancer
listeners_dicts = ( listeners_dicts = (

View File

@ -375,6 +375,11 @@ class TestControllerWorker(base.TestCase):
mock_health_mon_repo_get, mock_health_mon_repo_get,
mock_amp_repo_get): mock_amp_repo_get):
load_balancer_mock = mock.MagicMock()
load_balancer_mock.provisioning_status = constants.PENDING_UPDATE
load_balancer_mock.id = LB_ID
mock_lb_repo_get.return_value = load_balancer_mock
_flow_mock.reset_mock() _flow_mock.reset_mock()
_listener_mock.provisioning_status = constants.PENDING_UPDATE _listener_mock.provisioning_status = constants.PENDING_UPDATE
@ -795,6 +800,10 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get): mock_amp_repo_get):
_flow_mock.reset_mock() _flow_mock.reset_mock()
db_member = mock.MagicMock()
db_member.provisioning_status = constants.PENDING_UPDATE
db_member.pool = _db_pool_mock
mock_member_repo_get.return_value = db_member
_member = _member_mock.to_dict() _member = _member_mock.to_dict()
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE _member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
mock_get_az_metadata_dict.return_value = {} mock_get_az_metadata_dict.return_value = {}
@ -843,7 +852,9 @@ class TestControllerWorker(base.TestCase):
old_member = mock.MagicMock() old_member = mock.MagicMock()
old_member.to_dict.return_value = {'id': 9, old_member.to_dict.return_value = {'id': 9,
constants.POOL_ID: 'testtest'} constants.POOL_ID: 'testtest'}
mock_member_repo_get.side_effect = [_member_mock, old_member] new_member = mock.MagicMock()
mock_member_repo_get.side_effect = [
new_member, _member_mock, old_member]
cw.batch_update_members([{constants.MEMBER_ID: 9, cw.batch_update_members([{constants.MEMBER_ID: 9,
constants.POOL_ID: 'testtest'}], constants.POOL_ID: 'testtest'}],
[{constants.MEMBER_ID: 11}], [{constants.MEMBER_ID: 11}],

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Fix a potential race condition when updating a resource in the amphorav2
worker. The worker was not waiting for the resource to be set to
PENDING_UPDATE, so the resource may have been updated with old data from the
database, resulting in a no-op update.