Merge "Create a top level time type"

This commit is contained in:
Jenkins
2014-07-03 03:41:36 +00:00
committed by Gerrit Code Review
8 changed files with 144 additions and 124 deletions

View File

@@ -20,8 +20,8 @@ import six
from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow.types import time as tt
from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
WAIT_TIMEOUT = 0.5
@@ -58,8 +58,8 @@ class SingleThreadedConductor(base.Conductor):
if wait_timeout is None:
wait_timeout = WAIT_TIMEOUT
if isinstance(wait_timeout, (int, float) + six.string_types):
self._wait_timeout = misc.Timeout(float(wait_timeout))
elif isinstance(wait_timeout, misc.Timeout):
self._wait_timeout = tt.Timeout(float(wait_timeout))
elif isinstance(wait_timeout, tt.Timeout):
self._wait_timeout = wait_timeout
else:
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))

View File

@@ -23,6 +23,7 @@ from taskflow.engines.worker_based import cache
from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
from taskflow import exceptions as exc
from taskflow.types import time as tt
from taskflow.utils import async_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -77,7 +78,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._proxy = proxy.Proxy(uuid, exchange, self._on_message,
self._on_wait, **kwargs)
self._proxy_thread = None
self._periodic = PeriodicWorker(misc.Timeout(pr.NOTIFY_PERIOD),
self._periodic = PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD),
[self._notify_topics])
self._periodic_thread = None

View File

@@ -21,6 +21,7 @@ import six
from concurrent import futures
from taskflow.engines.action_engine import executor
from taskflow.types import time
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -103,7 +104,7 @@ class Request(Message):
self._arguments = arguments
self._progress_callback = progress_callback
self._kwargs = kwargs
self._watch = misc.StopWatch(duration=timeout).start()
self._watch = time.StopWatch(duration=timeout).start()
self._state = WAITING
self.result = futures.Future()

View File

@@ -33,6 +33,7 @@ from taskflow.openstack.common import excutils
from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import uuidutils
from taskflow import states
from taskflow.types import time
from taskflow.utils import kazoo_utils
from taskflow.utils import lock_utils
from taskflow.utils import misc
@@ -586,13 +587,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
# Wait until timeout expires (or forever) for jobs to appear.
watch = None
if timeout is not None:
watch = misc.StopWatch(duration=float(timeout))
watch.start()
watch = time.StopWatch(duration=float(timeout)).start()
self._job_cond.acquire()
try:
while True:
if not self._known_jobs:
if watch and watch.expired():
if watch is not None and watch.expired():
raise excp.NotFound("Expired waiting for jobs to"
" arrive; waited %s seconds"
% watch.elapsed())

View File

@@ -21,7 +21,7 @@ import logging
from taskflow import exceptions as exc
from taskflow.listeners import base
from taskflow import states
from taskflow.utils import misc
from taskflow.types import time
STARTING_STATES = (states.RUNNING, states.REVERTING)
FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,)
@@ -64,8 +64,7 @@ class TimingListener(base.ListenerBase):
if state == states.PENDING:
self._timers.pop(task_name, None)
elif state in STARTING_STATES:
self._timers[task_name] = misc.StopWatch()
self._timers[task_name].start()
self._timers[task_name] = time.StopWatch().start()
elif state in FINISHED_STATES:
if task_name in self._timers:
self._record_ending(self._timers[task_name], task_name)

View File

@@ -23,6 +23,7 @@ import time
from taskflow import states
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.types import time as tt
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -496,30 +497,30 @@ class IsValidAttributeNameTestCase(test.TestCase):
class StopWatchUtilsTest(test.TestCase):
def test_no_states(self):
watch = misc.StopWatch()
watch = tt.StopWatch()
self.assertRaises(RuntimeError, watch.stop)
self.assertRaises(RuntimeError, watch.resume)
def test_expiry(self):
watch = misc.StopWatch(0.1)
watch = tt.StopWatch(0.1)
watch.start()
time.sleep(0.2)
self.assertTrue(watch.expired())
def test_no_expiry(self):
watch = misc.StopWatch(0.1)
watch = tt.StopWatch(0.1)
watch.start()
self.assertFalse(watch.expired())
def test_elapsed(self):
watch = misc.StopWatch()
watch = tt.StopWatch()
watch.start()
time.sleep(0.2)
# NOTE(harlowja): Allow for a slight variation by using 0.19.
self.assertGreaterEqual(0.19, watch.elapsed())
def test_pause_resume(self):
watch = misc.StopWatch()
watch = tt.StopWatch()
watch.start()
time.sleep(0.05)
watch.stop()
@@ -530,7 +531,7 @@ class StopWatchUtilsTest(test.TestCase):
self.assertNotEqual(elapsed, watch.elapsed())
def test_context_manager(self):
with misc.StopWatch() as watch:
with tt.StopWatch() as watch:
time.sleep(0.05)
self.assertGreater(0.01, watch.elapsed())

125
taskflow/types/time.py Normal file
View File

@@ -0,0 +1,125 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from taskflow.utils import misc
class Timeout(object):
"""An object which represents a timeout.
This object has the ability to be interrupted before the actual timeout
is reached.
"""
def __init__(self, timeout):
if timeout < 0:
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
self._timeout = timeout
self._event = threading.Event()
def interrupt(self):
self._event.set()
def is_stopped(self):
return self._event.is_set()
def wait(self):
self._event.wait(self._timeout)
def reset(self):
self._event.clear()
class StopWatch(object):
"""A simple timer/stopwatch helper class.
Inspired by: apache-commons-lang java stopwatch.
Not thread-safe.
"""
_STARTED = 'STARTED'
_STOPPED = 'STOPPED'
def __init__(self, duration=None):
self._duration = duration
self._started_at = None
self._stopped_at = None
self._state = None
def start(self):
if self._state == self._STARTED:
return self
self._started_at = misc.wallclock()
self._stopped_at = None
self._state = self._STARTED
return self
def elapsed(self):
if self._state == self._STOPPED:
return float(self._stopped_at - self._started_at)
elif self._state == self._STARTED:
return float(misc.wallclock() - self._started_at)
else:
raise RuntimeError("Can not get the elapsed time of an invalid"
" stopwatch")
def __enter__(self):
self.start()
return self
def __exit__(self, type, value, traceback):
try:
self.stop()
except RuntimeError:
pass
# NOTE(harlowja): don't silence the exception.
return False
def leftover(self):
if self._duration is None:
raise RuntimeError("Can not get the leftover time of a watch that"
" has no duration")
if self._state != self._STARTED:
raise RuntimeError("Can not get the leftover time of a stopwatch"
" that has not been started")
end_time = self._started_at + self._duration
return max(0.0, end_time - misc.wallclock())
def expired(self):
if self._duration is None:
return False
if self.elapsed() > self._duration:
return True
return False
def resume(self):
if self._state == self._STOPPED:
self._state = self._STARTED
return self
else:
raise RuntimeError("Can not resume a stopwatch that has not been"
" stopped")
def stop(self):
if self._state == self._STOPPED:
return self
if self._state != self._STARTED:
raise RuntimeError("Can not stop a stopwatch that has not been"
" started")
self._stopped_at = misc.wallclock()
self._state = self._STOPPED
return self

View File

@@ -27,7 +27,6 @@ import os
import re
import string
import sys
import threading
import time
import traceback
@@ -360,31 +359,6 @@ class AttrDict(dict):
self[name] = value
class Timeout(object):
"""An object which represents a timeout.
This object has the ability to be interrupted before the actual timeout
is reached.
"""
def __init__(self, timeout):
if timeout < 0:
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
self._timeout = timeout
self._event = threading.Event()
def interrupt(self):
self._event.set()
def is_stopped(self):
return self._event.is_set()
def wait(self):
self._event.wait(self._timeout)
def reset(self):
self._event.clear()
class ExponentialBackoff(object):
"""An iterable object that will yield back an exponential delay sequence.
@@ -444,87 +418,6 @@ def ensure_tree(path):
raise
class StopWatch(object):
"""A simple timer/stopwatch helper class.
Inspired by: apache-commons-lang java stopwatch.
Not thread-safe.
"""
_STARTED = 'STARTED'
_STOPPED = 'STOPPED'
def __init__(self, duration=None):
self._duration = duration
self._started_at = None
self._stopped_at = None
self._state = None
def start(self):
if self._state == self._STARTED:
return self
self._started_at = wallclock()
self._stopped_at = None
self._state = self._STARTED
return self
def elapsed(self):
if self._state == self._STOPPED:
return float(self._stopped_at - self._started_at)
elif self._state == self._STARTED:
return float(wallclock() - self._started_at)
else:
raise RuntimeError("Can not get the elapsed time of an invalid"
" stopwatch")
def __enter__(self):
self.start()
return self
def __exit__(self, type, value, traceback):
try:
self.stop()
except RuntimeError:
pass
# NOTE(harlowja): don't silence the exception.
return False
def leftover(self):
if self._duration is None:
raise RuntimeError("Can not get the leftover time of a watch that"
" has no duration")
if self._state != self._STARTED:
raise RuntimeError("Can not get the leftover time of a stopwatch"
" that has not been started")
end_time = self._started_at + self._duration
return max(0.0, end_time - wallclock())
def expired(self):
if self._duration is None:
return False
if self.elapsed() > self._duration:
return True
return False
def resume(self):
if self._state == self._STOPPED:
self._state = self._STARTED
return self
else:
raise RuntimeError("Can not resume a stopwatch that has not been"
" stopped")
def stop(self):
if self._state == self._STOPPED:
return self
if self._state != self._STARTED:
raise RuntimeError("Can not stop a stopwatch that has not been"
" started")
self._stopped_at = wallclock()
self._state = self._STOPPED
return self
class Notifier(object):
"""A notification helper class.