Fix amphora failover when VRRP port is missing

In an amphora failover situation, if the VRRP port of the other
amphora in an active/standby pair is missing from neutron, the amphora
failover may fail with a "port not found" error.
This patch corrects this issue by allowing the amphora failover in
progress to complete even if the other amphora is also in a failed
state.

Story: 2008127
Task: 40851

Change-Id: I806d220236ad741b638ad9315537334f2d923031
This commit is contained in:
Michael Johnson 2020-09-10 14:54:09 -07:00
parent 9a732565e9
commit 66f3a63f44
7 changed files with 136 additions and 40 deletions

View File

@@ -126,15 +126,16 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
timeout_dict=None): timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora.""" """Execute listener reload routines for listeners on an amphora."""
if loadbalancer.listeners: if loadbalancer.listeners:
self.amphora_driver.reload( try:
loadbalancer, amphorae[amphora_index], timeout_dict) self.amphora_driver.reload(
loadbalancer, amphorae[amphora_index], timeout_dict)
def revert(self, loadbalancer, *args, **kwargs): except Exception as e:
"""Handle failed listeners reloads.""" amphora_id = amphorae[amphora_index].id
LOG.warning('Failed to reload listeners on amphora %s. '
LOG.warning("Reverting listener reload.") 'Skipping this amphora as it is failing to '
for listener in loadbalancer.listeners: 'reload due to: %s', amphora_id, str(e))
self.task_utils.mark_listener_prov_status_error(listener.id) self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
class ListenerDelete(BaseAmphoraTask): class ListenerDelete(BaseAmphoraTask):
@@ -372,7 +373,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return
LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id) LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id)
@@ -394,8 +395,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
""" """
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, timeout_dict=None):
self.amphora_driver.start_vrrp_service(amphorae[amphora_index], amphora_id = amphorae[amphora_index].id
timeout_dict) try:
self.amphora_driver.start_vrrp_service(amphorae[amphora_index],
timeout_dict)
except Exception as e:
LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Started VRRP on amphora %s.", amphorae[amphora_index].id) LOG.debug("Started VRRP on amphora %s.", amphorae[amphora_index].id)

View File

@@ -188,17 +188,15 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
db_apis.get_session(), db_apis.get_session(),
id=loadbalancer[constants.LOADBALANCER_ID]) id=loadbalancer[constants.LOADBALANCER_ID])
if db_lb.listeners: if db_lb.listeners:
self.amphora_driver.reload(db_lb, db_amp, timeout_dict) try:
self.amphora_driver.reload(db_lb, db_amp, timeout_dict)
def revert(self, loadbalancer, *args, **kwargs): except Exception as e:
"""Handle failed listeners reloads.""" amphora_id = amphorae[amphora_index].id
LOG.warning('Failed to reload listeners on amphora %s. '
LOG.warning("Reverting listener reload.") 'Skipping this amphora as it is failing to '
db_lb = self.loadbalancer_repo.get( 'reload due to: %s', amphora_id, str(e))
db_apis.get_session(), self.amphora_repo.update(db_apis.get_session(), amphora_id,
id=loadbalancer[constants.LOADBALANCER_ID]) status=constants.ERROR)
for listener in db_lb.listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)
class ListenerDelete(BaseAmphoraTask): class ListenerDelete(BaseAmphoraTask):
@@ -494,7 +492,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return
LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id) LOG.debug("Uploaded VRRP configuration of amphora %s.", amphora_id)
@@ -522,10 +520,17 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
def execute(self, amphora_index, amphorae, timeout_dict=None): def execute(self, amphora_index, amphorae, timeout_dict=None):
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
db_amp = self.amphora_repo.get( amphora_id = amphorae[amphora_index][constants.ID]
db_apis.get_session(), id=amphorae[amphora_index][constants.ID]) db_amp = self.amphora_repo.get(db_apis.get_session(), id=amphora_id)
try:
self.amphora_driver.start_vrrp_service(db_amp, timeout_dict) self.amphora_driver.start_vrrp_service(db_amp, timeout_dict)
except Exception as e:
LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e))
self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR)
return
LOG.debug("Started VRRP on amphora %s.", LOG.debug("Started VRRP on amphora %s.",
amphorae[amphora_index][constants.ID]) amphorae[amphora_index][constants.ID])

View File

@@ -712,8 +712,13 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
vip_subnet, vip_port) vip_subnet, vip_port)
else: else:
for amp in loadbalancer.amphorae: for amp in loadbalancer.amphorae:
self._get_amp_net_configs(amp, amp_configs, try:
vip_subnet, vip_port) self._get_amp_net_configs(amp, amp_configs,
vip_subnet, vip_port)
except Exception as e:
LOG.warning('Getting network configurations for amphora '
'%(amp)s failed due to %(err)s.',
{'amp': amp.id, 'err': str(e)})
return amp_configs return amp_configs
# TODO(johnsom) This may be dead code now. Remove in failover for v2 patch # TODO(johnsom) This may be dead code now. Remove in failover for v2 patch

View File

@@ -200,6 +200,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_lb = mock.MagicMock() mock_lb = mock.MagicMock()
mock_listener = mock.MagicMock() mock_listener = mock.MagicMock()
mock_listener.id = '12345' mock_listener.id = '12345'
mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')]
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
@@ -213,10 +214,15 @@ class TestAmphoraDriverTasks(base.TestCase):
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
# Test revert
mock_lb.listeners = [mock_listener] # Test with reload exception
listeners_reload_obj.revert(mock_lb) mock_driver.reload.reset_mock()
mock_prov_status_error.assert_called_once_with('12345') listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, amphora_mock.id, status=constants.ERROR)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error') 'mark_listener_prov_status_error')
@@ -717,11 +723,23 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update): mock_amphora_repo_update):
amphora_vrrp_start_obj = ( amphora_vrrp_start_obj = (
amphora_driver_tasks.AmphoraIndexVRRPStart()) amphora_driver_tasks.AmphoraIndexVRRPStart())
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict) _amphora_mock, self.timeout_dict)
# Test with a start exception
mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR)
def test_amphora_compute_connectivity_wait(self, def test_amphora_compute_connectivity_wait(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,

View File

@@ -199,6 +199,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener.id = '12345' mock_listener.id = '12345'
mock_amphora_repo_get.return_value = amphora_mock mock_amphora_repo_get.return_value = amphora_mock
mock_lb_repo_get.return_value = mock_lb mock_lb_repo_get.return_value = mock_lb
mock_driver.reload.side_effect = [mock.DEFAULT, Exception('boom')]
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
@@ -212,10 +213,15 @@ class TestAmphoraDriverTasks(base.TestCase):
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
# Test revert
mock_lb.listeners = [mock_listener] # Test with reload exception
listeners_reload_obj.revert(mock_lb) mock_driver.reload.reset_mock()
mock_prov_status_error.assert_called_once_with('12345') listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, amphora_mock.id, status=constants.ERROR)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error') 'mark_listener_prov_status_error')
@@ -772,11 +778,23 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_get.return_value = _db_amphora_mock mock_amphora_repo_get.return_value = _db_amphora_mock
amphora_vrrp_start_obj = ( amphora_vrrp_start_obj = (
amphora_driver_tasks.AmphoraIndexVRRPStart()) amphora_driver_tasks.AmphoraIndexVRRPStart())
mock_driver.start_vrrp_service.side_effect = [mock.DEFAULT,
Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphora_vrrp_start_obj.execute(0, [_amphora_mock],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict) _db_amphora_mock, self.timeout_dict)
# Test with a start exception
mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock],
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR)
def test_amphora_compute_connectivity_wait(self, def test_amphora_compute_connectivity_wait(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,

View File

@@ -1243,12 +1243,19 @@ class TestAllowedAddressPairsDriver(base.TestCase):
def test_get_network_configs(self): def test_get_network_configs(self):
amphora_mock = mock.MagicMock() amphora_mock = mock.MagicMock()
amphora2_mock = mock.MagicMock()
load_balancer_mock = mock.MagicMock() load_balancer_mock = mock.MagicMock()
vip_mock = mock.MagicMock() vip_mock = mock.MagicMock()
amphora_mock.status = constants.DELETED amphora_mock.status = constants.DELETED
load_balancer_mock.amphorae = [amphora_mock] load_balancer_mock.amphorae = [amphora_mock]
show_port = self.driver.neutron_client.show_port show_port = self.driver.neutron_client.show_port
show_port.return_value = t_constants.MOCK_NEUTRON_PORT show_port.side_effect = [
t_constants.MOCK_NEUTRON_PORT, t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT, t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT, t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT, t_constants.MOCK_NEUTRON_PORT,
t_constants.MOCK_NEUTRON_PORT, t_constants.MOCK_NEUTRON_PORT,
Exception('boom')]
fake_subnet = {'subnet': { fake_subnet = {'subnet': {
'id': t_constants.MOCK_SUBNET_ID, 'id': t_constants.MOCK_SUBNET_ID,
'gateway_ip': t_constants.MOCK_IP_ADDRESS, 'gateway_ip': t_constants.MOCK_IP_ADDRESS,
@@ -1265,7 +1272,12 @@ class TestAllowedAddressPairsDriver(base.TestCase):
amphora_mock.vrrp_ip = "10.0.0.1" amphora_mock.vrrp_ip = "10.0.0.1"
amphora_mock.ha_port_id = 3 amphora_mock.ha_port_id = 3
amphora_mock.ha_ip = "10.0.0.2" amphora_mock.ha_ip = "10.0.0.2"
load_balancer_mock.amphorae = [amphora_mock] amphora2_mock.id = 333
amphora2_mock.status = constants.ACTIVE
amphora2_mock.vrrp_port_id = 3
amphora2_mock.vrrp_ip = "10.0.0.2"
amphora2_mock.ha_port_id = 4
amphora2_mock.ha_ip = "10.0.0.3"
configs = self.driver.get_network_configs(load_balancer_mock) configs = self.driver.get_network_configs(load_balancer_mock)
self.assertEqual(1, len(configs)) self.assertEqual(1, len(configs))
@@ -1282,6 +1294,29 @@ class TestAllowedAddressPairsDriver(base.TestCase):
self.assertEqual(expected_subnet_id, config.ha_subnet.id) self.assertEqual(expected_subnet_id, config.ha_subnet.id)
self.assertEqual(expected_subnet_id, config.vrrp_subnet.id) self.assertEqual(expected_subnet_id, config.vrrp_subnet.id)
# Test with a specific amphora
configs = self.driver.get_network_configs(load_balancer_mock,
amphora_mock)
self.assertEqual(1, len(configs))
config = configs[222]
# TODO(ptoohill): find a way to return different items for multiple
# calls to the same method, right now each call to show subnet
# will return the same values if a method happens to call it
# multiple times for different subnets. We should be able to verify
# different requests get different expected data.
expected_port_id = t_constants.MOCK_NEUTRON_PORT['port']['id']
self.assertEqual(expected_port_id, config.ha_port.id)
self.assertEqual(expected_port_id, config.vrrp_port.id)
expected_subnet_id = fake_subnet['subnet']['id']
self.assertEqual(expected_subnet_id, config.ha_subnet.id)
self.assertEqual(expected_subnet_id, config.vrrp_subnet.id)
# Test with a load balancer with two amphora, one that has a
# neutron problem.
load_balancer_mock.amphorae = [amphora_mock, amphora2_mock]
configs = self.driver.get_network_configs(load_balancer_mock)
self.assertEqual(1, len(configs))
@mock.patch('time.sleep') @mock.patch('time.sleep')
def test_wait_for_port_detach(self, mock_sleep): def test_wait_for_port_detach(self, mock_sleep):
amphora = data_models.Amphora( amphora = data_models.Amphora(

View File

@@ -0,0 +1,5 @@
---
fixes:
- |
Fixed an issue with failing over an amphora if the pair amphora in an
active/standby pair had a missing VRRP port in neutron.