Adds tasks so that reverts set objects to ERROR

This patch adds capstone tasks to our flows that make sure reverts
end in the proper final state for the impacted objects.
Previously some failure scenarios left objects in PENDING_* state.

To accomplish the above, this patch adds provisioning status
to all of the remaining top level objects(pools, members, etc.)
and adds tasks to maintain the proper provisioning status for
these objects.

Change-Id: I857b1aa28e4a6c7466c3abf4e088a74367e78faf
Closes-Bug: #1623686
This commit is contained in:
Michael Johnson 2016-09-19 22:15:55 +00:00
parent 02df1f7cbc
commit 0e2c927c58
25 changed files with 2685 additions and 147 deletions

View File

@ -151,7 +151,8 @@ class HealthMonitor(BaseDataModel):
def __init__(self, id=None, project_id=None, pool_id=None, type=None,
delay=None, timeout=None, fall_threshold=None,
rise_threshold=None, http_method=None, url_path=None,
expected_codes=None, enabled=None, pool=None):
expected_codes=None, enabled=None, pool=None,
provisioning_status=None):
self.id = id
self.project_id = project_id
self.pool_id = pool_id
@ -165,6 +166,7 @@ class HealthMonitor(BaseDataModel):
self.expected_codes = expected_codes
self.enabled = enabled
self.pool = pool
self.provisioning_status = provisioning_status
def delete(self):
self.pool.health_monitor = None
@ -176,7 +178,7 @@ class Pool(BaseDataModel):
operating_status=None, members=None, health_monitor=None,
session_persistence=None, load_balancer_id=None,
load_balancer=None, listeners=None, l7policies=None,
created_at=None, updated_at=None):
created_at=None, updated_at=None, provisioning_status=None):
self.id = id
self.project_id = project_id
self.name = name
@ -194,6 +196,7 @@ class Pool(BaseDataModel):
self.l7policies = l7policies or []
self.created_at = created_at
self.updated_at = updated_at
self.provisioning_status = provisioning_status
def update(self, update_dict):
for key, value in update_dict.items():
@ -238,7 +241,7 @@ class Member(BaseDataModel):
def __init__(self, id=None, project_id=None, pool_id=None, ip_address=None,
protocol_port=None, weight=None, enabled=None,
subnet_id=None, operating_status=None, pool=None,
created_at=None, updated_at=None):
created_at=None, updated_at=None, provisioning_status=None):
self.id = id
self.project_id = project_id
self.pool_id = pool_id
@ -251,6 +254,7 @@ class Member(BaseDataModel):
self.pool = pool
self.created_at = created_at
self.updated_at = updated_at
self.provisioning_status = provisioning_status
def delete(self):
for mem in self.pool.members:
@ -441,7 +445,7 @@ class L7Rule(BaseDataModel):
def __init__(self, id=None, l7policy_id=None, type=None,
compare_type=None, key=None, value=None, l7policy=None,
invert=False):
invert=False, provisioning_status=None):
self.id = id
self.l7policy_id = l7policy_id
self.type = type
@ -450,6 +454,7 @@ class L7Rule(BaseDataModel):
self.value = value
self.l7policy = l7policy
self.invert = invert
self.provisioning_status = provisioning_status
def delete(self):
if len(self.l7policy.l7rules) == 1:
@ -468,7 +473,7 @@ class L7Policy(BaseDataModel):
def __init__(self, id=None, name=None, description=None, listener_id=None,
action=None, redirect_pool_id=None, redirect_url=None,
position=None, listener=None, redirect_pool=None,
enabled=None, l7rules=None):
enabled=None, l7rules=None, provisioning_status=None):
self.id = id
self.name = name
self.description = description
@ -481,6 +486,7 @@ class L7Policy(BaseDataModel):
self.redirect_pool = redirect_pool
self.enabled = enabled
self.l7rules = l7rules or []
self.provisioning_status = provisioning_status
def _conditionally_remove_pool_links(self, pool):
"""Removes links to the given pool from parent objects.

View File

@ -76,6 +76,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
def create_amphora(self):
"""Creates an Amphora.
This is used to create spare amphora.
:returns: amphora_id
"""
create_amp_tf = self._taskflow_load(self._amphora_flows.

View File

@ -22,6 +22,7 @@ from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import cert_task
from octavia.controller.worker.tasks import compute_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import network_tasks
CONF = cfg.CONF
@ -38,15 +39,13 @@ class AmphoraFlows(object):
def get_create_amphora_flow(self):
"""Creates a flow to create an amphora.
Ideally that should be configurable in the
config file - a db session needs to be placed
into the flow
:returns: The flow for creating the amphora
"""
create_amphora_flow = linear_flow.Flow(constants.CREATE_AMPHORA_FLOW)
create_amphora_flow.add(database_tasks.CreateAmphoraInDB(
provides=constants.AMPHORA_ID))
create_amphora_flow.add(lifecycle_tasks.AmphoraIDToErrorOnRevertTask(
requires=constants.AMPHORA_ID))
if self.REST_AMPHORA_DRIVER:
create_amphora_flow.add(cert_task.GenerateServerPEMTask(
provides=constants.SERVER_PEM))
@ -259,6 +258,8 @@ class AmphoraFlows(object):
"""
delete_amphora_flow = linear_flow.Flow(constants.DELETE_AMPHORA_FLOW)
delete_amphora_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
requires=constants.AMPHORA))
delete_amphora_flow.add(database_tasks.
MarkAmphoraPendingDeleteInDB(
requires=constants.AMPHORA))
@ -284,6 +285,10 @@ class AmphoraFlows(object):
failover_amphora_flow = linear_flow.Flow(
constants.FAILOVER_AMPHORA_FLOW)
failover_amphora_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
rebind={constants.AMPHORA: constants.FAILED_AMPHORA},
requires=constants.AMPHORA))
# Delete the old amphora
failover_amphora_flow.add(
database_tasks.MarkAmphoraPendingDeleteInDB(
@ -409,6 +414,9 @@ class AmphoraFlows(object):
rotated_amphora_flow = linear_flow.Flow(
constants.CERT_ROTATE_AMPHORA_FLOW)
rotated_amphora_flow.add(lifecycle_tasks.AmphoraToErrorOnRevertTask(
requires=constants.AMPHORA))
# create a new certificate, the returned value is the newly created
# certificate
rotated_amphora_flow.add(cert_task.GenerateServerPEMTask(

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
@ -29,8 +30,16 @@ class HealthMonitorFlows(object):
:returns: The flow for creating a health monitor
"""
create_hm_flow = linear_flow.Flow(constants.CREATE_HEALTH_MONITOR_FLOW)
create_hm_flow.add(lifecycle_tasks.HealthMonitorToErrorOnRevertTask(
requires=[constants.HEALTH_MON,
constants.LISTENERS,
constants.LOADBALANCER]))
create_hm_flow.add(database_tasks.MarkHealthMonitorPendingCreateInDB(
requires=constants.HEALTH_MON))
create_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
requires=constants.HEALTH_MON))
create_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -42,6 +51,12 @@ class HealthMonitorFlows(object):
:returns: The flow for deleting a health monitor
"""
delete_hm_flow = linear_flow.Flow(constants.DELETE_HEALTH_MONITOR_FLOW)
delete_hm_flow.add(lifecycle_tasks.HealthMonitorToErrorOnRevertTask(
requires=[constants.HEALTH_MON,
constants.LISTENERS,
constants.LOADBALANCER]))
delete_hm_flow.add(database_tasks.MarkHealthMonitorPendingDeleteInDB(
requires=constants.HEALTH_MON))
delete_hm_flow.add(model_tasks.
DeleteModelObject(rebind={constants.OBJECT:
constants.HEALTH_MON}))
@ -49,6 +64,8 @@ class HealthMonitorFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
requires=constants.POOL_ID))
delete_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
requires=constants.HEALTH_MON))
delete_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -60,6 +77,12 @@ class HealthMonitorFlows(object):
:returns: The flow for updating a health monitor
"""
update_hm_flow = linear_flow.Flow(constants.UPDATE_HEALTH_MONITOR_FLOW)
update_hm_flow.add(lifecycle_tasks.HealthMonitorToErrorOnRevertTask(
requires=[constants.HEALTH_MON,
constants.LISTENERS,
constants.LOADBALANCER]))
update_hm_flow.add(database_tasks.MarkHealthMonitorPendingUpdateInDB(
requires=constants.HEALTH_MON))
update_hm_flow.add(model_tasks.
UpdateAttributes(
rebind={constants.OBJECT: constants.HEALTH_MON},
@ -68,6 +91,8 @@ class HealthMonitorFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_hm_flow.add(database_tasks.UpdateHealthMonInDB(
requires=[constants.HEALTH_MON, constants.UPDATE_DICT]))
update_hm_flow.add(database_tasks.MarkHealthMonitorActiveInDB(
requires=constants.HEALTH_MON))
update_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
@ -29,8 +30,16 @@ class L7PolicyFlows(object):
:returns: The flow for creating an L7 policy
"""
create_l7policy_flow = linear_flow.Flow(constants.CREATE_L7POLICY_FLOW)
create_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
create_l7policy_flow.add(database_tasks.MarkL7PolicyPendingCreateInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -42,12 +51,20 @@ class L7PolicyFlows(object):
:returns: The flow for deleting an L7 policy
"""
delete_l7policy_flow = linear_flow.Flow(constants.DELETE_L7POLICY_FLOW)
delete_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7POLICY}))
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -59,6 +76,12 @@ class L7PolicyFlows(object):
:returns: The flow for updating an L7 policy
"""
update_l7policy_flow = linear_flow.Flow(constants.UPDATE_L7POLICY_FLOW)
update_l7policy_flow.add(lifecycle_tasks.L7PolicyToErrorOnRevertTask(
requires=[constants.L7POLICY,
constants.LISTENERS,
constants.LOADBALANCER]))
update_l7policy_flow.add(database_tasks.MarkL7PolicyPendingUpdateInDB(
requires=constants.L7POLICY))
update_l7policy_flow.add(
model_tasks.UpdateAttributes(
rebind={constants.OBJECT: constants.L7POLICY},
@ -67,6 +90,8 @@ class L7PolicyFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_l7policy_flow.add(database_tasks.UpdateL7PolicyInDB(
requires=[constants.L7POLICY, constants.UPDATE_DICT]))
update_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
update_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
@ -29,8 +30,16 @@ class L7RuleFlows(object):
:returns: The flow for creating an L7 rule
"""
create_l7rule_flow = linear_flow.Flow(constants.CREATE_L7RULE_FLOW)
create_l7rule_flow.add(lifecycle_tasks.L7RuleToErrorOnRevertTask(
requires=[constants.L7RULE,
constants.LISTENERS,
constants.LOADBALANCER]))
create_l7rule_flow.add(database_tasks.MarkL7RulePendingCreateInDB(
requires=constants.L7RULE))
create_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
requires=constants.L7RULE))
create_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -42,12 +51,20 @@ class L7RuleFlows(object):
:returns: The flow for deleting an L7 rule
"""
delete_l7rule_flow = linear_flow.Flow(constants.DELETE_L7RULE_FLOW)
delete_l7rule_flow.add(lifecycle_tasks.L7RuleToErrorOnRevertTask(
requires=[constants.L7RULE,
constants.LISTENERS,
constants.LOADBALANCER]))
delete_l7rule_flow.add(database_tasks.MarkL7RulePendingDeleteInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7RULE}))
delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -59,6 +76,12 @@ class L7RuleFlows(object):
:returns: The flow for updating an L7 rule
"""
update_l7rule_flow = linear_flow.Flow(constants.UPDATE_L7RULE_FLOW)
update_l7rule_flow.add(lifecycle_tasks.L7RuleToErrorOnRevertTask(
requires=[constants.L7RULE,
constants.LISTENERS,
constants.LOADBALANCER]))
update_l7rule_flow.add(database_tasks.MarkL7RulePendingUpdateInDB(
requires=constants.L7RULE))
update_l7rule_flow.add(
model_tasks.UpdateAttributes(
rebind={constants.OBJECT: constants.L7RULE},
@ -67,6 +90,8 @@ class L7RuleFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_l7rule_flow.add(database_tasks.UpdateL7RuleInDB(
requires=[constants.L7RULE, constants.UPDATE_DICT]))
update_l7rule_flow.add(database_tasks.MarkL7RuleActiveInDB(
requires=constants.L7RULE))
update_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
from octavia.controller.worker.tasks import network_tasks
@ -30,6 +31,8 @@ class ListenerFlows(object):
:returns: The flow for creating a listener
"""
create_listener_flow = linear_flow.Flow(constants.CREATE_LISTENER_FLOW)
create_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_listener_flow.add(network_tasks.UpdateVIP(
@ -66,6 +69,8 @@ class ListenerFlows(object):
:returns: The flow for deleting a listener
"""
delete_listener_flow = linear_flow.Flow(constants.DELETE_LISTENER_FLOW)
delete_listener_flow.add(lifecycle_tasks.ListenerToErrorOnRevertTask(
requires=constants.LISTENER))
delete_listener_flow.add(amphora_driver_tasks.ListenerDelete(
requires=[constants.LOADBALANCER, constants.LISTENER]))
delete_listener_flow.add(network_tasks.UpdateVIP(
@ -78,7 +83,7 @@ class ListenerFlows(object):
return delete_listener_flow
def get_delete_listener_internal_flow(self, listener_name):
"""Create a flow to delete a listener and associated l7policies internally
"""Create a flow to delete a listener and l7policies internally
(will skip deletion on the amp and marking LB active)
:returns: The flow for deleting a listener
@ -102,6 +107,8 @@ class ListenerFlows(object):
:returns: The flow for updating a listener
"""
update_listener_flow = linear_flow.Flow(constants.UPDATE_LISTENER_FLOW)
update_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_listener_flow.add(model_tasks.
UpdateAttributes(
rebind={constants.OBJECT:

View File

@ -27,6 +27,7 @@ from octavia.controller.worker.flows import pool_flows
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import compute_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
from octavia.controller.worker.tasks import network_tasks
from octavia.i18n import _LE
@ -56,6 +57,9 @@ class LoadBalancerFlows(object):
f_name = constants.CREATE_LOADBALANCER_FLOW
lb_create_flow = linear_flow.Flow(f_name)
lb_create_flow.add(lifecycle_tasks.LoadBalancerIDToErrorOnRevertTask(
requires=constants.LOADBALANCER_ID))
if topology == constants.TOPOLOGY_ACTIVE_STANDBY:
lb_create_flow.add(*self._create_active_standby_topology())
elif topology == constants.TOPOLOGY_SINGLE:
@ -211,6 +215,8 @@ class LoadBalancerFlows(object):
(listeners_delete, store) = self._get_delete_listeners_flow(lb)
delete_LB_flow = linear_flow.Flow(constants.DELETE_LOADBALANCER_FLOW)
delete_LB_flow.add(lifecycle_tasks.LoadBalancerToErrorOnRevertTask(
requires=constants.LOADBALANCER))
delete_LB_flow.add(compute_tasks.NovaServerGroupDelete(
requires=constants.SERVER_GROUP_ID))
delete_LB_flow.add(database_tasks.MarkLBAmphoraeHealthBusy(
@ -261,6 +267,8 @@ class LoadBalancerFlows(object):
store.update(pool_store)
delete_LB_flow = linear_flow.Flow(constants.DELETE_LOADBALANCER_FLOW)
delete_LB_flow.add(lifecycle_tasks.LoadBalancerToErrorOnRevertTask(
requires=constants.LOADBALANCER))
delete_LB_flow.add(database_tasks.MarkLBAmphoraeHealthBusy(
requires=constants.LOADBALANCER))
delete_LB_flow.add(pools_delete)
@ -318,6 +326,8 @@ class LoadBalancerFlows(object):
:returns: The flow for update a load balancer
"""
update_LB_flow = linear_flow.Flow(constants.UPDATE_LOADBALANCER_FLOW)
update_LB_flow.add(lifecycle_tasks.LoadBalancerToErrorOnRevertTask(
requires=constants.LOADBALANCER))
update_LB_flow.add(model_tasks.
UpdateAttributes(
rebind={constants.OBJECT:

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
from octavia.controller.worker.tasks import network_tasks
@ -30,6 +31,12 @@ class MemberFlows(object):
:returns: The flow for creating a member
"""
create_member_flow = linear_flow.Flow(constants.CREATE_MEMBER_FLOW)
create_member_flow.add(lifecycle_tasks.MemberToErrorOnRevertTask(
requires=[constants.MEMBER,
constants.LISTENERS,
constants.LOADBALANCER]))
create_member_flow.add(database_tasks.MarkMemberPendingCreateInDB(
requires=constants.MEMBER))
create_member_flow.add(network_tasks.CalculateDelta(
requires=constants.LOADBALANCER,
provides=constants.DELTAS))
@ -40,6 +47,8 @@ class MemberFlows(object):
))
create_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LOADBALANCER, constants.LISTENERS)))
create_member_flow.add(database_tasks.MarkMemberActiveInDB(
requires=constants.MEMBER))
create_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=(constants.LOADBALANCER,
@ -53,6 +62,12 @@ class MemberFlows(object):
:returns: The flow for deleting a member
"""
delete_member_flow = linear_flow.Flow(constants.DELETE_MEMBER_FLOW)
delete_member_flow.add(lifecycle_tasks.MemberToErrorOnRevertTask(
requires=[constants.MEMBER,
constants.LISTENERS,
constants.LOADBALANCER]))
delete_member_flow.add(database_tasks.MarkMemberPendingDeleteInDB(
requires=constants.MEMBER))
delete_member_flow.add(model_tasks.
DeleteModelObject(rebind={constants.OBJECT:
constants.MEMBER}))
@ -60,6 +75,8 @@ class MemberFlows(object):
requires=constants.MEMBER))
delete_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
delete_member_flow.add(database_tasks.MarkMemberActiveInDB(
requires=constants.MEMBER))
delete_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,
@ -73,6 +90,12 @@ class MemberFlows(object):
:returns: The flow for updating a member
"""
update_member_flow = linear_flow.Flow(constants.UPDATE_MEMBER_FLOW)
update_member_flow.add(lifecycle_tasks.MemberToErrorOnRevertTask(
requires=[constants.MEMBER,
constants.LISTENERS,
constants.LOADBALANCER]))
update_member_flow.add(database_tasks.MarkMemberPendingUpdateInDB(
requires=constants.MEMBER))
update_member_flow.add(model_tasks.
UpdateAttributes(
rebind={constants.OBJECT: constants.MEMBER},
@ -81,6 +104,8 @@ class MemberFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_member_flow.add(database_tasks.UpdateMemberInDB(
requires=[constants.MEMBER, constants.UPDATE_DICT]))
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
requires=constants.MEMBER))
update_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,

View File

@ -18,6 +18,7 @@ from taskflow.patterns import linear_flow
from octavia.common import constants
from octavia.controller.worker.tasks import amphora_driver_tasks
from octavia.controller.worker.tasks import database_tasks
from octavia.controller.worker.tasks import lifecycle_tasks
from octavia.controller.worker.tasks import model_tasks
@ -29,8 +30,16 @@ class PoolFlows(object):
:returns: The flow for creating a pool
"""
create_pool_flow = linear_flow.Flow(constants.CREATE_POOL_FLOW)
create_pool_flow.add(lifecycle_tasks.PoolToErrorOnRevertTask(
requires=[constants.POOL,
constants.LISTENERS,
constants.LOADBALANCER]))
create_pool_flow.add(database_tasks.MarkPoolPendingCreateInDB(
requires=constants.POOL))
create_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
create_pool_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
create_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
@ -42,6 +51,12 @@ class PoolFlows(object):
:returns: The flow for deleting a pool
"""
delete_pool_flow = linear_flow.Flow(constants.DELETE_POOL_FLOW)
delete_pool_flow.add(lifecycle_tasks.PoolToErrorOnRevertTask(
requires=[constants.POOL,
constants.LISTENERS,
constants.LOADBALANCER]))
delete_pool_flow.add(database_tasks.MarkPoolPendingDeleteInDB(
requires=constants.POOL))
delete_pool_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.POOL}))
delete_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
@ -74,6 +89,12 @@ class PoolFlows(object):
:returns: The flow for updating a pool
"""
update_pool_flow = linear_flow.Flow(constants.UPDATE_POOL_FLOW)
update_pool_flow.add(lifecycle_tasks.PoolToErrorOnRevertTask(
requires=[constants.POOL,
constants.LISTENERS,
constants.LOADBALANCER]))
update_pool_flow.add(database_tasks.MarkPoolPendingUpdateInDB(
requires=constants.POOL))
update_pool_flow.add(model_tasks.
UpdateAttributes(
rebind={constants.OBJECT: constants.POOL},
@ -82,6 +103,8 @@ class PoolFlows(object):
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_pool_flow.add(database_tasks.UpdatePoolInDB(
requires=[constants.POOL, constants.UPDATE_DICT]))
update_pool_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
update_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))

View File

@ -0,0 +1,204 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Methods common to the controller work tasks."""
import logging
from octavia.common import constants
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LE
LOG = logging.getLogger(__name__)
class TaskUtils(object):
"""Class of helper/utility methods used by tasks."""
def __init__(self, **kwargs):
self.amphora_repo = repo.AmphoraRepository()
self.health_mon_repo = repo.HealthMonitorRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
self.member_repo = repo.MemberRepository()
self.pool_repo = repo.PoolRepository()
self.amp_health_repo = repo.AmphoraHealthRepository()
self.l7policy_repo = repo.L7PolicyRepository()
self.l7rule_repo = repo.L7RuleRepository()
super(TaskUtils, self).__init__(**kwargs)
def mark_amphora_status_error(self, amphora_id):
"""Sets an amphora status to ERROR.
NOTE: This should only be called from revert methods.
:param amphora_id: Amphora ID to set the status to ERROR
"""
try:
self.amphora_repo.update(db_apis.get_session(),
id=amphora_id,
status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update amphora %(amp)s "
"status to ERROR due to: "
"%(except)s"), {'amp': amphora_id, 'except': e})
def mark_health_mon_prov_status_error(self, health_mon_id):
"""Sets a health monitor provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param health_mon_id: Health Monitor ID to set prov status to ERROR
"""
try:
self.health_mon_repo.update(db_apis.get_session(),
pool_id=health_mon_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update health monitor %(health)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'health': health_mon_id,
'except': e})
def mark_l7policy_prov_status_error(self, l7policy_id):
"""Sets a L7 policy provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param l7policy_id: L7 Policy ID to set provisioning status to ERROR
"""
try:
self.l7policy_repo.update(db_apis.get_session(),
id=l7policy_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update l7policy %(l7p)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'l7p': l7policy_id, 'except': e})
def mark_l7rule_prov_status_error(self, l7rule_id):
"""Sets a L7 rule provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param l7rule_id: L7 Rule ID to set provisioning status to ERROR
"""
try:
self.l7rule_repo.update(db_apis.get_session(),
id=l7rule_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update l7rule %(l7r)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'l7r': l7rule_id, 'except': e})
def mark_listener_prov_status_error(self, listener_id):
"""Sets a listener provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param listener_id: Listener ID to set provisioning status to ERROR
"""
try:
self.listener_repo.update(db_apis.get_session(),
id=listener_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update listener %(list)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'list': listener_id, 'except': e})
def mark_loadbalancer_prov_status_error(self, loadbalancer_id):
"""Sets a load balancer provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param loadbalancer_id: Load balancer ID to set provisioning
status to ERROR
"""
try:
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update load balancer %(lb)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'lb': loadbalancer_id, 'except': e})
def mark_listener_prov_status_active(self, listener_id):
"""Sets a listener provisioning status to ACTIVE.
NOTE: This should only be called from revert methods.
:param listener_id: Listener ID to set provisioning
status to ACTIVE
"""
try:
self.listener_repo.update(db_apis.get_session(),
id=listener_id,
provisioning_status=constants.ACTIVE)
except Exception as e:
LOG.error(_LE("Failed to update listener %(list)s "
"provisioning status to ACTIVE due to: "
"%(except)s"), {'list': listener_id, 'except': e})
def mark_loadbalancer_prov_status_active(self, loadbalancer_id):
"""Sets a load balancer provisioning status to ACTIVE.
NOTE: This should only be called from revert methods.
:param loadbalancer_id: Load balancer ID to set provisioning
status to ACTIVE
"""
try:
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer_id,
provisioning_status=constants.ACTIVE)
except Exception as e:
LOG.error(_LE("Failed to update load balancer %(lb)s "
"provisioning status to ACTIVE due to: "
"%(except)s"), {'lb': loadbalancer_id, 'except': e})
def mark_member_prov_status_error(self, member_id):
"""Sets a member provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param member_id: Member ID to set provisioning status to ERROR
"""
try:
self.member_repo.update(db_apis.get_session(),
id=member_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update member %(member)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'member': member_id, 'except': e})
def mark_pool_prov_status_error(self, pool_id):
"""Sets a pool provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param pool_id: Pool ID to set provisioning status to ERROR
"""
try:
self.pool_repo.update(db_apis.get_session(),
id=pool_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update pool %(pool)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'pool': pool_id, 'except': e})

View File

@ -22,6 +22,7 @@ from taskflow import task
from taskflow.types import failure
from octavia.common import constants
from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LE, _LW
@ -44,57 +45,7 @@ class BaseAmphoraTask(task.Task):
self.amphora_repo = repo.AmphoraRepository()
self.listener_repo = repo.ListenerRepository()
self.loadbalancer_repo = repo.LoadBalancerRepository()
def _mark_amphora_status_error(self, amphora_id):
"""Sets an amphora status to ERROR.
NOTE: This should only be called from revert methods.
:param amphora_id: Amphora ID to set the status to ERROR
"""
try:
self.amphora_repo.update(db_apis.get_session(), id=amphora_id,
status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update amphora %(amp)s "
"status to ERROR due to: "
"%(except)s"), {'amp': amphora_id,
'except': e})
def _mark_listener_prov_status_error(self, listener_id):
"""Sets a listener provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param listener_id: Listener ID to set provisioning status to ERROR
"""
try:
self.listener_repo.update(db_apis.get_session(),
id=listener_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update listener %(list)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'list': listener_id,
'except': e})
def _mark_loadbalancer_prov_status_error(self, loadbalancer_id):
"""Sets a load balancer provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param loadbalancer_id: Load balancer ID to set provisioning
status to ERROR
"""
try:
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update load balancer %(lb)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'lb': loadbalancer_id,
'except': e})
self.task_utils = task_utilities.TaskUtils()
class ListenersUpdate(BaseAmphoraTask):
@ -112,7 +63,7 @@ class ListenersUpdate(BaseAmphoraTask):
LOG.warning(_LW("Reverting listeners updates."))
for listener in loadbalancer.listeners:
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
return None
@ -130,7 +81,7 @@ class ListenerStop(BaseAmphoraTask):
LOG.warning(_LW("Reverting listener stop."))
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
return None
@ -148,7 +99,7 @@ class ListenerStart(BaseAmphoraTask):
LOG.warning(_LW("Reverting listener start."))
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
return None
@ -167,7 +118,7 @@ class ListenersStart(BaseAmphoraTask):
LOG.warning(_LW("Reverting listeners starts."))
for listener in listeners:
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
return None
@ -185,7 +136,7 @@ class ListenerDelete(BaseAmphoraTask):
LOG.warning(_LW("Reverting listener delete."))
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class AmphoraGetInfo(BaseAmphoraTask):
@ -217,7 +168,7 @@ class AmphoraFinalize(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting amphora finalize."))
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class AmphoraPostNetworkPlug(BaseAmphoraTask):
@ -236,7 +187,7 @@ class AmphoraPostNetworkPlug(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting post network plug."))
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class AmphoraePostNetworkPlug(BaseAmphoraTask):
@ -258,7 +209,7 @@ class AmphoraePostNetworkPlug(BaseAmphoraTask):
lambda amp: amp.status == constants.AMPHORA_ALLOCATED,
loadbalancer.amphorae):
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class AmphoraPostVIPPlug(BaseAmphoraTask):
@ -275,8 +226,8 @@ class AmphoraPostVIPPlug(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting post vip plug."))
self._mark_amphora_status_error(amphora.id)
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_amphora_status_error(amphora.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class AmphoraePostVIPPlug(BaseAmphoraTask):
@ -295,7 +246,7 @@ class AmphoraePostVIPPlug(BaseAmphoraTask):
if isinstance(result, failure.Failure):
return
LOG.warning(_LW("Reverting amphorae post vip plug."))
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class AmphoraCertUpload(BaseAmphoraTask):

View File

@ -26,6 +26,7 @@ from taskflow.types import failure
from octavia.common import constants
from octavia.common import data_models
import octavia.common.tls_utils.cert_parser as cert_parser
from octavia.controller.worker import task_utils as task_utilities
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.i18n import _LE, _LI, _LW
@ -49,6 +50,7 @@ class BaseDatabaseTask(task.Task):
self.amp_health_repo = repo.AmphoraHealthRepository()
self.l7policy_repo = repo.L7PolicyRepository()
self.l7rule_repo = repo.L7RuleRepository()
self.task_utils = task_utilities.TaskUtils()
super(BaseDatabaseTask, self).__init__(**kwargs)
def _delete_from_amp_health(self, amphora_id):
@ -80,57 +82,6 @@ class BaseDatabaseTask(task.Task):
LOG.debug('No existing amphora health record to mark busy '
'for amphora: %s, skipping.', amphora_id)
def _mark_amphora_status_error(self, amphora_id):
"""Sets an amphora status to ERROR.
NOTE: This should only be called from revert methods.
:param amphora_id: Amphora ID to set the status to ERROR
"""
try:
self.amphora_repo.update(db_apis.get_session(), id=amphora_id,
status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update amphora %(amp)s "
"status to ERROR due to: "
"%(except)s"), {'amp': amphora_id,
'except': e})
def _mark_listener_prov_status_error(self, listener_id):
"""Sets a listener provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param listener_id: Listener ID to set provisioning status to ERROR
"""
try:
self.listener_repo.update(db_apis.get_session(),
id=listener_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update listener %(list)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'list': listener_id,
'except': e})
def _mark_loadbalancer_prov_status_error(self, loadbalancer_id):
"""Sets a load balancer provisioning status to ERROR.
NOTE: This should only be called from revert methods.
:param loadbalancer_id: Load balancer ID to set provisioning
status to ERROR
"""
try:
self.loadbalancer_repo.update(db_apis.get_session(),
id=loadbalancer_id,
provisioning_status=constants.ERROR)
except Exception as e:
LOG.error(_LE("Failed to update load balancer %(lb)s "
"provisioning status to ERROR due to: "
"%(except)s"), {'lb': loadbalancer_id,
'except': e})
class CreateAmphoraInDB(BaseDatabaseTask):
"""Task to create an initial amphora in the Database."""
@ -546,7 +497,7 @@ class MapLoadbalancerToAmphora(BaseDatabaseTask):
def revert(self, result, loadbalancer_id, *args, **kwargs):
LOG.warning(_LW("Reverting Amphora allocation for the load "
"balancer %s in the database."), loadbalancer_id)
self._mark_loadbalancer_prov_status_error(loadbalancer_id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer_id)
class _MarkAmphoraRoleAndPriorityInDB(BaseDatabaseTask):
@ -697,7 +648,7 @@ class MarkAmphoraAllocatedInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark amphora ready in DB for amp "
"id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class MarkAmphoraBootingInDB(BaseDatabaseTask):
@ -772,7 +723,7 @@ class MarkAmphoraDeletedInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark amphora deleted in DB "
"for amp id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class MarkAmphoraPendingDeleteInDB(BaseDatabaseTask):
@ -804,7 +755,7 @@ class MarkAmphoraPendingDeleteInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark amphora pending delete in DB "
"for amp id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class MarkAmphoraPendingUpdateInDB(BaseDatabaseTask):
@ -836,7 +787,7 @@ class MarkAmphoraPendingUpdateInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark amphora pending update in DB "
"for amp id %(amp)s and compute id %(comp)s"),
{'amp': amphora.id, 'comp': amphora.compute_id})
self._mark_amphora_status_error(amphora.id)
self.task_utils.mark_amphora_status_error(amphora.id)
class MarkAmphoraReadyInDB(BaseDatabaseTask):
@ -1005,7 +956,7 @@ class MarkLBActiveInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark load balancer deleted in DB "
"for load balancer id %s"), loadbalancer.id)
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class UpdateLBServerGroupInDB(BaseDatabaseTask):
@ -1076,7 +1027,7 @@ class MarkLBDeletedInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark load balancer deleted in DB "
"for load balancer id %s"), loadbalancer.id)
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class MarkLBPendingDeleteInDB(BaseDatabaseTask):
@ -1108,7 +1059,7 @@ class MarkLBPendingDeleteInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark load balancer pending delete in DB "
"for load balancer id %s"), loadbalancer.id)
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
@ -1149,9 +1100,9 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
"listener ids: %(list)s"),
{'LB': loadbalancer.id,
'list': ', '.join([l.id for l in listeners])})
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
for listener in listeners:
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerActiveInDB(BaseDatabaseTask):
@ -1180,7 +1131,7 @@ class MarkListenerActiveInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark listener active in DB "
"for listener id %s"), listener.id)
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerDeletedInDB(BaseDatabaseTask):
@ -1209,7 +1160,7 @@ class MarkListenerDeletedInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark listener deleted in DB "
"for listener id %s"), listener.id)
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class MarkListenerPendingDeleteInDB(BaseDatabaseTask):
@ -1239,7 +1190,7 @@ class MarkListenerPendingDeleteInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting mark listener pending delete in DB "
"for listener id %s"), listener.id)
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class UpdateLoadbalancerInDB(BaseDatabaseTask):
@ -1270,7 +1221,7 @@ class UpdateLoadbalancerInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting update loadbalancer in DB "
"for loadbalancer id %s"), loadbalancer.id)
self._mark_loadbalancer_prov_status_error(loadbalancer.id)
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class UpdateHealthMonInDB(BaseDatabaseTask):
@ -1339,7 +1290,7 @@ class UpdateListenerInDB(BaseDatabaseTask):
LOG.warning(_LW("Reverting update listener in DB "
"for listener id %s"), listener.id)
self._mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
class UpdateMemberInDB(BaseDatabaseTask):
@ -1630,3 +1581,624 @@ class MarkLBAmphoraeHealthBusy(BaseDatabaseTask):
"""
for amphora in loadbalancer.amphorae:
self._mark_amp_health_busy(amphora.id)
class MarkHealthMonitorActiveInDB(BaseDatabaseTask):
"""Mark the health monitor ACTIVE in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, health_mon):
"""Mark the health monitor ACTIVE in DB.
:param health_mon: Health Monitor object to be updated
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for health monitor id: %s",
health_mon.pool_id)
self.health_mon_repo.update(db_apis.get_session(),
health_mon.pool_id,
provisioning_status=constants.ACTIVE)
def revert(self, health_mon, *args, **kwargs):
"""Mark the health monitor as broken
:param health_mon: Health Monitor object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark health montor ACTIVE in DB "
"for health monitor id %s"), health_mon.pool_id)
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
class MarkHealthMonitorPendingCreateInDB(BaseDatabaseTask):
"""Mark the health monitor pending create in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, health_mon):
"""Mark the health monitor as pending create in DB.
:param health_mon: Health Monitor object to be updated
:returns: None
"""
LOG.debug("Mark PENDING CREATE in DB for health monitor id: %s",
health_mon.pool_id)
self.health_mon_repo.update(db_apis.get_session(),
health_mon.pool_id,
provisioning_status=(constants.
PENDING_CREATE))
def revert(self, health_mon, *args, **kwargs):
"""Mark the health monitor as broken
:param health_mon: Health Monitor object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark health monitor pending create in DB "
"for health monitor id %s"), health_mon.pool_id)
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
class MarkHealthMonitorPendingDeleteInDB(BaseDatabaseTask):
"""Mark the health monitor pending delete in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, health_mon):
"""Mark the health monitor as pending delete in DB.
:param health_mon: Health Monitor object to be updated
:returns: None
"""
LOG.debug("Mark PENDING DELETE in DB for health monitor id: %s",
health_mon.pool_id)
self.health_mon_repo.update(db_apis.get_session(),
health_mon.pool_id,
provisioning_status=(constants.
PENDING_DELETE))
def revert(self, health_mon, *args, **kwargs):
"""Mark the health monitor as broken
:param health_mon: Health Monitor object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark health monitor pending delete in DB "
"for health monitor id %s"), health_mon.pool_id)
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
class MarkHealthMonitorPendingUpdateInDB(BaseDatabaseTask):
"""Mark the health monitor pending update in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, health_mon):
"""Mark the health monitor as pending update in DB.
:param health_mon: Health Monitor object to be updated
:returns: None
"""
LOG.debug("Mark PENDING UPDATE in DB for health monitor id: %s",
health_mon.pool_id)
self.health_mon_repo.update(db_apis.get_session(),
health_mon.pool_id,
provisioning_status=(constants.
PENDING_UPDATE))
def revert(self, health_mon, *args, **kwargs):
"""Mark the health monitor as broken
:param health_mon: Health Monitor object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark health monitor pending update in DB "
"for health monitor id %s"), health_mon.pool_id)
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
class MarkL7PolicyActiveInDB(BaseDatabaseTask):
"""Mark the l7policy ACTIVE in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy):
"""Mark the l7policy ACTIVE in DB.
:param l7policy: L7Policy object to be updated
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for l7policy id: %s",
l7policy.id)
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
provisioning_status=constants.ACTIVE)
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy as broken
:param l7policy: L7Policy object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7policy ACTIVE in DB "
"for l7policy id %s"), l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
class MarkL7PolicyPendingCreateInDB(BaseDatabaseTask):
"""Mark the l7policy pending create in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy):
"""Mark the l7policy as pending create in DB.
:param l7policy: L7Policy object to be updated
:returns: None
"""
LOG.debug("Mark PENDING CREATE in DB for l7policy id: %s",
l7policy.id)
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
provisioning_status=constants.PENDING_CREATE)
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy as broken
:param l7policy: L7Policy object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7policy pending create in DB "
"for l7policy id %s"), l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
class MarkL7PolicyPendingDeleteInDB(BaseDatabaseTask):
"""Mark the l7policy pending delete in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy):
"""Mark the l7policy as pending delete in DB.
:param l7policy: L7Policy object to be updated
:returns: None
"""
LOG.debug("Mark PENDING DELETE in DB for l7policy id: %s",
l7policy.id)
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
provisioning_status=constants.PENDING_DELETE)
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy as broken
:param l7policy: L7Policy object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7policy pending delete in DB "
"for l7policy id %s"), l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
class MarkL7PolicyPendingUpdateInDB(BaseDatabaseTask):
"""Mark the l7policy pending update in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7policy):
"""Mark the l7policy as pending update in DB.
:param l7policy: L7Policy object to be updated
:returns: None
"""
LOG.debug("Mark PENDING UPDATE in DB for l7policy id: %s",
l7policy.id)
self.l7policy_repo.update(db_apis.get_session(),
l7policy.id,
provisioning_status=(constants.
PENDING_UPDATE))
def revert(self, l7policy, *args, **kwargs):
"""Mark the l7policy as broken
:param l7policy: L7Policy object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7policy pending update in DB "
"for l7policy id %s"), l7policy.id)
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
class MarkL7RuleActiveInDB(BaseDatabaseTask):
"""Mark the l7rule ACTIVE in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule):
"""Mark the l7rule ACTIVE in DB.
:param l7rule: L7Rule object to be updated
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for l7rule id: %s",
l7rule.id)
self.l7rule_repo.update(db_apis.get_session(),
l7rule.id,
provisioning_status=constants.ACTIVE)
def revert(self, l7rule, *args, **kwargs):
"""Mark the l7rule as broken
:param l7rule: L7Rule object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7rule ACTIVE in DB "
"for l7rule id %s"), l7rule.id)
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
class MarkL7RulePendingCreateInDB(BaseDatabaseTask):
"""Mark the l7rule pending create in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule):
"""Mark the l7rule as pending create in DB.
:param l7rule: L7Rule object to be updated
:returns: None
"""
LOG.debug("Mark PENDING CREATE in DB for l7rule id: %s",
l7rule.id)
self.l7rule_repo.update(db_apis.get_session(),
l7rule.id,
provisioning_status=constants.PENDING_CREATE)
def revert(self, l7rule, *args, **kwargs):
"""Mark the l7rule as broken
:param l7rule: L7Rule object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7rule pending create in DB "
"for l7rule id %s"), l7rule.id)
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
class MarkL7RulePendingDeleteInDB(BaseDatabaseTask):
"""Mark the l7rule pending delete in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule):
"""Mark the l7rule as pending delete in DB.
:param l7rule: L7Rule object to be updated
:returns: None
"""
LOG.debug("Mark PENDING DELETE in DB for l7rule id: %s",
l7rule.id)
self.l7rule_repo.update(db_apis.get_session(),
l7rule.id,
provisioning_status=constants.PENDING_DELETE)
def revert(self, l7rule, *args, **kwargs):
"""Mark the l7rule as broken
:param l7rule: L7Rule object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7rule pending delete in DB "
"for l7rule id %s"), l7rule.id)
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
class MarkL7RulePendingUpdateInDB(BaseDatabaseTask):
"""Mark the l7rule pending update in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, l7rule):
"""Mark the l7rule as pending update in DB.
:param l7rule: L7Rule object to be updated
:returns: None
"""
LOG.debug("Mark PENDING UPDATE in DB for l7rule id: %s",
l7rule.id)
self.l7rule_repo.update(db_apis.get_session(),
l7rule.id,
provisioning_status=constants.PENDING_UPDATE)
def revert(self, l7rule, *args, **kwargs):
"""Mark the l7rule as broken
:param l7rule: L7Rule object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark l7rule pending update in DB "
"for l7rule id %s"), l7rule.id)
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
class MarkMemberActiveInDB(BaseDatabaseTask):
"""Mark the member ACTIVE in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, member):
"""Mark the member ACTIVE in DB.
:param member: Member object to be updated
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for member id: %s", member.id)
self.member_repo.update(db_apis.get_session(),
member.id,
provisioning_status=constants.ACTIVE)
def revert(self, member, *args, **kwargs):
"""Mark the member as broken
:param member: Member object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark member ACTIVE in DB "
"for member id %s"), member.id)
self.task_utils.mark_member_prov_status_error(member.id)
class MarkMemberPendingCreateInDB(BaseDatabaseTask):
"""Mark the member pending create in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, member):
"""Mark the member as pending create in DB.
:param member: Member object to be updated
:returns: None
"""
LOG.debug("Mark PENDING CREATE in DB for member id: %s", member.id)
self.member_repo.update(db_apis.get_session(),
member.id,
provisioning_status=constants.PENDING_CREATE)
def revert(self, member, *args, **kwargs):
"""Mark the member as broken
:param member: Member object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark member pending create in DB "
"for member id %s"), member.id)
self.task_utils.mark_member_prov_status_error(member.id)
class MarkMemberPendingDeleteInDB(BaseDatabaseTask):
"""Mark the member pending delete in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, member):
"""Mark the member as pending delete in DB.
:param member: Member object to be updated
:returns: None
"""
LOG.debug("Mark PENDING DELETE in DB for member id: %s", member.id)
self.member_repo.update(db_apis.get_session(),
member.id,
provisioning_status=constants.PENDING_DELETE)
def revert(self, member, *args, **kwargs):
"""Mark the member as broken
:param member: Member object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark member pending delete in DB "
"for member id %s"), member.id)
self.task_utils.mark_member_prov_status_error(member.id)
class MarkMemberPendingUpdateInDB(BaseDatabaseTask):
"""Mark the member pending update in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, member):
"""Mark the member as pending update in DB.
:param member: Member object to be updated
:returns: None
"""
LOG.debug("Mark PENDING UPDATE in DB for member id: %s",
member.id)
self.member_repo.update(db_apis.get_session(),
member.id,
provisioning_status=constants.PENDING_UPDATE)
def revert(self, member, *args, **kwargs):
"""Mark the member as broken
:param member: Member object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark member pending update in DB "
"for member id %s"), member.id)
self.task_utils.mark_member_prov_status_error(member.id)
class MarkPoolActiveInDB(BaseDatabaseTask):
"""Mark the pool ACTIVE in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, pool):
"""Mark the pool ACTIVE in DB.
:param pool: Pool object to be updated
:returns: None
"""
LOG.debug("Mark ACTIVE in DB for pool id: %s",
pool.id)
self.pool_repo.update(db_apis.get_session(),
pool.id,
provisioning_status=constants.ACTIVE)
def revert(self, pool, *args, **kwargs):
"""Mark the pool as broken
:param pool: Pool object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark pool ACTIVE in DB "
"for pool id %s"), pool.id)
self.task_utils.mark_pool_prov_status_error(pool.id)
class MarkPoolPendingCreateInDB(BaseDatabaseTask):
"""Mark the pool pending create in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, pool):
"""Mark the pool as pending create in DB.
:param pool: Pool object to be updated
:returns: None
"""
LOG.debug("Mark PENDING CREATE in DB for pool id: %s",
pool.id)
self.pool_repo.update(db_apis.get_session(),
pool.id,
provisioning_status=constants.PENDING_CREATE)
def revert(self, pool, *args, **kwargs):
"""Mark the pool as broken
:param pool: Pool object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark pool pending create in DB "
"for pool id %s"), pool.id)
self.task_utils.mark_pool_prov_status_error(pool.id)
class MarkPoolPendingDeleteInDB(BaseDatabaseTask):
"""Mark the pool pending delete in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, pool):
"""Mark the pool as pending delete in DB.
:param pool: Pool object to be updated
:returns: None
"""
LOG.debug("Mark PENDING DELETE in DB for pool id: %s",
pool.id)
self.pool_repo.update(db_apis.get_session(),
pool.id,
provisioning_status=constants.PENDING_DELETE)
def revert(self, pool, *args, **kwargs):
"""Mark the pool as broken
:param pool: Pool object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark pool pending delete in DB "
"for pool id %s"), pool.id)
self.task_utils.mark_pool_prov_status_error(pool.id)
class MarkPoolPendingUpdateInDB(BaseDatabaseTask):
"""Mark the pool pending update in the DB.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, pool):
"""Mark the pool as pending update in DB.
:param pool: Pool object to be updated
:returns: None
"""
LOG.debug("Mark PENDING UPDATE in DB for pool id: %s",
pool.id)
self.pool_repo.update(db_apis.get_session(),
pool.id,
provisioning_status=constants.PENDING_UPDATE)
def revert(self, pool, *args, **kwargs):
"""Mark the pool as broken
:param pool: Pool object that failed to update
:returns: None
"""
LOG.warning(_LW("Reverting mark pool pending update in DB "
"for pool id %s"), pool.id)
self.task_utils.mark_pool_prov_status_error(pool.id)

View File

@ -0,0 +1,159 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from taskflow import task
from octavia.controller.worker import task_utils as task_utilities
LOG = logging.getLogger(__name__)
class BaseLifecycleTask(task.Task):
"""Base task to instansiate common classes."""
def __init__(self, **kwargs):
self.task_utils = task_utilities.TaskUtils()
super(BaseLifecycleTask, self).__init__(**kwargs)
class AmphoraIDToErrorOnRevertTask(BaseLifecycleTask):
"""Task to checkpoint Amphora lifecycle milestones."""
def execute(self, amphora_id):
pass
def revert(self, amphora_id, *args, **kwargs):
self.task_utils.mark_amphora_status_error(amphora_id)
class AmphoraToErrorOnRevertTask(AmphoraIDToErrorOnRevertTask):
"""Task to checkpoint Amphora lifecycle milestones."""
def execute(self, amphora):
pass
def revert(self, amphora, *args, **kwargs):
super(AmphoraToErrorOnRevertTask, self).revert(amphora.id)
class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a member to ERROR on revert."""
def execute(self, health_mon, listeners, loadbalancer):
pass
def revert(self, health_mon, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_health_mon_prov_status_error(health_mon.pool_id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a l7policy to ERROR on revert."""
def execute(self, l7policy, listeners, loadbalancer):
pass
def revert(self, l7policy, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a l7rule to ERROR on revert."""
def execute(self, l7rule, listeners, loadbalancer):
pass
def revert(self, l7rule, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_l7rule_prov_status_error(l7rule.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a listener to ERROR on revert."""
def execute(self, listener):
pass
def revert(self, listener, *args, **kwargs):
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_loadbalancer_prov_status_active(
listener.load_balancer.id)
class ListenersToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set listeners to ERROR on revert."""
def execute(self, listeners, loadbalancer):
pass
def revert(self, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_loadbalancer_prov_status_active(
loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)
class LoadBalancerIDToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set the load balancer to ERROR on revert."""
def execute(self, loadbalancer_id):
pass
def revert(self, loadbalancer_id, *args, **kwargs):
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer_id)
class LoadBalancerToErrorOnRevertTask(LoadBalancerIDToErrorOnRevertTask):
"""Task to set the load balancer to ERROR on revert."""
def execute(self, loadbalancer):
pass
def revert(self, loadbalancer, *args, **kwargs):
super(LoadBalancerToErrorOnRevertTask, self).revert(loadbalancer.id)
class MemberToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a member to ERROR on revert."""
def execute(self, member, listeners, loadbalancer):
pass
def revert(self, member, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_member_prov_status_error(member.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
class PoolToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set a pool to ERROR on revert."""
def execute(self, pool, listeners, loadbalancer):
pass
def revert(self, pool, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_pool_prov_status_error(pool.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)

View File

@ -0,0 +1,81 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Add provisioning_status to objects
Revision ID: 9b5473976d6d
Revises: 82b9402e71fd
Create Date: 2016-09-20 21:46:26.843695
"""
# revision identifiers, used by Alembic.
revision = '9b5473976d6d'
down_revision = '82b9402e71fd'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('health_monitor',
sa.Column('provisioning_status',
sa.String(16),
nullable=True)
)
op.create_foreign_key(
u'fk_health_monitor_provisioning_status_name', u'health_monitor',
u'provisioning_status', [u'provisioning_status'], [u'name']
)
op.add_column('l7policy',
sa.Column('provisioning_status',
sa.String(16),
nullable=True)
)
op.create_foreign_key(
u'fk_l7policy_provisioning_status_name', u'l7policy',
u'provisioning_status', [u'provisioning_status'], [u'name']
)
op.add_column('l7rule',
sa.Column('provisioning_status',
sa.String(16),
nullable=True)
)
op.create_foreign_key(
u'fk_l7rule_provisioning_status_name', u'l7rule',
u'provisioning_status', [u'provisioning_status'], [u'name']
)
op.add_column('member',
sa.Column('provisioning_status',
sa.String(16),
nullable=True)
)
op.create_foreign_key(
u'fk_member_provisioning_status_name', u'member',
u'provisioning_status', [u'provisioning_status'], [u'name']
)
op.add_column('pool',
sa.Column('provisioning_status',
sa.String(16),
nullable=True)
)
op.create_foreign_key(
u'fk_pool_provisioning_status_name', u'pool',
u'provisioning_status', [u'provisioning_status'], [u'name']
)

View File

@ -170,6 +170,11 @@ class Member(base_models.BASE, base_models.IdMixin, base_models.ProjectMixin,
pool = orm.relationship("Pool", backref=orm.backref("members",
uselist=True,
cascade="delete"))
provisioning_status = sa.Column(
sa.String(16),
sa.ForeignKey("provisioning_status.name",
name="fk_member_provisioning_status_name"),
nullable=True)
class HealthMonitor(base_models.BASE, base_models.ProjectMixin):
@ -199,6 +204,11 @@ class HealthMonitor(base_models.BASE, base_models.ProjectMixin):
backref=orm.backref("health_monitor",
uselist=False,
cascade="delete"))
provisioning_status = sa.Column(
sa.String(16),
sa.ForeignKey("provisioning_status.name",
name="fk_health_monitor_provisioning_status_name"),
nullable=True)
class Pool(base_models.BASE, base_models.IdMixin, base_models.ProjectMixin,
@ -232,6 +242,11 @@ class Pool(base_models.BASE, base_models.IdMixin, base_models.ProjectMixin,
backref=orm.backref("pools",
uselist=True,
cascade="delete"))
provisioning_status = sa.Column(
sa.String(16),
sa.ForeignKey("provisioning_status.name",
name="fk_pool_provisioning_status_name"),
nullable=True)
# This property should be a unique list of any listeners that reference
# this pool as its default_pool and any listeners referenced by enabled
@ -490,6 +505,11 @@ class L7Rule(base_models.BASE, base_models.IdMixin):
backref=orm.backref("l7rules",
uselist=True,
cascade="delete"))
provisioning_status = sa.Column(
sa.String(16),
sa.ForeignKey("provisioning_status.name",
name="fk_l7rule_provisioning_status_name"),
nullable=True)
class L7Policy(base_models.BASE, base_models.IdMixin):
@ -531,3 +551,8 @@ class L7Policy(base_models.BASE, base_models.IdMixin):
redirect_pool = orm.relationship("Pool", uselist=False,
backref=orm.backref("l7policies",
uselist=True))
provisioning_status = sa.Column(
sa.String(16),
sa.ForeignKey("provisioning_status.name",
name="fk_l7policy_provisioning_status_name"),
nullable=True)

View File

@ -146,7 +146,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'enabled': True, 'operating_status': constants.ONLINE,
'project_id': uuidutils.generate_uuid(),
'id': uuidutils.generate_uuid()}
'id': uuidutils.generate_uuid(),
'provisioning_status': None}
pool_dm = self.repos.create_pool_on_load_balancer(
self.session, pool, listener_id=self.listener.id)
pool_dm_dict = pool_dm.to_dict()
@ -170,7 +171,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'enabled': True, 'operating_status': constants.ONLINE,
'project_id': uuidutils.generate_uuid(),
'id': uuidutils.generate_uuid()}
'id': uuidutils.generate_uuid(),
'provisioning_status': None}
sp = {'type': constants.SESSION_PERSISTENCE_HTTP_COOKIE,
'cookie_name': 'cookie_monster',
'pool_id': pool['id']}
@ -205,7 +207,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'enabled': True, 'operating_status': constants.ONLINE,
'project_id': uuidutils.generate_uuid(),
'id': uuidutils.generate_uuid()}
'id': uuidutils.generate_uuid(),
'provisioning_status': None}
pool_dm = self.repos.create_pool_on_load_balancer(
self.session, pool, listener_id=self.listener.id)
update_pool = {'protocol': constants.PROTOCOL_TCP, 'name': 'up_pool'}
@ -231,7 +234,8 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
'lb_algorithm': constants.LB_ALGORITHM_ROUND_ROBIN,
'enabled': True, 'operating_status': constants.ONLINE,
'project_id': uuidutils.generate_uuid(),
'id': uuidutils.generate_uuid()}
'id': uuidutils.generate_uuid(),
'provisioning_status': None}
sp = {'type': constants.SESSION_PERSISTENCE_HTTP_COOKIE,
'cookie_name': 'cookie_monster',
'pool_id': pool['id']}

View File

@ -37,7 +37,7 @@ class TestHealthMonitorFlows(base.TestCase):
self.assertIn(constants.LISTENERS, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER, health_mon_flow.requires)
self.assertEqual(2, len(health_mon_flow.requires))
self.assertEqual(3, len(health_mon_flow.requires))
self.assertEqual(0, len(health_mon_flow.provides))
def test_get_delete_health_monitor_flow(self):

View File

@ -36,7 +36,7 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertEqual(2, len(l7policy_flow.requires))
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_delete_l7policy_flow(self):

View File

@ -36,7 +36,7 @@ class TestL7RuleFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertEqual(2, len(l7rule_flow.requires))
self.assertEqual(3, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))
def test_get_delete_l7rule_flow(self):

View File

@ -40,7 +40,7 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertEqual(2, len(member_flow.requires))
self.assertEqual(3, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))
def test_get_delete_member_flow(self, mock_get_net_driver):

View File

@ -36,7 +36,7 @@ class TestPoolFlows(base.TestCase):
self.assertIn(constants.LISTENERS, pool_flow.requires)
self.assertIn(constants.LOADBALANCER, pool_flow.requires)
self.assertEqual(2, len(pool_flow.requires))
self.assertEqual(3, len(pool_flow.requires))
self.assertEqual(0, len(pool_flow.provides))
def test_get_delete_pool_flow(self):

View File

@ -1695,3 +1695,779 @@ class TestDatabaseTasks(base.TestCase):
mock_listener_repo_update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
update_server_group_info.revert(LB_ID, SERVER_GROUP_ID)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
def test_mark_health_mon_active_in_db(self,
mock_health_mon_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_health_mon_active = (database_tasks.MarkHealthMonitorActiveInDB())
mark_health_mon_active.execute(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_health_mon_repo_update.reset_mock()
mark_health_mon_active.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_health_mon_repo_update.reset_mock()
mock_health_mon_repo_update.side_effect = Exception('fail')
mark_health_mon_active.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
def test_mark_health_mon_pending_create_in_db(
self,
mock_health_mon_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_health_mon_pending_create = (database_tasks.
MarkHealthMonitorPendingCreateInDB())
mark_health_mon_pending_create.execute(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_CREATE)
# Test the revert
mock_health_mon_repo_update.reset_mock()
mark_health_mon_pending_create.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_health_mon_repo_update.reset_mock()
mock_health_mon_repo_update.side_effect = Exception('fail')
mark_health_mon_pending_create.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
def test_mark_health_mon_pending_delete_in_db(
self,
mock_health_mon_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_health_mon_pending_delete = (database_tasks.
MarkHealthMonitorPendingDeleteInDB())
mark_health_mon_pending_delete.execute(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_DELETE)
# Test the revert
mock_health_mon_repo_update.reset_mock()
mark_health_mon_pending_delete.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_health_mon_repo_update.reset_mock()
mock_health_mon_repo_update.side_effect = Exception('fail')
mark_health_mon_pending_delete.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
def test_mark_health_mon_pending_update_in_db(
self,
mock_health_mon_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_health_mon_pending_update = (database_tasks.
MarkHealthMonitorPendingUpdateInDB())
mark_health_mon_pending_update.execute(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_UPDATE)
# Test the revert
mock_health_mon_repo_update.reset_mock()
mark_health_mon_pending_update.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_health_mon_repo_update.reset_mock()
mock_health_mon_repo_update.side_effect = Exception('fail')
mark_health_mon_pending_update.revert(self.health_mon_mock)
mock_health_mon_repo_update.assert_called_once_with(
'TEST',
pool_id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_mark_l7policy_active_in_db(self,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7policy_active = (database_tasks.MarkL7PolicyActiveInDB())
mark_l7policy_active.execute(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
L7POLICY_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_l7policy_repo_update.reset_mock()
mark_l7policy_active.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7policy_repo_update.reset_mock()
mock_l7policy_repo_update.side_effect = Exception('fail')
mark_l7policy_active.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_mark_l7policy_pending_create_in_db(self,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7policy_pending_create = (database_tasks.
MarkL7PolicyPendingCreateInDB())
mark_l7policy_pending_create.execute(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
L7POLICY_ID,
provisioning_status=constants.PENDING_CREATE)
# Test the revert
mock_l7policy_repo_update.reset_mock()
mark_l7policy_pending_create.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7policy_repo_update.reset_mock()
mock_l7policy_repo_update.side_effect = Exception('fail')
mark_l7policy_pending_create.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_mark_l7policy_pending_delete_in_db(self,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7policy_pending_delete = (database_tasks.
MarkL7PolicyPendingDeleteInDB())
mark_l7policy_pending_delete.execute(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
L7POLICY_ID,
provisioning_status=constants.PENDING_DELETE)
# Test the revert
mock_l7policy_repo_update.reset_mock()
mark_l7policy_pending_delete.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7policy_repo_update.reset_mock()
mock_l7policy_repo_update.side_effect = Exception('fail')
mark_l7policy_pending_delete.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_mark_l7policy_pending_update_in_db(self,
mock_l7policy_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7policy_pending_update = (database_tasks.
MarkL7PolicyPendingUpdateInDB())
mark_l7policy_pending_update.execute(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
L7POLICY_ID,
provisioning_status=constants.PENDING_UPDATE)
# Test the revert
mock_l7policy_repo_update.reset_mock()
mark_l7policy_pending_update.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7policy_repo_update.reset_mock()
mock_l7policy_repo_update.side_effect = Exception('fail')
mark_l7policy_pending_update.revert(self.l7policy_mock)
mock_l7policy_repo_update.assert_called_once_with(
'TEST',
id=L7POLICY_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
def test_mark_l7rule_active_in_db(self,
mock_l7rule_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7rule_active = (database_tasks.MarkL7RuleActiveInDB())
mark_l7rule_active.execute(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
L7RULE_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_l7rule_repo_update.reset_mock()
mark_l7rule_active.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7rule_repo_update.reset_mock()
mock_l7rule_repo_update.side_effect = Exception('fail')
mark_l7rule_active.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
def test_mark_l7rule_pending_create_in_db(self,
mock_l7rule_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7rule_pending_create = (database_tasks.
MarkL7RulePendingCreateInDB())
mark_l7rule_pending_create.execute(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
L7RULE_ID,
provisioning_status=constants.PENDING_CREATE)
# Test the revert
mock_l7rule_repo_update.reset_mock()
mark_l7rule_pending_create.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7rule_repo_update.reset_mock()
mock_l7rule_repo_update.side_effect = Exception('fail')
mark_l7rule_pending_create.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
def test_mark_l7rule_pending_delete_in_db(self,
mock_l7rule_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7rule_pending_delete = (database_tasks.
MarkL7RulePendingDeleteInDB())
mark_l7rule_pending_delete.execute(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
L7RULE_ID,
provisioning_status=constants.PENDING_DELETE)
# Test the revert
mock_l7rule_repo_update.reset_mock()
mark_l7rule_pending_delete.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7rule_repo_update.reset_mock()
mock_l7rule_repo_update.side_effect = Exception('fail')
mark_l7rule_pending_delete.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
def test_mark_l7rule_pending_update_in_db(self,
mock_l7rule_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_l7rule_pending_update = (database_tasks.
MarkL7RulePendingUpdateInDB())
mark_l7rule_pending_update.execute(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
L7RULE_ID,
provisioning_status=constants.PENDING_UPDATE)
# Test the revert
mock_l7rule_repo_update.reset_mock()
mark_l7rule_pending_update.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_l7rule_repo_update.reset_mock()
mock_l7rule_repo_update.side_effect = Exception('fail')
mark_l7rule_pending_update.revert(self.l7rule_mock)
mock_l7rule_repo_update.assert_called_once_with(
'TEST',
id=L7RULE_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.MemberRepository.update')
def test_mark_member_active_in_db(self,
mock_member_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_member_active = (database_tasks.MarkMemberActiveInDB())
mark_member_active.execute(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
MEMBER_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_member_repo_update.reset_mock()
mark_member_active.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_member_repo_update.reset_mock()
mock_member_repo_update.side_effect = Exception('fail')
mark_member_active.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.MemberRepository.update')
def test_mark_member_pending_create_in_db(self,
mock_member_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_member_pending_create = (database_tasks.
MarkMemberPendingCreateInDB())
mark_member_pending_create.execute(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
MEMBER_ID,
provisioning_status=constants.PENDING_CREATE)
# Test the revert
mock_member_repo_update.reset_mock()
mark_member_pending_create.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_member_repo_update.reset_mock()
mock_member_repo_update.side_effect = Exception('fail')
mark_member_pending_create.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.MemberRepository.update')
def test_mark_member_pending_delete_in_db(self,
mock_member_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_member_pending_delete = (database_tasks.
MarkMemberPendingDeleteInDB())
mark_member_pending_delete.execute(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
MEMBER_ID,
provisioning_status=constants.PENDING_DELETE)
# Test the revert
mock_member_repo_update.reset_mock()
mark_member_pending_delete.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_member_repo_update.reset_mock()
mock_member_repo_update.side_effect = Exception('fail')
mark_member_pending_delete.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.MemberRepository.update')
def test_mark_member_pending_update_in_db(self,
mock_member_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_member_pending_update = (database_tasks.
MarkMemberPendingUpdateInDB())
mark_member_pending_update.execute(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
MEMBER_ID,
provisioning_status=constants.PENDING_UPDATE)
# Test the revert
mock_member_repo_update.reset_mock()
mark_member_pending_update.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_member_repo_update.reset_mock()
mock_member_repo_update.side_effect = Exception('fail')
mark_member_pending_update.revert(self.member_mock)
mock_member_repo_update.assert_called_once_with(
'TEST',
id=MEMBER_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.PoolRepository.update')
def test_mark_pool_active_in_db(self,
mock_pool_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_pool_active = (database_tasks.MarkPoolActiveInDB())
mark_pool_active.execute(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.ACTIVE)
# Test the revert
mock_pool_repo_update.reset_mock()
mark_pool_active.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_pool_repo_update.reset_mock()
mock_pool_repo_update.side_effect = Exception('fail')
mark_pool_active.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.PoolRepository.update')
def test_mark_pool_pending_create_in_db(self,
mock_pool_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_pool_pending_create = (database_tasks.MarkPoolPendingCreateInDB())
mark_pool_pending_create.execute(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_CREATE)
# Test the revert
mock_pool_repo_update.reset_mock()
mark_pool_pending_create.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_pool_repo_update.reset_mock()
mock_pool_repo_update.side_effect = Exception('fail')
mark_pool_pending_create.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.PoolRepository.update')
def test_mark_pool_pending_delete_in_db(self,
mock_pool_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_pool_pending_delete = (database_tasks.MarkPoolPendingDeleteInDB())
mark_pool_pending_delete.execute(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_DELETE)
# Test the revert
mock_pool_repo_update.reset_mock()
mark_pool_pending_delete.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_pool_repo_update.reset_mock()
mock_pool_repo_update.side_effect = Exception('fail')
mark_pool_pending_delete.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
@mock.patch('octavia.db.repositories.PoolRepository.update')
def test_mark_pool_pending_update_in_db(self,
mock_pool_repo_update,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
mark_pool_pending_update = (database_tasks.
MarkPoolPendingUpdateInDB())
mark_pool_pending_update.execute(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
POOL_ID,
provisioning_status=constants.PENDING_UPDATE)
# Test the revert
mock_pool_repo_update.reset_mock()
mark_pool_pending_update.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)
# Test the revert with exception
mock_pool_repo_update.reset_mock()
mock_pool_repo_update.side_effect = Exception('fail')
mark_pool_pending_update.revert(self.pool_mock)
mock_pool_repo_update.assert_called_once_with(
'TEST',
id=POOL_ID,
provisioning_status=constants.ERROR)

View File

@ -0,0 +1,346 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from octavia.controller.worker.tasks import lifecycle_tasks
import octavia.tests.unit.base as base
class TestLifecycleTasks(base.TestCase):
def setUp(self):
self.AMPHORA = mock.MagicMock()
self.AMPHORA_ID = uuidutils.generate_uuid()
self.AMPHORA.id = self.AMPHORA_ID
self.HEALTH_MON = mock.MagicMock()
self.HEALTH_MON_ID = uuidutils.generate_uuid()
self.HEALTH_MON.pool_id = self.HEALTH_MON_ID
self.L7POLICY = mock.MagicMock()
self.L7POLICY_ID = uuidutils.generate_uuid()
self.L7POLICY.id = self.L7POLICY_ID
self.L7RULE = mock.MagicMock()
self.L7RULE_ID = uuidutils.generate_uuid()
self.L7RULE.id = self.L7RULE_ID
self.LISTENER = mock.MagicMock()
self.LISTENER_ID = uuidutils.generate_uuid()
self.LISTENER.id = self.LISTENER_ID
self.LISTENERS = [self.LISTENER]
self.LOADBALANCER = mock.MagicMock()
self.LOADBALANCER_ID = uuidutils.generate_uuid()
self.LOADBALANCER.id = self.LOADBALANCER_ID
self.LISTENER.load_balancer = self.LOADBALANCER
self.MEMBER = mock.MagicMock()
self.MEMBER_ID = uuidutils.generate_uuid()
self.MEMBER.id = self.MEMBER_ID
self.POOL = mock.MagicMock()
self.POOL_ID = uuidutils.generate_uuid()
self.POOL.id = self.POOL_ID
super(TestLifecycleTasks, self).setUp()
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_amphora_status_error')
def test_AmphoraIDToErrorOnRevertTask(self, mock_amp_status_error):
amp_id_to_error_on_revert = (lifecycle_tasks.
AmphoraIDToErrorOnRevertTask())
# Execute
amp_id_to_error_on_revert.execute(self.AMPHORA_ID)
self.assertFalse(mock_amp_status_error.called)
# Revert
amp_id_to_error_on_revert.revert(self.AMPHORA_ID)
mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_amphora_status_error')
def test_AmphoraToErrorOnRevertTask(self, mock_amp_status_error):
amp_to_error_on_revert = lifecycle_tasks.AmphoraToErrorOnRevertTask()
# Execute
amp_to_error_on_revert.execute(self.AMPHORA)
self.assertFalse(mock_amp_status_error.called)
# Revert
amp_to_error_on_revert.revert(self.AMPHORA)
mock_amp_status_error.assert_called_once_with(self.AMPHORA_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_health_mon_prov_status_error')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_active')
def test_HealthMonitorToErrorOnRevertTask(
self,
mock_listener_prov_status_active,
mock_loadbalancer_prov_status_active,
mock_health_mon_prov_status_error):
health_mon_to_error_on_revert = (lifecycle_tasks.
HealthMonitorToErrorOnRevertTask())
# Execute
health_mon_to_error_on_revert.execute(self.HEALTH_MON,
self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_health_mon_prov_status_error.called)
# Revert
health_mon_to_error_on_revert.revert(self.HEALTH_MON,
self.LISTENERS,
self.LOADBALANCER)
mock_health_mon_prov_status_error.assert_called_once_with(
self.HEALTH_MON_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
mock_listener_prov_status_active.assert_called_once_with(
self.LISTENER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_l7policy_prov_status_error')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_active')
def test_L7PolicyToErrorOnRevertTask(
self,
mock_listener_prov_status_active,
mock_loadbalancer_prov_status_active,
mock_l7policy_prov_status_error):
l7policy_to_error_on_revert = (lifecycle_tasks.
L7PolicyToErrorOnRevertTask())
# Execute
l7policy_to_error_on_revert.execute(self.L7POLICY,
self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_l7policy_prov_status_error.called)
# Revert
l7policy_to_error_on_revert.revert(self.L7POLICY,
self.LISTENERS,
self.LOADBALANCER)
mock_l7policy_prov_status_error.assert_called_once_with(
self.L7POLICY_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
mock_listener_prov_status_active.assert_called_once_with(
self.LISTENER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_l7rule_prov_status_error')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_active')
def test_L7RuleToErrorOnRevertTask(
self,
mock_listener_prov_status_active,
mock_loadbalancer_prov_status_active,
mock_l7rule_prov_status_error):
l7rule_to_error_on_revert = (lifecycle_tasks.
L7RuleToErrorOnRevertTask())
# Execute
l7rule_to_error_on_revert.execute(self.L7RULE,
self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_l7rule_prov_status_error.called)
# Revert
l7rule_to_error_on_revert.revert(self.L7RULE,
self.LISTENERS,
self.LOADBALANCER)
mock_l7rule_prov_status_error.assert_called_once_with(
self.L7RULE_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
mock_listener_prov_status_active.assert_called_once_with(
self.LISTENER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error')
def test_ListenerToErrorOnRevertTask(
self,
mock_listener_prov_status_error,
mock_loadbalancer_prov_status_active):
listener_to_error_on_revert = (lifecycle_tasks.
ListenerToErrorOnRevertTask())
# Execute
listener_to_error_on_revert.execute(self.LISTENER)
self.assertFalse(mock_listener_prov_status_error.called)
# Revert
listener_to_error_on_revert.revert(self.LISTENER)
mock_listener_prov_status_error.assert_called_once_with(
self.LISTENER_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_error')
def test_ListenersToErrorOnRevertTask(
self,
mock_listener_prov_status_error,
mock_loadbalancer_prov_status_active):
listeners_to_error_on_revert = (lifecycle_tasks.
ListenersToErrorOnRevertTask())
# Execute
listeners_to_error_on_revert.execute(self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_listener_prov_status_error.called)
# Revert
listeners_to_error_on_revert.revert(self.LISTENERS,
self.LOADBALANCER)
mock_listener_prov_status_error.assert_called_once_with(
self.LISTENER_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_error')
def test_LoadBalancerIDToErrorOnRevertTask(
self,
mock_loadbalancer_prov_status_error):
loadbalancer_id_to_error_on_revert = (
lifecycle_tasks.LoadBalancerIDToErrorOnRevertTask())
# Execute
loadbalancer_id_to_error_on_revert.execute(self.LOADBALANCER_ID)
self.assertFalse(mock_loadbalancer_prov_status_error.called)
# Revert
loadbalancer_id_to_error_on_revert.revert(self.LOADBALANCER_ID)
mock_loadbalancer_prov_status_error.assert_called_once_with(
self.LOADBALANCER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_error')
def test_LoadBalancerToErrorOnRevertTask(
self,
mock_loadbalancer_prov_status_error):
loadbalancer_to_error_on_revert = (
lifecycle_tasks.LoadBalancerToErrorOnRevertTask())
# Execute
loadbalancer_to_error_on_revert.execute(self.LOADBALANCER)
self.assertFalse(mock_loadbalancer_prov_status_error.called)
# Revert
loadbalancer_to_error_on_revert.revert(self.LOADBALANCER)
mock_loadbalancer_prov_status_error.assert_called_once_with(
self.LOADBALANCER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_member_prov_status_error')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_active')
def test_MemberToErrorOnRevertTask(
self,
mock_listener_prov_status_active,
mock_loadbalancer_prov_status_active,
mock_member_prov_status_error):
member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask()
# Execute
member_to_error_on_revert.execute(self.MEMBER,
self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_member_prov_status_error.called)
# Revert
member_to_error_on_revert.revert(self.MEMBER,
self.LISTENERS,
self.LOADBALANCER)
mock_member_prov_status_error.assert_called_once_with(
self.MEMBER_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
mock_listener_prov_status_active.assert_called_once_with(
self.LISTENER_ID)
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_pool_prov_status_error')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_loadbalancer_prov_status_active')
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
'mark_listener_prov_status_active')
def test_PoolToErrorOnRevertTask(
self,
mock_listener_prov_status_active,
mock_loadbalancer_prov_status_active,
mock_pool_prov_status_error):
pool_to_error_on_revert = lifecycle_tasks.PoolToErrorOnRevertTask()
# Execute
pool_to_error_on_revert.execute(self.POOL,
self.LISTENERS,
self.LOADBALANCER)
self.assertFalse(mock_pool_prov_status_error.called)
# Revert
pool_to_error_on_revert.revert(self.POOL,
self.LISTENERS,
self.LOADBALANCER)
mock_pool_prov_status_error.assert_called_once_with(
self.POOL_ID)
mock_loadbalancer_prov_status_active.assert_called_once_with(
self.LOADBALANCER_ID)
mock_listener_prov_status_active.assert_called_once_with(
self.LISTENER_ID)

View File

@ -0,0 +1,264 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from octavia.common import constants
from octavia.controller.worker import task_utils as task_utilities
import octavia.tests.unit.base as base
TEST_SESSION = 'TEST_SESSION'
class TestTaskUtils(base.TestCase):
def setUp(self):
self.task_utils = task_utilities.TaskUtils()
self.AMPHORA_ID = uuidutils.generate_uuid()
self.HEALTH_MON_ID = uuidutils.generate_uuid()
self.L7POLICY_ID = uuidutils.generate_uuid()
self.L7RULE_ID = uuidutils.generate_uuid()
self.LISTENER_ID = uuidutils.generate_uuid()
self.LOADBALANCER_ID = uuidutils.generate_uuid()
self.MEMBER_ID = uuidutils.generate_uuid()
self.POOL_ID = uuidutils.generate_uuid()
super(TestTaskUtils, self).setUp()
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.AmphoraRepository.update')
def test_mark_amphora_status_error(self,
mock_amphora_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_amphora_status_error(self.AMPHORA_ID)
mock_amphora_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.AMPHORA_ID,
status=constants.ERROR)
# Exception path
mock_amphora_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_amphora_status_error(self.AMPHORA_ID)
self.assertFalse(mock_amphora_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
def test_mark_health_mon_prov_status_error(self,
mock_health_mon_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_health_mon_prov_status_error(self.HEALTH_MON_ID)
mock_health_mon_repo_update.assert_called_once_with(
TEST_SESSION,
pool_id=self.HEALTH_MON_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_health_mon_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_health_mon_prov_status_error(self.HEALTH_MON_ID)
self.assertFalse(mock_health_mon_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.L7PolicyRepository.update')
def test_mark_l7policy_prov_status_error(self,
mock_l7policy_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_l7policy_prov_status_error(self.L7POLICY_ID)
mock_l7policy_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.L7POLICY_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_l7policy_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_l7policy_prov_status_error(self.L7POLICY_ID)
self.assertFalse(mock_l7policy_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.L7RuleRepository.update')
def test_mark_l7rule_prov_status_error(self,
mock_l7rule_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_l7rule_prov_status_error(self.L7RULE_ID)
mock_l7rule_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.L7RULE_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_l7rule_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_l7rule_prov_status_error(self.L7RULE_ID)
self.assertFalse(mock_l7rule_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.ListenerRepository.update')
def test_mark_listener_prov_status_active(self,
mock_listener_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_listener_prov_status_active(self.LISTENER_ID)
mock_listener_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.LISTENER_ID,
provisioning_status=constants.ACTIVE)
# Exception path
mock_listener_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_listener_prov_status_active(self.LISTENER_ID)
self.assertFalse(mock_listener_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.ListenerRepository.update')
def test_mark_listener_prov_status_error(self,
mock_listener_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_listener_prov_status_error(self.LISTENER_ID)
mock_listener_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.LISTENER_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_listener_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_listener_prov_status_error(self.LISTENER_ID)
self.assertFalse(mock_listener_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_mark_loadbalancer_prov_status_active(self,
mock_lb_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_loadbalancer_prov_status_active(
self.LOADBALANCER_ID)
mock_lb_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.LOADBALANCER_ID,
provisioning_status=constants.ACTIVE)
# Exception path
mock_lb_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_loadbalancer_prov_status_active(
self.LOADBALANCER_ID)
self.assertFalse(mock_lb_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_mark_loadbalancer_prov_status_error(self,
mock_lb_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_loadbalancer_prov_status_error(
self.LOADBALANCER_ID)
mock_lb_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.LOADBALANCER_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_lb_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_loadbalancer_prov_status_error(
self.LOADBALANCER_ID)
self.assertFalse(mock_lb_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.MemberRepository.update')
def test_mark_member_prov_status_error(self,
mock_member_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_member_prov_status_error(self.MEMBER_ID)
mock_member_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.MEMBER_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_member_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_member_prov_status_error(self.MEMBER_ID)
self.assertFalse(mock_member_repo_update.called)
@mock.patch('octavia.db.api.get_session', return_value=TEST_SESSION)
@mock.patch('octavia.db.repositories.PoolRepository.update')
def test_mark_pool_prov_status_error(self,
mock_pool_repo_update,
mock_get_session):
# Happy path
self.task_utils.mark_pool_prov_status_error(self.POOL_ID)
mock_pool_repo_update.assert_called_once_with(
TEST_SESSION,
id=self.POOL_ID,
provisioning_status=constants.ERROR)
# Exception path
mock_pool_repo_update.reset_mock()
mock_get_session.side_effect = Exception('fail')
self.task_utils.mark_pool_prov_status_error(self.POOL_ID)
self.assertFalse(mock_pool_repo_update.called)