From d5128cf51a554c2aa0f50e4b48b953933fc33f89 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 28 Jan 2015 15:55:39 -0800 Subject: [PATCH] Stopwatch usage cleanup/tweak Instead of optionally creating a stopwatch when a provided timeout is not none (to avoid the stopwatch leftover() method raising a error) just allow the stopwatch leftover() method to not raise when no duration is provided to avoid these repeated styles of usage/checks in the first place. By default the leftover() method still raises an error (a new keyword argument is now accepted to turn off this behavior). Change-Id: If934ee6e6855adbb6975cd6ea41e273d40e73dac --- taskflow/engines/action_engine/executor.py | 15 +++++++------- taskflow/engines/worker_based/types.py | 12 ++++------- taskflow/jobs/backends/impl_zookeeper.py | 12 ++++------- taskflow/tests/unit/test_types.py | 7 +++++++ taskflow/types/latch.py | 15 ++++++-------- taskflow/types/timing.py | 23 +++++++++++++++------- 6 files changed, 45 insertions(+), 39 deletions(-) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 9b794d416..b271beb8d 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -175,10 +175,11 @@ class _WaitWorkItem(object): 'kind': _KIND_COMPLETE_ME, } if self._channel.put(message): - w = timing.StopWatch().start() + watch = timing.StopWatch() + watch.start() self._barrier.wait() LOG.blather("Waited %s seconds until task '%s' %s emitted" - " notifications were depleted", w.elapsed(), + " notifications were depleted", watch.elapsed(), self._task, sent_events) def __call__(self): @@ -303,11 +304,11 @@ class _Dispatcher(object): " %s to target '%s'", kind, sender, target) def run(self, queue): - w = timing.StopWatch(duration=self._dispatch_periodicity) + watch = timing.StopWatch(duration=self._dispatch_periodicity) while (not self._dead.is_set() or (self._stop_when_empty and self._targets)): - w.restart() - leftover = w.leftover() + watch.restart() + leftover = watch.leftover() while leftover: try: message = queue.get(timeout=leftover) @@ -315,8 +316,8 @@ class _Dispatcher(object): break else: self._dispatch(message) - leftover = w.leftover() - leftover = w.leftover() + leftover = watch.leftover() + leftover = watch.leftover() if leftover: self._dead.wait(leftover) diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index 3d8aa632f..d8a7e4130 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -157,17 +157,13 @@ class TopicWorkers(object): """ if workers <= 0: raise ValueError("Worker amount must be greater than zero") - w = None - if timeout is not None: - w = tt.StopWatch(timeout).start() + watch = tt.StopWatch(duration=timeout) + watch.start() with self._cond: while len(self._workers) < workers: - if w is not None and w.expired(): + if watch.expired(): return max(0, workers - len(self._workers)) - timeout = None - if w is not None: - timeout = w.leftover() - self._cond.wait(timeout) + self._cond.wait(watch.leftover(return_none=True)) return 0 def get_worker_for_task(self, task): diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 1fe745007..3e52f65be 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -661,13 +661,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def wait(self, timeout=None): # Wait until timeout expires (or forever) for jobs to appear. - watch = None - if timeout is not None: - watch = tt.StopWatch(duration=float(timeout)).start() + watch = tt.StopWatch(duration=timeout) + watch.start() with self._job_cond: while True: if not self._known_jobs: - if watch is not None and watch.expired(): + if watch.expired(): raise excp.NotFound("Expired waiting for jobs to" " arrive; waited %s seconds" % watch.elapsed()) @@ -676,10 +675,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): # when we acquire the condition that there will actually # be jobs (especially if we are spuriously awaken), so we # must recalculate the amount of time we really have left. - timeout = None - if watch is not None: - timeout = watch.leftover() - self._job_cond.wait(timeout) + self._job_cond.wait(watch.leftover(return_none=True)) else: it = ZookeeperJobBoardIterator(self) it._jobs.extend(self._fetch_jobs()) diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index ebb8b6728..1e6397671 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -125,6 +125,13 @@ class StopWatchTest(test.TestCase): tt.StopWatch.set_now_override(now=0) self.addCleanup(tt.StopWatch.clear_overrides) + def test_leftover_no_duration(self): + watch = tt.StopWatch() + watch.start() + self.assertRaises(RuntimeError, watch.leftover) + self.assertRaises(RuntimeError, watch.leftover, return_none=False) + self.assertIsNone(watch.leftover(return_none=True)) + def test_no_states(self): watch = tt.StopWatch() self.assertRaises(RuntimeError, watch.stop) diff --git a/taskflow/types/latch.py b/taskflow/types/latch.py index db6e56f34..3e2797872 100644 --- a/taskflow/types/latch.py +++ b/taskflow/types/latch.py @@ -54,15 +54,12 @@ class Latch(object): timeout expires then this will return True, otherwise it will return False. """ - w = None - if timeout is not None: - w = tt.StopWatch(timeout).start() + watch = tt.StopWatch(duration=timeout) + watch.start() with self._cond: while self._count > 0: - if w is not None: - if w.expired(): - return False - else: - timeout = w.leftover() - self._cond.wait(timeout) + if watch.expired(): + return False + else: + self._cond.wait(watch.leftover(return_none=True)) return True diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 8e60e6b80..da3938dca 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -245,23 +245,32 @@ class StopWatch(object): except RuntimeError: pass - def leftover(self): - """Returns how many seconds are left until the watch expires.""" - if self._duration is None: - raise RuntimeError("Can not get the leftover time of a watch that" - " has no duration") + def leftover(self, return_none=False): + """Returns how many seconds are left until the watch expires. + + :param return_none: when ``True`` instead of raising a ``RuntimeError`` + when no duration has been set this call will + return ``None`` instead. + :type return_none: boolean + """ if self._state != self._STARTED: raise RuntimeError("Can not get the leftover time of a stopwatch" " that has not been started") + if self._duration is None: + if not return_none: + raise RuntimeError("Can not get the leftover time of a watch" + " that has no duration") + else: + return None return max(0.0, self._duration - self.elapsed()) def expired(self): """Returns if the watch has expired (ie, duration provided elapsed).""" - if self._duration is None: - return False if self._state is None: raise RuntimeError("Can not check if a stopwatch has expired" " if it has not been started/stopped") + if self._duration is None: + return False if self.elapsed() > self._duration: return True return False