Refactors periodic tasks to use a decorator.
Additional work:
    1. Added support for tasks being scheduled at differing rates via the
       `ticks_between_runs` argument.
    2. Fixed `reclaim_queued_deletes` so that it doesn't run if
       `FLAGS.reclaim_instance_interval` is 0.
Change-Id: I18c01baf07bd06301a6fe26a7b29dc2452a4fa96
			
			
This commit is contained in:
		@@ -66,16 +66,98 @@ FLAGS = flags.FLAGS
 | 
			
		||||
LOG = logging.getLogger('nova.manager')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def periodic_task(*args, **kwargs):
 | 
			
		||||
    """Decorator to indicate that a method is a periodic task.
 | 
			
		||||
 | 
			
		||||
    This decorator can be used in two ways:
 | 
			
		||||
 | 
			
		||||
        1. Without arguments '@periodic_task', this will be run on every tick
 | 
			
		||||
           of the periodic scheduler.
 | 
			
		||||
 | 
			
		||||
        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
 | 
			
		||||
           run on every N ticks of the periodic scheduler.
 | 
			
		||||
    """
 | 
			
		||||
    def decorator(f):
 | 
			
		||||
        f._periodic_task = True
 | 
			
		||||
        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
 | 
			
		||||
        return f
 | 
			
		||||
 | 
			
		||||
    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
 | 
			
		||||
    # and without parens.
 | 
			
		||||
    #
 | 
			
		||||
    # In the 'with-parens' case (with kwargs present), this function needs to
 | 
			
		||||
    # return a decorator function since the interpreter will invoke it like:
 | 
			
		||||
    #
 | 
			
		||||
    #   periodic_task(*args, **kwargs)(f)
 | 
			
		||||
    #
 | 
			
		||||
    # In the 'without-parens' case, the original function will be passed
 | 
			
		||||
    # in as the first argument, like:
 | 
			
		||||
    #
 | 
			
		||||
    #   periodic_task(f)
 | 
			
		||||
    if kwargs:
 | 
			
		||||
        return decorator
 | 
			
		||||
    else:
 | 
			
		||||
        return decorator(args[0])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ManagerMeta(type):
 | 
			
		||||
    def __init__(cls, names, bases, dict_):
 | 
			
		||||
        """Metaclass that allows us to collect decorated periodic tasks."""
 | 
			
		||||
        super(ManagerMeta, cls).__init__(names, bases, dict_)
 | 
			
		||||
 | 
			
		||||
        # NOTE(sirp): if the attribute is not present then we must be the base
 | 
			
		||||
        # class, so, go ahead an initialize it. If the attribute is present,
 | 
			
		||||
        # then we're a subclass so make a copy of it so we don't step on our
 | 
			
		||||
        # parent's toes.
 | 
			
		||||
        try:
 | 
			
		||||
            cls._periodic_tasks = cls._periodic_tasks[:]
 | 
			
		||||
        except AttributeError:
 | 
			
		||||
            cls._periodic_tasks = []
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            cls._ticks_to_skip = cls._ticks_to_skip.copy()
 | 
			
		||||
        except AttributeError:
 | 
			
		||||
            cls._ticks_to_skip = {}
 | 
			
		||||
 | 
			
		||||
        for value in cls.__dict__.values():
 | 
			
		||||
            if getattr(value, '_periodic_task', False):
 | 
			
		||||
                task = value
 | 
			
		||||
                name = task.__name__
 | 
			
		||||
                cls._periodic_tasks.append((name, task))
 | 
			
		||||
                cls._ticks_to_skip[name] = task._ticks_between_runs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Manager(base.Base):
 | 
			
		||||
    __metaclass__ = ManagerMeta
 | 
			
		||||
 | 
			
		||||
    def __init__(self, host=None, db_driver=None):
 | 
			
		||||
        if not host:
 | 
			
		||||
            host = FLAGS.host
 | 
			
		||||
        self.host = host
 | 
			
		||||
        super(Manager, self).__init__(db_driver)
 | 
			
		||||
 | 
			
		||||
    def periodic_tasks(self, context=None):
 | 
			
		||||
    def periodic_tasks(self, context, raise_on_error=False):
 | 
			
		||||
        """Tasks to be run at a periodic interval."""
 | 
			
		||||
        pass
 | 
			
		||||
        for task_name, task in self._periodic_tasks:
 | 
			
		||||
            full_task_name = '.'.join([self.__class__.__name__, task_name])
 | 
			
		||||
 | 
			
		||||
            ticks_to_skip = self._ticks_to_skip[task_name]
 | 
			
		||||
            if ticks_to_skip > 0:
 | 
			
		||||
                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
 | 
			
		||||
                            " ticks left until next run"), locals())
 | 
			
		||||
                self._ticks_to_skip[task_name] -= 1
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            self._ticks_to_skip[task_name] = task._ticks_between_runs
 | 
			
		||||
            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                task(self, context)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                if raise_on_error:
 | 
			
		||||
                    raise
 | 
			
		||||
                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
 | 
			
		||||
                              locals())
 | 
			
		||||
 | 
			
		||||
    def init_host(self):
 | 
			
		||||
        """Handle initialization if this is a standalone service.
 | 
			
		||||
@@ -105,11 +187,10 @@ class SchedulerDependentManager(Manager):
 | 
			
		||||
        """Remember these capabilities to send on next periodic update."""
 | 
			
		||||
        self.last_capabilities = capabilities
 | 
			
		||||
 | 
			
		||||
    def periodic_tasks(self, context=None):
 | 
			
		||||
    @periodic_task
 | 
			
		||||
    def _publish_service_capabilities(self, context):
 | 
			
		||||
        """Pass data back to the scheduler at a periodic interval."""
 | 
			
		||||
        if self.last_capabilities:
 | 
			
		||||
            LOG.debug(_('Notifying Schedulers of capabilities ...'))
 | 
			
		||||
            api.update_service_capabilities(context, self.service_name,
 | 
			
		||||
                                self.host, self.last_capabilities)
 | 
			
		||||
 | 
			
		||||
        super(SchedulerDependentManager, self).periodic_tasks(context)
 | 
			
		||||
 
 | 
			
		||||
@@ -54,7 +54,8 @@ class SchedulerManager(manager.Manager):
 | 
			
		||||
        """Converts all method calls to use the schedule method"""
 | 
			
		||||
        return functools.partial(self._schedule, key)
 | 
			
		||||
 | 
			
		||||
    def periodic_tasks(self, context=None):
 | 
			
		||||
    @manager.periodic_task
 | 
			
		||||
    def _poll_child_zones(self, context):
 | 
			
		||||
        """Poll child zones periodically to get status."""
 | 
			
		||||
        self.zone_manager.ping(context)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -318,7 +318,7 @@ class ZoneManager(object):
 | 
			
		||||
        """Try to connect to each child zone and get update."""
 | 
			
		||||
        self.green_pool.imap(_poll_zone, self.zone_states.values())
 | 
			
		||||
 | 
			
		||||
    def ping(self, context=None):
 | 
			
		||||
    def ping(self, context):
 | 
			
		||||
        """Ping should be called periodically to update zone status."""
 | 
			
		||||
        diff = utils.utcnow() - self.last_zone_db_check
 | 
			
		||||
        if diff.seconds >= FLAGS.zone_db_check_interval:
 | 
			
		||||
 
 | 
			
		||||
@@ -252,9 +252,10 @@ class Service(object):
 | 
			
		||||
            except Exception:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
    def periodic_tasks(self):
 | 
			
		||||
    def periodic_tasks(self, raise_on_error=False):
 | 
			
		||||
        """Tasks to be run at a periodic interval."""
 | 
			
		||||
        self.manager.periodic_tasks(context.get_admin_context())
 | 
			
		||||
        ctxt = context.get_admin_context()
 | 
			
		||||
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
 | 
			
		||||
 | 
			
		||||
    def report_state(self):
 | 
			
		||||
        """Update the state of this service in the datastore."""
 | 
			
		||||
 
 | 
			
		||||
@@ -1054,8 +1054,8 @@ class ComputeTestCase(BaseTestCase):
 | 
			
		||||
        self.compute.driver.test_remove_vm(instance_name)
 | 
			
		||||
 | 
			
		||||
        # Force the compute manager to do its periodic poll
 | 
			
		||||
        error_list = self.compute.periodic_tasks(context.get_admin_context())
 | 
			
		||||
        self.assertFalse(error_list)
 | 
			
		||||
        ctxt = context.get_admin_context()
 | 
			
		||||
        self.compute.periodic_tasks(ctxt, raise_on_error=True)
 | 
			
		||||
 | 
			
		||||
        instances = db.instance_get_all(context.get_admin_context())
 | 
			
		||||
        LOG.info(_("After force-killing instances: %s"), instances)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user