Refactor parts of the periodic worker

Perform some adjustments to the periodic worker type to
make it easier to later change and make it better in the
future. These changes hopefully also reduce complexity
and increase understandability.

In part this deprecates providing a tombstone via
__init__() since that will restrict future enhancements we can
make (in retrospect we need more internal control over that
argument & type to build better workers).

Change-Id: I1965e157c303c2a45b9950e9f4a921c638f57fd1
This commit is contained in:
Joshua Harlow 2015-02-21 17:04:46 -08:00
parent 99b33b9886
commit 7a1a467a7a
5 changed files with 95 additions and 50 deletions

View File

@ -533,23 +533,21 @@ class PeriodicTest(test.TestCase):
def test_periodic_single(self): def test_periodic_single(self):
barrier = latch.Latch(5) barrier = latch.Latch(5)
capture = [] capture = []
tombstone = tu.Event()
@periodic.periodic(0.01) @periodic.periodic(0.01)
def callee(): def callee():
barrier.countdown() barrier.countdown()
if barrier.needed == 0: if barrier.needed == 0:
tombstone.set() w.stop()
capture.append(1) capture.append(1)
w = periodic.PeriodicWorker([callee], tombstone=tombstone) w = periodic.PeriodicWorker([callee])
t = tu.daemon_thread(target=w.start) t = tu.daemon_thread(target=w.start)
t.start() t.start()
t.join() t.join()
self.assertEqual(0, barrier.needed) self.assertEqual(0, barrier.needed)
self.assertEqual(5, sum(capture)) self.assertEqual(5, sum(capture))
self.assertTrue(tombstone.is_set())
def test_immediate(self): def test_immediate(self):
capture = [] capture = []

View File

@ -17,9 +17,11 @@
import collections import collections
import inspect import inspect
import random import random
import string
import time import time
import six import six
import testscenarios
from taskflow import test from taskflow import test
from taskflow.utils import misc from taskflow.utils import misc
@ -192,6 +194,22 @@ class TestSequenceMinus(test.TestCase):
self.assertEqual(result, [2, 1]) self.assertEqual(result, [2, 1])
class TestReversedEnumerate(testscenarios.TestWithScenarios, test.TestCase):
scenarios = [
('ten', {'sample': [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}),
('empty', {'sample': []}),
('negative', {'sample': [-1, -2, -3]}),
('one', {'sample': [1]}),
('abc', {'sample': ['a', 'b', 'c']}),
('ascii_letters', {'sample': list(string.ascii_letters)}),
]
def test_sample_equivalence(self):
expected = list(reversed(list(enumerate(self.sample))))
actual = list(misc.reverse_enumerate(self.sample))
self.assertEqual(expected, actual)
class TestCountdownIter(test.TestCase): class TestCountdownIter(test.TestCase):
def test_expected_count(self): def test_expected_count(self):
upper = 100 upper = 100

View File

@ -21,6 +21,7 @@ from oslo_utils import reflection
import six import six
from taskflow import logging from taskflow import logging
from taskflow.utils import deprecation
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
@ -64,6 +65,51 @@ def periodic(spacing, run_immediately=True):
return wrapper return wrapper
class _Schedule(object):
"""Internal heap-based structure that maintains the schedule/ordering."""
def __init__(self):
self._ordering = []
def push(self, next_run, index):
heapq.heappush(self._ordering, (next_run, index))
def push_next(self, cb, index, now=None):
if now is None:
now = _now()
self.push(now + cb._periodic_spacing, index)
def __len__(self):
return len(self._ordering)
def pop(self):
return heapq.heappop(self._ordering)
def _build(callables):
schedule = _Schedule()
now = None
immediates = []
# Reverse order is used since these are later popped off (and to
# ensure the popping order is first -> last we need to append them
# in the opposite ordering last -> first).
for i, cb in misc.reverse_enumerate(callables):
if cb._periodic_run_immediately:
immediates.append(i)
else:
if now is None:
now = _now()
schedule.push_next(cb, i, now=now)
return immediates, schedule
def _safe_call(cb, kind):
try:
cb()
except Exception:
LOG.warn("Failed to call %s '%r'", kind, cb, exc_info=True)
class PeriodicWorker(object): class PeriodicWorker(object):
"""Calls a collection of callables periodically (sleeping as needed...). """Calls a collection of callables periodically (sleeping as needed...).
@ -96,54 +142,29 @@ class PeriodicWorker(object):
callables.append(member) callables.append(member)
return cls(callables) return cls(callables)
@deprecation.removed_kwarg('tombstone', version="0.8", removal_version="?")
def __init__(self, callables, tombstone=None): def __init__(self, callables, tombstone=None):
if tombstone is None: if tombstone is None:
self._tombstone = tu.Event() self._tombstone = tu.Event()
else: else:
# Allows someone to share an event (if they so want to...)
self._tombstone = tombstone self._tombstone = tombstone
almost_callables = list(callables) self._callables = []
for cb in almost_callables: for cb in callables:
if not six.callable(cb): if not six.callable(cb):
raise ValueError("Periodic callback must be callable") raise ValueError("Periodic callback must be callable")
for attr_name in _PERIODIC_ATTRS: for attr_name in _PERIODIC_ATTRS:
if not hasattr(cb, attr_name): if not hasattr(cb, attr_name):
raise ValueError("Periodic callback missing required" raise ValueError("Periodic callback missing required"
" attribute '%s'" % attr_name) " attribute '%s'" % attr_name)
self._callables = tuple((cb, reflection.get_callable_name(cb)) if cb._periodic:
for cb in almost_callables) self._callables.append(cb)
self._schedule = [] self._immediates, self._schedule = _build(self._callables)
now = _now()
for i, (cb, cb_name) in enumerate(self._callables):
spacing = cb._periodic_spacing
next_run = now + spacing
heapq.heappush(self._schedule, (next_run, i))
self._immediates = self._fetch_immediates(self._callables)
def __len__(self): def __len__(self):
return len(self._callables) return len(self._callables)
@staticmethod
def _fetch_immediates(callables):
immediates = []
# Reverse order is used since these are later popped off (and to
# ensure the popping order is first -> last we need to append them
# in the opposite ordering last -> first).
for (cb, cb_name) in reversed(callables):
if cb._periodic_run_immediately:
immediates.append((cb, cb_name))
return immediates
@staticmethod
def _safe_call(cb, cb_name, kind='periodic'):
try:
cb()
except Exception:
LOG.warn("Failed to call %s callable '%s'",
kind, cb_name, exc_info=True)
def start(self): def start(self):
"""Starts running (will not stop/return until the tombstone is set). """Starts running (will not return until :py:meth:`.stop` is called).
NOTE(harlowja): If this worker has no contained callables this raises NOTE(harlowja): If this worker has no contained callables this raises
a runtime error and does not run since it is impossible to periodically a runtime error and does not run since it is impossible to periodically
@ -154,29 +175,30 @@ class PeriodicWorker(object):
" without any callables") " without any callables")
while not self._tombstone.is_set(): while not self._tombstone.is_set():
if self._immediates: if self._immediates:
cb, cb_name = self._immediates.pop() # Run & schedule its next execution.
LOG.debug("Calling immediate callable '%s'", cb_name) index = self._immediates.pop()
self._safe_call(cb, cb_name, kind='immediate') cb = self._callables[index]
LOG.blather("Calling immediate '%r'", cb)
_safe_call(cb, 'immediate')
self._schedule.push_next(cb, index)
else: else:
# Figure out when we should run next (by selecting the # Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be # minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest # the callable that needs to run next and has the lowest
# next desired run time). # next desired run time).
now = _now() now = _now()
next_run, i = heapq.heappop(self._schedule) next_run, index = self._schedule.pop()
when_next = next_run - now when_next = next_run - now
if when_next <= 0: if when_next <= 0:
cb, cb_name = self._callables[i] # Run & schedule its next execution.
spacing = cb._periodic_spacing cb = self._callables[index]
LOG.debug("Calling periodic callable '%s' (it runs every" LOG.blather("Calling periodic '%r' (it runs every"
" %s seconds)", cb_name, spacing) " %s seconds)", cb, cb._periodic_spacing)
self._safe_call(cb, cb_name) _safe_call(cb, 'periodic')
# Run again someday... self._schedule.push_next(cb, index, now=now)
next_run = now + spacing
heapq.heappush(self._schedule, (next_run, i))
else: else:
# Gotta wait... # Gotta wait...
heapq.heappush(self._schedule, (next_run, i)) self._schedule.push(next_run, index)
self._tombstone.wait(when_next) self._tombstone.wait(when_next)
def stop(self): def stop(self):
@ -186,4 +208,4 @@ class PeriodicWorker(object):
def reset(self): def reset(self):
"""Resets the tombstone and re-queues up any immediate executions.""" """Resets the tombstone and re-queues up any immediate executions."""
self._tombstone.clear() self._tombstone.clear()
self._immediates = self._fetch_immediates(self._callables) self._immediates, self._schedule = _build(self._callables)

View File

@ -101,6 +101,12 @@ def countdown_iter(start_at, decr=1):
start_at -= decr start_at -= decr
def reverse_enumerate(items):
"""Like reversed(enumerate(items)) but with less copying/cloning..."""
for i in countdown_iter(len(items)):
yield i - 1, items[i - 1]
def merge_uri(uri, conf): def merge_uri(uri, conf):
"""Merges a parsed uri into the given configuration dictionary. """Merges a parsed uri into the given configuration dictionary.

View File

@ -6,6 +6,7 @@ hacking<0.11,>=0.10.0
oslotest>=1.2.0 # Apache-2.0 oslotest>=1.2.0 # Apache-2.0
mock>=1.0 mock>=1.0
testtools>=0.9.36,!=1.2.0 testtools>=0.9.36,!=1.2.0
testscenarios>=0.4
# Used for testing the WBE engine. # Used for testing the WBE engine.
kombu>=2.5.0 kombu>=2.5.0