Check Amphora status on SR-IOV failover flows
As noted on an earlier patch[1], the "SetAmphoraFirewallRules" task was not checking the Amphora status nor using an API timeout. This could cause failover flows to take longer than necessary if one of the Amphora is missing. This patch corrects that issue by honoring both the Amphora status and timeout. [1] https://review.opendev.org/c/openstack/octavia/+/910101/13/octavia/controller/worker/v2/flows/amphora_flows.py Change-Id: Ic5e8140b13164267236f0a5d9a48fbd84bcdd688
This commit is contained in:
parent
ffc6f83f07
commit
411e7c6dbc
|
@ -599,7 +599,7 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||
return response_json.get('interface', None)
|
||||
|
||||
def set_interface_rules(self, amphora: db_models.Amphora,
|
||||
ip_address, rules):
|
||||
ip_address, rules, timeout_dict=None):
|
||||
"""Sets interface firewall rules in the amphora
|
||||
|
||||
:param amphora: The amphora to query.
|
||||
|
@ -608,9 +608,9 @@ class HaproxyAmphoraLoadBalancerDriver(
|
|||
:param rules: The l1st of allow rules to apply.
|
||||
"""
|
||||
try:
|
||||
self._populate_amphora_api_version(amphora)
|
||||
self._populate_amphora_api_version(amphora, timeout_dict)
|
||||
self.clients[amphora.api_version].set_interface_rules(
|
||||
amphora, ip_address, rules)
|
||||
amphora, ip_address, rules, timeout_dict=timeout_dict)
|
||||
except exc.NotFound as e:
|
||||
LOG.debug('Amphora %s does not support the set_interface_rules '
|
||||
'API.', amphora.id)
|
||||
|
@ -886,6 +886,7 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
|
|||
r = self.put(amp, 'config', timeout_dict, data=agent_config)
|
||||
return exc.check_exception(r)
|
||||
|
||||
def set_interface_rules(self, amp, ip_address, rules):
|
||||
r = self.put(amp, f'interface/{ip_address}/rules', json=rules)
|
||||
def set_interface_rules(self, amp, ip_address, rules, timeout_dict=None):
|
||||
r = self.put(amp, f'interface/{ip_address}/rules', timeout_dict,
|
||||
json=rules)
|
||||
return exc.check_exception(r)
|
||||
|
|
|
@ -290,9 +290,10 @@ class AmphoraFlows(object):
|
|||
|
||||
amp_0_subflow.add(amphora_driver_tasks.SetAmphoraFirewallRules(
|
||||
name=sf_name + '-0-' + constants.SET_AMPHORA_FIREWALL_RULES,
|
||||
requires=(constants.AMPHORAE,
|
||||
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS,
|
||||
constants.AMPHORA_FIREWALL_RULES),
|
||||
inject={constants.AMPHORA_INDEX: 0}))
|
||||
inject={constants.AMPHORA_INDEX: 0,
|
||||
constants.TIMEOUT_DICT: timeout_dict}))
|
||||
|
||||
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||
name=sf_name + '-0-' + constants.AMP_VRRP_START,
|
||||
|
@ -330,9 +331,10 @@ class AmphoraFlows(object):
|
|||
|
||||
amp_1_subflow.add(amphora_driver_tasks.SetAmphoraFirewallRules(
|
||||
name=sf_name + '-1-' + constants.SET_AMPHORA_FIREWALL_RULES,
|
||||
requires=(constants.AMPHORAE,
|
||||
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS,
|
||||
constants.AMPHORA_FIREWALL_RULES),
|
||||
inject={constants.AMPHORA_INDEX: 1}))
|
||||
inject={constants.AMPHORA_INDEX: 1,
|
||||
constants.TIMEOUT_DICT: timeout_dict}))
|
||||
|
||||
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
|
||||
name=sf_name + '-1-' + constants.AMP_VRRP_START,
|
||||
|
|
|
@ -766,7 +766,8 @@ class SetAmphoraFirewallRules(BaseAmphoraTask):
|
|||
"""Task to push updated firewall ruls to an amphora."""
|
||||
|
||||
def execute(self, amphorae: List[dict], amphora_index: int,
|
||||
amphora_firewall_rules: List[dict]):
|
||||
amphora_firewall_rules: List[dict], amphorae_status: dict,
|
||||
timeout_dict=None):
|
||||
|
||||
if (amphora_firewall_rules and
|
||||
amphora_firewall_rules[0].get('non-sriov-vip', False)):
|
||||
|
@ -774,12 +775,18 @@ class SetAmphoraFirewallRules(BaseAmphoraTask):
|
|||
# This is already logged in GetAmphoraFirewallRules.
|
||||
return
|
||||
|
||||
amphora_id = amphorae[amphora_index][constants.ID]
|
||||
amphora_status = amphorae_status.get(amphora_id, {})
|
||||
if amphora_status.get(constants.UNREACHABLE):
|
||||
LOG.warning("Skipping firewall rules update because amphora %s "
|
||||
"is not reachable.", amphora_id)
|
||||
return
|
||||
|
||||
session = db_apis.get_session()
|
||||
with session.begin():
|
||||
db_amp = self.amphora_repo.get(
|
||||
session, id=amphorae[amphora_index][constants.ID])
|
||||
db_amp = self.amphora_repo.get(session, id=amphora_id)
|
||||
|
||||
self.amphora_driver.set_interface_rules(
|
||||
db_amp,
|
||||
amphorae[amphora_index][constants.VRRP_IP],
|
||||
amphora_firewall_rules)
|
||||
amphora_firewall_rules, timeout_dict=timeout_dict)
|
||||
|
|
|
@ -105,9 +105,9 @@ class TestHAProxyAmphoraDriver(base.TestCase):
|
|||
self.driver.clients['0'] = client_mock
|
||||
|
||||
self.driver.set_interface_rules(amphora_mock, IP_ADDRESS, 'fake_rules')
|
||||
mock_api_version.assert_called_once_with(amphora_mock)
|
||||
mock_api_version.assert_called_once_with(amphora_mock, None)
|
||||
client_mock.set_interface_rules.assert_called_once_with(
|
||||
amphora_mock, IP_ADDRESS, 'fake_rules')
|
||||
amphora_mock, IP_ADDRESS, 'fake_rules', timeout_dict=None)
|
||||
|
||||
self.assertRaises(driver_except.AmpDriverNotImplementedError,
|
||||
self.driver.set_interface_rules, amphora_mock,
|
||||
|
|
|
@ -44,13 +44,14 @@ class TestListenerFlows(base.TestCase):
|
|||
|
||||
self.assertIn(constants.LISTENERS, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, listener_flow.requires)
|
||||
|
||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||
listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE, listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORA_FIREWALL_RULES, listener_flow.provides)
|
||||
|
||||
self.assertEqual(2, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.provides))
|
||||
|
||||
def test_get_delete_listener_flow(self, mock_get_net_driver):
|
||||
|
@ -65,13 +66,14 @@ class TestListenerFlows(base.TestCase):
|
|||
self.assertIn(constants.LISTENER, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.PROJECT_ID, listener_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, listener_flow.requires)
|
||||
|
||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||
listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE, listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORA_FIREWALL_RULES, listener_flow.provides)
|
||||
|
||||
self.assertEqual(3, len(listener_flow.requires))
|
||||
self.assertEqual(4, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.provides))
|
||||
|
||||
def test_get_delete_listener_internal_flow(self, mock_get_net_driver):
|
||||
|
@ -86,13 +88,14 @@ class TestListenerFlows(base.TestCase):
|
|||
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.PROJECT_ID, listener_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, listener_flow.requires)
|
||||
|
||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||
listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE, listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORA_FIREWALL_RULES, listener_flow.provides)
|
||||
|
||||
self.assertEqual(2, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.provides))
|
||||
|
||||
def test_get_update_listener_flow(self, mock_get_net_driver):
|
||||
|
@ -109,13 +112,14 @@ class TestListenerFlows(base.TestCase):
|
|||
self.assertIn(constants.UPDATE_DICT, listener_flow.requires)
|
||||
self.assertIn(constants.LISTENERS, listener_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, listener_flow.requires)
|
||||
|
||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||
listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE, listener_flow.provides)
|
||||
self.assertIn(constants.AMPHORA_FIREWALL_RULES, listener_flow.provides)
|
||||
|
||||
self.assertEqual(4, len(listener_flow.requires))
|
||||
self.assertEqual(5, len(listener_flow.requires))
|
||||
self.assertEqual(3, len(listener_flow.provides))
|
||||
|
||||
def test_get_create_all_listeners_flow(self, mock_get_net_driver):
|
||||
|
@ -127,11 +131,12 @@ class TestListenerFlows(base.TestCase):
|
|||
self.assertIsInstance(listeners_flow, flow.Flow)
|
||||
self.assertIn(constants.LOADBALANCER, listeners_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, listeners_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, listeners_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, listeners_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE_NETWORK_CONFIG,
|
||||
listeners_flow.provides)
|
||||
self.assertIn(constants.AMPHORAE, listeners_flow.provides)
|
||||
self.assertIn(constants.AMPHORA_FIREWALL_RULES,
|
||||
listeners_flow.provides)
|
||||
self.assertEqual(2, len(listeners_flow.requires))
|
||||
self.assertEqual(3, len(listeners_flow.requires))
|
||||
self.assertEqual(5, len(listeners_flow.provides))
|
||||
|
|
|
@ -344,6 +344,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||
self.assertIn(constants.FLAVOR, failover_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, failover_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, failover_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, failover_flow.requires)
|
||||
|
||||
self.assertIn(constants.UPDATED_PORTS, failover_flow.provides)
|
||||
self.assertIn(constants.AMPHORA, failover_flow.provides)
|
||||
|
@ -363,7 +364,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||
self.assertIn(constants.SUBNET, failover_flow.provides)
|
||||
self.assertIn(constants.NEW_AMPHORAE, failover_flow.provides)
|
||||
|
||||
self.assertEqual(6, len(failover_flow.requires),
|
||||
self.assertEqual(7, len(failover_flow.requires),
|
||||
failover_flow.requires)
|
||||
self.assertEqual(16, len(failover_flow.provides),
|
||||
failover_flow.provides)
|
||||
|
@ -423,6 +424,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||
self.assertIn(constants.FLAVOR, failover_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, failover_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, failover_flow.requires)
|
||||
self.assertIn(constants.AMPHORAE_STATUS, failover_flow.requires)
|
||||
|
||||
self.assertIn(constants.UPDATED_PORTS, failover_flow.provides)
|
||||
self.assertIn(constants.AMPHORA, failover_flow.provides)
|
||||
|
@ -443,7 +445,7 @@ class TestLoadBalancerFlows(base.TestCase):
|
|||
self.assertIn(constants.SUBNET, failover_flow.provides)
|
||||
self.assertIn(constants.NEW_AMPHORAE, failover_flow.provides)
|
||||
|
||||
self.assertEqual(6, len(failover_flow.requires),
|
||||
self.assertEqual(7, len(failover_flow.requires),
|
||||
failover_flow.requires)
|
||||
self.assertEqual(16, len(failover_flow.provides),
|
||||
failover_flow.provides)
|
||||
|
|
|
@ -1262,15 +1262,18 @@ class TestAmphoraDriverTasks(base.TestCase):
|
|||
set_amp_fw_rules = amphora_driver_tasks.SetAmphoraFirewallRules()
|
||||
|
||||
# Test non-SRIOV VIP path
|
||||
set_amp_fw_rules.execute([amphora], 0, [{'non-sriov-vip': True}])
|
||||
set_amp_fw_rules.execute([amphora], 0, [{'non-sriov-vip': True}], {},
|
||||
timeout_dict=None)
|
||||
|
||||
mock_get_session.assert_not_called()
|
||||
mock_driver.set_interface_rules.assert_not_called()
|
||||
|
||||
# Test SRIOV VIP path
|
||||
set_amp_fw_rules.execute([amphora], 0, [{'fake_rule': True}])
|
||||
set_amp_fw_rules.execute([amphora], 0, [{'fake_rule': True}], {},
|
||||
timeout_dict=None)
|
||||
|
||||
mock_amphora_repo_get.assert_called_once_with(_session_mock, id=AMP_ID)
|
||||
|
||||
mock_driver.set_interface_rules.assert_called_once_with(
|
||||
_db_amphora_mock, '192.0.2.88', [{'fake_rule': True}])
|
||||
_db_amphora_mock, '192.0.2.88', [{'fake_rule': True}],
|
||||
timeout_dict=None)
|
||||
|
|
Loading…
Reference in New Issue