Fix race conditions between API and worker DB calls

The Octavia API creates new resource in the DB and passes the resource
and its ID to the worker. The worker then accesses to the resource using
the database, but it might try to fetch the element before it's
commited by the API.
This commit ensures that the resource has been commited to the DB before
processing the request.

Story 2008878
Task 42423

Change-Id: I55136f570db33cfd22e919019c18e9028a6a9bf7
This commit is contained in:
Gregory Thiemonge 2021-05-03 16:09:25 +02:00
parent c55e68a3be
commit cd272ae5a9
2 changed files with 29 additions and 7 deletions

View File

@ -449,8 +449,15 @@ class ControllerWorker(object):
:returns: None
:raises NoSuitablePool: Unable to find the node pool
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=member[constants.POOL_ID])
db_member = self._member_repo.get(db_apis.get_session(),
id=member[constants.MEMBER_ID])
if not db_member:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7member',
member[constants.MEMBER_ID])
raise db_exceptions.NoResultFound
pool = db_member.pool
load_balancer = pool.load_balancer
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()
@ -716,8 +723,15 @@ class ControllerWorker(object):
:returns: None
:raises NoResultFound: Unable to find the object
"""
db_listener = self._listener_repo.get(
db_apis.get_session(), id=l7policy[constants.LISTENER_ID])
db_l7policy = self._l7policy_repo.get(
db_apis.get_session(), id=l7policy[constants.L7POLICY_ID])
if not db_l7policy:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7policy',
l7policy[constants.L7POLICY_ID])
raise db_exceptions.NoResultFound
db_listener = db_l7policy.listener
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
@ -790,8 +804,15 @@ class ControllerWorker(object):
:returns: None
:raises NoResultFound: Unable to find the object
"""
db_l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7rule[constants.L7POLICY_ID])
db_l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule[constants.L7RULE_ID])
if not db_l7rule:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7rule',
l7rule[constants.L7RULE_ID])
raise db_exceptions.NoResultFound
db_l7policy = db_l7rule.l7policy
load_balancer = db_l7policy.listener.load_balancer

View File

@ -1052,6 +1052,7 @@ class TestControllerWorker(base.TestCase):
constants.L7POLICY_ID: L7POLICY_ID,
constants.LISTENER_ID: LISTENER_ID
}
mock_l7policy_repo_get.side_effect = [None, _l7policy_mock]
cw.create_l7policy(l7policy_mock)
(cw.services_controller.run_poster.
@ -1140,7 +1141,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_l7policy_repo_get.return_value = _l7policy_mock
mock_l7rule_repo_get.side_effect = [None, _l7rule_mock]
cw = controller_worker.ControllerWorker()