diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index e1e07894..809c1926 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -28,7 +28,6 @@ from taskflow.types import latch from taskflow.types import periodic from taskflow.types import sets from taskflow.types import table -from taskflow.types import timing as tt from taskflow.types import tree from taskflow.utils import threading_utils as tu @@ -276,128 +275,6 @@ class TreeTest(test.TestCase): 'horse', 'human', 'monkey'], things) -class StopWatchTest(test.TestCase): - def setUp(self): - super(StopWatchTest, self).setUp() - tt.StopWatch.set_now_override(now=0) - self.addCleanup(tt.StopWatch.clear_overrides) - - def test_leftover_no_duration(self): - watch = tt.StopWatch() - watch.start() - self.assertRaises(RuntimeError, watch.leftover) - self.assertRaises(RuntimeError, watch.leftover, return_none=False) - self.assertIsNone(watch.leftover(return_none=True)) - - def test_no_states(self): - watch = tt.StopWatch() - self.assertRaises(RuntimeError, watch.stop) - self.assertRaises(RuntimeError, watch.resume) - - def test_bad_expiry(self): - self.assertRaises(ValueError, tt.StopWatch, -1) - - def test_backwards(self): - watch = tt.StopWatch(0.1) - watch.start() - tt.StopWatch.advance_time_seconds(0.5) - self.assertTrue(watch.expired()) - - tt.StopWatch.advance_time_seconds(-1.0) - self.assertFalse(watch.expired()) - self.assertEqual(0.0, watch.elapsed()) - - def test_expiry(self): - watch = tt.StopWatch(0.1) - watch.start() - tt.StopWatch.advance_time_seconds(0.2) - self.assertTrue(watch.expired()) - - def test_not_expired(self): - watch = tt.StopWatch(0.1) - watch.start() - tt.StopWatch.advance_time_seconds(0.05) - self.assertFalse(watch.expired()) - - def test_no_expiry(self): - watch = tt.StopWatch(0.1) - self.assertRaises(RuntimeError, watch.expired) - - def test_elapsed(self): - watch = tt.StopWatch() - watch.start() - tt.StopWatch.advance_time_seconds(0.2) - # NOTE(harlowja): Allow for a slight variation by using 0.19. - self.assertGreaterEqual(0.19, watch.elapsed()) - - def test_no_elapsed(self): - watch = tt.StopWatch() - self.assertRaises(RuntimeError, watch.elapsed) - - def test_no_leftover(self): - watch = tt.StopWatch() - self.assertRaises(RuntimeError, watch.leftover) - watch = tt.StopWatch(1) - self.assertRaises(RuntimeError, watch.leftover) - - def test_pause_resume(self): - watch = tt.StopWatch() - watch.start() - tt.StopWatch.advance_time_seconds(0.05) - watch.stop() - elapsed = watch.elapsed() - self.assertAlmostEqual(elapsed, watch.elapsed()) - watch.resume() - tt.StopWatch.advance_time_seconds(0.05) - self.assertNotEqual(elapsed, watch.elapsed()) - - def test_context_manager(self): - with tt.StopWatch() as watch: - tt.StopWatch.advance_time_seconds(0.05) - self.assertGreater(0.01, watch.elapsed()) - - def test_splits(self): - watch = tt.StopWatch() - watch.start() - self.assertEqual(0, len(watch.splits)) - - watch.split() - self.assertEqual(1, len(watch.splits)) - self.assertEqual(watch.splits[0].elapsed, - watch.splits[0].length) - - tt.StopWatch.advance_time_seconds(0.05) - watch.split() - splits = watch.splits - self.assertEqual(2, len(splits)) - self.assertNotEqual(splits[0].elapsed, splits[1].elapsed) - self.assertEqual(splits[1].length, - splits[1].elapsed - splits[0].elapsed) - - watch.stop() - self.assertEqual(2, len(watch.splits)) - - watch.start() - self.assertEqual(0, len(watch.splits)) - - def test_elapsed_maximum(self): - watch = tt.StopWatch() - watch.start() - - tt.StopWatch.advance_time_seconds(1) - self.assertEqual(1, watch.elapsed()) - - tt.StopWatch.advance_time_seconds(10) - self.assertEqual(11, watch.elapsed()) - self.assertEqual(1, watch.elapsed(maximum=1)) - - watch.stop() - self.assertEqual(11, watch.elapsed()) - tt.StopWatch.advance_time_seconds(10) - self.assertEqual(11, watch.elapsed()) - self.assertEqual(0, watch.elapsed(maximum=-1)) - - class TableTest(test.TestCase): def test_create_valid_no_rows(self): tbl = table.PleasantTable(['Name', 'City', 'State', 'Country']) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 5436df3c..73fe9c8c 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -21,9 +21,9 @@ from taskflow.engines.action_engine import executor from taskflow.engines.worker_based import protocol as pr from taskflow import exceptions as excp from taskflow import test +from taskflow.test import mock from taskflow.tests import utils from taskflow.types import failure -from taskflow.types import timing class TestProtocolValidation(test.TestCase): @@ -94,8 +94,6 @@ class TestProtocol(test.TestCase): def setUp(self): super(TestProtocol, self).setUp() - timing.StopWatch.set_now_override() - self.addCleanup(timing.StopWatch.clear_overrides) self.task = utils.DummyTask() self.task_uuid = 'task-uuid' self.task_action = 'execute' @@ -164,21 +162,27 @@ class TestProtocol(test.TestCase): failures={self.task.name: a_failure.to_dict()}) self.assertEqual(request.to_dict(), expected) - def test_pending_not_expired(self): + @mock.patch('oslo_utils.timeutils.now') + def test_pending_not_expired(self, now): + now.return_value = 0 req = self.request() - timing.StopWatch.set_offset_override(self.timeout - 1) + now.return_value = self.timeout - 1 self.assertFalse(req.expired) - def test_pending_expired(self): + @mock.patch('oslo_utils.timeutils.now') + def test_pending_expired(self, now): + now.return_value = 0 req = self.request() - timing.StopWatch.set_offset_override(self.timeout + 1) + now.return_value = self.timeout + 1 self.assertTrue(req.expired) - def test_running_not_expired(self): + @mock.patch('oslo_utils.timeutils.now') + def test_running_not_expired(self, now): + now.return_value = 0 request = self.request() request.transition(pr.PENDING) request.transition(pr.RUNNING) - timing.StopWatch.set_offset_override(self.timeout + 1) + now.return_value = self.timeout + 1 self.assertFalse(request.expired) def test_set_result(self): diff --git a/taskflow/tests/unit/worker_based/test_types.py b/taskflow/tests/unit/worker_based/test_types.py index 287283cf..095bf1e3 100644 --- a/taskflow/tests/unit/worker_based/test_types.py +++ b/taskflow/tests/unit/worker_based/test_types.py @@ -14,21 +14,19 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo.utils import reflection +from oslo_utils import reflection from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import types as worker_types from taskflow import test from taskflow.test import mock from taskflow.tests import utils -from taskflow.types import timing class TestRequestCache(test.TestCase): def setUp(self): super(TestRequestCache, self).setUp() - self.addCleanup(timing.StopWatch.clear_overrides) self.task = utils.DummyTask() self.task_uuid = 'task-uuid' self.task_action = 'execute' @@ -45,7 +43,8 @@ class TestRequestCache(test.TestCase): request_kwargs.update(kwargs) return pr.Request(**request_kwargs) - def test_requests_cache_expiry(self): + @mock.patch('oslo_utils.timeutils.now') + def test_requests_cache_expiry(self, now): # Mock out the calls the underlying objects will soon use to return # times that we can control more easily... overrides = [ @@ -53,7 +52,7 @@ class TestRequestCache(test.TestCase): 1, self.timeout + 1, ] - timing.StopWatch.set_now_override(overrides) + now.side_effect = overrides cache = worker_types.RequestsCache() cache[self.task_uuid] = self.request() diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 0dad971f..791bce3e 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -14,14 +14,15 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_utils import reflection +from oslo_utils import timeutils -from taskflow.utils import misc from taskflow.utils import threading_utils -# Find a monotonic providing time (or fallback to using time.time() -# which isn't *always* accurate but will suffice). -_now = misc.find_monotonic(allow_time_time=True) +#: Moved to oslo.utils (just reference them from there until a later time). +Split = timeutils.Split + +#: Moved to oslo.utils (just reference them from there until a later time). +StopWatch = timeutils.StopWatch class Timeout(object): @@ -47,256 +48,3 @@ class Timeout(object): def reset(self): self._event.clear() - - -class Split(object): - """A *immutable* stopwatch split. - - See: http://en.wikipedia.org/wiki/Stopwatch for what this is/represents. - """ - - __slots__ = ['_elapsed', '_length'] - - def __init__(self, elapsed, length): - self._elapsed = elapsed - self._length = length - - @property - def elapsed(self): - """Duration from stopwatch start.""" - return self._elapsed - - @property - def length(self): - """Seconds from last split (or the elapsed time if no prior split).""" - return self._length - - def __repr__(self): - r = reflection.get_class_name(self, fully_qualified=False) - r += "(elapsed=%s, length=%s)" % (self._elapsed, self._length) - return r - - -class StopWatch(object): - """A simple timer/stopwatch helper class. - - Inspired by: apache-commons-lang java stopwatch. - - **Not** thread-safe (when a single watch is mutated by multiple threads at - the same time). It is thread-safe when used by a single thread (not - shared) or when operations are performed in a thread-safe manner on these - objects by wrapping those operations with locks. - """ - _STARTED = 'STARTED' - _STOPPED = 'STOPPED' - - """ - Class variables that should only be used for testing purposes only... - """ - _now_offset = None - _now_override = None - - def __init__(self, duration=None): - if duration is not None: - if duration < 0: - raise ValueError("Duration must be >= 0 and not %s" % duration) - self._duration = duration - else: - self._duration = None - self._started_at = None - self._stopped_at = None - self._state = None - self._splits = [] - - def start(self): - """Starts the watch (if not already started). - - NOTE(harlowja): resets any splits previously captured (if any). - """ - if self._state == self._STARTED: - return self - self._started_at = self._now() - self._stopped_at = None - self._state = self._STARTED - self._splits = [] - return self - - @property - def splits(self): - """Accessor to all/any splits that have been captured.""" - return tuple(self._splits) - - def split(self): - """Captures a split/elapsed since start time (and doesn't stop).""" - if self._state == self._STARTED: - elapsed = self.elapsed() - if self._splits: - length = self._delta_seconds(self._splits[-1].elapsed, elapsed) - else: - length = elapsed - self._splits.append(Split(elapsed, length)) - return self._splits[-1] - else: - raise RuntimeError("Can not create a split time of a stopwatch" - " if it has not been started") - - def restart(self): - """Restarts the watch from a started/stopped state.""" - if self._state == self._STARTED: - self.stop() - self.start() - return self - - @classmethod - def clear_overrides(cls): - """Clears all overrides/offsets. - - **Only to be used for testing (affects all watch instances).** - """ - cls._now_override = None - cls._now_offset = None - - @classmethod - def set_offset_override(cls, offset): - """Sets a offset that is applied to each time fetch. - - **Only to be used for testing (affects all watch instances).** - """ - cls._now_offset = offset - - @classmethod - def advance_time_seconds(cls, offset): - """Advances/sets a offset that is applied to each time fetch. - - NOTE(harlowja): if a previous offset exists (not ``None``) then this - offset will be added onto the existing one (if you want to reset - the offset completely use the :meth:`.set_offset_override` - method instead). - - **Only to be used for testing (affects all watch instances).** - """ - if cls._now_offset is None: - cls.set_offset_override(offset) - else: - cls.set_offset_override(cls._now_offset + offset) - - @classmethod - def set_now_override(cls, now=None): - """Sets time override to use (if none, then current time is fetched). - - NOTE(harlowja): if a list/tuple is provided then the first element of - the list will be used (and removed) each time a time fetch occurs (once - it becomes empty the override/s will no longer be applied). If a - numeric value is provided then it will be used (and never removed - until the override(s) are cleared via the :meth:`.clear_overrides` - method). - - **Only to be used for testing (affects all watch instances).** - """ - if isinstance(now, (list, tuple)): - cls._now_override = list(now) - else: - if now is None: - now = _now() - cls._now_override = now - - @staticmethod - def _delta_seconds(earlier, later): - return max(0.0, later - earlier) - - @classmethod - def _now(cls): - if cls._now_override is not None: - if isinstance(cls._now_override, list): - try: - now = cls._now_override.pop(0) - except IndexError: - now = _now() - else: - now = cls._now_override - else: - now = _now() - if cls._now_offset is not None: - now = now + cls._now_offset - return now - - def elapsed(self, maximum=None): - """Returns how many seconds have elapsed.""" - if self._state not in (self._STOPPED, self._STARTED): - raise RuntimeError("Can not get the elapsed time of a stopwatch" - " if it has not been started/stopped") - if self._state == self._STOPPED: - elapsed = self._delta_seconds(self._started_at, self._stopped_at) - else: - elapsed = self._delta_seconds(self._started_at, self._now()) - if maximum is not None and elapsed > maximum: - elapsed = max(0.0, maximum) - return elapsed - - def __enter__(self): - """Starts the watch.""" - self.start() - return self - - def __exit__(self, type, value, traceback): - """Stops the watch (ignoring errors if stop fails).""" - try: - self.stop() - except RuntimeError: - pass - - def leftover(self, return_none=False): - """Returns how many seconds are left until the watch expires. - - :param return_none: when ``True`` instead of raising a ``RuntimeError`` - when no duration has been set this call will - return ``None`` instead - :type return_none: boolean - :returns: how many seconds left until the watch expires - :rtype: number - """ - if self._state != self._STARTED: - raise RuntimeError("Can not get the leftover time of a stopwatch" - " that has not been started") - if self._duration is None: - if not return_none: - raise RuntimeError("Can not get the leftover time of a watch" - " that has no duration") - else: - return None - return max(0.0, self._duration - self.elapsed()) - - def expired(self): - """Returns if the watch has expired (ie, duration provided elapsed). - - :returns: if the watch has expired - :rtype: boolean - """ - if self._state is None: - raise RuntimeError("Can not check if a stopwatch has expired" - " if it has not been started/stopped") - if self._duration is None: - return False - if self.elapsed() > self._duration: - return True - return False - - def resume(self): - """Resumes the watch from a stopped state.""" - if self._state == self._STOPPED: - self._state = self._STARTED - return self - else: - raise RuntimeError("Can not resume a stopwatch that has not been" - " stopped") - - def stop(self): - """Stops the watch.""" - if self._state == self._STOPPED: - return self - if self._state != self._STARTED: - raise RuntimeError("Can not stop a stopwatch that has not been" - " started") - self._stopped_at = self._now() - self._state = self._STOPPED - return self