Add a timeout option to the scheduler

Change-Id: Ia796c5027c8faeefb9ddf48d88583d3d7901448d
This commit is contained in:
Zane Bitter 2013-05-13 17:55:41 +02:00
parent 8026858af5
commit 8c3b308803
2 changed files with 134 additions and 9 deletions

View File

@ -18,6 +18,7 @@ import functools
import itertools
import sys
import types
from time import time as wallclock
from heat.openstack.common import excutils
from heat.openstack.common import log as logging
@ -38,6 +39,35 @@ def task_description(task):
return repr(task)
class Timeout(BaseException):
"""
Timeout exception, raised within a task when it has exceeded its allotted
(wallclock) running time.
This allows the task to perform any necessary cleanup, as well as use a
different exception to notify the controlling task if appropriate. If the
task supresses the exception altogether, it will be cancelled but the
controlling task will not be notified of the timeout.
"""
def __init__(self, task_runner, timeout):
"""
Initialise with the TaskRunner and a timeout period in seconds.
"""
message = _('%s Timed out') % task_runner
super(Timeout, self).__init__(message)
# Note that we don't attempt to handle leap seconds or large clock
# jumps here. The latter are assumed to be rare and the former
# negligible in the context of the timeout. Time zone adjustments,
# Daylight Savings and the like *are* handled. PEP 418 adds a proper
# monotonic clock, but only in Python 3.3.
self._endtime = wallclock() + timeout
def expired(self):
return wallclock() > self._endtime
class TaskRunner(object):
"""
Wrapper for a resumable task (co-routine).
@ -58,6 +88,7 @@ class TaskRunner(object):
self._kwargs = kwargs
self._runner = None
self._done = False
self._timeout = None
self.name = task_description(task)
def __str__(self):
@ -70,24 +101,31 @@ class TaskRunner(object):
logger.debug('%s sleeping' % str(self))
eventlet.sleep(wait_time)
def __call__(self, wait_time=1):
def __call__(self, wait_time=1, timeout=None):
"""
Start and run the task to completion.
The task will sleep for `wait_time` seconds between steps. To avoid
sleeping, pass `None` for `wait_time`.
"""
self.start()
self.start(timeout=timeout)
self.run_to_completion(wait_time=wait_time)
def start(self):
def start(self, timeout=None):
"""
Initialise the task and run its first step.
If a timeout is specified, any attempt to step the task after that
number of seconds has elapsed will result in a Timeout being
raised inside the task.
"""
assert self._runner is None, "Task already started"
logger.debug('%s starting' % str(self))
if timeout is not None:
self._timeout = Timeout(self, timeout)
result = self._task(*self._args, **self._kwargs)
if isinstance(result, types.GeneratorType):
self._runner = result
@ -105,13 +143,24 @@ class TaskRunner(object):
if not self.done():
assert self._runner is not None, "Task not started"
logger.debug('%s running' % str(self))
if self._timeout is not None and self._timeout.expired():
logger.info('%s timed out' % str(self))
try:
next(self._runner)
except StopIteration:
self._done = True
logger.debug('%s complete' % str(self))
try:
self._runner.throw(self._timeout)
except StopIteration:
self._done = True
else:
# Clean up in case task swallows exception without exiting
self.cancel()
else:
logger.debug('%s running' % str(self))
try:
next(self._runner)
except StopIteration:
self._done = True
logger.debug('%s complete' % str(self))
return self._done

View File

@ -352,6 +352,82 @@ class TaskTest(mox.MoxTestBase):
self.assertTrue(runner.step())
self.assertTrue(runner.step())
def test_timeout(self):
st = scheduler.wallclock()
def task():
while True:
yield
self.mox.StubOutWithMock(scheduler, 'wallclock')
scheduler.wallclock().AndReturn(st)
scheduler.wallclock().AndReturn(st + 0.5)
scheduler.wallclock().AndReturn(st + 1.5)
self.mox.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start(timeout=1)
self.assertTrue(runner)
self.assertRaises(scheduler.Timeout, runner.step)
self.mox.VerifyAll()
def test_timeout_return(self):
st = scheduler.wallclock()
def task():
while True:
try:
yield
except scheduler.Timeout:
return
self.mox.StubOutWithMock(scheduler, 'wallclock')
scheduler.wallclock().AndReturn(st)
scheduler.wallclock().AndReturn(st + 0.5)
scheduler.wallclock().AndReturn(st + 1.5)
self.mox.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start(timeout=1)
self.assertTrue(runner)
self.assertTrue(runner.step())
self.assertFalse(runner)
self.mox.VerifyAll()
def test_timeout_swallowed(self):
st = scheduler.wallclock()
def task():
while True:
try:
yield
except scheduler.Timeout:
yield
self.fail('Task still running')
self.mox.StubOutWithMock(scheduler, 'wallclock')
scheduler.wallclock().AndReturn(st)
scheduler.wallclock().AndReturn(st + 0.5)
scheduler.wallclock().AndReturn(st + 1.5)
self.mox.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start(timeout=1)
self.assertTrue(runner)
self.assertTrue(runner.step())
self.assertFalse(runner)
self.assertTrue(runner.step())
self.mox.VerifyAll()
class WrapperTaskTest(mox.MoxTestBase):