diff --git a/taskflow/types/periodic.py b/taskflow/types/periodic.py index 2717be03..f00d97ef 100644 --- a/taskflow/types/periodic.py +++ b/taskflow/types/periodic.py @@ -108,19 +108,27 @@ class PeriodicWorker(object): self._callables = tuple((cb, reflection.get_callable_name(cb)) for cb in almost_callables) self._schedule = [] - self._immediates = [] now = _now() for i, (cb, cb_name) in enumerate(self._callables): spacing = cb._periodic_spacing next_run = now + spacing heapq.heappush(self._schedule, (next_run, i)) - for (cb, cb_name) in reversed(self._callables): - if cb._periodic_run_immediately: - self._immediates.append((cb, cb_name)) + self._immediates = self._fetch_immediates(self._callables) def __len__(self): return len(self._callables) + @staticmethod + def _fetch_immediates(callables): + immediates = [] + # Reverse order is used since these are later popped off (and to + # ensure the popping order is first -> last we need to append them + # in the opposite ordering last -> first). + for (cb, cb_name) in reversed(callables): + if cb._periodic_run_immediately: + immediates.append((cb, cb_name)) + return immediates + @staticmethod def _safe_call(cb, cb_name, kind='periodic'): try: @@ -173,7 +181,4 @@ class PeriodicWorker(object): def reset(self): """Resets the tombstone and re-queues up any immediate executions.""" self._tombstone.clear() - self._immediates = [] - for (cb, cb_name) in reversed(self._callables): - if cb._periodic_run_immediately: - self._immediates.append((cb, cb_name)) + self._immediates = self._fetch_immediates(self._callables)