Use timeutils functions instead of misc.wallclock
The common oslo timeutils functions can perform the same time methods using the better datetime objects than using the raw unix timestamps directly, so in order to reduce a little bit of code just use the functions that module provides instead of our own. Also adds a few more tests that validate the various runtime errors being thrown to ensure they are thrown when expected and handles the case where time goes backwards (say when ntpd updates) in a more reliable manner (by not becoming negative). Change-Id: I6153ff8379833844105545ddb21dede65a7d4d3a
This commit is contained in:
committed by
Joshua Harlow
parent
5780a5d77e
commit
be254eac66
@@ -14,9 +14,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
import networkx as nx
|
||||
from oslo.utils import timeutils
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
@@ -122,43 +121,76 @@ class TreeTest(test.TestCase):
|
||||
|
||||
|
||||
class StopWatchTest(test.TestCase):
|
||||
def setUp(self):
|
||||
super(StopWatchTest, self).setUp()
|
||||
timeutils.set_time_override()
|
||||
self.addCleanup(timeutils.clear_time_override)
|
||||
|
||||
def test_no_states(self):
|
||||
watch = tt.StopWatch()
|
||||
self.assertRaises(RuntimeError, watch.stop)
|
||||
self.assertRaises(RuntimeError, watch.resume)
|
||||
|
||||
def test_bad_expiry(self):
|
||||
self.assertRaises(ValueError, tt.StopWatch, -1)
|
||||
|
||||
def test_backwards(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.5)
|
||||
self.assertTrue(watch.expired())
|
||||
|
||||
timeutils.advance_time_seconds(-1.0)
|
||||
self.assertFalse(watch.expired())
|
||||
self.assertEqual(0.0, watch.elapsed())
|
||||
|
||||
def test_expiry(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
time.sleep(0.2)
|
||||
timeutils.advance_time_seconds(0.2)
|
||||
self.assertTrue(watch.expired())
|
||||
|
||||
def test_not_expired(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
self.assertFalse(watch.expired())
|
||||
|
||||
def test_no_expiry(self):
|
||||
watch = tt.StopWatch(0.1)
|
||||
watch.start()
|
||||
self.assertFalse(watch.expired())
|
||||
self.assertRaises(RuntimeError, watch.expired)
|
||||
|
||||
def test_elapsed(self):
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
time.sleep(0.2)
|
||||
timeutils.advance_time_seconds(0.2)
|
||||
# NOTE(harlowja): Allow for a slight variation by using 0.19.
|
||||
self.assertGreaterEqual(0.19, watch.elapsed())
|
||||
|
||||
def test_no_elapsed(self):
|
||||
watch = tt.StopWatch()
|
||||
self.assertRaises(RuntimeError, watch.elapsed)
|
||||
|
||||
def test_no_leftover(self):
|
||||
watch = tt.StopWatch()
|
||||
self.assertRaises(RuntimeError, watch.leftover)
|
||||
watch = tt.StopWatch(1)
|
||||
self.assertRaises(RuntimeError, watch.leftover)
|
||||
|
||||
def test_pause_resume(self):
|
||||
watch = tt.StopWatch()
|
||||
watch.start()
|
||||
time.sleep(0.05)
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
watch.stop()
|
||||
elapsed = watch.elapsed()
|
||||
time.sleep(0.05)
|
||||
self.assertAlmostEqual(elapsed, watch.elapsed())
|
||||
watch.resume()
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
self.assertNotEqual(elapsed, watch.elapsed())
|
||||
|
||||
def test_context_manager(self):
|
||||
with tt.StopWatch() as watch:
|
||||
time.sleep(0.05)
|
||||
timeutils.advance_time_seconds(0.05)
|
||||
self.assertGreater(0.01, watch.elapsed())
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
from concurrent import futures
|
||||
from oslo.utils import timeutils
|
||||
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import exceptions as excp
|
||||
@@ -90,6 +91,8 @@ class TestProtocol(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestProtocol, self).setUp()
|
||||
timeutils.set_time_override()
|
||||
self.addCleanup(timeutils.clear_time_override)
|
||||
self.task = utils.DummyTask()
|
||||
self.task_uuid = 'task-uuid'
|
||||
self.task_action = 'execute'
|
||||
@@ -157,22 +160,21 @@ class TestProtocol(test.TestCase):
|
||||
failures={self.task.name: failure.to_dict()})
|
||||
self.assertEqual(request.to_dict(), expected)
|
||||
|
||||
@mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock')
|
||||
def test_pending_not_expired(self, mocked_wallclock):
|
||||
mocked_wallclock.side_effect = [0, self.timeout - 1]
|
||||
self.assertFalse(self.request().expired)
|
||||
def test_pending_not_expired(self):
|
||||
req = self.request()
|
||||
timeutils.advance_time_seconds(self.timeout - 1)
|
||||
self.assertFalse(req.expired)
|
||||
|
||||
@mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock')
|
||||
def test_pending_expired(self, mocked_wallclock):
|
||||
mocked_wallclock.side_effect = [0, self.timeout + 2]
|
||||
self.assertTrue(self.request().expired)
|
||||
def test_pending_expired(self):
|
||||
req = self.request()
|
||||
timeutils.advance_time_seconds(self.timeout + 1)
|
||||
self.assertTrue(req.expired)
|
||||
|
||||
@mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock')
|
||||
def test_running_not_expired(self, mocked_wallclock):
|
||||
mocked_wallclock.side_effect = [0, self.timeout + 2]
|
||||
def test_running_not_expired(self):
|
||||
request = self.request()
|
||||
request.transition(pr.PENDING)
|
||||
request.transition(pr.RUNNING)
|
||||
timeutils.advance_time_seconds(self.timeout + 1)
|
||||
self.assertFalse(request.expired)
|
||||
|
||||
def test_set_result(self):
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
import threading
|
||||
|
||||
from taskflow.utils import misc
|
||||
from oslo.utils import timeutils
|
||||
|
||||
|
||||
class Timeout(object):
|
||||
@@ -55,7 +55,12 @@ class StopWatch(object):
|
||||
_STOPPED = 'STOPPED'
|
||||
|
||||
def __init__(self, duration=None):
|
||||
self._duration = duration
|
||||
if duration is not None:
|
||||
if duration < 0:
|
||||
raise ValueError("Duration must be >= 0 and not %s" % duration)
|
||||
self._duration = duration
|
||||
else:
|
||||
self._duration = None
|
||||
self._started_at = None
|
||||
self._stopped_at = None
|
||||
self._state = None
|
||||
@@ -63,19 +68,21 @@ class StopWatch(object):
|
||||
def start(self):
|
||||
if self._state == self._STARTED:
|
||||
return self
|
||||
self._started_at = misc.wallclock()
|
||||
self._started_at = timeutils.utcnow()
|
||||
self._stopped_at = None
|
||||
self._state = self._STARTED
|
||||
return self
|
||||
|
||||
def elapsed(self):
|
||||
if self._state == self._STOPPED:
|
||||
return float(self._stopped_at - self._started_at)
|
||||
return max(0.0, float(timeutils.delta_seconds(self._started_at,
|
||||
self._stopped_at)))
|
||||
elif self._state == self._STARTED:
|
||||
return float(misc.wallclock() - self._started_at)
|
||||
return max(0.0, float(timeutils.delta_seconds(self._started_at,
|
||||
timeutils.utcnow())))
|
||||
else:
|
||||
raise RuntimeError("Can not get the elapsed time of an invalid"
|
||||
" stopwatch")
|
||||
raise RuntimeError("Can not get the elapsed time of a stopwatch"
|
||||
" if it has not been started/stopped")
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
@@ -96,12 +103,14 @@ class StopWatch(object):
|
||||
if self._state != self._STARTED:
|
||||
raise RuntimeError("Can not get the leftover time of a stopwatch"
|
||||
" that has not been started")
|
||||
end_time = self._started_at + self._duration
|
||||
return max(0.0, end_time - misc.wallclock())
|
||||
return max(0.0, self._duration - self.elapsed())
|
||||
|
||||
def expired(self):
|
||||
if self._duration is None:
|
||||
return False
|
||||
if self._state is None:
|
||||
raise RuntimeError("Can not check if a stopwatch has expired"
|
||||
" if it has not been started/stopped")
|
||||
if self.elapsed() > self._duration:
|
||||
return True
|
||||
return False
|
||||
@@ -120,6 +129,6 @@ class StopWatch(object):
|
||||
if self._state != self._STARTED:
|
||||
raise RuntimeError("Can not stop a stopwatch that has not been"
|
||||
" started")
|
||||
self._stopped_at = misc.wallclock()
|
||||
self._stopped_at = timeutils.utcnow()
|
||||
self._state = self._STOPPED
|
||||
return self
|
||||
|
||||
@@ -28,7 +28,6 @@ import re
|
||||
import string
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from oslo.serialization import jsonutils
|
||||
@@ -222,13 +221,6 @@ class cachedproperty(object):
|
||||
return value
|
||||
|
||||
|
||||
def wallclock():
|
||||
# NOTE(harlowja): made into a function so that this can be easily mocked
|
||||
# out if we want to alter time related functionality (for testing
|
||||
# purposes).
|
||||
return time.time()
|
||||
|
||||
|
||||
def millis_to_datetime(milliseconds):
|
||||
"""Converts number of milliseconds (from epoch) into a datetime object."""
|
||||
return datetime.datetime.fromtimestamp(float(milliseconds) / 1000)
|
||||
|
||||
Reference in New Issue
Block a user