Update to using oslo periodic tasks implementation.
Convert nova to using the oslo periodic tasks implementation. There are no functional changes in this review. Change-Id: I767e0ad17781d5f9d5e987e0a4ad65796243ae5c
This commit is contained in:
@@ -452,10 +452,6 @@ class InvalidID(Invalid):
|
||||
message = _("Invalid ID received %(id)s.")
|
||||
|
||||
|
||||
class InvalidPeriodicTaskArg(Invalid):
|
||||
message = _("Unexpected argument for periodic task creation: %(arg)s.")
|
||||
|
||||
|
||||
class ConstraintNotMet(NovaException):
|
||||
message = _("Constraint not met.")
|
||||
code = 412
|
||||
|
161
nova/manager.py
161
nova/manager.py
@@ -53,146 +53,23 @@ This module provides Manager, a base class for managers.
|
||||
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import eventlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova import baserpc
|
||||
from nova.db import base
|
||||
from nova import exception
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import periodic_task
|
||||
from nova.openstack.common.plugin import pluginmanager
|
||||
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
|
||||
from nova.openstack.common import timeutils
|
||||
from nova.scheduler import rpcapi as scheduler_rpcapi
|
||||
|
||||
|
||||
periodic_opts = [
|
||||
cfg.BoolOpt('run_external_periodic_tasks',
|
||||
default=True,
|
||||
help=('Some periodic tasks can be run in a separate process. '
|
||||
'Should we run them here?')),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(periodic_opts)
|
||||
CONF.import_opt('host', 'nova.netconf')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_INTERVAL = 60.0
|
||||
|
||||
|
||||
def periodic_task(*args, **kwargs):
|
||||
"""Decorator to indicate that a method is a periodic task.
|
||||
|
||||
This decorator can be used in two ways:
|
||||
|
||||
1. Without arguments '@periodic_task', this will be run on every cycle
|
||||
of the periodic scheduler.
|
||||
|
||||
2. With arguments:
|
||||
@periodic_task(spacing=N [, run_immediately=[True|False]])
|
||||
this will be run on approximately every N seconds. If this number is
|
||||
negative the periodic task will be disabled. If the run_immediately
|
||||
argument is provided and has a value of 'True', the first run of the
|
||||
task will be shortly after task scheduler starts. If
|
||||
run_immediately is omitted or set to 'False', the first time the
|
||||
task runs will be approximately N seconds after the task scheduler
|
||||
starts.
|
||||
"""
|
||||
def decorator(f):
|
||||
# Test for old style invocation
|
||||
if 'ticks_between_runs' in kwargs:
|
||||
raise exception.InvalidPeriodicTaskArg(arg='ticks_between_runs')
|
||||
|
||||
# Control if run at all
|
||||
f._periodic_task = True
|
||||
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
|
||||
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
|
||||
f._periodic_enabled = False
|
||||
else:
|
||||
f._periodic_enabled = kwargs.pop('enabled', True)
|
||||
|
||||
# Control frequency
|
||||
f._periodic_spacing = kwargs.pop('spacing', 0)
|
||||
f._periodic_immediate = kwargs.pop('run_immediately', False)
|
||||
if f._periodic_immediate:
|
||||
f._periodic_last_run = None
|
||||
else:
|
||||
f._periodic_last_run = timeutils.utcnow()
|
||||
return f
|
||||
|
||||
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
|
||||
# and without parens.
|
||||
#
|
||||
# In the 'with-parens' case (with kwargs present), this function needs to
|
||||
# return a decorator function since the interpreter will invoke it like:
|
||||
#
|
||||
# periodic_task(*args, **kwargs)(f)
|
||||
#
|
||||
# In the 'without-parens' case, the original function will be passed
|
||||
# in as the first argument, like:
|
||||
#
|
||||
# periodic_task(f)
|
||||
if kwargs:
|
||||
return decorator
|
||||
else:
|
||||
return decorator(args[0])
|
||||
|
||||
|
||||
class ManagerMeta(type):
|
||||
def __init__(cls, names, bases, dict_):
|
||||
"""Metaclass that allows us to collect decorated periodic tasks."""
|
||||
super(ManagerMeta, cls).__init__(names, bases, dict_)
|
||||
|
||||
# NOTE(sirp): if the attribute is not present then we must be the base
|
||||
# class, so, go ahead an initialize it. If the attribute is present,
|
||||
# then we're a subclass so make a copy of it so we don't step on our
|
||||
# parent's toes.
|
||||
try:
|
||||
cls._periodic_tasks = cls._periodic_tasks[:]
|
||||
except AttributeError:
|
||||
cls._periodic_tasks = []
|
||||
|
||||
try:
|
||||
cls._periodic_last_run = cls._periodic_last_run.copy()
|
||||
except AttributeError:
|
||||
cls._periodic_last_run = {}
|
||||
|
||||
try:
|
||||
cls._periodic_spacing = cls._periodic_spacing.copy()
|
||||
except AttributeError:
|
||||
cls._periodic_spacing = {}
|
||||
|
||||
for value in cls.__dict__.values():
|
||||
if getattr(value, '_periodic_task', False):
|
||||
task = value
|
||||
name = task.__name__
|
||||
|
||||
if task._periodic_spacing < 0:
|
||||
LOG.info(_('Skipping periodic task %(task)s because '
|
||||
'its interval is negative'),
|
||||
{'task': name})
|
||||
continue
|
||||
if not task._periodic_enabled:
|
||||
LOG.info(_('Skipping periodic task %(task)s because '
|
||||
'it is disabled'),
|
||||
{'task': name})
|
||||
continue
|
||||
|
||||
# A periodic spacing of zero indicates that this task should
|
||||
# be run every pass
|
||||
if task._periodic_spacing == 0:
|
||||
task._periodic_spacing = None
|
||||
|
||||
cls._periodic_tasks.append((name, task))
|
||||
cls._periodic_spacing[name] = task._periodic_spacing
|
||||
cls._periodic_last_run[name] = task._periodic_last_run
|
||||
|
||||
|
||||
class Manager(base.Base):
|
||||
__metaclass__ = ManagerMeta
|
||||
|
||||
class Manager(base.Base, periodic_task.PeriodicTasks):
|
||||
# Set RPC API version to 1.0 by default.
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
@@ -220,37 +97,7 @@ class Manager(base.Base):
|
||||
|
||||
def periodic_tasks(self, context, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
idle_for = DEFAULT_INTERVAL
|
||||
for task_name, task in self._periodic_tasks:
|
||||
full_task_name = '.'.join([self.__class__.__name__, task_name])
|
||||
|
||||
now = timeutils.utcnow()
|
||||
spacing = self._periodic_spacing[task_name]
|
||||
last_run = self._periodic_last_run[task_name]
|
||||
|
||||
# If a periodic task is _nearly_ due, then we'll run it early
|
||||
if spacing is not None and last_run is not None:
|
||||
due = last_run + datetime.timedelta(seconds=spacing)
|
||||
if not timeutils.is_soon(due, 0.2):
|
||||
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
|
||||
continue
|
||||
|
||||
if spacing is not None:
|
||||
idle_for = min(idle_for, spacing)
|
||||
|
||||
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
|
||||
self._periodic_last_run[task_name] = timeutils.utcnow()
|
||||
|
||||
try:
|
||||
task(self, context)
|
||||
except Exception as e:
|
||||
if raise_on_error:
|
||||
raise
|
||||
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
|
||||
locals())
|
||||
eventlet.sleep(0)
|
||||
|
||||
return idle_for
|
||||
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
||||
|
||||
def init_host(self):
|
||||
"""Hook to do additional manager initialization when one requests
|
||||
@@ -308,7 +155,7 @@ class SchedulerDependentManager(Manager):
|
||||
capabilities = [capabilities]
|
||||
self.last_capabilities = capabilities
|
||||
|
||||
@periodic_task
|
||||
@periodic_task.periodic_task
|
||||
def publish_service_capabilities(self, context):
|
||||
"""Pass data back to the scheduler.
|
||||
|
||||
|
@@ -39,6 +39,7 @@ from nova.openstack.common import importutils
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common.notifier import api as notifier
|
||||
from nova.openstack.common import periodic_task
|
||||
from nova import quota
|
||||
|
||||
|
||||
@@ -287,7 +288,7 @@ class SchedulerManager(manager.Manager):
|
||||
|
||||
return {'resource': resource, 'usage': usage}
|
||||
|
||||
@manager.periodic_task
|
||||
@periodic_task.periodic_task
|
||||
def _expire_reservations(self, context):
|
||||
QUOTAS.expire(context)
|
||||
|
||||
|
@@ -1,182 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from testtools import matchers
|
||||
|
||||
from nova import manager
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import test
|
||||
|
||||
|
||||
class ManagerMetaTestCase(test.TestCase):
|
||||
"""Tests for the meta class which manages the creation of periodic tasks.
|
||||
"""
|
||||
|
||||
def test_meta(self):
|
||||
class Manager(object):
|
||||
__metaclass__ = manager.ManagerMeta
|
||||
|
||||
@manager.periodic_task
|
||||
def foo(self):
|
||||
return 'foo'
|
||||
|
||||
@manager.periodic_task(spacing=4)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
@manager.periodic_task(enabled=False)
|
||||
def baz(self):
|
||||
return 'baz'
|
||||
|
||||
m = Manager()
|
||||
self.assertThat(m._periodic_tasks, matchers.HasLength(2))
|
||||
self.assertEqual(None, m._periodic_spacing['foo'])
|
||||
self.assertEqual(4, m._periodic_spacing['bar'])
|
||||
self.assertThat(
|
||||
m._periodic_spacing, matchers.Not(matchers.Contains('baz')))
|
||||
|
||||
|
||||
class Manager(test.TestCase):
|
||||
"""Tests the periodic tasks portion of the manager class."""
|
||||
|
||||
def test_periodic_tasks_with_idle(self):
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=200)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
self.assertThat(m._periodic_tasks, matchers.HasLength(1))
|
||||
self.assertEqual(200, m._periodic_spacing['bar'])
|
||||
|
||||
# Now a single pass of the periodic tasks
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertAlmostEqual(60, idle, 1)
|
||||
|
||||
def test_periodic_tasks_constant(self):
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=0)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertAlmostEqual(60, idle, 1)
|
||||
|
||||
def test_periodic_tasks_idle_calculation(self):
|
||||
fake_time = datetime.datetime(3000, 1, 1)
|
||||
timeutils.set_time_override(fake_time)
|
||||
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=10)
|
||||
def bar(self, context):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
|
||||
# Ensure initial values are correct
|
||||
self.assertEqual(1, len(m._periodic_tasks))
|
||||
task_name, task = m._periodic_tasks[0]
|
||||
|
||||
# Test task values
|
||||
self.assertEqual('bar', task_name)
|
||||
self.assertEqual(10, task._periodic_spacing)
|
||||
self.assertEqual(True, task._periodic_enabled)
|
||||
self.assertEqual(False, task._periodic_external_ok)
|
||||
self.assertEqual(False, task._periodic_immediate)
|
||||
self.assertNotEqual(None, task._periodic_last_run)
|
||||
|
||||
# Test the manager's representation of those values
|
||||
self.assertEqual(10, m._periodic_spacing[task_name])
|
||||
self.assertNotEqual(None, m._periodic_last_run[task_name])
|
||||
|
||||
timeutils.advance_time_delta(datetime.timedelta(seconds=5))
|
||||
m.periodic_tasks(None)
|
||||
|
||||
timeutils.advance_time_delta(datetime.timedelta(seconds=5))
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertAlmostEqual(10, idle, 1)
|
||||
|
||||
def test_periodic_tasks_immediate_runs_now(self):
|
||||
fake_time = datetime.datetime(3000, 1, 1)
|
||||
timeutils.set_time_override(fake_time)
|
||||
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=10, run_immediately=True)
|
||||
def bar(self, context):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
|
||||
# Ensure initial values are correct
|
||||
self.assertEqual(1, len(m._periodic_tasks))
|
||||
task_name, task = m._periodic_tasks[0]
|
||||
|
||||
# Test task values
|
||||
self.assertEqual('bar', task_name)
|
||||
self.assertEqual(10, task._periodic_spacing)
|
||||
self.assertEqual(True, task._periodic_enabled)
|
||||
self.assertEqual(False, task._periodic_external_ok)
|
||||
self.assertEqual(True, task._periodic_immediate)
|
||||
self.assertEqual(None, task._periodic_last_run)
|
||||
|
||||
# Test the manager's representation of those values
|
||||
self.assertEqual(10, m._periodic_spacing[task_name])
|
||||
self.assertEqual(None, m._periodic_last_run[task_name])
|
||||
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertEqual(datetime.datetime(3000, 1, 1, 0, 0),
|
||||
m._periodic_last_run[task_name])
|
||||
self.assertAlmostEqual(10, idle, 1)
|
||||
|
||||
timeutils.advance_time_delta(datetime.timedelta(seconds=5))
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertAlmostEqual(5, idle, 1)
|
||||
|
||||
def test_periodic_tasks_disabled(self):
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=-1)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
idle = m.periodic_tasks(None)
|
||||
self.assertAlmostEqual(60, idle, 1)
|
||||
|
||||
def test_external_running_here(self):
|
||||
self.flags(run_external_periodic_tasks=True)
|
||||
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=200, external_process_ok=True)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
self.assertThat(m._periodic_tasks, matchers.HasLength(1))
|
||||
|
||||
def test_external_running_elsewhere(self):
|
||||
self.flags(run_external_periodic_tasks=False)
|
||||
|
||||
class Manager(manager.Manager):
|
||||
@manager.periodic_task(spacing=200, external_process_ok=True)
|
||||
def bar(self):
|
||||
return 'bar'
|
||||
|
||||
m = Manager()
|
||||
self.assertEqual([], m._periodic_tasks)
|
Reference in New Issue
Block a user