Convert health_monitor flows to use provider models
Change-Id: Ifab50d30dbaf197dd2ba3b5e11b94732eb2e0aaf Story: 2005072 Task: 30811
This commit is contained in:
parent
5f1adf4e2b
commit
657068a278
octavia
api/drivers/amphora_driver/v2
common
controller
queue/v2
worker/v2
tests/unit
api/drivers/amphora_driver/v2
controller
@ -277,12 +277,11 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
|
||||
# Health Monitor
|
||||
def health_monitor_create(self, healthmonitor):
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmonitor.healthmonitor_id}
|
||||
payload = {consts.HEALTH_MONITOR: healthmonitor.to_dict()}
|
||||
self.client.cast({}, 'create_health_monitor', **payload)
|
||||
|
||||
def health_monitor_delete(self, healthmonitor):
|
||||
healthmonitor_id = healthmonitor.healthmonitor_id
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmonitor_id}
|
||||
payload = {consts.HEALTH_MONITOR: healthmonitor.to_dict()}
|
||||
self.client.cast({}, 'delete_health_monitor', **payload)
|
||||
|
||||
def health_monitor_update(self, old_healthmonitor, new_healthmonitor):
|
||||
@ -295,9 +294,9 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
|
||||
if 'max_retries' in healthmon_dict:
|
||||
healthmon_dict['rise_threshold'] = healthmon_dict.pop(
|
||||
'max_retries')
|
||||
healthmon_id = healthmon_dict.pop('healthmonitor_id')
|
||||
healthmon_dict.pop('healthmonitor_id')
|
||||
|
||||
payload = {consts.HEALTH_MONITOR_ID: healthmon_id,
|
||||
payload = {consts.ORIGINAL_HEALTH_MONITOR: old_healthmonitor.to_dict(),
|
||||
consts.HEALTH_MONITOR_UPDATES: healthmon_dict}
|
||||
self.client.cast({}, 'update_health_monitor', **payload)
|
||||
|
||||
|
@ -335,6 +335,7 @@ HA_PORT_ID = 'ha_port_id'
|
||||
HEALTH_MON = 'health_mon'
|
||||
HEALTH_MONITOR = 'health_monitor'
|
||||
HEALTH_MONITOR_ID = 'health_monitor_id'
|
||||
HEALTHMONITOR_ID = 'healthmonitor_id'
|
||||
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
|
||||
ID = 'id'
|
||||
IMAGE_ID = 'image_id'
|
||||
@ -363,6 +364,7 @@ NETWORK = 'network'
|
||||
NETWORK_ID = 'network_id'
|
||||
NICS = 'nics'
|
||||
OBJECT = 'object'
|
||||
ORIGINAL_HEALTH_MONITOR = 'original_health_monitor'
|
||||
ORIGINAL_LISTENER = 'original_listener'
|
||||
ORIGINAL_LOADBALANCER = 'original_load_balancer'
|
||||
ORIGINAL_MEMBER = 'original_member'
|
||||
|
@ -93,19 +93,22 @@ class Endpoints(object):
|
||||
LOG.info('Deleting pool \'%s\'...', pool.get(constants.POOL_ID))
|
||||
self.worker.delete_pool(pool)
|
||||
|
||||
def create_health_monitor(self, context, health_monitor_id):
|
||||
LOG.info('Creating health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.create_health_monitor(health_monitor_id)
|
||||
def create_health_monitor(self, context, health_monitor):
|
||||
LOG.info('Creating health monitor \'%s\'...', health_monitor.get(
|
||||
constants.ID))
|
||||
self.worker.create_health_monitor(health_monitor)
|
||||
|
||||
def update_health_monitor(self, context, health_monitor_id,
|
||||
def update_health_monitor(self, context, original_health_monitor,
|
||||
health_monitor_updates):
|
||||
LOG.info('Updating health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.update_health_monitor(health_monitor_id,
|
||||
LOG.info('Updating health monitor \'%s\'...',
|
||||
original_health_monitor.get(constants.ID))
|
||||
self.worker.update_health_monitor(original_health_monitor,
|
||||
health_monitor_updates)
|
||||
|
||||
def delete_health_monitor(self, context, health_monitor_id):
|
||||
LOG.info('Deleting health monitor \'%s\'...', health_monitor_id)
|
||||
self.worker.delete_health_monitor(health_monitor_id)
|
||||
def delete_health_monitor(self, context, health_monitor):
|
||||
LOG.info('Deleting health monitor \'%s\'...', health_monitor.get(
|
||||
constants.ID))
|
||||
self.worker.delete_health_monitor(health_monitor)
|
||||
|
||||
def create_member(self, context, member):
|
||||
LOG.info('Creating member \'%s\'...', member.get(constants.MEMBER_ID))
|
||||
|
@ -140,27 +140,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
log=LOG):
|
||||
delete_amp_tf.run()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def create_health_monitor(self, health_monitor_id):
|
||||
def create_health_monitor(self, health_monitor):
|
||||
"""Creates a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to create a health monitor on
|
||||
:param health_monitor: Provider health monitor dict
|
||||
:returns: None
|
||||
:raises NoResultFound: Unable to find the object
|
||||
"""
|
||||
health_mon = self._health_mon_repo.get(db_apis.get_session(),
|
||||
id=health_monitor_id)
|
||||
if not health_mon:
|
||||
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
|
||||
'60 seconds.', 'health_monitor', health_monitor_id)
|
||||
raise db_exceptions.NoResultFound
|
||||
db_health_monitor = self._health_mon_repo.get(
|
||||
db_apis.get_session(),
|
||||
id=health_monitor[constants.HEALTHMONITOR_ID])
|
||||
|
||||
pool = health_mon.pool
|
||||
pool.health_monitor = health_mon
|
||||
pool = db_health_monitor.pool
|
||||
pool.health_monitor = db_health_monitor
|
||||
load_balancer = pool.load_balancer
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
load_balancer).to_dict()
|
||||
@ -171,7 +163,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
|
||||
create_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_create_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
store={constants.HEALTH_MON: health_monitor,
|
||||
constants.POOL_ID: pool.id,
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
@ -180,17 +172,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
log=LOG):
|
||||
create_hm_tf.run()
|
||||
|
||||
def delete_health_monitor(self, health_monitor_id):
|
||||
def delete_health_monitor(self, health_monitor):
|
||||
"""Deletes a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to delete its health monitor
|
||||
:param health_monitor: Provider health monitor dict
|
||||
:returns: None
|
||||
:raises HMNotFound: The referenced health monitor was not found
|
||||
"""
|
||||
health_mon = self._health_mon_repo.get(db_apis.get_session(),
|
||||
id=health_monitor_id)
|
||||
db_health_monitor = self._health_mon_repo.get(
|
||||
db_apis.get_session(),
|
||||
id=health_monitor[constants.HEALTHMONITOR_ID])
|
||||
|
||||
pool = health_mon.pool
|
||||
pool = db_health_monitor.pool
|
||||
load_balancer = pool.load_balancer
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
load_balancer).to_dict()
|
||||
@ -201,49 +194,50 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
|
||||
delete_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_delete_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
store={constants.HEALTH_MON: health_monitor,
|
||||
constants.POOL_ID: pool.id,
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.LOADBALANCER: provider_lb})
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.PROJECT_ID: load_balancer.project_id})
|
||||
with tf_logging.DynamicLoggingListener(delete_hm_tf,
|
||||
log=LOG):
|
||||
delete_hm_tf.run()
|
||||
|
||||
def update_health_monitor(self, health_monitor_id, health_monitor_updates):
|
||||
def update_health_monitor(self, original_health_monitor,
|
||||
health_monitor_updates):
|
||||
"""Updates a health monitor.
|
||||
|
||||
:param pool_id: ID of the pool to have it's health monitor updated
|
||||
:param original_health_monitor: Provider health monitor dict
|
||||
:param health_monitor_updates: Dict containing updated health monitor
|
||||
:returns: None
|
||||
:raises HMNotFound: The referenced health monitor was not found
|
||||
"""
|
||||
health_mon = None
|
||||
try:
|
||||
health_mon = self._get_db_obj_until_pending_update(
|
||||
self._health_mon_repo, health_monitor_id)
|
||||
db_health_monitor = self._get_db_obj_until_pending_update(
|
||||
self._health_mon_repo,
|
||||
original_health_monitor[constants.HEALTHMONITOR_ID])
|
||||
except tenacity.RetryError as e:
|
||||
LOG.warning('Health monitor did not go into %s in 60 seconds. '
|
||||
'This either due to an in-progress Octavia upgrade '
|
||||
'or an overloaded and failing database. Assuming '
|
||||
'an upgrade is in progress and continuing.',
|
||||
constants.PENDING_UPDATE)
|
||||
health_mon = e.last_attempt.result()
|
||||
db_health_monitor = e.last_attempt.result()
|
||||
|
||||
pool = health_mon.pool
|
||||
pool = db_health_monitor.pool
|
||||
|
||||
listeners_dicts = (
|
||||
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
|
||||
pool.listeners))
|
||||
|
||||
pool.health_monitor = health_mon
|
||||
load_balancer = pool.load_balancer
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
load_balancer).to_dict()
|
||||
|
||||
update_hm_tf = self._taskflow_load(
|
||||
self._health_monitor_flows.get_update_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
store={constants.HEALTH_MON: original_health_monitor,
|
||||
constants.POOL_ID: pool.id,
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
|
@ -63,7 +63,7 @@ class HealthMonitorFlows(object):
|
||||
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
|
||||
requires=constants.HEALTH_MON))
|
||||
delete_hm_flow.add(database_tasks.DecrementHealthMonitorQuota(
|
||||
requires=constants.HEALTH_MON))
|
||||
requires=constants.PROJECT_ID))
|
||||
delete_hm_flow.add(
|
||||
database_tasks.UpdatePoolMembersOperatingStatusInDB(
|
||||
requires=constants.POOL_ID,
|
||||
|
@ -165,10 +165,12 @@ class DeleteHealthMonitorInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("DB delete health monitor: %s ", health_mon.id)
|
||||
LOG.debug("DB delete health monitor: %s ",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
try:
|
||||
self.health_mon_repo.delete(db_apis.get_session(),
|
||||
id=health_mon.id)
|
||||
self.health_mon_repo.delete(
|
||||
db_apis.get_session(),
|
||||
id=health_mon[constants.HEALTHMONITOR_ID])
|
||||
except exc.NoResultFound:
|
||||
# ignore if the HealthMonitor was not found
|
||||
pass
|
||||
@ -181,8 +183,10 @@ class DeleteHealthMonitorInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark health monitor delete in DB "
|
||||
"for health monitor with id %s", health_mon.id)
|
||||
self.health_mon_repo.update(db_apis.get_session(), id=health_mon.id,
|
||||
"for health monitor with id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
id=health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=constants.ERROR)
|
||||
|
||||
|
||||
@ -192,23 +196,31 @@ class DeleteHealthMonitorInDBByPool(DeleteHealthMonitorInDB):
|
||||
Since sqlalchemy will likely retry by itself always revert if it fails
|
||||
"""
|
||||
|
||||
def execute(self, pool):
|
||||
def execute(self, pool_id):
|
||||
"""Delete the health monitor in the DB.
|
||||
|
||||
:param pool: A pool which health monitor should be deleted.
|
||||
:param pool_id: ID of pool which health monitor should be deleted.
|
||||
:returns: None
|
||||
"""
|
||||
db_pool = self.pool_repo.get(db_apis.get_session(),
|
||||
id=pool_id)
|
||||
provider_hm = provider_utils.db_HM_to_provider_HM(
|
||||
db_pool.health_monitor).to_dict()
|
||||
super(DeleteHealthMonitorInDBByPool, self).execute(
|
||||
pool.health_monitor)
|
||||
provider_hm)
|
||||
|
||||
def revert(self, pool, *args, **kwargs):
|
||||
def revert(self, pool_id, *args, **kwargs):
|
||||
"""Mark the health monitor ERROR since the mark active couldn't happen
|
||||
|
||||
:param pool: A pool which health monitor couldn't be deleted
|
||||
:param pool_id: ID of pool which health monitor couldn't be deleted
|
||||
:returns: None
|
||||
"""
|
||||
db_pool = self.pool_repo.get(db_apis.get_session(),
|
||||
id=pool_id)
|
||||
provider_hm = provider_utils.db_HM_to_provider_HM(
|
||||
db_pool.health_monitor).to_dict()
|
||||
super(DeleteHealthMonitorInDBByPool, self).revert(
|
||||
pool.health_monitor, *args, **kwargs)
|
||||
provider_hm, *args, **kwargs)
|
||||
|
||||
|
||||
class DeleteMemberInDB(BaseDatabaseTask):
|
||||
@ -1433,8 +1445,10 @@ class UpdateHealthMonInDB(BaseDatabaseTask):
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Update DB for health monitor id: %s ", health_mon.id)
|
||||
self.health_mon_repo.update(db_apis.get_session(), health_mon.id,
|
||||
LOG.debug("Update DB for health monitor id: %s ",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
**update_dict)
|
||||
|
||||
def revert(self, health_mon, *args, **kwargs):
|
||||
@ -1445,15 +1459,18 @@ class UpdateHealthMonInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting update health monitor in DB "
|
||||
"for health monitor id %s", health_mon.id)
|
||||
"for health monitor id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
try:
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon.id,
|
||||
provisioning_status=constants.ERROR)
|
||||
self.health_mon_repo.update(
|
||||
db_apis.get_session(),
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=constants.ERROR)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to update health monitor %(hm)s "
|
||||
"provisioning_status to ERROR due to: %(except)s",
|
||||
{'hm': health_mon.id, 'except': e})
|
||||
{'hm': health_mon[constants.HEALTHMONITOR_ID],
|
||||
'except': e})
|
||||
|
||||
|
||||
class UpdateListenerInDB(BaseDatabaseTask):
|
||||
@ -1823,12 +1840,11 @@ class MarkHealthMonitorActiveInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Mark ACTIVE in DB for health monitor id: %s",
|
||||
health_mon.id)
|
||||
|
||||
op_status = (constants.ONLINE if health_mon.enabled
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
op_status = (constants.ONLINE if health_mon['admin_state_up']
|
||||
else constants.OFFLINE)
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon.id,
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=constants.ACTIVE,
|
||||
operating_status=op_status)
|
||||
|
||||
@ -1840,8 +1856,10 @@ class MarkHealthMonitorActiveInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark health montor ACTIVE in DB "
|
||||
"for health monitor id %s", health_mon.id)
|
||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
||||
"for health monitor id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.task_utils.mark_health_mon_prov_status_error(
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
|
||||
|
||||
class MarkHealthMonitorPendingCreateInDB(BaseDatabaseTask):
|
||||
@ -1858,9 +1876,9 @@ class MarkHealthMonitorPendingCreateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING CREATE in DB for health monitor id: %s",
|
||||
health_mon.id)
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon.id,
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=(constants.
|
||||
PENDING_CREATE))
|
||||
|
||||
@ -1872,8 +1890,10 @@ class MarkHealthMonitorPendingCreateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark health monitor pending create in DB "
|
||||
"for health monitor id %s", health_mon.id)
|
||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
||||
"for health monitor id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.task_utils.mark_health_mon_prov_status_error(
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
|
||||
|
||||
class MarkHealthMonitorPendingDeleteInDB(BaseDatabaseTask):
|
||||
@ -1890,9 +1910,9 @@ class MarkHealthMonitorPendingDeleteInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING DELETE in DB for health monitor id: %s",
|
||||
health_mon.id)
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon.id,
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=(constants.
|
||||
PENDING_DELETE))
|
||||
|
||||
@ -1904,8 +1924,10 @@ class MarkHealthMonitorPendingDeleteInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark health monitor pending delete in DB "
|
||||
"for health monitor id %s", health_mon.id)
|
||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
||||
"for health monitor id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.task_utils.mark_health_mon_prov_status_error(
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
|
||||
|
||||
class MarkHealthMonitorPendingUpdateInDB(BaseDatabaseTask):
|
||||
@ -1922,9 +1944,9 @@ class MarkHealthMonitorPendingUpdateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.debug("Mark PENDING UPDATE in DB for health monitor id: %s",
|
||||
health_mon.id)
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.health_mon_repo.update(db_apis.get_session(),
|
||||
health_mon.id,
|
||||
health_mon[constants.HEALTHMONITOR_ID],
|
||||
provisioning_status=(constants.
|
||||
PENDING_UPDATE))
|
||||
|
||||
@ -1936,8 +1958,10 @@ class MarkHealthMonitorPendingUpdateInDB(BaseDatabaseTask):
|
||||
"""
|
||||
|
||||
LOG.warning("Reverting mark health monitor pending update in DB "
|
||||
"for health monitor id %s", health_mon.id)
|
||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.id)
|
||||
"for health monitor id %s",
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
self.task_utils.mark_health_mon_prov_status_error(
|
||||
health_mon[constants.HEALTHMONITOR_ID])
|
||||
|
||||
|
||||
class MarkL7PolicyActiveInDB(BaseDatabaseTask):
|
||||
@ -2452,39 +2476,39 @@ class DecrementHealthMonitorQuota(BaseDatabaseTask):
|
||||
Since sqlalchemy will likely retry by itself always revert if it fails
|
||||
"""
|
||||
|
||||
def execute(self, health_mon):
|
||||
def execute(self, project_id):
|
||||
"""Decrements the health monitor quota.
|
||||
|
||||
:param health_mon: The health monitor to decrement the quota on.
|
||||
:param project_id: The project_id to decrement the quota on.
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.debug("Decrementing health monitor quota for "
|
||||
"project: %s ", health_mon.project_id)
|
||||
"project: %s ", project_id)
|
||||
|
||||
lock_session = db_apis.get_session(autocommit=False)
|
||||
try:
|
||||
self.repos.decrement_quota(lock_session,
|
||||
data_models.HealthMonitor,
|
||||
health_mon.project_id)
|
||||
project_id)
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error('Failed to decrement health monitor quota for '
|
||||
'project: %(proj)s the project may have excess '
|
||||
'quota in use.', {'proj': health_mon.project_id})
|
||||
'quota in use.', {'proj': project_id})
|
||||
lock_session.rollback()
|
||||
|
||||
def revert(self, health_mon, result, *args, **kwargs):
|
||||
def revert(self, project_id, result, *args, **kwargs):
|
||||
"""Re-apply the quota
|
||||
|
||||
:param health_mon: The health monitor to decrement the quota on.
|
||||
:param project_id: The project_id to decrement the quota on.
|
||||
:returns: None
|
||||
"""
|
||||
|
||||
LOG.warning('Reverting decrement quota for health monitor on project'
|
||||
' %(proj)s Project quota counts may be incorrect.',
|
||||
{'proj': health_mon.project_id})
|
||||
{'proj': project_id})
|
||||
|
||||
# Increment the quota back if this task wasn't the failure
|
||||
if not isinstance(result, failure.Failure):
|
||||
@ -2496,7 +2520,7 @@ class DecrementHealthMonitorQuota(BaseDatabaseTask):
|
||||
self.repos.check_quota_met(session,
|
||||
lock_session,
|
||||
data_models.HealthMonitor,
|
||||
health_mon.project_id)
|
||||
project_id)
|
||||
lock_session.commit()
|
||||
except Exception:
|
||||
lock_session.rollback()
|
||||
|
@ -54,8 +54,10 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
|
||||
pass
|
||||
|
||||
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
|
||||
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
|
||||
self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
|
||||
self.task_utils.mark_health_mon_prov_status_error(
|
||||
health_mon[constants.POOL_ID])
|
||||
self.task_utils.mark_pool_prov_status_active(
|
||||
health_mon[constants.POOL_ID])
|
||||
self.task_utils.mark_loadbalancer_prov_status_active(
|
||||
loadbalancer[constants.LOADBALANCER_ID])
|
||||
for listener in listeners:
|
||||
|
@ -417,7 +417,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
provider_HM = driver_dm.HealthMonitor(
|
||||
healthmonitor_id=self.sample_data.hm1_id)
|
||||
self.amp_driver.health_monitor_create(provider_HM)
|
||||
payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id}
|
||||
payload = {consts.HEALTH_MONITOR: provider_HM.to_dict()}
|
||||
mock_cast.assert_called_with({}, 'create_health_monitor', **payload)
|
||||
|
||||
@mock.patch('oslo_messaging.RPCClient.cast')
|
||||
@ -425,7 +425,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
provider_HM = driver_dm.HealthMonitor(
|
||||
healthmonitor_id=self.sample_data.hm1_id)
|
||||
self.amp_driver.health_monitor_delete(provider_HM)
|
||||
payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id}
|
||||
payload = {consts.HEALTH_MONITOR: provider_HM.to_dict()}
|
||||
mock_cast.assert_called_with({}, 'delete_health_monitor', **payload)
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
@ -516,7 +516,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
max_retries=1, max_retries_down=2)
|
||||
hm_dict = {'enabled': True, 'rise_threshold': 1, 'fall_threshold': 2}
|
||||
self.amp_driver.health_monitor_update(old_provider_hm, provider_hm)
|
||||
payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id,
|
||||
payload = {consts.ORIGINAL_HEALTH_MONITOR: old_provider_hm.to_dict(),
|
||||
consts.HEALTH_MONITOR_UPDATES: hm_dict}
|
||||
mock_cast.assert_called_with({}, 'update_health_monitor', **payload)
|
||||
|
||||
@ -528,7 +528,7 @@ class TestAmphoraDriver(base.TestRpc):
|
||||
healthmonitor_id=self.sample_data.hm1_id, name='Great HM')
|
||||
hm_dict = {'name': 'Great HM'}
|
||||
self.amp_driver.health_monitor_update(old_provider_hm, provider_hm)
|
||||
payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id,
|
||||
payload = {consts.ORIGINAL_HEALTH_MONITOR: old_provider_hm.to_dict(),
|
||||
consts.HEALTH_MONITOR_UPDATES: hm_dict}
|
||||
mock_cast.assert_called_with({}, 'update_health_monitor', **payload)
|
||||
|
||||
|
@ -113,20 +113,20 @@ class TestEndpoints(base.TestCase):
|
||||
self.resource)
|
||||
|
||||
def test_create_health_monitor(self):
|
||||
self.ep.create_health_monitor(self.context, self.resource_id)
|
||||
self.ep.create_health_monitor(self.context, self.resource)
|
||||
self.ep.worker.create_health_monitor.assert_called_once_with(
|
||||
self.resource_id)
|
||||
self.resource)
|
||||
|
||||
def test_update_health_monitor(self):
|
||||
self.ep.update_health_monitor(self.context, self.resource_id,
|
||||
self.ep.update_health_monitor(self.context, self.resource,
|
||||
self.resource_updates)
|
||||
self.ep.worker.update_health_monitor.assert_called_once_with(
|
||||
self.resource_id, self.resource_updates)
|
||||
self.resource, self.resource_updates)
|
||||
|
||||
def test_delete_health_monitor(self):
|
||||
self.ep.delete_health_monitor(self.context, self.resource_id)
|
||||
self.ep.delete_health_monitor(self.context, self.resource)
|
||||
self.ep.worker.delete_health_monitor.assert_called_once_with(
|
||||
self.resource_id)
|
||||
self.resource)
|
||||
|
||||
def test_create_member(self):
|
||||
self.ep.create_member(self.context, self.resource)
|
||||
|
@ -54,8 +54,9 @@ class TestHealthMonitorFlows(base.TestCase):
|
||||
self.assertIn(constants.LOADBALANCER, health_mon_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER_ID, health_mon_flow.requires)
|
||||
self.assertIn(constants.POOL_ID, health_mon_flow.requires)
|
||||
self.assertIn(constants.PROJECT_ID, health_mon_flow.requires)
|
||||
|
||||
self.assertEqual(5, len(health_mon_flow.requires))
|
||||
self.assertEqual(6, len(health_mon_flow.requires))
|
||||
self.assertEqual(0, len(health_mon_flow.provides))
|
||||
|
||||
def test_get_update_health_monitor_flow(self):
|
||||
|
@ -114,9 +114,15 @@ class TestDatabaseTasks(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.health_mon_mock = mock.MagicMock()
|
||||
self.health_mon_mock.id = HM_ID
|
||||
self.health_mon_mock.pool_id = POOL_ID
|
||||
self.db_health_mon_mock = mock.MagicMock()
|
||||
self.db_health_mon_mock.id = HM_ID
|
||||
self.db_health_mon_mock.pool_id = POOL_ID
|
||||
|
||||
self.health_mon_mock = {
|
||||
constants.HEALTHMONITOR_ID: HM_ID,
|
||||
constants.POOL_ID: POOL_ID,
|
||||
'admin_state_up': True,
|
||||
}
|
||||
|
||||
self.listener_mock = mock.MagicMock()
|
||||
self.listener_mock.id = LISTENER_ID
|
||||
@ -130,7 +136,11 @@ class TestDatabaseTasks(base.TestCase):
|
||||
|
||||
self.db_pool_mock = mock.MagicMock()
|
||||
self.db_pool_mock.id = POOL_ID
|
||||
self.db_pool_mock.health_monitor = self.health_mon_mock
|
||||
self.db_pool_mock.health_monitor = self.db_health_mon_mock
|
||||
self.db_health_mon_mock.to_dict.return_value = {
|
||||
constants.ID: HM_ID,
|
||||
constants.POOL_ID: POOL_ID,
|
||||
}
|
||||
|
||||
self.member_mock = {
|
||||
constants.MEMBER_ID: MEMBER_ID,
|
||||
@ -262,7 +272,9 @@ class TestDatabaseTasks(base.TestCase):
|
||||
|
||||
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
|
||||
@mock.patch('octavia.db.repositories.HealthMonitorRepository.delete')
|
||||
@mock.patch('octavia.db.repositories.PoolRepository.get')
|
||||
def test_delete_health_monitor_in_db_by_pool(self,
|
||||
mock_pool_repo_get,
|
||||
mock_health_mon_repo_delete,
|
||||
mock_health_mon_repo_update,
|
||||
mock_generate_uuid,
|
||||
@ -272,9 +284,9 @@ class TestDatabaseTasks(base.TestCase):
|
||||
mock_listener_repo_update,
|
||||
mock_amphora_repo_update,
|
||||
mock_amphora_repo_delete):
|
||||
|
||||
mock_pool_repo_get.return_value = self.db_pool_mock
|
||||
delete_health_mon = database_tasks.DeleteHealthMonitorInDBByPool()
|
||||
delete_health_mon.execute(self.db_pool_mock)
|
||||
delete_health_mon.execute(POOL_ID)
|
||||
|
||||
repo.HealthMonitorRepository.delete.assert_called_once_with(
|
||||
'TEST',
|
||||
@ -282,7 +294,7 @@ class TestDatabaseTasks(base.TestCase):
|
||||
|
||||
# Test the revert
|
||||
mock_health_mon_repo_delete.reset_mock()
|
||||
delete_health_mon.revert(self.db_pool_mock)
|
||||
delete_health_mon.revert(POOL_ID)
|
||||
|
||||
repo.HealthMonitorRepository.update.assert_called_once_with(
|
||||
'TEST', id=HM_ID, provisioning_status=constants.ERROR)
|
||||
@ -2059,7 +2071,9 @@ class TestDatabaseTasks(base.TestCase):
|
||||
update_server_group_info.revert(LB_ID, SERVER_GROUP_ID)
|
||||
|
||||
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
|
||||
@mock.patch('octavia.db.repositories.HealthMonitorRepository.get')
|
||||
def test_mark_health_mon_active_in_db(self,
|
||||
mock_health_mon_repo_get,
|
||||
mock_health_mon_repo_update,
|
||||
mock_generate_uuid,
|
||||
mock_LOG,
|
||||
@ -2068,7 +2082,7 @@ class TestDatabaseTasks(base.TestCase):
|
||||
mock_listener_repo_update,
|
||||
mock_amphora_repo_update,
|
||||
mock_amphora_repo_delete):
|
||||
|
||||
mock_health_mon_repo_get.return_value = self.db_health_mon_mock
|
||||
mark_health_mon_active = (database_tasks.MarkHealthMonitorActiveInDB())
|
||||
mark_health_mon_active.execute(self.health_mon_mock)
|
||||
|
||||
|
@ -153,9 +153,10 @@ class TestDatabaseTasksQuota(base.TestCase):
|
||||
self.assertFalse(mock_check_quota_met.called)
|
||||
|
||||
def test_decrement_health_monitor_quota(self):
|
||||
project_id = uuidutils.generate_uuid()
|
||||
task = database_tasks.DecrementHealthMonitorQuota()
|
||||
data_model = data_models.HealthMonitor
|
||||
self._test_decrement_quota(task, data_model)
|
||||
self._test_decrement_quota(task, data_model, project_id=project_id)
|
||||
|
||||
def test_decrement_listener_quota(self):
|
||||
project_id = uuidutils.generate_uuid()
|
||||
|
@ -27,9 +27,6 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.AMPHORA = mock.MagicMock()
|
||||
self.AMPHORA_ID = uuidutils.generate_uuid()
|
||||
self.AMPHORA.id = self.AMPHORA_ID
|
||||
self.HEALTH_MON = mock.MagicMock()
|
||||
self.HEALTH_MON_ID = uuidutils.generate_uuid()
|
||||
self.HEALTH_MON.pool_id = self.HEALTH_MON_ID
|
||||
self.L7POLICY = mock.MagicMock()
|
||||
self.L7POLICY_ID = uuidutils.generate_uuid()
|
||||
self.L7POLICY.id = self.L7POLICY_ID
|
||||
@ -50,6 +47,11 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.POOL = mock.MagicMock()
|
||||
self.POOL_ID = uuidutils.generate_uuid()
|
||||
self.POOL.id = self.POOL_ID
|
||||
self.HEALTH_MON_ID = uuidutils.generate_uuid()
|
||||
self.HEALTH_MON = {
|
||||
constants.HEALTHMONITOR_ID: self.HEALTH_MON_ID,
|
||||
constants.POOL_ID: self.POOL_ID,
|
||||
}
|
||||
|
||||
super(TestLifecycleTasks, self).setUp()
|
||||
|
||||
@ -123,7 +125,7 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.LOADBALANCER)
|
||||
|
||||
mock_health_mon_prov_status_error.assert_called_once_with(
|
||||
self.HEALTH_MON_ID)
|
||||
self.POOL_ID)
|
||||
mock_loadbalancer_prov_status_active.assert_called_once_with(
|
||||
self.LOADBALANCER_ID)
|
||||
mock_listener_prov_status_active.assert_called_once_with(
|
||||
|
@ -53,7 +53,11 @@ _amphora_mock = {
|
||||
constants.LOAD_BALANCER_ID: LB_ID,
|
||||
}
|
||||
_flow_mock = mock.MagicMock()
|
||||
_health_mon_mock = mock.MagicMock()
|
||||
_db_health_mon_mock = mock.MagicMock()
|
||||
_health_mon_mock = {
|
||||
constants.HEALTHMONITOR_ID: HM_ID,
|
||||
constants.POOL_ID: POOL_ID
|
||||
}
|
||||
_vip_mock = mock.MagicMock()
|
||||
_listener_mock = mock.MagicMock()
|
||||
_db_load_balancer_mock = mock.MagicMock()
|
||||
@ -87,7 +91,7 @@ class TestException(Exception):
|
||||
@mock.patch('octavia.db.repositories.AmphoraRepository.get',
|
||||
return_value=_db_amphora_mock)
|
||||
@mock.patch('octavia.db.repositories.HealthMonitorRepository.get',
|
||||
return_value=_health_mon_mock)
|
||||
return_value=_db_health_mon_mock)
|
||||
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get',
|
||||
return_value=_db_load_balancer_mock)
|
||||
@mock.patch('octavia.db.repositories.ListenerRepository.get',
|
||||
@ -112,7 +116,7 @@ class TestControllerWorker(base.TestCase):
|
||||
|
||||
_db_pool_mock.listeners = [_listener_mock]
|
||||
_db_pool_mock.load_balancer = _db_load_balancer_mock
|
||||
_health_mon_mock.pool = _db_pool_mock
|
||||
_db_health_mon_mock.pool = _db_pool_mock
|
||||
_db_load_balancer_mock.amphorae = _db_amphora_mock
|
||||
_db_load_balancer_mock.vip = _vip_mock
|
||||
_db_load_balancer_mock.id = LB_ID
|
||||
@ -134,8 +138,12 @@ class TestControllerWorker(base.TestCase):
|
||||
_flow_mock.storage.fetch = fetch_mock
|
||||
|
||||
_db_pool_mock.id = POOL_ID
|
||||
_health_mon_mock.pool_id = POOL_ID
|
||||
_health_mon_mock.id = HM_ID
|
||||
_db_health_mon_mock.pool_id = POOL_ID
|
||||
_db_health_mon_mock.id = HM_ID
|
||||
_db_health_mon_mock.to_dict.return_value = {
|
||||
'id': HM_ID,
|
||||
constants.POOL_ID: POOL_ID
|
||||
}
|
||||
|
||||
super(TestControllerWorker, self).setUp()
|
||||
|
||||
@ -268,13 +276,12 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
mock_health_mon_repo_get.side_effect = [None, _health_mon_mock]
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_health_monitor(_health_mon_mock)
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
_db_load_balancer_mock).to_dict()
|
||||
|
||||
mock_health_mon_repo_get.return_value = _db_health_mon_mock
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.HEALTH_MON:
|
||||
@ -289,7 +296,6 @@ class TestControllerWorker(base.TestCase):
|
||||
POOL_ID}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
self.assertEqual(2, mock_health_mon_repo_get.call_count)
|
||||
|
||||
@mock.patch('octavia.controller.worker.v2.flows.'
|
||||
'health_monitor_flows.HealthMonitorFlows.'
|
||||
@ -312,10 +318,11 @@ class TestControllerWorker(base.TestCase):
|
||||
_flow_mock.reset_mock()
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_health_monitor(HM_ID)
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
_db_load_balancer_mock).to_dict()
|
||||
|
||||
cw.delete_health_monitor(_health_mon_mock)
|
||||
mock_health_mon_repo_get.return_value = _db_health_mon_mock
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.HEALTH_MON:
|
||||
@ -327,7 +334,8 @@ class TestControllerWorker(base.TestCase):
|
||||
constants.LOADBALANCER:
|
||||
provider_lb,
|
||||
constants.POOL_ID:
|
||||
POOL_ID}))
|
||||
POOL_ID,
|
||||
constants.PROJECT_ID: PROJECT_ID}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
@ -350,10 +358,11 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
_health_mon_mock.provisioning_status = constants.PENDING_UPDATE
|
||||
mock_health_mon_repo_get.return_value = _db_health_mon_mock
|
||||
_db_health_mon_mock.provisioning_status = constants.PENDING_UPDATE
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_health_monitor(_health_mon_mock.id,
|
||||
cw.update_health_monitor(_health_mon_mock,
|
||||
HEALTH_UPDATE_DICT)
|
||||
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
|
||||
_db_load_balancer_mock).to_dict()
|
||||
|
Loading…
x
Reference in New Issue
Block a user