From 3429e3824c060071e59a117c19c95659c78e4c8b Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur Date: Thu, 7 Jan 2016 12:12:59 +0100 Subject: [PATCH] Switch to Futurist library for asynchronous execution and periodic tasks This change switches the conductor to using Futurist library executor and periodic tasks worker instead of oslo.service periodic tasks. This allows running periodic tasks in parallel and relying on more standard interfaces (futures, executors) when dealing with asynchronous execution. A green thread executor is used instead of using an eventlet green pool directly. The maximum number of workers is taken from the existing workers_pool_size configuration option, and no tasks are allowed to be enqueued to mimic the previous behaviour (might be lifted later). The periodic tasks worker is using the same executor, and its main loop thread is also running on it. For this reason minimum value for workers_pool_size is now 3: periodic task main loop, keep alive thread and at least one thread for other tasks. A special decorator for driver-periodic tasks is now deprecated, as the generic decorator can be used there as well. Closes-Bug: #1526277 Change-Id: I57bf7cebfb6db805b6c521bacfef2993b16ce1ee --- doc/source/dev/architecture.rst | 25 +--- etc/ironic/ironic.conf.sample | 7 +- ironic/common/service.py | 9 +- ironic/conductor/base_manager.py | 120 +++++++++++++---- ironic/conductor/manager.py | 23 ++-- ironic/conductor/task_manager.py | 14 +- ironic/drivers/base.py | 61 ++++----- ironic/drivers/modules/inspector.py | 5 +- ironic/tests/unit/conductor/mgr_utils.py | 9 +- .../tests/unit/conductor/test_base_manager.py | 62 ++++++--- ironic/tests/unit/conductor/test_manager.py | 125 +++++++----------- .../tests/unit/conductor/test_task_manager.py | 93 ++----------- ironic/tests/unit/drivers/test_base.py | 21 +-- .../notes/futurist-e9c55699f479f97a.yaml | 16 +++ requirements.txt | 1 + 15 files changed, 283 insertions(+), 308 deletions(-) create mode 100644 releasenotes/notes/futurist-e9c55699f479f97a.yaml diff --git a/doc/source/dev/architecture.rst b/doc/source/dev/architecture.rst index e055c2479a..26dda4dfc8 100644 --- a/doc/source/dev/architecture.rst +++ b/doc/source/dev/architecture.rst @@ -77,12 +77,14 @@ Driver-Specific Periodic Tasks Drivers may run their own periodic tasks, i.e. actions run repeatedly after a certain amount of time. Such task is created by decorating a method on the -driver itself or on any interface with driver_periodic_task_ decorator, e.g. +driver itself or on any interface with periodic_ decorator, e.g. :: + from futurist import periodics + class FakePower(base.PowerInterface): - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, manager, context): pass # do something @@ -90,7 +92,7 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g. def __init__(self): self.power = FakePower() - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task2(self, manager, context): pass # do something @@ -98,21 +100,6 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g. Here the ``spacing`` argument is a period in seconds for a given periodic task. For example 'spacing=5' means every 5 seconds. -.. note:: - The ``parallel`` argument may be passed to driver_periodic_task_. - If it's set to False, this task will be run in the periodic task loop, - rather than a separate greenthread. - - This is deprecated as of Liberty release, and the parallel argument will be - ignored starting in the Mitaka cycle, as such task would prevent all other - periodic tasks from starting while it is running. - -.. note:: - By default periodic task names are derived from method names, - so they should be unique within a Python module. - Use ``name`` argument to driver_periodic_task_ to override - automatically generated name. - Message Routing =============== @@ -137,4 +124,4 @@ driver actions such as take-over or clean-up. .. _DB API: ../api/ironic.db.api.html .. _diskimage-builder: https://github.com/openstack/diskimage-builder .. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html -.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task +.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample index 60f180b66e..30cbb6551a 100644 --- a/etc/ironic/ironic.conf.sample +++ b/etc/ironic/ironic.conf.sample @@ -117,7 +117,8 @@ # Options defined in ironic.common.service # -# Seconds between running periodic tasks. (integer value) +# Default interval for running driver periodic tasks. (integer +# value) #periodic_interval=60 # Name of this node. This can be an opaque identifier. It is @@ -596,7 +597,9 @@ # Options defined in ironic.conductor.base_manager # -# The size of the workers greenthread pool. (integer value) +# The size of the workers greenthread pool. Note that 2 +# threads will be reserved by the conductor itself for +# handling heart beats and periodic tasks. (integer value) #workers_pool_size=100 # Seconds between conductor heart beats. (integer value) diff --git a/ironic/common/service.py b/ironic/common/service.py index 01333c9642..71c8e30ab5 100644 --- a/ironic/common/service.py +++ b/ironic/common/service.py @@ -40,7 +40,8 @@ from ironic.objects import base as objects_base service_opts = [ cfg.IntOpt('periodic_interval', default=60, - help=_('Seconds between running periodic tasks.')), + help=_('Default interval for running driver periodic tasks.'), + deprecated_for_removal=True), cfg.StrOpt('host', default=socket.getfqdn(), help=_('Name of this node. This can be an opaque identifier. ' @@ -79,11 +80,7 @@ class RPCService(service.Service): self.rpcserver.start() self.handle_signal() - self.manager.init_host() - self.tg.add_dynamic_timer( - self.manager.periodic_tasks, - periodic_interval_max=CONF.periodic_interval, - context=admin_context) + self.manager.init_host(admin_context) LOG.info(_LI('Created RPC server for service %(service)s on host ' '%(host)s.'), diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index 48abbbe4ba..e4ace43cca 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -15,13 +15,13 @@ import inspect import threading -from eventlet import greenpool -from oslo_concurrency import lockutils +import futurist +from futurist import periodics +from futurist import rejection from oslo_config import cfg from oslo_context import context as ironic_context from oslo_db import exception as db_exception from oslo_log import log -from oslo_service import periodic_task from oslo_utils import excutils from ironic.common import driver_factory @@ -40,8 +40,10 @@ from ironic.db import api as dbapi conductor_opts = [ cfg.IntOpt('workers_pool_size', - default=100, - help=_('The size of the workers greenthread pool.')), + default=100, min=3, + help=_('The size of the workers greenthread pool. ' + 'Note that 2 threads will be reserved by the conductor ' + 'itself for handling heart beats and periodic tasks.')), cfg.IntOpt('heartbeat_interval', default=10, help=_('Seconds between conductor heart beats.')), @@ -51,18 +53,18 @@ conductor_opts = [ CONF = cfg.CONF CONF.register_opts(conductor_opts, 'conductor') LOG = log.getLogger(__name__) -WORKER_SPAWN_lOCK = "conductor_worker_spawn" -class BaseConductorManager(periodic_task.PeriodicTasks): +class BaseConductorManager(object): def __init__(self, host, topic): - super(BaseConductorManager, self).__init__(CONF) + super(BaseConductorManager, self).__init__() if not host: host = CONF.host self.host = host self.topic = topic self.notifier = rpc.get_notifier() + self._started = False def _get_driver(self, driver_name): """Get the driver. @@ -78,15 +80,29 @@ class BaseConductorManager(periodic_task.PeriodicTasks): except KeyError: raise exception.DriverNotFound(driver_name=driver_name) - def init_host(self): + def init_host(self, admin_context=None): + """Initialize the conductor host. + + :param admin_context: the admin context to pass to periodic tasks. + :raises: RuntimeError when conductor is already running + :raises: NoDriversLoaded when no drivers are enabled on the conductor + """ + if self._started: + raise RuntimeError(_('Attempt to start an already running ' + 'conductor manager')) + self.dbapi = dbapi.get_instance() self._keepalive_evt = threading.Event() """Event for the keepalive thread.""" - self._worker_pool = greenpool.GreenPool( - size=CONF.conductor.workers_pool_size) - """GreenPool of background workers for performing tasks async.""" + # TODO(dtantsur): make the threshold configurable? + rejection_func = rejection.reject_when_reached( + CONF.conductor.workers_pool_size) + self._executor = futurist.GreenThreadPoolExecutor( + max_workers=CONF.conductor.workers_pool_size, + check_and_reject=rejection_func) + """Executor for performing tasks async.""" self.ring_manager = hash.HashRingManager() """Consistent hash ring which maps drivers to conductors.""" @@ -106,15 +122,36 @@ class BaseConductorManager(periodic_task.PeriodicTasks): LOG.error(msg, self.host) raise exception.NoDriversLoaded(conductor=self.host) - # Collect driver-specific periodic tasks + # Collect driver-specific periodic tasks. + # Conductor periodic tasks accept context argument, driver periodic + # tasks accept this manager and context. We have to ensure that the + # same driver interface class is not traversed twice, otherwise + # we'll have several instances of the same task. + LOG.debug('Collecting periodic tasks') + self._periodic_task_callables = [] + periodic_task_classes = set() + self._collect_periodic_tasks(self, (admin_context,)) for driver_obj in driver_factory.drivers().values(): - self._collect_periodic_tasks(driver_obj) + self._collect_periodic_tasks(driver_obj, (self, admin_context)) for iface_name in (driver_obj.core_interfaces + driver_obj.standard_interfaces + ['vendor']): iface = getattr(driver_obj, iface_name, None) - if iface: - self._collect_periodic_tasks(iface) + if iface and iface.__class__ not in periodic_task_classes: + self._collect_periodic_tasks(iface, (self, admin_context)) + periodic_task_classes.add(iface.__class__) + + if (len(self._periodic_task_callables) > + CONF.conductor.workers_pool_size): + LOG.warning(_LW('This conductor has %(tasks)d periodic tasks ' + 'enabled, but only %(workers)d task workers ' + 'allowed by [conductor]workers_pool_size option'), + {'tasks': len(self._periodic_task_callables), + 'workers': CONF.conductor.workers_pool_size}) + + self._periodic_tasks = periodics.PeriodicWorker( + self._periodic_task_callables, + executor_factory=periodics.ExistingExecutor(self._executor)) # clear all locks held by this conductor before registering self.dbapi.clear_node_reservations_for_conductor(self.host) @@ -134,6 +171,12 @@ class BaseConductorManager(periodic_task.PeriodicTasks): update_existing=True) self.conductor = cdr + # Start periodic tasks + self._periodic_tasks_worker = self._executor.submit( + self._periodic_tasks.start, allow_empty=True) + self._periodic_tasks_worker.add_done_callback( + self._on_periodic_tasks_stop) + # NOTE(lucasagomes): If the conductor server dies abruptly # mid deployment (OMM Killer, power outage, etc...) we # can not resume the deployment even if the conductor @@ -161,10 +204,7 @@ class BaseConductorManager(periodic_task.PeriodicTasks): LOG.critical(_LC('Failed to start keepalive')) self.del_host() - def _collect_periodic_tasks(self, obj): - for n, method in inspect.getmembers(obj, inspect.ismethod): - if getattr(method, '_periodic_enabled', False): - self.add_periodic_task(method) + self._started = True def del_host(self, deregister=True): # Conductor deregistration fails if called on non-initialized @@ -190,11 +230,34 @@ class BaseConductorManager(periodic_task.PeriodicTasks): # Waiting here to give workers the chance to finish. This has the # benefit of releasing locks workers placed on nodes, as well as # having work complete normally. - self._worker_pool.waitall() + self._periodic_tasks.stop() + self._periodic_tasks.wait() + self._executor.shutdown(wait=True) + self._started = False - def periodic_tasks(self, context, raise_on_error=False): - """Periodic tasks are run at pre-specified interval.""" - return self.run_periodic_tasks(context, raise_on_error=raise_on_error) + def _collect_periodic_tasks(self, obj, args): + """Collect periodic tasks from a given object. + + Populates self._periodic_task_callables with tuples + (callable, args, kwargs). + + :param obj: object containing periodic tasks as methods + :param args: tuple with arguments to pass to every task + """ + for name, member in inspect.getmembers(obj): + if periodics.is_periodic(member): + LOG.debug('Found periodic task %(owner)s.%(member)s', + {'owner': obj.__class__.__name__, + 'member': name}) + self._periodic_task_callables.append((member, args, {})) + + def _on_periodic_tasks_stop(self, fut): + try: + fut.result() + except Exception as exc: + LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc) + else: + LOG.info(_LI('Successfully shut down periodic tasks')) def iter_nodes(self, fields=None, **kwargs): """Iterate over nodes mapped to this conductor. @@ -217,7 +280,6 @@ class BaseConductorManager(periodic_task.PeriodicTasks): if self._mapped_to_this_conductor(*result[:2]): yield result - @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-') def _spawn_worker(self, func, *args, **kwargs): """Create a greenthread to run func(*args, **kwargs). @@ -225,13 +287,13 @@ class BaseConductorManager(periodic_task.PeriodicTasks): Spawns a greenthread if there are free slots in pool, otherwise raises exception. Execution control returns immediately to the caller. - :returns: GreenThread object. + :returns: Future object. :raises: NoFreeConductorWorker if worker pool is currently full. """ - if self._worker_pool.free(): - return self._worker_pool.spawn(func, *args, **kwargs) - else: + try: + return self._executor.submit(func, *args, **kwargs) + except futurist.RejectedSubmission: raise exception.NoFreeConductorWorker() def _conductor_service_record_keepalive(self): diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index db0f337c1d..9a91d85fe7 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -46,10 +46,10 @@ import datetime import tempfile import eventlet +from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging as messaging -from oslo_service import periodic_task from oslo_utils import excutils from oslo_utils import uuidutils @@ -1200,8 +1200,7 @@ class ConductorManager(base_manager.BaseConductorManager): action=action, node=node.uuid, state=node.provision_state) - @periodic_task.periodic_task( - spacing=CONF.conductor.sync_power_state_interval) + @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval) def _sync_power_states(self, context): """Periodic task to sync power states for the nodes. @@ -1269,8 +1268,7 @@ class ConductorManager(base_manager.BaseConductorManager): # Yield on every iteration eventlet.sleep(0) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_deploy_timeouts(self, context): """Periodically checks whether a deploy RPC call has timed out. @@ -1292,8 +1290,7 @@ class ConductorManager(base_manager.BaseConductorManager): self._fail_if_in_state(context, filters, states.DEPLOYWAIT, sort_key, callback_method, err_handler) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_deploying_status(self, context): """Periodically checks the status of nodes in DEPLOYING state. @@ -1376,8 +1373,7 @@ class ConductorManager(base_manager.BaseConductorManager): task.node.conductor_affinity = self.conductor.id task.node.save() - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_cleanwait_timeouts(self, context): """Periodically checks for nodes being cleaned. @@ -1402,8 +1398,7 @@ class ConductorManager(base_manager.BaseConductorManager): last_error=last_error, keep_target_state=True) - @periodic_task.periodic_task( - spacing=CONF.conductor.sync_local_state_interval) + @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval) def _sync_local_state(self, context): """Perform any actions necessary to sync local state. @@ -1826,8 +1821,7 @@ class ConductorManager(base_manager.BaseConductorManager): driver = self._get_driver(driver_name) return driver.get_properties() - @periodic_task.periodic_task( - spacing=CONF.conductor.send_sensor_data_interval) + @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval) def _send_sensor_data(self, context): """Periodically sends sensor data to Ceilometer.""" # do nothing if send_sensor_data option is False @@ -2061,8 +2055,7 @@ class ConductorManager(base_manager.BaseConductorManager): action='inspect', node=task.node.uuid, state=task.node.provision_state) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_inspect_timeouts(self, context): """Periodically checks inspect_timeout and fails upon reaching it. diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py index aac3ca12ca..5a0c3da176 100644 --- a/ironic/conductor/task_manager.py +++ b/ironic/conductor/task_manager.py @@ -383,15 +383,15 @@ class TaskManager(object): # for some reason, this is true. # All of the above are asserted in tests such that we'll # catch if eventlet ever changes this behavior. - thread = None + fut = None try: - thread = self._spawn_method(*self._spawn_args, - **self._spawn_kwargs) + fut = self._spawn_method(*self._spawn_args, + **self._spawn_kwargs) # NOTE(comstud): Trying to use a lambda here causes # the callback to not occur for some reason. This # also makes it easier to test. - thread.link(self._thread_release_resources) + fut.add_done_callback(self._thread_release_resources) # Don't unlock! The unlock will occur when the # thread finshes. return @@ -408,9 +408,9 @@ class TaskManager(object): {'method': self._on_error_method.__name__, 'node': self.node.uuid}) - if thread is not None: - # This means the link() failed for some + if fut is not None: + # This means the add_done_callback() failed for some # reason. Nuke the thread. - thread.cancel() + fut.cancel() self.release_resources() self.release_resources() diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py index f71d3931a3..7b4bd60bff 100644 --- a/ironic/drivers/base.py +++ b/ironic/drivers/base.py @@ -24,9 +24,9 @@ import inspect import json import os -import eventlet +from futurist import periodics +from oslo_config import cfg from oslo_log import log as logging -from oslo_service import periodic_task from oslo_utils import excutils import six @@ -40,6 +40,10 @@ RAID_CONFIG_SCHEMA = os.path.join(os.path.dirname(__file__), 'raid_config_schema.json') +CONF = cfg.CONF +CONF.import_opt('periodic_interval', 'ironic.common.service') + + @six.add_metaclass(abc.ABCMeta) class BaseDriver(object): """Base class for all drivers. @@ -1116,45 +1120,36 @@ def clean_step(priority, abortable=False, argsinfo=None): return decorator -def driver_periodic_task(parallel=True, **other): +def driver_periodic_task(**kwargs): """Decorator for a driver-specific periodic task. + Deprecated, please use futurist directly. Example:: + from futurist import periodics + class MyDriver(base.BaseDriver): - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, manager, context): # do some job - :param parallel: If True (default), this task is run in a separate thread. - If False, this task will be run in the conductor's periodic task - loop, rather than a separate greenthread. This parameter is - deprecated and will be ignored starting with Mitaka cycle. - :param other: arguments to pass to @periodic_task.periodic_task + :param kwargs: arguments to pass to @periodics.periodic """ - # TODO(dtantsur): drop all this magic once - # https://review.openstack.org/#/c/134303/ lands - semaphore = eventlet.semaphore.BoundedSemaphore() + LOG.warning(_LW('driver_periodic_task decorator is deprecated, please ' + 'use futurist.periodics.periodic directly')) + # Previously we accepted more arguments, make a backward compatibility + # layer for out-of-tree drivers. + new_kwargs = {} + for arg in ('spacing', 'enabled', 'run_immediately'): + try: + new_kwargs[arg] = kwargs.pop(arg) + except KeyError: + pass + new_kwargs.setdefault('spacing', CONF.periodic_interval) - def decorator2(func): - @six.wraps(func) - def wrapper(*args, **kwargs): - if parallel: - def _internal(): - with semaphore: - func(*args, **kwargs) + if kwargs: + LOG.warning(_LW('The following arguments are not supported by ' + 'futurist.periodics.periodic and are ignored: %s'), + ', '.join(kwargs)) - eventlet.greenthread.spawn_n(_internal) - else: - LOG.warning(_LW( - 'Using periodic tasks with parallel=False is deprecated, ' - '"parallel" argument will be ignored starting with ' - 'the Mitaka release')) - func(*args, **kwargs) - - # NOTE(dtantsur): name should be unique - other.setdefault('name', '%s.%s' % (func.__module__, func.__name__)) - decorator = periodic_task.periodic_task(**other) - return decorator(wrapper) - - return decorator2 + return periodics.periodic(**new_kwargs) diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py index 44dd525d38..e89083aad1 100644 --- a/ironic/drivers/modules/inspector.py +++ b/ironic/drivers/modules/inspector.py @@ -16,6 +16,7 @@ Modules required to work with ironic_inspector: """ import eventlet +from futurist import periodics from oslo_config import cfg from oslo_log import log as logging from oslo_utils import importutils @@ -121,8 +122,8 @@ class Inspector(base.InspectInterface): eventlet.spawn_n(_start_inspection, task.node.uuid, task.context) return states.INSPECTING - @base.driver_periodic_task(spacing=CONF.inspector.status_check_period, - enabled=CONF.inspector.enabled) + @periodics.periodic(spacing=CONF.inspector.status_check_period, + enabled=CONF.inspector.enabled) def _periodic_check_result(self, manager, context): """Periodic task checking results of inspection.""" filters = {'provision_state': states.INSPECTING} diff --git a/ironic/tests/unit/conductor/mgr_utils.py b/ironic/tests/unit/conductor/mgr_utils.py index 4403efa59a..6564d32172 100644 --- a/ironic/tests/unit/conductor/mgr_utils.py +++ b/ironic/tests/unit/conductor/mgr_utils.py @@ -17,6 +17,7 @@ """Test utils for Ironic Managers.""" +from futurist import periodics import mock from oslo_utils import strutils from oslo_utils import uuidutils @@ -175,8 +176,12 @@ class ServiceSetUpMixin(object): return self.service.del_host() - def _start_service(self): - self.service.init_host() + def _start_service(self, start_periodic_tasks=False): + if start_periodic_tasks: + self.service.init_host() + else: + with mock.patch.object(periodics, 'PeriodicWorker', autospec=True): + self.service.init_host() self.addCleanup(self._stop_service) diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py index 5ad86a5056..ce45434533 100644 --- a/ironic/tests/unit/conductor/test_base_manager.py +++ b/ironic/tests/unit/conductor/test_base_manager.py @@ -13,6 +13,8 @@ """Test class for Ironic BaseConductorManager.""" import eventlet +import futurist +from futurist import periodics import mock from oslo_config import cfg from oslo_db import exception as db_exception @@ -23,6 +25,7 @@ from ironic.conductor import base_manager from ironic.conductor import manager from ironic.drivers import base as drivers_base from ironic import objects +from ironic.tests import base as tests_base from ironic.tests.unit.conductor import mgr_utils from ironic.tests.unit.db import base as tests_db_base from ironic.tests.unit.objects import utils as obj_utils @@ -86,6 +89,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): res = objects.Conductor.get_by_hostname(self.context, self.hostname) self.assertEqual(init_names, res['drivers']) + self._stop_service() # verify that restart registers new driver names self.config(enabled_drivers=restart_names) @@ -98,12 +102,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): @mock.patch.object(driver_factory.DriverFactory, '__getitem__') def test_start_registers_driver_specific_tasks(self, get_mock): init_names = ['fake1'] - expected_name = 'ironic.tests.unit.conductor.test_base_manager.task' - expected_name2 = 'ironic.tests.unit.conductor.test_base_manager.iface' self.config(enabled_drivers=init_names) class TestInterface(object): - @drivers_base.driver_periodic_task(spacing=100500) + @periodics.periodic(spacing=100500) def iface(self): pass @@ -113,28 +115,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): iface = TestInterface() - @drivers_base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, context): pass + @drivers_base.driver_periodic_task() + def deprecated_task(self, context): + pass + obj = Driver() - self.assertTrue(obj.task._periodic_enabled) get_mock.return_value = mock.Mock(obj=obj) with mock.patch.object( driver_factory.DriverFactory()._extension_manager, 'names') as mock_names: mock_names.return_value = init_names - self._start_service() - tasks = dict(self.service._periodic_tasks) - self.assertEqual(obj.task, tasks[expected_name]) - self.assertEqual(obj.iface.iface, tasks[expected_name2]) - self.assertEqual(42, - self.service._periodic_spacing[expected_name]) - self.assertEqual(100500, - self.service._periodic_spacing[expected_name2]) - self.assertIn(expected_name, self.service._periodic_last_run) - self.assertIn(expected_name2, self.service._periodic_last_run) + self._start_service(start_periodic_tasks=True) + + tasks = {c[0] for c in self.service._periodic_task_callables} + for t in (obj.task, obj.iface.iface, obj.deprecated_task): + self.assertTrue(periodics.is_periodic(t)) + self.assertIn(t, tasks) @mock.patch.object(driver_factory.DriverFactory, '__init__') def test_start_fails_on_missing_driver(self, mock_df): @@ -154,6 +155,17 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.service.init_host) self.assertTrue(log_mock.error.called) + def test_prevent_double_start(self): + self._start_service() + self.assertRaisesRegexp(RuntimeError, 'already running', + self.service.init_host) + + @mock.patch.object(base_manager, 'LOG') + def test_warning_on_low_workers_pool(self, log_mock): + CONF.set_override('workers_pool_size', 3, 'conductor') + self._start_service() + self.assertTrue(log_mock.warning.called) + @mock.patch.object(eventlet.greenpool.GreenPool, 'waitall') def test_del_host_waits_on_workerpool(self, wait_mock): self._start_service() @@ -185,3 +197,23 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): mock_is_set.side_effect = [False, False, False, True] self.service._conductor_service_record_keepalive() self.assertEqual(3, mock_touch.call_count) + + +class ManagerSpawnWorkerTestCase(tests_base.TestCase): + def setUp(self): + super(ManagerSpawnWorkerTestCase, self).setUp() + self.service = manager.ConductorManager('hostname', 'test-topic') + self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor) + self.service._executor = self.executor + + def test__spawn_worker(self): + self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow') + + self.executor.submit.assert_called_once_with( + 'fake', 1, 2, foo='bar', cat='meow') + + def test__spawn_worker_none_free(self): + self.executor.submit.side_effect = futurist.RejectedSubmission() + + self.assertRaises(exception.NoFreeConductorWorker, + self.service._spawn_worker, 'fake') diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index 3c6f27b5a2..4d0c3e8290 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -70,7 +70,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, self.service.change_node_power_state(self.context, node.uuid, states.POWER_ON) - self.service._worker_pool.waitall() + self._stop_service() get_power_mock.assert_called_once_with(mock.ANY) node.refresh() @@ -103,7 +103,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, # In this test worker should not be spawned, but waiting to make sure # the below perform_mock assertion is valid. - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(pwr_act_mock.called, 'node_power_action has been ' 'unexpectedly called.') # Verify existing reservation wasn't broken. @@ -162,7 +162,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, self.service.change_node_power_state(self.context, node.uuid, new_state) - self.service._worker_pool.waitall() + self._stop_service() get_power_mock.assert_called_once_with(mock.ANY) set_power_mock.assert_called_once_with(mock.ANY, new_state) @@ -298,7 +298,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, 'first_method', 'POST', info) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() # Assert spawn_after was called self.assertTrue(mock_spawn.called) @@ -320,7 +320,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, 'third_method_sync', 'POST', info) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() # Assert no workers were used self.assertFalse(mock_spawn.called) @@ -438,7 +438,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertIsNone(node.last_error) @@ -715,7 +715,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, provision_state=states.AVAILABLE) self.service.do_node_deploy(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYING, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -745,7 +745,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, driver_internal_info={'is_whole_disk_image': False}) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYING, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -774,7 +774,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, instance_info={'image_source': uuidutils.generate_uuid()}) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYWAIT, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -798,7 +798,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -822,7 +822,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -845,7 +845,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -893,7 +893,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1049,7 +1049,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_deploy_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYFAIL, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -1067,7 +1067,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_cleanwait_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANFAIL, node.provision_state) self.assertEqual(tgt_prov_state, node.target_provision_state) @@ -1162,8 +1162,9 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.AVAILABLE, driver_internal_info={'is_whole_disk_image': False}) + self._start_service() self.service.do_node_tear_down(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Node will be moved to AVAILABLE after cleaning, not tested here self.assertEqual(states.CLEANING, node.provision_state) @@ -1176,7 +1177,6 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, def test__do_node_tear_down_from_valid_states(self): valid_states = [states.ACTIVE, states.DEPLOYWAIT, states.DEPLOYFAIL, states.ERROR] - self._start_service() for state in valid_states: self._test_do_node_tear_down_from_state(state) @@ -1207,7 +1207,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Assert instance_info/driver_internal_info was not touched self.assertEqual(fake_instance_info, node.instance_info) @@ -1236,7 +1236,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, 'provide') # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1463,7 +1463,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, clean_steps) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() mock_validate.assert_called_once_with(mock.ANY) mock_spawn.assert_called_with(self.service._do_node_clean, mock.ANY, clean_steps) @@ -1492,9 +1492,6 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service.continue_node_clean, self.context, node.uuid) - self.service._worker_pool.waitall() - node.refresh() - @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker') def test_continue_node_clean_wrong_state(self, mock_spawn): # Test the appropriate exception is raised if node isn't already @@ -1511,7 +1508,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service.continue_node_clean, self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1533,7 +1530,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, clean_step=self.clean_steps[0]) self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANING, node.provision_state) self.assertEqual(tgt_prv_state, node.target_provision_state) @@ -1561,7 +1558,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, driver_internal_info=driver_info, clean_step=self.clean_steps[0]) self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() if skip: expected_step_index = 1 @@ -1591,7 +1588,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANFAIL, node.provision_state) self.assertEqual(tgt_prov_state, node.target_provision_state) @@ -1619,7 +1616,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(tgt_prov_state, node.provision_state) self.assertIsNone(node.target_provision_state) @@ -1667,7 +1664,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, with task_manager.acquire( self.context, node.uuid, shared=False) as task: self.service._do_node_clean(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Assert that the node was moved to available without cleaning @@ -1779,7 +1776,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_node_clean(task, clean_steps=clean_steps) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -1827,7 +1824,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANWAIT, node.provision_state) @@ -1868,7 +1865,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, self.next_clean_step_index) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANWAIT, node.provision_state) @@ -1907,7 +1904,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, None) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete without calling additional steps @@ -1947,7 +1944,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete @@ -1992,7 +1989,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service._do_next_clean_step(task, 0) tear_mock.assert_called_once_with(task.driver.deploy, task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2034,7 +2031,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2075,7 +2072,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, None) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete without calling additional steps @@ -2114,7 +2111,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2232,7 +2229,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -2261,7 +2258,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -2289,7 +2286,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_get_power_state.assert_called_once_with(task) @@ -2394,14 +2391,14 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.context, node.uuid, True) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY) def test_set_console_mode_enabled(self): node = obj_utils.create_test_node(self.context, driver='fake') self._start_service() self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertTrue(node.console_enabled) @@ -2409,7 +2406,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): node = obj_utils.create_test_node(self.context, driver='fake') self._start_service() self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertFalse(node.console_enabled) @@ -2425,7 +2422,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.UnsupportedDriverExtension, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() def test_set_console_mode_validation_fail(self): @@ -2449,7 +2446,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): 'start_console') as mock_sc: mock_sc.side_effect = exception.IronicException('test-error') self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() mock_sc.assert_called_once_with(mock.ANY) node.refresh() self.assertIsNotNone(node.last_error) @@ -2463,7 +2460,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): 'stop_console') as mock_sc: mock_sc.side_effect = exception.IronicException('test-error') self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() mock_sc.assert_called_once_with(mock.ANY) node.refresh() self.assertIsNotNone(node.last_error) @@ -2475,7 +2472,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): with mock.patch.object(self.driver.console, 'start_console') as mock_sc: self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(mock_sc.called) def test_disable_console_already_disabled(self): @@ -2485,7 +2482,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): with mock.patch.object(self.driver.console, 'stop_console') as mock_sc: self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(mock_sc.called) def test_get_console(self): @@ -3065,32 +3062,6 @@ class RaidTestCases(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0]) -class ManagerSpawnWorkerTestCase(tests_base.TestCase): - def setUp(self): - super(ManagerSpawnWorkerTestCase, self).setUp() - self.service = manager.ConductorManager('hostname', 'test-topic') - - def test__spawn_worker(self): - worker_pool = mock.Mock(spec_set=['free', 'spawn']) - worker_pool.free.return_value = True - self.service._worker_pool = worker_pool - - self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow') - - worker_pool.spawn.assert_called_once_with( - 'fake', 1, 2, foo='bar', cat='meow') - - def test__spawn_worker_none_free(self): - worker_pool = mock.Mock(spec_set=['free', 'spawn']) - worker_pool.free.return_value = False - self.service._worker_pool = worker_pool - - self.assertRaises(exception.NoFreeConductorWorker, - self.service._spawn_worker, 'fake') - - self.assertFalse(worker_pool.spawn.called) - - @mock.patch.object(conductor_utils, 'node_power_action') class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase): def setUp(self): @@ -4184,7 +4155,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin, inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_inspect_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.INSPECTFAIL, node.provision_state) self.assertEqual(states.MANAGEABLE, node.target_provision_state) @@ -4207,7 +4178,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) diff --git a/ironic/tests/unit/conductor/test_task_manager.py b/ironic/tests/unit/conductor/test_task_manager.py index 093bdecdfc..7aadc7bcba 100644 --- a/ironic/tests/unit/conductor/test_task_manager.py +++ b/ironic/tests/unit/conductor/test_task_manager.py @@ -17,8 +17,6 @@ """Tests for :class:`ironic.conductor.task_manager`.""" -import eventlet -from eventlet import greenpool import mock from oslo_utils import uuidutils @@ -47,6 +45,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): self.config(node_locked_retry_attempts=1, group='conductor') self.config(node_locked_retry_interval=0, group='conductor') self.node = obj_utils.create_test_node(self.context) + self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback']) def test_excl_lock(self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, @@ -389,8 +388,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): def test_spawn_after( self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, node_get_mock): - thread_mock = mock.Mock(spec_set=['link', 'cancel']) - spawn_mock = mock.Mock(return_value=thread_mock) + spawn_mock = mock.Mock(return_value=self.future_mock) task_release_mock = mock.Mock() reserve_mock.return_value = self.node @@ -399,9 +397,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): task.release_resources = task_release_mock spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow') - thread_mock.link.assert_called_once_with( + self.future_mock.add_done_callback.assert_called_once_with( task._thread_release_resources) - self.assertFalse(thread_mock.cancel.called) + self.assertFalse(self.future_mock.cancel.called) # Since we mocked link(), we're testing that __exit__ didn't # release resources pending the finishing of the background # thread @@ -444,9 +442,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): def test_spawn_after_link_fails( self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, node_get_mock): - thread_mock = mock.Mock(spec_set=['link', 'cancel']) - thread_mock.link.side_effect = exception.IronicException('foo') - spawn_mock = mock.Mock(return_value=thread_mock) + self.future_mock.add_done_callback.side_effect = ( + exception.IronicException('foo')) + spawn_mock = mock.Mock(return_value=self.future_mock) task_release_mock = mock.Mock() thr_release_mock = mock.Mock(spec_set=[]) reserve_mock.return_value = self.node @@ -459,8 +457,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): self.assertRaises(exception.IronicException, _test_it) spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow') - thread_mock.link.assert_called_once_with(thr_release_mock) - thread_mock.cancel.assert_called_once_with() + self.future_mock.add_done_callback.assert_called_once_with( + thr_release_mock) + self.future_mock.cancel.assert_called_once_with() task_release_mock.assert_called_once_with() def test_spawn_after_on_error_hook( @@ -659,75 +658,3 @@ class ExclusiveLockDecoratorTestCase(tests_base.TestCase): _req_excl_lock_method, *self.args_task_second, **self.kwargs) - - -class TaskManagerGreenThreadTestCase(tests_base.TestCase): - """Class to assert our assumptions about greenthread behavior.""" - def test_gt_link_callback_added_during_execution(self): - pool = greenpool.GreenPool() - q1 = eventlet.Queue() - q2 = eventlet.Queue() - - def func(): - q1.put(None) - q2.get() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - q1.get() - thread.link(link_callback) - q2.put(None) - pool.waitall() - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_added_after_execution(self): - pool = greenpool.GreenPool() - link_callback = mock.Mock() - - thread = pool.spawn(lambda: None) - pool.waitall() - thread.link(link_callback) - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_exception_inside_thread(self): - pool = greenpool.GreenPool() - q1 = eventlet.Queue() - q2 = eventlet.Queue() - - def func(): - q1.put(None) - q2.get() - raise Exception() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - q1.get() - thread.link(link_callback) - q2.put(None) - pool.waitall() - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_added_after_exception_inside_thread(self): - pool = greenpool.GreenPool() - - def func(): - raise Exception() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - pool.waitall() - thread.link(link_callback) - - link_callback.assert_called_once_with(thread) - - def test_gt_cancel_doesnt_run_thread(self): - pool = greenpool.GreenPool() - func = mock.Mock() - thread = pool.spawn(func) - thread.link(lambda t: None) - thread.cancel() - pool.waitall() - self.assertFalse(func.called) diff --git a/ironic/tests/unit/drivers/test_base.py b/ironic/tests/unit/drivers/test_base.py index 4f38b93212..55d647ed02 100644 --- a/ironic/tests/unit/drivers/test_base.py +++ b/ironic/tests/unit/drivers/test_base.py @@ -15,7 +15,7 @@ import json -import eventlet +from futurist import periodics import mock from ironic.common import exception @@ -85,36 +85,21 @@ class PassthruDecoratorTestCase(base.TestCase): inst2.driver_routes['driver_noexception']['func']) -@mock.patch.object(eventlet.greenthread, 'spawn_n', autospec=True, - side_effect=lambda func, *args, **kw: func(*args, **kw)) class DriverPeriodicTaskTestCase(base.TestCase): - def test(self, spawn_mock): + def test(self): method_mock = mock.MagicMock(spec_set=[]) - function_mock = mock.MagicMock(spec_set=[]) class TestClass(object): @driver_base.driver_periodic_task(spacing=42) def method(self, foo, bar=None): method_mock(foo, bar=bar) - @driver_base.driver_periodic_task(spacing=100, parallel=False) - def function(): - function_mock() - obj = TestClass() self.assertEqual(42, obj.method._periodic_spacing) - self.assertTrue(obj.method._periodic_task) - self.assertEqual('ironic.tests.unit.drivers.test_base.method', - obj.method._periodic_name) - self.assertEqual('ironic.tests.unit.drivers.test_base.function', - function._periodic_name) + self.assertTrue(periodics.is_periodic(obj.method)) obj.method(1, bar=2) method_mock.assert_called_once_with(1, bar=2) - self.assertEqual(1, spawn_mock.call_count) - function() - function_mock.assert_called_once_with() - self.assertEqual(1, spawn_mock.call_count) class CleanStepDecoratorTestCase(base.TestCase): diff --git a/releasenotes/notes/futurist-e9c55699f479f97a.yaml b/releasenotes/notes/futurist-e9c55699f479f97a.yaml new file mode 100644 index 0000000000..4423af5c31 --- /dev/null +++ b/releasenotes/notes/futurist-e9c55699f479f97a.yaml @@ -0,0 +1,16 @@ +--- +prelude: > + This release features switch to Oslo Futurist library for asynchronous + thread execution and periodic tasks. Main benefit is that periodic tasks + are now executed truly in parallel, and not sequentially in one + green thread. +upgrade: + - Configuration option "workers_pool_size" can no longer be less or equal + to 2. Please set it to greater value (the default is 100) before update. +deprecations: + - Configuration option "periodic_interval" is deprecated. + - Using "driver_periodic_task" decorator is deprecated. Please update your + out-of-tree drivers to use "periodics.periodic" decorator from Futurist + library. +fixes: + - Periodic tasks are no longer executed all in one thread. diff --git a/requirements.txt b/requirements.txt index 617859970f..2e22d49ee5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -43,3 +43,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0 oslo.versionedobjects>=1.5.0 # Apache-2.0 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT psutil<2.0.0,>=1.1.1 # BSD +futurist>=0.11.0 # Apache-2.0