Stopwatch usage cleanup/tweak
Instead of optionally creating a stopwatch when a provided timeout is not none (to avoid the stopwatch leftover() method raising a error) just allow the stopwatch leftover() method to not raise when no duration is provided to avoid these repeated styles of usage/checks in the first place. By default the leftover() method still raises an error (a new keyword argument is now accepted to turn off this behavior). Change-Id: If934ee6e6855adbb6975cd6ea41e273d40e73dac
This commit is contained in:
@@ -175,10 +175,11 @@ class _WaitWorkItem(object):
|
|||||||
'kind': _KIND_COMPLETE_ME,
|
'kind': _KIND_COMPLETE_ME,
|
||||||
}
|
}
|
||||||
if self._channel.put(message):
|
if self._channel.put(message):
|
||||||
w = timing.StopWatch().start()
|
watch = timing.StopWatch()
|
||||||
|
watch.start()
|
||||||
self._barrier.wait()
|
self._barrier.wait()
|
||||||
LOG.blather("Waited %s seconds until task '%s' %s emitted"
|
LOG.blather("Waited %s seconds until task '%s' %s emitted"
|
||||||
" notifications were depleted", w.elapsed(),
|
" notifications were depleted", watch.elapsed(),
|
||||||
self._task, sent_events)
|
self._task, sent_events)
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
@@ -303,11 +304,11 @@ class _Dispatcher(object):
|
|||||||
" %s to target '%s'", kind, sender, target)
|
" %s to target '%s'", kind, sender, target)
|
||||||
|
|
||||||
def run(self, queue):
|
def run(self, queue):
|
||||||
w = timing.StopWatch(duration=self._dispatch_periodicity)
|
watch = timing.StopWatch(duration=self._dispatch_periodicity)
|
||||||
while (not self._dead.is_set() or
|
while (not self._dead.is_set() or
|
||||||
(self._stop_when_empty and self._targets)):
|
(self._stop_when_empty and self._targets)):
|
||||||
w.restart()
|
watch.restart()
|
||||||
leftover = w.leftover()
|
leftover = watch.leftover()
|
||||||
while leftover:
|
while leftover:
|
||||||
try:
|
try:
|
||||||
message = queue.get(timeout=leftover)
|
message = queue.get(timeout=leftover)
|
||||||
@@ -315,8 +316,8 @@ class _Dispatcher(object):
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self._dispatch(message)
|
self._dispatch(message)
|
||||||
leftover = w.leftover()
|
leftover = watch.leftover()
|
||||||
leftover = w.leftover()
|
leftover = watch.leftover()
|
||||||
if leftover:
|
if leftover:
|
||||||
self._dead.wait(leftover)
|
self._dead.wait(leftover)
|
||||||
|
|
||||||
|
|||||||
@@ -157,17 +157,13 @@ class TopicWorkers(object):
|
|||||||
"""
|
"""
|
||||||
if workers <= 0:
|
if workers <= 0:
|
||||||
raise ValueError("Worker amount must be greater than zero")
|
raise ValueError("Worker amount must be greater than zero")
|
||||||
w = None
|
watch = tt.StopWatch(duration=timeout)
|
||||||
if timeout is not None:
|
watch.start()
|
||||||
w = tt.StopWatch(timeout).start()
|
|
||||||
with self._cond:
|
with self._cond:
|
||||||
while len(self._workers) < workers:
|
while len(self._workers) < workers:
|
||||||
if w is not None and w.expired():
|
if watch.expired():
|
||||||
return max(0, workers - len(self._workers))
|
return max(0, workers - len(self._workers))
|
||||||
timeout = None
|
self._cond.wait(watch.leftover(return_none=True))
|
||||||
if w is not None:
|
|
||||||
timeout = w.leftover()
|
|
||||||
self._cond.wait(timeout)
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def get_worker_for_task(self, task):
|
def get_worker_for_task(self, task):
|
||||||
|
|||||||
@@ -661,13 +661,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||||||
|
|
||||||
def wait(self, timeout=None):
|
def wait(self, timeout=None):
|
||||||
# Wait until timeout expires (or forever) for jobs to appear.
|
# Wait until timeout expires (or forever) for jobs to appear.
|
||||||
watch = None
|
watch = tt.StopWatch(duration=timeout)
|
||||||
if timeout is not None:
|
watch.start()
|
||||||
watch = tt.StopWatch(duration=float(timeout)).start()
|
|
||||||
with self._job_cond:
|
with self._job_cond:
|
||||||
while True:
|
while True:
|
||||||
if not self._known_jobs:
|
if not self._known_jobs:
|
||||||
if watch is not None and watch.expired():
|
if watch.expired():
|
||||||
raise excp.NotFound("Expired waiting for jobs to"
|
raise excp.NotFound("Expired waiting for jobs to"
|
||||||
" arrive; waited %s seconds"
|
" arrive; waited %s seconds"
|
||||||
% watch.elapsed())
|
% watch.elapsed())
|
||||||
@@ -676,10 +675,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
|
|||||||
# when we acquire the condition that there will actually
|
# when we acquire the condition that there will actually
|
||||||
# be jobs (especially if we are spuriously awaken), so we
|
# be jobs (especially if we are spuriously awaken), so we
|
||||||
# must recalculate the amount of time we really have left.
|
# must recalculate the amount of time we really have left.
|
||||||
timeout = None
|
self._job_cond.wait(watch.leftover(return_none=True))
|
||||||
if watch is not None:
|
|
||||||
timeout = watch.leftover()
|
|
||||||
self._job_cond.wait(timeout)
|
|
||||||
else:
|
else:
|
||||||
it = ZookeeperJobBoardIterator(self)
|
it = ZookeeperJobBoardIterator(self)
|
||||||
it._jobs.extend(self._fetch_jobs())
|
it._jobs.extend(self._fetch_jobs())
|
||||||
|
|||||||
@@ -125,6 +125,13 @@ class StopWatchTest(test.TestCase):
|
|||||||
tt.StopWatch.set_now_override(now=0)
|
tt.StopWatch.set_now_override(now=0)
|
||||||
self.addCleanup(tt.StopWatch.clear_overrides)
|
self.addCleanup(tt.StopWatch.clear_overrides)
|
||||||
|
|
||||||
|
def test_leftover_no_duration(self):
|
||||||
|
watch = tt.StopWatch()
|
||||||
|
watch.start()
|
||||||
|
self.assertRaises(RuntimeError, watch.leftover)
|
||||||
|
self.assertRaises(RuntimeError, watch.leftover, return_none=False)
|
||||||
|
self.assertIsNone(watch.leftover(return_none=True))
|
||||||
|
|
||||||
def test_no_states(self):
|
def test_no_states(self):
|
||||||
watch = tt.StopWatch()
|
watch = tt.StopWatch()
|
||||||
self.assertRaises(RuntimeError, watch.stop)
|
self.assertRaises(RuntimeError, watch.stop)
|
||||||
|
|||||||
@@ -54,15 +54,12 @@ class Latch(object):
|
|||||||
timeout expires then this will return True, otherwise it will
|
timeout expires then this will return True, otherwise it will
|
||||||
return False.
|
return False.
|
||||||
"""
|
"""
|
||||||
w = None
|
watch = tt.StopWatch(duration=timeout)
|
||||||
if timeout is not None:
|
watch.start()
|
||||||
w = tt.StopWatch(timeout).start()
|
|
||||||
with self._cond:
|
with self._cond:
|
||||||
while self._count > 0:
|
while self._count > 0:
|
||||||
if w is not None:
|
if watch.expired():
|
||||||
if w.expired():
|
return False
|
||||||
return False
|
else:
|
||||||
else:
|
self._cond.wait(watch.leftover(return_none=True))
|
||||||
timeout = w.leftover()
|
|
||||||
self._cond.wait(timeout)
|
|
||||||
return True
|
return True
|
||||||
|
|||||||
@@ -245,23 +245,32 @@ class StopWatch(object):
|
|||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def leftover(self):
|
def leftover(self, return_none=False):
|
||||||
"""Returns how many seconds are left until the watch expires."""
|
"""Returns how many seconds are left until the watch expires.
|
||||||
if self._duration is None:
|
|
||||||
raise RuntimeError("Can not get the leftover time of a watch that"
|
:param return_none: when ``True`` instead of raising a ``RuntimeError``
|
||||||
" has no duration")
|
when no duration has been set this call will
|
||||||
|
return ``None`` instead.
|
||||||
|
:type return_none: boolean
|
||||||
|
"""
|
||||||
if self._state != self._STARTED:
|
if self._state != self._STARTED:
|
||||||
raise RuntimeError("Can not get the leftover time of a stopwatch"
|
raise RuntimeError("Can not get the leftover time of a stopwatch"
|
||||||
" that has not been started")
|
" that has not been started")
|
||||||
|
if self._duration is None:
|
||||||
|
if not return_none:
|
||||||
|
raise RuntimeError("Can not get the leftover time of a watch"
|
||||||
|
" that has no duration")
|
||||||
|
else:
|
||||||
|
return None
|
||||||
return max(0.0, self._duration - self.elapsed())
|
return max(0.0, self._duration - self.elapsed())
|
||||||
|
|
||||||
def expired(self):
|
def expired(self):
|
||||||
"""Returns if the watch has expired (ie, duration provided elapsed)."""
|
"""Returns if the watch has expired (ie, duration provided elapsed)."""
|
||||||
if self._duration is None:
|
|
||||||
return False
|
|
||||||
if self._state is None:
|
if self._state is None:
|
||||||
raise RuntimeError("Can not check if a stopwatch has expired"
|
raise RuntimeError("Can not check if a stopwatch has expired"
|
||||||
" if it has not been started/stopped")
|
" if it has not been started/stopped")
|
||||||
|
if self._duration is None:
|
||||||
|
return False
|
||||||
if self.elapsed() > self._duration:
|
if self.elapsed() > self._duration:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|||||||
Reference in New Issue
Block a user