Fix potential race conditions on update requests in the v2 worker
The v2 worker was not waiting for a resource to be set to PENDING_UPDATE before updating it effectively. In some circumstances (loaded server/slow DB for instance), the resources may have been updated with old data from the DB, resulting in a no-op update. This wait for a PENDING_UPDATE state exists in amphorav1 but it was not moved to v2 when we switched the logic from the DB-based objects to dict-based objects. Note for stable/xena: this backport also contains [0] which improves code coverage. [0] I403e1861b1b51c07edad35c64ffac22167342c88 Story 2009887 Task 44645 Story 2009985 Task 45032 Change-Id: I433e074c2eac5f6875ee5dace8faf86969b24f66 (cherry picked from commit6042de759d
) (cherry picked from commitc31051b388
)
This commit is contained in:
parent
d4693c4c00
commit
4b87554b59
@ -291,8 +291,17 @@ class ControllerWorker(object):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises ListenerNotFound: The referenced listener was not found
|
:raises ListenerNotFound: The referenced listener was not found
|
||||||
"""
|
"""
|
||||||
db_lb = self._lb_repo.get(db_apis.get_session(),
|
try:
|
||||||
id=listener[constants.LOADBALANCER_ID])
|
db_lb = self._get_db_obj_until_pending_update(
|
||||||
|
self._lb_repo, listener[constants.LOADBALANCER_ID])
|
||||||
|
except tenacity.RetryError as e:
|
||||||
|
LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
|
||||||
|
'This either due to an in-progress Octavia upgrade '
|
||||||
|
'or an overloaded and failing database. Assuming '
|
||||||
|
'an upgrade is in progress and continuing.',
|
||||||
|
constants.PENDING_UPDATE)
|
||||||
|
db_lb = e.last_attempt.result()
|
||||||
|
|
||||||
store = {constants.LISTENER: listener,
|
store = {constants.LISTENER: listener,
|
||||||
constants.UPDATE_DICT: listener_updates,
|
constants.UPDATE_DICT: listener_updates,
|
||||||
constants.LOADBALANCER_ID: db_lb.id,
|
constants.LOADBALANCER_ID: db_lb.id,
|
||||||
@ -388,6 +397,18 @@ class ControllerWorker(object):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises LBNotFound: The referenced load balancer was not found
|
:raises LBNotFound: The referenced load balancer was not found
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._get_db_obj_until_pending_update(
|
||||||
|
self._lb_repo,
|
||||||
|
original_load_balancer[constants.LOADBALANCER_ID])
|
||||||
|
except tenacity.RetryError:
|
||||||
|
LOG.warning('Load balancer did not go into %s in 60 seconds. '
|
||||||
|
'This either due to an in-progress Octavia upgrade '
|
||||||
|
'or an overloaded and failing database. Assuming '
|
||||||
|
'an upgrade is in progress and continuing.',
|
||||||
|
constants.PENDING_UPDATE)
|
||||||
|
|
||||||
store = {constants.LOADBALANCER: original_load_balancer,
|
store = {constants.LOADBALANCER: original_load_balancer,
|
||||||
constants.LOADBALANCER_ID:
|
constants.LOADBALANCER_ID:
|
||||||
original_load_balancer[constants.LOADBALANCER_ID],
|
original_load_balancer[constants.LOADBALANCER_ID],
|
||||||
@ -476,8 +497,26 @@ class ControllerWorker(object):
|
|||||||
flow_utils.get_delete_member_flow,
|
flow_utils.get_delete_member_flow,
|
||||||
store=store)
|
store=store)
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||||
|
wait=tenacity.wait_incrementing(
|
||||||
|
CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
|
||||||
|
CONF.haproxy_amphora.api_db_commit_retry_backoff,
|
||||||
|
CONF.haproxy_amphora.api_db_commit_retry_max),
|
||||||
|
stop=tenacity.stop_after_attempt(
|
||||||
|
CONF.haproxy_amphora.api_db_commit_retry_attempts))
|
||||||
def batch_update_members(self, old_members, new_members,
|
def batch_update_members(self, old_members, new_members,
|
||||||
updated_members):
|
updated_members):
|
||||||
|
db_new_members = [self._member_repo.get(db_apis.get_session(),
|
||||||
|
id=member[constants.MEMBER_ID])
|
||||||
|
for member in new_members]
|
||||||
|
# The API may not have commited all of the new member records yet.
|
||||||
|
# Make sure we retry looking them up.
|
||||||
|
if None in db_new_members or len(db_new_members) != len(new_members):
|
||||||
|
LOG.warning('Failed to fetch one of the new members from DB. '
|
||||||
|
'Retrying for up to 60 seconds.')
|
||||||
|
raise db_exceptions.NoResultFound
|
||||||
|
|
||||||
updated_members = [
|
updated_members = [
|
||||||
(provider_utils.db_member_to_provider_member(
|
(provider_utils.db_member_to_provider_member(
|
||||||
self._member_repo.get(db_apis.get_session(),
|
self._member_repo.get(db_apis.get_session(),
|
||||||
@ -531,9 +570,19 @@ class ControllerWorker(object):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises MemberNotFound: The referenced member was not found
|
:raises MemberNotFound: The referenced member was not found
|
||||||
"""
|
"""
|
||||||
# TODO(ataraday) when other flows will use dicts - revisit this
|
|
||||||
pool = self._pool_repo.get(db_apis.get_session(),
|
try:
|
||||||
id=member[constants.POOL_ID])
|
db_member = self._get_db_obj_until_pending_update(
|
||||||
|
self._member_repo, member[constants.MEMBER_ID])
|
||||||
|
except tenacity.RetryError as e:
|
||||||
|
LOG.warning('Member did not go into %s in 60 seconds. '
|
||||||
|
'This either due to an in-progress Octavia upgrade '
|
||||||
|
'or an overloaded and failing database. Assuming '
|
||||||
|
'an upgrade is in progress and continuing.',
|
||||||
|
constants.PENDING_UPDATE)
|
||||||
|
db_member = e.last_attempt.result()
|
||||||
|
|
||||||
|
pool = db_member.pool
|
||||||
load_balancer = pool.load_balancer
|
load_balancer = pool.load_balancer
|
||||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||||
load_balancer).to_dict(recurse=True)
|
load_balancer).to_dict(recurse=True)
|
||||||
@ -717,8 +766,18 @@ class ControllerWorker(object):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises L7PolicyNotFound: The referenced l7policy was not found
|
:raises L7PolicyNotFound: The referenced l7policy was not found
|
||||||
"""
|
"""
|
||||||
db_listener = self._listener_repo.get(
|
try:
|
||||||
db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID])
|
db_l7policy = self._get_db_obj_until_pending_update(
|
||||||
|
self._l7policy_repo, original_l7policy[constants.L7POLICY_ID])
|
||||||
|
except tenacity.RetryError as e:
|
||||||
|
LOG.warning('L7 policy did not go into %s in 60 seconds. '
|
||||||
|
'This either due to an in-progress Octavia upgrade '
|
||||||
|
'or an overloaded and failing database. Assuming '
|
||||||
|
'an upgrade is in progress and continuing.',
|
||||||
|
constants.PENDING_UPDATE)
|
||||||
|
db_l7policy = e.last_attempt.result()
|
||||||
|
|
||||||
|
db_listener = db_l7policy.listener
|
||||||
|
|
||||||
listeners_dicts = (
|
listeners_dicts = (
|
||||||
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
||||||
@ -809,8 +868,17 @@ class ControllerWorker(object):
|
|||||||
:returns: None
|
:returns: None
|
||||||
:raises L7RuleNotFound: The referenced l7rule was not found
|
:raises L7RuleNotFound: The referenced l7rule was not found
|
||||||
"""
|
"""
|
||||||
db_l7policy = self._l7policy_repo.get(
|
try:
|
||||||
db_apis.get_session(), id=original_l7rule[constants.L7POLICY_ID])
|
db_l7rule = self._get_db_obj_until_pending_update(
|
||||||
|
self._l7rule_repo, original_l7rule[constants.L7RULE_ID])
|
||||||
|
except tenacity.RetryError as e:
|
||||||
|
LOG.warning('L7 rule did not go into %s in 60 seconds. '
|
||||||
|
'This either due to an in-progress Octavia upgrade '
|
||||||
|
'or an overloaded and failing database. Assuming '
|
||||||
|
'an upgrade is in progress and continuing.',
|
||||||
|
constants.PENDING_UPDATE)
|
||||||
|
db_l7rule = e.last_attempt.result()
|
||||||
|
db_l7policy = db_l7rule.l7policy
|
||||||
load_balancer = db_l7policy.listener.load_balancer
|
load_balancer = db_l7policy.listener.load_balancer
|
||||||
|
|
||||||
listeners_dicts = (
|
listeners_dicts = (
|
||||||
|
@ -17,6 +17,7 @@ from unittest import mock
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_config import fixture as oslo_fixture
|
from oslo_config import fixture as oslo_fixture
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from octavia.api.drivers import utils as provider_utils
|
from octavia.api.drivers import utils as provider_utils
|
||||||
from octavia.common import constants
|
from octavia.common import constants
|
||||||
@ -303,6 +304,33 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
HEALTH_UPDATE_DICT}))
|
HEALTH_UPDATE_DICT}))
|
||||||
|
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_health_monitor_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
_db_health_mon_mock.provisioning_status = constants.ACTIVE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = _db_health_mon_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.update_health_monitor(_health_mon_mock,
|
||||||
|
HEALTH_UPDATE_DICT)
|
||||||
|
|
||||||
def test_create_listener(self,
|
def test_create_listener(self,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@ -375,6 +403,11 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_health_mon_repo_get,
|
mock_health_mon_repo_get,
|
||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
load_balancer_mock = mock.MagicMock()
|
||||||
|
load_balancer_mock.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
load_balancer_mock.id = LB_ID
|
||||||
|
mock_lb_repo_get.return_value = load_balancer_mock
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
_listener_mock.provisioning_status = constants.PENDING_UPDATE
|
_listener_mock.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
|
||||||
@ -392,6 +425,36 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.LISTENERS:
|
constants.LISTENERS:
|
||||||
[listener_dict]}))
|
[listener_dict]}))
|
||||||
|
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_listener_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
load_balancer_mock = mock.MagicMock()
|
||||||
|
load_balancer_mock.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
load_balancer_mock.id = LB_ID
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
_listener_mock.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = load_balancer_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
|
||||||
|
constants.LOADBALANCER_ID: LB_ID}
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.update_listener(listener_dict, LISTENER_UPDATE_DICT)
|
||||||
|
|
||||||
def test_create_load_balancer_single_no_anti_affinity(
|
def test_create_load_balancer_single_no_anti_affinity(
|
||||||
self, mock_api_get_session,
|
self, mock_api_get_session,
|
||||||
mock_dyn_log_listener, mock_taskflow_load, mock_pool_repo_get,
|
mock_dyn_log_listener, mock_taskflow_load, mock_pool_repo_get,
|
||||||
@ -693,6 +756,36 @@ class TestControllerWorker(base.TestCase):
|
|||||||
_db_load_balancer_mock.id,
|
_db_load_balancer_mock.id,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
@mock.patch('octavia.db.repositories.ListenerRepository.get_all',
|
||||||
|
return_value=([_listener_mock], None))
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_load_balancer_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_listener_repo_get_all,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
_db_load_balancer_mock.provisioning_status = constants.ACTIVE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = _db_load_balancer_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
change = 'TEST2'
|
||||||
|
cw.update_load_balancer(_load_balancer_mock, change)
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_create_member_flow',
|
'member_flows.MemberFlows.get_create_member_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
@ -795,6 +888,10 @@ class TestControllerWorker(base.TestCase):
|
|||||||
mock_amp_repo_get):
|
mock_amp_repo_get):
|
||||||
|
|
||||||
_flow_mock.reset_mock()
|
_flow_mock.reset_mock()
|
||||||
|
db_member = mock.MagicMock()
|
||||||
|
db_member.provisioning_status = constants.PENDING_UPDATE
|
||||||
|
db_member.pool = _db_pool_mock
|
||||||
|
mock_member_repo_get.return_value = db_member
|
||||||
_member = _member_mock.to_dict()
|
_member = _member_mock.to_dict()
|
||||||
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
||||||
mock_get_az_metadata_dict.return_value = {}
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
@ -817,6 +914,44 @@ class TestControllerWorker(base.TestCase):
|
|||||||
MEMBER_UPDATE_DICT,
|
MEMBER_UPDATE_DICT,
|
||||||
constants.AVAILABILITY_ZONE: {}}))
|
constants.AVAILABILITY_ZONE: {}}))
|
||||||
|
|
||||||
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
|
'member_flows.MemberFlows.get_update_member_flow',
|
||||||
|
return_value=_flow_mock)
|
||||||
|
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
|
||||||
|
'get_availability_zone_metadata_dict')
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_member_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_get_az_metadata_dict,
|
||||||
|
mock_get_update_member_flow,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
db_member = mock.MagicMock()
|
||||||
|
db_member.provisioning_status = constants.ACTIVE
|
||||||
|
db_member.pool = _db_pool_mock
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = db_member
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
mock_member_repo_get.return_value = db_member
|
||||||
|
_member = _member_mock.to_dict()
|
||||||
|
_member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
|
||||||
|
mock_get_az_metadata_dict.return_value = {}
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.update_member(_member, MEMBER_UPDATE_DICT)
|
||||||
|
|
||||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||||
'member_flows.MemberFlows.get_batch_update_members_flow',
|
'member_flows.MemberFlows.get_batch_update_members_flow',
|
||||||
return_value=_flow_mock)
|
return_value=_flow_mock)
|
||||||
@ -843,7 +978,9 @@ class TestControllerWorker(base.TestCase):
|
|||||||
old_member = mock.MagicMock()
|
old_member = mock.MagicMock()
|
||||||
old_member.to_dict.return_value = {'id': 9,
|
old_member.to_dict.return_value = {'id': 9,
|
||||||
constants.POOL_ID: 'testtest'}
|
constants.POOL_ID: 'testtest'}
|
||||||
mock_member_repo_get.side_effect = [_member_mock, old_member]
|
new_member = mock.MagicMock()
|
||||||
|
mock_member_repo_get.side_effect = [
|
||||||
|
new_member, _member_mock, old_member]
|
||||||
cw.batch_update_members([{constants.MEMBER_ID: 9,
|
cw.batch_update_members([{constants.MEMBER_ID: 9,
|
||||||
constants.POOL_ID: 'testtest'}],
|
constants.POOL_ID: 'testtest'}],
|
||||||
[{constants.MEMBER_ID: 11}],
|
[{constants.MEMBER_ID: 11}],
|
||||||
@ -963,6 +1100,32 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
POOL_UPDATE_DICT}))
|
POOL_UPDATE_DICT}))
|
||||||
|
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_pool_update(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
_db_pool_mock.provisioning_status = constants.ACTIVE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = _db_pool_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.update_pool(_pool_mock, POOL_UPDATE_DICT)
|
||||||
|
|
||||||
def test_create_l7policy(self,
|
def test_create_l7policy(self,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@ -1058,6 +1221,38 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
L7POLICY_UPDATE_DICT}))
|
L7POLICY_UPDATE_DICT}))
|
||||||
|
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_l7policy_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
mock_listener_repo_get.return_value = _listener_mock
|
||||||
|
_l7policy_mock.provisioning_status = constants.ACTIVE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = _l7policy_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
l7policy_mock = {
|
||||||
|
constants.L7POLICY_ID: L7POLICY_ID,
|
||||||
|
constants.LISTENER_ID: LISTENER_ID
|
||||||
|
}
|
||||||
|
|
||||||
|
cw.update_l7policy(l7policy_mock, L7POLICY_UPDATE_DICT)
|
||||||
|
|
||||||
def test_create_l7rule(self,
|
def test_create_l7rule(self,
|
||||||
mock_api_get_session,
|
mock_api_get_session,
|
||||||
mock_dyn_log_listener,
|
mock_dyn_log_listener,
|
||||||
@ -1161,6 +1356,32 @@ class TestControllerWorker(base.TestCase):
|
|||||||
constants.UPDATE_DICT:
|
constants.UPDATE_DICT:
|
||||||
L7RULE_UPDATE_DICT}))
|
L7RULE_UPDATE_DICT}))
|
||||||
|
|
||||||
|
@mock.patch("octavia.controller.worker.v2.controller_worker."
|
||||||
|
"ControllerWorker._get_db_obj_until_pending_update")
|
||||||
|
def test_update_l7rule_timeout(self,
|
||||||
|
mock__get_db_obj_until_pending,
|
||||||
|
mock_api_get_session,
|
||||||
|
mock_dyn_log_listener,
|
||||||
|
mock_taskflow_load,
|
||||||
|
mock_pool_repo_get,
|
||||||
|
mock_member_repo_get,
|
||||||
|
mock_l7rule_repo_get,
|
||||||
|
mock_l7policy_repo_get,
|
||||||
|
mock_listener_repo_get,
|
||||||
|
mock_lb_repo_get,
|
||||||
|
mock_health_mon_repo_get,
|
||||||
|
mock_amp_repo_get):
|
||||||
|
|
||||||
|
_flow_mock.reset_mock()
|
||||||
|
_l7rule_mock.provisioning_status = constants.ACTIVE
|
||||||
|
last_attempt_mock = mock.MagicMock()
|
||||||
|
last_attempt_mock.result.return_value = _l7rule_mock
|
||||||
|
mock__get_db_obj_until_pending.side_effect = tenacity.RetryError(
|
||||||
|
last_attempt=last_attempt_mock)
|
||||||
|
|
||||||
|
cw = controller_worker.ControllerWorker()
|
||||||
|
cw.update_l7rule(_l7rule_mock.to_dict(), L7RULE_UPDATE_DICT)
|
||||||
|
|
||||||
@mock.patch('octavia.api.drivers.utils.'
|
@mock.patch('octavia.api.drivers.utils.'
|
||||||
'db_loadbalancer_to_provider_loadbalancer')
|
'db_loadbalancer_to_provider_loadbalancer')
|
||||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fix a potential race condition when updating a resource in the amphorav2
|
||||||
|
worker. The worker was not waiting for the resource to be set to
|
||||||
|
PENDING_UPDATE, so the resource may have been updated with old data from the
|
||||||
|
database, resulting in a no-op update.
|
Loading…
Reference in New Issue
Block a user