From 187e02d33e90fa0eeb702cbe5b4eb8e7436ff874 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 27 Oct 2015 19:07:10 -0700 Subject: [PATCH] Move 'convert_to_timeout' to timing type as a helper function Also adds some basic tests to this newly exposed helper function so it continues to operate as expected and better docstrings on the timeout types methods. Change-Id: I9fa4c7d313084800d49cfc77f6ca93afcff1169d --- taskflow/conductors/backends/impl_executor.py | 13 +------ taskflow/tests/unit/test_types.py | 36 ++++++++++++++++++ taskflow/types/timing.py | 38 ++++++++++++++++--- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index d61f3e0d..908e4b68 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -37,17 +37,6 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -def _convert_to_timeout(value=None, default_value=None, event_factory=None): - if value is None: - value = default_value - if isinstance(value, (int, float) + six.string_types): - return tt.Timeout(float(value), event_factory=event_factory) - elif isinstance(value, tt.Timeout): - return value - else: - raise ValueError("Invalid timeout literal '%s'" % (value)) - - @six.add_metaclass(abc.ABCMeta) class ExecutorConductor(base.Conductor): """Dispatches jobs from blocking :py:meth:`.run` method to some executor. @@ -129,7 +118,7 @@ class ExecutorConductor(base.Conductor): super(ExecutorConductor, self).__init__( name, jobboard, persistence=persistence, engine=engine, engine_options=engine_options) - self._wait_timeout = _convert_to_timeout( + self._wait_timeout = tt.convert_to_timeout( value=wait_timeout, default_value=self.WAIT_TIMEOUT, event_factory=self._event_factory) self._dead = self._event_factory() diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index 92a7973d..b6767193 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -20,9 +20,45 @@ from six.moves import cPickle as pickle from taskflow import test from taskflow.types import graph from taskflow.types import sets +from taskflow.types import timing from taskflow.types import tree +class TimingTest(test.TestCase): + def test_convert_fail(self): + for baddie in ["abc123", "-1", "", object()]: + self.assertRaises(ValueError, + timing.convert_to_timeout, baddie) + + def test_convert_noop(self): + t = timing.convert_to_timeout(1.0) + t2 = timing.convert_to_timeout(t) + self.assertEqual(t, t2) + + def test_interrupt(self): + t = timing.convert_to_timeout(1.0) + self.assertFalse(t.is_stopped()) + t.interrupt() + self.assertTrue(t.is_stopped()) + + def test_reset(self): + t = timing.convert_to_timeout(1.0) + t.interrupt() + self.assertTrue(t.is_stopped()) + t.reset() + self.assertFalse(t.is_stopped()) + + def test_values(self): + for v, e_v in [("1.0", 1.0), (1, 1.0), + ("2.0", 2.0)]: + t = timing.convert_to_timeout(v) + self.assertEqual(e_v, t.value) + + def test_fail(self): + self.assertRaises(ValueError, + timing.Timeout, -1) + + class GraphTest(test.TestCase): def test_no_successors_no_predecessors(self): g = graph.DiGraph() diff --git a/taskflow/types/timing.py b/taskflow/types/timing.py index 99aeac56..be379bd9 100644 --- a/taskflow/types/timing.py +++ b/taskflow/types/timing.py @@ -18,7 +18,7 @@ import threading from debtcollector import moves from oslo_utils import timeutils - +import six # TODO(harlowja): Keep alias class... around until 2.0 is released. StopWatch = moves.moved_class(timeutils.StopWatch, 'StopWatch', __name__, @@ -31,20 +31,46 @@ class Timeout(object): This object has the ability to be interrupted before the actual timeout is reached. """ - def __init__(self, timeout, event_factory=threading.Event): - if timeout < 0: - raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) - self._timeout = timeout + def __init__(self, value, event_factory=threading.Event): + if value < 0: + raise ValueError("Timeout value must be greater or" + " equal to zero and not '%s'" % (value)) + self._value = value self._event = event_factory() + @property + def value(self): + """Immutable value of the internally used timeout.""" + return self._value + def interrupt(self): + """Forcefully set the timeout (releases any waiters).""" self._event.set() def is_stopped(self): + """Returns if the timeout has been interrupted.""" return self._event.is_set() def wait(self): - self._event.wait(self._timeout) + """Block current thread (up to timeout) and wait until interrupted.""" + self._event.wait(self._value) def reset(self): + """Reset so that interruption (and waiting) can happen again.""" self._event.clear() + + +def convert_to_timeout(value=None, default_value=None, + event_factory=threading.Event): + """Converts a given value to a timeout instance (and returns it). + + Does nothing if the value provided is already a timeout instance. + """ + if value is None: + value = default_value + if isinstance(value, (int, float) + six.string_types): + return Timeout(float(value), event_factory=event_factory) + elif isinstance(value, Timeout): + return value + else: + raise ValueError("Invalid timeout literal '%s'" % (value))