Merge "Fix potential race conditions on update requests in the v2 worker" into stable/victoria

This commit is contained in:
Zuul 2022-07-01 16:31:54 +00:00 committed by Gerrit Code Review
commit 9fb6adaf02
3 changed files with 306 additions and 10 deletions

View File

@ -314,8 +314,17 @@ class ControllerWorker(object):
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
db_lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
try:
db_lb = self._get_db_obj_until_pending_update(
self._lb_repo, listener[constants.LOADBALANCER_ID])
except tenacity.RetryError as e:
LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_lb = e.last_attempt.result()
store = {constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
@ -411,6 +420,18 @@ class ControllerWorker(object):
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
try:
self._get_db_obj_until_pending_update(
self._lb_repo,
original_load_balancer[constants.LOADBALANCER_ID])
except tenacity.RetryError:
LOG.warning('Load balancer did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
store = {constants.LOADBALANCER: original_load_balancer,
constants.LOADBALANCER_ID:
original_load_balancer[constants.LOADBALANCER_ID],
@ -499,8 +520,26 @@ class ControllerWorker(object):
flow_utils.get_delete_member_flow,
store=store)
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
CONF.haproxy_amphora.api_db_commit_retry_backoff,
CONF.haproxy_amphora.api_db_commit_retry_max),
stop=tenacity.stop_after_attempt(
CONF.haproxy_amphora.api_db_commit_retry_attempts))
def batch_update_members(self, old_members, new_members,
updated_members):
db_new_members = [self._member_repo.get(db_apis.get_session(),
id=member[constants.MEMBER_ID])
for member in new_members]
# The API may not have committed all of the new member records yet.
# Make sure we retry looking them up.
if None in db_new_members or len(db_new_members) != len(new_members):
LOG.warning('Failed to fetch one of the new members from DB. '
'Retrying for up to 60 seconds.')
raise db_exceptions.NoResultFound
updated_members = [
(provider_utils.db_member_to_provider_member(
self._member_repo.get(db_apis.get_session(),
@ -554,9 +593,19 @@ class ControllerWorker(object):
:returns: None
:raises MemberNotFound: The referenced member was not found
"""
# TODO(ataraday) when other flows will use dicts - revisit this
pool = self._pool_repo.get(db_apis.get_session(),
id=member[constants.POOL_ID])
try:
db_member = self._get_db_obj_until_pending_update(
self._member_repo, member[constants.MEMBER_ID])
except tenacity.RetryError as e:
LOG.warning('Member did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_member = e.last_attempt.result()
pool = db_member.pool
load_balancer = pool.load_balancer
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict(recurse=True)
@ -740,8 +789,18 @@ class ControllerWorker(object):
:returns: None
:raises L7PolicyNotFound: The referenced l7policy was not found
"""
db_listener = self._listener_repo.get(
db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID])
try:
db_l7policy = self._get_db_obj_until_pending_update(
self._l7policy_repo, original_l7policy[constants.L7POLICY_ID])
except tenacity.RetryError as e:
LOG.warning('L7 policy did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_l7policy = e.last_attempt.result()
db_listener = db_l7policy.listener
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
@ -832,8 +891,17 @@ class ControllerWorker(object):
:returns: None
:raises L7RuleNotFound: The referenced l7rule was not found
"""
db_l7policy = self._l7policy_repo.get(
db_apis.get_session(), id=original_l7rule[constants.L7POLICY_ID])
try:
db_l7rule = self._get_db_obj_until_pending_update(
self._l7rule_repo, original_l7rule[constants.L7RULE_ID])
except tenacity.RetryError as e:
LOG.warning('L7 rule did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
db_l7rule = e.last_attempt.result()
db_l7policy = db_l7rule.l7policy
load_balancer = db_l7policy.listener.load_balancer
listeners_dicts = (

View File

@ -17,6 +17,7 @@ from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
import tenacity
from octavia.api.drivers import utils as provider_utils
from octavia.common import constants
@ -370,6 +371,33 @@ class TestControllerWorker(base.TestCase):
constants.UPDATE_DICT:
HEALTH_UPDATE_DICT}))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_health_monitor_timeout(self,
                                       mock__get_db_obj_until_pending,
                                       mock_api_get_session,
                                       mock_dyn_log_listener,
                                       mock_taskflow_load,
                                       mock_pool_repo_get,
                                       mock_member_repo_get,
                                       mock_l7rule_repo_get,
                                       mock_l7policy_repo_get,
                                       mock_listener_repo_get,
                                       mock_lb_repo_get,
                                       mock_health_mon_repo_get,
                                       mock_amp_repo_get):
    """Run update_health_monitor when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields a health monitor
    mock with provisioning status ACTIVE. There are no explicit
    assertions: the test passes as long as the call does not raise.
    """
    _flow_mock.reset_mock()
    _db_health_mon_mock.provisioning_status = constants.ACTIVE

    # The final (failed) retry attempt hands back the health monitor mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': _db_health_mon_mock})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    worker = controller_worker.ControllerWorker()
    worker.update_health_monitor(_health_mon_mock, HEALTH_UPDATE_DICT)
def test_create_listener(self,
mock_api_get_session,
mock_dyn_log_listener,
@ -442,6 +470,11 @@ class TestControllerWorker(base.TestCase):
mock_health_mon_repo_get,
mock_amp_repo_get):
load_balancer_mock = mock.MagicMock()
load_balancer_mock.provisioning_status = constants.PENDING_UPDATE
load_balancer_mock.id = LB_ID
mock_lb_repo_get.return_value = load_balancer_mock
_flow_mock.reset_mock()
_listener_mock.provisioning_status = constants.PENDING_UPDATE
@ -459,6 +492,36 @@ class TestControllerWorker(base.TestCase):
constants.LISTENERS:
[listener_dict]}))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_listener_timeout(self,
                                 mock__get_db_obj_until_pending,
                                 mock_api_get_session,
                                 mock_dyn_log_listener,
                                 mock_taskflow_load,
                                 mock_pool_repo_get,
                                 mock_member_repo_get,
                                 mock_l7rule_repo_get,
                                 mock_l7policy_repo_get,
                                 mock_listener_repo_get,
                                 mock_lb_repo_get,
                                 mock_health_mon_repo_get,
                                 mock_amp_repo_get):
    """Run update_listener when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields a load balancer
    mock. There are no explicit assertions: the test passes as long as
    the call does not raise.
    """
    lb_from_last_attempt = mock.MagicMock(
        provisioning_status=constants.PENDING_UPDATE,
        id=LB_ID)

    _flow_mock.reset_mock()
    _listener_mock.provisioning_status = constants.PENDING_UPDATE

    # The final (failed) retry attempt hands back the load balancer mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': lb_from_last_attempt})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    listener_request = {
        constants.LISTENER_ID: LISTENER_ID,
        constants.LOADBALANCER_ID: LB_ID,
    }

    worker = controller_worker.ControllerWorker()
    worker.update_listener(listener_request, LISTENER_UPDATE_DICT)
def test_create_load_balancer_single_no_anti_affinity(
self, mock_api_get_session,
mock_dyn_log_listener, mock_taskflow_load, mock_pool_repo_get,
@ -760,6 +823,36 @@ class TestControllerWorker(base.TestCase):
_db_load_balancer_mock.id,
}))
@mock.patch('octavia.db.repositories.ListenerRepository.get_all',
            return_value=([_listener_mock], None))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_load_balancer_timeout(self,
                                      mock__get_db_obj_until_pending,
                                      mock_listener_repo_get_all,
                                      mock_api_get_session,
                                      mock_dyn_log_listener,
                                      mock_taskflow_load,
                                      mock_pool_repo_get,
                                      mock_member_repo_get,
                                      mock_l7rule_repo_get,
                                      mock_l7policy_repo_get,
                                      mock_listener_repo_get,
                                      mock_lb_repo_get,
                                      mock_health_mon_repo_get,
                                      mock_amp_repo_get):
    """Run update_load_balancer when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields a load balancer
    mock with provisioning status ACTIVE. There are no explicit
    assertions: the test passes as long as the call does not raise.
    """
    _flow_mock.reset_mock()
    _db_load_balancer_mock.provisioning_status = constants.ACTIVE

    # The final (failed) retry attempt hands back the load balancer mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': _db_load_balancer_mock})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    worker = controller_worker.ControllerWorker()
    worker.update_load_balancer(_load_balancer_mock, 'TEST2')
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_create_member_flow',
return_value=_flow_mock)
@ -862,6 +955,10 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
db_member = mock.MagicMock()
db_member.provisioning_status = constants.PENDING_UPDATE
db_member.pool = _db_pool_mock
mock_member_repo_get.return_value = db_member
_member = _member_mock.to_dict()
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
mock_get_az_metadata_dict.return_value = {}
@ -884,6 +981,44 @@ class TestControllerWorker(base.TestCase):
MEMBER_UPDATE_DICT,
constants.AVAILABILITY_ZONE: {}}))
@mock.patch('octavia.controller.worker.v2.flows.'
            'member_flows.MemberFlows.get_update_member_flow',
            return_value=_flow_mock)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
            'get_availability_zone_metadata_dict')
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_member_timeout(self,
                               mock__get_db_obj_until_pending,
                               mock_get_az_metadata_dict,
                               mock_get_update_member_flow,
                               mock_api_get_session,
                               mock_dyn_log_listener,
                               mock_taskflow_load,
                               mock_pool_repo_get,
                               mock_member_repo_get,
                               mock_l7rule_repo_get,
                               mock_l7policy_repo_get,
                               mock_listener_repo_get,
                               mock_lb_repo_get,
                               mock_health_mon_repo_get,
                               mock_amp_repo_get):
    """Run update_member when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields a member mock with
    provisioning status ACTIVE. There are no explicit assertions: the
    test passes as long as the call does not raise.
    """
    _flow_mock.reset_mock()

    stale_member = mock.MagicMock(
        provisioning_status=constants.ACTIVE,
        pool=_db_pool_mock)

    # The final (failed) retry attempt hands back the ACTIVE member mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': stale_member})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)
    mock_member_repo_get.return_value = stale_member
    mock_get_az_metadata_dict.return_value = {}

    member_request = _member_mock.to_dict()
    member_request[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE

    worker = controller_worker.ControllerWorker()
    worker.update_member(member_request, MEMBER_UPDATE_DICT)
@mock.patch('octavia.controller.worker.v2.flows.'
'member_flows.MemberFlows.get_batch_update_members_flow',
return_value=_flow_mock)
@ -910,7 +1045,9 @@ class TestControllerWorker(base.TestCase):
old_member = mock.MagicMock()
old_member.to_dict.return_value = {'id': 9,
constants.POOL_ID: 'testtest'}
mock_member_repo_get.side_effect = [_member_mock, old_member]
new_member = mock.MagicMock()
mock_member_repo_get.side_effect = [
new_member, _member_mock, old_member]
cw.batch_update_members([{constants.MEMBER_ID: 9,
constants.POOL_ID: 'testtest'}],
[{constants.MEMBER_ID: 11}],
@ -1030,6 +1167,32 @@ class TestControllerWorker(base.TestCase):
constants.UPDATE_DICT:
POOL_UPDATE_DICT}))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_pool_update(self,
                            mock__get_db_obj_until_pending,
                            mock_api_get_session,
                            mock_dyn_log_listener,
                            mock_taskflow_load,
                            mock_pool_repo_get,
                            mock_member_repo_get,
                            mock_l7rule_repo_get,
                            mock_l7policy_repo_get,
                            mock_listener_repo_get,
                            mock_lb_repo_get,
                            mock_health_mon_repo_get,
                            mock_amp_repo_get):
    """Run update_pool when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields a pool mock with
    provisioning status ACTIVE. There are no explicit assertions: the
    test passes as long as the call does not raise.
    """
    # NOTE(review): the sibling tests for this scenario are all named
    # ``*_timeout``; this one ends in ``_update`` - confirm whether a
    # rename to ``test_update_pool_timeout`` was intended.
    _flow_mock.reset_mock()
    _db_pool_mock.provisioning_status = constants.ACTIVE

    # The final (failed) retry attempt hands back the pool mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': _db_pool_mock})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    worker = controller_worker.ControllerWorker()
    worker.update_pool(_pool_mock, POOL_UPDATE_DICT)
def test_create_l7policy(self,
mock_api_get_session,
mock_dyn_log_listener,
@ -1125,6 +1288,38 @@ class TestControllerWorker(base.TestCase):
constants.UPDATE_DICT:
L7POLICY_UPDATE_DICT}))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_l7policy_timeout(self,
                                 mock__get_db_obj_until_pending,
                                 mock_api_get_session,
                                 mock_dyn_log_listener,
                                 mock_taskflow_load,
                                 mock_pool_repo_get,
                                 mock_member_repo_get,
                                 mock_l7rule_repo_get,
                                 mock_l7policy_repo_get,
                                 mock_listener_repo_get,
                                 mock_lb_repo_get,
                                 mock_health_mon_repo_get,
                                 mock_amp_repo_get):
    """Run update_l7policy when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields an L7 policy mock
    with provisioning status ACTIVE. There are no explicit assertions:
    the test passes as long as the call does not raise.
    """
    _flow_mock.reset_mock()
    mock_listener_repo_get.return_value = _listener_mock
    _l7policy_mock.provisioning_status = constants.ACTIVE

    # The final (failed) retry attempt hands back the L7 policy mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': _l7policy_mock})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    l7policy_request = {
        constants.L7POLICY_ID: L7POLICY_ID,
        constants.LISTENER_ID: LISTENER_ID,
    }

    worker = controller_worker.ControllerWorker()
    worker.update_l7policy(l7policy_request, L7POLICY_UPDATE_DICT)
def test_create_l7rule(self,
mock_api_get_session,
mock_dyn_log_listener,
@ -1228,6 +1423,32 @@ class TestControllerWorker(base.TestCase):
constants.UPDATE_DICT:
L7RULE_UPDATE_DICT}))
@mock.patch("octavia.controller.worker.v2.controller_worker."
            "ControllerWorker._get_db_obj_until_pending_update")
def test_update_l7rule_timeout(self,
                               mock__get_db_obj_until_pending,
                               mock_api_get_session,
                               mock_dyn_log_listener,
                               mock_taskflow_load,
                               mock_pool_repo_get,
                               mock_member_repo_get,
                               mock_l7rule_repo_get,
                               mock_l7policy_repo_get,
                               mock_listener_repo_get,
                               mock_lb_repo_get,
                               mock_health_mon_repo_get,
                               mock_amp_repo_get):
    """Run update_l7rule when the PENDING_UPDATE wait gives up.

    The patched ``_get_db_obj_until_pending_update`` raises
    ``tenacity.RetryError`` whose last attempt yields an L7 rule mock
    with provisioning status ACTIVE. There are no explicit assertions:
    the test passes as long as the call does not raise.
    """
    _flow_mock.reset_mock()
    _l7rule_mock.provisioning_status = constants.ACTIVE

    # The final (failed) retry attempt hands back the L7 rule mock.
    exhausted_attempt = mock.MagicMock(
        **{'result.return_value': _l7rule_mock})
    mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
        last_attempt=exhausted_attempt)

    worker = controller_worker.ControllerWorker()
    l7rule_request = _l7rule_mock.to_dict()
    worker.update_l7rule(l7rule_request, L7RULE_UPDATE_DICT)
@mock.patch('octavia.api.drivers.utils.'
'db_loadbalancer_to_provider_loadbalancer')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Fix a potential race condition when updating a resource in the amphorav2
worker. The worker was not waiting for the resource to be set to
PENDING_UPDATE, so the resource may have been updated with old data from the
database, resulting in a no-op update.