Refactor periodic tasks.

This review allows periodic tasks to be enabled or disabled in the
decorator, as well as by specifying an interval which is negative.

The spacing between runs of a periodic task is now specified in
seconds, with zero meaning the default spacing which is currently 60
seconds.

There is also a new argument to the decorator which indicates if a
periodic task _needs_ to be run in the nova-compute process. There is
also a flag (run_external_periodic_tasks) which can be used to move
these periodic tasks out of the nova-compute process.

I also remove the periodic_interval flag to services, as the interval
between runs is now dynamic based on the number of seconds that a
periodic task wants to wait for its next run. For callers who want to
twiddle the sleep period (for example unit tests), there is a
create() argument periodic_interval_max which lets the period
periodic_tasks() specifies be overridden. This is not exposed as a
flag because I cannot see a use case for that. It is needed for unit
testing however.

DocImpact. Resolves bug 939087.

Change-Id: I7f245a88b8d229a481c1b65a4c0f1e2769bf3901
This commit is contained in:
Michael Still
2012-12-24 15:00:52 +11:00
parent 6510ee105b
commit 6187ae91fe
5 changed files with 264 additions and 42 deletions

View File

@@ -410,6 +410,10 @@ class InvalidUUID(Invalid):
message = _("Expected a uuid but received %(uuid)s.")
class InvalidPeriodicTaskArg(Invalid):
message = _("Unexpected argument for periodic task creation: %(arg)s.")
class ConstraintNotMet(NovaException):
message = _("Constraint not met.")
code = 412

View File

@@ -54,8 +54,10 @@ This module provides Manager, a base class for managers.
"""
import eventlet
import time
from nova.db import base
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.openstack.common.plugin import pluginmanager
@@ -63,25 +65,50 @@ from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import version
periodic_opts = [
cfg.BoolOpt('run_external_periodic_tasks',
default=True,
help=('Some periodic tasks can be run in a separate process. '
'Should we run them here?')),
]
CONF = cfg.CONF
CONF.register_opts(periodic_opts)
CONF.import_opt('host', 'nova.config')
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 60.0
def periodic_task(*args, **kwargs):
"""Decorator to indicate that a method is a periodic task.
This decorator can be used in two ways:
1. Without arguments '@periodic_task', this will be run on every tick
1. Without arguments '@periodic_task', this will be run on every cycle
of the periodic scheduler.
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
run on every N ticks of the periodic scheduler.
2. With arguments, @periodic_task(periodic_spacing=N), this will be
run on approximately every N seconds. If this number is negative the
periodic task will be disabled.
"""
def decorator(f):
# Test for old style invocation
if 'ticks_between_runs' in kwargs:
raise exception.InvalidPeriodicTaskArg(arg='ticks_between_runs')
# Control if run at all
f._periodic_task = True
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
f._periodic_enabled = False
else:
f._periodic_enabled = kwargs.pop('enabled', True)
# Control frequency
f._periodic_spacing = kwargs.pop('spacing', 0)
f._periodic_last_run = time.time()
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
@@ -117,17 +144,39 @@ class ManagerMeta(type):
cls._periodic_tasks = []
try:
cls._ticks_to_skip = cls._ticks_to_skip.copy()
cls._periodic_last_run = cls._periodic_last_run.copy()
except AttributeError:
cls._ticks_to_skip = {}
cls._periodic_last_run = {}
try:
cls._periodic_spacing = cls._periodic_spacing.copy()
except AttributeError:
cls._periodic_spacing = {}
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
if task._ticks_between_runs >= 0:
if task._periodic_spacing < 0:
LOG.info(_('Skipping periodic task %(task)s because '
'its interval is negative'),
{'task': name})
continue
if not task._periodic_enabled:
LOG.info(_('Skipping periodic task %(task)s because '
'it is disabled'),
{'task': name})
continue
# A periodic spacing of zero indicates that this task should
# be run every pass
if task._periodic_spacing == 0:
task._periodic_spacing = None
cls._periodic_tasks.append((name, task))
cls._ticks_to_skip[name] = task._ticks_between_runs
cls._periodic_spacing[name] = task._periodic_spacing
cls._periodic_last_run[name] = task._periodic_last_run
class Manager(base.Base):
@@ -158,30 +207,39 @@ class Manager(base.Base):
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"), locals())
self._ticks_to_skip[task_name] -= 1
# If a periodic task is _nearly_ due, then we'll run it early
if self._periodic_spacing[task_name] is None:
wait = 0
else:
wait = time.time() - (self._periodic_last_run[task_name] +
self._periodic_spacing[task_name])
if wait > 0.2:
if wait < idle_for:
idle_for = wait
continue
self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
self._periodic_last_run[task_name] = time.time()
try:
task(self, context)
# NOTE(tiantian): After finished a task, allow manager to
# do other work (report_state, processing AMPQ request etc.)
eventlet.sleep(0)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
locals())
if (not self._periodic_spacing[task_name] is None and
self._periodic_spacing[task_name] < idle_for):
idle_for = self._periodic_spacing[task_name]
eventlet.sleep(0)
return idle_for
def init_host(self):
"""Hook to do additional manager initialization when one requests
the service be started. This is called before any service record

View File

@@ -49,9 +49,9 @@ service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='seconds between nodes reporting state to datastore'),
cfg.IntOpt('periodic_interval',
default=60,
help='seconds between running periodic tasks'),
cfg.BoolOpt('periodic_enable',
default=True,
help='enable periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='range of seconds to randomly delay when starting the'
@@ -371,7 +371,8 @@ class Service(object):
it state to the database services table."""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
periodic_enable=None, periodic_fuzzy_delay=None,
periodic_interval_max=None,
*args, **kwargs):
self.host = host
self.binary = binary
@@ -380,8 +381,9 @@ class Service(object):
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=self.host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_enable = periodic_enable
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.periodic_interval_max = periodic_interval_max
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
self.backdoor_port = None
@@ -433,15 +435,15 @@ class Service(object):
if pulse:
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
periodic = utils.DynamicLoopingCall(self.periodic_tasks)
periodic.start(initial_delay=initial_delay,
periodic_interval_max=self.periodic_interval_max)
self.timers.append(periodic)
def _create_service_ref(self, context):
@@ -460,8 +462,8 @@ class Service(object):
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
report_interval=None, periodic_enable=None,
periodic_fuzzy_delay=None, periodic_interval_max=None):
"""Instantiates class and passes back application object.
:param host: defaults to CONF.host
@@ -469,8 +471,9 @@ class Service(object):
:param topic: defaults to bin_name - 'nova-' part
:param manager: defaults to CONF.<topic>_manager
:param report_interval: defaults to CONF.report_interval
:param periodic_interval: defaults to CONF.periodic_interval
:param periodic_enable: defaults to CONF.periodic_enable
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
:param periodic_interval_max: if set, the max time to wait between runs
"""
if not host:
@@ -486,14 +489,15 @@ class Service(object):
manager = CONF.get(manager_cls, None)
if report_interval is None:
report_interval = CONF.report_interval
if periodic_interval is None:
periodic_interval = CONF.periodic_interval
if periodic_enable is None:
periodic_enable = CONF.periodic_enable
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
periodic_enable=periodic_enable,
periodic_fuzzy_delay=periodic_fuzzy_delay,
periodic_interval_max=periodic_interval_max)
return service_obj
@@ -529,7 +533,7 @@ class Service(object):
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
class WSGIService(object):

View File

@@ -0,0 +1,109 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from nova import manager
from nova import test
class ManagerMetaTestCase(test.TestCase):
"""Tests for the meta class which manages the creation of periodic tasks.
"""
def test_meta(self):
class Manager(object):
__metaclass__ = manager.ManagerMeta
@manager.periodic_task
def foo(self):
return 'foo'
@manager.periodic_task(spacing=4)
def bar(self):
return 'bar'
@manager.periodic_task(enabled=False)
def baz(self):
return 'baz'
m = Manager()
self.assertEqual(2, len(m._periodic_tasks))
self.assertEqual(None, m._periodic_spacing['foo'])
self.assertEqual(4, m._periodic_spacing['bar'])
self.assertFalse('baz' in m._periodic_spacing)
class Manager(test.TestCase):
"""Tests the periodic tasks portion of the manager class."""
def test_periodic_tasks_with_idle(self):
class Manager(manager.Manager):
@manager.periodic_task(spacing=200)
def bar(self):
return 'bar'
m = Manager()
self.assertEqual(1, len(m._periodic_tasks))
self.assertEqual(200, m._periodic_spacing['bar'])
# Now a single pass of the periodic tasks
idle = m.periodic_tasks(None)
self.assertAlmostEqual(60, idle, 1)
def test_periodic_tasks_constant(self):
class Manager(manager.Manager):
@manager.periodic_task(spacing=0)
def bar(self):
return 'bar'
m = Manager()
idle = m.periodic_tasks(None)
self.assertAlmostEqual(60, idle, 1)
def test_periodic_tasks_disabled(self):
class Manager(manager.Manager):
@manager.periodic_task(spacing=-1)
def bar(self):
return 'bar'
m = Manager()
idle = m.periodic_tasks(None)
self.assertAlmostEqual(60, idle, 1)
def test_external_running_here(self):
self.flags(run_external_periodic_tasks=True)
class Manager(manager.Manager):
@manager.periodic_task(spacing=200, external_process_ok=True)
def bar(self):
return 'bar'
m = Manager()
self.assertEqual(1, len(m._periodic_tasks))
def test_external_running_elsewhere(self):
self.flags(run_external_periodic_tasks=False)
class Manager(manager.Manager):
@manager.periodic_task(spacing=200, external_process_ok=True)
def bar(self):
return 'bar'
m = Manager()
self.assertEqual(0, len(m._periodic_tasks))

View File

@@ -556,12 +556,23 @@ class LoopingCallDone(Exception):
self.retvalue = retvalue
class LoopingCall(object):
class LoopingCallBase(object):
def __init__(self, f=None, *args, **kw):
self.args = args
self.kw = kw
self.f = f
self._running = False
self.done = None
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
"""A looping call which happens at a fixed interval."""
def start(self, interval, initial_delay=None):
self._running = True
@@ -581,7 +592,7 @@ class LoopingCall(object):
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in looping call'))
LOG.exception(_('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
@@ -592,11 +603,47 @@ class LoopingCall(object):
greenthread.spawn(_inner)
return self.done
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
class DynamicLoopingCall(LoopingCallBase):
"""A looping call which happens sleeps until the next known event.
The function called should return how long to sleep for before being
called again.
"""
def start(self, initial_delay=None, periodic_interval_max=None):
self._running = True
done = event.Event()
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
idle = self.f(*self.args, **self.kw)
if not self._running:
break
if not periodic_interval_max is None:
idle = min(idle, periodic_interval_max)
LOG.debug(_('Periodic task processor sleeping for %.02f '
'seconds'), idle)
greenthread.sleep(idle)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in dynamic looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
self.done = done
greenthread.spawn(_inner)
return self.done
def xhtml_escape(value):