From 9c13499d216ef6638a08ba4158575b182084b64f Mon Sep 17 00:00:00 2001 From: Ann Taraday Date: Mon, 11 Mar 2019 10:08:39 +0000 Subject: [PATCH] Jobboard based controller Introduce TaskFlowServiceController which uses taskflow jobboard feature and saves jobs info into persistence backend. Jobboard could be operated via RedisTaskFlowDriver or ZookeeperTaskFlowDriver, that could be set via the config. RedisTaskFlowDriver is intoduced as default backend for jobboard. Usage of jobboard allows to resume jobs in case of restart/stop of Octavia controller services. Persistence backend saves state of flow tasks that required in case of resuming job. SQLAlchemy backend is used here. Bump taskflow version to 3.7.1 and add dependency to SQLAlchemy-Utils (required for taskflow sqlalchemy backend support). Story: 2005072 Task: 30806 Task: 30816 Task: 30817 Change-Id: I92ee4e879e98e4718d2e9aba56486341223a9157 --- devstack/plugin.sh | 48 +- devstack/settings | 2 + doc/source/admin/providers/amphorav2.rst | 31 ++ doc/source/admin/providers/index.rst | 2 + doc/source/install/install-amphorav2.rst | 84 ++++ doc/source/install/install.rst | 1 + etc/octavia.conf | 43 ++ lower-constraints.txt | 3 +- octavia/api/common/types.py | 12 +- octavia/api/v2/controllers/amphora.py | 19 +- octavia/common/base_taskflow.py | 133 ++++- octavia/common/config.py | 44 +- octavia/common/constants.py | 1 + octavia/common/data_models.py | 7 +- .../healthmanager/health_manager.py | 8 +- .../controller/housekeeping/house_keeping.py | 18 +- octavia/controller/queue/v2/consumer.py | 4 + .../controller/worker/v2/controller_worker.py | 476 ++++++++---------- .../controller/worker/v2/flows/flow_utils.py | 158 ++++++ .../worker/v2/taskflow_jobboard_driver.py | 102 ++++ octavia/db/migration/cli.py | 18 + octavia/db/repositories.py | 16 +- .../tests/functional/db/test_repositories.py | 15 + .../tests/unit/common/test_base_taskflow.py | 83 +++ .../healthmanager/test_health_manager.py | 28 +- .../housekeeping/test_house_keeping.py | 73 ++- .../unit/controller/queue/v2/test_consumer.py | 3 +- .../worker/v2/test_controller_worker.py | 432 +++++----------- ...rd-based-controller-599279c7cc172e955.yaml | 7 + requirements.txt | 3 +- setup.cfg | 3 + zuul.d/jobs.yaml | 4 + 32 files changed, 1280 insertions(+), 601 deletions(-) create mode 100644 doc/source/admin/providers/amphorav2.rst create mode 100644 doc/source/install/install-amphorav2.rst create mode 100644 octavia/controller/worker/v2/flows/flow_utils.py create mode 100644 octavia/controller/worker/v2/taskflow_jobboard_driver.py create mode 100644 releasenotes/notes/add-jobboard-based-controller-599279c7cc172e955.yaml diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 98df497685..4f8bcc5fb2 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -225,6 +225,36 @@ function create_octavia_accounts { fi } +function install_redis { + if is_fedora; then + install_package redis + elif is_ubuntu; then + install_package redis-server + elif is_suse; then + install_package redis + else + exit_distro_not_supported "redis installation" + fi + + start_service redis + + pip_install_gr redis +} + +function uninstall_redis { + if is_fedora; then + uninstall_package redis + elif is_ubuntu; then + uninstall_package redis-server + elif is_suse; then + uninstall_package redis + fi + + stop_service redis + + pip_unistall redis +} + function octavia_configure { sudo mkdir -m 755 -p $OCTAVIA_CONF_DIR @@ -250,7 +280,9 @@ function octavia_configure { iniset $OCTAVIA_CONF api_settings api_handler queue_producer iniset $OCTAVIA_CONF database connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia" - + if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then + iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence" + fi # Configure keystone auth_token for all users configure_keystone_authtoken_middleware $OCTAVIA_CONF octavia @@ -324,12 +356,22 @@ function octavia_configure { if [ $OCTAVIA_NODE == 'main' ] || [ $OCTAVIA_NODE == 'standalone' ] || [ $OCTAVIA_NODE == 'api' ]; then recreate_database_mysql octavia octavia-db-manage upgrade head + + if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then + recreate_database_mysql octavia_persistence + octavia-db-manage upgrade_persistence + fi fi if [[ -a $OCTAVIA_CERTS_DIR ]] ; then rm -rf $OCTAVIA_CERTS_DIR fi + # amphorav2 required redis installation + if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then + install_redis + fi + if [[ "$(trueorfalse False OCTAVIA_USE_PREGENERATED_CERTS)" == "True" ]]; then cp -rfp ${OCTAVIA_PREGENERATED_CERTS_DIR} ${OCTAVIA_CERTS_DIR} else @@ -615,6 +657,10 @@ function octavia_cleanup { sudo rm -rf $NOVA_STATE_PATH $NOVA_AUTH_CACHE_DIR + if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then + uninstall_redis + fi + sudo rm -f /etc/rsyslog.d/10-octavia-log-offloading.conf restart_service rsyslog diff --git a/devstack/settings b/devstack/settings index b3a5b5a367..80970193da 100644 --- a/devstack/settings +++ b/devstack/settings @@ -33,6 +33,8 @@ OCTAVIA_PORT=${OCTAVIA_PORT:-"9876"} OCTAVIA_HA_PORT=${OCTAVIA_HA_PORT:-"9875"} OCTAVIA_HM_LISTEN_PORT=${OCTAVIA_HM_LISTEN_PORT:-"5555"} +OCTAVIA_ENABLE_AMPHORAV2_PROVIDER=${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER:-False} + OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"} OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"} OCTAVIA_MGMT_SUBNET_END=${OCTAVIA_MGMT_SUBNET_END:-"192.168.0.200"} diff --git a/doc/source/admin/providers/amphorav2.rst b/doc/source/admin/providers/amphorav2.rst new file mode 100644 index 0000000000..d6db68dea7 --- /dev/null +++ b/doc/source/admin/providers/amphorav2.rst @@ -0,0 +1,31 @@ +.. + Copyright 2020 Mirantis Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + +Amphorav2 +========= + +This is extension of the reference driver for Octavia. It adopts taskflow +jobboard feature and saves task states into the persistence backend, this +allows to continue task execution if controller work was interrupted. + +Default provider name: **amphorav2** + +The driver package: https://pypi.org/project/octavia/ + +The driver source: https://opendev.org/openstack/octavia/ + +The documentation: https://docs.openstack.org/octavia/latest/ + +Where to report issues with the driver: https://storyboard.openstack.org/#!/project/openstack/octavia diff --git a/doc/source/admin/providers/index.rst b/doc/source/admin/providers/index.rst index 8c60c3c761..2eee086f46 100644 --- a/doc/source/admin/providers/index.rst +++ b/doc/source/admin/providers/index.rst @@ -45,6 +45,8 @@ your Octavia API instances. .. include:: amphora.rst +.. include:: amphorav2.rst + .. include:: f5.rst .. include:: ovn.rst diff --git a/doc/source/install/install-amphorav2.rst b/doc/source/install/install-amphorav2.rst new file mode 100644 index 0000000000..93eff9a0cb --- /dev/null +++ b/doc/source/install/install-amphorav2.rst @@ -0,0 +1,84 @@ +.. _install-amphorav2: + +Additional configuration steps to configure amphorav2 provider +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you would like to use amphorav2 provider for load-balancer service the +following additional steps are required. + + +Prerequisites +------------- + +Amphorav2 provider requires creation of additional database +``octavia_persistence`` to store info about state of tasks and progress of its +execution. +Also to monitor progress on taskflow jobs amphorav2 provider uses +jobboard. As jobboard backend could be used Redis or Zookeeper key-value +storages. Operator should chose the one that is more preferable for specific +cloud. The default is Redis. + +1. Create the database, complete these steps: + + * Use the database access client to connect to the database + server as the ``root`` user: + + .. code-block:: console + + # mysql + + * Create the ``octavia_persistence`` database: + + .. code-block:: console + + CREATE DATABASE octavia_persistence; + + * Grant proper access to the ``octavia_persistence`` database: + + .. code-block:: console + + GRANT ALL PRIVILEGES ON octavia.* TO 'octavia_persistence'@'localhost' \ + IDENTIFIED BY 'OCTAVIA_DBPASS'; + GRANT ALL PRIVILEGES ON octavia.* TO 'octavia_persistence'@'%' \ + IDENTIFIED BY 'OCTAVIA_DBPASS'; + + Replace OCTAVIA_DBPASS with a suitable password. + + +2. Install desired key-value backend (Redis or Zookeper). + +Additional configuration to octavia components +---------------------------------------------- + +1. Edit the ``/etc/octavia/octavia.conf`` file ``[task_flow]`` section + + * Configure database access for persistence backend: + + .. code-block:: ini + + [task_flow] + persistence_connection = mysql+pymysql://octavia:OCTAVIA_DBPASS@controller/octavia_persistence + + Replace OCTAVIA_DBPASS with the password you chose for the Octavia databases. + + * Set desired jobboard backend and its configuration: + + .. code-block:: ini + + [task_flow] + jobboard_backend_driver = 'redis_taskflow_driver' + jobboard_backend_hosts = KEYVALUE_HOST_IPS + jobboard_backend_port = KEYVALUE_PORT + jobboard_backend_password = OCTAVIA_JOBBOARDPASS + jobboard_backend_namespace = 'octavia_jobboard' + + Replace OCTAVIA_JOBBOARDPASS with the password you chose for the Octavia + key-value storage. + Replace KEYVALUE_HOST_IPS and KEYVALUE_PORT with ip and port which + chosen key-value storage is using. + +2. Populate the octavia database: + + .. code-block:: console + + # octavia-db-manage --config-file /etc/octavia/octavia.conf upgrade_persistence diff --git a/doc/source/install/install.rst b/doc/source/install/install.rst index d43ec0aafa..0a79242b8a 100644 --- a/doc/source/install/install.rst +++ b/doc/source/install/install.rst @@ -17,3 +17,4 @@ Note that installation and configuration vary by distribution. :maxdepth: 2 install-ubuntu.rst + install-amphorav2.rst diff --git a/etc/octavia.conf b/etc/octavia.conf index c806449a40..f4b7331d12 100644 --- a/etc/octavia.conf +++ b/etc/octavia.conf @@ -283,6 +283,49 @@ # for debugging purposes. # disable_revert = False +# Persistence database, which will be used to store tasks states. +# Database connection url with db name (string value) +#persistence_connection = sqlite:// + +# Jobboard backend driver that will monitor job state. (string value) +# Possible values: +# - redis_taskflow_driver: Driver that will use Redis to store job states. +# - zookeeper_taskflow_driver: Driver that will use Zookeeper to store job +# states. +#jobboard_backend_driver = redis_taskflow_driver + +# Jobboard backend server host(s). (list value) +#jobboard_backend_hosts = 127.0.0.1 + +# Jobboard backend server port (port value) +# Minimum value: 0 +# Maximum value: 65535 +#jobboard_backend_port = 6379 + +# Jobboard backend server password (string value) +#jobboard_backend_password = + +# Jobboard name that should be used to store taskflow job id and +# claims for it. (string value) +#jobboard_backend_namespace = octavia_jobboard + +# Redis jobboard backend ssl configuration options. (dict value) +# SSL is disabled by default +#jobboard_redis_backend_ssl_options = ssl:False,ssl_ca_certs:None,ssl_cert_reqs:required,ssl_certfile:None,ssl_keyfile:None + +# Zookeeper jobboard backend ssl configuration options. (dict value) +# SSL is disabled by default +#jobboard_zookeeper_ssl_options = use_ssl:False,certfile:None,keyfile:None,keyfile_password:None,verify_certs:True + +# For backends like redis claiming jobs requiring setting the expiry - +# how many seconds the claim should be retained for. (integer value) +#jobboard_expiration_time = 30 + +# If for analysis required saving logbooks info, set this parameter to +# True. By default remove logbook from persistence backend when job +# completed. (boolean value) +#jobboard_save_logbook = false + [oslo_messaging] # Queue Consumer Thread Pool Size # rpc_thread_pool_size = 2 diff --git a/lower-constraints.txt b/lower-constraints.txt index 0f2a5824a8..8188549cf9 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -149,12 +149,13 @@ Sphinx==1.6.2 sphinxcontrib-svg2pdfconverter==0.1.0 sphinxcontrib-websupport==1.0.1 SQLAlchemy==1.0.10 +SQLAlchemy-Utils==0.30.11 sqlalchemy-migrate==0.11.0 sqlparse==0.2.4 statsd==3.2.2 stestr==2.0.0 stevedore==1.20.0 -taskflow==2.16.0 +taskflow==4.1.0 tempest==17.1.0 Tempita==0.5.2 tenacity==5.0.4 diff --git a/octavia/api/common/types.py b/octavia/api/common/types.py index 95c9141c72..caf6c99c74 100644 --- a/octavia/api/common/types.py +++ b/octavia/api/common/types.py @@ -14,6 +14,7 @@ import copy +from dateutil import parser import netaddr from wsme import types as wtypes @@ -127,12 +128,19 @@ class BaseType(wtypes.Base, metaclass=BaseMeta): :param data_model: data model to convert from :param children: convert child data models """ + type_dict = data_model.to_dict() + # We need to have json convertible data for storing it in persistence + # jobboard backend. + for k, v in type_dict.items(): + if ('_at' in k or 'expiration' in k) and v is not None: + type_dict[k] = parser.parse(v) + if not hasattr(cls, '_type_to_model_map'): - return cls(**data_model.to_dict()) + return cls(**type_dict) dm_to_type_map = {value: key for key, value in cls._type_to_model_map.items()} - type_dict = data_model.to_dict() + new_dict = copy.deepcopy(type_dict) for key, value in type_dict.items(): if isinstance(value, dict): diff --git a/octavia/api/v2/controllers/amphora.py b/octavia/api/v2/controllers/amphora.py index f7d9923802..fc96c7b985 100644 --- a/octavia/api/v2/controllers/amphora.py +++ b/octavia/api/v2/controllers/amphora.py @@ -98,10 +98,15 @@ class FailoverController(base.BaseController): def __init__(self, amp_id): super(FailoverController, self).__init__() - topic = cfg.CONF.oslo_messaging.topic + if CONF.api_settings.default_provider_driver == constants.AMPHORAV2: + topic = constants.TOPIC_AMPHORA_V2 + version = "2.0" + else: + topic = cfg.CONF.oslo_messaging.topic + version = "1.0" self.target = messaging.Target( namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, - topic=topic, version="1.0", fanout=False) + topic=topic, version=version, fanout=False) self.client = rpc.get_client(self.target) self.amp_id = amp_id @@ -143,11 +148,17 @@ class AmphoraUpdateController(base.BaseController): def __init__(self, amp_id): super(AmphoraUpdateController, self).__init__() - topic = cfg.CONF.oslo_messaging.topic + + if CONF.api_settings.default_provider_driver == constants.AMPHORAV2: + topic = constants.TOPIC_AMPHORA_V2 + version = "2.0" + else: + topic = cfg.CONF.oslo_messaging.topic + version = "1.0" self.transport = messaging.get_rpc_transport(cfg.CONF) self.target = messaging.Target( namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT, - topic=topic, version="1.0", fanout=False) + topic=topic, version=version, fanout=False) self.client = messaging.RPCClient(self.transport, target=self.target) self.amp_id = amp_id diff --git a/octavia/common/base_taskflow.py b/octavia/common/base_taskflow.py index 4fb3e3a183..27622cc0d0 100644 --- a/octavia/common/base_taskflow.py +++ b/octavia/common/base_taskflow.py @@ -15,14 +15,38 @@ import concurrent.futures import datetime +import functools from oslo_config import cfg -from taskflow import engines as tf_engines +from oslo_log import log +from oslo_utils import uuidutils +from taskflow.conductors.backends import impl_blocking +from taskflow import engines +from taskflow import exceptions as taskflow_exc +from taskflow.listeners import base +from taskflow.listeners import logging +from taskflow.persistence import models +from taskflow import states +from octavia.amphorae.driver_exceptions import exceptions + +LOG = log.getLogger(__name__) CONF = cfg.CONF +# We do not need to log retry exception information. Warning "Could not connect +# to instance" will be logged as usual. +def retryMaskFilter(record): + if record.exc_info is not None and isinstance( + record.exc_info[1], exceptions.AmpConnectionRetry): + return False + return True + + +LOG.logger.addFilter(retryMaskFilter) + + class BaseTaskFlowEngine(object): """This is the task flow engine @@ -37,7 +61,7 @@ class BaseTaskFlowEngine(object): max_workers=CONF.task_flow.max_workers) def _taskflow_load(self, flow, **kwargs): - eng = tf_engines.load( + eng = engines.load( flow, engine=CONF.task_flow.engine, executor=self.executor, @@ -47,3 +71,108 @@ class BaseTaskFlowEngine(object): eng.prepare() return eng + + +class ExtendExpiryListener(base.Listener): + + def __init__(self, engine, job): + super(ExtendExpiryListener, self).__init__(engine) + self.job = job + + def _task_receiver(self, state, details): + self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time) + + def _flow_receiver(self, state, details): + self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time) + + def _retry_receiver(self, state, details): + self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time) + + +class DynamicLoggingConductor(impl_blocking.BlockingConductor): + + def _listeners_from_job(self, job, engine): + listeners = super(DynamicLoggingConductor, self)._listeners_from_job( + job, engine) + listeners.append(logging.DynamicLoggingListener(engine, log=LOG)) + + return listeners + + def _on_job_done(self, job, fut): + super(DynamicLoggingConductor, self)._on_job_done(job, fut) + # Double check that job is complete. + if (not CONF.task_flow.jobboard_save_logbook and + job.state == states.COMPLETE): + LOG.debug("Job %s is complete. Cleaning up job logbook.", job.name) + try: + self._persistence.get_connection().destroy_logbook( + job.book.uuid) + except taskflow_exc.NotFound: + LOG.debug("Logbook for job %s has been already cleaned up", + job.name) + + +class RedisDynamicLoggingConductor(DynamicLoggingConductor): + + def _listeners_from_job(self, job, engine): + listeners = super(RedisDynamicLoggingConductor, + self)._listeners_from_job(job, engine) + listeners.append(ExtendExpiryListener(engine, job)) + return listeners + + +class TaskFlowServiceController(object): + + def __init__(self, driver): + self.driver = driver + + def run_poster(self, flow_factory, *args, wait=False, **kwargs): + with self.driver.persistence_driver.get_persistence() as persistence: + with self.driver.job_board(persistence) as job_board: + job_id = uuidutils.generate_uuid() + job_name = '-'.join([flow_factory.__name__, job_id]) + job_logbook = models.LogBook(job_name) + flow_detail = models.FlowDetail( + job_name, job_id) + job_details = { + 'store': kwargs.pop('store') + } + job_logbook.add(flow_detail) + persistence.get_connection().save_logbook(job_logbook) + engines.save_factory_details(flow_detail, flow_factory, + args, kwargs, + backend=persistence) + + job_board.post(job_name, book=job_logbook, + details=job_details) + if wait: + self._wait_for_job(job_board) + + return job_id + + def _wait_for_job(self, job_board): + # Wait for job to its complete state + for job in job_board.iterjobs(): + LOG.debug("Waiting for job %s to finish", job.name) + job.wait() + + def run_conductor(self, name): + with self.driver.persistence_driver.get_persistence() as persistence: + with self.driver.job_board(persistence) as board: + # Redis do not expire jobs by default, so jobs won't be resumed + # with restart of controller. Add expiry for board and use + # special listener. + if (CONF.task_flow.jobboard_backend_driver == + 'redis_taskflow_driver'): + conductor = RedisDynamicLoggingConductor( + name, board, persistence=persistence, + engine=CONF.task_flow.engine) + board.claim = functools.partial( + board.claim, + expiry=CONF.task_flow.jobboard_expiration_time) + else: + conductor = DynamicLoggingConductor( + name, board, persistence=persistence, + engine=CONF.task_flow.engine) + + conductor.run() diff --git a/octavia/common/config.py b/octavia/common/config.py index 02ea14ae6b..8daea3f658 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -92,7 +92,8 @@ api_opts = [ 'Amphora driver.'), default={'amphora': 'The Octavia Amphora driver.', 'octavia': 'Deprecated alias of the Octavia Amphora ' - 'driver.'}), + 'driver.', + }), cfg.StrOpt('default_provider_driver', default='amphora', help=_('Default provider driver.')), cfg.IntOpt('udp_connect_min_interval_health_monitor', @@ -457,7 +458,46 @@ task_flow_opts = [ help=_('If True, disables the controller worker taskflow ' 'flows from reverting. This will leave resources in ' 'an inconsistent state and should only be used for ' - 'debugging purposes.')) + 'debugging purposes.')), + cfg.StrOpt('persistence_connection', + default='sqlite://', + help='Persistence database, which will be used to store tasks ' + 'states. Database connection url with db name'), + cfg.StrOpt('jobboard_backend_driver', + default='redis_taskflow_driver', + choices=['redis_taskflow_driver', 'zookeeper_taskflow_driver'], + help='Jobboard backend driver that will monitor job state.'), + cfg.ListOpt('jobboard_backend_hosts', default=['127.0.0.1'], + help='Jobboard backend server host(s).'), + cfg.PortOpt('jobboard_backend_port', default=6379, + help='Jobboard backend server port'), + cfg.StrOpt('jobboard_backend_password', default='', secret=True, + help='Jobboard backend server password'), + cfg.StrOpt('jobboard_backend_namespace', default='octavia_jobboard', + help='Jobboard name that should be used to store taskflow ' + 'job id and claims for it.'), + cfg.DictOpt('jobboard_redis_backend_ssl_options', + help='Redis jobboard backend ssl configuration options.', + default={'ssl': False, + 'ssl_keyfile': None, + 'ssl_certfile': None, + 'ssl_ca_certs': None, + 'ssl_cert_reqs': 'required'}), + cfg.DictOpt('jobboard_zookeeper_ssl_options', + help='Zookeeper jobboard backend ssl configuration options.', + default={'use_ssl': False, + 'keyfile': None, + 'keyfile_password': None, + 'certfile': None, + 'verify_certs': True}), + cfg.IntOpt('jobboard_expiration_time', default=30, + help='For backends like redis claiming jobs requiring setting ' + 'the expiry - how many seconds the claim should be ' + 'retained for.'), + cfg.BoolOpt('jobboard_save_logbook', default=False, + help='If for analysis required saving logbooks info, set this ' + 'parameter to True. By default remove logbook from ' + 'persistence backend when job completed.'), ] core_cli_opts = [] diff --git a/octavia/common/constants.py b/octavia/common/constants.py index fcfec2300f..d23a2e2d08 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -718,6 +718,7 @@ RBAC_GET_STATUS = 'get_status' # PROVIDERS OCTAVIA = 'octavia' +AMPHORAV2 = 'amphorav2' # systemctl commands DISABLE = 'disable' diff --git a/octavia/common/data_models.py b/octavia/common/data_models.py index 7b205aa688..35c407f510 100644 --- a/octavia/common/data_models.py +++ b/octavia/common/data_models.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import re from sqlalchemy.orm import collections @@ -34,7 +35,11 @@ class BaseDataModel(object): # tags is a list, it doesn't need recurse ret[attr] = value continue - + # We need to have json convertible data for storing it in + # persistence jobboard backend. + if isinstance(value, datetime.datetime): + ret[attr] = value.isoformat() + continue if recurse: if isinstance(getattr(self, attr), list): ret[attr] = [] diff --git a/octavia/controller/healthmanager/health_manager.py b/octavia/controller/healthmanager/health_manager.py index 764d1c60f5..266983b5fe 100644 --- a/octavia/controller/healthmanager/health_manager.py +++ b/octavia/controller/healthmanager/health_manager.py @@ -23,7 +23,8 @@ from oslo_log import log as logging from oslo_utils import excutils from octavia.common import constants -from octavia.controller.worker.v1 import controller_worker as cw +from octavia.controller.worker.v1 import controller_worker as cw1 +from octavia.controller.worker.v2 import controller_worker as cw2 from octavia.db import api as db_api from octavia.db import repositories as repo @@ -57,7 +58,10 @@ def update_stats_on_done(stats, fut): class HealthManager(object): def __init__(self, exit_event): - self.cw = cw.ControllerWorker() + if CONF.api_settings.default_provider_driver == constants.AMPHORAV2: + self.cw = cw2.ControllerWorker() + else: + self.cw = cw1.ControllerWorker() self.threads = CONF.health_manager.failover_threads self.executor = futures.ThreadPoolExecutor(max_workers=self.threads) self.amp_repo = repo.AmphoraRepository() diff --git a/octavia/controller/housekeeping/house_keeping.py b/octavia/controller/housekeeping/house_keeping.py index 8910bf3495..9b8de6f790 100644 --- a/octavia/controller/housekeeping/house_keeping.py +++ b/octavia/controller/housekeeping/house_keeping.py @@ -21,7 +21,8 @@ from oslo_utils import timeutils from sqlalchemy.orm import exc as sqlalchemy_exceptions from octavia.common import constants -from octavia.controller.worker.v1 import controller_worker as cw +from octavia.controller.worker.v1 import controller_worker as cw1 +from octavia.controller.worker.v2 import controller_worker as cw2 from octavia.db import api as db_api from octavia.db import repositories as repo @@ -34,7 +35,12 @@ class SpareAmphora(object): self.amp_repo = repo.AmphoraRepository() self.spares_repo = repo.SparesPoolRepository() self.az_repo = repo.AvailabilityZoneRepository() - self.cw = cw.ControllerWorker() + if CONF.api_settings.default_provider_driver == constants.AMPHORAV2: + self.cw = cw2.ControllerWorker() + self.check_booting_amphora = True + else: + self.cw = cw1.ControllerWorker() + self.check_booting_amphora = False def spare_check(self): """Checks the DB for the Spare amphora count. @@ -74,7 +80,8 @@ class SpareAmphora(object): # will function more accurately if the operator actually # configures the AZ setting properly. curr_spare_cnt = self.amp_repo.get_spare_amphora_count( - session, availability_zone=az_name) + session, availability_zone=az_name, + check_booting_amphora=self.check_booting_amphora) LOG.debug("Current Spare Amphora count for AZ %s: %d", az_name, curr_spare_cnt) diff_count = conf_spare_cnt - curr_spare_cnt @@ -151,7 +158,10 @@ class DatabaseCleanup(object): class CertRotation(object): def __init__(self): self.threads = CONF.house_keeping.cert_rotate_threads - self.cw = cw.ControllerWorker() + if CONF.api_settings.default_provider_driver == constants.AMPHORAV2: + self.cw = cw2.ControllerWorker() + else: + self.cw = cw1.ControllerWorker() def rotate(self): """Check the amphora db table for expiring auth certs.""" diff --git a/octavia/controller/queue/v2/consumer.py b/octavia/controller/queue/v2/consumer.py index f2c6dc010b..36b2a20d9e 100644 --- a/octavia/controller/queue/v2/consumer.py +++ b/octavia/controller/queue/v2/consumer.py @@ -16,6 +16,7 @@ import cotyledon from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher +from oslo_utils import uuidutils from octavia.common import constants from octavia.common import rpc @@ -46,6 +47,9 @@ class ConsumerService(cotyledon.Service): access_policy=self.access_policy ) self.message_listener.start() + for e in self.endpoints: + e.worker.services_controller.run_conductor( + 'octavia-task-flow-conductor-%s' % uuidutils.generate_uuid()) def terminate(self): if self.message_listener: diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py index a0ce5b3454..edc55c4e0b 100644 --- a/octavia/controller/worker/v2/controller_worker.py +++ b/octavia/controller/worker/v2/controller_worker.py @@ -13,26 +13,18 @@ # under the License. # - from oslo_config import cfg from oslo_log import log as logging from oslo_utils import excutils from sqlalchemy.orm import exc as db_exceptions -from taskflow.listeners import logging as tf_logging +from stevedore import driver as stevedore_driver import tenacity -from octavia.amphorae.driver_exceptions import exceptions from octavia.api.drivers import utils as provider_utils from octavia.common import base_taskflow from octavia.common import constants -from octavia.controller.worker.v2.flows import amphora_flows -from octavia.controller.worker.v2.flows import health_monitor_flows -from octavia.controller.worker.v2.flows import l7policy_flows -from octavia.controller.worker.v2.flows import l7rule_flows -from octavia.controller.worker.v2.flows import listener_flows -from octavia.controller.worker.v2.flows import load_balancer_flows -from octavia.controller.worker.v2.flows import member_flows -from octavia.controller.worker.v2.flows import pool_flows +from octavia.controller.worker.v2.flows import flow_utils +from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver from octavia.db import api as db_apis from octavia.db import repositories as repo @@ -45,35 +37,14 @@ RETRY_BACKOFF = 1 RETRY_MAX = 5 -# We do not need to log retry exception information. Warning "Could not connect -# to instance" will be logged as usual. -def retryMaskFilter(record): - if record.exc_info is not None and isinstance( - record.exc_info[1], exceptions.AmpConnectionRetry): - return False - return True - - -LOG.logger.addFilter(retryMaskFilter) - - def _is_provisioning_status_pending_update(lb_obj): return not lb_obj.provisioning_status == constants.PENDING_UPDATE -class ControllerWorker(base_taskflow.BaseTaskFlowEngine): +class ControllerWorker(object): def __init__(self): - self._amphora_flows = amphora_flows.AmphoraFlows() - self._health_monitor_flows = health_monitor_flows.HealthMonitorFlows() - self._lb_flows = load_balancer_flows.LoadBalancerFlows() - self._listener_flows = listener_flows.ListenerFlows() - self._member_flows = member_flows.MemberFlows() - self._pool_flows = pool_flows.PoolFlows() - self._l7policy_flows = l7policy_flows.L7PolicyFlows() - self._l7rule_flows = l7rule_flows.L7RuleFlows() - self._amphora_repo = repo.AmphoraRepository() self._amphora_health_repo = repo.AmphoraHealthRepository() self._health_mon_repo = repo.HealthMonitorRepository() @@ -86,7 +57,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): self._flavor_repo = repo.FlavorRepository() self._az_repo = repo.AvailabilityZoneRepository() - super(ControllerWorker, self).__init__() + persistence = tsk_driver.MysqlPersistenceDriver() + + self.jobboard_driver = stevedore_driver.DriverManager( + namespace='octavia.worker.jobboard_driver', + name=CONF.task_flow.jobboard_backend_driver, + invoke_args=(persistence,), + invoke_on_load=True).driver @tenacity.retry( retry=( @@ -99,12 +76,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): return repo.get(db_apis.get_session(), id=id) + @property + def services_controller(self): + return base_taskflow.TaskFlowServiceController(self.jobboard_driver) + def create_amphora(self, availability_zone=None): """Creates an Amphora. This is used to create spare amphora. - :returns: amphora_id + :returns: uuid """ try: store = {constants.BUILD_TYPE_PRIORITY: @@ -115,13 +96,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): store[constants.AVAILABILITY_ZONE] = ( self._az_repo.get_availability_zone_metadata_dict( db_apis.get_session(), availability_zone)) - create_amp_tf = self._taskflow_load( - self._amphora_flows.get_create_amphora_flow(), - store=store) - with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG): - create_amp_tf.run() + job_id = self.services_controller.run_poster( + flow_utils.get_create_amphora_flow, + store=store, wait=True) - return create_amp_tf.storage.fetch('amphora') + return job_id except Exception as e: LOG.error('Failed to create an amphora due to: {}'.format(str(e))) @@ -134,13 +113,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): """ amphora = self._amphora_repo.get(db_apis.get_session(), id=amphora_id) - delete_amp_tf = self._taskflow_load( - self._amphora_flows.get_delete_amphora_flow(), - store={constants.AMPHORA: amphora.to_dict()}) - with tf_logging.DynamicLoggingListener(delete_amp_tf, - log=LOG): - delete_amp_tf.run() + store = {constants.AMPHORA: amphora.to_dict()} + self.services_controller.run_poster( + flow_utils.get_delete_amphora_flow, + store=store) + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_health_monitor(self, health_monitor): """Creates a health monitor. @@ -162,16 +144,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( pool.listeners)) - create_hm_tf = self._taskflow_load( - self._health_monitor_flows.get_create_health_monitor_flow(), - store={constants.HEALTH_MON: health_monitor, - constants.POOL_ID: pool.id, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: load_balancer.id, - constants.LOADBALANCER: provider_lb}) - with tf_logging.DynamicLoggingListener(create_hm_tf, - log=LOG): - create_hm_tf.run() + store = {constants.HEALTH_MON: health_monitor, + constants.POOL_ID: pool.id, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: load_balancer.id, + constants.LOADBALANCER: provider_lb} + self.services_controller.run_poster( + flow_utils.get_create_health_monitor_flow, + store=store) def delete_health_monitor(self, health_monitor): """Deletes a health monitor. @@ -193,17 +173,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( pool.listeners)) - delete_hm_tf = self._taskflow_load( - self._health_monitor_flows.get_delete_health_monitor_flow(), - store={constants.HEALTH_MON: health_monitor, - constants.POOL_ID: pool.id, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: load_balancer.id, - constants.LOADBALANCER: provider_lb, - constants.PROJECT_ID: load_balancer.project_id}) - with tf_logging.DynamicLoggingListener(delete_hm_tf, - log=LOG): - delete_hm_tf.run() + store = {constants.HEALTH_MON: health_monitor, + constants.POOL_ID: pool.id, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: load_balancer.id, + constants.LOADBALANCER: provider_lb, + constants.PROJECT_ID: load_balancer.project_id} + self.services_controller.run_poster( + flow_utils.get_delete_health_monitor_flow, + store=store) def update_health_monitor(self, original_health_monitor, health_monitor_updates): @@ -236,18 +214,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( load_balancer).to_dict() - update_hm_tf = self._taskflow_load( - self._health_monitor_flows.get_update_health_monitor_flow(), - store={constants.HEALTH_MON: original_health_monitor, - constants.POOL_ID: pool.id, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: load_balancer.id, - constants.LOADBALANCER: provider_lb, - constants.UPDATE_DICT: health_monitor_updates}) - with tf_logging.DynamicLoggingListener(update_hm_tf, - log=LOG): - update_hm_tf.run() + store = {constants.HEALTH_MON: original_health_monitor, + constants.POOL_ID: pool.id, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: load_balancer.id, + constants.LOADBALANCER: provider_lb, + constants.UPDATE_DICT: health_monitor_updates} + self.services_controller.run_poster( + flow_utils.get_update_health_monitor_flow, + store=store) + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_listener(self, listener): """Creates a listener. @@ -272,14 +253,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( load_balancer).to_dict() - create_listener_tf = self._taskflow_load( - self._listener_flows.get_create_listener_flow(), - store={constants.LISTENERS: dict_listeners, - constants.LOADBALANCER: provider_lb, - constants.LOADBALANCER_ID: load_balancer.id}) - with tf_logging.DynamicLoggingListener(create_listener_tf, - log=LOG): - create_listener_tf.run() + store = {constants.LISTENERS: dict_listeners, + constants.LOADBALANCER: provider_lb, + constants.LOADBALANCER_ID: load_balancer.id} + + self.services_controller.run_poster( + flow_utils.get_create_listener_flow, + store=store) def delete_listener(self, listener): """Deletes a listener. @@ -292,15 +272,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): # the project ID lb = self._lb_repo.get(db_apis.get_session(), id=listener[constants.LOADBALANCER_ID]) - delete_listener_tf = self._taskflow_load( - self._listener_flows.get_delete_listener_flow(), - store={constants.LISTENER: listener, - constants.LOADBALANCER_ID: - listener[constants.LOADBALANCER_ID], - constants.PROJECT_ID: lb.project_id}) - with tf_logging.DynamicLoggingListener(delete_listener_tf, - log=LOG): - delete_listener_tf.run() + store = {constants.LISTENER: listener, + constants.LOADBALANCER_ID: + listener[constants.LOADBALANCER_ID], + constants.PROJECT_ID: lb.project_id} + self.services_controller.run_poster( + flow_utils.get_delete_listener_flow, + store=store) def update_listener(self, listener, listener_updates): """Updates a listener. @@ -312,14 +290,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): """ db_lb = self._lb_repo.get(db_apis.get_session(), id=listener[constants.LOADBALANCER_ID]) - update_listener_tf = self._taskflow_load( - self._listener_flows.get_update_listener_flow(), - store={constants.LISTENER: listener, - constants.UPDATE_DICT: listener_updates, - constants.LOADBALANCER_ID: db_lb.id, - constants.LISTENERS: [listener]}) - with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG): - update_listener_tf.run() + store = {constants.LISTENER: listener, + constants.UPDATE_DICT: listener_updates, + constants.LOADBALANCER_ID: db_lb.id, + constants.LISTENERS: [listener]} + self.services_controller.run_poster( + flow_utils.get_update_listener_flow, + store=store) @tenacity.retry( retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), @@ -364,13 +341,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): store[constants.UPDATE_DICT] = { constants.TOPOLOGY: topology } - - create_lb_flow = self._lb_flows.get_create_load_balancer_flow( - topology=topology, listeners=listeners_dicts) - - create_lb_tf = self._taskflow_load(create_lb_flow, store=store) - with tf_logging.DynamicLoggingListener(create_lb_tf, log=LOG): - create_lb_tf.run() + self.services_controller.run_poster( + flow_utils.get_create_load_balancer_flow, + topology, listeners=listeners_dicts, + store=store) def delete_load_balancer(self, load_balancer, cascade=False): """Deletes a load balancer by de-allocating Amphorae. @@ -381,25 +355,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): """ db_lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer[constants.LOADBALANCER_ID]) - store = {} - + store = {constants.LOADBALANCER: load_balancer, + constants.SERVER_GROUP_ID: db_lb.server_group_id, + constants.PROJECT_ID: db_lb.project_id} if cascade: - flow = self._lb_flows.get_cascade_delete_load_balancer_flow( - load_balancer) - store.update(self._lb_flows.get_delete_pools_store(db_lb)) - store.update(self._lb_flows.get_delete_listeners_store(db_lb)) + store.update(flow_utils.get_delete_pools_store(db_lb)) + store.update(flow_utils.get_delete_listeners_store(db_lb)) + self.services_controller.run_poster( + flow_utils.get_cascade_delete_load_balancer_flow, + load_balancer, store=store) else: - flow = self._lb_flows.get_delete_load_balancer_flow( - load_balancer) - store.update({constants.LOADBALANCER: load_balancer, - constants.SERVER_GROUP_ID: db_lb.server_group_id, - constants.PROJECT_ID: db_lb.project_id}) - - delete_lb_tf = self._taskflow_load(flow, store=store) - - with tf_logging.DynamicLoggingListener(delete_lb_tf, - log=LOG): - delete_lb_tf.run() + self.services_controller.run_poster( + flow_utils.get_delete_load_balancer_flow, + load_balancer, store=store) def update_load_balancer(self, original_load_balancer, load_balancer_updates): @@ -410,17 +378,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): :returns: None :raises LBNotFound: The referenced load balancer was not found """ + store = {constants.LOADBALANCER: original_load_balancer, + constants.LOADBALANCER_ID: + original_load_balancer[constants.LOADBALANCER_ID], + constants.UPDATE_DICT: load_balancer_updates} - update_lb_tf = self._taskflow_load( - self._lb_flows.get_update_load_balancer_flow(), - store={constants.LOADBALANCER: original_load_balancer, - constants.LOADBALANCER_ID: - original_load_balancer[constants.LOADBALANCER_ID], - constants.UPDATE_DICT: load_balancer_updates}) - - with tf_logging.DynamicLoggingListener(update_lb_tf, - log=LOG): - update_lb_tf.run() + self.services_controller.run_poster( + flow_utils.get_update_load_balancer_flow, + store=store) def create_member(self, member): """Creates a pool member. @@ -452,12 +417,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): else: store[constants.AVAILABILITY_ZONE] = {} - create_member_tf = self._taskflow_load( - self._member_flows.get_create_member_flow(), + self.services_controller.run_poster( + flow_utils.get_create_member_flow, store=store) - with tf_logging.DynamicLoggingListener(create_member_tf, - log=LOG): - create_member_tf.run() def delete_member(self, member): """Deletes a pool member. @@ -491,13 +453,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): else: store[constants.AVAILABILITY_ZONE] = {} - delete_member_tf = self._taskflow_load( - self._member_flows.get_delete_member_flow(), - store=store - ) - with tf_logging.DynamicLoggingListener(delete_member_tf, - log=LOG): - delete_member_tf.run() + self.services_controller.run_poster( + flow_utils.get_delete_member_flow, + store=store) def batch_update_members(self, old_members, new_members, updated_members): @@ -543,13 +501,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): else: store[constants.AVAILABILITY_ZONE] = {} - batch_update_members_tf = self._taskflow_load( - self._member_flows.get_batch_update_members_flow( - provider_old_members, new_members, updated_members), + self.services_controller.run_poster( + flow_utils.get_batch_update_members_flow, + provider_old_members, new_members, updated_members, store=store) - with tf_logging.DynamicLoggingListener(batch_update_members_tf, - log=LOG): - batch_update_members_tf.run() def update_member(self, member, member_updates): """Updates a pool member. @@ -584,13 +539,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): else: store[constants.AVAILABILITY_ZONE] = {} - update_member_tf = self._taskflow_load( - self._member_flows.get_update_member_flow(), + self.services_controller.run_poster( + flow_utils.get_update_member_flow, store=store) - with tf_logging.DynamicLoggingListener(update_member_tf, - log=LOG): - update_member_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_pool(self, pool): """Creates a node pool. @@ -616,15 +573,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( db_pool.listeners)) - create_pool_tf = self._taskflow_load( - self._pool_flows.get_create_pool_flow(), - store={constants.POOL_ID: pool[constants.POOL_ID], - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: load_balancer.id, - constants.LOADBALANCER: provider_lb}) - with tf_logging.DynamicLoggingListener(create_pool_tf, - log=LOG): - create_pool_tf.run() + store = {constants.POOL_ID: pool[constants.POOL_ID], + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: load_balancer.id, + constants.LOADBALANCER: provider_lb} + self.services_controller.run_poster( + flow_utils.get_create_pool_flow, + store=store) def delete_pool(self, pool): """Deletes a node pool. @@ -644,16 +599,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( load_balancer).to_dict() - delete_pool_tf = self._taskflow_load( - self._pool_flows.get_delete_pool_flow(), - store={constants.POOL_ID: pool[constants.POOL_ID], - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER: provider_lb, - constants.LOADBALANCER_ID: load_balancer.id, - constants.PROJECT_ID: db_pool.project_id}) - with tf_logging.DynamicLoggingListener(delete_pool_tf, - log=LOG): - delete_pool_tf.run() + store = {constants.POOL_ID: pool[constants.POOL_ID], + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER: provider_lb, + constants.LOADBALANCER_ID: load_balancer.id, + constants.PROJECT_ID: db_pool.project_id} + self.services_controller.run_poster( + flow_utils.get_delete_pool_flow, + store=store) def update_pool(self, origin_pool, pool_updates): """Updates a node pool. @@ -682,16 +635,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( db_pool.listeners)) - update_pool_tf = self._taskflow_load( - self._pool_flows.get_update_pool_flow(), - store={constants.POOL_ID: db_pool.id, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER: provider_lb, - constants.LOADBALANCER_ID: load_balancer.id, - constants.UPDATE_DICT: pool_updates}) - with tf_logging.DynamicLoggingListener(update_pool_tf, - log=LOG): - update_pool_tf.run() + store = {constants.POOL_ID: db_pool.id, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER: provider_lb, + constants.LOADBALANCER_ID: load_balancer.id, + constants.UPDATE_DICT: pool_updates} + self.services_controller.run_poster( + flow_utils.get_update_pool_flow, + store=store) def create_l7policy(self, l7policy): """Creates an L7 Policy. @@ -707,15 +658,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( [db_listener])) - create_l7policy_tf = self._taskflow_load( - self._l7policy_flows.get_create_l7policy_flow(), - store={constants.L7POLICY: l7policy, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: db_listener.load_balancer.id - }) - with tf_logging.DynamicLoggingListener(create_l7policy_tf, - log=LOG): - create_l7policy_tf.run() + store = {constants.L7POLICY: l7policy, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: db_listener.load_balancer.id + } + self.services_controller.run_poster( + flow_utils.get_create_l7policy_flow, + store=store) def delete_l7policy(self, l7policy): """Deletes an L7 policy. @@ -730,15 +679,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( [db_listener])) - delete_l7policy_tf = self._taskflow_load( - self._l7policy_flows.get_delete_l7policy_flow(), - store={constants.L7POLICY: l7policy, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: db_listener.load_balancer.id - }) - with tf_logging.DynamicLoggingListener(delete_l7policy_tf, - log=LOG): - delete_l7policy_tf.run() + store = {constants.L7POLICY: l7policy, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: db_listener.load_balancer.id + } + self.services_controller.run_poster( + flow_utils.get_delete_l7policy_flow, + store=store) def update_l7policy(self, original_l7policy, l7policy_updates): """Updates an L7 policy. @@ -755,15 +702,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( [db_listener])) - update_l7policy_tf = self._taskflow_load( - self._l7policy_flows.get_update_l7policy_flow(), - store={constants.L7POLICY: original_l7policy, - constants.LISTENERS: listeners_dicts, - constants.LOADBALANCER_ID: db_listener.load_balancer.id, - constants.UPDATE_DICT: l7policy_updates}) - with tf_logging.DynamicLoggingListener(update_l7policy_tf, - log=LOG): - update_l7policy_tf.run() + store = {constants.L7POLICY: original_l7policy, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: db_listener.load_balancer.id, + constants.UPDATE_DICT: l7policy_updates} + self.services_controller.run_poster( + flow_utils.get_update_l7policy_flow, + store=store) def create_l7rule(self, l7rule): """Creates an L7 Rule. @@ -783,17 +728,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy( db_l7policy) - create_l7rule_tf = self._taskflow_load( - self._l7rule_flows.get_create_l7rule_flow(), - store={constants.L7RULE: l7rule, - constants.L7POLICY: l7policy_dict.to_dict(), - constants.LISTENERS: listeners_dicts, - constants.L7POLICY_ID: db_l7policy.id, - constants.LOADBALANCER_ID: load_balancer.id - }) - with tf_logging.DynamicLoggingListener(create_l7rule_tf, - log=LOG): - create_l7rule_tf.run() + store = {constants.L7RULE: l7rule, + constants.L7POLICY: l7policy_dict.to_dict(), + constants.L7POLICY_ID: db_l7policy.id, + constants.LISTENERS: listeners_dicts, + constants.LOADBALANCER_ID: load_balancer.id + } + self.services_controller.run_poster( + flow_utils.get_create_l7rule_flow, + store=store) def delete_l7rule(self, l7rule): """Deletes an L7 rule. @@ -811,17 +754,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): provider_utils.db_listeners_to_provider_dicts_list_of_dicts( [db_l7policy.listener])) - delete_l7rule_tf = self._taskflow_load( - self._l7rule_flows.get_delete_l7rule_flow(), - store={constants.L7RULE: l7rule, - constants.L7POLICY: l7policy.to_dict(), - constants.LISTENERS: listeners_dicts, - constants.L7POLICY_ID: db_l7policy.id, - constants.LOADBALANCER_ID: load_balancer.id - }) - with tf_logging.DynamicLoggingListener(delete_l7rule_tf, - log=LOG): - delete_l7rule_tf.run() + store = {constants.L7RULE: l7rule, + constants.L7POLICY: l7policy.to_dict(), + constants.LISTENERS: listeners_dicts, + constants.L7POLICY_ID: db_l7policy.id, + constants.LOADBALANCER_ID: load_balancer.id + } + self.services_controller.run_poster( + flow_utils.get_delete_l7rule_flow, + store=store) def update_l7rule(self, original_l7rule, l7rule_updates): """Updates an L7 rule. @@ -841,17 +782,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy( db_l7policy) - update_l7rule_tf = self._taskflow_load( - self._l7rule_flows.get_update_l7rule_flow(), - store={constants.L7RULE: original_l7rule, - constants.L7POLICY: l7policy_dict.to_dict(), - constants.LISTENERS: listeners_dicts, - constants.L7POLICY_ID: db_l7policy.id, - constants.LOADBALANCER_ID: load_balancer.id, - constants.UPDATE_DICT: l7rule_updates}) - with tf_logging.DynamicLoggingListener(update_l7rule_tf, - log=LOG): - update_l7rule_tf.run() + store = {constants.L7RULE: original_l7rule, + constants.L7POLICY: l7policy_dict.to_dict(), + constants.LISTENERS: listeners_dicts, + constants.L7POLICY_ID: db_l7policy.id, + constants.LOADBALANCER_ID: load_balancer.id, + constants.UPDATE_DICT: l7rule_updates} + self.services_controller.run_poster( + flow_utils.get_update_l7rule_flow, + store=store) def _perform_amphora_failover(self, amp, priority): """Internal method to perform failover operations for an amphora. @@ -903,7 +842,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): lb).to_dict() if lb else lb if CONF.nova.enable_anti_affinity and lb: stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id - if lb and lb.flavor_id: + if lb is not None and lb.flavor_id: stored_params[constants.FLAVOR] = ( self._flavor_repo.get_flavor_metadata_dict( db_apis.get_session(), lb.flavor_id)) @@ -916,13 +855,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): else: stored_params[constants.AVAILABILITY_ZONE] = {} - failover_amphora_tf = self._taskflow_load( - self._amphora_flows.get_failover_flow( - role=amp.role, load_balancer=provider_lb), - store=stored_params) - - with tf_logging.DynamicLoggingListener(failover_amphora_tf, log=LOG): - failover_amphora_tf.run() + self.services_controller.run_poster( + flow_utils.get_failover_flow, + role=amp.role, load_balancer=provider_lb, + store=stored_params, wait=True) LOG.info("Successfully completed the failover for an amphora: %s", {"id": amp.id, @@ -1019,14 +955,12 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): id=amphora_id) LOG.info("Start amphora cert rotation, amphora's id is: %s", amp.id) - certrotation_amphora_tf = self._taskflow_load( - self._amphora_flows.cert_rotate_amphora_flow(), - store={constants.AMPHORA: amp.to_dict(), - constants.AMPHORA_ID: amphora_id}) + store = {constants.AMPHORA: amp.to_dict(), + constants.AMPHORA_ID: amphora_id} - with tf_logging.DynamicLoggingListener(certrotation_amphora_tf, - log=LOG): - certrotation_amphora_tf.run() + self.services_controller.run_poster( + flow_utils.cert_rotate_amphora_flow, + store=store) def update_amphora_agent_config(self, amphora_id): """Update the amphora agent configuration. @@ -1048,11 +982,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): flavor = self._flavor_repo.get_flavor_metadata_dict( db_apis.get_session(), lb.flavor_id) - update_amphora_tf = self._taskflow_load( - self._amphora_flows.update_amphora_config_flow(), - store={constants.AMPHORA: amp.to_dict(), - constants.FLAVOR: flavor}) + store = {constants.AMPHORA: amp.to_dict(), + constants.FLAVOR: flavor} - with tf_logging.DynamicLoggingListener(update_amphora_tf, - log=LOG): - update_amphora_tf.run() + self.services_controller.run_poster( + flow_utils.update_amphora_config_flow, + store=store) diff --git a/octavia/controller/worker/v2/flows/flow_utils.py b/octavia/controller/worker/v2/flows/flow_utils.py new file mode 100644 index 0000000000..58ad1bd3b5 --- /dev/null +++ b/octavia/controller/worker/v2/flows/flow_utils.py @@ -0,0 +1,158 @@ + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from octavia.common import constants +from octavia.controller.worker.v2.flows import amphora_flows +from octavia.controller.worker.v2.flows import health_monitor_flows +from octavia.controller.worker.v2.flows import l7policy_flows +from octavia.controller.worker.v2.flows import l7rule_flows +from octavia.controller.worker.v2.flows import listener_flows +from octavia.controller.worker.v2.flows import load_balancer_flows +from octavia.controller.worker.v2.flows import member_flows +from octavia.controller.worker.v2.flows import pool_flows + + +LB_FLOWS = load_balancer_flows.LoadBalancerFlows() +AMP_FLOWS = amphora_flows.AmphoraFlows() +HM_FLOWS = health_monitor_flows.HealthMonitorFlows() +L7_POLICY_FLOWS = l7policy_flows.L7PolicyFlows() +L7_RULES_FLOWS = l7rule_flows.L7RuleFlows() +LISTENER_FLOWS = listener_flows.ListenerFlows() +M_FLOWS = member_flows.MemberFlows() +P_FLOWS = pool_flows.PoolFlows() + + +def get_create_load_balancer_flow(topology, listeners=None): + return LB_FLOWS.get_create_load_balancer_flow(topology, + listeners=listeners) + + +def get_delete_load_balancer_flow(lb): + return LB_FLOWS.get_delete_load_balancer_flow(lb) + + +def get_delete_listeners_store(lb): + return LB_FLOWS.get_delete_listeners_store(lb) + + +def get_delete_pools_store(lb): + return LB_FLOWS.get_delete_pools_store(lb) + + +def get_cascade_delete_load_balancer_flow(lb): + return LB_FLOWS.get_cascade_delete_load_balancer_flow(lb) + + +def get_update_load_balancer_flow(): + return LB_FLOWS.get_update_load_balancer_flow() + + +def get_create_amphora_flow(): + return AMP_FLOWS.get_create_amphora_flow() + + +def get_delete_amphora_flow(): + return AMP_FLOWS.get_delete_amphora_flow() + + +def get_failover_flow(role=constants.ROLE_STANDALONE, load_balancer=None): + return AMP_FLOWS.get_failover_flow(role=role, load_balancer=load_balancer) + + +def cert_rotate_amphora_flow(): + return AMP_FLOWS.cert_rotate_amphora_flow() + + +def update_amphora_config_flow(): + return AMP_FLOWS.update_amphora_config_flow() + + +def get_create_health_monitor_flow(): + return HM_FLOWS.get_create_health_monitor_flow() + + +def get_delete_health_monitor_flow(): + return HM_FLOWS.get_delete_health_monitor_flow() + + +def get_update_health_monitor_flow(): + return HM_FLOWS.get_update_health_monitor_flow() + + +def get_create_l7policy_flow(): + return L7_POLICY_FLOWS.get_create_l7policy_flow() + + +def get_delete_l7policy_flow(): + return L7_POLICY_FLOWS.get_delete_l7policy_flow() + + +def get_update_l7policy_flow(): + return L7_POLICY_FLOWS.get_update_l7policy_flow() + + +def get_create_l7rule_flow(): + return L7_RULES_FLOWS.get_create_l7rule_flow() + + +def get_delete_l7rule_flow(): + return L7_RULES_FLOWS.get_delete_l7rule_flow() + + +def get_update_l7rule_flow(): + return L7_RULES_FLOWS.get_update_l7rule_flow() + + +def get_create_listener_flow(): + return LISTENER_FLOWS.get_create_listener_flow() + + +def get_create_all_listeners_flow(): + return LISTENER_FLOWS.get_create_all_listeners_flow() + + +def get_delete_listener_flow(): + return LISTENER_FLOWS.get_delete_listener_flow() + + +def get_update_listener_flow(): + return LISTENER_FLOWS.get_update_listener_flow() + + +def get_create_member_flow(): + return M_FLOWS.get_create_member_flow() + + +def get_delete_member_flow(): + return M_FLOWS.get_delete_member_flow() + + +def get_update_member_flow(): + return M_FLOWS.get_update_member_flow() + + +def get_batch_update_members_flow(old_members, new_members, updated_members): + return M_FLOWS.get_batch_update_members_flow(old_members, new_members, + updated_members) + + +def get_create_pool_flow(): + return P_FLOWS.get_create_pool_flow() + + +def get_delete_pool_flow(): + return P_FLOWS.get_delete_pool_flow() + + +def get_update_pool_flow(): + return P_FLOWS.get_update_pool_flow() diff --git a/octavia/controller/worker/v2/taskflow_jobboard_driver.py b/octavia/controller/worker/v2/taskflow_jobboard_driver.py new file mode 100644 index 0000000000..e5a135a12d --- /dev/null +++ b/octavia/controller/worker/v2/taskflow_jobboard_driver.py @@ -0,0 +1,102 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import contextlib + +from oslo_config import cfg +from oslo_log import log +from taskflow.jobs import backends as job_backends +from taskflow.persistence import backends as persistence_backends + +LOG = log.getLogger(__name__) +CONF = cfg.CONF + + +class JobboardTaskFlowDriver(object, metaclass=abc.ABCMeta): + + @abc.abstractmethod + def job_board(self, persistence): + """Setting up jobboard backend based on configuration setting. + + :param persistence: taskflow persistence backend instance + :return: taskflow jobboard backend instance + """ + + +class MysqlPersistenceDriver(object): + + def __init__(self): + self.persistence_conf = { + 'connection': CONF.task_flow.persistence_connection, + 'max_pool_size': CONF.database.max_pool_size, + 'max_overflow': CONF.database.max_overflow, + 'pool_timeout': CONF.database.pool_timeout, + } + + def initialize(self): + # Run migrations once on service start. + backend = persistence_backends.fetch(self.persistence_conf) + with contextlib.closing(backend): + with contextlib.closing(backend.get_connection()) as connection: + connection.upgrade() + + @contextlib.contextmanager + def get_persistence(self): + # Rewrite taskflow get backend, so it won't run migrations on each call + backend = persistence_backends.fetch(self.persistence_conf) + with contextlib.closing(backend): + with contextlib.closing(backend.get_connection()) as conn: + conn.validate() + yield backend + + +class ZookeeperTaskFlowDriver(JobboardTaskFlowDriver): + + def __init__(self, persistence_driver): + self.persistence_driver = persistence_driver + + def job_board(self, persistence): + job_backends_hosts = ','.join( + ['%s:%s' % (host, CONF.task_flow.jobboard_backend_port) + for host in CONF.task_flow.jobboard_backend_hosts]) + jobboard_backend_conf = { + 'board': 'zookeeper', + 'hosts': job_backends_hosts, + 'path': '/' + CONF.task_flow.jobboard_backend_namespace, + } + jobboard_backend_conf.update( + CONF.task_flow.jobboard_zookeeper_ssl_options) + return job_backends.backend(CONF.task_flow.jobboard_name, + jobboard_backend_conf, + persistence=persistence) + + +class RedisTaskFlowDriver(JobboardTaskFlowDriver): + + def __init__(self, persistence_driver): + self.persistence_driver = persistence_driver + + def job_board(self, persistence): + jobboard_backend_conf = { + 'board': 'redis', + 'host': CONF.task_flow.jobboard_backend_hosts[0], + 'port': CONF.task_flow.jobboard_backend_port, + 'password': CONF.task_flow.jobboard_backend_password, + 'namespace': CONF.task_flow.jobboard_backend_namespace, + } + jobboard_backend_conf.update( + CONF.task_flow.jobboard_redis_backend_ssl_options) + return job_backends.backend( + CONF.task_flow.jobboard_backend_namespace, + jobboard_backend_conf, + persistence=persistence) diff --git a/octavia/db/migration/cli.py b/octavia/db/migration/cli.py index f38fd8e099..bb37fee812 100644 --- a/octavia/db/migration/cli.py +++ b/octavia/db/migration/cli.py @@ -22,10 +22,15 @@ from oslo_config import cfg from oslo_db import options from oslo_log import log +from octavia.controller.worker.v2 import taskflow_jobboard_driver from octavia.i18n import _ CONF = cfg.CONF options.set_defaults(CONF) +# Setting explicitly here needed for taskflow persistence successful +# initialization +options.set_defaults(CONF, max_pool_size=10, max_overflow=20, + pool_timeout=10) log.set_defaults() log.register_options(CONF) log.setup(CONF, 'octavia-db-manage') @@ -85,6 +90,14 @@ def do_revision(config, cmd): sql=CONF.command.sql) +def do_persistence_upgrade(config, cmd): + opt = cfg.StrOpt('persistence_connection', + default='sqlite://') + cfg.CONF.register_opts([opt], group='task_flow') + persistence = taskflow_jobboard_driver.MysqlPersistenceDriver() + persistence.initialize() + + def add_command_parsers(subparsers): for name in ['current', 'history', 'branches']: parser = add_alembic_subparser(subparsers, name) @@ -101,6 +114,11 @@ def add_command_parsers(subparsers): parser.add_argument('revision', nargs='?') parser.set_defaults(func=do_upgrade) + parser = subparsers.add_parser( + "upgrade_persistence", + help="Run migrations for persistence backend") + parser.set_defaults(func=do_persistence_upgrade) + parser = subparsers.add_parser('downgrade', help="(No longer supported)") parser.add_argument('None', nargs='?', help="Downgrade not supported") parser.set_defaults(func=no_downgrade) diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 82631ae253..f47732dacb 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -1249,21 +1249,31 @@ class AmphoraRepository(BaseRepository): return db_lb.to_data_model() return None - def get_spare_amphora_count(self, session, availability_zone=None): + def get_spare_amphora_count(self, session, availability_zone=None, + check_booting_amphora=False): """Get the count of the spare amphora. :returns: Number of current spare amphora. """ filters = { - 'status': consts.AMPHORA_READY, 'load_balancer_id': None } + # For jobboard based controller amphora in booting/pending create state + # can reach READY state after restart of housekeeping/worker service, + # so include amphora in these state to query + if check_booting_amphora: + status = [consts.AMPHORA_READY, + consts.AMPHORA_BOOTING, + consts.PENDING_CREATE] + else: + status = [consts.AMPHORA_READY] + if availability_zone is not None: filters['cached_zone'] = availability_zone with session.begin(subtransactions=True): count = session.query(self.model_class).filter_by( - **filters).count() + **filters).filter(self.model_class.status.in_(status)).count() return count diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index 8f334a917d..df64cdbf0c 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -3258,6 +3258,21 @@ class AmphoraRepositoryTest(BaseRepositoryTest): count = self.amphora_repo.get_spare_amphora_count(self.session) self.assertEqual(2, count) + def test_get_spare_amphora_count_check_booting_amphora_true(self): + count = self.amphora_repo.get_spare_amphora_count( + self.session, check_booting_amphora=True) + self.assertEqual(0, count) + + amphora1 = self.create_amphora(self.FAKE_UUID_1) + self.amphora_repo.update(self.session, amphora1.id, + status=constants.AMPHORA_READY,) + amphora2 = self.create_amphora(self.FAKE_UUID_2) + self.amphora_repo.update(self.session, amphora2.id, + status=constants.AMPHORA_BOOTING) + count = self.amphora_repo.get_spare_amphora_count( + self.session, check_booting_amphora=True) + self.assertEqual(2, count) + def test_get_none_cert_expired_amphora(self): # test with no expired amphora amp = self.amphora_repo.get_cert_expiring_amphora(self.session) diff --git a/octavia/tests/unit/common/test_base_taskflow.py b/octavia/tests/unit/common/test_base_taskflow.py index ebef0eae44..ab8e293331 100644 --- a/octavia/tests/unit/common/test_base_taskflow.py +++ b/octavia/tests/unit/common/test_base_taskflow.py @@ -66,3 +66,86 @@ class TestBaseTaskFlowEngine(base.TestCase): _engine_mock.compile.assert_called_once_with() _engine_mock.prepare.assert_called_once_with() + + +class TestTaskFlowServiceController(base.TestCase): + + _mock_uuid = '9a2ebc48-cd3e-429e-aa04-e32f5fc5442a' + + def setUp(self): + self.conf = oslo_fixture.Config(cfg.CONF) + self.conf.config(group="task_flow", engine='parallel') + self.driver_mock = mock.MagicMock() + self.persistence_mock = mock.MagicMock() + self.jobboard_mock = mock.MagicMock() + self.driver_mock.job_board.return_value = self.jobboard_mock + self.driver_mock.persistence_driver.get_persistence.return_value = ( + self.persistence_mock) + self.service_controller = base_taskflow.TaskFlowServiceController( + self.driver_mock) + super(TestTaskFlowServiceController, self).setUp() + + @mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid) + @mock.patch('taskflow.engines.save_factory_details') + def test_run_poster(self, mock_engines, mockuuid): + flow_factory = mock.MagicMock() + flow_factory.__name__ = 'testname' + job_name = 'testname-%s' % self._mock_uuid + job_details = {'store': 'test'} + with mock.patch.object(self.service_controller, '_wait_for_job' + ) as wait: + uuid = self.service_controller.run_poster(flow_factory, + **job_details) + save_logbook = self.persistence_mock.__enter__().get_connection( + ).save_logbook + save_logbook.assert_called() + self.assertEqual(job_name, save_logbook.call_args[0][0].name) + + mock_engines.assert_called() + save_args = mock_engines.call_args + self.assertEqual(job_name, save_args[0][0].name) + self.assertEqual(self._mock_uuid, save_args[0][0].uuid) + self.assertEqual(flow_factory, save_args[0][1]) + self.assertEqual(self.persistence_mock.__enter__(), + save_args[1]['backend']) + + self.jobboard_mock.__enter__().post.assert_called() + post_args = self.jobboard_mock.__enter__().post.call_args + self.assertEqual(job_name, post_args[0][0]) + self.assertEqual(job_details, post_args[1]['details']) + wait.assert_not_called() + self.assertEqual(self._mock_uuid, uuid) + + @mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid) + @mock.patch('taskflow.engines.save_factory_details') + def test_run_poster_wait(self, mock_engines, mockuuid): + flow_factory = mock.MagicMock() + flow_factory.__name__ = 'testname' + job_details = {'store': 'test'} + with mock.patch.object(self.service_controller, '_wait_for_job' + ) as wait: + uuid = self.service_controller.run_poster(flow_factory, wait=True, + **job_details) + self.persistence_mock.__enter__().get_connection( + ).save_logbook.assert_called() + mock_engines.assert_called() + self.jobboard_mock.__enter__().post.assert_called() + wait.assert_called_once_with(self.jobboard_mock.__enter__()) + self.assertEqual(self._mock_uuid, uuid) + + @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') + @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') + def test_run_conductor(self, dynamiccond, rediscond): + self.service_controller.run_conductor("test") + rediscond.assert_called_once_with( + "test", self.jobboard_mock.__enter__(), + persistence=self.persistence_mock.__enter__(), + engine='parallel') + self.conf.config(group="task_flow", + jobboard_backend_driver='zookeeper_taskflow_driver') + + self.service_controller.run_conductor("test2") + dynamiccond.assert_called_once_with( + "test2", self.jobboard_mock.__enter__(), + persistence=self.persistence_mock.__enter__(), + engine='parallel') diff --git a/octavia/tests/unit/controller/healthmanager/test_health_manager.py b/octavia/tests/unit/controller/healthmanager/test_health_manager.py index d3d085f5ec..df850ed112 100644 --- a/octavia/tests/unit/controller/healthmanager/test_health_manager.py +++ b/octavia/tests/unit/controller/healthmanager/test_health_manager.py @@ -45,11 +45,14 @@ class TestHealthManager(base.TestCase): @mock.patch('octavia.db.api.wait_for_connection') @mock.patch('octavia.controller.worker.v1.controller_worker.' 'ControllerWorker.failover_amphora') + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora') @mock.patch('octavia.db.api.get_session') def test_health_check_stale_amphora(self, session_mock, get_stale_amp_mock, - failover_mock, db_wait_mock): + failover_mockv2, failover_mock, + db_wait_mock): conf = oslo_fixture.Config(cfg.CONF) conf.config(group="health_manager", heartbeat_timeout=5) amphora_health = mock.MagicMock() @@ -86,11 +89,14 @@ class TestHealthManager(base.TestCase): @mock.patch('octavia.controller.worker.v1.controller_worker.' 'ControllerWorker.failover_amphora') + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora', return_value=None) @mock.patch('octavia.db.api.get_session') def test_health_check_nonstale_amphora(self, session_mock, - get_stale_amp_mock, failover_mock): + get_stale_amp_mock, failover_mockv2, + failover_mock): get_stale_amp_mock.side_effect = [None, TestException('test')] exit_event = threading.Event() @@ -98,15 +104,20 @@ class TestHealthManager(base.TestCase): hm.health_check() session_mock.assert_called_once_with(autocommit=False) - self.assertFalse(failover_mock.called) + if CONF.api_settings.default_provider_driver == 'amphorav2': + self.assertFalse(failover_mockv2.called) + else: + self.assertFalse(failover_mock.called) @mock.patch('octavia.controller.worker.v1.controller_worker.' 'ControllerWorker.failover_amphora') + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora', return_value=None) @mock.patch('octavia.db.api.get_session') def test_health_check_exit(self, session_mock, get_stale_amp_mock, - failover_mock): + failover_mockv2, failover_mock): get_stale_amp_mock.return_value = None exit_event = threading.Event() @@ -114,15 +125,20 @@ class TestHealthManager(base.TestCase): hm.health_check() session_mock.assert_called_once_with(autocommit=False) - self.assertFalse(failover_mock.called) + if CONF.api_settings.default_provider_driver == 'amphorav2': + self.assertFalse(failover_mockv2.called) + else: + self.assertFalse(failover_mock.called) @mock.patch('octavia.controller.worker.v1.controller_worker.' 'ControllerWorker.failover_amphora') + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.failover_amphora') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.' 'get_stale_amphora', return_value=None) @mock.patch('octavia.db.api.get_session') def test_health_check_db_error(self, session_mock, get_stale_amp_mock, - failover_mock): + failover_mockv2, failover_mock): get_stale_amp_mock.return_value = None mock_session = mock.MagicMock() diff --git a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py index 46b06ed33d..e00bda5d68 100644 --- a/octavia/tests/unit/controller/housekeeping/test_house_keeping.py +++ b/octavia/tests/unit/controller/housekeeping/test_house_keeping.py @@ -45,6 +45,9 @@ class TestSpareCheck(base.TestCase): def setUp(self): super(TestSpareCheck, self).setUp() + self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF)) + self.CONF.config(group="api_settings", + default_provider_driver='amphora') self.spare_amp = house_keeping.SpareAmphora() self.amp_repo = mock.MagicMock() self.az_repo = mock.MagicMock() @@ -53,7 +56,6 @@ class TestSpareCheck(base.TestCase): self.spare_amp.amp_repo = self.amp_repo self.spare_amp.az_repo = self.az_repo self.spare_amp.cw = self.cw - self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF)) @mock.patch('octavia.db.api.get_session') def test_spare_check_diff_count(self, session): @@ -90,8 +92,10 @@ class TestSpareCheck(base.TestCase): az1.name, az2.name) self.spare_amp.spare_check() - calls = [mock.call(session, availability_zone=az1.name), - mock.call(session, availability_zone=az2.name)] + calls = [mock.call(session, availability_zone=az1.name, + check_booting_amphora=False), + mock.call(session, availability_zone=az2.name, + check_booting_amphora=False)] self.amp_repo.get_spare_amphora_count.assert_has_calls(calls, any_order=True) @@ -251,6 +255,7 @@ class TestDatabaseCleanup(base.TestCase): class TestCertRotation(base.TestCase): def setUp(self): super(TestCertRotation, self).setUp() + self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF)) @mock.patch('octavia.controller.worker.v1.controller_worker.' 'ControllerWorker.amphora_cert_rotation') @@ -261,6 +266,8 @@ class TestCertRotation(base.TestCase): cert_exp_amp_mock, amp_cert_mock ): + self.CONF.config(group="api_settings", + default_provider_driver='amphora') amphora = mock.MagicMock() amphora.id = AMPHORA_ID @@ -281,6 +288,8 @@ class TestCertRotation(base.TestCase): cert_exp_amp_mock, amp_cert_mock ): + self.CONF.config(group="api_settings", + default_provider_driver='amphora') amphora = mock.MagicMock() amphora.id = AMPHORA_ID @@ -300,9 +309,67 @@ class TestCertRotation(base.TestCase): def test_cert_rotation_non_expired_amphora(self, session, cert_exp_amp_mock, amp_cert_mock): + self.CONF.config(group="api_settings", + default_provider_driver='amphora') session.return_value = session cert_exp_amp_mock.return_value = None cr = house_keeping.CertRotation() cr.rotate() self.assertFalse(amp_cert_mock.called) + + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.amphora_cert_rotation') + @mock.patch('octavia.db.repositories.AmphoraRepository.' + 'get_cert_expiring_amphora') + @mock.patch('octavia.db.api.get_session') + def test_cert_rotation_expired_amphora_with_exception_amphorav2( + self, session, cert_exp_amp_mock, amp_cert_mock): + self.CONF.config(group="api_settings", + default_provider_driver='amphorav2') + + amphora = mock.MagicMock() + amphora.id = AMPHORA_ID + + session.return_value = session + cert_exp_amp_mock.side_effect = [amphora, TestException( + 'break_while')] + + cr = house_keeping.CertRotation() + self.assertRaises(TestException, cr.rotate) + amp_cert_mock.assert_called_once_with(AMPHORA_ID) + + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.amphora_cert_rotation') + @mock.patch('octavia.db.repositories.AmphoraRepository.' + 'get_cert_expiring_amphora') + @mock.patch('octavia.db.api.get_session') + def test_cert_rotation_expired_amphora_without_exception_amphorav2( + self, session, cert_exp_amp_mock, amp_cert_mock): + self.CONF.config(group="api_settings", + default_provider_driver='amphorav2') + amphora = mock.MagicMock() + amphora.id = AMPHORA_ID + + session.return_value = session + cert_exp_amp_mock.side_effect = [amphora, None] + + cr = house_keeping.CertRotation() + + self.assertIsNone(cr.rotate()) + amp_cert_mock.assert_called_once_with(AMPHORA_ID) + + @mock.patch('octavia.controller.worker.v2.controller_worker.' + 'ControllerWorker.amphora_cert_rotation') + @mock.patch('octavia.db.repositories.AmphoraRepository.' + 'get_cert_expiring_amphora') + @mock.patch('octavia.db.api.get_session') + def test_cert_rotation_non_expired_amphora_amphorav2( + self, session, cert_exp_amp_mock, amp_cert_mock): + self.CONF.config(group="api_settings", + default_provider_driver='amphorav2') + session.return_value = session + cert_exp_amp_mock.return_value = None + cr = house_keeping.CertRotation() + cr.rotate() + self.assertFalse(amp_cert_mock.called) diff --git a/octavia/tests/unit/controller/queue/v2/test_consumer.py b/octavia/tests/unit/controller/queue/v2/test_consumer.py index ae62a64a8c..2f5850e6d7 100644 --- a/octavia/tests/unit/controller/queue/v2/test_consumer.py +++ b/octavia/tests/unit/controller/queue/v2/test_consumer.py @@ -50,7 +50,8 @@ class TestConsumer(base.TestRpc): mock_endpoint.assert_called_once_with() @mock.patch.object(messaging, 'get_rpc_server') - def test_consumer_terminate(self, mock_rpc_server): + @mock.patch.object(endpoints, 'Endpoints') + def test_consumer_terminate(self, mock_endpoint, mock_rpc_server): mock_rpc_server_rv = mock.Mock() mock_rpc_server.return_value = mock_rpc_server_rv diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py index 9face93ede..e55c65c3b3 100644 --- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py @@ -19,10 +19,10 @@ from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils from octavia.api.drivers import utils as provider_utils -from octavia.common import base_taskflow from octavia.common import constants from octavia.common import data_models from octavia.controller.worker.v2 import controller_worker +from octavia.controller.worker.v2.flows import flow_utils import octavia.tests.unit.base as base @@ -107,7 +107,7 @@ class TestException(Exception): return_value=_member_mock) @mock.patch('octavia.db.repositories.PoolRepository.get', return_value=_db_pool_mock) -@mock.patch('octavia.common.base_taskflow.BaseTaskFlowEngine._taskflow_load', +@mock.patch('octavia.common.base_taskflow.TaskFlowServiceController', return_value=_flow_mock) @mock.patch('taskflow.listeners.logging.DynamicLoggingListener') @mock.patch('octavia.db.api.get_session', return_value=_db_session) @@ -137,8 +137,8 @@ class TestControllerWorker(base.TestCase): _db_load_balancer_mock.listeners = [_listener_mock] _db_load_balancer_mock.to_dict.return_value = {'id': LB_ID} - fetch_mock = mock.MagicMock(return_value=AMP_ID) - _flow_mock.storage.fetch = fetch_mock + fetch_mock = mock.MagicMock() + _flow_mock.driver.persistence = fetch_mock _db_pool_mock.id = POOL_ID _db_health_mon_mock.pool_id = POOL_ID @@ -170,22 +170,17 @@ class TestControllerWorker(base.TestCase): _flow_mock.reset_mock() cw = controller_worker.ControllerWorker() - amp = cw.create_amphora() + cw.create_amphora() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - 'TEST', + flow_utils.get_create_amphora_flow, + wait=True, store={constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_SPARES_POOL_PRIORITY, constants.FLAVOR: None, constants.AVAILABILITY_ZONE: None})) - _flow_mock.run.assert_called_once_with() - - _flow_mock.storage.fetch.assert_called_once_with('amphora') - - self.assertEqual(AMP_ID, amp) - @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict') @mock.patch('octavia.controller.worker.v2.flows.' @@ -211,22 +206,17 @@ class TestControllerWorker(base.TestCase): az_data = {constants.COMPUTE_ZONE: az} mock_get_az_metadata.return_value = az_data cw = controller_worker.ControllerWorker() - amp = cw.create_amphora(availability_zone=az) + cw.create_amphora(availability_zone=az) mock_get_az_metadata.assert_called_once_with(_db_session, az) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - 'TEST', + flow_utils.get_create_amphora_flow, + wait=True, store={constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_SPARES_POOL_PRIORITY, constants.FLAVOR: None, constants.AVAILABILITY_ZONE: az_data})) - _flow_mock.run.assert_called_once_with() - - _flow_mock.storage.fetch.assert_called_once_with('amphora') - - self.assertEqual(AMP_ID, amp) - @mock.patch('octavia.controller.worker.v2.flows.' 'amphora_flows.AmphoraFlows.get_delete_amphora_flow', return_value='TEST') @@ -254,11 +244,10 @@ class TestControllerWorker(base.TestCase): id=AMP_ID) mock_amp_repo_get.return_value = _db_amphora_mock - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - 'TEST', store={constants.AMPHORA: _db_amphora_mock.to_dict()})) - - _flow_mock.run.assert_called_once_with() + flow_utils.get_delete_amphora_flow, + store={constants.AMPHORA: _db_amphora_mock.to_dict()})) @mock.patch('octavia.controller.worker.v2.flows.' 'health_monitor_flows.HealthMonitorFlows.' @@ -285,8 +274,9 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() mock_health_mon_repo_get.return_value = _db_health_mon_mock - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_create_health_monitor_flow, store={constants.HEALTH_MON: _health_mon_mock, constants.LISTENERS: @@ -298,14 +288,7 @@ class TestControllerWorker(base.TestCase): constants.POOL_ID: POOL_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'health_monitor_flows.HealthMonitorFlows.' - 'get_delete_health_monitor_flow', - return_value=_flow_mock) def test_delete_health_monitor(self, - mock_get_delete_hm_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -326,8 +309,8 @@ class TestControllerWorker(base.TestCase): cw.delete_health_monitor(_health_mon_mock) mock_health_mon_repo_get.return_value = _db_health_mon_mock - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_delete_health_monitor_flow, store={constants.HEALTH_MON: _health_mon_mock, constants.LISTENERS: @@ -340,14 +323,7 @@ class TestControllerWorker(base.TestCase): POOL_ID, constants.PROJECT_ID: PROJECT_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'health_monitor_flows.HealthMonitorFlows.' - 'get_update_health_monitor_flow', - return_value=_flow_mock) def test_update_health_monitor(self, - mock_get_update_hm_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -370,8 +346,8 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_health_monitor_flow, store={constants.HEALTH_MON: _health_mon_mock, constants.POOL_ID: POOL_ID, @@ -384,13 +360,7 @@ class TestControllerWorker(base.TestCase): constants.UPDATE_DICT: HEALTH_UPDATE_DICT})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'listener_flows.ListenerFlows.get_create_listener_flow', - return_value=_flow_mock) def test_create_listener(self, - mock_get_create_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -413,20 +383,14 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, store={ + flow_utils.get_create_listener_flow, store={ constants.LOADBALANCER: provider_lb, constants.LOADBALANCER_ID: LB_ID, constants.LISTENERS: [listener_dict]})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'listener_flows.ListenerFlows.get_delete_listener_flow', - return_value=_flow_mock) def test_delete_listener(self, - mock_get_delete_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -446,19 +410,14 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.delete_listener(listener_dict) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, store={constants.LISTENER: self.ref_listener_dict, - constants.LOADBALANCER_ID: LB_ID, - constants.PROJECT_ID: PROJECT_ID})) + flow_utils.get_delete_listener_flow, + store={constants.LISTENER: self.ref_listener_dict, + constants.LOADBALANCER_ID: LB_ID, + constants.PROJECT_ID: PROJECT_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'listener_flows.ListenerFlows.get_update_listener_flow', - return_value=_flow_mock) def test_update_listener(self, - mock_get_update_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -479,8 +438,8 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.update_listener(listener_dict, LISTENER_UPDATE_DICT) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_listener_flow, store={constants.LISTENER: listener_dict, constants.UPDATE_DICT: LISTENER_UPDATE_DICT, @@ -488,14 +447,8 @@ class TestControllerWorker(base.TestCase): constants.LISTENERS: [listener_dict]})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_create_load_balancer_flow', - return_value=_flow_mock) def test_create_load_balancer_single( self, - mock_get_create_load_balancer_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -512,9 +465,7 @@ class TestControllerWorker(base.TestCase): self.conf.config(group="controller_worker", loadbalancer_topology=constants.TOPOLOGY_SINGLE) _flow_mock.reset_mock() - mock_taskflow_load.reset_mock() - mock_eng = mock.Mock() - mock_taskflow_load.return_value = mock_eng + store = { constants.LOADBALANCER_ID: LB_ID, 'update_dict': {'topology': constants.TOPOLOGY_SINGLE}, @@ -530,19 +481,13 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.create_load_balancer(_load_balancer_mock) - mock_get_create_load_balancer_flow.assert_called_with( - topology=constants.TOPOLOGY_SINGLE, listeners=[]) - mock_taskflow_load.assert_called_with( - mock_get_create_load_balancer_flow.return_value, store=store) - mock_eng.run.assert_any_call() + cw.services_controller.run_poster.assert_called_with( + flow_utils.get_create_load_balancer_flow, + constants.TOPOLOGY_SINGLE, listeners=[], store=store) self.assertEqual(4, mock_lb_repo_get.call_count) - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_create_load_balancer_flow', - return_value=_flow_mock) def test_create_load_balancer_active_standby( self, - mock_get_create_load_balancer_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -560,9 +505,6 @@ class TestControllerWorker(base.TestCase): loadbalancer_topology=constants.TOPOLOGY_ACTIVE_STANDBY) _flow_mock.reset_mock() - mock_taskflow_load.reset_mock() - mock_eng = mock.Mock() - mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}, @@ -577,17 +519,12 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.create_load_balancer(_load_balancer_mock) - mock_get_create_load_balancer_flow.assert_called_with( - topology=constants.TOPOLOGY_ACTIVE_STANDBY, listeners=[]) - mock_taskflow_load.assert_called_with( - mock_get_create_load_balancer_flow.return_value, store=store) - mock_eng.run.assert_any_call() + cw.services_controller.run_poster.assert_called_with( + flow_utils.get_create_load_balancer_flow, + constants.TOPOLOGY_ACTIVE_STANDBY, listeners=[], store=store) - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_create_load_balancer_flow') def test_create_load_balancer_full_graph_single( self, - mock_get_create_load_balancer_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -612,8 +549,6 @@ class TestControllerWorker(base.TestCase): lb = data_models.LoadBalancer(id=LB_ID, listeners=listeners, topology=constants.TOPOLOGY_SINGLE) mock_lb_repo_get.return_value = lb - mock_eng = mock.Mock() - mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, 'update_dict': {'topology': constants.TOPOLOGY_SINGLE}, @@ -625,25 +560,12 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.create_load_balancer(_load_balancer_mock) - # mock_create_single_topology.assert_called_once() - # mock_create_active_standby_topology.assert_not_called() - mock_get_create_load_balancer_flow.assert_called_with( - topology=constants.TOPOLOGY_SINGLE, listeners=dict_listeners) - mock_taskflow_load.assert_called_with( - mock_get_create_load_balancer_flow.return_value, store=store) - mock_eng.run.assert_any_call() + cw.services_controller.run_poster.assert_called_with( + flow_utils.get_create_load_balancer_flow, + constants.TOPOLOGY_SINGLE, listeners=dict_listeners, store=store) - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_create_load_balancer_flow') - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows._create_single_topology') - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows._create_active_standby_topology') def test_create_load_balancer_full_graph_active_standby( self, - mock_create_active_standby_topology, - mock_create_single_topology, - mock_get_create_load_balancer_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -668,9 +590,10 @@ class TestControllerWorker(base.TestCase): lb = data_models.LoadBalancer( id=LB_ID, listeners=listeners, topology=constants.TOPOLOGY_ACTIVE_STANDBY) + dict_listeners = [listener.to_dict() for listener in + provider_utils.db_listeners_to_provider_listeners( + listeners)] mock_lb_repo_get.return_value = lb - mock_eng = mock.Mock() - mock_taskflow_load.return_value = mock_eng store = { constants.LOADBALANCER_ID: LB_ID, 'update_dict': {'topology': constants.TOPOLOGY_ACTIVE_STANDBY}, @@ -682,18 +605,12 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.create_load_balancer(_load_balancer_mock) - mock_get_create_load_balancer_flow.assert_called_with( - topology=constants.TOPOLOGY_ACTIVE_STANDBY, - listeners=dict_listeners) - mock_taskflow_load.assert_called_with( - mock_get_create_load_balancer_flow.return_value, store=store) - mock_eng.run.assert_any_call() + cw.services_controller.run_poster.assert_called_with( + flow_utils.get_create_load_balancer_flow, + constants.TOPOLOGY_ACTIVE_STANDBY, listeners=dict_listeners, + store=store) - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_delete_load_balancer_flow', - return_value=_flow_mock) def test_delete_load_balancer_without_cascade(self, - mock_get_delete_lb_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -715,22 +632,18 @@ class TestControllerWorker(base.TestCase): _db_session, id=LB_ID) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_delete_load_balancer_flow, + _load_balancer_mock, store={constants.LOADBALANCER: _load_balancer_mock, constants.SERVER_GROUP_ID: _db_load_balancer_mock.server_group_id, constants.PROJECT_ID: _db_load_balancer_mock.project_id, })) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_cascade_delete_load_balancer_flow', - return_value=_flow_mock) def test_delete_load_balancer_with_cascade(self, - mock_get_delete_lb_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -753,9 +666,10 @@ class TestControllerWorker(base.TestCase): id=LB_ID) list_name = 'listener_%s' % _listener_mock.id - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_cascade_delete_load_balancer_flow, + _load_balancer_mock, store={constants.LOADBALANCER: _load_balancer_mock, list_name: self.ref_listener_dict, constants.LOADBALANCER_ID: LB_ID, @@ -764,16 +678,11 @@ class TestControllerWorker(base.TestCase): constants.PROJECT_ID: _db_load_balancer_mock.project_id, }) ) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.controller.worker.v2.flows.load_balancer_flows.' - 'LoadBalancerFlows.get_update_load_balancer_flow', - return_value=_flow_mock) @mock.patch('octavia.db.repositories.ListenerRepository.get_all', return_value=([_listener_mock], None)) def test_update_load_balancer(self, mock_listener_repo_get_all, - mock_get_update_lb_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -793,8 +702,8 @@ class TestControllerWorker(base.TestCase): change = 'TEST2' cw.update_load_balancer(_load_balancer_mock, change) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_load_balancer_flow, store={constants.UPDATE_DICT: change, constants.LOADBALANCER: _load_balancer_mock, @@ -802,8 +711,6 @@ class TestControllerWorker(base.TestCase): _db_load_balancer_mock.id, })) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.controller.worker.v2.flows.' 'member_flows.MemberFlows.get_create_member_flow', return_value=_flow_mock) @@ -833,17 +740,18 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with( - _flow_mock, - store={constants.MEMBER: _member, - constants.LISTENERS: [self.ref_listener_dict], - constants.LOADBALANCER_ID: LB_ID, - constants.LOADBALANCER: provider_lb, - constants.POOL_ID: POOL_ID, - constants.AVAILABILITY_ZONE: {}})) - - _flow_mock.run.assert_called_once_with() + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_create_member_flow, + store={constants.MEMBER: _member, + constants.LISTENERS: + [self.ref_listener_dict], + constants.LOADBALANCER_ID: + LB_ID, + constants.LOADBALANCER: + provider_lb, + constants.POOL_ID: + POOL_ID, + constants.AVAILABILITY_ZONE: {}})) @mock.patch('octavia.controller.worker.v2.flows.' 'member_flows.MemberFlows.get_delete_member_flow', @@ -873,21 +781,16 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, store={constants.MEMBER: _member, - constants.LISTENERS: - [self.ref_listener_dict], - constants.LOADBALANCER_ID: - LB_ID, - constants.LOADBALANCER: - provider_lb, - constants.POOL_ID: - POOL_ID, - constants.PROJECT_ID: PROJECT_ID, - constants.AVAILABILITY_ZONE: {}})) - - _flow_mock.run.assert_called_once_with() + flow_utils.get_delete_member_flow, + store={constants.MEMBER: _member, + constants.LISTENERS: [self.ref_listener_dict], + constants.LOADBALANCER_ID: LB_ID, + constants.LOADBALANCER: provider_lb, + constants.POOL_ID: POOL_ID, + constants.PROJECT_ID: PROJECT_ID, + constants.AVAILABILITY_ZONE: {}})) @mock.patch('octavia.controller.worker.v2.flows.' 'member_flows.MemberFlows.get_update_member_flow', @@ -917,9 +820,8 @@ class TestControllerWorker(base.TestCase): cw.update_member(_member, MEMBER_UPDATE_DICT) provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_member_flow, store={constants.MEMBER: _member, constants.LISTENERS: [self.ref_listener_dict], @@ -933,8 +835,6 @@ class TestControllerWorker(base.TestCase): MEMBER_UPDATE_DICT, constants.AVAILABILITY_ZONE: {}})) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.controller.worker.v2.flows.' 'member_flows.MemberFlows.get_batch_update_members_flow', return_value=_flow_mock) @@ -958,15 +858,25 @@ class TestControllerWorker(base.TestCase): _flow_mock.reset_mock() mock_get_az_metadata_dict.return_value = {} cw = controller_worker.ControllerWorker() + old_member = mock.MagicMock() + old_member.to_dict.return_value = {'id': 9, + constants.POOL_ID: 'testtest'} + mock_member_repo_get.side_effect = [_member_mock, old_member] cw.batch_update_members([{constants.MEMBER_ID: 9, constants.POOL_ID: 'testtest'}], [{constants.MEMBER_ID: 11}], [MEMBER_UPDATE_DICT]) + provider_m = provider_utils.db_member_to_provider_member(_member_mock) + old_provider_m = provider_utils.db_member_to_provider_member( + old_member).to_dict() provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_batch_update_members_flow, + [old_provider_m], + [{'member_id': 11}], + [(provider_m.to_dict(), MEMBER_UPDATE_DICT)], store={constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: LB_ID, constants.LOADBALANCER: provider_lb, @@ -974,13 +884,7 @@ class TestControllerWorker(base.TestCase): constants.PROJECT_ID: PROJECT_ID, constants.AVAILABILITY_ZONE: {}})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'pool_flows.PoolFlows.get_create_pool_flow', - return_value=_flow_mock) def test_create_pool(self, - mock_get_create_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1001,8 +905,8 @@ class TestControllerWorker(base.TestCase): provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_create_pool_flow, store={constants.POOL_ID: POOL_ID, constants.LOADBALANCER_ID: LB_ID, @@ -1011,14 +915,9 @@ class TestControllerWorker(base.TestCase): constants.LOADBALANCER: provider_lb})) - _flow_mock.run.assert_called_once_with() self.assertEqual(1, mock_pool_repo_get.call_count) - @mock.patch('octavia.controller.worker.v2.flows.' - 'pool_flows.PoolFlows.get_delete_pool_flow', - return_value=_flow_mock) def test_delete_pool(self, - mock_get_delete_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1038,8 +937,8 @@ class TestControllerWorker(base.TestCase): cw.delete_pool(_pool_mock) provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_delete_pool_flow, store={constants.POOL_ID: POOL_ID, constants.LOADBALANCER_ID: LB_ID, @@ -1049,13 +948,7 @@ class TestControllerWorker(base.TestCase): provider_lb, constants.PROJECT_ID: PROJECT_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'pool_flows.PoolFlows.get_update_pool_flow', - return_value=_flow_mock) def test_update_pool(self, - mock_get_update_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1076,8 +969,8 @@ class TestControllerWorker(base.TestCase): cw.update_pool(_pool_mock, POOL_UPDATE_DICT) provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_pool_flow, store={constants.POOL_ID: POOL_ID, constants.LISTENERS: [self.ref_listener_dict], @@ -1088,13 +981,7 @@ class TestControllerWorker(base.TestCase): constants.UPDATE_DICT: POOL_UPDATE_DICT})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7policy_flows.L7PolicyFlows.get_create_l7policy_flow', - return_value=_flow_mock) def test_create_l7policy(self, - mock_get_create_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1116,20 +1003,14 @@ class TestControllerWorker(base.TestCase): } cw.create_l7policy(l7policy_mock) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_create_l7policy_flow, store={constants.L7POLICY: l7policy_mock, constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: LB_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow', - return_value=_flow_mock) def test_delete_l7policy(self, - mock_get_delete_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1151,21 +1032,15 @@ class TestControllerWorker(base.TestCase): } cw.delete_l7policy(l7policy_mock) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_delete_l7policy_flow, store={constants.L7POLICY: l7policy_mock, constants.LISTENERS: [self.ref_listener_dict], constants.LOADBALANCER_ID: LB_ID})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7policy_flows.L7PolicyFlows.get_update_l7policy_flow', - return_value=_flow_mock) def test_update_l7policy(self, - mock_get_update_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1190,8 +1065,8 @@ class TestControllerWorker(base.TestCase): cw.update_l7policy(l7policy_mock, L7POLICY_UPDATE_DICT) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_l7policy_flow, store={constants.L7POLICY: l7policy_mock, constants.LISTENERS: [self.ref_listener_dict], @@ -1200,13 +1075,7 @@ class TestControllerWorker(base.TestCase): constants.UPDATE_DICT: L7POLICY_UPDATE_DICT})) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7rule_flows.L7RuleFlows.get_create_l7rule_flow', - return_value=_flow_mock) def test_create_l7rule(self, - mock_get_create_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1228,8 +1097,9 @@ class TestControllerWorker(base.TestCase): l7_policy = provider_utils.db_l7policy_to_provider_l7policy( _l7policy_mock) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_create_l7rule_flow, store={constants.L7RULE: _l7rule_mock.to_dict(), constants.L7POLICY: @@ -1240,13 +1110,7 @@ class TestControllerWorker(base.TestCase): [self.ref_listener_dict] })) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7rule_flows.L7RuleFlows.get_delete_l7rule_flow', - return_value=_flow_mock) def test_delete_l7rule(self, - mock_get_delete_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1266,8 +1130,8 @@ class TestControllerWorker(base.TestCase): l7_policy = provider_utils.db_l7policy_to_provider_l7policy( _l7policy_mock) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_delete_l7rule_flow, store={ constants.L7RULE: _l7rule_mock.to_dict(), @@ -1279,13 +1143,7 @@ class TestControllerWorker(base.TestCase): constants.LOADBALANCER_ID: LB_ID, })) - _flow_mock.run.assert_called_once_with() - - @mock.patch('octavia.controller.worker.v2.flows.' - 'l7rule_flows.L7RuleFlows.get_update_l7rule_flow', - return_value=_flow_mock) def test_update_l7rule(self, - mock_get_update_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1306,8 +1164,8 @@ class TestControllerWorker(base.TestCase): l7_policy = provider_utils.db_l7policy_to_provider_l7policy( _l7policy_mock) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.get_update_l7rule_flow, store={ constants.L7RULE: _l7rule_mock.to_dict(), @@ -1320,19 +1178,13 @@ class TestControllerWorker(base.TestCase): constants.UPDATE_DICT: L7RULE_UPDATE_DICT})) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={}) - @mock.patch('octavia.controller.worker.v2.flows.' - 'amphora_flows.AmphoraFlows.get_failover_flow', - return_value=_flow_mock) @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') def test_failover_amphora(self, mock_update, - mock_get_failover_flow, mock_get_flavor_meta, mock_get_az_meta, mock_api_get_session, @@ -1348,13 +1200,17 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() - + _db_amphora_mock.reset_mock() + mock_amp_repo_get.return_value = _db_amphora_mock cw = controller_worker.ControllerWorker() cw.failover_amphora(AMP_ID) - mock_amp_repo_get.return_value = _db_amphora_mock - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + mock_lb_repo_get.return_value = _db_load_balancer_mock + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_failover_flow, + role=_db_amphora_mock.role, + load_balancer={}, + wait=True, store={constants.FAILED_AMPHORA: _db_amphora_mock.to_dict(), constants.LOADBALANCER_ID: _db_amphora_mock.load_balancer_id, @@ -1364,7 +1220,6 @@ class TestControllerWorker(base.TestCase): constants.AVAILABILITY_ZONE: {} })) - _flow_mock.run.assert_called_once_with() mock_update.assert_called_with(_db_session, LB_ID, provisioning_status=constants.ACTIVE) @@ -1447,11 +1302,7 @@ class TestControllerWorker(base.TestCase): @mock.patch( 'octavia.db.repositories.AmphoraRepository.get_lb_for_amphora', return_value=None) - @mock.patch('octavia.controller.worker.v2.flows.' - 'amphora_flows.AmphoraFlows.get_failover_flow', - return_value=_flow_mock) def test_failover_spare_amphora(self, - mock_get_failover_flow, mock_get_lb_for_amphora, mock_api_get_session, mock_dyn_log_listener, @@ -1472,14 +1323,17 @@ class TestControllerWorker(base.TestCase): mock_amphora.id = AMP_ID mock_amphora.status = constants.AMPHORA_READY mock_amphora.load_balancer_id = None + mock_amphora.role = constants.ROLE_STANDALONE cw = controller_worker.ControllerWorker() cw._perform_amphora_failover(mock_amphora, constants.LB_CREATE_FAILOVER_PRIORITY) - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_failover_flow, + role=constants.ROLE_STANDALONE, load_balancer=None, + wait=True, store={constants.FAILED_AMPHORA: mock_amphora.to_dict(), constants.LOADBALANCER_ID: None, constants.BUILD_TYPE_PRIORITY: @@ -1488,8 +1342,6 @@ class TestControllerWorker(base.TestCase): constants.AVAILABILITY_ZONE: {} })) - _flow_mock.run.assert_called_once_with() - @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') def test_failover_deleted_amphora(self, mock_delete, @@ -1567,9 +1419,6 @@ class TestControllerWorker(base.TestCase): 'get_availability_zone_metadata_dict', return_value={}) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={}) - @mock.patch('octavia.controller.worker.v2.flows.' - 'amphora_flows.AmphoraFlows.get_failover_flow', - return_value=_flow_mock) @mock.patch( 'octavia.db.repositories.AmphoraRepository.get_lb_for_amphora', return_value=_db_load_balancer_mock) @@ -1577,7 +1426,6 @@ class TestControllerWorker(base.TestCase): def test_failover_amphora_anti_affinity(self, mock_update, mock_get_lb_for_amphora, - mock_get_update_listener_flow, mock_get_flavor_meta, mock_get_az_meta, mock_api_get_session, @@ -1598,10 +1446,15 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.failover_amphora(AMP_ID) + provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer( + _db_load_balancer_mock).to_dict() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. + (cw.services_controller.run_poster. assert_called_once_with( - _flow_mock, + flow_utils.get_failover_flow, + role=_db_amphora_mock.role, + load_balancer=provider_lb, + wait=True, store={constants.FAILED_AMPHORA: _db_amphora_mock.to_dict(), constants.LOADBALANCER_ID: _db_amphora_mock.load_balancer_id, @@ -1612,15 +1465,10 @@ class TestControllerWorker(base.TestCase): constants.AVAILABILITY_ZONE: {} })) - _flow_mock.run.assert_called_once_with() mock_update.assert_called_with(_db_session, LB_ID, provisioning_status=constants.ACTIVE) - @mock.patch('octavia.controller.worker.v2.flows.' - 'amphora_flows.AmphoraFlows.cert_rotate_amphora_flow', - return_value=_flow_mock) def test_amphora_cert_rotation(self, - mock_get_update_listener_flow, mock_api_get_session, mock_dyn_log_listener, mock_taskflow_load, @@ -1636,22 +1484,17 @@ class TestControllerWorker(base.TestCase): cw = controller_worker.ControllerWorker() cw.amphora_cert_rotation(AMP_ID) mock_amp_repo_get.return_value = _db_amphora_mock - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.cert_rotate_amphora_flow, store={constants.AMPHORA: _db_amphora_mock.to_dict(), constants.AMPHORA_ID: _amphora_mock[constants.ID]})) - _flow_mock.run.assert_called_once_with() @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict') @mock.patch('octavia.db.repositories.AmphoraRepository.get_lb_for_amphora') - @mock.patch('octavia.controller.worker.v2.flows.' - 'amphora_flows.AmphoraFlows.update_amphora_config_flow', - return_value=_flow_mock) def test_update_amphora_agent_config(self, - mock_update_flow, mock_get_lb_for_amp, mock_flavor_meta, mock_api_get_session, @@ -1676,27 +1519,24 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID) mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID) mock_flavor_meta.assert_called_once_with(_db_session, 'vanilla') - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.update_amphora_config_flow, store={constants.AMPHORA: _db_amphora_mock.to_dict(), constants.FLAVOR: {'test': 'dict'}})) - _flow_mock.run.assert_called_once_with() # Test with no flavor _flow_mock.reset_mock() mock_amp_repo_get.reset_mock() mock_get_lb_for_amp.reset_mock() mock_flavor_meta.reset_mock() - base_taskflow.BaseTaskFlowEngine._taskflow_load.reset_mock() mock_lb.flavor_id = None cw.update_amphora_agent_config(AMP_ID) mock_amp_repo_get.assert_called_once_with(_db_session, id=AMP_ID) mock_get_lb_for_amp.assert_called_once_with(_db_session, AMP_ID) mock_flavor_meta.assert_not_called() - (base_taskflow.BaseTaskFlowEngine._taskflow_load. - assert_called_once_with(_flow_mock, + (cw.services_controller.run_poster. + assert_called_once_with(flow_utils.update_amphora_config_flow, store={constants.AMPHORA: _db_amphora_mock.to_dict(), constants.FLAVOR: {}})) - _flow_mock.run.assert_called_once_with() diff --git a/releasenotes/notes/add-jobboard-based-controller-599279c7cc172e955.yaml b/releasenotes/notes/add-jobboard-based-controller-599279c7cc172e955.yaml new file mode 100644 index 0000000000..3c586c96eb --- /dev/null +++ b/releasenotes/notes/add-jobboard-based-controller-599279c7cc172e955.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Operators can now use the amphorav2 provider which uses jobboard-based + controller. A jobboard controller solves the issue with resources stuck in + PENDING_* states by writing info about task states in persistent backend + and monitoring job claims via jobboard. diff --git a/requirements.txt b/requirements.txt index a182cb7589..050e01d5df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ cotyledon>=1.3.0 # Apache-2.0 pecan>=1.3.2 # BSD pbr!=2.1.0,>=2.0.0 # Apache-2.0 SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT +SQLAlchemy-Utils>=0.30.11 Babel!=2.4.0,>=2.3.4 # BSD futurist>=1.2.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 @@ -37,7 +38,7 @@ python-cinderclient>=3.3.0 # Apache-2.0 pyOpenSSL>=17.1.0 # Apache-2.0 WSME>=0.8.0 # MIT Jinja2>=2.10 # BSD License (3 clause) -taskflow>=2.16.0 # Apache-2.0 +taskflow>=4.1.0 # Apache-2.0 diskimage-builder>=2.24.0 # Apache-2.0 castellan>=0.16.0 # Apache-2.0 tenacity>=5.0.4 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 3d667e11c0..41b81c9869 100644 --- a/setup.cfg +++ b/setup.cfg @@ -94,6 +94,9 @@ octavia.barbican_auth = barbican_acl_auth = octavia.certificates.common.auth.barbican_acl:BarbicanACLAuth octavia.plugins = hot_plug_plugin = octavia.controller.worker.v1.controller_worker:ControllerWorker +octavia.worker.jobboard_driver = + redis_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:RedisTaskFlowDriver + zookeeper_taskflow_driver = octavia.controller.worker.v2.taskflow_jobboard_driver:ZookeeperTaskFlowDriver oslo.config.opts = octavia = octavia.opts:list_opts oslo.policy.policies = diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml index 5453ef039a..9216afba7c 100644 --- a/zuul.d/jobs.yaml +++ b/zuul.d/jobs.yaml @@ -69,12 +69,16 @@ name: octavia-v2-dsvm-scenario-amphora-v2 parent: octavia-v2-dsvm-scenario vars: + devstack_localrc: + OCTAVIA_ENABLE_AMPHORAV2_PROVIDER: True devstack_local_conf: post-config: $OCTAVIA_CONF: api_settings: default_provider_driver: amphorav2 enabled_provider_drivers: amphorav2:The v2 amphora driver. + task_flow: + jobboard_expiration_time: 100 test-config: "$TEMPEST_CONFIG": load_balancer: