diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index bfd4d3d4..d13d666c 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -20,8 +20,8 @@ import six from taskflow.conductors import base from taskflow import exceptions as excp from taskflow.listeners import logging as logging_listener +from taskflow.types import time as tt from taskflow.utils import lock_utils -from taskflow.utils import misc LOG = logging.getLogger(__name__) WAIT_TIMEOUT = 0.5 @@ -58,8 +58,8 @@ class SingleThreadedConductor(base.Conductor): if wait_timeout is None: wait_timeout = WAIT_TIMEOUT if isinstance(wait_timeout, (int, float) + six.string_types): - self._wait_timeout = misc.Timeout(float(wait_timeout)) - elif isinstance(wait_timeout, misc.Timeout): + self._wait_timeout = tt.Timeout(float(wait_timeout)) + elif isinstance(wait_timeout, tt.Timeout): self._wait_timeout = wait_timeout else: raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 35febd40..afea043f 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -23,6 +23,7 @@ from taskflow.engines.worker_based import cache from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import exceptions as exc +from taskflow.types import time as tt from taskflow.utils import async_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -77,7 +78,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._proxy = proxy.Proxy(uuid, exchange, self._on_message, self._on_wait, **kwargs) self._proxy_thread = None - self._periodic = PeriodicWorker(misc.Timeout(pr.NOTIFY_PERIOD), + self._periodic = PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD), [self._notify_topics]) self._periodic_thread = None diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 40a227a8..d8cab533 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -21,6 +21,7 @@ import six from concurrent import futures from taskflow.engines.action_engine import executor +from taskflow.types import time from taskflow.utils import misc from taskflow.utils import reflection @@ -103,7 +104,7 @@ class Request(Message): self._arguments = arguments self._progress_callback = progress_callback self._kwargs = kwargs - self._watch = misc.StopWatch(duration=timeout).start() + self._watch = time.StopWatch(duration=timeout).start() self._state = WAITING self.result = futures.Future() diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index fd73a097..be305a12 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -33,6 +33,7 @@ from taskflow.openstack.common import excutils from taskflow.openstack.common import jsonutils from taskflow.openstack.common import uuidutils from taskflow import states +from taskflow.types import time from taskflow.utils import kazoo_utils from taskflow.utils import lock_utils from taskflow.utils import misc @@ -586,13 +587,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): # Wait until timeout expires (or forever) for jobs to appear. watch = None if timeout is not None: - watch = misc.StopWatch(duration=float(timeout)) - watch.start() + watch = time.StopWatch(duration=float(timeout)).start() self._job_cond.acquire() try: while True: if not self._known_jobs: - if watch and watch.expired(): + if watch is not None and watch.expired(): raise excp.NotFound("Expired waiting for jobs to" " arrive; waited %s seconds" % watch.elapsed()) diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index 15ebe82e..87240a36 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -21,7 +21,7 @@ import logging from taskflow import exceptions as exc from taskflow.listeners import base from taskflow import states -from taskflow.utils import misc +from taskflow.types import time STARTING_STATES = (states.RUNNING, states.REVERTING) FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,) @@ -64,8 +64,7 @@ class TimingListener(base.ListenerBase): if state == states.PENDING: self._timers.pop(task_name, None) elif state in STARTING_STATES: - self._timers[task_name] = misc.StopWatch() - self._timers[task_name].start() + self._timers[task_name] = time.StopWatch().start() elif state in FINISHED_STATES: if task_name in self._timers: self._record_ending(self._timers[task_name], task_name) diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 1d1ea336..2c2dac2d 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -23,6 +23,7 @@ import time from taskflow import states from taskflow import test from taskflow.tests import utils as test_utils +from taskflow.types import time as tt from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -496,30 +497,30 @@ class IsValidAttributeNameTestCase(test.TestCase): class StopWatchUtilsTest(test.TestCase): def test_no_states(self): - watch = misc.StopWatch() + watch = tt.StopWatch() self.assertRaises(RuntimeError, watch.stop) self.assertRaises(RuntimeError, watch.resume) def test_expiry(self): - watch = misc.StopWatch(0.1) + watch = tt.StopWatch(0.1) watch.start() time.sleep(0.2) self.assertTrue(watch.expired()) def test_no_expiry(self): - watch = misc.StopWatch(0.1) + watch = tt.StopWatch(0.1) watch.start() self.assertFalse(watch.expired()) def test_elapsed(self): - watch = misc.StopWatch() + watch = tt.StopWatch() watch.start() time.sleep(0.2) # NOTE(harlowja): Allow for a slight variation by using 0.19. self.assertGreaterEqual(0.19, watch.elapsed()) def test_pause_resume(self): - watch = misc.StopWatch() + watch = tt.StopWatch() watch.start() time.sleep(0.05) watch.stop() @@ -530,7 +531,7 @@ class StopWatchUtilsTest(test.TestCase): self.assertNotEqual(elapsed, watch.elapsed()) def test_context_manager(self): - with misc.StopWatch() as watch: + with tt.StopWatch() as watch: time.sleep(0.05) self.assertGreater(0.01, watch.elapsed()) diff --git a/taskflow/types/time.py b/taskflow/types/time.py new file mode 100644 index 00000000..cd822ae7 --- /dev/null +++ b/taskflow/types/time.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading + +from taskflow.utils import misc + + +class Timeout(object): + """An object which represents a timeout. + + This object has the ability to be interrupted before the actual timeout + is reached. + """ + def __init__(self, timeout): + if timeout < 0: + raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) + self._timeout = timeout + self._event = threading.Event() + + def interrupt(self): + self._event.set() + + def is_stopped(self): + return self._event.is_set() + + def wait(self): + self._event.wait(self._timeout) + + def reset(self): + self._event.clear() + + +class StopWatch(object): + """A simple timer/stopwatch helper class. + + Inspired by: apache-commons-lang java stopwatch. + + Not thread-safe. + """ + _STARTED = 'STARTED' + _STOPPED = 'STOPPED' + + def __init__(self, duration=None): + self._duration = duration + self._started_at = None + self._stopped_at = None + self._state = None + + def start(self): + if self._state == self._STARTED: + return self + self._started_at = misc.wallclock() + self._stopped_at = None + self._state = self._STARTED + return self + + def elapsed(self): + if self._state == self._STOPPED: + return float(self._stopped_at - self._started_at) + elif self._state == self._STARTED: + return float(misc.wallclock() - self._started_at) + else: + raise RuntimeError("Can not get the elapsed time of an invalid" + " stopwatch") + + def __enter__(self): + self.start() + return self + + def __exit__(self, type, value, traceback): + try: + self.stop() + except RuntimeError: + pass + # NOTE(harlowja): don't silence the exception. + return False + + def leftover(self): + if self._duration is None: + raise RuntimeError("Can not get the leftover time of a watch that" + " has no duration") + if self._state != self._STARTED: + raise RuntimeError("Can not get the leftover time of a stopwatch" + " that has not been started") + end_time = self._started_at + self._duration + return max(0.0, end_time - misc.wallclock()) + + def expired(self): + if self._duration is None: + return False + if self.elapsed() > self._duration: + return True + return False + + def resume(self): + if self._state == self._STOPPED: + self._state = self._STARTED + return self + else: + raise RuntimeError("Can not resume a stopwatch that has not been" + " stopped") + + def stop(self): + if self._state == self._STOPPED: + return self + if self._state != self._STARTED: + raise RuntimeError("Can not stop a stopwatch that has not been" + " started") + self._stopped_at = misc.wallclock() + self._state = self._STOPPED + return self diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index eea56d6d..65f21e24 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -27,7 +27,6 @@ import os import re import string import sys -import threading import time import traceback @@ -360,31 +359,6 @@ class AttrDict(dict): self[name] = value -class Timeout(object): - """An object which represents a timeout. - - This object has the ability to be interrupted before the actual timeout - is reached. - """ - def __init__(self, timeout): - if timeout < 0: - raise ValueError("Timeout must be >= 0 and not %s" % (timeout)) - self._timeout = timeout - self._event = threading.Event() - - def interrupt(self): - self._event.set() - - def is_stopped(self): - return self._event.is_set() - - def wait(self): - self._event.wait(self._timeout) - - def reset(self): - self._event.clear() - - class ExponentialBackoff(object): """An iterable object that will yield back an exponential delay sequence. @@ -444,87 +418,6 @@ def ensure_tree(path): raise -class StopWatch(object): - """A simple timer/stopwatch helper class. - - Inspired by: apache-commons-lang java stopwatch. - - Not thread-safe. - """ - _STARTED = 'STARTED' - _STOPPED = 'STOPPED' - - def __init__(self, duration=None): - self._duration = duration - self._started_at = None - self._stopped_at = None - self._state = None - - def start(self): - if self._state == self._STARTED: - return self - self._started_at = wallclock() - self._stopped_at = None - self._state = self._STARTED - return self - - def elapsed(self): - if self._state == self._STOPPED: - return float(self._stopped_at - self._started_at) - elif self._state == self._STARTED: - return float(wallclock() - self._started_at) - else: - raise RuntimeError("Can not get the elapsed time of an invalid" - " stopwatch") - - def __enter__(self): - self.start() - return self - - def __exit__(self, type, value, traceback): - try: - self.stop() - except RuntimeError: - pass - # NOTE(harlowja): don't silence the exception. - return False - - def leftover(self): - if self._duration is None: - raise RuntimeError("Can not get the leftover time of a watch that" - " has no duration") - if self._state != self._STARTED: - raise RuntimeError("Can not get the leftover time of a stopwatch" - " that has not been started") - end_time = self._started_at + self._duration - return max(0.0, end_time - wallclock()) - - def expired(self): - if self._duration is None: - return False - if self.elapsed() > self._duration: - return True - return False - - def resume(self): - if self._state == self._STOPPED: - self._state = self._STARTED - return self - else: - raise RuntimeError("Can not resume a stopwatch that has not been" - " stopped") - - def stop(self): - if self._state == self._STOPPED: - return self - if self._state != self._STARTED: - raise RuntimeError("Can not stop a stopwatch that has not been" - " started") - self._stopped_at = wallclock() - self._state = self._STOPPED - return self - - class Notifier(object): """A notification helper class.