Convert listener flows to use provider models

This patch converts the listener flows to use the provider driver
data model.
It also stops storing sqlalchemy models in the flow storage in
preparation for enabling jobboard.

Change-Id: Ic44019b8877f008e6d7a75ceed1b7fd958e051d0
Story: 2005072
Task: 30808
This commit is contained in:
Michael Johnson 2019-05-20 17:51:43 -07:00
parent a3081d850c
commit 7f0abf8b7a
38 changed files with 849 additions and 525 deletions

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from cryptography import fernet
from jsonschema import exceptions as js_exceptions
from jsonschema import validate
@ -52,6 +53,8 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
topic=consts.TOPIC_AMPHORA_V2, version="2.0", fanout=False)
self.client = rpc.get_client(self.target)
self.repositories = repositories.Repositories()
key = utils.get_six_compatible_server_certs_key_passphrase()
self.fernet = fernet.Fernet(key)
def _validate_pool_algorithm(self, pool):
if pool.lb_algorithm not in AMPHORA_SUPPORTED_LB_ALGORITHMS:
@ -114,32 +117,40 @@ class AmphoraProviderDriver(driver_base.ProviderDriver):
consts.LOAD_BALANCER_UPDATES: lb_dict}
self.client.cast({}, 'update_load_balancer', **payload)
def _encrypt_listener_dict(self, listener_dict):
# We need to encrypt the user cert/key data for sending it
# over messaging.
if listener_dict.get(consts.DEFAULT_TLS_CONTAINER_DATA, False):
listener_dict[consts.DEFAULT_TLS_CONTAINER_DATA] = (
self.fernet.encrypt(
listener_dict[consts.DEFAULT_TLS_CONTAINER_DATA]))
if listener_dict.get(consts.SNI_CONTAINER_DATA, False):
sni_list = []
for sni_data in listener_dict[consts.SNI_CONTAINER_DATA]:
sni_list.append(self.fernet.encrypt(sni_data))
if sni_list:
listener_dict[consts.SNI_CONTAINER_DATA] = sni_list
# Listener
def listener_create(self, listener):
payload = {consts.LISTENER_ID: listener.listener_id}
payload = {consts.LISTENER: listener.to_dict()}
self._encrypt_listener_dict(payload)
self.client.cast({}, 'create_listener', **payload)
def listener_delete(self, listener):
listener_id = listener.listener_id
payload = {consts.LISTENER_ID: listener_id}
payload = {consts.LISTENER: listener.to_dict()}
self.client.cast({}, 'delete_listener', **payload)
def listener_update(self, old_listener, new_listener):
listener_dict = new_listener.to_dict()
if 'admin_state_up' in listener_dict:
listener_dict['enabled'] = listener_dict.pop('admin_state_up')
listener_id = listener_dict.pop('listener_id')
if 'client_ca_tls_container_ref' in listener_dict:
listener_dict['client_ca_tls_container_id'] = listener_dict.pop(
'client_ca_tls_container_ref')
listener_dict.pop('client_ca_tls_container_data', None)
if 'client_crl_container_ref' in listener_dict:
listener_dict['client_crl_container_id'] = listener_dict.pop(
'client_crl_container_ref')
listener_dict.pop('client_crl_container_data', None)
original_listener = old_listener.to_dict()
listener_updates = new_listener.to_dict()
payload = {consts.LISTENER_ID: listener_id,
consts.LISTENER_UPDATES: listener_dict}
self._encrypt_listener_dict(original_listener)
self._encrypt_listener_dict(listener_updates)
payload = {consts.ORIGINAL_LISTENER: original_listener,
consts.LISTENER_UPDATES: listener_updates}
self.client.cast({}, 'update_listener', **payload)
# Pool

@ -173,6 +173,13 @@ def db_listeners_to_provider_listeners(db_listeners, for_delete=False):
return provider_listeners
def db_listeners_to_provider_dicts_list_of_dicts(db_listeners,
for_delete=False):
listeners = db_listeners_to_provider_listeners(
db_listeners, for_delete=for_delete)
return [listener.to_dict() for listener in listeners]
def db_listener_to_provider_listener(db_listener, for_delete=False):
new_listener_dict = listener_dict_to_provider_dict(
db_listener.to_dict(recurse=True), for_delete=for_delete)

@ -307,6 +307,7 @@ CONN_MAX_RETRIES = 'conn_max_retries'
CONN_RETRY_INTERVAL = 'conn_retry_interval'
CREATED_AT = 'created_at'
CRL_CONTAINER_ID = 'crl_container_id'
DEFAULT_TLS_CONTAINER_DATA = 'default_tls_container_data'
DELTA = 'delta'
DELTAS = 'deltas'
DESCRIPTION = 'description'
@ -320,6 +321,7 @@ HEALTH_MON = 'health_mon'
HEALTH_MONITOR = 'health_monitor'
HEALTH_MONITOR_ID = 'health_monitor_id'
HEALTH_MONITOR_UPDATES = 'health_monitor_updates'
ID = 'id'
IP_ADDRESS = 'ip_address'
L7POLICY = 'l7policy'
L7POLICY_ID = 'l7policy_id'
@ -343,10 +345,12 @@ NAME = 'name'
NETWORK_ID = 'network_id'
NICS = 'nics'
OBJECT = 'object'
ORIGINAL_LISTENER = 'original_listener'
PEER_PORT = 'peer_port'
POOL = 'pool'
POOL_CHILD_COUNT = 'pool_child_count'
POOL_ID = 'pool_id'
PROJECT_ID = 'project_id'
POOL_UPDATES = 'pool_updates'
PORT_ID = 'port_id'
PORTS = 'ports'
@ -359,6 +363,7 @@ REQ_READ_TIMEOUT = 'req_read_timeout'
REQUEST_ERRORS = 'request_errors'
SERVER_GROUP_ID = 'server_group_id'
SERVER_PEM = 'server_pem'
SNI_CONTAINER_DATA = 'sni_container_data'
SNI_CONTAINERS = 'sni_containers'
SOFT_ANTI_AFFINITY = 'soft-anti-affinity'
SUBNET = 'subnet'

@ -289,13 +289,17 @@ class JinjaTemplater(object):
os.path.join(self.base_crt_dir, loadbalancer.id, client_crl))
pools = []
for x in listener.pools:
pool_gen = (pool for pool in listener.pools if
pool.provisioning_status != constants.PENDING_DELETE)
for x in pool_gen:
kwargs = {}
if pool_tls_certs and pool_tls_certs.get(x.id):
kwargs = {'pool_tls_certs': pool_tls_certs.get(x.id)}
pools.append(self._transform_pool(
x, feature_compatibility, **kwargs))
ret_value['pools'] = pools
policy_gen = (policy for policy in listener.l7policies if
policy.provisioning_status != constants.PENDING_DELETE)
if listener.default_pool:
for pool in pools:
if pool['id'] == listener.default_pool.id:
@ -304,7 +308,7 @@ class JinjaTemplater(object):
l7policies = [self._transform_l7policy(
x, feature_compatibility, pool_tls_certs)
for x in listener.l7policies]
for x in policy_gen]
ret_value['l7policies'] = l7policies
return ret_value
@ -331,12 +335,16 @@ class JinjaTemplater(object):
'crl_path': '',
'tls_enabled': pool.tls_enabled
}
members_gen = (mem for mem in pool.members if
mem.provisioning_status != constants.PENDING_DELETE)
members = [self._transform_member(x, feature_compatibility)
for x in pool.members]
for x in members_gen]
ret_value['members'] = members
if pool.health_monitor:
health_mon = pool.health_monitor
if (health_mon and
health_mon.provisioning_status != constants.PENDING_DELETE):
ret_value['health_monitor'] = self._transform_health_monitor(
pool.health_monitor, feature_compatibility)
health_mon, feature_compatibility)
if pool.session_persistence:
ret_value[
'session_persistence'] = self._transform_session_persistence(
@ -420,7 +428,9 @@ class JinjaTemplater(object):
'redirect_prefix': l7policy.redirect_prefix,
'enabled': l7policy.enabled
}
if l7policy.redirect_pool:
if (l7policy.redirect_pool and
l7policy.redirect_pool.provisioning_status !=
constants.PENDING_DELETE):
kwargs = {}
if pool_tls_certs and pool_tls_certs.get(
l7policy.redirect_pool.id):
@ -436,8 +446,10 @@ class JinjaTemplater(object):
ret_value['redirect_http_code'] = l7policy.redirect_http_code
else:
ret_value['redirect_http_code'] = None
rule_gen = (rule for rule in l7policy.l7rules if rule.enabled and
rule.provisioning_status != constants.PENDING_DELETE)
l7rules = [self._transform_l7rule(x, feature_compatibility)
for x in l7policy.l7rules if x.enabled]
for x in rule_gen]
ret_value['l7rules'] = l7rules
return ret_value

@ -271,7 +271,9 @@ class JinjaTemplater(object):
ret_value['client_crl_path'] = '%s' % (
os.path.join(self.base_crt_dir, listener.id, client_crl))
if listener.default_pool:
if (listener.default_pool and
listener.default_pool.provisioning_status !=
constants.PENDING_DELETE):
kwargs = {}
if pool_tls_certs and pool_tls_certs.get(listener.default_pool.id):
kwargs = {'pool_tls_certs': pool_tls_certs.get(
@ -279,16 +281,20 @@ class JinjaTemplater(object):
ret_value['default_pool'] = self._transform_pool(
listener.default_pool, feature_compatibility, **kwargs)
pools = []
for x in listener.pools:
pool_gen = (pool for pool in listener.pools if
pool.provisioning_status != constants.PENDING_DELETE)
for x in pool_gen:
kwargs = {}
if pool_tls_certs and pool_tls_certs.get(x.id):
kwargs = {'pool_tls_certs': pool_tls_certs.get(x.id)}
pools.append(self._transform_pool(
x, feature_compatibility, **kwargs))
ret_value['pools'] = pools
policy_gen = (policy for policy in listener.l7policies if
policy.provisioning_status != constants.PENDING_DELETE)
l7policies = [self._transform_l7policy(
x, feature_compatibility, pool_tls_certs)
for x in listener.l7policies]
for x in policy_gen]
ret_value['l7policies'] = l7policies
return ret_value
@ -314,12 +320,16 @@ class JinjaTemplater(object):
'crl_path': '',
'tls_enabled': pool.tls_enabled
}
members_gen = (mem for mem in pool.members if
mem.provisioning_status != constants.PENDING_DELETE)
members = [self._transform_member(x, feature_compatibility)
for x in pool.members]
for x in members_gen]
ret_value['members'] = members
if pool.health_monitor:
health_mon = pool.health_monitor
if (health_mon and
health_mon.provisioning_status != constants.PENDING_DELETE):
ret_value['health_monitor'] = self._transform_health_monitor(
pool.health_monitor, feature_compatibility)
health_mon, feature_compatibility)
if pool.session_persistence:
ret_value[
'session_persistence'] = self._transform_session_persistence(
@ -403,7 +413,9 @@ class JinjaTemplater(object):
'redirect_prefix': l7policy.redirect_prefix,
'enabled': l7policy.enabled
}
if l7policy.redirect_pool:
if (l7policy.redirect_pool and
l7policy.redirect_pool.provisioning_status !=
constants.PENDING_DELETE):
kwargs = {}
if pool_tls_certs and pool_tls_certs.get(
l7policy.redirect_pool.id):
@ -419,8 +431,10 @@ class JinjaTemplater(object):
ret_value['redirect_http_code'] = l7policy.redirect_http_code
else:
ret_value['redirect_http_code'] = None
rule_gen = (rule for rule in l7policy.l7rules if rule.enabled and
rule.provisioning_status != constants.PENDING_DELETE)
l7rules = [self._transform_l7rule(x, feature_compatibility)
for x in l7policy.l7rules if x.enabled]
for x in rule_gen]
ret_value['l7rules'] = l7rules
return ret_value

@ -120,7 +120,9 @@ class LvsJinjaTemplater(object):
}
if listener.connection_limit and listener.connection_limit > -1:
ret_value['connection_limit'] = listener.connection_limit
if listener.default_pool:
if (listener.default_pool and
listener.default_pool.provisioning_status !=
constants.PENDING_DELETE):
ret_value['default_pool'] = self._transform_pool(
listener.default_pool)
return ret_value
@ -140,9 +142,13 @@ class LvsJinjaTemplater(object):
'session_persistence': '',
'enabled': pool.enabled
}
members = [self._transform_member(x) for x in pool.members]
members_gen = (mem for mem in pool.members if
mem.provisioning_status != constants.PENDING_DELETE)
members = [self._transform_member(x) for x in members_gen]
ret_value['members'] = members
if pool.health_monitor:
if (pool.health_monitor and
pool.health_monitor.provisioning_status !=
constants.PENDING_DELETE):
ret_value['health_monitor'] = self._transform_health_monitor(
pool.health_monitor)
if pool.session_persistence:

@ -28,7 +28,7 @@ class Endpoints(object):
# API version history:
# 1.0 - Initial version.
# 2.0 - Provider driver format
# 2.0 - Provider driver format.
target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
version='2.0')
@ -61,17 +61,18 @@ class Endpoints(object):
amphora_id)
self.worker.failover_amphora(amphora_id)
def create_listener(self, context, listener_id):
LOG.info('Creating listener \'%s\'...', listener_id)
self.worker.create_listener(listener_id)
def create_listener(self, context, listener):
LOG.info('Creating listener \'%s\'...', listener.get(constants.ID))
self.worker.create_listener(listener)
def update_listener(self, context, listener_id, listener_updates):
LOG.info('Updating listener \'%s\'...', listener_id)
self.worker.update_listener(listener_id, listener_updates)
def update_listener(self, context, original_listener, listener_updates):
LOG.info('Updating listener \'%s\'...', original_listener.get(
constants.ID))
self.worker.update_listener(original_listener, listener_updates)
def delete_listener(self, context, listener_id):
LOG.info('Deleting listener \'%s\'...', listener_id)
self.worker.delete_listener(listener_id)
def delete_listener(self, context, listener):
LOG.info('Deleting listener \'%s\'...', listener.get(constants.ID))
self.worker.delete_listener(listener)
def create_pool(self, context, pool_id):
LOG.info('Creating pool \'%s\'...', pool_id)

@ -21,6 +21,7 @@ from sqlalchemy.orm import exc as db_exceptions
from taskflow.listeners import logging as tf_logging
import tenacity
from octavia.api.drivers import utils as provider_utils
from octavia.common import base_taskflow
from octavia.common import constants
from octavia.controller.worker.v2.flows import amphora_flows
@ -141,15 +142,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
raise db_exceptions.NoResultFound
pool = health_mon.pool
listeners = pool.listeners
pool.health_monitor = health_mon
load_balancer = pool.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
create_hm_tf = self._taskflow_load(
self._health_monitor_flows.get_create_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_hm_tf,
log=LOG):
@ -166,14 +171,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=health_monitor_id)
pool = health_mon.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
delete_hm_tf = self._taskflow_load(
self._health_monitor_flows.get_delete_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(delete_hm_tf,
log=LOG):
@ -200,7 +209,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
health_mon = e.last_attempt.result()
pool = health_mon.pool
listeners = pool.listeners
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
pool.health_monitor = health_mon
load_balancer = pool.load_balancer
@ -208,96 +221,83 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._health_monitor_flows.get_update_health_monitor_flow(),
store={constants.HEALTH_MON: health_mon,
constants.POOL: pool,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer,
constants.UPDATE_DICT: health_monitor_updates})
with tf_logging.DynamicLoggingListener(update_hm_tf,
log=LOG):
update_hm_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_listener(self, listener_id):
def create_listener(self, listener):
"""Creates a listener.
:param listener_id: ID of the listener to create
:param listener: A listener provider dictionary.
:returns: None
:raises NoResultFound: Unable to find the object
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
if not listener:
db_listener = self._listener_repo.get(
db_apis.get_session(), id=listener[constants.LISTENER_ID])
if not db_listener:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'listener', listener_id)
'60 seconds.', 'listener',
listener[constants.LISTENER_ID])
raise db_exceptions.NoResultFound
load_balancer = listener.load_balancer
load_balancer = db_listener.load_balancer
listeners = load_balancer.listeners
dict_listeners = []
for l in listeners:
dict_listeners.append(
provider_utils.db_listener_to_provider_listener(l).to_dict())
create_listener_tf = self._taskflow_load(self._listener_flows.
get_create_listener_flow(),
store={constants.LOADBALANCER:
load_balancer,
constants.LISTENERS:
listeners})
create_listener_tf = self._taskflow_load(
self._listener_flows.get_create_listener_flow(),
store={constants.LISTENERS: dict_listeners,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id})
with tf_logging.DynamicLoggingListener(create_listener_tf,
log=LOG):
create_listener_tf.run()
def delete_listener(self, listener_id):
def delete_listener(self, listener):
"""Deletes a listener.
:param listener_id: ID of the listener to delete
:param listener: A listener provider dictionary to delete
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
load_balancer = listener.load_balancer
# TODO(johnsom) Remove once the provider data model includes
# the project ID
lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
delete_listener_tf = self._taskflow_load(
self._listener_flows.get_delete_listener_flow(),
store={constants.LOADBALANCER: load_balancer,
constants.LISTENER: listener})
store={constants.LISTENER: listener,
constants.LOADBALANCER_ID: lb.id,
constants.PROJECT_ID: lb.project_id})
with tf_logging.DynamicLoggingListener(delete_listener_tf,
log=LOG):
delete_listener_tf.run()
def update_listener(self, listener_id, listener_updates):
def update_listener(self, listener, listener_updates):
"""Updates a listener.
:param listener_id: ID of the listener to update
:param listener: A listener provider dictionary to update
:param listener_updates: Dict containing updated listener attributes
:returns: None
:raises ListenerNotFound: The referenced listener was not found
"""
listener = None
try:
listener = self._get_db_obj_until_pending_update(
self._listener_repo, listener_id)
except tenacity.RetryError as e:
LOG.warning('Listener did not go into %s in 60 seconds. '
'This either due to an in-progress Octavia upgrade '
'or an overloaded and failing database. Assuming '
'an upgrade is in progress and continuing.',
constants.PENDING_UPDATE)
listener = e.last_attempt.result()
load_balancer = listener.load_balancer
update_listener_tf = self._taskflow_load(self._listener_flows.
get_update_listener_flow(),
store={constants.LISTENER:
listener,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
listener_updates,
constants.LISTENERS:
[listener]})
db_lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
update_listener_tf = self._taskflow_load(
self._listener_flows.get_update_listener_flow(),
store={constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
constants.LOADBALANCER: db_lb,
constants.LISTENERS: [listener]})
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
@ -386,14 +386,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.PENDING_UPDATE)
lb = e.last_attempt.result()
listeners, _ = self._listener_repo.get_all(
db_apis.get_session(),
load_balancer_id=load_balancer_id)
update_lb_tf = self._taskflow_load(
self._lb_flows.get_update_load_balancer_flow(),
store={constants.LOADBALANCER: lb,
constants.LISTENERS: listeners,
constants.UPDATE_DICT: load_balancer_updates})
with tf_logging.DynamicLoggingListener(update_lb_tf,
@ -420,17 +415,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
raise db_exceptions.NoResultFound
pool = member.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
create_member_tf = self._taskflow_load(self._member_flows.
get_create_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.POOL: pool})
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
create_member_tf = self._taskflow_load(
self._member_flows.get_create_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer,
constants.POOL: pool})
with tf_logging.DynamicLoggingListener(create_member_tf,
log=LOG):
create_member_tf.run()
@ -445,13 +442,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
pool = member.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
delete_member_tf = self._taskflow_load(
self._member_flows.get_delete_member_flow(),
store={constants.MEMBER: member, constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer, constants.POOL: pool}
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL: pool}
)
with tf_logging.DynamicLoggingListener(delete_member_tf,
log=LOG):
@ -472,14 +475,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
pool = new_members[0].pool
else:
pool = updated_members[0][0].pool
listeners = pool.listeners
load_balancer = pool.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
batch_update_members_tf = self._taskflow_load(
self._member_flows.get_batch_update_members_flow(
old_members, new_members, updated_members),
store={constants.LISTENERS: listeners,
store={constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL: pool})
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
log=LOG):
@ -506,20 +513,20 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
member = e.last_attempt.result()
pool = member.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
update_member_tf = self._taskflow_load(self._member_flows.
get_update_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.POOL:
pool,
constants.UPDATE_DICT:
member_updates})
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
update_member_tf = self._taskflow_load(
self._member_flows.get_update_member_flow(),
store={constants.MEMBER: member,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.POOL: pool,
constants.UPDATE_DICT: member_updates})
with tf_logging.DynamicLoggingListener(update_member_tf,
log=LOG):
update_member_tf.run()
@ -543,16 +550,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
'60 seconds.', 'pool', pool_id)
raise db_exceptions.NoResultFound
listeners = pool.listeners
load_balancer = pool.load_balancer
create_pool_tf = self._taskflow_load(self._pool_flows.
get_create_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer})
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
create_pool_tf = self._taskflow_load(
self._pool_flows.get_create_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_pool_tf,
log=LOG):
create_pool_tf.run()
@ -568,12 +577,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=pool_id)
load_balancer = pool.load_balancer
listeners = pool.listeners
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
delete_pool_tf = self._taskflow_load(
self._pool_flows.get_delete_pool_flow(),
store={constants.POOL: pool, constants.LISTENERS: listeners,
constants.LOADBALANCER: load_balancer})
store={constants.POOL: pool, constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id})
with tf_logging.DynamicLoggingListener(delete_pool_tf,
log=LOG):
delete_pool_tf.run()
@ -598,18 +611,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.PENDING_UPDATE)
pool = e.last_attempt.result()
listeners = pool.listeners
load_balancer = pool.load_balancer
update_pool_tf = self._taskflow_load(self._pool_flows.
get_update_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS:
listeners,
constants.LOADBALANCER:
load_balancer,
constants.UPDATE_DICT:
pool_updates})
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))
update_pool_tf = self._taskflow_load(
self._pool_flows.get_update_pool_flow(),
store={constants.POOL: pool,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: pool_updates})
with tf_logging.DynamicLoggingListener(update_pool_tf,
log=LOG):
update_pool_tf.run()
@ -633,13 +647,17 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
'60 seconds.', 'l7policy', l7policy_id)
raise db_exceptions.NoResultFound
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
create_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_create_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_l7policy_tf,
log=LOG):
@ -656,12 +674,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=l7policy_id)
load_balancer = l7policy.listener.load_balancer
listeners = [l7policy.listener]
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
delete_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_delete_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(delete_l7policy_tf,
log=LOG):
@ -687,14 +709,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.PENDING_UPDATE)
l7policy = e.last_attempt.result()
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
update_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_update_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: l7policy_updates})
with tf_logging.DynamicLoggingListener(update_l7policy_tf,
log=LOG):
@ -720,14 +746,18 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
raise db_exceptions.NoResultFound
l7policy = l7rule.l7policy
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
create_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_create_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(create_l7rule_tf,
log=LOG):
@ -744,13 +774,17 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=l7rule_id)
l7policy = l7rule.l7policy
load_balancer = l7policy.listener.load_balancer
listeners = [l7policy.listener]
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
delete_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_delete_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: load_balancer})
with tf_logging.DynamicLoggingListener(delete_l7rule_tf,
log=LOG):
@ -777,15 +811,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
l7rule = e.last_attempt.result()
l7policy = l7rule.l7policy
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
listeners_dicts = (
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[l7policy.listener]))
update_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_update_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy,
constants.LISTENERS: listeners,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: load_balancer,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: l7rule_updates})
with tf_logging.DynamicLoggingListener(update_l7rule_tf,
log=LOG):

@ -19,7 +19,6 @@ from octavia.common import constants
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import model_tasks
class HealthMonitorFlows(object):
@ -43,7 +42,7 @@ class HealthMonitorFlows(object):
create_hm_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
create_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return create_hm_flow
@ -59,9 +58,6 @@ class HealthMonitorFlows(object):
constants.LOADBALANCER]))
delete_hm_flow.add(database_tasks.MarkHealthMonitorPendingDeleteInDB(
requires=constants.HEALTH_MON))
delete_hm_flow.add(model_tasks.
DeleteModelObject(rebind={constants.OBJECT:
constants.HEALTH_MON}))
delete_hm_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
delete_hm_flow.add(database_tasks.DeleteHealthMonitorInDB(
@ -75,7 +71,7 @@ class HealthMonitorFlows(object):
delete_hm_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
delete_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return delete_hm_flow
@ -100,6 +96,6 @@ class HealthMonitorFlows(object):
update_hm_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
update_hm_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return update_hm_flow

@ -19,7 +19,6 @@ from octavia.common import constants
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import model_tasks
class L7PolicyFlows(object):
@ -41,7 +40,7 @@ class L7PolicyFlows(object):
create_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
create_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return create_l7policy_flow
@ -57,14 +56,12 @@ class L7PolicyFlows(object):
constants.LOADBALANCER]))
delete_l7policy_flow.add(database_tasks.MarkL7PolicyPendingDeleteInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7POLICY}))
delete_l7policy_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
delete_l7policy_flow.add(database_tasks.DeleteL7PolicyInDB(
requires=constants.L7POLICY))
delete_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return delete_l7policy_flow
@ -87,6 +84,6 @@ class L7PolicyFlows(object):
update_l7policy_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
update_l7policy_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return update_l7policy_flow

@ -19,7 +19,6 @@ from octavia.common import constants
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import model_tasks
class L7RuleFlows(object):
@ -43,7 +42,7 @@ class L7RuleFlows(object):
create_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
create_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return create_l7rule_flow
@ -59,8 +58,6 @@ class L7RuleFlows(object):
constants.LOADBALANCER]))
delete_l7rule_flow.add(database_tasks.MarkL7RulePendingDeleteInDB(
requires=constants.L7RULE))
delete_l7rule_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.L7RULE}))
delete_l7rule_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
delete_l7rule_flow.add(database_tasks.DeleteL7RuleInDB(
@ -68,7 +65,7 @@ class L7RuleFlows(object):
delete_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
delete_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return delete_l7rule_flow
@ -93,6 +90,6 @@ class L7RuleFlows(object):
update_l7rule_flow.add(database_tasks.MarkL7PolicyActiveInDB(
requires=constants.L7POLICY))
update_l7rule_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return update_l7rule_flow

@ -31,15 +31,15 @@ class ListenerFlows(object):
"""
create_listener_flow = linear_flow.Flow(constants.CREATE_LISTENER_FLOW)
create_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=constants.LISTENERS))
create_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
create_listener_flow.add(network_tasks.UpdateVIP(
requires=constants.LOADBALANCER))
requires=constants.LISTENERS))
create_listener_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,
constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID,
constants.LISTENERS)))
return create_listener_flow
def get_create_all_listeners_flow(self):
@ -59,7 +59,7 @@ class ListenerFlows(object):
create_all_listeners_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
create_all_listeners_flow.add(network_tasks.UpdateVIP(
requires=constants.LOADBALANCER))
requires=constants.LISTENERS))
return create_all_listeners_flow
def get_delete_listener_flow(self):
@ -73,13 +73,13 @@ class ListenerFlows(object):
delete_listener_flow.add(amphora_driver_tasks.ListenerDelete(
requires=constants.LISTENER))
delete_listener_flow.add(network_tasks.UpdateVIPForDelete(
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_listener_flow.add(database_tasks.DeleteListenerInDB(
requires=constants.LISTENER))
delete_listener_flow.add(database_tasks.DecrementListenerQuota(
requires=constants.PROJECT_ID))
delete_listener_flow.add(database_tasks.MarkLBActiveInDBByListener(
requires=constants.LISTENER))
delete_listener_flow.add(database_tasks.MarkLBActiveInDB(
requires=constants.LOADBALANCER))
return delete_listener_flow
@ -94,15 +94,14 @@ class ListenerFlows(object):
# Should cascade delete all L7 policies
delete_listener_flow.add(network_tasks.UpdateVIPForDelete(
name='delete_update_vip_' + listener_name,
requires=constants.LOADBALANCER))
requires=constants.LOADBALANCER_ID))
delete_listener_flow.add(database_tasks.DeleteListenerInDB(
name='delete_listener_in_db_' + listener_name,
requires=constants.LISTENER,
rebind={constants.LISTENER: listener_name}))
delete_listener_flow.add(database_tasks.DecrementListenerQuota(
name='decrement_listener_quota_' + listener_name,
requires=constants.LISTENER,
rebind={constants.LISTENER: listener_name}))
requires=constants.PROJECT_ID))
return delete_listener_flow
@ -112,17 +111,17 @@ class ListenerFlows(object):
:returns: The flow for updating a listener
"""
update_listener_flow = linear_flow.Flow(constants.UPDATE_LISTENER_FLOW)
update_listener_flow.add(lifecycle_tasks.ListenersToErrorOnRevertTask(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
update_listener_flow.add(lifecycle_tasks.ListenerToErrorOnRevertTask(
requires=constants.LISTENER))
update_listener_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
update_listener_flow.add(network_tasks.UpdateVIP(
requires=constants.LOADBALANCER))
requires=constants.LISTENERS))
update_listener_flow.add(database_tasks.UpdateListenerInDB(
requires=[constants.LISTENER, constants.UPDATE_DICT]))
update_listener_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,
constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID,
constants.LISTENERS)))
return update_listener_flow

@ -18,6 +18,7 @@ from oslo_log import log as logging
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from octavia.api.drivers import utils as provider_utils
from octavia.common import constants
from octavia.common import exceptions
from octavia.controller.worker.v2.flows import amphora_flows
@ -216,10 +217,14 @@ class LoadBalancerFlows(object):
store = {}
for listener in lb.listeners:
listener_name = 'listener_' + listener.id
store[listener_name] = listener
prov_listener = provider_utils.db_listener_to_provider_listener(
listener)
store[listener_name] = prov_listener.to_dict()
listeners_delete_flow.add(
self.listener_flows.get_delete_listener_internal_flow(
listener_name))
store.update({constants.LOADBALANCER_ID: lb.id,
constants.PROJECT_ID: lb.project_id})
return (listeners_delete_flow, store)
def get_delete_load_balancer_flow(self, lb):

@ -20,7 +20,6 @@ from octavia.common import constants
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import model_tasks
from octavia.controller.worker.v2.tasks import network_tasks
@ -55,8 +54,8 @@ class MemberFlows(object):
requires=constants.POOL))
create_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=(constants.LOADBALANCER,
constants.LISTENERS)))
requires=(constants.LISTENERS,
constants.LOADBALANCER_ID)))
return create_member_flow
@ -73,21 +72,18 @@ class MemberFlows(object):
constants.POOL]))
delete_member_flow.add(database_tasks.MarkMemberPendingDeleteInDB(
requires=constants.MEMBER))
delete_member_flow.add(model_tasks.
DeleteModelObject(rebind={constants.OBJECT:
constants.MEMBER}))
delete_member_flow.add(database_tasks.DeleteMemberInDB(
requires=constants.MEMBER))
delete_member_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
delete_member_flow.add(database_tasks.DeleteMemberInDB(
requires=constants.MEMBER))
delete_member_flow.add(database_tasks.DecrementMemberQuota(
requires=constants.MEMBER))
delete_member_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
delete_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,
constants.LISTENERS]))
requires=(constants.LISTENERS,
constants.LOADBALANCER_ID)))
return delete_member_flow
@ -114,8 +110,8 @@ class MemberFlows(object):
requires=constants.POOL))
update_member_flow.add(database_tasks.
MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER,
constants.LISTENERS]))
requires=(constants.LISTENERS,
constants.LOADBALANCER_ID)))
return update_member_flow
@ -139,11 +135,6 @@ class MemberFlows(object):
name='{flow}-deleted'.format(
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
for m in old_members:
unordered_members_flow.add(
model_tasks.DeleteModelObject(
inject={constants.OBJECT: m},
name='{flow}-{id}'.format(
id=m.id, flow=constants.DELETE_MODEL_OBJECT_FLOW)))
unordered_members_flow.add(database_tasks.DeleteMemberInDB(
inject={constants.MEMBER: m},
name='{flow}-{id}'.format(
@ -203,7 +194,6 @@ class MemberFlows(object):
requires=constants.POOL))
batch_update_members_flow.add(
database_tasks.MarkLBAndListenersActiveInDB(
requires=(constants.LOADBALANCER,
constants.LISTENERS)))
requires=(constants.LISTENERS, constants.LOADBALANCER_ID)))
return batch_update_members_flow

@ -19,7 +19,6 @@ from octavia.common import constants
from octavia.controller.worker.v2.tasks import amphora_driver_tasks
from octavia.controller.worker.v2.tasks import database_tasks
from octavia.controller.worker.v2.tasks import lifecycle_tasks
from octavia.controller.worker.v2.tasks import model_tasks
class PoolFlows(object):
@ -41,7 +40,7 @@ class PoolFlows(object):
create_pool_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
create_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return create_pool_flow
@ -59,8 +58,6 @@ class PoolFlows(object):
requires=constants.POOL))
delete_pool_flow.add(database_tasks.CountPoolChildrenForQuota(
requires=constants.POOL, provides=constants.POOL_CHILD_COUNT))
delete_pool_flow.add(model_tasks.DeleteModelObject(
rebind={constants.OBJECT: constants.POOL}))
delete_pool_flow.add(amphora_driver_tasks.ListenersUpdate(
requires=constants.LOADBALANCER))
delete_pool_flow.add(database_tasks.DeletePoolInDB(
@ -68,7 +65,7 @@ class PoolFlows(object):
delete_pool_flow.add(database_tasks.DecrementPoolQuota(
requires=[constants.POOL, constants.POOL_CHILD_COUNT]))
delete_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return delete_pool_flow
@ -89,9 +86,6 @@ class PoolFlows(object):
requires=constants.POOL,
provides=constants.POOL_CHILD_COUNT,
rebind={constants.POOL: name}))
delete_pool_flow.add(model_tasks.DeleteModelObject(
name='delete_model_object_' + name,
rebind={constants.OBJECT: name}))
delete_pool_flow.add(database_tasks.DeletePoolInDB(
name='delete_pool_in_db_' + name,
requires=constants.POOL,
@ -122,6 +116,6 @@ class PoolFlows(object):
update_pool_flow.add(database_tasks.MarkPoolActiveInDB(
requires=constants.POOL))
update_pool_flow.add(database_tasks.MarkLBAndListenersActiveInDB(
requires=[constants.LOADBALANCER, constants.LISTENERS]))
requires=(constants.LOADBALANCER_ID, constants.LISTENERS)))
return update_pool_flow

@ -102,7 +102,8 @@ class ListenersUpdate(BaseAmphoraTask):
LOG.warning("Reverting listeners updates.")
for listener in loadbalancer.listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(
listener.id)
class ListenersStart(BaseAmphoraTask):
@ -127,8 +128,9 @@ class ListenerDelete(BaseAmphoraTask):
def execute(self, listener):
"""Execute listener delete routines for an amphora."""
# TODO(rm_work): This is only relevant because of UDP listeners now.
self.amphora_driver.delete(listener)
db_listener = self.listener_repo.get(
db_apis.get_session(), id=listener[constants.LISTENER_ID])
self.amphora_driver.delete(db_listener)
LOG.debug("Deleted the listener on the vip")
def revert(self, listener, *args, **kwargs):
@ -136,7 +138,8 @@ class ListenerDelete(BaseAmphoraTask):
LOG.warning("Reverting listener delete.")
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(
listener[constants.LISTENER_ID])
class AmphoraGetInfo(BaseAmphoraTask):

@ -25,6 +25,7 @@ from sqlalchemy.orm import exc
from taskflow import task
from taskflow.types import failure
from octavia.api.drivers import utils as provider_utils
from octavia.common import constants
from octavia.common import data_models
import octavia.common.tls_utils.cert_parser as cert_parser
@ -250,8 +251,10 @@ class DeleteListenerInDB(BaseDatabaseTask):
:param listener: The listener to delete
:returns: None
"""
LOG.debug("Delete in DB for listener id: %s", listener.id)
self.listener_repo.delete(db_apis.get_session(), id=listener.id)
LOG.debug("Delete in DB for listener id: %s",
listener[constants.LISTENER_ID])
self.listener_repo.delete(db_apis.get_session(),
id=listener[constants.LISTENER_ID])
def revert(self, listener, *args, **kwargs):
"""Mark the listener ERROR since the listener didn't delete
@ -261,7 +264,7 @@ class DeleteListenerInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting mark listener delete in DB for listener id %s",
listener.id)
listener[constants.LISTENER_ID])
class DeletePoolInDB(BaseDatabaseTask):
@ -1059,6 +1062,42 @@ class MarkLBActiveInDB(BaseDatabaseTask):
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
class MarkLBActiveInDBByListener(BaseDatabaseTask):
"""Mark the load balancer active in the DB using a listener dict.
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
"""Mark the load balancer as active in DB.
:param listener: Listener dictionary
:returns: None
"""
LOG.info("Mark ACTIVE in DB for load balancer id: %s",
listener[constants.LOADBALANCER_ID])
self.loadbalancer_repo.update(db_apis.get_session(),
listener[constants.LOADBALANCER_ID],
provisioning_status=constants.ACTIVE)
def revert(self, listener, *args, **kwargs):
"""Mark the load balancer as broken and ready to be cleaned up.
This also puts all sub-objects of the load balancer to ERROR state if
self.mark_subobjects is True
:param listener: Listener dictionary
:returns: None
"""
LOG.warning("Reverting mark load balancer active in DB "
"for load balancer id %s",
listener[constants.LOADBALANCER_ID])
self.task_utils.mark_loadbalancer_prov_status_error(
listener[constants.LOADBALANCER_ID])
class UpdateLBServerGroupInDB(BaseDatabaseTask):
"""Update the server group id info for load balancer in DB."""
@ -1167,39 +1206,55 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, loadbalancer, listeners):
def execute(self, loadbalancer_id, listeners):
"""Mark the load balancer and listeners as active in DB.
:param loadbalancer: Load balancer object to be updated
:param loadbalancer_id: The load balancer ID to be updated
:param listeners: Listener objects to be updated
:returns: None
"""
lb_id = None
if loadbalancer_id:
lb_id = loadbalancer_id
elif listeners:
lb_id = listeners[0][constants.LOADBALANCER_ID]
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and listener ids: %s", loadbalancer.id,
', '.join([l.id for l in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
loadbalancer.id,
provisioning_status=constants.ACTIVE)
if lb_id:
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and listener ids: %s", lb_id,
', '.join([l[constants.LISTENER_ID] for l in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
lb_id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
self.listener_repo.update(db_apis.get_session(), listener.id,
provisioning_status=constants.ACTIVE)
self.listener_repo.update(
db_apis.get_session(), listener[constants.LISTENER_ID],
provisioning_status=constants.ACTIVE)
def revert(self, loadbalancer, listeners, *args, **kwargs):
def revert(self, loadbalancer_id, listeners, *args, **kwargs):
"""Mark the load balancer and listeners as broken.
:param loadbalancer: Load balancer object that failed to update
:param loadbalancer_id: The load balancer ID to be updated
:param listeners: Listener objects that failed to update
:returns: None
"""
lb_id = None
if loadbalancer_id:
lb_id = loadbalancer_id
elif listeners:
lb_id = listeners[0][constants.LOADBALANCER_ID]
if lb_id:
lists = ', '.join([l[constants.LISTENER_ID] for l in listeners])
LOG.warning("Reverting mark load balancer and listeners active in "
"DB for load balancer id %(LB)s and listener ids: "
"%(list)s", {'LB': lb_id,
'list': lists})
self.task_utils.mark_loadbalancer_prov_status_error(lb_id)
LOG.warning("Reverting mark load balancer and listeners active in DB "
"for load balancer id %(LB)s and listener ids: %(list)s",
{'LB': loadbalancer.id,
'list': ', '.join([l.id for l in listeners])})
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(
listener[constants.LISTENER_ID])
class MarkListenerActiveInDB(BaseDatabaseTask):
@ -1377,8 +1432,10 @@ class UpdateListenerInDB(BaseDatabaseTask):
:returns: None
"""
LOG.debug("Update DB for listener id: %s ", listener.id)
self.listener_repo.update(db_apis.get_session(), listener.id,
LOG.debug("Update DB for listener id: %s ",
listener[constants.LISTENER_ID])
self.listener_repo.update(db_apis.get_session(),
listener[constants.LISTENER_ID],
**update_dict)
def revert(self, listener, *args, **kwargs):
@ -1389,8 +1446,9 @@ class UpdateListenerInDB(BaseDatabaseTask):
"""
LOG.warning("Reverting update listener in DB "
"for listener id %s", listener.id)
self.task_utils.mark_listener_prov_status_error(listener.id)
"for listener id %s", listener[constants.LISTENER_ID])
self.task_utils.mark_listener_prov_status_error(
listener[constants.LISTENER_ID])
class UpdateMemberInDB(BaseDatabaseTask):
@ -1587,9 +1645,11 @@ class GetListenersFromLoadbalancer(BaseDatabaseTask):
"""
listeners = []
for listener in loadbalancer.listeners:
lb = self.listener_repo.get(db_apis.get_session(), id=listener.id)
lb.load_balancer = loadbalancer
listeners.append(lb)
db_l = self.listener_repo.get(db_apis.get_session(),
id=listener.id)
prov_listener = provider_utils.db_listener_to_provider_listener(
db_l)
listeners.append(prov_listener.to_dict())
return listeners
@ -2389,39 +2449,38 @@ class DecrementListenerQuota(BaseDatabaseTask):
Since sqlalchemy will likely retry by itself always revert if it fails
"""
def execute(self, listener):
def execute(self, project_id):
"""Decrements the listener quota.
:param listener: The listener to decrement the quota on.
:param project_id: The project_id to decrement the quota on.
:returns: None
"""
LOG.debug("Decrementing listener quota for "
"project: %s ", listener.project_id)
"project: %s ", project_id)
lock_session = db_apis.get_session(autocommit=False)
try:
self.repos.decrement_quota(lock_session,
data_models.Listener,
listener.project_id)
project_id)
lock_session.commit()
except Exception:
with excutils.save_and_reraise_exception():
LOG.error('Failed to decrement listener quota for project: '
'%(proj)s the project may have excess quota in use.',
{'proj': listener.project_id})
{'proj': project_id})
lock_session.rollback()
def revert(self, listener, result, *args, **kwargs):
def revert(self, project_id, result, *args, **kwargs):
"""Re-apply the quota
:param listener: The listener to decrement the quota on.
:param project_id: The project_id to decrement the quota on.
:returns: None
"""
LOG.warning('Reverting decrement quota for listener on project '
'%(proj)s Project quota counts may be incorrect.',
{'proj': listener.project_id})
{'proj': project_id})
# Increment the quota back if this task wasn't the failure
if not isinstance(result, failure.Failure):
@ -2433,7 +2492,7 @@ class DecrementListenerQuota(BaseDatabaseTask):
self.repos.check_quota_met(session,
lock_session,
data_models.Listener,
listener.project_id)
project_id)
lock_session.commit()
except Exception:
lock_session.rollback()

@ -14,6 +14,7 @@
from taskflow import task
from octavia.common import constants
from octavia.controller.worker import task_utils as task_utilities
@ -56,7 +57,8 @@ class HealthMonitorToErrorOnRevertTask(BaseLifecycleTask):
self.task_utils.mark_pool_prov_status_active(health_mon.pool_id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])
class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
@ -69,7 +71,8 @@ class L7PolicyToErrorOnRevertTask(BaseLifecycleTask):
self.task_utils.mark_l7policy_prov_status_error(l7policy.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])
class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
@ -83,7 +86,8 @@ class L7RuleToErrorOnRevertTask(BaseLifecycleTask):
self.task_utils.mark_l7policy_prov_status_active(l7rule.l7policy_id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])
class ListenerToErrorOnRevertTask(BaseLifecycleTask):
@ -93,22 +97,24 @@ class ListenerToErrorOnRevertTask(BaseLifecycleTask):
pass
def revert(self, listener, *args, **kwargs):
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(
listener[constants.LISTENER_ID])
self.task_utils.mark_loadbalancer_prov_status_active(
listener.load_balancer.id)
listener[constants.LOADBALANCER_ID])
class ListenersToErrorOnRevertTask(BaseLifecycleTask):
"""Task to set listeners to ERROR on revert."""
"""Task to set a listener to ERROR on revert."""
def execute(self, listeners, loadbalancer):
def execute(self, listeners):
pass
def revert(self, listeners, loadbalancer, *args, **kwargs):
self.task_utils.mark_loadbalancer_prov_status_active(
loadbalancer.id)
def revert(self, listeners, *args, **kwargs):
for listener in listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)
self.task_utils.mark_listener_prov_status_error(
listener[constants.LISTENER_ID])
self.task_utils.mark_loadbalancer_prov_status_active(
listeners[0][constants.LOADBALANCER_ID])
class LoadBalancerIDToErrorOnRevertTask(BaseLifecycleTask):
@ -140,7 +146,8 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
self.task_utils.mark_member_prov_status_error(member.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])
self.task_utils.mark_pool_prov_status_active(pool.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
@ -155,7 +162,8 @@ class MembersToErrorOnRevertTask(BaseLifecycleTask):
for m in members:
self.task_utils.mark_member_prov_status_error(m.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])
self.task_utils.mark_pool_prov_status_active(pool.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
@ -170,4 +178,5 @@ class PoolToErrorOnRevertTask(BaseLifecycleTask):
self.task_utils.mark_pool_prov_status_error(pool.id)
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_active(listener.id)
self.task_utils.mark_listener_prov_status_active(
listener[constants.LISTENER_ID])

@ -1,41 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from taskflow import task
class DeleteModelObject(task.Task):
"""Task to delete an object in a model."""
def execute(self, object):
object.delete()
class UpdateAttributes(task.Task):
"""Task to update an object for changes."""
def execute(self, object, update_dict):
"""Update an object and its associated resources.
Note: This relies on the data_model update() methods to handle complex
objects with nested objects (LoadBalancer.vip,
Pool.session_persistence, etc.)
:param object: The object will be updated.
:param update_dict: The updates dictionary.
:returns: None
"""
object.update(update_dict)

@ -22,6 +22,8 @@ from taskflow.types import failure
from octavia.common import constants
from octavia.common import utils
from octavia.controller.worker import task_utils
from octavia.db import api as db_apis
from octavia.db import repositories as repo
from octavia.network import base
from octavia.network import data_models as n_data_models
@ -36,6 +38,7 @@ class BaseNetworkTask(task.Task):
super(BaseNetworkTask, self).__init__(**kwargs)
self._network_driver = None
self.task_utils = task_utils.TaskUtils()
self.loadbalancer_repo = repo.LoadBalancerRepository()
@property
def network_driver(self):
@ -468,7 +471,10 @@ class DeallocateVIP(BaseNetworkTask):
class UpdateVIP(BaseNetworkTask):
"""Task to update a VIP."""
def execute(self, loadbalancer):
def execute(self, listeners):
loadbalancer = self.loadbalancer_repo.get(
db_apis.get_session(), id=listeners[0][constants.LOADBALANCER_ID])
LOG.debug("Updating VIP of load_balancer %s.", loadbalancer.id)
self.network_driver.update_vip(loadbalancer)
@ -477,7 +483,9 @@ class UpdateVIP(BaseNetworkTask):
class UpdateVIPForDelete(BaseNetworkTask):
"""Task to update a VIP for listener delete flows."""
def execute(self, loadbalancer):
def execute(self, loadbalancer_id):
loadbalancer = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer_id)
LOG.debug("Updating VIP for listener delete on load_balancer %s.",
loadbalancer.id)

@ -124,7 +124,7 @@ class TestAmphoraDriver(base.TestRpc):
provider_listener = driver_dm.Listener(
listener_id=self.sample_data.listener1_id)
self.amp_driver.listener_create(provider_listener)
payload = {consts.LISTENER_ID: self.sample_data.listener1_id}
payload = {consts.LISTENER: provider_listener.to_dict()}
mock_cast.assert_called_with({}, 'create_listener', **payload)
@mock.patch('oslo_messaging.RPCClient.cast')
@ -132,7 +132,7 @@ class TestAmphoraDriver(base.TestRpc):
provider_listener = driver_dm.Listener(
listener_id=self.sample_data.listener1_id)
self.amp_driver.listener_delete(provider_listener)
payload = {consts.LISTENER_ID: self.sample_data.listener1_id}
payload = {consts.LISTENER: provider_listener.to_dict()}
mock_cast.assert_called_with({}, 'delete_listener', **payload)
@mock.patch('oslo_messaging.RPCClient.cast')
@ -141,10 +141,11 @@ class TestAmphoraDriver(base.TestRpc):
listener_id=self.sample_data.listener1_id)
provider_listener = driver_dm.Listener(
listener_id=self.sample_data.listener1_id, admin_state_up=False)
listener_dict = {'enabled': False}
listener_dict = provider_listener.to_dict()
listener_dict['admin_state_up'] = False
self.amp_driver.listener_update(old_provider_listener,
provider_listener)
payload = {consts.LISTENER_ID: self.sample_data.listener1_id,
payload = {consts.ORIGINAL_LISTENER: old_provider_listener.to_dict(),
consts.LISTENER_UPDATES: listener_dict}
mock_cast.assert_called_with({}, 'update_listener', **payload)
@ -154,10 +155,11 @@ class TestAmphoraDriver(base.TestRpc):
listener_id=self.sample_data.listener1_id)
provider_listener = driver_dm.Listener(
listener_id=self.sample_data.listener1_id, name='Great Listener')
listener_dict = {'name': 'Great Listener'}
listener_dict = provider_listener.to_dict()
listener_dict['name'] = 'Great Listener'
self.amp_driver.listener_update(old_provider_listener,
provider_listener)
payload = {consts.LISTENER_ID: self.sample_data.listener1_id,
payload = {consts.ORIGINAL_LISTENER: old_provider_listener.to_dict(),
consts.LISTENER_UPDATES: listener_dict}
mock_cast.assert_called_with({}, 'update_listener', **payload)
@ -690,3 +692,39 @@ class TestAmphoraDriver(base.TestRpc):
self.assertRaises(exceptions.DriverError,
self.amp_driver.validate_availability_zone,
'bogus')
@mock.patch('cryptography.fernet.Fernet')
def test_encrypt_listener_dict(self, mock_fernet):
mock_fern = mock.MagicMock()
mock_fernet.return_value = mock_fern
TEST_DATA = 'some data'
TEST_DATA2 = 'more data'
FAKE_ENCRYPTED_DATA = 'alqwkhjetrhth'
mock_fern.encrypt.return_value = FAKE_ENCRYPTED_DATA
# We need a class instance with the mock
amp_driver = driver.AmphoraProviderDriver()
# Test just default_tls_container_data
list_dict = {consts.DEFAULT_TLS_CONTAINER_DATA: TEST_DATA}
amp_driver._encrypt_listener_dict(list_dict)
mock_fern.encrypt.assert_called_once_with(TEST_DATA)
self.assertEqual(FAKE_ENCRYPTED_DATA,
list_dict[consts.DEFAULT_TLS_CONTAINER_DATA])
mock_fern.reset_mock()
# Test just sni_container_data
list_dict = {consts.SNI_CONTAINER_DATA: [TEST_DATA, TEST_DATA2]}
amp_driver._encrypt_listener_dict(list_dict)
calls = [mock.call(TEST_DATA), mock.call(TEST_DATA2)]
mock_fern.encrypt.assert_has_calls(calls)
encrypted_sni = [FAKE_ENCRYPTED_DATA, FAKE_ENCRYPTED_DATA]
self.assertEqual(encrypted_sni, list_dict[consts.SNI_CONTAINER_DATA])

@ -598,7 +598,8 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
ssl_type_l7=False, pool_cert=False,
pool_ca_cert=False, pool_crl=False,
tls_enabled=False, hm_host_http_check=False,
id='sample_listener_id_1', recursive_nest=False):
id='sample_listener_id_1', recursive_nest=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is None else proto
if be_proto is None:
be_proto = 'HTTP' if proto is 'TERMINATED_HTTPS' else proto
@ -615,7 +616,7 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
'timeout_member_connect, timeout_member_data, '
'timeout_tcp_inspect, client_ca_tls_certificate_id, '
'client_ca_tls_certificate, client_authentication, '
'client_crl_container_id')
'client_crl_container_id, provisioning_status')
if l7:
pools = [
sample_pool_tuple(
@ -725,6 +726,7 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
constants.CLIENT_AUTH_MANDATORY if client_ca_cert else
constants.CLIENT_AUTH_NONE),
client_crl_container_id='cont_id_crl' if client_crl_cert else '',
provisioning_status=provisioning_status,
)
if recursive_nest:
listener.load_balancer.listeners.append(listener)
@ -761,14 +763,16 @@ def sample_pool_tuple(listener_id=None, proto=None, monitor=True,
backup_member=False, disabled_member=False,
has_http_reuse=True, pool_cert=False, pool_ca_cert=False,
pool_crl=False, tls_enabled=False,
hm_host_http_check=False):
hm_host_http_check=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is None else proto
monitor_proto = proto if monitor_proto is None else monitor_proto
in_pool = collections.namedtuple(
'pool', 'id, protocol, lb_algorithm, members, health_monitor, '
'session_persistence, enabled, operating_status, '
'tls_certificate_id, ca_tls_certificate_id, '
'crl_container_id, tls_enabled, ' + constants.HTTP_REUSE)
'crl_container_id, tls_enabled, provisioning_status, ' +
constants.HTTP_REUSE)
if (proto == constants.PROTOCOL_UDP and
persistence_type == constants.SESSION_PERSISTENCE_SOURCE_IP):
kwargs = {'persistence_type': persistence_type,
@ -810,17 +814,19 @@ def sample_pool_tuple(listener_id=None, proto=None, monitor=True,
tls_certificate_id='pool_cont_1' if pool_cert else None,
ca_tls_certificate_id='pool_ca_1' if pool_ca_cert else None,
crl_container_id='pool_crl' if pool_crl else None,
tls_enabled=tls_enabled)
tls_enabled=tls_enabled,
provisioning_status=provisioning_status)
def sample_member_tuple(id, ip, enabled=True, operating_status='ACTIVE',
provisioning_status=constants.ACTIVE,
monitor_ip_port=False, backup=False):
in_member = collections.namedtuple('member',
'id, ip_address, protocol_port, '
'weight, subnet_id, '
'enabled, operating_status, '
'monitor_address, monitor_port, '
'backup')
'backup, provisioning_status')
monitor_address = '192.168.1.1' if monitor_ip_port else None
monitor_port = 9000 if monitor_ip_port else None
return in_member(
@ -833,7 +839,7 @@ def sample_member_tuple(id, ip, enabled=True, operating_status='ACTIVE',
operating_status=operating_status,
monitor_address=monitor_address,
monitor_port=monitor_port,
backup=backup)
backup=backup, provisioning_status=provisioning_status)
def sample_session_persistence_tuple(persistence_type=None,
@ -852,12 +858,14 @@ def sample_session_persistence_tuple(persistence_type=None,
def sample_health_monitor_tuple(proto='HTTP', sample_hm=1,
host_http_check=False):
host_http_check=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is 'TERMINATED_HTTPS' else proto
monitor = collections.namedtuple(
'monitor', 'id, type, delay, timeout, fall_threshold, rise_threshold,'
'http_method, url_path, expected_codes, enabled, '
'check_script_path, http_version, domain_name')
'check_script_path, http_version, domain_name, '
'provisioning_status')
if sample_hm == 1:
id = 'sample_monitor_id_1'
@ -875,7 +883,8 @@ def sample_health_monitor_tuple(proto='HTTP', sample_hm=1,
'http_method': 'GET',
'url_path': url_path,
'expected_codes': '418',
'enabled': True
'enabled': True,
'provisioning_status': provisioning_status,
}
if host_http_check:
kwargs.update({'http_version': 1.1, 'domain_name': 'testlab.com'})
@ -894,12 +903,14 @@ def sample_l7policy_tuple(id,
redirect_pool=None, redirect_url=None,
redirect_prefix=None,
enabled=True, redirect_http_code=302,
sample_policy=1):
sample_policy=1,
provisioning_status=constants.ACTIVE):
in_l7policy = collections.namedtuple('l7policy',
'id, action, redirect_pool, '
'redirect_url, redirect_prefix, '
'l7rules, enabled,'
'redirect_http_code')
'redirect_http_code,'
'provisioning_status')
l7rules = []
if sample_policy == 1:
action = constants.L7POLICY_ACTION_REDIRECT_TO_POOL
@ -947,7 +958,8 @@ def sample_l7policy_tuple(id,
redirect_http_code=redirect_http_code
if (action in [constants.L7POLICY_ACTION_REDIRECT_TO_URL,
constants.L7POLICY_ACTION_REDIRECT_PREFIX] and
redirect_http_code) else None)
redirect_http_code) else None,
provisioning_status=provisioning_status)
def sample_l7rule_tuple(id,
@ -957,10 +969,12 @@ def sample_l7rule_tuple(id,
value='/api',
invert=False,
enabled=True,
sample_rule=1):
sample_rule=1,
provisioning_status=constants.ACTIVE):
in_l7rule = collections.namedtuple('l7rule',
'id, type, compare_type, '
'key, value, invert, enabled')
'key, value, invert, enabled,'
'provisioning_status')
if sample_rule == 2:
type = constants.L7RULE_TYPE_HEADER
compare_type = constants.L7RULE_COMPARE_TYPE_CONTAINS
@ -1031,7 +1045,8 @@ def sample_l7rule_tuple(id,
key=key,
value=value,
invert=invert,
enabled=enabled)
enabled=enabled,
provisioning_status=provisioning_status)
def sample_base_expected_config(frontend=None, logging=None, backend=None,

@ -621,7 +621,8 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
ssl_type_l7=False, pool_cert=False,
pool_ca_cert=False, pool_crl=False,
tls_enabled=False, hm_host_http_check=False,
id='sample_listener_id_1', recursive_nest=False):
id='sample_listener_id_1', recursive_nest=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is None else proto
if be_proto is None:
be_proto = 'HTTP' if proto is 'TERMINATED_HTTPS' else proto
@ -638,7 +639,7 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
'timeout_member_connect, timeout_member_data, '
'timeout_tcp_inspect, client_ca_tls_certificate_id, '
'client_ca_tls_certificate, client_authentication, '
'client_crl_container_id')
'client_crl_container_id, provisioning_status')
if l7:
pools = [
sample_pool_tuple(
@ -744,6 +745,7 @@ def sample_listener_tuple(proto=None, monitor=True, alloc_default_pool=True,
constants.CLIENT_AUTH_MANDATORY if client_ca_cert else
constants.CLIENT_AUTH_NONE),
client_crl_container_id='cont_id_crl' if client_crl_cert else '',
provisioning_status=provisioning_status,
)
if recursive_nest:
listener.load_balancer.listeners.append(listener)
@ -779,14 +781,16 @@ def sample_pool_tuple(proto=None, monitor=True, persistence=True,
monitor_proto=None, backup_member=False,
disabled_member=False, has_http_reuse=True,
pool_cert=False, pool_ca_cert=False, pool_crl=False,
tls_enabled=False, hm_host_http_check=False):
tls_enabled=False, hm_host_http_check=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is None else proto
monitor_proto = proto if monitor_proto is None else monitor_proto
in_pool = collections.namedtuple(
'pool', 'id, protocol, lb_algorithm, members, health_monitor, '
'session_persistence, enabled, operating_status, '
'tls_certificate_id, ca_tls_certificate_id, '
'crl_container_id, tls_enabled, ' + constants.HTTP_REUSE)
'crl_container_id, tls_enabled, provisioning_status, ' +
constants.HTTP_REUSE)
if (proto == constants.PROTOCOL_UDP and
persistence_type == constants.SESSION_PERSISTENCE_SOURCE_IP):
kwargs = {'persistence_type': persistence_type,
@ -829,17 +833,19 @@ def sample_pool_tuple(proto=None, monitor=True, persistence=True,
tls_certificate_id='pool_cont_1' if pool_cert else None,
ca_tls_certificate_id='pool_ca_1' if pool_ca_cert else None,
crl_container_id='pool_crl' if pool_crl else None,
tls_enabled=tls_enabled)
tls_enabled=tls_enabled, provisioning_status=provisioning_status)
def sample_member_tuple(id, ip, enabled=True, operating_status='ACTIVE',
def sample_member_tuple(id, ip, enabled=True,
operating_status=constants.ACTIVE,
provisioning_status=constants.ACTIVE,
monitor_ip_port=False, backup=False):
in_member = collections.namedtuple('member',
'id, ip_address, protocol_port, '
'weight, subnet_id, '
'enabled, operating_status, '
'monitor_address, monitor_port, '
'backup')
'backup, provisioning_status')
monitor_address = '192.168.1.1' if monitor_ip_port else None
monitor_port = 9000 if monitor_ip_port else None
return in_member(
@ -852,7 +858,7 @@ def sample_member_tuple(id, ip, enabled=True, operating_status='ACTIVE',
operating_status=operating_status,
monitor_address=monitor_address,
monitor_port=monitor_port,
backup=backup)
backup=backup, provisioning_status=provisioning_status)
def sample_session_persistence_tuple(persistence_type=None,
@ -871,12 +877,14 @@ def sample_session_persistence_tuple(persistence_type=None,
def sample_health_monitor_tuple(proto='HTTP', sample_hm=1,
host_http_check=False):
host_http_check=False,
provisioning_status=constants.ACTIVE):
proto = 'HTTP' if proto is 'TERMINATED_HTTPS' else proto
monitor = collections.namedtuple(
'monitor', 'id, type, delay, timeout, fall_threshold, rise_threshold,'
'http_method, url_path, expected_codes, enabled, '
'check_script_path, http_version, domain_name')
'check_script_path, http_version, domain_name, '
'provisioning_status')
if sample_hm == 1:
id = 'sample_monitor_id_1'
@ -894,7 +902,8 @@ def sample_health_monitor_tuple(proto='HTTP', sample_hm=1,
'http_method': 'GET',
'url_path': url_path,
'expected_codes': '418',
'enabled': True
'enabled': True,
'provisioning_status': provisioning_status,
}
if host_http_check:
kwargs.update({'http_version': 1.1, 'domain_name': 'testlab.com'})
@ -913,12 +922,14 @@ def sample_l7policy_tuple(id,
redirect_pool=None, redirect_url=None,
redirect_prefix=None,
enabled=True, redirect_http_code=302,
sample_policy=1):
sample_policy=1,
provisioning_status=constants.ACTIVE):
in_l7policy = collections.namedtuple('l7policy',
'id, action, redirect_pool, '
'redirect_url, redirect_prefix, '
'l7rules, enabled,'
'redirect_http_code')
'l7rules, enabled, '
'redirect_http_code, '
'provisioning_status')
l7rules = []
if sample_policy == 1:
action = constants.L7POLICY_ACTION_REDIRECT_TO_POOL
@ -966,20 +977,20 @@ def sample_l7policy_tuple(id,
redirect_http_code=redirect_http_code
if (action in [constants.L7POLICY_ACTION_REDIRECT_TO_URL,
constants.L7POLICY_ACTION_REDIRECT_PREFIX] and
redirect_http_code) else None)
redirect_http_code) else None,
provisioning_status=provisioning_status)
def sample_l7rule_tuple(id,
type=constants.L7RULE_TYPE_PATH,
compare_type=constants.L7RULE_COMPARE_TYPE_STARTS_WITH,
key=None,
value='/api',
invert=False,
enabled=True,
sample_rule=1):
key=None, value='/api',
invert=False, enabled=True,
sample_rule=1, provisioning_status=constants.ACTIVE):
in_l7rule = collections.namedtuple('l7rule',
'id, type, compare_type, '
'key, value, invert, enabled')
'key, value, invert, enabled, '
'provisioning_status')
if sample_rule == 2:
type = constants.L7RULE_TYPE_HEADER
compare_type = constants.L7RULE_COMPARE_TYPE_CONTAINS
@ -1050,7 +1061,7 @@ def sample_l7rule_tuple(id,
key=key,
value=value,
invert=invert,
enabled=enabled)
enabled=enabled, provisioning_status=provisioning_status)
def sample_base_expected_config(frontend=None, logging=None, backend=None,

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.common import constants
from octavia.controller.queue.v2 import endpoints
from octavia.tests.unit import base
@ -38,6 +39,7 @@ class TestEndpoints(base.TestCase):
self.resource_updates = {}
self.resource_id = 1234
self.server_group_id = 3456
self.listener_dict = {constants.ID: uuidutils.generate_uuid()}
self.flavor_id = uuidutils.generate_uuid()
def test_create_load_balancer(self):
@ -73,20 +75,20 @@ class TestEndpoints(base.TestCase):
self.resource_id)
def test_create_listener(self):
self.ep.create_listener(self.context, self.resource_id)
self.ep.create_listener(self.context, self.listener_dict)
self.ep.worker.create_listener.assert_called_once_with(
self.resource_id)
self.listener_dict)
def test_update_listener(self):
self.ep.update_listener(self.context, self.resource_id,
self.ep.update_listener(self.context, self.listener_dict,
self.resource_updates)
self.ep.worker.update_listener.assert_called_once_with(
self.resource_id, self.resource_updates)
self.listener_dict, self.resource_updates)
def test_delete_listener(self):
self.ep.delete_listener(self.context, self.resource_id)
self.ep.delete_listener(self.context, self.listener_dict)
self.ep.worker.delete_listener.assert_called_once_with(
self.resource_id)
self.listener_dict)
def test_create_pool(self):
self.ep.create_pool(self.context, self.resource_id)

@ -36,9 +36,10 @@ class TestHealthMonitorFlows(base.TestCase):
self.assertIn(constants.LISTENERS, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, health_mon_flow.requires)
self.assertIn(constants.POOL, health_mon_flow.requires)
self.assertEqual(4, len(health_mon_flow.requires))
self.assertEqual(5, len(health_mon_flow.requires))
self.assertEqual(0, len(health_mon_flow.provides))
def test_get_delete_health_monitor_flow(self):
@ -51,9 +52,10 @@ class TestHealthMonitorFlows(base.TestCase):
self.assertIn(constants.HEALTH_MON, health_mon_flow.requires)
self.assertIn(constants.LISTENERS, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, health_mon_flow.requires)
self.assertIn(constants.POOL, health_mon_flow.requires)
self.assertEqual(4, len(health_mon_flow.requires))
self.assertEqual(5, len(health_mon_flow.requires))
self.assertEqual(0, len(health_mon_flow.provides))
def test_get_update_health_monitor_flow(self):
@ -65,8 +67,9 @@ class TestHealthMonitorFlows(base.TestCase):
self.assertIn(constants.LISTENERS, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER, health_mon_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, health_mon_flow.requires)
self.assertIn(constants.HEALTH_MON, health_mon_flow.requires)
self.assertIn(constants.UPDATE_DICT, health_mon_flow.requires)
self.assertEqual(5, len(health_mon_flow.requires))
self.assertEqual(6, len(health_mon_flow.requires))
self.assertEqual(0, len(health_mon_flow.provides))

@ -35,8 +35,9 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_delete_l7policy_flow(self):
@ -47,9 +48,10 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertEqual(3, len(l7policy_flow.requires))
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))
def test_get_update_l7policy_flow(self):
@ -61,7 +63,8 @@ class TestL7PolicyFlows(base.TestCase):
self.assertIn(constants.L7POLICY, l7policy_flow.requires)
self.assertIn(constants.LISTENERS, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER, l7policy_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7policy_flow.requires)
self.assertIn(constants.UPDATE_DICT, l7policy_flow.requires)
self.assertEqual(4, len(l7policy_flow.requires))
self.assertEqual(5, len(l7policy_flow.requires))
self.assertEqual(0, len(l7policy_flow.provides))

@ -35,8 +35,9 @@ class TestL7RuleFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7rule_flow.requires)
self.assertEqual(4, len(l7rule_flow.requires))
self.assertEqual(5, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))
def test_get_delete_l7rule_flow(self):
@ -47,9 +48,10 @@ class TestL7RuleFlows(base.TestCase):
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7rule_flow.requires)
self.assertIn(constants.L7RULE, l7rule_flow.requires)
self.assertEqual(4, len(l7rule_flow.requires))
self.assertEqual(5, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))
def test_get_update_l7rule_flow(self):
@ -61,7 +63,8 @@ class TestL7RuleFlows(base.TestCase):
self.assertIn(constants.L7RULE, l7rule_flow.requires)
self.assertIn(constants.LISTENERS, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER, l7rule_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, l7rule_flow.requires)
self.assertIn(constants.UPDATE_DICT, l7rule_flow.requires)
self.assertEqual(5, len(l7rule_flow.requires))
self.assertEqual(6, len(l7rule_flow.requires))
self.assertEqual(0, len(l7rule_flow.provides))

@ -36,10 +36,11 @@ class TestListenerFlows(base.TestCase):
self.assertIsInstance(listener_flow, flow.Flow)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertIn(constants.LISTENERS, listener_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertEqual(2, len(listener_flow.requires))
self.assertEqual(3, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_delete_listener_flow(self, mock_get_net_driver):
@ -49,9 +50,10 @@ class TestListenerFlows(base.TestCase):
self.assertIsInstance(listener_flow, flow.Flow)
self.assertIn(constants.LISTENER, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
self.assertIn(constants.PROJECT_ID, listener_flow.requires)
self.assertEqual(2, len(listener_flow.requires))
self.assertEqual(3, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_delete_listener_internal_flow(self, mock_get_net_driver):
@ -61,9 +63,9 @@ class TestListenerFlows(base.TestCase):
self.assertIsInstance(listener_flow, flow.Flow)
self.assertIn('test-listener', listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertIn(constants.PROJECT_ID, listener_flow.requires)
self.assertEqual(2, len(listener_flow.requires))
self.assertEqual(3, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_update_listener_flow(self, mock_get_net_driver):
@ -73,11 +75,12 @@ class TestListenerFlows(base.TestCase):
self.assertIsInstance(listener_flow, flow.Flow)
self.assertIn(constants.LISTENER, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertIn(constants.UPDATE_DICT, listener_flow.requires)
self.assertIn(constants.LISTENERS, listener_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, listener_flow.requires)
self.assertIn(constants.LOADBALANCER, listener_flow.requires)
self.assertEqual(4, len(listener_flow.requires))
self.assertEqual(5, len(listener_flow.requires))
self.assertEqual(0, len(listener_flow.provides))
def test_get_create_all_listeners_flow(self, mock_get_net_driver):

@ -101,7 +101,11 @@ class TestLoadBalancerFlows(base.TestCase):
lb_mock = mock.Mock()
listener_mock = mock.Mock()
listener_mock.id = '123'
listener_dict = {constants.LISTENER_ID: '123'}
listener_mock.to_dict.return_value = {'id': '123'}
lb_mock.listeners = [listener_mock]
lb_mock.id = '321'
lb_mock.project_id = '876'
pool_mock = mock.Mock()
pool_mock.id = '345'
lb_mock.pools = [pool_mock]
@ -113,13 +117,15 @@ class TestLoadBalancerFlows(base.TestCase):
lb_mock)
self.assertIsInstance(lb_flow, flow.Flow)
self.assertEqual({'listener_123': listener_mock,
self.assertEqual({'listener_123': listener_dict,
constants.LOADBALANCER_ID: lb_mock.id,
constants.PROJECT_ID: lb_mock.project_id,
'pool345': pool_mock}, store)
self.assertIn(constants.LOADBALANCER, lb_flow.requires)
self.assertEqual(1, len(lb_flow.provides))
self.assertEqual(4, len(lb_flow.requires))
self.assertEqual(6, len(lb_flow.requires))
def test_get_update_load_balancer_flow(self, mock_get_net_driver):

@ -39,9 +39,10 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertEqual(4, len(member_flow.requires))
self.assertEqual(5, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))
def test_get_delete_member_flow(self, mock_get_net_driver):
@ -51,11 +52,12 @@ class TestMemberFlows(base.TestCase):
self.assertIsInstance(member_flow, flow.Flow)
self.assertIn(constants.MEMBER, member_flow.requires)
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertEqual(4, len(member_flow.requires))
self.assertEqual(5, len(member_flow.requires))
self.assertEqual(0, len(member_flow.provides))
def test_get_update_member_flow(self, mock_get_net_driver):
@ -67,10 +69,11 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.MEMBER, member_flow.requires)
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertIn(constants.UPDATE_DICT, member_flow.requires)
self.assertEqual(5, len(member_flow.requires))
self.assertEqual(6, len(member_flow.requires))
self.assertEqual(0, len(member_flow.provides))
def test_get_batch_update_members_flow(self, mock_get_net_driver):
@ -82,7 +85,8 @@ class TestMemberFlows(base.TestCase):
self.assertIn(constants.LISTENERS, member_flow.requires)
self.assertIn(constants.LOADBALANCER, member_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, member_flow.requires)
self.assertIn(constants.POOL, member_flow.requires)
self.assertEqual(3, len(member_flow.requires))
self.assertEqual(4, len(member_flow.requires))
self.assertEqual(2, len(member_flow.provides))

@ -35,8 +35,9 @@ class TestPoolFlows(base.TestCase):
self.assertIn(constants.LISTENERS, pool_flow.requires)
self.assertIn(constants.LOADBALANCER, pool_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, pool_flow.requires)
self.assertEqual(3, len(pool_flow.requires))
self.assertEqual(4, len(pool_flow.requires))
self.assertEqual(0, len(pool_flow.provides))
def test_get_delete_pool_flow(self):
@ -47,9 +48,10 @@ class TestPoolFlows(base.TestCase):
self.assertIn(constants.LISTENERS, pool_flow.requires)
self.assertIn(constants.LOADBALANCER, pool_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, pool_flow.requires)
self.assertIn(constants.POOL, pool_flow.requires)
self.assertEqual(3, len(pool_flow.requires))
self.assertEqual(4, len(pool_flow.requires))
self.assertEqual(1, len(pool_flow.provides))
def test_get_delete_pool_flow_internal(self):
@ -71,7 +73,8 @@ class TestPoolFlows(base.TestCase):
self.assertIn(constants.POOL, pool_flow.requires)
self.assertIn(constants.LISTENERS, pool_flow.requires)
self.assertIn(constants.LOADBALANCER, pool_flow.requires)
self.assertIn(constants.LOADBALANCER_ID, pool_flow.requires)
self.assertIn(constants.UPDATE_DICT, pool_flow.requires)
self.assertEqual(4, len(pool_flow.requires))
self.assertEqual(5, len(pool_flow.requires))
self.assertEqual(0, len(pool_flow.provides))

@ -107,7 +107,9 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update.assert_called_once_with(
_session_mock, AMP_ID, status=constants.ERROR)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listener_update(self,
mock_lb_get,
mock_driver,
mock_generate_uuid,
mock_log,
@ -139,7 +141,9 @@ class TestAmphoraDriverTasks(base.TestCase):
provisioning_status=constants.ERROR)
self.assertIsNone(amp)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
def test_listeners_update(self,
mock_lb_get,
mock_driver,
mock_generate_uuid,
mock_log,
@ -148,14 +152,21 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update):
listeners_update_obj = amphora_driver_tasks.ListenersUpdate()
LB_ID = 'lb1'
listeners = [data_models.Listener(id='listener1'),
data_models.Listener(id='listener2')]
vip = data_models.Vip(ip_address='10.0.0.1')
lb = data_models.LoadBalancer(id='lb1', listeners=listeners, vip=vip)
lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners, vip=vip)
mock_lb_get.return_value = lb
listeners_update_obj.execute(lb)
mock_driver.update.assert_called_once_with(lb)
self.assertEqual(1, mock_driver.update.call_count)
mock_lb_get.reset_mock()
listeners_update_obj.execute(None)
mock_lb_get.assert_not_called()
# Test the revert
amp = listeners_update_obj.revert(lb)
expected_db_calls = [mock.call(_session_mock,
@ -209,13 +220,14 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_listener_repo_update,
mock_amphora_repo_update):
listener_dict = {constants.LISTENER_ID: LISTENER_ID}
listener_delete_obj = amphora_driver_tasks.ListenerDelete()
listener_delete_obj.execute(_listener_mock)
listener_delete_obj.execute(listener_dict)
mock_driver.delete.assert_called_once_with(_listener_mock)
# Test the revert
amp = listener_delete_obj.revert(_listener_mock)
amp = listener_delete_obj.revert(listener_dict)
repo.ListenerRepository.update.assert_called_once_with(
_session_mock,
id=LISTENER_ID,
@ -225,7 +237,7 @@ class TestAmphoraDriverTasks(base.TestCase):
# Test the revert with exception
repo.ListenerRepository.update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
amp = listener_delete_obj.revert(_listener_mock)
amp = listener_delete_obj.revert(listener_dict)
repo.ListenerRepository.update.assert_called_once_with(
_session_mock,
id=LISTENER_ID,

@ -77,7 +77,10 @@ _l7policy_mock.id = L7POLICY_ID
_l7rule_mock = mock.MagicMock()
_l7rule_mock.id = L7RULE_ID
_listener_mock = mock.MagicMock()
_listener_to_dict_mock = mock.MagicMock(
return_value={'id': LISTENER_ID})
_listener_mock.id = LISTENER_ID
_listener_mock.to_dict = _listener_to_dict_mock
_tf_failure_mock = mock.Mock(spec=failure.Failure)
_vip_mock = mock.MagicMock()
_vip_mock.port_id = PORT_ID
@ -184,12 +187,17 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
delete_listener = database_tasks.DeleteListenerInDB()
delete_listener.execute(_listener_mock)
delete_listener.execute({constants.LISTENER_ID: LISTENER_ID})
repo.ListenerRepository.delete.assert_called_once_with(
'TEST',
id=LISTENER_ID)
# Test the revert
repo.ListenerRepository.delete.reset_mock()
delete_listener.revert({constants.LISTENER_ID: LISTENER_ID})
repo.ListenerRepository.delete.assert_not_called()
@mock.patch('octavia.db.repositories.HealthMonitorRepository.update')
@mock.patch('octavia.db.repositories.HealthMonitorRepository.delete')
def test_delete_health_monitor_in_db(self,
@ -1000,10 +1008,11 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_update,
mock_amphora_repo_delete):
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
mark_lb_and_listeners_active = (database_tasks.
MarkLBAndListenersActiveInDB())
mark_lb_and_listeners_active.execute(self.loadbalancer_mock,
[self.listener_mock])
mark_lb_and_listeners_active.execute(LB_ID, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
@ -1014,12 +1023,35 @@ class TestDatabaseTasks(base.TestCase):
LB_ID,
provisioning_status=constants.ACTIVE)
# Test with LB_ID from listeners
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
mark_lb_and_listeners_active = (database_tasks.
MarkLBAndListenersActiveInDB())
mark_lb_and_listeners_active.execute(None, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
LISTENER_ID,
provisioning_status=constants.ACTIVE)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,
provisioning_status=constants.ACTIVE)
# Test with no LB_ID
mock_loadbalancer_repo_update.reset_mock()
mark_lb_and_listeners_active.execute(None, [])
mock_loadbalancer_repo_update.assert_not_called()
# Test the revert
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
mark_lb_and_listeners_active.revert(self.loadbalancer_mock,
[self.listener_mock])
mark_lb_and_listeners_active.revert(LB_ID, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
@ -1030,14 +1062,37 @@ class TestDatabaseTasks(base.TestCase):
id=LB_ID,
provisioning_status=constants.ERROR)
# Test the revert LB_ID from listeners
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
mark_lb_and_listeners_active.revert(None, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
provisioning_status=constants.ERROR)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
# Test the revert no LB_ID
mock_loadbalancer_repo_update.reset_mock()
mock_listener_repo_update.reset_mock()
mark_lb_and_listeners_active.revert(None, [])
mock_loadbalancer_repo_update.assert_not_called()
mock_listener_repo_update.assert_not_called()
# Test the revert with exceptions
mock_loadbalancer_repo_update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
mark_lb_and_listeners_active.revert(self.loadbalancer_mock,
[self.listener_mock])
mark_lb_and_listeners_active.revert(LB_ID, [listener_dict])
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
@ -1127,6 +1182,46 @@ class TestDatabaseTasks(base.TestCase):
provisioning_status=constants.ERROR)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
def test_mark_LB_active_in_db_by_listener(self,
mock_generate_uuid,
mock_LOG,
mock_get_session,
mock_loadbalancer_repo_update,
mock_listener_repo_update,
mock_amphora_repo_update,
mock_amphora_repo_delete):
listener_dict = {'loadbalancer_id': LB_ID}
mark_loadbalancer_active = database_tasks.MarkLBActiveInDBByListener()
mark_loadbalancer_active.execute(listener_dict)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
LB_ID,
provisioning_status=constants.ACTIVE)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
# Test the revert
mock_loadbalancer_repo_update.reset_mock()
mark_loadbalancer_active.revert(listener_dict)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
# Test the revert with exception
mock_loadbalancer_repo_update.reset_mock()
mock_loadbalancer_repo_update.side_effect = Exception('fail')
mark_loadbalancer_active.revert(listener_dict)
repo.LoadBalancerRepository.update.assert_called_once_with(
'TEST',
id=LB_ID,
provisioning_status=constants.ERROR)
self.assertEqual(0, repo.ListenerRepository.update.call_count)
def test_mark_LB_active_in_db_and_listeners(self,
mock_generate_uuid,
mock_LOG,
@ -1486,7 +1581,8 @@ class TestDatabaseTasks(base.TestCase):
mock_amphora_repo_delete):
update_listener = database_tasks.UpdateListenerInDB()
update_listener.execute(self.listener_mock,
listener_dict = {constants.LISTENER_ID: LISTENER_ID}
update_listener.execute(listener_dict,
{'name': 'test', 'description': 'test2'})
repo.ListenerRepository.update.assert_called_once_with(
@ -1496,7 +1592,7 @@ class TestDatabaseTasks(base.TestCase):
# Test the revert
mock_listener_repo_update.reset_mock()
update_listener.revert(self.listener_mock)
update_listener.revert(listener_dict)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
@ -1505,7 +1601,7 @@ class TestDatabaseTasks(base.TestCase):
# Test the revert
mock_listener_repo_update.reset_mock()
mock_listener_repo_update.side_effect = Exception('fail')
update_listener.revert(self.listener_mock)
update_listener.revert(listener_dict)
repo.ListenerRepository.update.assert_called_once_with(
'TEST',
id=LISTENER_ID,
@ -1798,7 +1894,7 @@ class TestDatabaseTasks(base.TestCase):
get_list_from_lb_obj = database_tasks.GetListenersFromLoadbalancer()
result = get_list_from_lb_obj.execute(_loadbalancer_mock)
mock_listener_get.assert_called_once_with('TEST', id=_listener_mock.id)
self.assertEqual([_listener_mock], result)
self.assertEqual([{constants.LISTENER_ID: LISTENER_ID}], result)
def test_get_vip_from_loadbalancer(self,
mock_generate_uuid,

@ -35,16 +35,16 @@ class TestDatabaseTasksQuota(base.TestCase):
@mock.patch('octavia.db.api.get_session', return_value='TEST')
@mock.patch('octavia.db.repositories.Repositories.decrement_quota')
@mock.patch('octavia.db.repositories.Repositories.check_quota_met')
def _test_decrement_quota(self,
task,
data_model,
mock_check_quota_met,
mock_decrement_quota,
mock_get_session):
project_id = uuidutils.generate_uuid()
test_object = mock.MagicMock()
test_object.project_id = project_id
def _test_decrement_quota(self, task, data_model,
mock_check_quota_met, mock_decrement_quota,
mock_get_session, project_id=None):
test_object = None
if project_id:
test_object = project_id
else:
project_id = uuidutils.generate_uuid()
test_object = mock.MagicMock()
test_object.project_id = project_id
# execute without exception
mock_decrement_quota.reset_mock()
@ -105,6 +105,7 @@ class TestDatabaseTasksQuota(base.TestCase):
'get_session') as mock_get_session_local:
mock_session = mock.MagicMock()
mock_lock_session = mock.MagicMock()
mock_get_session_local.side_effect = [mock_session,
mock_lock_session]
@ -124,8 +125,10 @@ class TestDatabaseTasksQuota(base.TestCase):
'get_session') as mock_get_session_local:
mock_session = mock.MagicMock()
mock_lock_session = mock.MagicMock()
mock_get_session_local.side_effect = [mock_session,
mock_lock_session]
mock_check_quota_met.side_effect = (
exceptions.OctaviaException('fail'))
@ -155,9 +158,10 @@ class TestDatabaseTasksQuota(base.TestCase):
self._test_decrement_quota(task, data_model)
def test_decrement_listener_quota(self):
project_id = uuidutils.generate_uuid()
task = database_tasks.DecrementListenerQuota()
data_model = data_models.Listener
self._test_decrement_quota(task, data_model)
self._test_decrement_quota(task, data_model, project_id=project_id)
def test_decrement_loadbalancer_quota(self):
task = database_tasks.DecrementLoadBalancerQuota()

@ -15,6 +15,7 @@
import mock
from oslo_utils import uuidutils
from octavia.common import constants
from octavia.controller.worker.v2.tasks import lifecycle_tasks
import octavia.tests.unit.base as base
@ -37,12 +38,12 @@ class TestLifecycleTasks(base.TestCase):
self.L7RULE.id = self.L7RULE_ID
self.LISTENER = mock.MagicMock()
self.LISTENER_ID = uuidutils.generate_uuid()
self.LISTENER.id = self.LISTENER_ID
self.LISTENER = {constants.LISTENER_ID: self.LISTENER_ID}
self.LISTENERS = [self.LISTENER]
self.LOADBALANCER = mock.MagicMock()
self.LOADBALANCER_ID = uuidutils.generate_uuid()
self.LOADBALANCER.id = self.LOADBALANCER_ID
self.LISTENER.load_balancer = self.LOADBALANCER
self.LISTENER[constants.LOADBALANCER_ID] = self.LOADBALANCER_ID
self.MEMBER = mock.MagicMock()
self.MEMBER_ID = uuidutils.generate_uuid()
self.MEMBER.id = self.MEMBER_ID
@ -234,14 +235,12 @@ class TestLifecycleTasks(base.TestCase):
ListenersToErrorOnRevertTask())
# Execute
listeners_to_error_on_revert.execute(self.LISTENERS,
self.LOADBALANCER)
listeners_to_error_on_revert.execute([self.LISTENER])
self.assertFalse(mock_listener_prov_status_error.called)
# Revert
listeners_to_error_on_revert.revert(self.LISTENERS,
self.LOADBALANCER)
listeners_to_error_on_revert.revert([self.LISTENER])
mock_listener_prov_status_error.assert_called_once_with(
self.LISTENER_ID)

@ -1,44 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from octavia.controller.worker.v2.tasks import model_tasks
import octavia.tests.unit.base as base
class TestObjectUpdateTasks(base.TestCase):
def setUp(self):
self.listener_mock = mock.MagicMock()
self.listener_mock.name = 'TEST'
super(TestObjectUpdateTasks, self).setUp()
def test_delete_model_object(self):
delete_object = model_tasks.DeleteModelObject()
delete_object.execute(self.listener_mock)
self.listener_mock.delete.assert_called_once_with()
def test_update_listener(self):
update_attr = model_tasks.UpdateAttributes()
update_attr.execute(self.listener_mock,
{'name': 'TEST2'})
self.listener_mock.update.assert_called_once_with({'name': 'TEST2'})

@ -55,6 +55,7 @@ AMPS_DATA = [o_data_models.Amphora(id=t_constants.MOCK_AMP_ID1,
vrrp_ip=t_constants.MOCK_VRRP_IP2)
]
UPDATE_DICT = {constants.TOPOLOGY: None}
_session_mock = mock.MagicMock()
class TestException(Exception):
@ -643,22 +644,32 @@ class TestNetworkTasks(base.TestCase):
net.execute(lb)
mock_driver.deallocate_vip.assert_called_once_with(lb.vip)
def test_update_vip(self, mock_get_net_driver):
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
@mock.patch('octavia.db.api.get_session', return_value=_session_mock)
def test_update_vip(self, mock_get_session, mock_get_lb,
mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver
vip = o_data_models.Vip()
lb = o_data_models.LoadBalancer(vip=vip)
mock_get_lb.return_value = lb
listeners = [{constants.LOADBALANCER_ID: lb.id}]
net_task = network_tasks.UpdateVIP()
net_task.execute(lb)
net_task.execute(listeners)
mock_driver.update_vip.assert_called_once_with(lb)
def test_update_vip_for_delete(self, mock_get_net_driver):
@mock.patch('octavia.db.repositories.LoadBalancerRepository.get')
@mock.patch('octavia.db.api.get_session', return_value=_session_mock)
def test_update_vip_for_delete(self, mock_get_session, mock_get_lb,
mock_get_net_driver):
mock_driver = mock.MagicMock()
mock_get_net_driver.return_value = mock_driver
vip = o_data_models.Vip()
lb = o_data_models.LoadBalancer(vip=vip)
mock_get_lb.return_value = lb
listener = {constants.LOADBALANCER_ID: lb.id}
net_task = network_tasks.UpdateVIPForDelete()
net_task.execute(lb)
net_task.execute(listener)
mock_driver.update_vip.assert_called_once_with(lb, for_delete=True)
def test_get_amphorae_network_configs(self, mock_get_net_driver):

@ -27,7 +27,9 @@ import octavia.tests.unit.base as base
AMP_ID = uuidutils.generate_uuid()
LB_ID = uuidutils.generate_uuid()
LISTENER_ID = uuidutils.generate_uuid()
POOL_ID = uuidutils.generate_uuid()
PROJECT_ID = uuidutils.generate_uuid()
HM_ID = uuidutils.generate_uuid()
MEMBER_ID = uuidutils.generate_uuid()
COMPUTE_ID = uuidutils.generate_uuid()
@ -102,7 +104,14 @@ class TestControllerWorker(base.TestCase):
_health_mon_mock.pool = _pool_mock
_load_balancer_mock.amphorae = _amphora_mock
_load_balancer_mock.vip = _vip_mock
_load_balancer_mock.id = LB_ID
_load_balancer_mock.project_id = PROJECT_ID
_listener_mock.load_balancer = _load_balancer_mock
_listener_mock.id = LISTENER_ID
_listener_mock.to_dict.return_value = {
'id': LISTENER_ID, constants.LOAD_BALANCER_ID: LB_ID}
self.ref_listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
_member_mock.pool = _pool_mock
_l7policy_mock.listener = _listener_mock
_l7rule_mock.l7policy = _l7policy_mock
@ -212,7 +221,9 @@ class TestControllerWorker(base.TestCase):
store={constants.HEALTH_MON:
_health_mon_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
@ -249,7 +260,9 @@ class TestControllerWorker(base.TestCase):
store={constants.HEALTH_MON:
_health_mon_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
@ -288,8 +301,10 @@ class TestControllerWorker(base.TestCase):
_health_mon_mock,
constants.POOL:
_pool_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT:
@ -315,20 +330,21 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_listener_repo_get.side_effect = [None, _listener_mock]
cw = controller_worker.ControllerWorker()
cw.create_listener(LB_ID)
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
cw.create_listener(listener_dict)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LOADBALANCER:
_load_balancer_mock,
constants.LISTENERS:
_load_balancer_mock.listeners}))
assert_called_once_with(
_flow_mock, store={
constants.LOADBALANCER: _load_balancer_mock,
constants.LOADBALANCER_ID: LB_ID,
constants.LISTENERS: [listener_dict]}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_listener_repo_get.call_count)
@mock.patch('octavia.controller.worker.v2.flows.'
'listener_flows.ListenerFlows.get_delete_listener_flow',
@ -349,13 +365,16 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
cw = controller_worker.ControllerWorker()
cw.delete_listener(LB_ID)
cw.delete_listener(listener_dict)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(
_flow_mock, store={constants.LISTENER: _listener_mock,
constants.LOADBALANCER: _load_balancer_mock}))
_flow_mock, store={constants.LISTENER: self.ref_listener_dict,
constants.LOADBALANCER_ID: LB_ID,
constants.PROJECT_ID: PROJECT_ID}))
_flow_mock.run.assert_called_once_with()
@ -379,18 +398,21 @@ class TestControllerWorker(base.TestCase):
_flow_mock.reset_mock()
_listener_mock.provisioning_status = constants.PENDING_UPDATE
listener_dict = {constants.LISTENER_ID: LISTENER_ID,
constants.LOADBALANCER_ID: LB_ID}
cw = controller_worker.ControllerWorker()
cw.update_listener(LB_ID, LISTENER_UPDATE_DICT)
cw.update_listener(listener_dict, LISTENER_UPDATE_DICT)
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.LISTENER: _listener_mock,
constants.LOADBALANCER:
_load_balancer_mock,
store={constants.LISTENER: listener_dict,
constants.UPDATE_DICT:
LISTENER_UPDATE_DICT,
constants.LOADBALANCER_ID: LB_ID,
constants.LOADBALANCER:
_load_balancer_mock,
constants.LISTENERS:
[_listener_mock]}))
[listener_dict]}))
_flow_mock.run.assert_called_once_with()
@ -694,8 +716,7 @@ class TestControllerWorker(base.TestCase):
store={constants.UPDATE_DICT: change,
constants.LOADBALANCER:
_load_balancer_mock,
constants.LISTENERS:
[_listener_mock]}))
}))
_flow_mock.run.assert_called_once_with()
@ -726,7 +747,9 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.MEMBER: _member_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
@ -761,7 +784,9 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(
_flow_mock, store={constants.MEMBER: _member_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
@ -796,11 +821,13 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.MEMBER: _member_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL:
_pool_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.UPDATE_DICT:
MEMBER_UPDATE_DICT}))
@ -829,12 +856,12 @@ class TestControllerWorker(base.TestCase):
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={
constants.LISTENERS: [_listener_mock],
constants.LOADBALANCER:
_load_balancer_mock,
constants.POOL: _pool_mock}))
assert_called_once_with(
_flow_mock,
store={constants.LISTENERS: [self.ref_listener_dict],
constants.LOADBALANCER_ID: _load_balancer_mock.id,
constants.LOADBALANCER: _load_balancer_mock,
constants.POOL: _pool_mock}))
_flow_mock.run.assert_called_once_with()
@ -864,8 +891,10 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock}))
@ -897,8 +926,10 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock}))
@ -931,7 +962,9 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.POOL: _pool_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT:
@ -965,8 +998,10 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock}))
@ -999,7 +1034,9 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock}))
@ -1032,7 +1069,9 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT:
@ -1067,8 +1106,10 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock}))
@ -1100,9 +1141,11 @@ class TestControllerWorker(base.TestCase):
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.L7POLICY: _l7policy_mock,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock}))
@ -1135,8 +1178,10 @@ class TestControllerWorker(base.TestCase):
assert_called_once_with(_flow_mock,
store={constants.L7RULE: _l7rule_mock,
constants.L7POLICY: _l7policy_mock,
constants.LOADBALANCER_ID:
_load_balancer_mock.id,
constants.LISTENERS:
[_listener_mock],
[self.ref_listener_dict],
constants.LOADBALANCER:
_load_balancer_mock,
constants.UPDATE_DICT: