Merge "Use monotonic time when/if available"
This commit is contained in:
@@ -15,7 +15,6 @@
|
||||
# under the License.
|
||||
|
||||
import networkx as nx
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
@@ -123,8 +122,8 @@ class TreeTest(test.TestCase):
|
||||
class StopWatchTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(StopWatchTest, self).setUp()
|
||||
timeutils.set_time_override()
|
||||
self.addCleanup(timeutils.clear_time_override)
|
||||
tt.StopWatch.set_now_override(now=0)
|
||||
self.addCleanup(tt.StopWatch.clear_overrides)
|
||||
|
||||
def test_no_states(self):
|
||||
watch = tt.StopWatch()
|
||||
@@ -137,23 +136,23 @@ class StopWatchTest(test.TestCase):
|
||||
def test_backwards(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.5)
|
||||
tt.StopWatch.advance_time_seconds(0.5)
|
||||
self.assertTrue(watch.expired())
|
||||
|
||||
timeutils.advance_time_seconds(-1.0)
|
||||
tt.StopWatch.advance_time_seconds(-1.0)
|
||||
self.assertFalse(watch.expired())
|
||||
self.assertEqual(0.0, watch.elapsed())
|
||||
|
||||
def test_expiry(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.2)
|
||||
tt.StopWatch.advance_time_seconds(0.2)
|
||||
self.assertTrue(watch.expired())
|
||||
|
||||
def test_not_expired(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
tt.StopWatch.advance_time_seconds(0.05)
|
||||
self.assertFalse(watch.expired())
|
||||
|
||||
def test_no_expiry(self):
|
||||
@@ -163,7 +162,7 @@ class StopWatchTest(test.TestCase):
|
||||
def test_elapsed(self):
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.2)
|
||||
tt.StopWatch.advance_time_seconds(0.2)
|
||||
# NOTE(harlowja): Allow for a slight variation by using 0.19.
|
||||
self.assertGreaterEqual(0.19, watch.elapsed())
|
||||
|
||||
@@ -180,17 +179,17 @@ class StopWatchTest(test.TestCase):
|
||||
def test_pause_resume(self):
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
tt.StopWatch.advance_time_seconds(0.05)
|
||||
watch.stop()
|
||||
elapsed = watch.elapsed()
|
||||
self.assertAlmostEqual(elapsed, watch.elapsed())
|
||||
watch.resume()
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
tt.StopWatch.advance_time_seconds(0.05)
|
||||
self.assertNotEqual(elapsed, watch.elapsed())
|
||||
|
||||
def test_context_manager(self):
|
||||
with tt.StopWatch() as watch:
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
tt.StopWatch.advance_time_seconds(0.05)
|
||||
self.assertGreater(0.01, watch.elapsed())
|
||||
|
||||
def test_splits(self):
|
||||
@@ -203,7 +202,7 @@ class StopWatchTest(test.TestCase):
|
||||
self.assertEqual(watch.splits[0].elapsed,
|
||||
watch.splits[0].length)
|
||||
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
tt.StopWatch.advance_time_seconds(0.05)
|
||||
watch.split()
|
||||
splits = watch.splits
|
||||
self.assertEqual(2, len(splits))
|
||||
@@ -221,16 +220,16 @@ class StopWatchTest(test.TestCase):
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
|
||||
timeutils.advance_time_seconds(1)
|
||||
tt.StopWatch.advance_time_seconds(1)
|
||||
self.assertEqual(1, watch.elapsed())
|
||||
|
||||
timeutils.advance_time_seconds(10)
|
||||
tt.StopWatch.advance_time_seconds(10)
|
||||
self.assertEqual(11, watch.elapsed())
|
||||
self.assertEqual(1, watch.elapsed(maximum=1))
|
||||
|
||||
watch.stop()
|
||||
self.assertEqual(11, watch.elapsed())
|
||||
timeutils.advance_time_seconds(10)
|
||||
tt.StopWatch.advance_time_seconds(10)
|
||||
self.assertEqual(11, watch.elapsed())
|
||||
self.assertEqual(0, watch.elapsed(maximum=-1))
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
# under the License.
|
||||
|
||||
from concurrent import futures
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from taskflow.engines.action_engine import executor
|
||||
@@ -24,6 +23,7 @@ from taskflow import exceptions as excp
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
from taskflow.types import failure
|
||||
from taskflow.types import timing
|
||||
|
||||
|
||||
class TestProtocolValidation(test.TestCase):
|
||||
@@ -94,8 +94,8 @@ class TestProtocol(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestProtocol, self).setUp()
|
||||
timeutils.set_time_override()
|
||||
self.addCleanup(timeutils.clear_time_override)
|
||||
timing.StopWatch.set_now_override()
|
||||
self.addCleanup(timing.StopWatch.clear_overrides)
|
||||
self.task = utils.DummyTask()
|
||||
self.task_uuid = 'task-uuid'
|
||||
self.task_action = 'execute'
|
||||
@@ -166,19 +166,19 @@ class TestProtocol(test.TestCase):
|
||||
|
||||
def test_pending_not_expired(self):
|
||||
req = self.request()
|
||||
timeutils.advance_time_seconds(self.timeout - 1)
|
||||
timing.StopWatch.set_offset_override(self.timeout - 1)
|
||||
self.assertFalse(req.expired)
|
||||
|
||||
def test_pending_expired(self):
|
||||
req = self.request()
|
||||
timeutils.advance_time_seconds(self.timeout + 1)
|
||||
timing.StopWatch.set_offset_override(self.timeout + 1)
|
||||
self.assertTrue(req.expired)
|
||||
|
||||
def test_running_not_expired(self):
|
||||
request = self.request()
|
||||
request.transition(pr.PENDING)
|
||||
request.transition(pr.RUNNING)
|
||||
timeutils.advance_time_seconds(self.timeout + 1)
|
||||
timing.StopWatch.set_offset_override(self.timeout + 1)
|
||||
self.assertFalse(request.expired)
|
||||
|
||||
def test_set_result(self):
|
||||
|
||||
@@ -14,12 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo.utils import reflection
|
||||
from oslo.utils import timeutils
|
||||
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow.engines.worker_based import types as worker_types
|
||||
@@ -33,6 +31,7 @@ class TestWorkerTypes(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestWorkerTypes, self).setUp()
|
||||
self.addCleanup(timing.StopWatch.clear_overrides)
|
||||
self.task = utils.DummyTask()
|
||||
self.task_uuid = 'task-uuid'
|
||||
self.task_action = 'execute'
|
||||
@@ -52,15 +51,12 @@ class TestWorkerTypes(test.TestCase):
|
||||
def test_requests_cache_expiry(self):
|
||||
# Mock out the calls the underlying objects will soon use to return
|
||||
# times that we can control more easily...
|
||||
now = timeutils.utcnow()
|
||||
overrides = [
|
||||
now,
|
||||
now,
|
||||
now + datetime.timedelta(seconds=1),
|
||||
now + datetime.timedelta(seconds=self.timeout + 1),
|
||||
0,
|
||||
1,
|
||||
self.timeout + 1,
|
||||
]
|
||||
timeutils.set_time_override(overrides)
|
||||
self.addCleanup(timeutils.clear_time_override)
|
||||
timing.StopWatch.set_now_override(overrides)
|
||||
|
||||
cache = worker_types.RequestsCache()
|
||||
cache[self.task_uuid] = self.request()
|
||||
|
||||
@@ -15,10 +15,14 @@
|
||||
# under the License.
|
||||
|
||||
from oslo_utils import reflection
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
# Find a monotonic providing time (or fallback to using time.time()
|
||||
# which isn't *always* accurate but will suffice).
|
||||
_now = misc.find_monotonic(allow_time_time=True)
|
||||
|
||||
|
||||
class Timeout(object):
|
||||
"""An object which represents a timeout.
|
||||
@@ -86,6 +90,12 @@ class StopWatch(object):
|
||||
_STARTED = 'STARTED'
|
||||
_STOPPED = 'STOPPED'
|
||||
|
||||
"""
|
||||
Class variables that should only be used for testing purposes only...
|
||||
"""
|
||||
_now_offset = None
|
||||
_now_override = None
|
||||
|
||||
def __init__(self, duration=None):
|
||||
if duration is not None:
|
||||
if duration < 0:
|
||||
@@ -105,7 +115,7 @@ class StopWatch(object):
|
||||
"""
|
||||
if self._state == self._STARTED:
|
||||
return self
|
||||
self._started_at = timeutils.utcnow()
|
||||
self._started_at = self._now()
|
||||
self._stopped_at = None
|
||||
self._state = self._STARTED
|
||||
self._splits = []
|
||||
@@ -121,7 +131,7 @@ class StopWatch(object):
|
||||
if self._state == self._STARTED:
|
||||
elapsed = self.elapsed()
|
||||
if self._splits:
|
||||
length = max(0.0, elapsed - self._splits[-1].elapsed)
|
||||
length = self._delta_seconds(self._splits[-1].elapsed, elapsed)
|
||||
else:
|
||||
length = elapsed
|
||||
self._splits.append(Split(elapsed, length))
|
||||
@@ -137,17 +147,88 @@ class StopWatch(object):
|
||||
self.start()
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def clear_overrides(cls):
|
||||
"""Clears all overrides/offsets.
|
||||
|
||||
**Only to be used for testing (affects all watch instances).**
|
||||
"""
|
||||
cls._now_override = None
|
||||
cls._now_offset = None
|
||||
|
||||
@classmethod
|
||||
def set_offset_override(cls, offset):
|
||||
"""Sets a offset that is applied to each time fetch.
|
||||
|
||||
**Only to be used for testing (affects all watch instances).**
|
||||
"""
|
||||
cls._now_offset = offset
|
||||
|
||||
@classmethod
|
||||
def advance_time_seconds(cls, offset):
|
||||
"""Advances/sets a offset that is applied to each time fetch.
|
||||
|
||||
NOTE(harlowja): if a previous offset exists (not ``None``) then this
|
||||
offset will be added onto the existing one (if you want to reset
|
||||
the offset completely use the :meth:`.set_offset_override`
|
||||
method instead).
|
||||
|
||||
**Only to be used for testing (affects all watch instances).**
|
||||
"""
|
||||
if cls._now_offset is None:
|
||||
cls.set_offset_override(offset)
|
||||
else:
|
||||
cls.set_offset_override(cls._now_offset + offset)
|
||||
|
||||
@classmethod
|
||||
def set_now_override(cls, now=None):
|
||||
"""Sets time override to use (if none, then current time is fetched).
|
||||
|
||||
NOTE(harlowja): if a list/tuple is provided then the first element of
|
||||
the list will be used (and removed) each time a time fetch occurs (once
|
||||
it becomes empty the override/s will no longer be applied). If a
|
||||
numeric value is provided then it will be used (and never removed
|
||||
until the override(s) are cleared via the :meth:`.clear_overrides`
|
||||
method).
|
||||
|
||||
**Only to be used for testing (affects all watch instances).**
|
||||
"""
|
||||
if isinstance(now, (list, tuple)):
|
||||
cls._now_override = list(now)
|
||||
else:
|
||||
if now is None:
|
||||
now = _now()
|
||||
cls._now_override = now
|
||||
|
||||
@staticmethod
|
||||
def _delta_seconds(earlier, later):
|
||||
return max(0.0, later - earlier)
|
||||
|
||||
@classmethod
|
||||
def _now(cls):
|
||||
if cls._now_override is not None:
|
||||
if isinstance(cls._now_override, list):
|
||||
try:
|
||||
now = cls._now_override.pop(0)
|
||||
except IndexError:
|
||||
now = _now()
|
||||
else:
|
||||
now = cls._now_override
|
||||
else:
|
||||
now = _now()
|
||||
if cls._now_offset is not None:
|
||||
now = now + cls._now_offset
|
||||
return now
|
||||
|
||||
def elapsed(self, maximum=None):
|
||||
"""Returns how many seconds have elapsed."""
|
||||
if self._state not in (self._STOPPED, self._STARTED):
|
||||
raise RuntimeError("Can not get the elapsed time of a stopwatch"
|
||||
" if it has not been started/stopped")
|
||||
if self._state == self._STOPPED:
|
||||
elapsed = max(0.0, float(timeutils.delta_seconds(
|
||||
self._started_at, self._stopped_at)))
|
||||
elapsed = self._delta_seconds(self._started_at, self._stopped_at)
|
||||
else:
|
||||
elapsed = max(0.0, float(timeutils.delta_seconds(
|
||||
self._started_at, timeutils.utcnow())))
|
||||
elapsed = self._delta_seconds(self._started_at, self._now())
|
||||
if maximum is not None and elapsed > maximum:
|
||||
elapsed = max(0.0, maximum)
|
||||
return elapsed
|
||||
@@ -201,6 +282,6 @@ class StopWatch(object):
|
||||
if self._state != self._STARTED:
|
||||
raise RuntimeError("Can not stop a stopwatch that has not been"
|
||||
" started")
|
||||
self._stopped_at = timeutils.utcnow()
|
||||
self._stopped_at = self._now()
|
||||
self._state = self._STOPPED
|
||||
return self
|
||||
|
||||
@@ -23,6 +23,7 @@ import os
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
@@ -44,6 +45,34 @@ NUMERIC_TYPES = six.integer_types + (float,)
|
||||
# see RFC 3986 section 3.1
|
||||
_SCHEME_REGEX = re.compile(r"^([A-Za-z][A-Za-z0-9+.-]*):")
|
||||
|
||||
_MONOTONIC_LOCATIONS = tuple([
|
||||
# The built-in/expected location in python3.3+
|
||||
'time.monotonic',
|
||||
# NOTE(harlowja): Try to use the pypi module that provides this
|
||||
# functionality for older versions of python less than 3.3 so that
|
||||
# they to can benefit from better timing...
|
||||
#
|
||||
# See: http://pypi.python.org/pypi/monotonic
|
||||
'monotonic.monotonic',
|
||||
])
|
||||
|
||||
|
||||
def find_monotonic(allow_time_time=False):
|
||||
"""Tries to find a monotonic time providing function (and returns it)."""
|
||||
for import_str in _MONOTONIC_LOCATIONS:
|
||||
mod_str, _sep, attr_str = import_str.rpartition('.')
|
||||
mod = importutils.try_import(mod_str)
|
||||
if mod is None:
|
||||
continue
|
||||
func = getattr(mod, attr_str, None)
|
||||
if func is not None:
|
||||
return func
|
||||
# Finally give up and use time.time (which isn't monotonic)...
|
||||
if allow_time_time:
|
||||
return time.time
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def merge_uri(uri, conf):
|
||||
"""Merges a parsed uri into the given configuration dictionary.
|
||||
|
||||
Reference in New Issue
Block a user