Browse Source

Merge "Jobboard based controller"

tags/6.0.0.0rc1
Zuul 3 months ago
committed by Gerrit Code Review
parent
commit
73fbc05386
32 changed files with 1281 additions and 602 deletions
  1. +47
    -1
      devstack/plugin.sh
  2. +2
    -0
      devstack/settings
  3. +31
    -0
      doc/source/admin/providers/amphorav2.rst
  4. +2
    -0
      doc/source/admin/providers/index.rst
  5. +84
    -0
      doc/source/install/install-amphorav2.rst
  6. +1
    -0
      doc/source/install/install.rst
  7. +43
    -0
      etc/octavia.conf
  8. +2
    -1
      lower-constraints.txt
  9. +10
    -2
      octavia/api/common/types.py
  10. +15
    -4
      octavia/api/v2/controllers/amphora.py
  11. +131
    -2
      octavia/common/base_taskflow.py
  12. +42
    -2
      octavia/common/config.py
  13. +1
    -0
      octavia/common/constants.py
  14. +6
    -1
      octavia/common/data_models.py
  15. +6
    -2
      octavia/controller/healthmanager/health_manager.py
  16. +14
    -4
      octavia/controller/housekeeping/house_keeping.py
  17. +4
    -0
      octavia/controller/queue/v2/consumer.py
  18. +204
    -272
      octavia/controller/worker/v2/controller_worker.py
  19. +158
    -0
      octavia/controller/worker/v2/flows/flow_utils.py
  20. +102
    -0
      octavia/controller/worker/v2/taskflow_jobboard_driver.py
  21. +18
    -0
      octavia/db/migration/cli.py
  22. +13
    -3
      octavia/db/repositories.py
  23. +15
    -0
      octavia/tests/functional/db/test_repositories.py
  24. +83
    -0
      octavia/tests/unit/common/test_base_taskflow.py
  25. +22
    -6
      octavia/tests/unit/controller/healthmanager/test_health_manager.py
  26. +70
    -3
      octavia/tests/unit/controller/housekeeping/test_house_keeping.py
  27. +2
    -1
      octavia/tests/unit/controller/queue/v2/test_consumer.py
  28. +137
    -297
      octavia/tests/unit/controller/worker/v2/test_controller_worker.py
  29. +7
    -0
      releasenotes/notes/add-jobboard-based-controller-599279c7cc172e955.yaml
  30. +2
    -1
      requirements.txt
  31. +3
    -0
      setup.cfg
  32. +4
    -0
      zuul.d/jobs.yaml

+ 47
- 1
devstack/plugin.sh View File

@@ -225,6 +225,36 @@ function create_octavia_accounts {
fi
}

function install_redis {
if is_fedora; then
install_package redis
elif is_ubuntu; then
install_package redis-server
elif is_suse; then
install_package redis
else
exit_distro_not_supported "redis installation"
fi

start_service redis

pip_install_gr redis
}

function uninstall_redis {
if is_fedora; then
uninstall_package redis
elif is_ubuntu; then
uninstall_package redis-server
elif is_suse; then
uninstall_package redis
fi

stop_service redis

pip_unistall redis
}

function octavia_configure {

sudo mkdir -m 755 -p $OCTAVIA_CONF_DIR
@@ -250,7 +280,9 @@ function octavia_configure {
iniset $OCTAVIA_CONF api_settings api_handler queue_producer

iniset $OCTAVIA_CONF database connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia"

if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then
iniset $OCTAVIA_CONF task_flow persistence_connection "mysql+pymysql://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:3306/octavia_persistence"
fi
# Configure keystone auth_token for all users
configure_keystone_authtoken_middleware $OCTAVIA_CONF octavia

@@ -324,12 +356,22 @@ function octavia_configure {
if [ $OCTAVIA_NODE == 'main' ] || [ $OCTAVIA_NODE == 'standalone' ] || [ $OCTAVIA_NODE == 'api' ]; then
recreate_database_mysql octavia
octavia-db-manage upgrade head

if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then
recreate_database_mysql octavia_persistence
octavia-db-manage upgrade_persistence
fi
fi

if [[ -a $OCTAVIA_CERTS_DIR ]] ; then
rm -rf $OCTAVIA_CERTS_DIR
fi

# amphorav2 required redis installation
if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then
install_redis
fi

if [[ "$(trueorfalse False OCTAVIA_USE_PREGENERATED_CERTS)" == "True" ]]; then
cp -rfp ${OCTAVIA_PREGENERATED_CERTS_DIR} ${OCTAVIA_CERTS_DIR}
else
@@ -615,6 +657,10 @@ function octavia_cleanup {

sudo rm -rf $NOVA_STATE_PATH $NOVA_AUTH_CACHE_DIR

if [[ ${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER} = True ]]; then
uninstall_redis
fi

sudo rm -f /etc/rsyslog.d/10-octavia-log-offloading.conf
restart_service rsyslog



+ 2
- 0
devstack/settings View File

@@ -33,6 +33,8 @@ OCTAVIA_PORT=${OCTAVIA_PORT:-"9876"}
OCTAVIA_HA_PORT=${OCTAVIA_HA_PORT:-"9875"}
OCTAVIA_HM_LISTEN_PORT=${OCTAVIA_HM_LISTEN_PORT:-"5555"}

OCTAVIA_ENABLE_AMPHORAV2_PROVIDER=${OCTAVIA_ENABLE_AMPHORAV2_PROVIDER:-False}

OCTAVIA_MGMT_SUBNET=${OCTAVIA_MGMT_SUBNET:-"192.168.0.0/24"}
OCTAVIA_MGMT_SUBNET_START=${OCTAVIA_MGMT_SUBNET_START:-"192.168.0.2"}
OCTAVIA_MGMT_SUBNET_END=${OCTAVIA_MGMT_SUBNET_END:-"192.168.0.200"}


+ 31
- 0
doc/source/admin/providers/amphorav2.rst View File

@@ -0,0 +1,31 @@
..
Copyright 2020 Mirantis Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.

Amphorav2
=========

This is extension of the reference driver for Octavia. It adopts taskflow
jobboard feature and saves task states into the persistence backend, this
allows to continue task execution if controller work was interrupted.

Default provider name: **amphorav2**

The driver package: https://pypi.org/project/octavia/

The driver source: https://opendev.org/openstack/octavia/

The documentation: https://docs.openstack.org/octavia/latest/

Where to report issues with the driver: https://storyboard.openstack.org/#!/project/openstack/octavia

+ 2
- 0
doc/source/admin/providers/index.rst View File

@@ -45,6 +45,8 @@ your Octavia API instances.

.. include:: amphora.rst

.. include:: amphorav2.rst

.. include:: f5.rst

.. include:: ovn.rst


+ 84
- 0
doc/source/install/install-amphorav2.rst View File

@@ -0,0 +1,84 @@
.. _install-amphorav2:

Additional configuration steps to configure amphorav2 provider
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you would like to use amphorav2 provider for load-balancer service the
following additional steps are required.


Prerequisites
-------------

Amphorav2 provider requires creation of additional database
``octavia_persistence`` to store info about state of tasks and progress of its
execution.
Also to monitor progress on taskflow jobs amphorav2 provider uses
jobboard. As jobboard backend could be used Redis or Zookeeper key-value
storages. Operator should chose the one that is more preferable for specific
cloud. The default is Redis.

1. Create the database, complete these steps:

* Use the database access client to connect to the database
server as the ``root`` user:

.. code-block:: console

# mysql

* Create the ``octavia_persistence`` database:

.. code-block:: console

CREATE DATABASE octavia_persistence;

* Grant proper access to the ``octavia_persistence`` database:

.. code-block:: console

GRANT ALL PRIVILEGES ON octavia.* TO 'octavia_persistence'@'localhost' \
IDENTIFIED BY 'OCTAVIA_DBPASS';
GRANT ALL PRIVILEGES ON octavia.* TO 'octavia_persistence'@'%' \
IDENTIFIED BY 'OCTAVIA_DBPASS';

Replace OCTAVIA_DBPASS with a suitable password.


2. Install desired key-value backend (Redis or Zookeper).

Additional configuration to octavia components
----------------------------------------------

1. Edit the ``/etc/octavia/octavia.conf`` file ``[task_flow]`` section

* Configure database access for persistence backend:

.. code-block:: ini

[task_flow]
persistence_connection = mysql+pymysql://octavia:OCTAVIA_DBPASS@controller/octavia_persistence

Replace OCTAVIA_DBPASS with the password you chose for the Octavia databases.

* Set desired jobboard backend and its configuration:

.. code-block:: ini

[task_flow]
jobboard_backend_driver = 'redis_taskflow_driver'
jobboard_backend_hosts = KEYVALUE_HOST_IPS
jobboard_backend_port = KEYVALUE_PORT
jobboard_backend_password = OCTAVIA_JOBBOARDPASS
jobboard_backend_namespace = 'octavia_jobboard'

Replace OCTAVIA_JOBBOARDPASS with the password you chose for the Octavia
key-value storage.
Replace KEYVALUE_HOST_IPS and KEYVALUE_PORT with ip and port which
chosen key-value storage is using.

2. Populate the octavia database:

.. code-block:: console

# octavia-db-manage --config-file /etc/octavia/octavia.conf upgrade_persistence

+ 1
- 0
doc/source/install/install.rst View File

@@ -17,3 +17,4 @@ Note that installation and configuration vary by distribution.
:maxdepth: 2

install-ubuntu.rst
install-amphorav2.rst

+ 43
- 0
etc/octavia.conf View File

@@ -289,6 +289,49 @@
# for debugging purposes.
# disable_revert = False

# Persistence database, which will be used to store tasks states.
# Database connection url with db name (string value)
#persistence_connection = sqlite://

# Jobboard backend driver that will monitor job state. (string value)
# Possible values:
# - redis_taskflow_driver: Driver that will use Redis to store job states.
# - zookeeper_taskflow_driver: Driver that will use Zookeeper to store job
# states.
#jobboard_backend_driver = redis_taskflow_driver

# Jobboard backend server host(s). (list value)
#jobboard_backend_hosts = 127.0.0.1

# Jobboard backend server port (port value)
# Minimum value: 0
# Maximum value: 65535
#jobboard_backend_port = 6379

# Jobboard backend server password (string value)
#jobboard_backend_password =

# Jobboard name that should be used to store taskflow job id and
# claims for it. (string value)
#jobboard_backend_namespace = octavia_jobboard

# Redis jobboard backend ssl configuration options. (dict value)
# SSL is disabled by default
#jobboard_redis_backend_ssl_options = ssl:False,ssl_ca_certs:None,ssl_cert_reqs:required,ssl_certfile:None,ssl_keyfile:None

# Zookeeper jobboard backend ssl configuration options. (dict value)
# SSL is disabled by default
#jobboard_zookeeper_ssl_options = use_ssl:False,certfile:None,keyfile:None,keyfile_password:None,verify_certs:True

# For backends like redis claiming jobs requiring setting the expiry -
# how many seconds the claim should be retained for. (integer value)
#jobboard_expiration_time = 30

# If for analysis required saving logbooks info, set this parameter to
# True. By default remove logbook from persistence backend when job
# completed. (boolean value)
#jobboard_save_logbook = false

[oslo_messaging]
# Queue Consumer Thread Pool Size
# rpc_thread_pool_size = 2


+ 2
- 1
lower-constraints.txt View File

@@ -149,12 +149,13 @@ Sphinx==1.6.2
sphinxcontrib-svg2pdfconverter==0.1.0
sphinxcontrib-websupport==1.0.1
SQLAlchemy==1.0.10
SQLAlchemy-Utils==0.30.11
sqlalchemy-migrate==0.11.0
sqlparse==0.2.4
statsd==3.2.2
stestr==2.0.0
stevedore==1.20.0
taskflow==2.16.0
taskflow==4.1.0
tempest==17.1.0
Tempita==0.5.2
tenacity==5.0.4


+ 10
- 2
octavia/api/common/types.py View File

@@ -14,6 +14,7 @@

import copy

from dateutil import parser
import netaddr
from wsme import types as wtypes

@@ -127,12 +128,19 @@ class BaseType(wtypes.Base, metaclass=BaseMeta):
:param data_model: data model to convert from
:param children: convert child data models
"""
type_dict = data_model.to_dict()
# We need to have json convertible data for storing it in persistence
# jobboard backend.
for k, v in type_dict.items():
if ('_at' in k or 'expiration' in k) and v is not None:
type_dict[k] = parser.parse(v)

if not hasattr(cls, '_type_to_model_map'):
return cls(**data_model.to_dict())
return cls(**type_dict)

dm_to_type_map = {value: key
for key, value in cls._type_to_model_map.items()}
type_dict = data_model.to_dict()
new_dict = copy.deepcopy(type_dict)
for key, value in type_dict.items():
if isinstance(value, dict):


+ 15
- 4
octavia/api/v2/controllers/amphora.py View File

@@ -98,10 +98,15 @@ class FailoverController(base.BaseController):

def __init__(self, amp_id):
super(FailoverController, self).__init__()
topic = cfg.CONF.oslo_messaging.topic
if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
topic = constants.TOPIC_AMPHORA_V2
version = "2.0"
else:
topic = cfg.CONF.oslo_messaging.topic
version = "1.0"
self.target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False)
topic=topic, version=version, fanout=False)
self.client = rpc.get_client(self.target)
self.amp_id = amp_id

@@ -143,11 +148,17 @@ class AmphoraUpdateController(base.BaseController):

def __init__(self, amp_id):
super(AmphoraUpdateController, self).__init__()
topic = cfg.CONF.oslo_messaging.topic

if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
topic = constants.TOPIC_AMPHORA_V2
version = "2.0"
else:
topic = cfg.CONF.oslo_messaging.topic
version = "1.0"
self.transport = messaging.get_rpc_transport(cfg.CONF)
self.target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False)
topic=topic, version=version, fanout=False)
self.client = messaging.RPCClient(self.transport, target=self.target)
self.amp_id = amp_id



+ 131
- 2
octavia/common/base_taskflow.py View File

@@ -15,14 +15,38 @@

import concurrent.futures
import datetime
import functools

from oslo_config import cfg
from taskflow import engines as tf_engines
from oslo_log import log
from oslo_utils import uuidutils
from taskflow.conductors.backends import impl_blocking
from taskflow import engines
from taskflow import exceptions as taskflow_exc
from taskflow.listeners import base
from taskflow.listeners import logging
from taskflow.persistence import models
from taskflow import states

from octavia.amphorae.driver_exceptions import exceptions

LOG = log.getLogger(__name__)

CONF = cfg.CONF


# We do not need to log retry exception information. Warning "Could not connect
# to instance" will be logged as usual.
def retryMaskFilter(record):
if record.exc_info is not None and isinstance(
record.exc_info[1], exceptions.AmpConnectionRetry):
return False
return True


LOG.logger.addFilter(retryMaskFilter)


class BaseTaskFlowEngine(object):
"""This is the task flow engine

@@ -37,7 +61,7 @@ class BaseTaskFlowEngine(object):
max_workers=CONF.task_flow.max_workers)

def _taskflow_load(self, flow, **kwargs):
eng = tf_engines.load(
eng = engines.load(
flow,
engine=CONF.task_flow.engine,
executor=self.executor,
@@ -47,3 +71,108 @@ class BaseTaskFlowEngine(object):
eng.prepare()

return eng


class ExtendExpiryListener(base.Listener):

def __init__(self, engine, job):
super(ExtendExpiryListener, self).__init__(engine)
self.job = job

def _task_receiver(self, state, details):
self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time)

def _flow_receiver(self, state, details):
self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time)

def _retry_receiver(self, state, details):
self.job.extend_expiry(cfg.CONF.task_flow.jobboard_expiration_time)


class DynamicLoggingConductor(impl_blocking.BlockingConductor):

def _listeners_from_job(self, job, engine):
listeners = super(DynamicLoggingConductor, self)._listeners_from_job(
job, engine)
listeners.append(logging.DynamicLoggingListener(engine, log=LOG))

return listeners

def _on_job_done(self, job, fut):
super(DynamicLoggingConductor, self)._on_job_done(job, fut)
# Double check that job is complete.
if (not CONF.task_flow.jobboard_save_logbook and
job.state == states.COMPLETE):
LOG.debug("Job %s is complete. Cleaning up job logbook.", job.name)
try:
self._persistence.get_connection().destroy_logbook(
job.book.uuid)
except taskflow_exc.NotFound:
LOG.debug("Logbook for job %s has been already cleaned up",
job.name)


class RedisDynamicLoggingConductor(DynamicLoggingConductor):

def _listeners_from_job(self, job, engine):
listeners = super(RedisDynamicLoggingConductor,
self)._listeners_from_job(job, engine)
listeners.append(ExtendExpiryListener(engine, job))
return listeners


class TaskFlowServiceController(object):

def __init__(self, driver):
self.driver = driver

def run_poster(self, flow_factory, *args, wait=False, **kwargs):
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as job_board:
job_id = uuidutils.generate_uuid()
job_name = '-'.join([flow_factory.__name__, job_id])
job_logbook = models.LogBook(job_name)
flow_detail = models.FlowDetail(
job_name, job_id)
job_details = {
'store': kwargs.pop('store')
}
job_logbook.add(flow_detail)
persistence.get_connection().save_logbook(job_logbook)
engines.save_factory_details(flow_detail, flow_factory,
args, kwargs,
backend=persistence)

job_board.post(job_name, book=job_logbook,
details=job_details)
if wait:
self._wait_for_job(job_board)

return job_id

def _wait_for_job(self, job_board):
# Wait for job to its complete state
for job in job_board.iterjobs():
LOG.debug("Waiting for job %s to finish", job.name)
job.wait()

def run_conductor(self, name):
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as board:
# Redis do not expire jobs by default, so jobs won't be resumed
# with restart of controller. Add expiry for board and use
# special listener.
if (CONF.task_flow.jobboard_backend_driver ==
'redis_taskflow_driver'):
conductor = RedisDynamicLoggingConductor(
name, board, persistence=persistence,
engine=CONF.task_flow.engine)
board.claim = functools.partial(
board.claim,
expiry=CONF.task_flow.jobboard_expiration_time)
else:
conductor = DynamicLoggingConductor(
name, board, persistence=persistence,
engine=CONF.task_flow.engine)

conductor.run()

+ 42
- 2
octavia/common/config.py View File

@@ -92,7 +92,8 @@ api_opts = [
'Amphora driver.'),
default={'amphora': 'The Octavia Amphora driver.',
'octavia': 'Deprecated alias of the Octavia Amphora '
'driver.'}),
'driver.',
}),
cfg.StrOpt('default_provider_driver', default='amphora',
help=_('Default provider driver.')),
cfg.IntOpt('udp_connect_min_interval_health_monitor',
@@ -461,7 +462,46 @@ task_flow_opts = [
help=_('If True, disables the controller worker taskflow '
'flows from reverting. This will leave resources in '
'an inconsistent state and should only be used for '
'debugging purposes.'))
'debugging purposes.')),
cfg.StrOpt('persistence_connection',
default='sqlite://',
help='Persistence database, which will be used to store tasks '
'states. Database connection url with db name'),
cfg.StrOpt('jobboard_backend_driver',
default='redis_taskflow_driver',
choices=['redis_taskflow_driver', 'zookeeper_taskflow_driver'],
help='Jobboard backend driver that will monitor job state.'),
cfg.ListOpt('jobboard_backend_hosts', default=['127.0.0.1'],
help='Jobboard backend server host(s).'),
cfg.PortOpt('jobboard_backend_port', default=6379,
help='Jobboard backend server port'),
cfg.StrOpt('jobboard_backend_password', default='', secret=True,
help='Jobboard backend server password'),
cfg.StrOpt('jobboard_backend_namespace', default='octavia_jobboard',
help='Jobboard name that should be used to store taskflow '
'job id and claims for it.'),
cfg.DictOpt('jobboard_redis_backend_ssl_options',
help='Redis jobboard backend ssl configuration options.',
default={'ssl': False,
'ssl_keyfile': None,
'ssl_certfile': None,
'ssl_ca_certs': None,
'ssl_cert_reqs': 'required'}),
cfg.DictOpt('jobboard_zookeeper_ssl_options',
help='Zookeeper jobboard backend ssl configuration options.',
default={'use_ssl': False,
'keyfile': None,
'keyfile_password': None,
'certfile': None,
'verify_certs': True}),
cfg.IntOpt('jobboard_expiration_time', default=30,
help='For backends like redis claiming jobs requiring setting '
'the expiry - how many seconds the claim should be '
'retained for.'),
cfg.BoolOpt('jobboard_save_logbook', default=False,
help='If for analysis required saving logbooks info, set this '
'parameter to True. By default remove logbook from '
'persistence backend when job completed.'),
]

core_cli_opts = []


+ 1
- 0
octavia/common/constants.py View File

@@ -718,6 +718,7 @@ RBAC_GET_STATUS = 'get_status'

# PROVIDERS
OCTAVIA = 'octavia'
AMPHORAV2 = 'amphorav2'

# systemctl commands
DISABLE = 'disable'


+ 6
- 1
octavia/common/data_models.py View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import re

from sqlalchemy.orm import collections
@@ -34,7 +35,11 @@ class BaseDataModel(object):
# tags is a list, it doesn't need recurse
ret[attr] = value
continue

# We need to have json convertible data for storing it in
# persistence jobboard backend.
if isinstance(value, datetime.datetime):
ret[attr] = value.isoformat()
continue
if recurse:
if isinstance(getattr(self, attr), list):
ret[attr] = []


+ 6
- 2
octavia/controller/healthmanager/health_manager.py View File

@@ -23,7 +23,8 @@ from oslo_log import log as logging
from oslo_utils import excutils

from octavia.common import constants
from octavia.controller.worker.v1 import controller_worker as cw
from octavia.controller.worker.v1 import controller_worker as cw1
from octavia.controller.worker.v2 import controller_worker as cw2
from octavia.db import api as db_api
from octavia.db import repositories as repo

@@ -57,7 +58,10 @@ def update_stats_on_done(stats, fut):

class HealthManager(object):
def __init__(self, exit_event):
self.cw = cw.ControllerWorker()
if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
self.cw = cw2.ControllerWorker()
else:
self.cw = cw1.ControllerWorker()
self.threads = CONF.health_manager.failover_threads
self.executor = futures.ThreadPoolExecutor(max_workers=self.threads)
self.amp_repo = repo.AmphoraRepository()


+ 14
- 4
octavia/controller/housekeeping/house_keeping.py View File

@@ -21,7 +21,8 @@ from oslo_utils import timeutils
from sqlalchemy.orm import exc as sqlalchemy_exceptions

from octavia.common import constants
from octavia.controller.worker.v1 import controller_worker as cw
from octavia.controller.worker.v1 import controller_worker as cw1
from octavia.controller.worker.v2 import controller_worker as cw2
from octavia.db import api as db_api
from octavia.db import repositories as repo

@@ -34,7 +35,12 @@ class SpareAmphora(object):
self.amp_repo = repo.AmphoraRepository()
self.spares_repo = repo.SparesPoolRepository()
self.az_repo = repo.AvailabilityZoneRepository()
self.cw = cw.ControllerWorker()
if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
self.cw = cw2.ControllerWorker()
self.check_booting_amphora = True
else:
self.cw = cw1.ControllerWorker()
self.check_booting_amphora = False

def spare_check(self):
"""Checks the DB for the Spare amphora count.
@@ -74,7 +80,8 @@ class SpareAmphora(object):
# will function more accurately if the operator actually
# configures the AZ setting properly.
curr_spare_cnt = self.amp_repo.get_spare_amphora_count(
session, availability_zone=az_name)
session, availability_zone=az_name,
check_booting_amphora=self.check_booting_amphora)
LOG.debug("Current Spare Amphora count for AZ %s: %d",
az_name, curr_spare_cnt)
diff_count = conf_spare_cnt - curr_spare_cnt
@@ -151,7 +158,10 @@ class DatabaseCleanup(object):
class CertRotation(object):
def __init__(self):
self.threads = CONF.house_keeping.cert_rotate_threads
self.cw = cw.ControllerWorker()
if CONF.api_settings.default_provider_driver == constants.AMPHORAV2:
self.cw = cw2.ControllerWorker()
else:
self.cw = cw1.ControllerWorker()

def rotate(self):
"""Check the amphora db table for expiring auth certs."""


+ 4
- 0
octavia/controller/queue/v2/consumer.py View File

@@ -16,6 +16,7 @@ import cotyledon
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_utils import uuidutils

from octavia.common import constants
from octavia.common import rpc
@@ -46,6 +47,9 @@ class ConsumerService(cotyledon.Service):
access_policy=self.access_policy
)
self.message_listener.start()
for e in self.endpoints:
e.worker.services_controller.run_conductor(
'octavia-task-flow-conductor-%s' % uuidutils.generate_uuid())

def terminate(self):
if self.message_listener:


+ 204
- 272
octavia/controller/worker/v2/controller_worker.py View File

@@ -13,26 +13,18 @@
# under the License.
#


from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from sqlalchemy.orm import exc as db_exceptions
from taskflow.listeners import logging as tf_logging
from stevedore import driver as stevedore_driver
import tenacity

from octavia.amphorae.driver_exceptions import exceptions
from octavia.api.drivers import utils as provider_utils
from octavia.common import base_taskflow
from octavia.common import constants
from octavia.controller.worker.v2.flows import amphora_flows
from octavia.controller.worker.v2.flows import health_monitor_flows
from octavia.controller.worker.v2.flows import l7policy_flows
from octavia.controller.worker.v2.flows import l7rule_flows
from octavia.controller.worker.v2.flows import listener_flows
from octavia.controller.worker.v2.flows import load_balancer_flows
from octavia.controller.worker.v2.flows import member_flows
from octavia.controller.worker.v2.flows import pool_flows
from octavia.controller.worker.v2.flows import flow_utils
from octavia.controller.worker.v2 import taskflow_jobboard_driver as tsk_driver
from octavia.db import api as db_apis
from octavia.db import repositories as repo

@@ -45,35 +37,14 @@ RETRY_BACKOFF = 1
RETRY_MAX = 5


# We do not need to log retry exception information. Warning "Could not connect
# to instance" will be logged as usual.
def retryMaskFilter(record):
if record.exc_info is not None and isinstance(
record.exc_info[1], exceptions.AmpConnectionRetry):
return False
return True


LOG.logger.addFilter(retryMaskFilter)


def _is_provisioning_status_pending_update(lb_obj):
return not lb_obj.provisioning_status == constants.PENDING_UPDATE


class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
class ControllerWorker(object):

def __init__(self):

self._amphora_flows = amphora_flows.AmphoraFlows()
self._health_monitor_flows = health_monitor_flows.HealthMonitorFlows()
self._lb_flows = load_balancer_flows.LoadBalancerFlows()
self._listener_flows = listener_flows.ListenerFlows()
self._member_flows = member_flows.MemberFlows()
self._pool_flows = pool_flows.PoolFlows()
self._l7policy_flows = l7policy_flows.L7PolicyFlows()
self._l7rule_flows = l7rule_flows.L7RuleFlows()

self._amphora_repo = repo.AmphoraRepository()
self._amphora_health_repo = repo.AmphoraHealthRepository()
self._health_mon_repo = repo.HealthMonitorRepository()
@@ -86,7 +57,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
self._flavor_repo = repo.FlavorRepository()
self._az_repo = repo.AvailabilityZoneRepository()

super(ControllerWorker, self).__init__()
persistence = tsk_driver.MysqlPersistenceDriver()

self.jobboard_driver = stevedore_driver.DriverManager(
namespace='octavia.worker.jobboard_driver',
name=CONF.task_flow.jobboard_backend_driver,
invoke_args=(persistence,),
invoke_on_load=True).driver

@tenacity.retry(
retry=(
@@ -99,12 +76,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):

return repo.get(db_apis.get_session(), id=id)

@property
def services_controller(self):
return base_taskflow.TaskFlowServiceController(self.jobboard_driver)

def create_amphora(self, availability_zone=None):
"""Creates an Amphora.

This is used to create spare amphora.

:returns: amphora_id
:returns: uuid
"""
try:
store = {constants.BUILD_TYPE_PRIORITY:
@@ -115,13 +96,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store[constants.AVAILABILITY_ZONE] = (
self._az_repo.get_availability_zone_metadata_dict(
db_apis.get_session(), availability_zone))
create_amp_tf = self._taskflow_load(
self._amphora_flows.get_create_amphora_flow(),
store=store)
with tf_logging.DynamicLoggingListener(create_amp_tf, log=LOG):
create_amp_tf.run()
job_id = self.services_controller.run_poster(
flow_utils.get_create_amphora_flow,
store=store, wait=True)

return create_amp_tf.storage.fetch('amphora')
return job_id
except Exception as e:
LOG.error('Failed to create an amphora due to: {}'.format(str(e)))

@@ -134,13 +113,16 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
amphora = self._amphora_repo.get(db_apis.get_session(),
id=amphora_id)
delete_amp_tf = self._taskflow_load(
self._amphora_flows.get_delete_amphora_flow(),
store={constants.AMPHORA: amphora.to_dict()})
with tf_logging.DynamicLoggingListener(delete_amp_tf,
log=LOG):
delete_amp_tf.run()
store = {constants.AMPHORA: amphora.to_dict()}
self.services_controller.run_poster(
flow_utils.get_delete_amphora_flow,
store=store)

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_health_monitor(self, health_monitor):
"""Creates a health monitor.

@@ -162,16 +144,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))

create_hm_tf = self._taskflow_load(
self._health_monitor_flows.get_create_health_monitor_flow(),
store={constants.HEALTH_MON: health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
with tf_logging.DynamicLoggingListener(create_hm_tf,
log=LOG):
create_hm_tf.run()
store = {constants.HEALTH_MON: health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb}
self.services_controller.run_poster(
flow_utils.get_create_health_monitor_flow,
store=store)

def delete_health_monitor(self, health_monitor):
"""Deletes a health monitor.
@@ -193,17 +173,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
pool.listeners))

delete_hm_tf = self._taskflow_load(
self._health_monitor_flows.get_delete_health_monitor_flow(),
store={constants.HEALTH_MON: health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.PROJECT_ID: load_balancer.project_id})
with tf_logging.DynamicLoggingListener(delete_hm_tf,
log=LOG):
delete_hm_tf.run()
store = {constants.HEALTH_MON: health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.PROJECT_ID: load_balancer.project_id}
self.services_controller.run_poster(
flow_utils.get_delete_health_monitor_flow,
store=store)

def update_health_monitor(self, original_health_monitor,
health_monitor_updates):
@@ -236,18 +214,21 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()

update_hm_tf = self._taskflow_load(
self._health_monitor_flows.get_update_health_monitor_flow(),
store={constants.HEALTH_MON: original_health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.UPDATE_DICT: health_monitor_updates})
with tf_logging.DynamicLoggingListener(update_hm_tf,
log=LOG):
update_hm_tf.run()
store = {constants.HEALTH_MON: original_health_monitor,
constants.POOL_ID: pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb,
constants.UPDATE_DICT: health_monitor_updates}
self.services_controller.run_poster(
flow_utils.get_update_health_monitor_flow,
store=store)

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_listener(self, listener):
"""Creates a listener.

@@ -272,14 +253,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()

create_listener_tf = self._taskflow_load(
self._listener_flows.get_create_listener_flow(),
store={constants.LISTENERS: dict_listeners,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id})
with tf_logging.DynamicLoggingListener(create_listener_tf,
log=LOG):
create_listener_tf.run()
store = {constants.LISTENERS: dict_listeners,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id}

self.services_controller.run_poster(
flow_utils.get_create_listener_flow,
store=store)

def delete_listener(self, listener):
"""Deletes a listener.
@@ -292,15 +272,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
# the project ID
lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
delete_listener_tf = self._taskflow_load(
self._listener_flows.get_delete_listener_flow(),
store={constants.LISTENER: listener,
constants.LOADBALANCER_ID:
listener[constants.LOADBALANCER_ID],
constants.PROJECT_ID: lb.project_id})
with tf_logging.DynamicLoggingListener(delete_listener_tf,
log=LOG):
delete_listener_tf.run()
store = {constants.LISTENER: listener,
constants.LOADBALANCER_ID:
listener[constants.LOADBALANCER_ID],
constants.PROJECT_ID: lb.project_id}
self.services_controller.run_poster(
flow_utils.get_delete_listener_flow,
store=store)

def update_listener(self, listener, listener_updates):
"""Updates a listener.
@@ -312,14 +290,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
db_lb = self._lb_repo.get(db_apis.get_session(),
id=listener[constants.LOADBALANCER_ID])
update_listener_tf = self._taskflow_load(
self._listener_flows.get_update_listener_flow(),
store={constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
constants.LISTENERS: [listener]})
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
store = {constants.LISTENER: listener,
constants.UPDATE_DICT: listener_updates,
constants.LOADBALANCER_ID: db_lb.id,
constants.LISTENERS: [listener]}
self.services_controller.run_poster(
flow_utils.get_update_listener_flow,
store=store)

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
@@ -364,13 +341,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
store[constants.UPDATE_DICT] = {
constants.TOPOLOGY: topology
}

create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
topology=topology, listeners=listeners_dicts)

create_lb_tf = self._taskflow_load(create_lb_flow, store=store)
with tf_logging.DynamicLoggingListener(create_lb_tf, log=LOG):
create_lb_tf.run()
self.services_controller.run_poster(
flow_utils.get_create_load_balancer_flow,
topology, listeners=listeners_dicts,
store=store)

def delete_load_balancer(self, load_balancer, cascade=False):
"""Deletes a load balancer by de-allocating Amphorae.
@@ -381,25 +355,19 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
db_lb = self._lb_repo.get(db_apis.get_session(),
id=load_balancer[constants.LOADBALANCER_ID])
store = {}

store = {constants.LOADBALANCER: load_balancer,
constants.SERVER_GROUP_ID: db_lb.server_group_id,
constants.PROJECT_ID: db_lb.project_id}
if cascade:
flow = self._lb_flows.get_cascade_delete_load_balancer_flow(
load_balancer)
store.update(self._lb_flows.get_delete_pools_store(db_lb))
store.update(self._lb_flows.get_delete_listeners_store(db_lb))
store.update(flow_utils.get_delete_pools_store(db_lb))
store.update(flow_utils.get_delete_listeners_store(db_lb))
self.services_controller.run_poster(
flow_utils.get_cascade_delete_load_balancer_flow,
load_balancer, store=store)
else:
flow = self._lb_flows.get_delete_load_balancer_flow(
load_balancer)
store.update({constants.LOADBALANCER: load_balancer,
constants.SERVER_GROUP_ID: db_lb.server_group_id,
constants.PROJECT_ID: db_lb.project_id})

delete_lb_tf = self._taskflow_load(flow, store=store)

with tf_logging.DynamicLoggingListener(delete_lb_tf,
log=LOG):
delete_lb_tf.run()
self.services_controller.run_poster(
flow_utils.get_delete_load_balancer_flow,
load_balancer, store=store)

def update_load_balancer(self, original_load_balancer,
load_balancer_updates):
@@ -410,17 +378,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:returns: None
:raises LBNotFound: The referenced load balancer was not found
"""
store = {constants.LOADBALANCER: original_load_balancer,
constants.LOADBALANCER_ID:
original_load_balancer[constants.LOADBALANCER_ID],
constants.UPDATE_DICT: load_balancer_updates}

update_lb_tf = self._taskflow_load(
self._lb_flows.get_update_load_balancer_flow(),
store={constants.LOADBALANCER: original_load_balancer,
constants.LOADBALANCER_ID:
original_load_balancer[constants.LOADBALANCER_ID],
constants.UPDATE_DICT: load_balancer_updates})

with tf_logging.DynamicLoggingListener(update_lb_tf,
log=LOG):
update_lb_tf.run()
self.services_controller.run_poster(
flow_utils.get_update_load_balancer_flow,
store=store)

def create_member(self, member):
"""Creates a pool member.
@@ -452,12 +417,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}

create_member_tf = self._taskflow_load(
self._member_flows.get_create_member_flow(),
self.services_controller.run_poster(
flow_utils.get_create_member_flow,
store=store)
with tf_logging.DynamicLoggingListener(create_member_tf,
log=LOG):
create_member_tf.run()

def delete_member(self, member):
"""Deletes a pool member.
@@ -491,13 +453,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}

delete_member_tf = self._taskflow_load(
self._member_flows.get_delete_member_flow(),
store=store
)
with tf_logging.DynamicLoggingListener(delete_member_tf,
log=LOG):
delete_member_tf.run()
self.services_controller.run_poster(
flow_utils.get_delete_member_flow,
store=store)

def batch_update_members(self, old_members, new_members,
updated_members):
@@ -543,13 +501,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}

batch_update_members_tf = self._taskflow_load(
self._member_flows.get_batch_update_members_flow(
provider_old_members, new_members, updated_members),
self.services_controller.run_poster(
flow_utils.get_batch_update_members_flow,
provider_old_members, new_members, updated_members,
store=store)
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
log=LOG):
batch_update_members_tf.run()

def update_member(self, member, member_updates):
"""Updates a pool member.
@@ -584,13 +539,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
store[constants.AVAILABILITY_ZONE] = {}

update_member_tf = self._taskflow_load(
self._member_flows.get_update_member_flow(),
self.services_controller.run_poster(
flow_utils.get_update_member_flow,
store=store)
with tf_logging.DynamicLoggingListener(update_member_tf,
log=LOG):
update_member_tf.run()

@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_pool(self, pool):
"""Creates a node pool.

@@ -616,15 +573,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
db_pool.listeners))

create_pool_tf = self._taskflow_load(
self._pool_flows.get_create_pool_flow(),
store={constants.POOL_ID: pool[constants.POOL_ID],
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb})
with tf_logging.DynamicLoggingListener(create_pool_tf,
log=LOG):
create_pool_tf.run()
store = {constants.POOL_ID: pool[constants.POOL_ID],
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id,
constants.LOADBALANCER: provider_lb}
self.services_controller.run_poster(
flow_utils.get_create_pool_flow,
store=store)

def delete_pool(self, pool):
"""Deletes a node pool.
@@ -644,16 +599,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()

delete_pool_tf = self._taskflow_load(
self._pool_flows.get_delete_pool_flow(),
store={constants.POOL_ID: pool[constants.POOL_ID],
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.PROJECT_ID: db_pool.project_id})
with tf_logging.DynamicLoggingListener(delete_pool_tf,
log=LOG):
delete_pool_tf.run()
store = {constants.POOL_ID: pool[constants.POOL_ID],
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.PROJECT_ID: db_pool.project_id}
self.services_controller.run_poster(
flow_utils.get_delete_pool_flow,
store=store)

def update_pool(self, origin_pool, pool_updates):
"""Updates a node pool.
@@ -682,16 +635,14 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
db_pool.listeners))

update_pool_tf = self._taskflow_load(
self._pool_flows.get_update_pool_flow(),
store={constants.POOL_ID: db_pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: pool_updates})
with tf_logging.DynamicLoggingListener(update_pool_tf,
log=LOG):
update_pool_tf.run()
store = {constants.POOL_ID: db_pool.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER: provider_lb,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: pool_updates}
self.services_controller.run_poster(
flow_utils.get_update_pool_flow,
store=store)

def create_l7policy(self, l7policy):
"""Creates an L7 Policy.
@@ -707,15 +658,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[db_listener]))

create_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_create_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
})
with tf_logging.DynamicLoggingListener(create_l7policy_tf,
log=LOG):
create_l7policy_tf.run()
store = {constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
}
self.services_controller.run_poster(
flow_utils.get_create_l7policy_flow,
store=store)

def delete_l7policy(self, l7policy):
"""Deletes an L7 policy.
@@ -730,15 +679,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[db_listener]))

delete_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_delete_l7policy_flow(),
store={constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
})
with tf_logging.DynamicLoggingListener(delete_l7policy_tf,
log=LOG):
delete_l7policy_tf.run()
store = {constants.L7POLICY: l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id
}
self.services_controller.run_poster(
flow_utils.get_delete_l7policy_flow,
store=store)

def update_l7policy(self, original_l7policy, l7policy_updates):
"""Updates an L7 policy.
@@ -755,15 +702,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[db_listener]))

update_l7policy_tf = self._taskflow_load(
self._l7policy_flows.get_update_l7policy_flow(),
store={constants.L7POLICY: original_l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
constants.UPDATE_DICT: l7policy_updates})
with tf_logging.DynamicLoggingListener(update_l7policy_tf,
log=LOG):
update_l7policy_tf.run()
store = {constants.L7POLICY: original_l7policy,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: db_listener.load_balancer.id,
constants.UPDATE_DICT: l7policy_updates}
self.services_controller.run_poster(
flow_utils.get_update_l7policy_flow,
store=store)

def create_l7rule(self, l7rule):
"""Creates an L7 Rule.
@@ -783,17 +728,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
db_l7policy)

create_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_create_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id
})
with tf_logging.DynamicLoggingListener(create_l7rule_tf,
log=LOG):
create_l7rule_tf.run()
store = {constants.L7RULE: l7rule,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.L7POLICY_ID: db_l7policy.id,
constants.LISTENERS: listeners_dicts,
constants.LOADBALANCER_ID: load_balancer.id
}
self.services_controller.run_poster(
flow_utils.get_create_l7rule_flow,
store=store)

def delete_l7rule(self, l7rule):
"""Deletes an L7 rule.
@@ -811,17 +754,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
[db_l7policy.listener]))

delete_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_delete_l7rule_flow(),
store={constants.L7RULE: l7rule,
constants.L7POLICY: l7policy.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id
})
with tf_logging.DynamicLoggingListener(delete_l7rule_tf,
log=LOG):
delete_l7rule_tf.run()
store = {constants.L7RULE: l7rule,
constants.L7POLICY: l7policy.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id
}
self.services_controller.run_poster(
flow_utils.get_delete_l7rule_flow,
store=store)

def update_l7rule(self, original_l7rule, l7rule_updates):
"""Updates an L7 rule.
@@ -841,17 +782,15 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
l7policy_dict = provider_utils.db_l7policy_to_provider_l7policy(
db_l7policy)

update_l7rule_tf = self._taskflow_load(
self._l7rule_flows.get_update_l7rule_flow(),
store={constants.L7RULE: original_l7rule,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: l7rule_updates})
with tf_logging.DynamicLoggingListener(update_l7rule_tf,
log=LOG):
update_l7rule_tf.run()
store = {constants.L7RULE: original_l7rule,
constants.L7POLICY: l7policy_dict.to_dict(),
constants.LISTENERS: listeners_dicts,
constants.L7POLICY_ID: db_l7policy.id,
constants.LOADBALANCER_ID: load_balancer.id,
constants.UPDATE_DICT: l7rule_updates}
self.services_controller.run_poster(
flow_utils.get_update_l7rule_flow,
store=store)

def _perform_amphora_failover(self, amp, priority):
"""Internal method to perform failover operations for an amphora.
@@ -903,7 +842,7 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
lb).to_dict() if lb else lb
if CONF.nova.enable_anti_affinity and lb:
stored_params[constants.SERVER_GROUP_ID] = lb.server_group_id
if lb and lb.flavor_id:
if lb is not None and lb.flavor_id:
stored_params[constants.FLAVOR] = (
self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), lb.flavor_id))
@@ -916,13 +855,10 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
else:
stored_params[constants.AVAILABILITY_ZONE] = {}

failover_amphora_tf = self._taskflow_load(
self._amphora_flows.get_failover_flow(
role=amp.role, load_balancer=provider_lb),
store=stored_params)

with tf_logging.DynamicLoggingListener(failover_amphora_tf, log=LOG):
failover_amphora_tf.run()
self.services_controller.run_poster(
flow_utils.get_failover_flow,
role=amp.role, load_balancer=provider_lb,
store=stored_params, wait=True)

LOG.info("Successfully completed the failover for an amphora: %s",
{"id": amp.id,
@@ -1019,14 +955,12 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
id=amphora_id)
LOG.info("Start amphora cert rotation, amphora's id is: %s", amp.id)

certrotation_amphora_tf = self._taskflow_load(
self._amphora_flows.cert_rotate_amphora_flow(),
store={constants.AMPHORA: amp.to_dict(),
constants.AMPHORA_ID: amphora_id})
store = {constants.AMPHORA: amp.to_dict(),
constants.AMPHORA_ID: amphora_id}

with tf_logging.DynamicLoggingListener(certrotation_amphora_tf,
log=LOG):
certrotation_amphora_tf.run()
self.services_controller.run_poster(
flow_utils.cert_rotate_amphora_flow,
store=store)

def update_amphora_agent_config(self, amphora_id):
"""Update the amphora agent configuration.
@@ -1048,11 +982,9 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
flavor = self._flavor_repo.get_flavor_metadata_dict(
db_apis.get_session(), lb.flavor_id)

update_amphora_tf = self._taskflow_load(
self._amphora_flows.update_amphora_config_flow(),
store={constants.AMPHORA: amp.to_dict(),
constants.FLAVOR: flavor})
store = {constants.AMPHORA: amp.to_dict(),
constants.FLAVOR: flavor}

with tf_logging.DynamicLoggingListener(update_amphora_tf,
log=LOG):
update_amphora_tf.run()
self.services_controller.run_poster(
flow_utils.update_amphora_config_flow,
store=store)

+ 158
- 0
octavia/controller/worker/v2/flows/flow_utils.py View File

@@ -0,0 +1,158 @@

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from octavia.common import constants
from octavia.controller.worker.v2.flows import amphora_flows
from octavia.controller.worker.v2.flows import health_monitor_flows
from octavia.controller.worker.v2.flows import l7policy_flows
from octavia.controller.worker.v2.flows import l7rule_flows
from octavia.controller.worker.v2.flows import listener_flows
from octavia.controller.worker.v2.flows import load_balancer_flows
from octavia.controller.worker.v2.flows import member_flows
from octavia.controller.worker.v2.flows import pool_flows


LB_FLOWS = load_balancer_flows.LoadBalancerFlows()
AMP_FLOWS = amphora_flows.AmphoraFlows()
HM_FLOWS = health_monitor_flows.HealthMonitorFlows()
L7_POLICY_FLOWS = l7policy_flows.L7PolicyFlows()
L7_RULES_FLOWS = l7rule_flows.L7RuleFlows()
LISTENER_FLOWS = listener_flows.ListenerFlows()
M_FLOWS = member_flows.MemberFlows()
P_FLOWS = pool_flows.PoolFlows()


def get_create_load_balancer_flow(topology, listeners=None):
return LB_FLOWS.get_create_load_balancer_flow(topology,
listeners=listeners)


def get_delete_load_balancer_flow(lb):
return LB_FLOWS.get_delete_load_balancer_flow(lb)


def get_delete_listeners_store(lb):
return LB_FLOWS.get_delete_listeners_store(lb)


def get_delete_pools_store(lb):
return LB_FLOWS.get_delete_pools_store(lb)


def get_cascade_delete_load_balancer_flow(lb):
return LB_FLOWS.get_cascade_delete_load_balancer_flow(lb)


def get_update_load_balancer_flow():
return LB_FLOWS.get_update_load_balancer_flow()


def get_create_amphora_flow():
return AMP_FLOWS.get_create_amphora_flow()


def get_delete_amphora_flow():
return AMP_FLOWS.get_delete_amphora_flow()


def get_failover_flow(role=constants.ROLE_STANDALONE, load_balancer=None):
return AMP_FLOWS.get_failover_flow(role=role, load_balancer=load_balancer)


def cert_rotate_amphora_flow():
return AMP_FLOWS.cert_rotate_amphora_flow()


def update_amphora_config_flow():
return AMP_FLOWS.update_amphora_config_flow()


def get_create_health_monitor_flow():
return HM_FLOWS.get_create_health_monitor_flow()


def get_delete_health_monitor_flow():
return HM_FLOWS.get_delete_health_monitor_flow()


def get_update_health_monitor_flow():
return HM_FLOWS.get_update_health_monitor_flow()


def get_create_l7policy_flow():
return L7_POLICY_FLOWS.get_create_l7policy_flow()


def get_delete_l7policy_flow():
return L7_POLICY_FLOWS.get_delete_l7policy_flow()


def get_update_l7policy_flow():
return L7_POLICY_FLOWS.get_update_l7policy_flow()


def get_create_l7rule_flow():
return L7_RULES_FLOWS.get_create_l7rule_flow()


def get_delete_l7rule_flow():
return L7_RULES_FLOWS.get_delete_l7rule_flow()


def get_update_l7rule_flow():
return L7_RULES_FLOWS.get_update_l7rule_flow()


def get_create_listener_flow():
return LISTENER_FLOWS.get_create_listener_flow()


def get_create_all_listeners_flow():
return LISTENER_FLOWS.get_create_all_listeners_flow()


def get_delete_listener_flow():
return LISTENER_FLOWS.get_delete_listener_flow()


def get_update_listener_flow():
return LISTENER_FLOWS.get_update_listener_flow()


def get_create_member_flow():
return M_FLOWS.get_create_member_flow()


def get_delete_member_flow():
return M_FLOWS.get_delete_member_flow()


def get_update_member_flow():
return M_FLOWS.get_update_member_flow()


def get_batch_update_members_flow(old_members, new_members, updated_members):
return M_FLOWS.get_batch_update_members_flow(old_members, new_members,
updated_members)


def get_create_pool_flow():
return P_FLOWS.get_create_pool_flow()


def get_delete_pool_flow():
return P_FLOWS.get_delete_pool_flow()


def get_update_pool_flow():
return P_FLOWS.get_update_pool_flow()

+ 102
- 0
octavia/controller/worker/v2/taskflow_jobboard_driver.py View File

@@ -0,0 +1,102 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
import contextlib

from oslo_config import cfg
from oslo_log import log
from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends

LOG = log.getLogger(__name__)
CONF = cfg.CONF


class JobboardTaskFlowDriver(object, metaclass=abc.ABCMeta):

@abc.abstractmethod
def job_board(self, persistence):
"""Setting up jobboard backend based on configuration setting.

:param persistence: taskflow persistence backend instance
:return: taskflow jobboard backend instance
"""


class MysqlPersistenceDriver(object):

def __init__(self):
self.persistence_conf = {
'connection': CONF.task_flow.persistence_connection,
'max_pool_size': CONF.database.max_pool_size,
'max_overflow': CONF.database.max_overflow,
'pool_timeout': CONF.database.pool_timeout,
}

def initialize(self):
# Run migrations once on service start.
backend = persistence_backends.fetch(self.persistence_conf)
with contextlib.closing(backend):
with contextlib.closing(backend.get_connection()) as connection:
connection.upgrade()

@contextlib.contextmanager
def get_persistence(self):
# Rewrite taskflow get backend, so it won't run migrations on each call
backend = persistence_backends.fetch(self.persistence_conf)
with contextlib.closing(backend):
with contextlib.closing(backend.get_connection()) as conn:
conn.validate()
yield backend


class ZookeeperTaskFlowDriver(JobboardTaskFlowDriver):

def __init__(self, persistence_driver):
self.persistence_driver = persistence_driver

def job_board(self, persistence):
job_backends_hosts = ','.join(
['%s:%s' % (host, CONF.task_flow.jobboard_backend_port)
for host in CONF.task_flow.jobboard_backend_hosts])
jobboard_backend_conf = {
'board': 'zookeeper',
'hosts': job_backends_hosts,
'path': '/' + CONF.task_flow.jobboard_backend_namespace,
}
jobboard_backend_conf.update(
CONF.task_flow.jobboard_zookeeper_ssl_options)
return job_backends.backend(CONF.task_flow.jobboard_name,
jobboard_backend_conf,
persistence=persistence)


class RedisTaskFlowDriver(JobboardTaskFlowDriver):

def __init__(self, persistence_driver):
self.persistence_driver = persistence_driver

def job_board(self, persistence):
jobboard_backend_conf = {
'board': 'redis',
'host': CONF.task_flow.jobboard_backend_hosts[0],
'port': CONF.task_flow.jobboard_backend_port,
'password': CONF.task_flow.jobboard_backend_password,
'namespace': CONF.task_flow.jobboard_backend_namespace,
}
jobboard_backend_conf.update(
CONF.task_flow.jobboard_redis_backend_ssl_options)
return job_backends.backend(
CONF.task_flow.jobboard_backend_namespace,
jobboard_backend_conf,
persistence=persistence)

+ 18
- 0
octavia/db/migration/cli.py View File

@@ -22,10 +22,15 @@ from oslo_config import cfg
from oslo_db import options
from oslo_log import log

from octavia.controller.worker.v2 import taskflow_jobboard_driver
from octavia.i18n import _

CONF = cfg.CONF
options.set_defaults(CONF)
# Setting explicitly here needed for taskflow persistence successful
# initialization
options.set_defaults(CONF, max_pool_size=10, max_overflow=20,
pool_timeout=10)
log.set_defaults()
log.register_options(CONF)
log.setup(CONF, 'octavia-db-manage')
@@ -85,6 +90,14 @@ def do_revision(config, cmd):
sql=CONF.command.sql)


def do_persistence_upgrade(config, cmd):
opt = cfg.StrOpt('persistence_connection',
default='sqlite://')
cfg.CONF.register_opts([opt], group='task_flow')
persistence = taskflow_jobboard_driver.MysqlPersistenceDriver()
persistence.initialize()


def add_command_parsers(subparsers):
for name in ['current', 'history', 'branches']:
parser = add_alembic_subparser(subparsers, name)
@@ -101,6 +114,11 @@ def add_command_parsers(subparsers):
parser.add_argument('revision', nargs='?')
parser.set_defaults(func=do_upgrade)

parser = subparsers.add_parser(
"upgrade_persistence",
help="Run migrations for persistence backend")
parser.set_defaults(func=do_persistence_upgrade)

parser = subparsers.add_parser('downgrade', help="(No longer supported)")
parser.add_argument('None', nargs='?', help="Downgrade not supported")
parser.set_defaults(func=no_downgrade)


+ 13
- 3
octavia/db/repositories.py View File

@@ -1249,21 +1249,31 @@ class AmphoraRepository(BaseRepository):
return db_lb.to_data_model()
return None

def get_spare_amphora_count(self, session, availability_zone=None):
def get_spare_amphora_count(self, session, availability_zone=None,
check_booting_amphora=False):
"""Get the count of the spare amphora.

:returns: Number of current spare amphora.
"""
filters = {
'status': consts.AMPHORA_READY,
'load_balancer_id': None
}
# For jobboard based controller amphora in booting/pending create state
# can reach READY state after restart of housekeeping/worker service,
# so include amphora in these state to query
if check_booting_amphora:
status = [consts.AMPHORA_READY,
consts.AMPHORA_BOOTING,
consts.PENDING_CREATE]
else:
status = [consts.AMPHORA_READY]

if availability_zone is not None:
filters['cached_zone'] = availability_zone

with session.begin(subtransactions=True):
count = session.query(self.model_class).filter_by(
**filters).count()
**filters).filter(self.model_class.status.in_(status)).count()

return count



+ 15
- 0
octavia/tests/functional/db/test_repositories.py View File

@@ -3260,6 +3260,21 @@ class AmphoraRepositoryTest(BaseRepositoryTest):
count = self.amphora_repo.get_spare_amphora_count(self.session)
self.assertEqual(2, count)

def test_get_spare_amphora_count_check_booting_amphora_true(self):
count = self.amphora_repo.get_spare_amphora_count(
self.session, check_booting_amphora=True)
self.assertEqual(0, count)

amphora1 = self.create_amphora(self.FAKE_UUID_1)
self.amphora_repo.update(self.session, amphora1.id,
status=constants.AMPHORA_READY,)
amphora2 = self.create_amphora(self.FAKE_UUID_2)
self.amphora_repo.update(self.session, amphora2.id,
status=constants.AMPHORA_BOOTING)
count = self.amphora_repo.get_spare_amphora_count(
self.session, check_booting_amphora=True)
self.assertEqual(2, count)

def test_get_none_cert_expired_amphora(self):
# test with no expired amphora
amp = self.amphora_repo.get_cert_expiring_amphora(self.session)


+ 83
- 0
octavia/tests/unit/common/test_base_taskflow.py View File

@@ -66,3 +66,86 @@ class TestBaseTaskFlowEngine(base.TestCase):

_engine_mock.compile.assert_called_once_with()
_engine_mock.prepare.assert_called_once_with()


class TestTaskFlowServiceController(base.TestCase):

_mock_uuid = '9a2ebc48-cd3e-429e-aa04-e32f5fc5442a'

def setUp(self):
self.conf = oslo_fixture.Config(cfg.CONF)
self.conf.config(group="task_flow", engine='parallel')
self.driver_mock = mock.MagicMock()
self.persistence_mock = mock.MagicMock()
self.jobboard_mock = mock.MagicMock()
self.driver_mock.job_board.return_value = self.jobboard_mock
self.driver_mock.persistence_driver.get_persistence.return_value = (
self.persistence_mock)
self.service_controller = base_taskflow.TaskFlowServiceController(
self.driver_mock)
super(TestTaskFlowServiceController, self).setUp()

@mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid)
@mock.patch('taskflow.engines.save_factory_details')
def test_run_poster(self, mock_engines, mockuuid):
flow_factory = mock.MagicMock()
flow_factory.__name__ = 'testname'
job_name = 'testname-%s' % self._mock_uuid
job_details = {'store': 'test'}
with mock.patch.object(self.service_controller, '_wait_for_job'
) as wait:
uuid = self.service_controller.run_poster(flow_factory,
**job_details)
save_logbook = self.persistence_mock.__enter__().get_connection(
).save_logbook
save_logbook.assert_called()
self.assertEqual(job_name, save_logbook.call_args[0][0].name)

mock_engines.assert_called()
save_args = mock_engines.call_args
self.assertEqual(job_name, save_args[0][0].name)
self.assertEqual(self._mock_uuid, save_args[0][0].uuid)
self.assertEqual(flow_factory, save_args[0][1])
self.assertEqual(self.persistence_mock.__enter__(),
save_args[1]['backend'])

self.jobboard_mock.__enter__().post.assert_called()
post_args = self.jobboard_mock.__enter__().post.call_args
self.assertEqual(job_name, post_args[0][0])
self.assertEqual(job_details, post_args[1]['details'])
wait.assert_not_called()
self.assertEqual(self._mock_uuid, uuid)

@mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid)
@mock.patch('taskflow.engines.save_factory_details')
def test_run_poster_wait(self, mock_engines, mockuuid):
flow_factory = mock.MagicMock()
flow_factory.__name__ = 'testname'
job_details = {'store': 'test'}
with mock.patch.object(self.service_controller, '_wait_for_job'
) as wait:
uuid = self.service_controller.run_poster(flow_factory, wait=True,
**job_details)
self.persistence_mock.__enter__().get_connection(
).save_logbook.assert_called()
mock_engines.assert_called()
self.jobboard_mock.__enter__().post.assert_called()
wait.assert_called_once_with(self.jobboard_mock.__enter__())
self.assertEqual(self._mock_uuid, uuid)

@mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor')
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')
def test_run_conductor(self, dynamiccond, rediscond):
self.service_controller.run_conductor("test")
rediscond.assert_called_once_with(
"test", self.jobboard_mock.__enter__(),
persistence=self.persistence_mock.__enter__(),
engine='parallel')
self.conf.config(group="task_flow",
jobboard_backend_driver='zookeeper_taskflow_driver')

self.service_controller.run_conductor("test2")
dynamiccond.assert_called_once_with(
"test2", self.jobboard_mock.__enter__(),
persistence=self.persistence_mock.__enter__(),
engine='parallel')

+ 22
- 6
octavia/tests/unit/controller/healthmanager/test_health_manager.py View File

@@ -45,11 +45,14 @@ class TestHealthManager(base.TestCase):
@mock.patch('octavia.db.api.wait_for_connection')
@mock.patch('octavia.controller.worker.v1.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.controller.worker.v2.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora')
@mock.patch('octavia.db.api.get_session')
def test_health_check_stale_amphora(self, session_mock, get_stale_amp_mock,
failover_mock, db_wait_mock):
failover_mockv2, failover_mock,
db_wait_mock):
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="health_manager", heartbeat_timeout=5)
amphora_health = mock.MagicMock()
@@ -86,11 +89,14 @@ class TestHealthManager(base.TestCase):

@mock.patch('octavia.controller.worker.v1.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.controller.worker.v2.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('octavia.db.api.get_session')
def test_health_check_nonstale_amphora(self, session_mock,
get_stale_amp_mock, failover_mock):
get_stale_amp_mock, failover_mockv2,
failover_mock):
get_stale_amp_mock.side_effect = [None, TestException('test')]

exit_event = threading.Event()
@@ -98,15 +104,20 @@ class TestHealthManager(base.TestCase):

hm.health_check()
session_mock.assert_called_once_with(autocommit=False)
self.assertFalse(failover_mock.called)
if CONF.api_settings.default_provider_driver == 'amphorav2':
self.assertFalse(failover_mockv2.called)
else:
self.assertFalse(failover_mock.called)

@mock.patch('octavia.controller.worker.v1.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.controller.worker.v2.controller_worker.'
'ControllerWorker.failover_amphora')
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.'
'get_stale_amphora', return_value=None)
@mock.patch('octavia.db.api.get_session')
def test_health_check_exit(self, session_mock, get_stale_amp_mock,
failover_mock):