Merge "Refactor parts of the periodic worker"

This commit is contained in:
Jenkins
2015-02-24 06:58:24 +00:00
committed by Gerrit Code Review
5 changed files with 95 additions and 50 deletions

View File

@@ -533,23 +533,21 @@ class PeriodicTest(test.TestCase):
def test_periodic_single(self):
barrier = latch.Latch(5)
capture = []
tombstone = tu.Event()
@periodic.periodic(0.01)
def callee():
barrier.countdown()
if barrier.needed == 0:
tombstone.set()
w.stop()
capture.append(1)
w = periodic.PeriodicWorker([callee], tombstone=tombstone)
w = periodic.PeriodicWorker([callee])
t = tu.daemon_thread(target=w.start)
t.start()
t.join()
self.assertEqual(0, barrier.needed)
self.assertEqual(5, sum(capture))
self.assertTrue(tombstone.is_set())
def test_immediate(self):
capture = []

View File

@@ -17,9 +17,11 @@
import collections
import inspect
import random
import string
import time
import six
import testscenarios
from taskflow import test
from taskflow.utils import misc
@@ -192,6 +194,22 @@ class TestSequenceMinus(test.TestCase):
self.assertEqual(result, [2, 1])
class TestReversedEnumerate(testscenarios.TestWithScenarios, test.TestCase):
scenarios = [
('ten', {'sample': [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}),
('empty', {'sample': []}),
('negative', {'sample': [-1, -2, -3]}),
('one', {'sample': [1]}),
('abc', {'sample': ['a', 'b', 'c']}),
('ascii_letters', {'sample': list(string.ascii_letters)}),
]
def test_sample_equivalence(self):
expected = list(reversed(list(enumerate(self.sample))))
actual = list(misc.reverse_enumerate(self.sample))
self.assertEqual(expected, actual)
class TestCountdownIter(test.TestCase):
def test_expected_count(self):
upper = 100

View File

@@ -21,6 +21,7 @@ from oslo_utils import reflection
import six
from taskflow import logging
from taskflow.utils import deprecation
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
@@ -64,6 +65,51 @@ def periodic(spacing, run_immediately=True):
return wrapper
class _Schedule(object):
"""Internal heap-based structure that maintains the schedule/ordering."""
def __init__(self):
self._ordering = []
def push(self, next_run, index):
heapq.heappush(self._ordering, (next_run, index))
def push_next(self, cb, index, now=None):
if now is None:
now = _now()
self.push(now + cb._periodic_spacing, index)
def __len__(self):
return len(self._ordering)
def pop(self):
return heapq.heappop(self._ordering)
def _build(callables):
schedule = _Schedule()
now = None
immediates = []
# Reverse order is used since these are later popped off (and to
# ensure the popping order is first -> last we need to append them
# in the opposite ordering last -> first).
for i, cb in misc.reverse_enumerate(callables):
if cb._periodic_run_immediately:
immediates.append(i)
else:
if now is None:
now = _now()
schedule.push_next(cb, i, now=now)
return immediates, schedule
def _safe_call(cb, kind):
try:
cb()
except Exception:
LOG.warn("Failed to call %s '%r'", kind, cb, exc_info=True)
class PeriodicWorker(object):
"""Calls a collection of callables periodically (sleeping as needed...).
@@ -96,54 +142,29 @@ class PeriodicWorker(object):
callables.append(member)
return cls(callables)
@deprecation.removed_kwarg('tombstone', version="0.8", removal_version="?")
def __init__(self, callables, tombstone=None):
if tombstone is None:
self._tombstone = tu.Event()
else:
# Allows someone to share an event (if they so want to...)
self._tombstone = tombstone
almost_callables = list(callables)
for cb in almost_callables:
self._callables = []
for cb in callables:
if not six.callable(cb):
raise ValueError("Periodic callback must be callable")
for attr_name in _PERIODIC_ATTRS:
if not hasattr(cb, attr_name):
raise ValueError("Periodic callback missing required"
" attribute '%s'" % attr_name)
self._callables = tuple((cb, reflection.get_callable_name(cb))
for cb in almost_callables)
self._schedule = []
now = _now()
for i, (cb, cb_name) in enumerate(self._callables):
spacing = cb._periodic_spacing
next_run = now + spacing
heapq.heappush(self._schedule, (next_run, i))
self._immediates = self._fetch_immediates(self._callables)
if cb._periodic:
self._callables.append(cb)
self._immediates, self._schedule = _build(self._callables)
def __len__(self):
return len(self._callables)
@staticmethod
def _fetch_immediates(callables):
immediates = []
# Reverse order is used since these are later popped off (and to
# ensure the popping order is first -> last we need to append them
# in the opposite ordering last -> first).
for (cb, cb_name) in reversed(callables):
if cb._periodic_run_immediately:
immediates.append((cb, cb_name))
return immediates
@staticmethod
def _safe_call(cb, cb_name, kind='periodic'):
try:
cb()
except Exception:
LOG.warn("Failed to call %s callable '%s'",
kind, cb_name, exc_info=True)
def start(self):
"""Starts running (will not stop/return until the tombstone is set).
"""Starts running (will not return until :py:meth:`.stop` is called).
NOTE(harlowja): If this worker has no contained callables this raises
a runtime error and does not run since it is impossible to periodically
@@ -154,29 +175,30 @@ class PeriodicWorker(object):
" without any callables")
while not self._tombstone.is_set():
if self._immediates:
cb, cb_name = self._immediates.pop()
LOG.debug("Calling immediate callable '%s'", cb_name)
self._safe_call(cb, cb_name, kind='immediate')
# Run & schedule its next execution.
index = self._immediates.pop()
cb = self._callables[index]
LOG.blather("Calling immediate '%r'", cb)
_safe_call(cb, 'immediate')
self._schedule.push_next(cb, index)
else:
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
now = _now()
next_run, i = heapq.heappop(self._schedule)
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
cb, cb_name = self._callables[i]
spacing = cb._periodic_spacing
LOG.debug("Calling periodic callable '%s' (it runs every"
" %s seconds)", cb_name, spacing)
self._safe_call(cb, cb_name)
# Run again someday...
next_run = now + spacing
heapq.heappush(self._schedule, (next_run, i))
# Run & schedule its next execution.
cb = self._callables[index]
LOG.blather("Calling periodic '%r' (it runs every"
" %s seconds)", cb, cb._periodic_spacing)
_safe_call(cb, 'periodic')
self._schedule.push_next(cb, index, now=now)
else:
# Gotta wait...
heapq.heappush(self._schedule, (next_run, i))
self._schedule.push(next_run, index)
self._tombstone.wait(when_next)
def stop(self):
@@ -186,4 +208,4 @@ class PeriodicWorker(object):
def reset(self):
"""Resets the tombstone and re-queues up any immediate executions."""
self._tombstone.clear()
self._immediates = self._fetch_immediates(self._callables)
self._immediates, self._schedule = _build(self._callables)

View File

@@ -101,6 +101,12 @@ def countdown_iter(start_at, decr=1):
start_at -= decr
def reverse_enumerate(items):
"""Like reversed(enumerate(items)) but with less copying/cloning..."""
for i in countdown_iter(len(items)):
yield i - 1, items[i - 1]
def merge_uri(uri, conf):
"""Merges a parsed uri into the given configuration dictionary.

View File

@@ -6,6 +6,7 @@ hacking<0.11,>=0.10.0
oslotest>=1.2.0 # Apache-2.0
mock>=1.0
testtools>=0.9.36,!=1.2.0
testscenarios>=0.4
# Used for testing the WBE engine.
kombu>=2.5.0