Instead of just telling the user of the periodic worker that certain callbacks were invalid, tell the user which callback was invalid and exactly which attributes were expected but not found. This makes the errors that much easier to diagnose and resolve. Change-Id: Iaf8caf5e460e3ff4731ef575e924c9b86a9d5ae1
213 lines
7.3 KiB
Python
213 lines
7.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import heapq
|
|
import inspect
|
|
|
|
from oslo_utils import reflection
|
|
import six
|
|
|
|
from taskflow import logging
|
|
from taskflow.utils import deprecation
|
|
from taskflow.utils import misc
|
|
from taskflow.utils import threading_utils as tu
|
|
|
|
# Module-level logger (used below to trace callback invocations and to
# report callbacks that raise).
LOG = logging.getLogger(__name__)
|
|
|
|
# Find a monotonic time provider (or fall back to using time.time()
|
|
# which isn't *always* accurate but will suffice).
|
|
# ``_now()`` is called with no arguments throughout this module to obtain the
# current time that next-run times are computed from and compared against.
_now = misc.find_monotonic(allow_time_time=True)
|
|
|
|
|
|
def _check_attrs(obj):
|
|
"""Checks that a periodic function/method has all the expected attributes.
|
|
|
|
This will return the expected attributes that were **not** found.
|
|
"""
|
|
missing_attrs = []
|
|
for a in ('_periodic', '_periodic_spacing', '_periodic_run_immediately'):
|
|
if not hasattr(obj, a):
|
|
missing_attrs.append(a)
|
|
return missing_attrs
|
|
|
|
|
|
def periodic(spacing, run_immediately=True):
    """Tags a method/function as wanting/able to execute periodically.

    :param spacing: how often to run the decorated callable (in seconds),
                    must be greater than zero
    :type spacing: number
    :param run_immediately: option to specify whether to run
                            immediately or not
    :type run_immediately: boolean
    :raises ValueError: if ``spacing`` is not greater than zero
    """

    if spacing <= 0:
        raise ValueError("Periodicity/spacing must be greater than"
                         " zero instead of %s" % spacing)

    def wrapper(f):
        # The tags are placed on the function being decorated *before* it
        # gets wrapped (the wrapping step below is given that function).
        tags = (
            ('_periodic', True),
            ('_periodic_spacing', spacing),
            ('_periodic_run_immediately', run_immediately),
        )
        for tag_name, tag_value in tags:
            setattr(f, tag_name, tag_value)

        @six.wraps(f)
        def decorator(*args, **kwargs):
            return f(*args, **kwargs)

        return decorator

    return wrapper
|
|
|
|
|
|
class _Schedule(object):
|
|
"""Internal heap-based structure that maintains the schedule/ordering."""
|
|
|
|
def __init__(self):
|
|
self._ordering = []
|
|
|
|
def push(self, next_run, index):
|
|
heapq.heappush(self._ordering, (next_run, index))
|
|
|
|
def push_next(self, cb, index, now=None):
|
|
if now is None:
|
|
now = _now()
|
|
self.push(now + cb._periodic_spacing, index)
|
|
|
|
def __len__(self):
|
|
return len(self._ordering)
|
|
|
|
def pop(self):
|
|
return heapq.heappop(self._ordering)
|
|
|
|
|
|
def _build(callables):
    """Splits callables into run-immediately indexes and a timed schedule.

    Returns a ``(immediates, schedule)`` tuple where ``immediates`` is a
    list of indexes (into ``callables``) that asked to be run immediately
    and ``schedule`` holds the indexes of all the other callables, each
    queued one spacing interval after a shared reference time.
    """
    immediates = []
    schedule = _Schedule()
    # Only looked up (once) if some callable actually needs to be scheduled.
    reference_time = None
    # Walk last -> first, since the immediates are later popped off the end
    # of the list (which then yields them in first -> last order).
    for index, cb in misc.reverse_enumerate(callables):
        if cb._periodic_run_immediately:
            immediates.append(index)
            continue
        if reference_time is None:
            reference_time = _now()
        schedule.push_next(cb, index, now=reference_time)
    return immediates, schedule
|
|
|
|
|
|
def _safe_call(cb, kind):
|
|
try:
|
|
cb()
|
|
except Exception:
|
|
LOG.warn("Failed to call %s '%r'", kind, cb, exc_info=True)
|
|
|
|
|
|
class PeriodicWorker(object):
    """Calls a collection of callables periodically (sleeping as needed...).

    NOTE(harlowja): typically the :py:meth:`.start` method is executed in a
    background thread so that the periodic callables are executed in
    the background/asynchronously (using the defined periods to determine
    when each is called).
    """

    @classmethod
    def create(cls, objects, exclude_hidden=True):
        """Automatically creates a worker by analyzing object(s) methods.

        Only bound methods carrying all of the attributes applied by
        the :py:func:`.periodic` decorator are picked up (members whose
        name starts with an underscore are skipped unless
        ``exclude_hidden`` is false).
        """
        discovered = []
        for obj in objects:
            for name, member in inspect.getmembers(obj):
                if exclude_hidden and name.startswith("_"):
                    continue
                if not reflection.is_bound_method(member):
                    continue
                if _check_attrs(member):
                    # Lacks one (or more) of the expected attributes.
                    continue
                discovered.append(member)
        return cls(discovered)

    @deprecation.removed_kwarg('tombstone', version="0.8", removal_version="?")
    def __init__(self, callables, tombstone=None):
        """Validates the provided callables and builds the initial schedule.

        A :py:class:`ValueError` is raised (naming the one-based position
        of the offender) when an entry is not callable or when it lacks
        any of the expected periodic attributes. Entries whose
        ``_periodic`` attribute is false are accepted but not retained.
        """
        self._tombstone = tu.Event() if tombstone is None else tombstone
        self._callables = []
        for position, cb in enumerate(callables, 1):
            if not six.callable(cb):
                raise ValueError("Periodic callback %s must be callable"
                                 % position)
            absent = _check_attrs(cb)
            if absent:
                raise ValueError("Periodic callback %s missing required"
                                 " attributes %s" % (position, absent))
            if cb._periodic:
                self._callables.append(cb)
        self._immediates, self._schedule = _build(self._callables)

    def __len__(self):
        """Returns how many callables this worker retained."""
        return len(self._callables)

    def start(self):
        """Starts running (will not return until :py:meth:`.stop` is called).

        NOTE(harlowja): If this worker has no contained callables this raises
        a runtime error and does not run since it is impossible to periodically
        run nothing.
        """
        if not self._callables:
            raise RuntimeError("A periodic worker can not start"
                               " without any callables")
        while not self._tombstone.is_set():
            if self._immediates:
                # Run it right away & then schedule its next execution.
                index = self._immediates.pop()
                cb = self._callables[index]
                LOG.blather("Calling immediate '%r'", cb)
                _safe_call(cb, 'immediate')
                self._schedule.push_next(cb, index)
                continue
            # No immediates remain, so take the scheduled entry with the
            # lowest desired run time and see how far away that time is.
            now = _now()
            next_run, index = self._schedule.pop()
            when_next = next_run - now
            if when_next <= 0:
                # It is due; run it & then schedule its next execution
                # (relative to the time captured before it was called).
                cb = self._callables[index]
                LOG.blather("Calling periodic '%r' (it runs every"
                            " %s seconds)", cb, cb._periodic_spacing)
                _safe_call(cb, 'periodic')
                self._schedule.push_next(cb, index, now=now)
                continue
            # Not due yet; put it back untouched and wait for either the
            # remaining time to elapse or the tombstone to be set.
            self._schedule.push(next_run, index)
            self._tombstone.wait(when_next)

    def stop(self):
        """Sets the tombstone (this stops any further executions)."""
        self._tombstone.set()

    def reset(self):
        """Resets the tombstone and re-queues up any immediate executions."""
        self._tombstone.clear()
        self._immediates, self._schedule = _build(self._callables)
|