Merge "Add ability of job poster/job iterator to wait for jobs to complete"
This commit is contained in:
@@ -18,13 +18,16 @@
|
|||||||
import abc
|
import abc
|
||||||
import collections
|
import collections
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
from oslo_utils import timeutils
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from taskflow import exceptions as excp
|
from taskflow import exceptions as excp
|
||||||
from taskflow import states
|
from taskflow import states
|
||||||
from taskflow.types import notifier
|
from taskflow.types import notifier
|
||||||
|
from taskflow.utils import iter_utils
|
||||||
|
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
@@ -83,6 +86,43 @@ class Job(object):
|
|||||||
"""Access the current state of this job."""
|
"""Access the current state of this job."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def wait(self, timeout=None,
|
||||||
|
delay=0.01, delay_multiplier=2.0, max_delay=60.0,
|
||||||
|
sleep_func=time.sleep):
|
||||||
|
"""Wait for job to enter completion state.
|
||||||
|
|
||||||
|
If the job has not completed in the given timeout, then return false,
|
||||||
|
otherwise return true (a job failure exception may also be raised if
|
||||||
|
the job information can not be read, for whatever reason). Periodic
|
||||||
|
state checks will happen every ``delay`` seconds where ``delay`` will
|
||||||
|
be multipled by the given multipler after a state is found that is
|
||||||
|
**not** complete.
|
||||||
|
|
||||||
|
Note that if no timeout is given this is equivalent to blocking
|
||||||
|
until the job has completed. Also note that if a jobboard backend
|
||||||
|
can optimize this method then its implementation may not use
|
||||||
|
delays (and backoffs) at all. In general though no matter what
|
||||||
|
optimizations are applied implementations must **always** respect
|
||||||
|
the given timeout value.
|
||||||
|
"""
|
||||||
|
if timeout is not None:
|
||||||
|
w = timeutils.StopWatch(duration=timeout)
|
||||||
|
w.start()
|
||||||
|
else:
|
||||||
|
w = None
|
||||||
|
delay_gen = iter_utils.generate_delays(delay, max_delay,
|
||||||
|
multiplier=delay_multiplier)
|
||||||
|
while True:
|
||||||
|
if w is not None and w.expired():
|
||||||
|
return False
|
||||||
|
if self.state == states.COMPLETE:
|
||||||
|
return True
|
||||||
|
sleepy_secs = six.next(delay_gen)
|
||||||
|
if w is not None:
|
||||||
|
sleepy_secs = min(w.leftover(), sleepy_secs)
|
||||||
|
sleep_func(sleepy_secs)
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def book(self):
|
def book(self):
|
||||||
"""Logbook associated with this job.
|
"""Logbook associated with this job.
|
||||||
|
@@ -176,6 +176,19 @@ class BoardTestMixin(object):
|
|||||||
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
|
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
|
||||||
self.assertEqual(0, len(possible_jobs))
|
self.assertEqual(0, len(possible_jobs))
|
||||||
|
|
||||||
|
def test_posting_consume_wait(self):
|
||||||
|
with connect_close(self.board):
|
||||||
|
jb = self.board.post('test', p_utils.temporary_log_book())
|
||||||
|
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
|
||||||
|
self.board.claim(possible_jobs[0], self.board.name)
|
||||||
|
self.board.consume(possible_jobs[0], self.board.name)
|
||||||
|
self.assertTrue(jb.wait())
|
||||||
|
|
||||||
|
def test_posting_no_consume_wait(self):
|
||||||
|
with connect_close(self.board):
|
||||||
|
jb = self.board.post('test', p_utils.temporary_log_book())
|
||||||
|
self.assertFalse(jb.wait(0.1))
|
||||||
|
|
||||||
def test_posting_with_book(self):
|
def test_posting_with_book(self):
|
||||||
backend = impl_dir.DirBackend(conf={
|
backend = impl_dir.DirBackend(conf={
|
||||||
'path': self.makeTmpDir(),
|
'path': self.makeTmpDir(),
|
||||||
|
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
import string
|
import string
|
||||||
|
|
||||||
|
import six
|
||||||
from six.moves import range as compat_range
|
from six.moves import range as compat_range
|
||||||
|
|
||||||
from taskflow import test
|
from taskflow import test
|
||||||
@@ -42,6 +43,32 @@ class IterUtilsTest(test.TestCase):
|
|||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
iter_utils.unique_seen, *iters)
|
iter_utils.unique_seen, *iters)
|
||||||
|
|
||||||
|
def test_generate_delays(self):
|
||||||
|
it = iter_utils.generate_delays(1, 60)
|
||||||
|
self.assertEqual(1, six.next(it))
|
||||||
|
self.assertEqual(2, six.next(it))
|
||||||
|
self.assertEqual(4, six.next(it))
|
||||||
|
self.assertEqual(8, six.next(it))
|
||||||
|
self.assertEqual(16, six.next(it))
|
||||||
|
self.assertEqual(32, six.next(it))
|
||||||
|
self.assertEqual(60, six.next(it))
|
||||||
|
self.assertEqual(60, six.next(it))
|
||||||
|
|
||||||
|
def test_generate_delays_custom_multiplier(self):
|
||||||
|
it = iter_utils.generate_delays(1, 60, multiplier=4)
|
||||||
|
self.assertEqual(1, six.next(it))
|
||||||
|
self.assertEqual(4, six.next(it))
|
||||||
|
self.assertEqual(16, six.next(it))
|
||||||
|
self.assertEqual(60, six.next(it))
|
||||||
|
self.assertEqual(60, six.next(it))
|
||||||
|
|
||||||
|
def test_generate_delays_bad(self):
|
||||||
|
self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1)
|
||||||
|
self.assertRaises(ValueError, iter_utils.generate_delays, -1, 2)
|
||||||
|
self.assertRaises(ValueError, iter_utils.generate_delays, 2, -1)
|
||||||
|
self.assertRaises(ValueError, iter_utils.generate_delays, 1, 1,
|
||||||
|
multiplier=0.5)
|
||||||
|
|
||||||
def test_unique_seen(self):
|
def test_unique_seen(self):
|
||||||
iters = [
|
iters = [
|
||||||
['a', 'b'],
|
['a', 'b'],
|
||||||
|
@@ -60,6 +60,37 @@ def count(it):
|
|||||||
return sum(1 for _value in it)
|
return sum(1 for _value in it)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_delays(delay, max_delay, multiplier=2):
|
||||||
|
"""Generator/iterator that provides back delays values.
|
||||||
|
|
||||||
|
The values it generates increments by a given multiple after each
|
||||||
|
iteration (using the max delay as a upper bound). Negative values
|
||||||
|
will never be generated... and it will iterate forever (ie it will never
|
||||||
|
stop generating values).
|
||||||
|
"""
|
||||||
|
if max_delay < 0:
|
||||||
|
raise ValueError("Provided delay (max) must be greater"
|
||||||
|
" than or equal to zero")
|
||||||
|
if delay < 0:
|
||||||
|
raise ValueError("Provided delay must start off greater"
|
||||||
|
" than or equal to zero")
|
||||||
|
if multiplier < 1.0:
|
||||||
|
raise ValueError("Provided multiplier must be greater than"
|
||||||
|
" or equal to 1.0")
|
||||||
|
|
||||||
|
def _gen_it():
|
||||||
|
# NOTE(harlowja): Generation is delayed so that validation
|
||||||
|
# can happen before generation/iteration... (instead of
|
||||||
|
# during generation/iteration)
|
||||||
|
curr_delay = delay
|
||||||
|
while True:
|
||||||
|
curr_delay = max(0, min(max_delay, curr_delay))
|
||||||
|
yield curr_delay
|
||||||
|
curr_delay = curr_delay * multiplier
|
||||||
|
|
||||||
|
return _gen_it()
|
||||||
|
|
||||||
|
|
||||||
def unique_seen(it, *its):
|
def unique_seen(it, *its):
|
||||||
"""Yields unique values from iterator(s) (and retains order)."""
|
"""Yields unique values from iterator(s) (and retains order)."""
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user