Merge "Scheduler: Add a progress callback to TaskRunner"
This commit is contained in:
commit
94a8d440c6
|
@ -157,18 +157,19 @@ class TaskRunner(object):
|
|||
LOG.debug('%s sleeping' % six.text_type(self))
|
||||
eventlet.sleep(wait_time)
|
||||
|
||||
def __call__(self, wait_time=1, timeout=None):
|
||||
def __call__(self, wait_time=1, timeout=None, progress_callback=None):
|
||||
"""Start and run the task to completion.
|
||||
|
||||
The task will first sleep for zero seconds, then sleep for `wait_time`
|
||||
seconds between steps. To avoid sleeping, pass `None` for `wait_time`.
|
||||
"""
|
||||
self.start(timeout=timeout)
|
||||
# ensure that zero second sleep is applied only if task
|
||||
# has not completed.
|
||||
if not self.done():
|
||||
self._sleep(0 if wait_time is not None else None)
|
||||
self.run_to_completion(wait_time=wait_time)
|
||||
assert self._runner is None, "Task already started"
|
||||
|
||||
started = False
|
||||
for step in self.as_task(timeout=timeout,
|
||||
progress_callback=progress_callback):
|
||||
self._sleep(wait_time if (started or wait_time is None) else 0)
|
||||
started = True
|
||||
|
||||
def start(self, timeout=None):
|
||||
"""Initialise the task and run its first step.
|
||||
|
@ -227,16 +228,18 @@ class TaskRunner(object):
|
|||
|
||||
return self._done
|
||||
|
||||
def run_to_completion(self, wait_time=1):
|
||||
def run_to_completion(self, wait_time=1, progress_callback=None):
|
||||
"""Run the task to completion.
|
||||
|
||||
The task will sleep for `wait_time` seconds between steps. To avoid
|
||||
sleeping, pass `None` for `wait_time`.
|
||||
"""
|
||||
while not self.step():
|
||||
assert self._runner is not None, "Task not started"
|
||||
|
||||
for step in self.as_task(progress_callback=progress_callback):
|
||||
self._sleep(wait_time)
|
||||
|
||||
def as_task(self, timeout=None):
|
||||
def as_task(self, timeout=None, progress_callback=None):
|
||||
"""Return a task that drives the TaskRunner."""
|
||||
resuming = self.started()
|
||||
if not resuming:
|
||||
|
@ -251,6 +254,9 @@ class TaskRunner(object):
|
|||
while not done:
|
||||
try:
|
||||
yield
|
||||
|
||||
if progress_callback is not None:
|
||||
progress_callback()
|
||||
except GeneratorExit:
|
||||
self.cancel()
|
||||
raise
|
||||
|
|
|
@ -572,6 +572,242 @@ class TaskTest(common.HeatTestCase):
|
|||
runner.start()
|
||||
runner.run_to_completion(wait_time=24)
|
||||
|
||||
def test_run_progress(self):
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
scheduler.TaskRunner(task)(progress_callback=progress)
|
||||
self.assertEqual(task.num_steps, len(progress_count))
|
||||
|
||||
def test_start_run_progress(self):
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
runner = scheduler.TaskRunner(task)
|
||||
runner.start()
|
||||
runner.run_to_completion(progress_callback=progress)
|
||||
self.assertEqual(task.num_steps - 1, len(progress_count))
|
||||
|
||||
def test_run_as_task_progress(self):
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
tr = scheduler.TaskRunner(task)
|
||||
rt = tr.as_task(progress_callback=progress)
|
||||
for step in rt:
|
||||
pass
|
||||
self.assertEqual(task.num_steps, len(progress_count))
|
||||
|
||||
def test_run_progress_exception(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
if progress_count:
|
||||
raise TestException
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.assertRaises(TestException, scheduler.TaskRunner(task),
|
||||
progress_callback=progress)
|
||||
self.assertEqual(1, len(progress_count))
|
||||
|
||||
def test_start_run_progress_exception(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
if progress_count:
|
||||
raise TestException
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
runner = scheduler.TaskRunner(task)
|
||||
runner.start()
|
||||
self.assertRaises(TestException, runner.run_to_completion,
|
||||
progress_callback=progress)
|
||||
self.assertEqual(1, len(progress_count))
|
||||
|
||||
def test_run_as_task_progress_exception(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
if progress_count:
|
||||
raise TestException
|
||||
progress_count.append(None)
|
||||
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
tr = scheduler.TaskRunner(task)
|
||||
rt = tr.as_task(progress_callback=progress)
|
||||
next(rt)
|
||||
next(rt)
|
||||
self.assertRaises(TestException, next, rt)
|
||||
self.assertEqual(1, len(progress_count))
|
||||
|
||||
def test_run_progress_exception_swallow(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
try:
|
||||
if not progress_count:
|
||||
raise TestException
|
||||
finally:
|
||||
progress_count.append(None)
|
||||
|
||||
def task():
|
||||
try:
|
||||
yield
|
||||
except TestException:
|
||||
yield
|
||||
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
scheduler.TaskRunner(task)(progress_callback=progress)
|
||||
self.assertEqual(2, len(progress_count))
|
||||
|
||||
def test_start_run_progress_exception_swallow(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
try:
|
||||
if not progress_count:
|
||||
raise TestException
|
||||
finally:
|
||||
progress_count.append(None)
|
||||
|
||||
def task():
|
||||
yield
|
||||
try:
|
||||
yield
|
||||
except TestException:
|
||||
yield
|
||||
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
runner = scheduler.TaskRunner(task)
|
||||
runner.start()
|
||||
runner.run_to_completion(progress_callback=progress)
|
||||
self.assertEqual(2, len(progress_count))
|
||||
|
||||
def test_run_as_task_progress_exception_swallow(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
progress_count = []
|
||||
|
||||
def progress():
|
||||
try:
|
||||
if not progress_count:
|
||||
raise TestException
|
||||
finally:
|
||||
progress_count.append(None)
|
||||
|
||||
def task():
|
||||
try:
|
||||
yield
|
||||
except TestException:
|
||||
yield
|
||||
|
||||
tr = scheduler.TaskRunner(task)
|
||||
rt = tr.as_task(progress_callback=progress)
|
||||
next(rt)
|
||||
next(rt)
|
||||
self.assertRaises(StopIteration, next, rt)
|
||||
self.assertEqual(2, len(progress_count))
|
||||
|
||||
def test_sleep(self):
|
||||
sleep_time = 42
|
||||
self.m.StubOutWithMock(eventlet, 'sleep')
|
||||
|
|
Loading…
Reference in New Issue