Allow amphorav2 to run without jobboard
This patch adds a new configuration setting to enable/disable jobboard functionality in the amphorav2 provider. When disabled, the amphorav2 provider behaves similarly to the amphora v1 provider. The default setting is jobboard disabled while jobboard remains an experimental feature. Change-Id: I063d832f5a049d7ae38378766200c7f82a35996d
This commit is contained in:
parent
e2e873fc9d
commit
bb9b30be7e
@ -289,7 +289,7 @@ function octavia_configure {
|
||||
iniset $OCTAVIA_CONF api_settings api_handler queue_producer
|
||||
|
||||
iniset $OCTAVIA_CONF database connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia"
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
||||
iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence"
|
||||
fi
|
||||
# Configure keystone auth_token for all users
|
||||
@ -366,7 +366,7 @@ function octavia_configure {
|
||||
recreate_database_mysql octavia
|
||||
octavia-db-manage upgrade head
|
||||
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
||||
recreate_database_mysql octavia_persistence
|
||||
octavia-db-manage upgrade_persistence
|
||||
fi
|
||||
@ -377,7 +377,7 @@ function octavia_configure {
|
||||
fi
|
||||
|
||||
# amphorav2 required redis installation
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
||||
install_redis
|
||||
fi
|
||||
|
||||
@ -669,7 +669,7 @@ function octavia_cleanup {
|
||||
|
||||
sudo rm -rf $NOVA_STATE_PATH $NOVA_AUTH_CACHE_DIR
|
||||
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} == True ]]; then
|
||||
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD} == True ]]; then
|
||||
uninstall_redis
|
||||
fi
|
||||
|
||||
|
@ -33,7 +33,7 @@ OCTAVIA_PORT=${OCTAVIA_PORT:-"9876"}
|
||||
OCTAVIA_HA_PORT=${OCTAVIA_HA_PORT:-"9875"}
|
||||
OCTAVIA_HM_LISTEN_PORT=${OCTAVIA_HM_LISTEN_PORT:-"5555"}
|
||||
|
||||
OCTAVIA_ENABLE_AMPHORAV2_PROVIDER=${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER:-False}
|
||||
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD=${OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD:-False}
|
||||
|
||||
OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"}
|
||||
OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"}
|
||||
|
@ -3,8 +3,19 @@
|
||||
Additional configuration steps to configure amphorav2 provider
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If you would like to use amphorav2 provider for load-balancer service the
|
||||
following additional steps are required.
|
||||
The amphorav2 provider driver improves control plane resiliency. Should a
|
||||
control plane host go down during a load balancer provisioning operation, an
|
||||
alternate controller can resume the in-process provisioning and complete the
|
||||
request. This solves the issue with resources stuck in PENDING_* states by
|
||||
writing info about task states in persistent backend and monitoring job claims
|
||||
via jobboard.
|
||||
|
||||
If you would like to use amphorav2 provider with jobboard-based controller
|
||||
for load-balancer service the following additional steps are required.
|
||||
|
||||
This provider driver can also run without jobboard and its dependencies (extra
|
||||
database, Redis/Zookeeper). This is the default setting while jobboard remains
|
||||
an experimental feature.
|
||||
|
||||
|
||||
Prerequisites
|
||||
@ -66,6 +77,7 @@ Additional configuration to octavia components
|
||||
.. code-block:: ini
|
||||
|
||||
[task_flow]
|
||||
jobboard_enabled = True
|
||||
jobboard_backend_driver = 'redis_taskflow_driver'
|
||||
jobboard_backend_hosts = KEYVALUE_HOST_IPS
|
||||
jobboard_backend_port = KEYVALUE_PORT
|
||||
|
@ -351,6 +351,9 @@
|
||||
# Database connection url with db name (string value)
|
||||
#persistence_connection = sqlite://
|
||||
|
||||
# If True, enables TaskFlow jobboard.
|
||||
# jobboard_enabled = False
|
||||
|
||||
# Jobboard backend driver that will monitor job state. (string value)
|
||||
# Possible values:
|
||||
# - redis_taskflow_driver: Driver that will use Redis to store job states.
|
||||
|
@ -60,7 +60,7 @@ class BaseTaskFlowEngine(object):
|
||||
self.executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=CONF.task_flow.max_workers)
|
||||
|
||||
def _taskflow_load(self, flow, **kwargs):
|
||||
def taskflow_load(self, flow, **kwargs):
|
||||
eng = engines.load(
|
||||
flow,
|
||||
engine=CONF.task_flow.engine,
|
||||
|
@ -509,6 +509,8 @@ task_flow_opts = [
|
||||
default='sqlite://',
|
||||
help='Persistence database, which will be used to store tasks '
|
||||
'states. Database connection url with db name'),
|
||||
cfg.BoolOpt('jobboard_enabled', default=False,
|
||||
help=_('If True, enables TaskFlow jobboard.')),
|
||||
cfg.StrOpt('jobboard_backend_driver',
|
||||
default='redis_taskflow_driver',
|
||||
choices=['redis_taskflow_driver', 'zookeeper_taskflow_driver'],
|
||||
|
@ -51,10 +51,11 @@ class ConsumerService(cotyledon.Service):
|
||||
)
|
||||
self.message_listener.start()
|
||||
if constants.AMPHORAV2 in CONF.api_settings.enabled_provider_drivers:
|
||||
for e in self.endpoints:
|
||||
e.worker.services_controller.run_conductor(
|
||||
'octavia-task-flow-conductor-%s' %
|
||||
uuidutils.generate_uuid())
|
||||
if CONF.task_flow.jobboard_enabled:
|
||||
for e in self.endpoints:
|
||||
e.worker.services_controller.run_conductor(
|
||||
'octavia-task-flow-conductor-%s' %
|
||||
uuidutils.generate_uuid())
|
||||
|
||||
def terminate(self):
|
||||
if self.message_listener:
|
||||
|
@ -102,7 +102,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
store[constants.AVAILABILITY_ZONE] = (
|
||||
self._az_repo.get_availability_zone_metadata_dict(
|
||||
db_apis.get_session(), availability_zone))
|
||||
create_amp_tf = self._taskflow_load(
|
||||
create_amp_tf = self.taskflow_load(
|
||||
self._amphora_flows.get_create_amphora_flow(),
|
||||
store=store)
|
||||
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
|
||||
@ -139,7 +139,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
pool.health_monitor = health_mon
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
create_hm_tf = self._taskflow_load(
|
||||
create_hm_tf = self.taskflow_load(
|
||||
self._health_monitor_flows.get_create_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
@ -163,7 +163,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = pool.listeners
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
delete_hm_tf = self._taskflow_load(
|
||||
delete_hm_tf = self.taskflow_load(
|
||||
self._health_monitor_flows.get_delete_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
@ -198,7 +198,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
pool.health_monitor = health_mon
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
update_hm_tf = self._taskflow_load(
|
||||
update_hm_tf = self.taskflow_load(
|
||||
self._health_monitor_flows.get_update_health_monitor_flow(),
|
||||
store={constants.HEALTH_MON: health_mon,
|
||||
constants.POOL: pool,
|
||||
@ -234,12 +234,12 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
load_balancer = listener.load_balancer
|
||||
listeners = load_balancer.listeners
|
||||
|
||||
create_listener_tf = self._taskflow_load(self._listener_flows.
|
||||
get_create_listener_flow(),
|
||||
store={constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.LISTENERS:
|
||||
listeners})
|
||||
create_listener_tf = self.taskflow_load(self._listener_flows.
|
||||
get_create_listener_flow(),
|
||||
store={constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.LISTENERS:
|
||||
listeners})
|
||||
with tf_logging.DynamicLoggingListener(create_listener_tf,
|
||||
log=LOG):
|
||||
create_listener_tf.run()
|
||||
@ -255,7 +255,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
id=listener_id)
|
||||
load_balancer = listener.load_balancer
|
||||
|
||||
delete_listener_tf = self._taskflow_load(
|
||||
delete_listener_tf = self.taskflow_load(
|
||||
self._listener_flows.get_delete_listener_flow(),
|
||||
store={constants.LOADBALANCER: load_balancer,
|
||||
constants.LISTENER: listener})
|
||||
@ -285,16 +285,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
|
||||
load_balancer = listener.load_balancer
|
||||
|
||||
update_listener_tf = self._taskflow_load(self._listener_flows.
|
||||
get_update_listener_flow(),
|
||||
store={constants.LISTENER:
|
||||
listener,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.UPDATE_DICT:
|
||||
listener_updates,
|
||||
constants.LISTENERS:
|
||||
[listener]})
|
||||
update_listener_tf = self.taskflow_load(self._listener_flows.
|
||||
get_update_listener_flow(),
|
||||
store={constants.LISTENER:
|
||||
listener,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.UPDATE_DICT:
|
||||
listener_updates,
|
||||
constants.LISTENERS:
|
||||
[listener]})
|
||||
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
|
||||
update_listener_tf.run()
|
||||
|
||||
@ -345,7 +345,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
|
||||
topology=topology, listeners=lb.listeners)
|
||||
|
||||
create_lb_tf = self._taskflow_load(create_lb_flow, store=store)
|
||||
create_lb_tf = self.taskflow_load(create_lb_flow, store=store)
|
||||
with tf_logging.DynamicLoggingListener(create_lb_tf, log=LOG):
|
||||
create_lb_tf.run()
|
||||
|
||||
@ -366,7 +366,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
(flow, store) = self._lb_flows.get_delete_load_balancer_flow(lb)
|
||||
store.update({constants.LOADBALANCER: lb,
|
||||
constants.SERVER_GROUP_ID: lb.server_group_id})
|
||||
delete_lb_tf = self._taskflow_load(flow, store=store)
|
||||
delete_lb_tf = self.taskflow_load(flow, store=store)
|
||||
|
||||
with tf_logging.DynamicLoggingListener(delete_lb_tf,
|
||||
log=LOG):
|
||||
@ -396,7 +396,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
db_apis.get_session(),
|
||||
load_balancer_id=load_balancer_id)
|
||||
|
||||
update_lb_tf = self._taskflow_load(
|
||||
update_lb_tf = self.taskflow_load(
|
||||
self._lb_flows.get_update_load_balancer_flow(),
|
||||
store={constants.LOADBALANCER: lb,
|
||||
constants.LISTENERS: listeners,
|
||||
@ -444,7 +444,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
create_member_tf = self._taskflow_load(
|
||||
create_member_tf = self.taskflow_load(
|
||||
self._member_flows.get_create_member_flow(),
|
||||
store=store)
|
||||
with tf_logging.DynamicLoggingListener(create_member_tf,
|
||||
@ -476,7 +476,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
delete_member_tf = self._taskflow_load(
|
||||
delete_member_tf = self.taskflow_load(
|
||||
self._member_flows.get_delete_member_flow(),
|
||||
store=store
|
||||
)
|
||||
@ -527,7 +527,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
batch_update_members_tf = self._taskflow_load(
|
||||
batch_update_members_tf = self.taskflow_load(
|
||||
self._member_flows.get_batch_update_members_flow(
|
||||
old_members, new_members, updated_members),
|
||||
store=store)
|
||||
@ -571,7 +571,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
update_member_tf = self._taskflow_load(
|
||||
update_member_tf = self.taskflow_load(
|
||||
self._member_flows.get_update_member_flow(),
|
||||
store=store)
|
||||
with tf_logging.DynamicLoggingListener(update_member_tf,
|
||||
@ -603,13 +603,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = pool.listeners
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
create_pool_tf = self._taskflow_load(self._pool_flows.
|
||||
get_create_pool_flow(),
|
||||
store={constants.POOL: pool,
|
||||
constants.LISTENERS:
|
||||
listeners,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer})
|
||||
create_pool_tf = self.taskflow_load(self._pool_flows.
|
||||
get_create_pool_flow(),
|
||||
store={constants.POOL: pool,
|
||||
constants.LISTENERS:
|
||||
listeners,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer})
|
||||
with tf_logging.DynamicLoggingListener(create_pool_tf,
|
||||
log=LOG):
|
||||
create_pool_tf.run()
|
||||
@ -627,7 +627,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
load_balancer = pool.load_balancer
|
||||
listeners = pool.listeners
|
||||
|
||||
delete_pool_tf = self._taskflow_load(
|
||||
delete_pool_tf = self.taskflow_load(
|
||||
self._pool_flows.get_delete_pool_flow(),
|
||||
store={constants.POOL: pool, constants.LISTENERS: listeners,
|
||||
constants.LOADBALANCER: load_balancer})
|
||||
@ -658,15 +658,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = pool.listeners
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
update_pool_tf = self._taskflow_load(self._pool_flows.
|
||||
get_update_pool_flow(),
|
||||
store={constants.POOL: pool,
|
||||
constants.LISTENERS:
|
||||
listeners,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.UPDATE_DICT:
|
||||
pool_updates})
|
||||
update_pool_tf = self.taskflow_load(self._pool_flows.
|
||||
get_update_pool_flow(),
|
||||
store={constants.POOL: pool,
|
||||
constants.LISTENERS:
|
||||
listeners,
|
||||
constants.LOADBALANCER:
|
||||
load_balancer,
|
||||
constants.UPDATE_DICT:
|
||||
pool_updates})
|
||||
with tf_logging.DynamicLoggingListener(update_pool_tf,
|
||||
log=LOG):
|
||||
update_pool_tf.run()
|
||||
@ -696,7 +696,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = [l7policy.listener]
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
|
||||
create_l7policy_tf = self._taskflow_load(
|
||||
create_l7policy_tf = self.taskflow_load(
|
||||
self._l7policy_flows.get_create_l7policy_flow(),
|
||||
store={constants.L7POLICY: l7policy,
|
||||
constants.LISTENERS: listeners,
|
||||
@ -718,7 +718,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
listeners = [l7policy.listener]
|
||||
|
||||
delete_l7policy_tf = self._taskflow_load(
|
||||
delete_l7policy_tf = self.taskflow_load(
|
||||
self._l7policy_flows.get_delete_l7policy_flow(),
|
||||
store={constants.L7POLICY: l7policy,
|
||||
constants.LISTENERS: listeners,
|
||||
@ -750,7 +750,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = [l7policy.listener]
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
|
||||
update_l7policy_tf = self._taskflow_load(
|
||||
update_l7policy_tf = self.taskflow_load(
|
||||
self._l7policy_flows.get_update_l7policy_flow(),
|
||||
store={constants.L7POLICY: l7policy,
|
||||
constants.LISTENERS: listeners,
|
||||
@ -786,7 +786,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = [l7policy.listener]
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
|
||||
create_l7rule_tf = self._taskflow_load(
|
||||
create_l7rule_tf = self.taskflow_load(
|
||||
self._l7rule_flows.get_create_l7rule_flow(),
|
||||
store={constants.L7RULE: l7rule,
|
||||
constants.L7POLICY: l7policy,
|
||||
@ -809,7 +809,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
listeners = [l7policy.listener]
|
||||
|
||||
delete_l7rule_tf = self._taskflow_load(
|
||||
delete_l7rule_tf = self.taskflow_load(
|
||||
self._l7rule_flows.get_delete_l7rule_flow(),
|
||||
store={constants.L7RULE: l7rule,
|
||||
constants.L7POLICY: l7policy,
|
||||
@ -843,7 +843,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
listeners = [l7policy.listener]
|
||||
load_balancer = l7policy.listener.load_balancer
|
||||
|
||||
update_l7rule_tf = self._taskflow_load(
|
||||
update_l7rule_tf = self.taskflow_load(
|
||||
self._l7rule_flows.get_update_l7rule_flow(),
|
||||
store={constants.L7RULE: l7rule,
|
||||
constants.L7POLICY: l7policy,
|
||||
@ -933,8 +933,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
constants.LOADBALANCER_ID: lb_id,
|
||||
constants.VIP: vip}
|
||||
|
||||
failover_amphora_tf = self._taskflow_load(amp_failover_flow,
|
||||
store=stored_params)
|
||||
failover_amphora_tf = self.taskflow_load(amp_failover_flow,
|
||||
store=stored_params)
|
||||
|
||||
with tf_logging.DynamicLoggingListener(failover_amphora_tf,
|
||||
log=LOG):
|
||||
@ -1084,8 +1084,8 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
else:
|
||||
stored_params[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
failover_lb_tf = self._taskflow_load(lb_failover_flow,
|
||||
store=stored_params)
|
||||
failover_lb_tf = self.taskflow_load(lb_failover_flow,
|
||||
store=stored_params)
|
||||
|
||||
with tf_logging.DynamicLoggingListener(failover_lb_tf, log=LOG):
|
||||
failover_lb_tf.run()
|
||||
@ -1112,7 +1112,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
id=amphora_id)
|
||||
LOG.info("Start amphora cert rotation, amphora's id is: %s", amp.id)
|
||||
|
||||
certrotation_amphora_tf = self._taskflow_load(
|
||||
certrotation_amphora_tf = self.taskflow_load(
|
||||
self._amphora_flows.cert_rotate_amphora_flow(),
|
||||
store={constants.AMPHORA: amp,
|
||||
constants.AMPHORA_ID: amp.id})
|
||||
@ -1141,7 +1141,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
flavor = self._flavor_repo.get_flavor_metadata_dict(
|
||||
db_apis.get_session(), lb.flavor_id)
|
||||
|
||||
update_amphora_tf = self._taskflow_load(
|
||||
update_amphora_tf = self.taskflow_load(
|
||||
self._amphora_flows.update_amphora_config_flow(),
|
||||
store={constants.AMPHORA: amp,
|
||||
constants.FLAVOR: flavor})
|
||||
|
@ -18,8 +18,10 @@ from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from sqlalchemy.orm import exc as db_exceptions
|
||||
from stevedore import driver as stevedore_driver
|
||||
from taskflow.listeners import logging as tf_logging
|
||||
import tenacity
|
||||
|
||||
from octavia.amphorae.driver_exceptions import exceptions
|
||||
from octavia.api.drivers import utils as provider_utils
|
||||
from octavia.common import base_taskflow
|
||||
from octavia.common import constants
|
||||
@ -37,6 +39,18 @@ RETRY_BACKOFF = 1
|
||||
RETRY_MAX = 5
|
||||
|
||||
|
||||
# We do not need to log retry exception information. Warning "Could not connect
|
||||
# to instance" will be logged as usual.
|
||||
def retryMaskFilter(record):
|
||||
if record.exc_info is not None and isinstance(
|
||||
record.exc_info[1], exceptions.AmpConnectionRetry):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
LOG.logger.addFilter(retryMaskFilter)
|
||||
|
||||
|
||||
def _is_provisioning_status_pending_update(lb_obj):
|
||||
return not lb_obj.provisioning_status == constants.PENDING_UPDATE
|
||||
|
||||
@ -57,13 +71,16 @@ class ControllerWorker(object):
|
||||
self._flavor_repo = repo.FlavorRepository()
|
||||
self._az_repo = repo.AvailabilityZoneRepository()
|
||||
|
||||
persistence = tsk_driver.MysqlPersistenceDriver()
|
||||
if CONF.task_flow.jobboard_enabled:
|
||||
persistence = tsk_driver.MysqlPersistenceDriver()
|
||||
|
||||
self.jobboard_driver = stevedore_driver.DriverManager(
|
||||
namespace='octavia.worker.jobboard_driver',
|
||||
name=CONF.task_flow.jobboard_backend_driver,
|
||||
invoke_args=(persistence,),
|
||||
invoke_on_load=True).driver
|
||||
self.jobboard_driver = stevedore_driver.DriverManager(
|
||||
namespace='octavia.worker.jobboard_driver',
|
||||
name=CONF.task_flow.jobboard_backend_driver,
|
||||
invoke_args=(persistence,),
|
||||
invoke_on_load=True).driver
|
||||
else:
|
||||
self.tf_engine = base_taskflow.BaseTaskFlowEngine()
|
||||
|
||||
@tenacity.retry(
|
||||
retry=(
|
||||
@ -80,6 +97,15 @@ class ControllerWorker(object):
|
||||
def services_controller(self):
|
||||
return base_taskflow.TaskFlowServiceController(self.jobboard_driver)
|
||||
|
||||
def run_flow(self, func, *args, **kwargs):
|
||||
if CONF.task_flow.jobboard_enabled:
|
||||
self.services_controller.run_poster(func, *args, **kwargs)
|
||||
else:
|
||||
tf = self.tf_engine.taskflow_load(
|
||||
func(*args), **kwargs)
|
||||
with tf_logging.DynamicLoggingListener(tf, log=LOG):
|
||||
tf.run()
|
||||
|
||||
def create_amphora(self, availability_zone=None):
|
||||
"""Creates an Amphora.
|
||||
|
||||
@ -96,11 +122,9 @@ class ControllerWorker(object):
|
||||
store[constants.AVAILABILITY_ZONE] = (
|
||||
self._az_repo.get_availability_zone_metadata_dict(
|
||||
db_apis.get_session(), availability_zone))
|
||||
job_id = self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_amphora_flow,
|
||||
store=store, wait=True)
|
||||
|
||||
return job_id
|
||||
except Exception as e:
|
||||
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))
|
||||
|
||||
@ -114,7 +138,7 @@ class ControllerWorker(object):
|
||||
amphora = self._amphora_repo.get(db_apis.get_session(),
|
||||
id=amphora_id)
|
||||
store = {constants.AMPHORA: amphora.to_dict()}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_amphora_flow,
|
||||
store=store)
|
||||
|
||||
@ -149,7 +173,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.LOADBALANCER: provider_lb}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_health_monitor_flow,
|
||||
store=store)
|
||||
|
||||
@ -179,7 +203,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.PROJECT_ID: load_balancer.project_id}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_health_monitor_flow,
|
||||
store=store)
|
||||
|
||||
@ -220,7 +244,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.UPDATE_DICT: health_monitor_updates}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_health_monitor_flow,
|
||||
store=store)
|
||||
|
||||
@ -257,7 +281,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.LOADBALANCER_ID: load_balancer.id}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_listener_flow,
|
||||
store=store)
|
||||
|
||||
@ -276,7 +300,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER_ID:
|
||||
listener[constants.LOADBALANCER_ID],
|
||||
constants.PROJECT_ID: lb.project_id}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_listener_flow,
|
||||
store=store)
|
||||
|
||||
@ -294,7 +318,7 @@ class ControllerWorker(object):
|
||||
constants.UPDATE_DICT: listener_updates,
|
||||
constants.LOADBALANCER_ID: db_lb.id,
|
||||
constants.LISTENERS: [listener]}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_listener_flow,
|
||||
store=store)
|
||||
|
||||
@ -341,7 +365,7 @@ class ControllerWorker(object):
|
||||
store[constants.UPDATE_DICT] = {
|
||||
constants.TOPOLOGY: topology
|
||||
}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_load_balancer_flow,
|
||||
topology, listeners=listeners_dicts,
|
||||
store=store)
|
||||
@ -361,11 +385,11 @@ class ControllerWorker(object):
|
||||
if cascade:
|
||||
store.update(flow_utils.get_delete_pools_store(db_lb))
|
||||
store.update(flow_utils.get_delete_listeners_store(db_lb))
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_cascade_delete_load_balancer_flow,
|
||||
load_balancer, store=store)
|
||||
else:
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_load_balancer_flow,
|
||||
load_balancer, store=store)
|
||||
|
||||
@ -383,7 +407,7 @@ class ControllerWorker(object):
|
||||
original_load_balancer[constants.LOADBALANCER_ID],
|
||||
constants.UPDATE_DICT: load_balancer_updates}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_load_balancer_flow,
|
||||
store=store)
|
||||
|
||||
@ -417,7 +441,7 @@ class ControllerWorker(object):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_member_flow,
|
||||
store=store)
|
||||
|
||||
@ -453,7 +477,7 @@ class ControllerWorker(object):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_member_flow,
|
||||
store=store)
|
||||
|
||||
@ -501,7 +525,7 @@ class ControllerWorker(object):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_batch_update_members_flow,
|
||||
provider_old_members, new_members, updated_members,
|
||||
store=store)
|
||||
@ -539,7 +563,7 @@ class ControllerWorker(object):
|
||||
else:
|
||||
store[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_member_flow,
|
||||
store=store)
|
||||
|
||||
@ -577,7 +601,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.LOADBALANCER: provider_lb}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_pool_flow,
|
||||
store=store)
|
||||
|
||||
@ -604,7 +628,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.PROJECT_ID: db_pool.project_id}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_pool_flow,
|
||||
store=store)
|
||||
|
||||
@ -640,7 +664,7 @@ class ControllerWorker(object):
|
||||
constants.LOADBALANCER: provider_lb,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.UPDATE_DICT: pool_updates}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_pool_flow,
|
||||
store=store)
|
||||
|
||||
@ -662,7 +686,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: db_listener.load_balancer.id
|
||||
}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_l7policy_flow,
|
||||
store=store)
|
||||
|
||||
@ -683,7 +707,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: db_listener.load_balancer.id
|
||||
}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_l7policy_flow,
|
||||
store=store)
|
||||
|
||||
@ -706,7 +730,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
|
||||
constants.UPDATE_DICT: l7policy_updates}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_l7policy_flow,
|
||||
store=store)
|
||||
|
||||
@ -734,7 +758,7 @@ class ControllerWorker(object):
|
||||
constants.LISTENERS: listeners_dicts,
|
||||
constants.LOADBALANCER_ID: load_balancer.id
|
||||
}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_create_l7rule_flow,
|
||||
store=store)
|
||||
|
||||
@ -760,7 +784,7 @@ class ControllerWorker(object):
|
||||
constants.L7POLICY_ID: db_l7policy.id,
|
||||
constants.LOADBALANCER_ID: load_balancer.id
|
||||
}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_delete_l7rule_flow,
|
||||
store=store)
|
||||
|
||||
@ -788,7 +812,7 @@ class ControllerWorker(object):
|
||||
constants.L7POLICY_ID: db_l7policy.id,
|
||||
constants.LOADBALANCER_ID: load_balancer.id,
|
||||
constants.UPDATE_DICT: l7rule_updates}
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_update_l7rule_flow,
|
||||
store=store)
|
||||
|
||||
@ -855,7 +879,7 @@ class ControllerWorker(object):
|
||||
else:
|
||||
stored_params[constants.AVAILABILITY_ZONE] = {}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.get_failover_flow,
|
||||
role=amp.role, load_balancer=provider_lb,
|
||||
store=stored_params, wait=True)
|
||||
@ -958,7 +982,7 @@ class ControllerWorker(object):
|
||||
store = {constants.AMPHORA: amp.to_dict(),
|
||||
constants.AMPHORA_ID: amphora_id}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.cert_rotate_amphora_flow,
|
||||
store=store)
|
||||
|
||||
@ -985,6 +1009,6 @@ class ControllerWorker(object):
|
||||
store = {constants.AMPHORA: amp.to_dict(),
|
||||
constants.FLAVOR: flavor}
|
||||
|
||||
self.services_controller.run_poster(
|
||||
self.run_flow(
|
||||
flow_utils.update_amphora_config_flow,
|
||||
store=store)
|
||||
|
@ -54,9 +54,9 @@ class TestBaseTaskFlowEngine(base.TestCase):
|
||||
concurrent.futures.ThreadPoolExecutor.assert_called_once_with(
|
||||
max_workers=MAX_WORKERS)
|
||||
|
||||
# Test _taskflow_load
|
||||
# Test taskflow_load
|
||||
|
||||
base_taskflow_engine._taskflow_load('TEST')
|
||||
base_taskflow_engine.taskflow_load('TEST')
|
||||
|
||||
tf_engines.load.assert_called_once_with(
|
||||
'TEST',
|
||||
|
@ -91,7 +91,7 @@ class TestException(Exception):
|
||||
return_value=_member_mock)
|
||||
@mock.patch('octavia.db.repositories.PoolRepository.get',
|
||||
return_value=_pool_mock)
|
||||
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load',
|
||||
@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine.taskflow_load',
|
||||
return_value=_flow_mock)
|
||||
@mock.patch('taskflow.listeners.logging.DynamicLoggingListener')
|
||||
@mock.patch('octavia.db.api.get_session', return_value=_db_session)
|
||||
@ -142,7 +142,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
amp = cw.create_amphora()
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
'TEST',
|
||||
store={constants.BUILD_TYPE_PRIORITY:
|
||||
@ -184,7 +184,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
amp = cw.create_amphora(availability_zone=az)
|
||||
mock_get_az_metadata.assert_called_once_with(_db_session, az)
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
'TEST',
|
||||
store={constants.BUILD_TYPE_PRIORITY:
|
||||
@ -223,7 +223,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_health_monitor(_health_mon_mock)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.HEALTH_MON:
|
||||
_health_mon_mock,
|
||||
@ -260,7 +260,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_health_monitor(HM_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.HEALTH_MON:
|
||||
_health_mon_mock,
|
||||
@ -298,7 +298,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw.update_health_monitor(_health_mon_mock.id,
|
||||
HEALTH_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.HEALTH_MON:
|
||||
_health_mon_mock,
|
||||
@ -336,7 +336,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_listener(LB_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
@ -368,7 +368,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_listener(LB_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock, store={constants.LISTENER: _listener_mock,
|
||||
constants.LOADBALANCER: _load_balancer_mock}))
|
||||
@ -398,7 +398,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_listener(LB_ID, LISTENER_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.LISTENER: _listener_mock,
|
||||
constants.LOADBALANCER:
|
||||
@ -658,7 +658,7 @@ class TestControllerWorker(base.TestCase):
|
||||
_db_session,
|
||||
id=LB_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
@ -696,7 +696,7 @@ class TestControllerWorker(base.TestCase):
|
||||
_db_session,
|
||||
id=LB_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
@ -739,7 +739,7 @@ class TestControllerWorker(base.TestCase):
|
||||
_db_session,
|
||||
id=LB_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.UPDATE_DICT: change,
|
||||
constants.LOADBALANCER:
|
||||
@ -776,7 +776,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_member(MEMBER_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.MEMBER: _member_mock,
|
||||
@ -813,7 +813,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_member(MEMBER_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock, store={constants.MEMBER: _member_mock,
|
||||
constants.LISTENERS:
|
||||
@ -852,7 +852,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_member(MEMBER_ID, MEMBER_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.MEMBER: _member_mock,
|
||||
constants.LISTENERS:
|
||||
@ -894,7 +894,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={
|
||||
constants.LISTENERS: [_listener_mock],
|
||||
@ -929,7 +929,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_pool(POOL_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.POOL: _pool_mock,
|
||||
constants.LISTENERS:
|
||||
@ -962,7 +962,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_pool(POOL_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.POOL: _pool_mock,
|
||||
constants.LISTENERS:
|
||||
@ -995,7 +995,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_pool(POOL_ID, POOL_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.POOL: _pool_mock,
|
||||
constants.LISTENERS:
|
||||
@ -1030,7 +1030,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_l7policy(L7POLICY_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7POLICY: _l7policy_mock,
|
||||
constants.LISTENERS:
|
||||
@ -1063,7 +1063,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_l7policy(L7POLICY_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7POLICY: _l7policy_mock,
|
||||
constants.LISTENERS:
|
||||
@ -1096,7 +1096,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_l7policy(L7POLICY_ID, L7POLICY_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7POLICY: _l7policy_mock,
|
||||
constants.LISTENERS:
|
||||
@ -1131,7 +1131,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.create_l7rule(L7RULE_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7RULE: _l7rule_mock,
|
||||
constants.L7POLICY: _l7policy_mock,
|
||||
@ -1165,7 +1165,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.delete_l7rule(L7RULE_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7RULE: _l7rule_mock,
|
||||
constants.L7POLICY: _l7policy_mock,
|
||||
@ -1199,7 +1199,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.update_l7rule(L7RULE_ID, L7RULE_UPDATE_DICT)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.L7RULE: _l7rule_mock,
|
||||
constants.L7POLICY: _l7policy_mock,
|
||||
@ -1242,7 +1242,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.FLAVOR: {'loadbalancer_topology':
|
||||
@ -1297,7 +1297,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.FLAVOR: {'loadbalancer_topology':
|
||||
@ -1352,7 +1352,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.FLAVOR: {'loadbalancer_topology':
|
||||
@ -1407,7 +1407,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.FLAVOR: {'loadbalancer_topology':
|
||||
@ -1465,7 +1465,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.FLAVOR: {'loadbalancer_topology':
|
||||
@ -1567,7 +1567,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
mock_get_failover_amp_flow.assert_called_once_with(mock_amphora, None)
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(FAKE_FLOW, store=expected_stored_params))
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
@ -1604,7 +1604,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.LOADBALANCER: None,
|
||||
@ -2065,7 +2065,7 @@ class TestControllerWorker(base.TestCase):
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.failover_amphora(AMP_ID)
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(
|
||||
_flow_mock,
|
||||
store={constants.LOADBALANCER_ID: _load_balancer_mock.id,
|
||||
@ -2101,7 +2101,7 @@ class TestControllerWorker(base.TestCase):
|
||||
_flow_mock.reset_mock()
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.amphora_cert_rotation(AMP_ID)
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.AMPHORA: _amphora_mock,
|
||||
constants.AMPHORA_ID:
|
||||
@ -2140,7 +2140,7 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID)
|
||||
mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID)
|
||||
mock_flavor_meta.assert_called_once_with(_db_session, 'vanilla')
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.AMPHORA: _amphora_mock,
|
||||
constants.FLAVOR: {'test': 'dict'}}))
|
||||
@ -2151,13 +2151,13 @@ class TestControllerWorker(base.TestCase):
|
||||
mock_amp_repo_get.reset_mock()
|
||||
mock_get_lb_for_amp.reset_mock()
|
||||
mock_flavor_meta.reset_mock()
|
||||
base_taskflow.BaseTaskFlowEngine._taskflow_load.reset_mock()
|
||||
base_taskflow.BaseTaskFlowEngine.taskflow_load.reset_mock()
|
||||
mock_lb.flavor_id = None
|
||||
cw.update_amphora_agent_config(AMP_ID)
|
||||
mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID)
|
||||
mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID)
|
||||
mock_flavor_meta.assert_not_called()
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
(base_taskflow.BaseTaskFlowEngine.taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={constants.AMPHORA: _amphora_mock,
|
||||
constants.FLAVOR: {}}))
|
||||
|
@ -116,6 +116,7 @@ class TestControllerWorker(base.TestCase):
|
||||
def setUp(self):
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
self.conf.config(group="task_flow", jobboard_enabled=True)
|
||||
|
||||
_db_pool_mock.listeners = [_listener_mock]
|
||||
_db_pool_mock.load_balancer = _db_load_balancer_mock
|
||||
|
@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Added a new configuration setting (``[task_flow]/jobboard_enabled``) to
|
||||
enable/disable jobboard functionality in the amphorav2 provider. When
|
||||
disabled, the amphorav2 provider behaves similarly to the amphora v1
|
||||
provider and does not require extra dependencies. The default setting is
|
||||
jobboard disabled while jobboard remains an experimental feature.
|
@ -78,7 +78,7 @@
|
||||
parent: octavia-v2-dsvm-scenario
|
||||
vars:
|
||||
devstack_localrc:
|
||||
OCTAVIA_ENABLE_AMPHORAV2_PROVIDER: True
|
||||
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: True
|
||||
devstack_local_conf:
|
||||
post-config:
|
||||
$OCTAVIA_CONF:
|
||||
@ -93,6 +93,13 @@
|
||||
enabled_provider_drivers: amphorav2:The v2 amphora driver.
|
||||
provider: amphorav2
|
||||
|
||||
- job:
|
||||
name: octavia-v2-dsvm-scenario-amphora-v2-no-jobboard
|
||||
parent: octavia-v2-dsvm-scenario-amphora-v2
|
||||
vars:
|
||||
devstack_localrc:
|
||||
OCTAVIA_ENABLE_AMPHORAV2_JOBBOARD: False
|
||||
|
||||
- project-template:
|
||||
name: octavia-tox-tips
|
||||
check:
|
||||
|
@ -58,6 +58,9 @@
|
||||
- octavia-v2-dsvm-scenario-amphora-v2:
|
||||
irrelevant-files: *irrelevant-files
|
||||
voting: false
|
||||
- octavia-v2-dsvm-scenario-amphora-v2-no-jobboard:
|
||||
irrelevant-files: *irrelevant-files
|
||||
voting: false
|
||||
- octavia-v2-dsvm-scenario-centos-8:
|
||||
irrelevant-files: *irrelevant-files
|
||||
voting: false
|
||||
|
Loading…
Reference in New Issue
Block a user