diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index 2151b9e1a..5f750d202 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -533,23 +533,21 @@ class PeriodicTest(test.TestCase): def test_periodic_single(self): barrier = latch.Latch(5) capture = [] - tombstone = tu.Event() @periodic.periodic(0.01) def callee(): barrier.countdown() if barrier.needed == 0: - tombstone.set() + w.stop() capture.append(1) - w = periodic.PeriodicWorker([callee], tombstone=tombstone) + w = periodic.PeriodicWorker([callee]) t = tu.daemon_thread(target=w.start) t.start() t.join() self.assertEqual(0, barrier.needed) self.assertEqual(5, sum(capture)) - self.assertTrue(tombstone.is_set()) def test_immediate(self): capture = [] diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 514675297..1477fe5ad 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -17,9 +17,11 @@ import collections import inspect import random +import string import time import six +import testscenarios from taskflow import test from taskflow.utils import misc @@ -192,6 +194,22 @@ class TestSequenceMinus(test.TestCase): self.assertEqual(result, [2, 1]) +class TestReversedEnumerate(testscenarios.TestWithScenarios, test.TestCase): + scenarios = [ + ('ten', {'sample': [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]}), + ('empty', {'sample': []}), + ('negative', {'sample': [-1, -2, -3]}), + ('one', {'sample': [1]}), + ('abc', {'sample': ['a', 'b', 'c']}), + ('ascii_letters', {'sample': list(string.ascii_letters)}), + ] + + def test_sample_equivalence(self): + expected = list(reversed(list(enumerate(self.sample)))) + actual = list(misc.reverse_enumerate(self.sample)) + self.assertEqual(expected, actual) + + class TestCountdownIter(test.TestCase): def test_expected_count(self): upper = 100 diff --git a/taskflow/types/periodic.py b/taskflow/types/periodic.py index 9237d9c5e..1cce5dd0f 100644 --- a/taskflow/types/periodic.py +++ b/taskflow/types/periodic.py @@ -21,6 +21,7 @@ from oslo_utils import reflection import six from taskflow import logging +from taskflow.utils import deprecation from taskflow.utils import misc from taskflow.utils import threading_utils as tu @@ -64,6 +65,51 @@ def periodic(spacing, run_immediately=True): return wrapper +class _Schedule(object): + """Internal heap-based structure that maintains the schedule/ordering.""" + + def __init__(self): + self._ordering = [] + + def push(self, next_run, index): + heapq.heappush(self._ordering, (next_run, index)) + + def push_next(self, cb, index, now=None): + if now is None: + now = _now() + self.push(now + cb._periodic_spacing, index) + + def __len__(self): + return len(self._ordering) + + def pop(self): + return heapq.heappop(self._ordering) + + +def _build(callables): + schedule = _Schedule() + now = None + immediates = [] + # Reverse order is used since these are later popped off (and to + # ensure the popping order is first -> last we need to append them + # in the opposite ordering last -> first). + for i, cb in misc.reverse_enumerate(callables): + if cb._periodic_run_immediately: + immediates.append(i) + else: + if now is None: + now = _now() + schedule.push_next(cb, i, now=now) + return immediates, schedule + + +def _safe_call(cb, kind): + try: + cb() + except Exception: + LOG.warn("Failed to call %s '%r'", kind, cb, exc_info=True) + + class PeriodicWorker(object): """Calls a collection of callables periodically (sleeping as needed...). @@ -96,54 +142,29 @@ class PeriodicWorker(object): callables.append(member) return cls(callables) + @deprecation.removed_kwarg('tombstone', version="0.8", removal_version="?") def __init__(self, callables, tombstone=None): if tombstone is None: self._tombstone = tu.Event() else: - # Allows someone to share an event (if they so want to...) self._tombstone = tombstone - almost_callables = list(callables) - for cb in almost_callables: + self._callables = [] + for cb in callables: if not six.callable(cb): raise ValueError("Periodic callback must be callable") for attr_name in _PERIODIC_ATTRS: if not hasattr(cb, attr_name): raise ValueError("Periodic callback missing required" " attribute '%s'" % attr_name) - self._callables = tuple((cb, reflection.get_callable_name(cb)) - for cb in almost_callables) - self._schedule = [] - now = _now() - for i, (cb, cb_name) in enumerate(self._callables): - spacing = cb._periodic_spacing - next_run = now + spacing - heapq.heappush(self._schedule, (next_run, i)) - self._immediates = self._fetch_immediates(self._callables) + if cb._periodic: + self._callables.append(cb) + self._immediates, self._schedule = _build(self._callables) def __len__(self): return len(self._callables) - @staticmethod - def _fetch_immediates(callables): - immediates = [] - # Reverse order is used since these are later popped off (and to - # ensure the popping order is first -> last we need to append them - # in the opposite ordering last -> first). - for (cb, cb_name) in reversed(callables): - if cb._periodic_run_immediately: - immediates.append((cb, cb_name)) - return immediates - - @staticmethod - def _safe_call(cb, cb_name, kind='periodic'): - try: - cb() - except Exception: - LOG.warn("Failed to call %s callable '%s'", - kind, cb_name, exc_info=True) - def start(self): - """Starts running (will not stop/return until the tombstone is set). + """Starts running (will not return until :py:meth:`.stop` is called). NOTE(harlowja): If this worker has no contained callables this raises a runtime error and does not run since it is impossible to periodically @@ -154,29 +175,30 @@ class PeriodicWorker(object): " without any callables") while not self._tombstone.is_set(): if self._immediates: - cb, cb_name = self._immediates.pop() - LOG.debug("Calling immediate callable '%s'", cb_name) - self._safe_call(cb, cb_name, kind='immediate') + # Run & schedule its next execution. + index = self._immediates.pop() + cb = self._callables[index] + LOG.blather("Calling immediate '%r'", cb) + _safe_call(cb, 'immediate') + self._schedule.push_next(cb, index) else: # Figure out when we should run next (by selecting the # minimum item from the heap, where the minimum should be # the callable that needs to run next and has the lowest # next desired run time). now = _now() - next_run, i = heapq.heappop(self._schedule) + next_run, index = self._schedule.pop() when_next = next_run - now if when_next <= 0: - cb, cb_name = self._callables[i] - spacing = cb._periodic_spacing - LOG.debug("Calling periodic callable '%s' (it runs every" - " %s seconds)", cb_name, spacing) - self._safe_call(cb, cb_name) - # Run again someday... - next_run = now + spacing - heapq.heappush(self._schedule, (next_run, i)) + # Run & schedule its next execution. + cb = self._callables[index] + LOG.blather("Calling periodic '%r' (it runs every" + " %s seconds)", cb, cb._periodic_spacing) + _safe_call(cb, 'periodic') + self._schedule.push_next(cb, index, now=now) else: # Gotta wait... - heapq.heappush(self._schedule, (next_run, i)) + self._schedule.push(next_run, index) self._tombstone.wait(when_next) def stop(self): @@ -186,4 +208,4 @@ class PeriodicWorker(object): def reset(self): """Resets the tombstone and re-queues up any immediate executions.""" self._tombstone.clear() - self._immediates = self._fetch_immediates(self._callables) + self._immediates, self._schedule = _build(self._callables) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index daac38559..383b3e287 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -101,6 +101,12 @@ def countdown_iter(start_at, decr=1): start_at -= decr +def reverse_enumerate(items): + """Like reversed(enumerate(items)) but with less copying/cloning...""" + for i in countdown_iter(len(items)): + yield i - 1, items[i - 1] + + def merge_uri(uri, conf): """Merges a parsed uri into the given configuration dictionary. diff --git a/test-requirements.txt b/test-requirements.txt index dd4f36d0f..8c9a83a64 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,6 +6,7 @@ hacking<0.11,>=0.10.0 oslotest>=1.2.0 # Apache-2.0 mock>=1.0 testtools>=0.9.36,!=1.2.0 +testscenarios>=0.4 # Used for testing the WBE engine. kombu>=2.5.0