Add ability of job poster/job iterator to wait for jobs to complete

It is quite useful for someone who has posted a job to be able to
easily wait for its completion (up to a given timeout) so add a wait
method onto the job class to allow for this to be possible.

Change-Id: Id3a7c724020962591e323da0febfd0c71d1acc50
This commit is contained in:
Joshua Harlow
2015-12-15 17:53:52 -08:00
committed by Joshua Harlow
parent 2d83552890
commit 898eb11f11
4 changed files with 111 additions and 0 deletions

View File

@@ -18,13 +18,16 @@
import abc
import collections
import contextlib
import time
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from taskflow import exceptions as excp
from taskflow import states
from taskflow.types import notifier
from taskflow.utils import iter_utils
@six.add_metaclass(abc.ABCMeta)
@@ -83,6 +86,43 @@ class Job(object):
"""Access the current state of this job."""
pass
def wait(self, timeout=None,
delay=0.01, delay_multiplier=2.0, max_delay=60.0,
sleep_func=time.sleep):
"""Wait for job to enter completion state.
If the job has not completed in the given timeout, then return false,
otherwise return true (a job failure exception may also be raised if
the job information can not be read, for whatever reason). Periodic
state checks will happen every ``delay`` seconds where ``delay`` will
be multipled by the given multipler after a state is found that is
**not** complete.
Note that if no timeout is given this is equivalent to blocking
until the job has completed. Also note that if a jobboard backend
can optimize this method then its implementation may not use
delays (and backoffs) at all. In general though no matter what
optimizations are applied implementations must **always** respect
the given timeout value.
"""
if timeout is not None:
w = timeutils.StopWatch(duration=timeout)
w.start()
else:
w = None
delay_gen = iter_utils.generate_delays(delay, max_delay,
multiplier=delay_multiplier)
while True:
if w is not None and w.expired():
return False
if self.state == states.COMPLETE:
return True
sleepy_secs = six.next(delay_gen)
if w is not None:
sleepy_secs = min(w.leftover(), sleepy_secs)
sleep_func(sleepy_secs)
return False
@property
def book(self):
"""Logbook associated with this job.

View File

@@ -176,6 +176,19 @@ class BoardTestMixin(object):
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
def test_posting_consume_wait(self):
with connect_close(self.board):
jb = self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.board.claim(possible_jobs[0], self.board.name)
self.board.consume(possible_jobs[0], self.board.name)
self.assertTrue(jb.wait())
def test_posting_no_consume_wait(self):
with connect_close(self.board):
jb = self.board.post('test', p_utils.temporary_log_book())
self.assertFalse(jb.wait(0.1))
def test_posting_with_book(self):
backend = impl_dir.DirBackend(conf={
'path': self.makeTmpDir(),

View File

@@ -16,6 +16,7 @@
import string
import six
from six.moves import range as compat_range
from taskflow import test
@@ -42,6 +43,32 @@ class IterUtilsTest(test.TestCase):
self.assertRaises(ValueError,
iter_utils.unique_seen, *iters)
def test_generate_delays(self):
it = iter_utils.generate_delays(1, 60)
self.assertEqual(1, six.next(it))
self.assertEqual(2, six.next(it))
self.assertEqual(4, six.next(it))
self.assertEqual(8, six.next(it))
self.assertEqual(16, six.next(it))
self.assertEqual(32, six.next(it))
self.assertEqual(60, six.next(it))
self.assertEqual(60, six.next(it))
def test_generate_delays_custom_multiplier(self):
it = iter_utils.generate_delays(1, 60, multiplier=4)
self.assertEqual(1, six.next(it))
self.assertEqual(4, six.next(it))
self.assertEqual(16, six.next(it))
self.assertEqual(60, six.next(it))
self.assertEqual(60, six.next(it))
def test_generate_delays_bad(self):
self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1)
self.assertRaises(ValueError, iter_utils.generate_delays, -1, 2)
self.assertRaises(ValueError, iter_utils.generate_delays, 2, -1)
self.assertRaises(ValueError, iter_utils.generate_delays, 1, 1,
multiplier=0.5)
def test_unique_seen(self):
iters = [
['a', 'b'],

View File

@@ -60,6 +60,37 @@ def count(it):
return sum(1 for _value in it)
def generate_delays(delay, max_delay, multiplier=2):
"""Generator/iterator that provides back delays values.
The values it generates increments by a given multiple after each
iteration (using the max delay as a upper bound). Negative values
will never be generated... and it will iterate forever (ie it will never
stop generating values).
"""
if max_delay < 0:
raise ValueError("Provided delay (max) must be greater"
" than or equal to zero")
if delay < 0:
raise ValueError("Provided delay must start off greater"
" than or equal to zero")
if multiplier < 1.0:
raise ValueError("Provided multiplier must be greater than"
" or equal to 1.0")
def _gen_it():
# NOTE(harlowja): Generation is delayed so that validation
# can happen before generation/iteration... (instead of
# during generation/iteration)
curr_delay = delay
while True:
curr_delay = max(0, min(max_delay, curr_delay))
yield curr_delay
curr_delay = curr_delay * multiplier
return _gen_it()
def unique_seen(it, *its):
"""Yields unique values from iterator(s) (and retains order)."""