diff --git a/doc/source/dev/architecture.rst b/doc/source/dev/architecture.rst index e055c2479a..26dda4dfc8 100644 --- a/doc/source/dev/architecture.rst +++ b/doc/source/dev/architecture.rst @@ -77,12 +77,14 @@ Driver-Specific Periodic Tasks Drivers may run their own periodic tasks, i.e. actions run repeatedly after a certain amount of time. Such task is created by decorating a method on the -driver itself or on any interface with driver_periodic_task_ decorator, e.g. +driver itself or on any interface with periodic_ decorator, e.g. :: + from futurist import periodics + class FakePower(base.PowerInterface): - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, manager, context): pass # do something @@ -90,7 +92,7 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g. def __init__(self): self.power = FakePower() - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task2(self, manager, context): pass # do something @@ -98,21 +100,6 @@ driver itself or on any interface with driver_periodic_task_ decorator, e.g. Here the ``spacing`` argument is a period in seconds for a given periodic task. For example 'spacing=5' means every 5 seconds. -.. note:: - The ``parallel`` argument may be passed to driver_periodic_task_. - If it's set to False, this task will be run in the periodic task loop, - rather than a separate greenthread. - - This is deprecated as of Liberty release, and the parallel argument will be - ignored starting in the Mitaka cycle, as such task would prevent all other - periodic tasks from starting while it is running. - -.. note:: - By default periodic task names are derived from method names, - so they should be unique within a Python module. - Use ``name`` argument to driver_periodic_task_ to override - automatically generated name. - Message Routing =============== @@ -137,4 +124,4 @@ driver actions such as take-over or clean-up. .. _DB API: ../api/ironic.db.api.html .. _diskimage-builder: https://github.com/openstack/diskimage-builder .. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html -.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task +.. _periodic: http://docs.openstack.org/developer/futurist/api.html#futurist.periodics.periodic diff --git a/etc/ironic/ironic.conf.sample b/etc/ironic/ironic.conf.sample index 60f180b66e..30cbb6551a 100644 --- a/etc/ironic/ironic.conf.sample +++ b/etc/ironic/ironic.conf.sample @@ -117,7 +117,8 @@ # Options defined in ironic.common.service # -# Seconds between running periodic tasks. (integer value) +# Default interval for running driver periodic tasks. (integer +# value) #periodic_interval=60 # Name of this node. This can be an opaque identifier. It is @@ -596,7 +597,9 @@ # Options defined in ironic.conductor.base_manager # -# The size of the workers greenthread pool. (integer value) +# The size of the workers greenthread pool. Note that 2 +# threads will be reserved by the conductor itself for +# handling heart beats and periodic tasks. (integer value) #workers_pool_size=100 # Seconds between conductor heart beats. (integer value) diff --git a/ironic/common/service.py b/ironic/common/service.py index 01333c9642..71c8e30ab5 100644 --- a/ironic/common/service.py +++ b/ironic/common/service.py @@ -40,7 +40,8 @@ from ironic.objects import base as objects_base service_opts = [ cfg.IntOpt('periodic_interval', default=60, - help=_('Seconds between running periodic tasks.')), + help=_('Default interval for running driver periodic tasks.'), + deprecated_for_removal=True), cfg.StrOpt('host', default=socket.getfqdn(), help=_('Name of this node. This can be an opaque identifier. ' @@ -79,11 +80,7 @@ class RPCService(service.Service): self.rpcserver.start() self.handle_signal() - self.manager.init_host() - self.tg.add_dynamic_timer( - self.manager.periodic_tasks, - periodic_interval_max=CONF.periodic_interval, - context=admin_context) + self.manager.init_host(admin_context) LOG.info(_LI('Created RPC server for service %(service)s on host ' '%(host)s.'), diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index 48abbbe4ba..e4ace43cca 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -15,13 +15,13 @@ import inspect import threading -from eventlet import greenpool -from oslo_concurrency import lockutils +import futurist +from futurist import periodics +from futurist import rejection from oslo_config import cfg from oslo_context import context as ironic_context from oslo_db import exception as db_exception from oslo_log import log -from oslo_service import periodic_task from oslo_utils import excutils from ironic.common import driver_factory @@ -40,8 +40,10 @@ from ironic.db import api as dbapi conductor_opts = [ cfg.IntOpt('workers_pool_size', - default=100, - help=_('The size of the workers greenthread pool.')), + default=100, min=3, + help=_('The size of the workers greenthread pool. ' + 'Note that 2 threads will be reserved by the conductor ' + 'itself for handling heart beats and periodic tasks.')), cfg.IntOpt('heartbeat_interval', default=10, help=_('Seconds between conductor heart beats.')), @@ -51,18 +53,18 @@ conductor_opts = [ CONF = cfg.CONF CONF.register_opts(conductor_opts, 'conductor') LOG = log.getLogger(__name__) -WORKER_SPAWN_lOCK = "conductor_worker_spawn" -class BaseConductorManager(periodic_task.PeriodicTasks): +class BaseConductorManager(object): def __init__(self, host, topic): - super(BaseConductorManager, self).__init__(CONF) + super(BaseConductorManager, self).__init__() if not host: host = CONF.host self.host = host self.topic = topic self.notifier = rpc.get_notifier() + self._started = False def _get_driver(self, driver_name): """Get the driver. @@ -78,15 +80,29 @@ class BaseConductorManager(periodic_task.PeriodicTasks): except KeyError: raise exception.DriverNotFound(driver_name=driver_name) - def init_host(self): + def init_host(self, admin_context=None): + """Initialize the conductor host. + + :param admin_context: the admin context to pass to periodic tasks. + :raises: RuntimeError when conductor is already running + :raises: NoDriversLoaded when no drivers are enabled on the conductor + """ + if self._started: + raise RuntimeError(_('Attempt to start an already running ' + 'conductor manager')) + self.dbapi = dbapi.get_instance() self._keepalive_evt = threading.Event() """Event for the keepalive thread.""" - self._worker_pool = greenpool.GreenPool( - size=CONF.conductor.workers_pool_size) - """GreenPool of background workers for performing tasks async.""" + # TODO(dtantsur): make the threshold configurable? + rejection_func = rejection.reject_when_reached( + CONF.conductor.workers_pool_size) + self._executor = futurist.GreenThreadPoolExecutor( + max_workers=CONF.conductor.workers_pool_size, + check_and_reject=rejection_func) + """Executor for performing tasks async.""" self.ring_manager = hash.HashRingManager() """Consistent hash ring which maps drivers to conductors.""" @@ -106,15 +122,36 @@ class BaseConductorManager(periodic_task.PeriodicTasks): LOG.error(msg, self.host) raise exception.NoDriversLoaded(conductor=self.host) - # Collect driver-specific periodic tasks + # Collect driver-specific periodic tasks. + # Conductor periodic tasks accept context argument, driver periodic + # tasks accept this manager and context. We have to ensure that the + # same driver interface class is not traversed twice, otherwise + # we'll have several instances of the same task. + LOG.debug('Collecting periodic tasks') + self._periodic_task_callables = [] + periodic_task_classes = set() + self._collect_periodic_tasks(self, (admin_context,)) for driver_obj in driver_factory.drivers().values(): - self._collect_periodic_tasks(driver_obj) + self._collect_periodic_tasks(driver_obj, (self, admin_context)) for iface_name in (driver_obj.core_interfaces + driver_obj.standard_interfaces + ['vendor']): iface = getattr(driver_obj, iface_name, None) - if iface: - self._collect_periodic_tasks(iface) + if iface and iface.__class__ not in periodic_task_classes: + self._collect_periodic_tasks(iface, (self, admin_context)) + periodic_task_classes.add(iface.__class__) + + if (len(self._periodic_task_callables) > + CONF.conductor.workers_pool_size): + LOG.warning(_LW('This conductor has %(tasks)d periodic tasks ' + 'enabled, but only %(workers)d task workers ' + 'allowed by [conductor]workers_pool_size option'), + {'tasks': len(self._periodic_task_callables), + 'workers': CONF.conductor.workers_pool_size}) + + self._periodic_tasks = periodics.PeriodicWorker( + self._periodic_task_callables, + executor_factory=periodics.ExistingExecutor(self._executor)) # clear all locks held by this conductor before registering self.dbapi.clear_node_reservations_for_conductor(self.host) @@ -134,6 +171,12 @@ class BaseConductorManager(periodic_task.PeriodicTasks): update_existing=True) self.conductor = cdr + # Start periodic tasks + self._periodic_tasks_worker = self._executor.submit( + self._periodic_tasks.start, allow_empty=True) + self._periodic_tasks_worker.add_done_callback( + self._on_periodic_tasks_stop) + # NOTE(lucasagomes): If the conductor server dies abruptly # mid deployment (OMM Killer, power outage, etc...) we # can not resume the deployment even if the conductor @@ -161,10 +204,7 @@ class BaseConductorManager(periodic_task.PeriodicTasks): LOG.critical(_LC('Failed to start keepalive')) self.del_host() - def _collect_periodic_tasks(self, obj): - for n, method in inspect.getmembers(obj, inspect.ismethod): - if getattr(method, '_periodic_enabled', False): - self.add_periodic_task(method) + self._started = True def del_host(self, deregister=True): # Conductor deregistration fails if called on non-initialized @@ -190,11 +230,34 @@ class BaseConductorManager(periodic_task.PeriodicTasks): # Waiting here to give workers the chance to finish. This has the # benefit of releasing locks workers placed on nodes, as well as # having work complete normally. - self._worker_pool.waitall() + self._periodic_tasks.stop() + self._periodic_tasks.wait() + self._executor.shutdown(wait=True) + self._started = False - def periodic_tasks(self, context, raise_on_error=False): - """Periodic tasks are run at pre-specified interval.""" - return self.run_periodic_tasks(context, raise_on_error=raise_on_error) + def _collect_periodic_tasks(self, obj, args): + """Collect periodic tasks from a given object. + + Populates self._periodic_task_callables with tuples + (callable, args, kwargs). + + :param obj: object containing periodic tasks as methods + :param args: tuple with arguments to pass to every task + """ + for name, member in inspect.getmembers(obj): + if periodics.is_periodic(member): + LOG.debug('Found periodic task %(owner)s.%(member)s', + {'owner': obj.__class__.__name__, + 'member': name}) + self._periodic_task_callables.append((member, args, {})) + + def _on_periodic_tasks_stop(self, fut): + try: + fut.result() + except Exception as exc: + LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc) + else: + LOG.info(_LI('Successfully shut down periodic tasks')) def iter_nodes(self, fields=None, **kwargs): """Iterate over nodes mapped to this conductor. @@ -217,7 +280,6 @@ class BaseConductorManager(periodic_task.PeriodicTasks): if self._mapped_to_this_conductor(*result[:2]): yield result - @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-') def _spawn_worker(self, func, *args, **kwargs): """Create a greenthread to run func(*args, **kwargs). @@ -225,13 +287,13 @@ class BaseConductorManager(periodic_task.PeriodicTasks): Spawns a greenthread if there are free slots in pool, otherwise raises exception. Execution control returns immediately to the caller. - :returns: GreenThread object. + :returns: Future object. :raises: NoFreeConductorWorker if worker pool is currently full. """ - if self._worker_pool.free(): - return self._worker_pool.spawn(func, *args, **kwargs) - else: + try: + return self._executor.submit(func, *args, **kwargs) + except futurist.RejectedSubmission: raise exception.NoFreeConductorWorker() def _conductor_service_record_keepalive(self): diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index db0f337c1d..9a91d85fe7 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -46,10 +46,10 @@ import datetime import tempfile import eventlet +from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging as messaging -from oslo_service import periodic_task from oslo_utils import excutils from oslo_utils import uuidutils @@ -1200,8 +1200,7 @@ class ConductorManager(base_manager.BaseConductorManager): action=action, node=node.uuid, state=node.provision_state) - @periodic_task.periodic_task( - spacing=CONF.conductor.sync_power_state_interval) + @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval) def _sync_power_states(self, context): """Periodic task to sync power states for the nodes. @@ -1269,8 +1268,7 @@ class ConductorManager(base_manager.BaseConductorManager): # Yield on every iteration eventlet.sleep(0) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_deploy_timeouts(self, context): """Periodically checks whether a deploy RPC call has timed out. @@ -1292,8 +1290,7 @@ class ConductorManager(base_manager.BaseConductorManager): self._fail_if_in_state(context, filters, states.DEPLOYWAIT, sort_key, callback_method, err_handler) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_deploying_status(self, context): """Periodically checks the status of nodes in DEPLOYING state. @@ -1376,8 +1373,7 @@ class ConductorManager(base_manager.BaseConductorManager): task.node.conductor_affinity = self.conductor.id task.node.save() - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_cleanwait_timeouts(self, context): """Periodically checks for nodes being cleaned. @@ -1402,8 +1398,7 @@ class ConductorManager(base_manager.BaseConductorManager): last_error=last_error, keep_target_state=True) - @periodic_task.periodic_task( - spacing=CONF.conductor.sync_local_state_interval) + @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval) def _sync_local_state(self, context): """Perform any actions necessary to sync local state. @@ -1826,8 +1821,7 @@ class ConductorManager(base_manager.BaseConductorManager): driver = self._get_driver(driver_name) return driver.get_properties() - @periodic_task.periodic_task( - spacing=CONF.conductor.send_sensor_data_interval) + @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval) def _send_sensor_data(self, context): """Periodically sends sensor data to Ceilometer.""" # do nothing if send_sensor_data option is False @@ -2061,8 +2055,7 @@ class ConductorManager(base_manager.BaseConductorManager): action='inspect', node=task.node.uuid, state=task.node.provision_state) - @periodic_task.periodic_task( - spacing=CONF.conductor.check_provision_state_interval) + @periodics.periodic(spacing=CONF.conductor.check_provision_state_interval) def _check_inspect_timeouts(self, context): """Periodically checks inspect_timeout and fails upon reaching it. diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py index aac3ca12ca..5a0c3da176 100644 --- a/ironic/conductor/task_manager.py +++ b/ironic/conductor/task_manager.py @@ -383,15 +383,15 @@ class TaskManager(object): # for some reason, this is true. # All of the above are asserted in tests such that we'll # catch if eventlet ever changes this behavior. - thread = None + fut = None try: - thread = self._spawn_method(*self._spawn_args, - **self._spawn_kwargs) + fut = self._spawn_method(*self._spawn_args, + **self._spawn_kwargs) # NOTE(comstud): Trying to use a lambda here causes # the callback to not occur for some reason. This # also makes it easier to test. - thread.link(self._thread_release_resources) + fut.add_done_callback(self._thread_release_resources) # Don't unlock! The unlock will occur when the # thread finshes. return @@ -408,9 +408,9 @@ class TaskManager(object): {'method': self._on_error_method.__name__, 'node': self.node.uuid}) - if thread is not None: - # This means the link() failed for some + if fut is not None: + # This means the add_done_callback() failed for some # reason. Nuke the thread. - thread.cancel() + fut.cancel() self.release_resources() self.release_resources() diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py index f71d3931a3..7b4bd60bff 100644 --- a/ironic/drivers/base.py +++ b/ironic/drivers/base.py @@ -24,9 +24,9 @@ import inspect import json import os -import eventlet +from futurist import periodics +from oslo_config import cfg from oslo_log import log as logging -from oslo_service import periodic_task from oslo_utils import excutils import six @@ -40,6 +40,10 @@ RAID_CONFIG_SCHEMA = os.path.join(os.path.dirname(__file__), 'raid_config_schema.json') +CONF = cfg.CONF +CONF.import_opt('periodic_interval', 'ironic.common.service') + + @six.add_metaclass(abc.ABCMeta) class BaseDriver(object): """Base class for all drivers. @@ -1116,45 +1120,36 @@ def clean_step(priority, abortable=False, argsinfo=None): return decorator -def driver_periodic_task(parallel=True, **other): +def driver_periodic_task(**kwargs): """Decorator for a driver-specific periodic task. + Deprecated, please use futurist directly. Example:: + from futurist import periodics + class MyDriver(base.BaseDriver): - @base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, manager, context): # do some job - :param parallel: If True (default), this task is run in a separate thread. - If False, this task will be run in the conductor's periodic task - loop, rather than a separate greenthread. This parameter is - deprecated and will be ignored starting with Mitaka cycle. - :param other: arguments to pass to @periodic_task.periodic_task + :param kwargs: arguments to pass to @periodics.periodic """ - # TODO(dtantsur): drop all this magic once - # https://review.openstack.org/#/c/134303/ lands - semaphore = eventlet.semaphore.BoundedSemaphore() + LOG.warning(_LW('driver_periodic_task decorator is deprecated, please ' + 'use futurist.periodics.periodic directly')) + # Previously we accepted more arguments, make a backward compatibility + # layer for out-of-tree drivers. + new_kwargs = {} + for arg in ('spacing', 'enabled', 'run_immediately'): + try: + new_kwargs[arg] = kwargs.pop(arg) + except KeyError: + pass + new_kwargs.setdefault('spacing', CONF.periodic_interval) - def decorator2(func): - @six.wraps(func) - def wrapper(*args, **kwargs): - if parallel: - def _internal(): - with semaphore: - func(*args, **kwargs) + if kwargs: + LOG.warning(_LW('The following arguments are not supported by ' + 'futurist.periodics.periodic and are ignored: %s'), + ', '.join(kwargs)) - eventlet.greenthread.spawn_n(_internal) - else: - LOG.warning(_LW( - 'Using periodic tasks with parallel=False is deprecated, ' - '"parallel" argument will be ignored starting with ' - 'the Mitaka release')) - func(*args, **kwargs) - - # NOTE(dtantsur): name should be unique - other.setdefault('name', '%s.%s' % (func.__module__, func.__name__)) - decorator = periodic_task.periodic_task(**other) - return decorator(wrapper) - - return decorator2 + return periodics.periodic(**new_kwargs) diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py index 44dd525d38..e89083aad1 100644 --- a/ironic/drivers/modules/inspector.py +++ b/ironic/drivers/modules/inspector.py @@ -16,6 +16,7 @@ Modules required to work with ironic_inspector: """ import eventlet +from futurist import periodics from oslo_config import cfg from oslo_log import log as logging from oslo_utils import importutils @@ -121,8 +122,8 @@ class Inspector(base.InspectInterface): eventlet.spawn_n(_start_inspection, task.node.uuid, task.context) return states.INSPECTING - @base.driver_periodic_task(spacing=CONF.inspector.status_check_period, - enabled=CONF.inspector.enabled) + @periodics.periodic(spacing=CONF.inspector.status_check_period, + enabled=CONF.inspector.enabled) def _periodic_check_result(self, manager, context): """Periodic task checking results of inspection.""" filters = {'provision_state': states.INSPECTING} diff --git a/ironic/tests/unit/conductor/mgr_utils.py b/ironic/tests/unit/conductor/mgr_utils.py index 4403efa59a..6564d32172 100644 --- a/ironic/tests/unit/conductor/mgr_utils.py +++ b/ironic/tests/unit/conductor/mgr_utils.py @@ -17,6 +17,7 @@ """Test utils for Ironic Managers.""" +from futurist import periodics import mock from oslo_utils import strutils from oslo_utils import uuidutils @@ -175,8 +176,12 @@ class ServiceSetUpMixin(object): return self.service.del_host() - def _start_service(self): - self.service.init_host() + def _start_service(self, start_periodic_tasks=False): + if start_periodic_tasks: + self.service.init_host() + else: + with mock.patch.object(periodics, 'PeriodicWorker', autospec=True): + self.service.init_host() self.addCleanup(self._stop_service) diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py index 5ad86a5056..ce45434533 100644 --- a/ironic/tests/unit/conductor/test_base_manager.py +++ b/ironic/tests/unit/conductor/test_base_manager.py @@ -13,6 +13,8 @@ """Test class for Ironic BaseConductorManager.""" import eventlet +import futurist +from futurist import periodics import mock from oslo_config import cfg from oslo_db import exception as db_exception @@ -23,6 +25,7 @@ from ironic.conductor import base_manager from ironic.conductor import manager from ironic.drivers import base as drivers_base from ironic import objects +from ironic.tests import base as tests_base from ironic.tests.unit.conductor import mgr_utils from ironic.tests.unit.db import base as tests_db_base from ironic.tests.unit.objects import utils as obj_utils @@ -86,6 +89,7 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): res = objects.Conductor.get_by_hostname(self.context, self.hostname) self.assertEqual(init_names, res['drivers']) + self._stop_service() # verify that restart registers new driver names self.config(enabled_drivers=restart_names) @@ -98,12 +102,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): @mock.patch.object(driver_factory.DriverFactory, '__getitem__') def test_start_registers_driver_specific_tasks(self, get_mock): init_names = ['fake1'] - expected_name = 'ironic.tests.unit.conductor.test_base_manager.task' - expected_name2 = 'ironic.tests.unit.conductor.test_base_manager.iface' self.config(enabled_drivers=init_names) class TestInterface(object): - @drivers_base.driver_periodic_task(spacing=100500) + @periodics.periodic(spacing=100500) def iface(self): pass @@ -113,28 +115,27 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): iface = TestInterface() - @drivers_base.driver_periodic_task(spacing=42) + @periodics.periodic(spacing=42) def task(self, context): pass + @drivers_base.driver_periodic_task() + def deprecated_task(self, context): + pass + obj = Driver() - self.assertTrue(obj.task._periodic_enabled) get_mock.return_value = mock.Mock(obj=obj) with mock.patch.object( driver_factory.DriverFactory()._extension_manager, 'names') as mock_names: mock_names.return_value = init_names - self._start_service() - tasks = dict(self.service._periodic_tasks) - self.assertEqual(obj.task, tasks[expected_name]) - self.assertEqual(obj.iface.iface, tasks[expected_name2]) - self.assertEqual(42, - self.service._periodic_spacing[expected_name]) - self.assertEqual(100500, - self.service._periodic_spacing[expected_name2]) - self.assertIn(expected_name, self.service._periodic_last_run) - self.assertIn(expected_name2, self.service._periodic_last_run) + self._start_service(start_periodic_tasks=True) + + tasks = {c[0] for c in self.service._periodic_task_callables} + for t in (obj.task, obj.iface.iface, obj.deprecated_task): + self.assertTrue(periodics.is_periodic(t)) + self.assertIn(t, tasks) @mock.patch.object(driver_factory.DriverFactory, '__init__') def test_start_fails_on_missing_driver(self, mock_df): @@ -154,6 +155,17 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.service.init_host) self.assertTrue(log_mock.error.called) + def test_prevent_double_start(self): + self._start_service() + self.assertRaisesRegexp(RuntimeError, 'already running', + self.service.init_host) + + @mock.patch.object(base_manager, 'LOG') + def test_warning_on_low_workers_pool(self, log_mock): + CONF.set_override('workers_pool_size', 3, 'conductor') + self._start_service() + self.assertTrue(log_mock.warning.called) + @mock.patch.object(eventlet.greenpool.GreenPool, 'waitall') def test_del_host_waits_on_workerpool(self, wait_mock): self._start_service() @@ -185,3 +197,23 @@ class KeepAliveTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): mock_is_set.side_effect = [False, False, False, True] self.service._conductor_service_record_keepalive() self.assertEqual(3, mock_touch.call_count) + + +class ManagerSpawnWorkerTestCase(tests_base.TestCase): + def setUp(self): + super(ManagerSpawnWorkerTestCase, self).setUp() + self.service = manager.ConductorManager('hostname', 'test-topic') + self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor) + self.service._executor = self.executor + + def test__spawn_worker(self): + self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow') + + self.executor.submit.assert_called_once_with( + 'fake', 1, 2, foo='bar', cat='meow') + + def test__spawn_worker_none_free(self): + self.executor.submit.side_effect = futurist.RejectedSubmission() + + self.assertRaises(exception.NoFreeConductorWorker, + self.service._spawn_worker, 'fake') diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index 3c6f27b5a2..4d0c3e8290 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -70,7 +70,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, self.service.change_node_power_state(self.context, node.uuid, states.POWER_ON) - self.service._worker_pool.waitall() + self._stop_service() get_power_mock.assert_called_once_with(mock.ANY) node.refresh() @@ -103,7 +103,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, # In this test worker should not be spawned, but waiting to make sure # the below perform_mock assertion is valid. - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(pwr_act_mock.called, 'node_power_action has been ' 'unexpectedly called.') # Verify existing reservation wasn't broken. @@ -162,7 +162,7 @@ class ChangeNodePowerStateTestCase(mgr_utils.ServiceSetUpMixin, self.service.change_node_power_state(self.context, node.uuid, new_state) - self.service._worker_pool.waitall() + self._stop_service() get_power_mock.assert_called_once_with(mock.ANY) set_power_mock.assert_called_once_with(mock.ANY, new_state) @@ -298,7 +298,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, 'first_method', 'POST', info) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() # Assert spawn_after was called self.assertTrue(mock_spawn.called) @@ -320,7 +320,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, 'third_method_sync', 'POST', info) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() # Assert no workers were used self.assertFalse(mock_spawn.called) @@ -438,7 +438,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) # Waiting to make sure the below assertions are valid. - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertIsNone(node.last_error) @@ -715,7 +715,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, provision_state=states.AVAILABLE) self.service.do_node_deploy(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYING, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -745,7 +745,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, driver_internal_info={'is_whole_disk_image': False}) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYING, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -774,7 +774,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, instance_info={'image_source': uuidutils.generate_uuid()}) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYWAIT, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -798,7 +798,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -822,7 +822,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -845,7 +845,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.NOSTATE) self.service.do_node_deploy(self.context, node.uuid, rebuild=True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.ACTIVE, node.provision_state) self.assertEqual(states.NOSTATE, node.target_provision_state) @@ -893,7 +893,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1049,7 +1049,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_deploy_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.DEPLOYFAIL, node.provision_state) self.assertEqual(states.ACTIVE, node.target_provision_state) @@ -1067,7 +1067,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, provision_updated_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_cleanwait_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANFAIL, node.provision_state) self.assertEqual(tgt_prov_state, node.target_provision_state) @@ -1162,8 +1162,9 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, target_provision_state=states.AVAILABLE, driver_internal_info={'is_whole_disk_image': False}) + self._start_service() self.service.do_node_tear_down(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Node will be moved to AVAILABLE after cleaning, not tested here self.assertEqual(states.CLEANING, node.provision_state) @@ -1176,7 +1177,6 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, def test__do_node_tear_down_from_valid_states(self): valid_states = [states.ACTIVE, states.DEPLOYWAIT, states.DEPLOYFAIL, states.ERROR] - self._start_service() for state in valid_states: self._test_do_node_tear_down_from_state(state) @@ -1207,7 +1207,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Assert instance_info/driver_internal_info was not touched self.assertEqual(fake_instance_info, node.instance_info) @@ -1236,7 +1236,7 @@ class DoNodeDeployTearDownTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, 'provide') # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1463,7 +1463,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, clean_steps) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() mock_validate.assert_called_once_with(mock.ANY) mock_spawn.assert_called_with(self.service._do_node_clean, mock.ANY, clean_steps) @@ -1492,9 +1492,6 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service.continue_node_clean, self.context, node.uuid) - self.service._worker_pool.waitall() - node.refresh() - @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker') def test_continue_node_clean_wrong_state(self, mock_spawn): # Test the appropriate exception is raised if node isn't already @@ -1511,7 +1508,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service.continue_node_clean, self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) @@ -1533,7 +1530,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, clean_step=self.clean_steps[0]) self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANING, node.provision_state) self.assertEqual(tgt_prv_state, node.target_provision_state) @@ -1561,7 +1558,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, driver_internal_info=driver_info, clean_step=self.clean_steps[0]) self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() if skip: expected_step_index = 1 @@ -1591,7 +1588,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANFAIL, node.provision_state) self.assertEqual(tgt_prov_state, node.target_provision_state) @@ -1619,7 +1616,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self._start_service() self.service.continue_node_clean(self.context, node.uuid) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(tgt_prov_state, node.provision_state) self.assertIsNone(node.target_provision_state) @@ -1667,7 +1664,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, with task_manager.acquire( self.context, node.uuid, shared=False) as task: self.service._do_node_clean(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Assert that the node was moved to available without cleaning @@ -1779,7 +1776,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_node_clean(task, clean_steps=clean_steps) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -1827,7 +1824,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANWAIT, node.provision_state) @@ -1868,7 +1865,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, self.next_clean_step_index) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.CLEANWAIT, node.provision_state) @@ -1907,7 +1904,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, None) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete without calling additional steps @@ -1947,7 +1944,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete @@ -1992,7 +1989,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.service._do_next_clean_step(task, 0) tear_mock.assert_called_once_with(task.driver.deploy, task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2034,7 +2031,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2075,7 +2072,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, None) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Cleaning should be complete without calling additional steps @@ -2114,7 +2111,7 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, self.context, node.uuid, shared=False) as task: self.service._do_next_clean_step(task, 0) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure we go to CLEANFAIL, clear clean_steps @@ -2232,7 +2229,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -2261,7 +2258,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_validate.assert_called_once_with(task) @@ -2289,7 +2286,7 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, self.context, node['id'], shared=False) as task: self.service._do_node_verify(task) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() mock_get_power_state.assert_called_once_with(task) @@ -2394,14 +2391,14 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.context, node.uuid, True) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY) def test_set_console_mode_enabled(self): node = obj_utils.create_test_node(self.context, driver='fake') self._start_service() self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertTrue(node.console_enabled) @@ -2409,7 +2406,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): node = obj_utils.create_test_node(self.context, driver='fake') self._start_service() self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertFalse(node.console_enabled) @@ -2425,7 +2422,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.UnsupportedDriverExtension, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() def test_set_console_mode_validation_fail(self): @@ -2449,7 +2446,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): 'start_console') as mock_sc: mock_sc.side_effect = exception.IronicException('test-error') self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() mock_sc.assert_called_once_with(mock.ANY) node.refresh() self.assertIsNotNone(node.last_error) @@ -2463,7 +2460,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): 'stop_console') as mock_sc: mock_sc.side_effect = exception.IronicException('test-error') self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() mock_sc.assert_called_once_with(mock.ANY) node.refresh() self.assertIsNotNone(node.last_error) @@ -2475,7 +2472,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): with mock.patch.object(self.driver.console, 'start_console') as mock_sc: self.service.set_console_mode(self.context, node.uuid, True) - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(mock_sc.called) def test_disable_console_already_disabled(self): @@ -2485,7 +2482,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): with mock.patch.object(self.driver.console, 'stop_console') as mock_sc: self.service.set_console_mode(self.context, node.uuid, False) - self.service._worker_pool.waitall() + self._stop_service() self.assertFalse(mock_sc.called) def test_get_console(self): @@ -3065,32 +3062,6 @@ class RaidTestCases(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0]) -class ManagerSpawnWorkerTestCase(tests_base.TestCase): - def setUp(self): - super(ManagerSpawnWorkerTestCase, self).setUp() - self.service = manager.ConductorManager('hostname', 'test-topic') - - def test__spawn_worker(self): - worker_pool = mock.Mock(spec_set=['free', 'spawn']) - worker_pool.free.return_value = True - self.service._worker_pool = worker_pool - - self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow') - - worker_pool.spawn.assert_called_once_with( - 'fake', 1, 2, foo='bar', cat='meow') - - def test__spawn_worker_none_free(self): - worker_pool = mock.Mock(spec_set=['free', 'spawn']) - worker_pool.free.return_value = False - self.service._worker_pool = worker_pool - - self.assertRaises(exception.NoFreeConductorWorker, - self.service._spawn_worker, 'fake') - - self.assertFalse(worker_pool.spawn.called) - - @mock.patch.object(conductor_utils, 'node_power_action') class ManagerDoSyncPowerStateTestCase(tests_db_base.DbTestCase): def setUp(self): @@ -4184,7 +4155,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin, inspection_started_at=datetime.datetime(2000, 1, 1, 0, 0)) self.service._check_inspect_timeouts(self.context) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() self.assertEqual(states.INSPECTFAIL, node.provision_state) self.assertEqual(states.MANAGEABLE, node.target_provision_state) @@ -4207,7 +4178,7 @@ class NodeInspectHardware(mgr_utils.ServiceSetUpMixin, self.context, node.uuid) # Compare true exception hidden by @messaging.expected_exceptions self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0]) - self.service._worker_pool.waitall() + self._stop_service() node.refresh() # Make sure things were rolled back self.assertEqual(prv_state, node.provision_state) diff --git a/ironic/tests/unit/conductor/test_task_manager.py b/ironic/tests/unit/conductor/test_task_manager.py index 093bdecdfc..7aadc7bcba 100644 --- a/ironic/tests/unit/conductor/test_task_manager.py +++ b/ironic/tests/unit/conductor/test_task_manager.py @@ -17,8 +17,6 @@ """Tests for :class:`ironic.conductor.task_manager`.""" -import eventlet -from eventlet import greenpool import mock from oslo_utils import uuidutils @@ -47,6 +45,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): self.config(node_locked_retry_attempts=1, group='conductor') self.config(node_locked_retry_interval=0, group='conductor') self.node = obj_utils.create_test_node(self.context) + self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback']) def test_excl_lock(self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, @@ -389,8 +388,7 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): def test_spawn_after( self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, node_get_mock): - thread_mock = mock.Mock(spec_set=['link', 'cancel']) - spawn_mock = mock.Mock(return_value=thread_mock) + spawn_mock = mock.Mock(return_value=self.future_mock) task_release_mock = mock.Mock() reserve_mock.return_value = self.node @@ -399,9 +397,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): task.release_resources = task_release_mock spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow') - thread_mock.link.assert_called_once_with( + self.future_mock.add_done_callback.assert_called_once_with( task._thread_release_resources) - self.assertFalse(thread_mock.cancel.called) + self.assertFalse(self.future_mock.cancel.called) # Since we mocked link(), we're testing that __exit__ didn't # release resources pending the finishing of the background # thread @@ -444,9 +442,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): def test_spawn_after_link_fails( self, get_portgroups_mock, get_ports_mock, get_driver_mock, reserve_mock, release_mock, node_get_mock): - thread_mock = mock.Mock(spec_set=['link', 'cancel']) - thread_mock.link.side_effect = exception.IronicException('foo') - spawn_mock = mock.Mock(return_value=thread_mock) + self.future_mock.add_done_callback.side_effect = ( + exception.IronicException('foo')) + spawn_mock = mock.Mock(return_value=self.future_mock) task_release_mock = mock.Mock() thr_release_mock = mock.Mock(spec_set=[]) reserve_mock.return_value = self.node @@ -459,8 +457,9 @@ class TaskManagerTestCase(tests_db_base.DbTestCase): self.assertRaises(exception.IronicException, _test_it) spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow') - thread_mock.link.assert_called_once_with(thr_release_mock) - thread_mock.cancel.assert_called_once_with() + self.future_mock.add_done_callback.assert_called_once_with( + thr_release_mock) + self.future_mock.cancel.assert_called_once_with() task_release_mock.assert_called_once_with() def test_spawn_after_on_error_hook( @@ -659,75 +658,3 @@ class ExclusiveLockDecoratorTestCase(tests_base.TestCase): _req_excl_lock_method, *self.args_task_second, **self.kwargs) - - -class TaskManagerGreenThreadTestCase(tests_base.TestCase): - """Class to assert our assumptions about greenthread behavior.""" - def test_gt_link_callback_added_during_execution(self): - pool = greenpool.GreenPool() - q1 = eventlet.Queue() - q2 = eventlet.Queue() - - def func(): - q1.put(None) - q2.get() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - q1.get() - thread.link(link_callback) - q2.put(None) - pool.waitall() - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_added_after_execution(self): - pool = greenpool.GreenPool() - link_callback = mock.Mock() - - thread = pool.spawn(lambda: None) - pool.waitall() - thread.link(link_callback) - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_exception_inside_thread(self): - pool = greenpool.GreenPool() - q1 = eventlet.Queue() - q2 = eventlet.Queue() - - def func(): - q1.put(None) - q2.get() - raise Exception() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - q1.get() - thread.link(link_callback) - q2.put(None) - pool.waitall() - link_callback.assert_called_once_with(thread) - - def test_gt_link_callback_added_after_exception_inside_thread(self): - pool = greenpool.GreenPool() - - def func(): - raise Exception() - - link_callback = mock.Mock() - - thread = pool.spawn(func) - pool.waitall() - thread.link(link_callback) - - link_callback.assert_called_once_with(thread) - - def test_gt_cancel_doesnt_run_thread(self): - pool = greenpool.GreenPool() - func = mock.Mock() - thread = pool.spawn(func) - thread.link(lambda t: None) - thread.cancel() - pool.waitall() - self.assertFalse(func.called) diff --git a/ironic/tests/unit/drivers/test_base.py b/ironic/tests/unit/drivers/test_base.py index 4f38b93212..55d647ed02 100644 --- a/ironic/tests/unit/drivers/test_base.py +++ b/ironic/tests/unit/drivers/test_base.py @@ -15,7 +15,7 @@ import json -import eventlet +from futurist import periodics import mock from ironic.common import exception @@ -85,36 +85,21 @@ class PassthruDecoratorTestCase(base.TestCase): inst2.driver_routes['driver_noexception']['func']) -@mock.patch.object(eventlet.greenthread, 'spawn_n', autospec=True, - side_effect=lambda func, *args, **kw: func(*args, **kw)) class DriverPeriodicTaskTestCase(base.TestCase): - def test(self, spawn_mock): + def test(self): method_mock = mock.MagicMock(spec_set=[]) - function_mock = mock.MagicMock(spec_set=[]) class TestClass(object): @driver_base.driver_periodic_task(spacing=42) def method(self, foo, bar=None): method_mock(foo, bar=bar) - @driver_base.driver_periodic_task(spacing=100, parallel=False) - def function(): - function_mock() - obj = TestClass() self.assertEqual(42, obj.method._periodic_spacing) - self.assertTrue(obj.method._periodic_task) - self.assertEqual('ironic.tests.unit.drivers.test_base.method', - obj.method._periodic_name) - self.assertEqual('ironic.tests.unit.drivers.test_base.function', - function._periodic_name) + self.assertTrue(periodics.is_periodic(obj.method)) obj.method(1, bar=2) method_mock.assert_called_once_with(1, bar=2) - self.assertEqual(1, spawn_mock.call_count) - function() - function_mock.assert_called_once_with() - self.assertEqual(1, spawn_mock.call_count) class CleanStepDecoratorTestCase(base.TestCase): diff --git a/releasenotes/notes/futurist-e9c55699f479f97a.yaml b/releasenotes/notes/futurist-e9c55699f479f97a.yaml new file mode 100644 index 0000000000..4423af5c31 --- /dev/null +++ b/releasenotes/notes/futurist-e9c55699f479f97a.yaml @@ -0,0 +1,16 @@ +--- +prelude: > + This release features switch to Oslo Futurist library for asynchronous + thread execution and periodic tasks. Main benefit is that periodic tasks + are now executed truly in parallel, and not sequentially in one + green thread. +upgrade: + - Configuration option "workers_pool_size" can no longer be less or equal + to 2. Please set it to greater value (the default is 100) before update. +deprecations: + - Configuration option "periodic_interval" is deprecated. + - Using "driver_periodic_task" decorator is deprecated. Please update your + out-of-tree drivers to use "periodics.periodic" decorator from Futurist + library. +fixes: + - Periodic tasks are no longer executed all in one thread. diff --git a/requirements.txt b/requirements.txt index 617859970f..2e22d49ee5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -43,3 +43,4 @@ retrying!=1.3.0,>=1.2.3 # Apache-2.0 oslo.versionedobjects>=1.5.0 # Apache-2.0 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT psutil<2.0.0,>=1.1.1 # BSD +futurist>=0.11.0 # Apache-2.0