Merge "Fix amphorae in ERROR during the failover" into stable/xena

This commit is contained in:
Zuul 2023-10-16 09:22:02 +00:00 committed by Gerrit Code Review
commit 8a2945b74c
11 changed files with 221 additions and 48 deletions

View File

@ -279,6 +279,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -288,12 +289,14 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START, name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -302,6 +305,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -311,11 +315,13 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START, name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -583,6 +589,7 @@ class AmphoraFlows(object):
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -611,6 +618,7 @@ class AmphoraFlows(object):
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -645,6 +645,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add( update_amps_subflow.add(
@ -653,6 +654,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -677,6 +679,7 @@ class LoadBalancerFlows(object):
name=(new_amp_role + '-' + name=(new_amp_role + '-' +
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -75,7 +75,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora.""" """Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None): amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
@ -97,6 +97,9 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
LOG.error('Failed to update listeners on amphora %s. Skipping ' LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s', 'this amphora as it is failing to update due to: %s',
amphora_id, str(e)) amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
@ -138,7 +141,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora.""" """Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None): amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora.""" """Execute listener reload routines for listeners on an amphora."""
if amphorae is None: if amphorae is None:
return return
@ -158,6 +161,9 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
LOG.warning('Failed to reload listeners on amphora %s. ' LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to ' 'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e)) 'reload due to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during
# the failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
@ -324,7 +330,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora.""" """Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, amphorae_status: dict, def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index].id amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {}) amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE): if amphora_status.get(constants.UNREACHABLE):
@ -341,6 +347,9 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
LOG.error('Failed to get amphora VRRP interface on amphora ' LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: ' '%s. Skipping this amphora as it is failing due to: '
'%s', amphora_id, str(e)) '%s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return None return None
@ -380,7 +389,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
"""Execute update_vrrp_conf.""" """Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
@ -403,6 +412,9 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
LOG.error('Failed to update VRRP configuration amphora %s. ' LOG.error('Failed to update VRRP configuration amphora %s. '
'Skipping this amphora as it is failing to update due ' 'Skipping this amphora as it is failing to update due '
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return return
@ -427,7 +439,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
""" """
def execute(self, amphora_index, amphorae, amphorae_status: dict, def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index].id amphora_id = amphorae[amphora_index].id
amphora_status = amphorae_status.get(amphora_id, {}) amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE): if amphora_status.get(constants.UNREACHABLE):
@ -442,6 +454,9 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
LOG.error('Failed to start VRRP on amphora %s. ' LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due ' 'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return return

View File

@ -265,6 +265,7 @@ class AmphoraFlows(object):
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-0-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -274,12 +275,14 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_0_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-0-' + constants.AMP_VRRP_START, name=sf_name + '-0-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -288,6 +291,7 @@ class AmphoraFlows(object):
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface(
name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF, name=sf_name + '-1-' + constants.AMP_UPDATE_VRRP_INTF,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict}, constants.TIMEOUT_DICT: timeout_dict},
provides=constants.AMP_VRRP_INT)) provides=constants.AMP_VRRP_INT))
@ -297,11 +301,13 @@ class AmphoraFlows(object):
requires=(constants.LOADBALANCER_ID, requires=(constants.LOADBALANCER_ID,
constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE, constants.AMPHORAE_NETWORK_CONFIG, constants.AMPHORAE,
constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT), constants.AMPHORAE_STATUS, constants.AMP_VRRP_INT),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart( amp_1_subflow.add(amphora_driver_tasks.AmphoraIndexVRRPStart(
name=sf_name + '-1-' + constants.AMP_VRRP_START, name=sf_name + '-1-' + constants.AMP_VRRP_START,
requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS), requires=(constants.AMPHORAE, constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -569,6 +575,7 @@ class AmphoraFlows(object):
name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE, name=str(amp_index) + '-' + constants.AMP_LISTENER_UPDATE,
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -597,6 +604,7 @@ class AmphoraFlows(object):
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: amp_index, inject={constants.AMPHORA_INDEX: amp_index,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -635,6 +635,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 0, inject={constants.AMPHORA_INDEX: 0,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
update_amps_subflow.add( update_amps_subflow.add(
@ -643,6 +644,7 @@ class LoadBalancerFlows(object):
constants.AMP_LISTENER_UPDATE), constants.AMP_LISTENER_UPDATE),
requires=(constants.LOADBALANCER, constants.AMPHORAE, requires=(constants.LOADBALANCER, constants.AMPHORAE,
constants.AMPHORAE_STATUS), constants.AMPHORAE_STATUS),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))
@ -667,6 +669,7 @@ class LoadBalancerFlows(object):
name=(new_amp_role + '-' + name=(new_amp_role + '-' +
constants.AMPHORA_RELOAD_LISTENER), constants.AMPHORA_RELOAD_LISTENER),
requires=(constants.LOADBALANCER, constants.AMPHORAE), requires=(constants.LOADBALANCER, constants.AMPHORAE),
rebind={constants.NEW_AMPHORA_ID: constants.AMPHORA_ID},
inject={constants.AMPHORA_INDEX: 1, inject={constants.AMPHORA_INDEX: 1,
constants.TIMEOUT_DICT: timeout_dict})) constants.TIMEOUT_DICT: timeout_dict}))

View File

@ -103,7 +103,7 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora.""" """Task to update the listeners on one amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=()): amphorae_status: dict, new_amphora_id: str, timeout_dict=()):
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
# health manager fix it. # health manager fix it.
@ -130,6 +130,9 @@ class AmphoraIndexListenerUpdate(BaseAmphoraTask):
LOG.error('Failed to update listeners on amphora %s. Skipping ' LOG.error('Failed to update listeners on amphora %s. Skipping '
'this amphora as it is failing to update due to: %s', 'this amphora as it is failing to update due to: %s',
amphora_id, str(e)) amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
@ -188,7 +191,7 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
"""Task to reload all listeners on an amphora.""" """Task to reload all listeners on an amphora."""
def execute(self, loadbalancer, amphora_index, amphorae, def execute(self, loadbalancer, amphora_index, amphorae,
amphorae_status: dict, timeout_dict=None): amphorae_status: dict, new_amphora_id: str, timeout_dict=None):
"""Execute listener reload routines for listeners on an amphora.""" """Execute listener reload routines for listeners on an amphora."""
if amphorae is None: if amphorae is None:
return return
@ -214,6 +217,9 @@ class AmphoraIndexListenersReload(BaseAmphoraTask):
LOG.warning('Failed to reload listeners on amphora %s. ' LOG.warning('Failed to reload listeners on amphora %s. '
'Skipping this amphora as it is failing to ' 'Skipping this amphora as it is failing to '
'reload due to: %s', amphora_id, str(e)) 'reload due to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during
# the failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
@ -440,7 +446,7 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
"""Task to get and update the VRRP interface device name from amphora.""" """Task to get and update the VRRP interface device name from amphora."""
def execute(self, amphora_index, amphorae, amphorae_status: dict, def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
amphora_id = amphorae[amphora_index][constants.ID] amphora_id = amphorae[amphora_index][constants.ID]
amphora_status = amphorae_status.get(amphora_id, {}) amphora_status = amphorae_status.get(amphora_id, {})
if amphora_status.get(constants.UNREACHABLE): if amphora_status.get(constants.UNREACHABLE):
@ -460,6 +466,9 @@ class AmphoraIndexUpdateVRRPInterface(BaseAmphoraTask):
LOG.error('Failed to get amphora VRRP interface on amphora ' LOG.error('Failed to get amphora VRRP interface on amphora '
'%s. Skipping this amphora as it is failing due to: ' '%s. Skipping this amphora as it is failing due to: '
'%s', amphora_id, str(e)) '%s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return None return None
@ -504,7 +513,7 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index, def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str], amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
"""Execute update_vrrp_conf.""" """Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used # Note, we don't want this to cause a revert as it may be used
# in a failover flow with both amps failing. Skip it and let # in a failover flow with both amps failing. Skip it and let
@ -530,6 +539,9 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
LOG.error('Failed to update VRRP configuration amphora %s. ' LOG.error('Failed to update VRRP configuration amphora %s. '
'Skipping this amphora as it is failing to update due ' 'Skipping this amphora as it is failing to update due '
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return return
@ -558,7 +570,7 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
""" """
def execute(self, amphora_index, amphorae, amphorae_status: dict, def execute(self, amphora_index, amphorae, amphorae_status: dict,
timeout_dict=None): new_amphora_id: str, timeout_dict=None):
# TODO(johnsom) Optimize this to use the dicts and not need the # TODO(johnsom) Optimize this to use the dicts and not need the
# DB lookups # DB lookups
amphora_id = amphorae[amphora_index][constants.ID] amphora_id = amphorae[amphora_index][constants.ID]
@ -575,6 +587,9 @@ class AmphoraIndexVRRPStart(BaseAmphoraTask):
LOG.error('Failed to start VRRP on amphora %s. ' LOG.error('Failed to start VRRP on amphora %s. '
'Skipping this amphora as it is failing to start due ' 'Skipping this amphora as it is failing to start due '
'to: %s', amphora_id, str(e)) 'to: %s', amphora_id, str(e))
# Update only the status of the newly created amphora during the
# failover
if amphora_id == new_amphora_id:
self.amphora_repo.update(db_apis.get_session(), amphora_id, self.amphora_repo.update(db_apis.get_session(), amphora_id,
status=constants.ERROR) status=constants.ERROR)
return return

View File

@ -346,10 +346,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires)) self.assertEqual(4, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group( def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver): self, mock_get_net_driver):

View File

@ -134,6 +134,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_load_balancer_mock, 0, amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], amphorae_status, [_amphora_mock], amphorae_status,
_amphora_mock.id,
self.timeout_dict) self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with( mock_driver.update_amphora_listeners.assert_called_once_with(
@ -147,7 +148,9 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict) amphorae_status,
_amphora_mock.id,
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called() mock_driver.update_amphora_listeners.assert_not_called()
# Test exception # Test exception
@ -155,11 +158,23 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj.execute(_load_balancer_mock, 0, amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], {}, [_amphora_mock], {},
_amphora_mock.id,
self.timeout_dict) self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR) _session_mock, AMP_ID, status=constants.ERROR)
# Test exception, secondary amp
mock_amphora_repo_update.reset_mock()
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_load_balancer_mock, 0,
[_amphora_mock], {},
'1234',
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_listener_update(self, def test_listener_update(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,
@ -237,7 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None, {}) listeners_reload_obj.execute(mock_lb, 0, None, {}, amphora_mock.id)
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with listeners # Test with listeners
@ -250,6 +265,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_lb.listeners = [mock_listener] mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [amphora_mock],
amphorae_status, amphorae_status,
amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
@ -263,18 +279,30 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status, amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with reload exception # Test with reload exception
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {}, listeners_reload_obj.execute(mock_lb, 0, [amphora_mock], {},
amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, amphora_mock,
self.timeout_dict) self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, amphora_mock.id, status=constants.ERROR) _session_mock, amphora_mock.id, status=constants.ERROR)
# Test with reload exception, secondary amp
mock_driver.reload.reset_mock()
mock_amphora_repo_update.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
'1234',
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error') 'mark_listener_prov_status_error')
def test_listeners_start(self, def test_listeners_start(self,
@ -654,7 +682,8 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = ( amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict) 0, [_amphora_mock], amphorae_status, _amphora_mock.id,
timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with( mock_driver.get_interface_from_ip.assert_called_once_with(
_amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict) _amphora_mock, _amphora_mock.vrrp_ip, timeout_dict=timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
@ -668,16 +697,23 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict) 0, [_amphora_mock], amphorae_status, _amphora_mock.id,
timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called() mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, timeout_dict) 0, [_amphora_mock], {}, _amphora_mock.id, timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR) _session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, '1234', timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get') @mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_amphora_vrrp_update(self, def test_amphora_vrrp_update(self,
mock_lb_get, mock_lb_get,
@ -732,6 +768,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], amphorae_status, 0, [_amphora_mock], amphorae_status,
'fakeint0', 'fakeint0',
_amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with( mock_driver.update_vrrp_conf.assert_called_once_with(
_LB_mock, amphorae_network_config, _amphora_mock, _LB_mock, amphorae_network_config, _amphora_mock,
@ -747,17 +784,24 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update_vrrp_conf.reset_mock() mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status, 0, [_amphora_mock], amphorae_status,
None) None, _amphora_mock.id)
mock_driver.update_vrrp_conf.assert_not_called() mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config, amphora_vrrp_update_obj.execute(_LB_mock.id, amphorae_network_config,
0, [_amphora_mock], {}, 0, [_amphora_mock], {}, 'fakeint0',
'fakeint0') _amphora_mock.id)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR) _session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0',
'1234')
mock_amphora_repo_update.assert_not_called()
def test_amphora_vrrp_start(self, def test_amphora_vrrp_start(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,
@ -793,6 +837,7 @@ class TestAmphoraDriverTasks(base.TestCase):
Exception('boom')] Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict) _amphora_mock, self.timeout_dict)
@ -805,18 +850,29 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called() mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception # Test with a start exception
mock_driver.start_vrrp_service.reset_mock() mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
_amphora_mock.id,
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict) _amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _amphora_mock.id, status=constants.ERROR) _session_mock, _amphora_mock.id, status=constants.ERROR)
# Test with a start exception, secondary amp
mock_driver.start_vrrp_service.reset_mock()
mock_amphora_repo_update.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234',
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_amphora_compute_connectivity_wait(self, def test_amphora_compute_connectivity_wait(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,

View File

@ -390,10 +390,11 @@ class TestAmphoraFlows(base.TestCase):
self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires) self.assertIn(constants.LOADBALANCER_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE, vrrp_subflow.requires)
self.assertIn(constants.AMPHORA_ID, vrrp_subflow.requires)
self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires) self.assertIn(constants.AMPHORAE_STATUS, vrrp_subflow.requires)
self.assertEqual(2, len(vrrp_subflow.provides)) self.assertEqual(2, len(vrrp_subflow.provides))
self.assertEqual(3, len(vrrp_subflow.requires)) self.assertEqual(4, len(vrrp_subflow.requires))
def test_get_vrrp_subflow_dont_create_vrrp_group( def test_get_vrrp_subflow_dont_create_vrrp_group(
self, mock_get_net_driver): self, mock_get_net_driver):

View File

@ -140,7 +140,9 @@ class TestAmphoraDriverTasks(base.TestCase):
amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate() amp_list_update_obj = amphora_driver_tasks.AmphoraIndexListenerUpdate()
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict) amphorae_status,
_amphora_mock[constants.ID],
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_called_once_with( mock_driver.update_amphora_listeners.assert_called_once_with(
_db_load_balancer_mock, _db_amphora_mock, self.timeout_dict) _db_load_balancer_mock, _db_amphora_mock, self.timeout_dict)
@ -153,18 +155,31 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock],
amphorae_status, self.timeout_dict) amphorae_status,
_amphora_mock[constants.ID],
self.timeout_dict)
mock_driver.update_amphora_listeners.assert_not_called() mock_driver.update_amphora_listeners.assert_not_called()
# Test exception # Test exception
mock_driver.update_amphora_listeners.side_effect = Exception('boom') mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {}, amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
_amphora_mock[constants.ID],
self.timeout_dict) self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR) _session_mock, AMP_ID, status=constants.ERROR)
# Test exception, secondary amp
mock_amphora_repo_update.reset_mock()
mock_driver.update_amphora_listeners.side_effect = Exception('boom')
amp_list_update_obj.execute(_LB_mock, 0, [_amphora_mock], {},
'1234',
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get') @mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listeners_update(self, def test_listeners_update(self,
mock_lb_get, mock_lb_get,
@ -223,7 +238,8 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test no listeners # Test no listeners
mock_lb.listeners = None mock_lb.listeners = None
listeners_reload_obj.execute(mock_lb, 0, None, {}) listeners_reload_obj.execute(mock_lb, 0, None, {},
_amphora_mock[constants.ID])
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with listeners # Test with listeners
@ -236,6 +252,7 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_lb.listeners = [mock_listener] mock_lb.listeners = [mock_listener]
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status, amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict) self.timeout_dict)
@ -249,12 +266,14 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock],
amphorae_status, amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_not_called() mock_driver.reload.assert_not_called()
# Test with reload exception # Test with reload exception
mock_driver.reload.reset_mock() mock_driver.reload.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {}, listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock, mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict) self.timeout_dict)
@ -262,6 +281,16 @@ class TestAmphoraDriverTasks(base.TestCase):
_session_mock, _amphora_mock[constants.ID], _session_mock, _amphora_mock[constants.ID],
status=constants.ERROR) status=constants.ERROR)
# Test with reload exception, secondary amp
mock_driver.reload.reset_mock()
mock_amphora_repo_update.reset_mock()
listeners_reload_obj.execute(mock_lb, 0, [_amphora_mock], {},
'1234',
timeout_dict=self.timeout_dict)
mock_driver.reload.assert_called_once_with(mock_lb, _amphora_mock,
self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.' @mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error') 'mark_listener_prov_status_error')
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get') @mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
@ -775,7 +804,8 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_update_vrrp_interface_obj = ( amphora_update_vrrp_interface_obj = (
amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface()) amphora_driver_tasks.AmphoraIndexUpdateVRRPInterface())
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict) 0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID],
timeout_dict)
mock_driver.get_interface_from_ip.assert_called_once_with( mock_driver.get_interface_from_ip.assert_called_once_with(
_db_amphora_mock, _db_amphora_mock.vrrp_ip, _db_amphora_mock, _db_amphora_mock.vrrp_ip,
timeout_dict=timeout_dict) timeout_dict=timeout_dict)
@ -790,16 +820,23 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], amphorae_status, timeout_dict) 0, [_amphora_mock], amphorae_status, _amphora_mock[constants.ID],
timeout_dict)
mock_driver.get_interface_from_ip.assert_not_called() mock_driver.get_interface_from_ip.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute( amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, timeout_dict) 0, [_amphora_mock], {}, _amphora_mock[constants.ID], timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR) _session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_update_vrrp_interface_obj.execute(
0, [_amphora_mock], {}, '1234', timeout_dict)
mock_amphora_repo_update.assert_not_called()
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get') @mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_amphora_vrrp_update(self, def test_amphora_vrrp_update(self,
mock_lb_get, mock_lb_get,
@ -859,6 +896,7 @@ class TestAmphoraDriverTasks(base.TestCase):
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status, 0, [_amphora_mock], amphorae_status,
'fakeint0', 'fakeint0',
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.update_vrrp_conf.assert_called_once_with( mock_driver.update_vrrp_conf.assert_called_once_with(
_db_load_balancer_mock, amphorae_network_config, _db_amphora_mock, _db_load_balancer_mock, amphorae_network_config, _db_amphora_mock,
@ -874,16 +912,24 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_driver.update_vrrp_conf.reset_mock() mock_driver.update_vrrp_conf.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], amphorae_status, 0, [_amphora_mock], amphorae_status,
None) None, _amphora_mock[constants.ID])
mock_driver.update_vrrp_conf.assert_not_called() mock_driver.update_vrrp_conf.assert_not_called()
# Test with an exception # Test with an exception
mock_amphora_repo_update.reset_mock() mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config, amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0') 0, [_amphora_mock], {}, 'fakeint0',
_amphora_mock[constants.ID])
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR) _session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with an exception, secondary amp
mock_amphora_repo_update.reset_mock()
amphora_vrrp_update_obj.execute(LB_ID, amphorae_network_config,
0, [_amphora_mock], {}, 'fakeint0',
'1234')
mock_amphora_repo_update.assert_not_called()
def test_amphora_vrrp_start(self, def test_amphora_vrrp_start(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,
@ -923,6 +969,7 @@ class TestAmphoraDriverTasks(base.TestCase):
Exception('boom')] Exception('boom')]
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict) _db_amphora_mock, self.timeout_dict)
@ -935,18 +982,29 @@ class TestAmphoraDriverTasks(base.TestCase):
} }
} }
amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status, amphora_vrrp_start_obj.execute(0, [_amphora_mock], amphorae_status,
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_not_called() mock_driver.start_vrrp_service.assert_not_called()
# Test with a start exception # Test with a start exception
mock_driver.start_vrrp_service.reset_mock() mock_driver.start_vrrp_service.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, amphora_vrrp_start_obj.execute(0, [_amphora_mock], {},
_amphora_mock[constants.ID],
timeout_dict=self.timeout_dict) timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with( mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict) _db_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_called_once_with( mock_amphora_repo_update.assert_called_once_with(
_session_mock, _db_amphora_mock.id, status=constants.ERROR) _session_mock, _db_amphora_mock.id, status=constants.ERROR)
# Test with a start exception, secondary amp
mock_driver.start_vrrp_service.reset_mock()
mock_amphora_repo_update.reset_mock()
amphora_vrrp_start_obj.execute(0, [_amphora_mock], {}, '1234',
timeout_dict=self.timeout_dict)
mock_driver.start_vrrp_service.assert_called_once_with(
_db_amphora_mock, self.timeout_dict)
mock_amphora_repo_update.assert_not_called()
def test_amphora_compute_connectivity_wait(self, def test_amphora_compute_connectivity_wait(self,
mock_driver, mock_driver,
mock_generate_uuid, mock_generate_uuid,

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Reduce the duration of the failovers of ACTIVE_STANDBY load balancers when
both amphorae are unreachable.