Files
deb-python-taskflow/taskflow/types/timing.py

207 lines
6.7 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import reflection
from oslo_utils import timeutils
from taskflow.utils import threading_utils
class Timeout(object):
"""An object which represents a timeout.
This object has the ability to be interrupted before the actual timeout
is reached.
"""
def __init__(self, timeout):
if timeout < 0:
raise ValueError("Timeout must be >= 0 and not %s" % (timeout))
self._timeout = timeout
self._event = threading_utils.Event()
def interrupt(self):
self._event.set()
def is_stopped(self):
return self._event.is_set()
def wait(self):
self._event.wait(self._timeout)
def reset(self):
self._event.clear()
class Split(object):
"""A *immutable* stopwatch split.
See: http://en.wikipedia.org/wiki/Stopwatch for what this is/represents.
"""
__slots__ = ['_elapsed', '_length']
def __init__(self, elapsed, length):
self._elapsed = elapsed
self._length = length
@property
def elapsed(self):
"""Duration from stopwatch start."""
return self._elapsed
@property
def length(self):
"""Seconds from last split (or the elapsed time if no prior split)."""
return self._length
def __repr__(self):
r = reflection.get_class_name(self, fully_qualified=False)
r += "(elapsed=%s, length=%s)" % (self._elapsed, self._length)
return r
class StopWatch(object):
"""A simple timer/stopwatch helper class.
Inspired by: apache-commons-lang java stopwatch.
Not thread-safe (when a single watch is mutated by multiple threads at
the same time). Thread-safe when used by a single thread (not shared) or
when operations are performed in a thread-safe manner on these objects by
wrapping those operations with locks.
"""
_STARTED = 'STARTED'
_STOPPED = 'STOPPED'
def __init__(self, duration=None):
if duration is not None:
if duration < 0:
raise ValueError("Duration must be >= 0 and not %s" % duration)
self._duration = duration
else:
self._duration = None
self._started_at = None
self._stopped_at = None
self._state = None
self._splits = []
def start(self):
"""Starts the watch (if not already started).
NOTE(harlowja): resets any splits previously captured (if any).
"""
if self._state == self._STARTED:
return self
self._started_at = timeutils.utcnow()
self._stopped_at = None
self._state = self._STARTED
self._splits = []
return self
@property
def splits(self):
"""Accessor to all/any splits that have been captured."""
return tuple(self._splits)
def split(self):
"""Captures a split/elapsed since start time (and doesn't stop)."""
if self._state == self._STARTED:
elapsed = self.elapsed()
if self._splits:
length = max(0.0, elapsed - self._splits[-1].elapsed)
else:
length = elapsed
self._splits.append(Split(elapsed, length))
return self._splits[-1]
else:
raise RuntimeError("Can not create a split time of a stopwatch"
" if it has not been started")
def restart(self):
"""Restarts the watch from a started/stopped state."""
if self._state == self._STARTED:
self.stop()
self.start()
return self
def elapsed(self, maximum=None):
"""Returns how many seconds have elapsed."""
if self._state not in (self._STOPPED, self._STARTED):
raise RuntimeError("Can not get the elapsed time of a stopwatch"
" if it has not been started/stopped")
if self._state == self._STOPPED:
elapsed = max(0.0, float(timeutils.delta_seconds(
self._started_at, self._stopped_at)))
else:
elapsed = max(0.0, float(timeutils.delta_seconds(
self._started_at, timeutils.utcnow())))
if maximum is not None and elapsed > maximum:
elapsed = max(0.0, maximum)
return elapsed
def __enter__(self):
"""Starts the watch."""
self.start()
return self
def __exit__(self, type, value, traceback):
"""Stops the watch (ignoring errors if stop fails)."""
try:
self.stop()
except RuntimeError:
pass
def leftover(self):
"""Returns how many seconds are left until the watch expires."""
if self._duration is None:
raise RuntimeError("Can not get the leftover time of a watch that"
" has no duration")
if self._state != self._STARTED:
raise RuntimeError("Can not get the leftover time of a stopwatch"
" that has not been started")
return max(0.0, self._duration - self.elapsed())
def expired(self):
"""Returns if the watch has expired (ie, duration provided elapsed)."""
if self._duration is None:
return False
if self._state is None:
raise RuntimeError("Can not check if a stopwatch has expired"
" if it has not been started/stopped")
if self.elapsed() > self._duration:
return True
return False
def resume(self):
"""Resumes the watch from a stopped state."""
if self._state == self._STOPPED:
self._state = self._STARTED
return self
else:
raise RuntimeError("Can not resume a stopwatch that has not been"
" stopped")
def stop(self):
"""Stops the watch."""
if self._state == self._STOPPED:
return self
if self._state != self._STARTED:
raise RuntimeError("Can not stop a stopwatch that has not been"
" started")
self._stopped_at = timeutils.utcnow()
self._state = self._STOPPED
return self