From 898eb11f11db153c26ddd71e756fbccda867387e Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 15 Dec 2015 17:53:52 -0800 Subject: [PATCH] Add ability of job poster/job iterator to wait for jobs to complete It is quite useful for someone who has posted a job to be able to easily wait for its completion (up to a given timeout) so add a wait method onto the job class to allow for this to be possible. Change-Id: Id3a7c724020962591e323da0febfd0c71d1acc50 --- taskflow/jobs/base.py | 40 ++++++++++++++++++++ taskflow/tests/unit/jobs/base.py | 13 +++++++ taskflow/tests/unit/test_utils_iter_utils.py | 27 +++++++++++++ taskflow/utils/iter_utils.py | 31 +++++++++++++++ 4 files changed, 111 insertions(+) diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index 8e5d77c3..ef9ef7ea 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -18,13 +18,16 @@ import abc import collections import contextlib +import time +from oslo_utils import timeutils from oslo_utils import uuidutils import six from taskflow import exceptions as excp from taskflow import states from taskflow.types import notifier +from taskflow.utils import iter_utils @six.add_metaclass(abc.ABCMeta) @@ -83,6 +86,43 @@ class Job(object): """Access the current state of this job.""" pass + def wait(self, timeout=None, + delay=0.01, delay_multiplier=2.0, max_delay=60.0, + sleep_func=time.sleep): + """Wait for job to enter completion state. + + If the job has not completed in the given timeout, then return false, + otherwise return true (a job failure exception may also be raised if + the job information can not be read, for whatever reason). Periodic + state checks will happen every ``delay`` seconds where ``delay`` will + be multipled by the given multipler after a state is found that is + **not** complete. + + Note that if no timeout is given this is equivalent to blocking + until the job has completed. Also note that if a jobboard backend + can optimize this method then its implementation may not use + delays (and backoffs) at all. In general though no matter what + optimizations are applied implementations must **always** respect + the given timeout value. + """ + if timeout is not None: + w = timeutils.StopWatch(duration=timeout) + w.start() + else: + w = None + delay_gen = iter_utils.generate_delays(delay, max_delay, + multiplier=delay_multiplier) + while True: + if w is not None and w.expired(): + return False + if self.state == states.COMPLETE: + return True + sleepy_secs = six.next(delay_gen) + if w is not None: + sleepy_secs = min(w.leftover(), sleepy_secs) + sleep_func(sleepy_secs) + return False + @property def book(self): """Logbook associated with this job. diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py index 46c78dfe..cbb0f68e 100644 --- a/taskflow/tests/unit/jobs/base.py +++ b/taskflow/tests/unit/jobs/base.py @@ -176,6 +176,19 @@ class BoardTestMixin(object): possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) self.assertEqual(0, len(possible_jobs)) + def test_posting_consume_wait(self): + with connect_close(self.board): + jb = self.board.post('test', p_utils.temporary_log_book()) + possible_jobs = list(self.board.iterjobs(only_unclaimed=True)) + self.board.claim(possible_jobs[0], self.board.name) + self.board.consume(possible_jobs[0], self.board.name) + self.assertTrue(jb.wait()) + + def test_posting_no_consume_wait(self): + with connect_close(self.board): + jb = self.board.post('test', p_utils.temporary_log_book()) + self.assertFalse(jb.wait(0.1)) + def test_posting_with_book(self): backend = impl_dir.DirBackend(conf={ 'path': self.makeTmpDir(), diff --git a/taskflow/tests/unit/test_utils_iter_utils.py b/taskflow/tests/unit/test_utils_iter_utils.py index 88879811..ad522a4e 100644 --- a/taskflow/tests/unit/test_utils_iter_utils.py +++ b/taskflow/tests/unit/test_utils_iter_utils.py @@ -16,6 +16,7 @@ import string +import six from six.moves import range as compat_range from taskflow import test @@ -42,6 +43,32 @@ class IterUtilsTest(test.TestCase): self.assertRaises(ValueError, iter_utils.unique_seen, *iters) + def test_generate_delays(self): + it = iter_utils.generate_delays(1, 60) + self.assertEqual(1, six.next(it)) + self.assertEqual(2, six.next(it)) + self.assertEqual(4, six.next(it)) + self.assertEqual(8, six.next(it)) + self.assertEqual(16, six.next(it)) + self.assertEqual(32, six.next(it)) + self.assertEqual(60, six.next(it)) + self.assertEqual(60, six.next(it)) + + def test_generate_delays_custom_multiplier(self): + it = iter_utils.generate_delays(1, 60, multiplier=4) + self.assertEqual(1, six.next(it)) + self.assertEqual(4, six.next(it)) + self.assertEqual(16, six.next(it)) + self.assertEqual(60, six.next(it)) + self.assertEqual(60, six.next(it)) + + def test_generate_delays_bad(self): + self.assertRaises(ValueError, iter_utils.generate_delays, -1, -1) + self.assertRaises(ValueError, iter_utils.generate_delays, -1, 2) + self.assertRaises(ValueError, iter_utils.generate_delays, 2, -1) + self.assertRaises(ValueError, iter_utils.generate_delays, 1, 1, + multiplier=0.5) + def test_unique_seen(self): iters = [ ['a', 'b'], diff --git a/taskflow/utils/iter_utils.py b/taskflow/utils/iter_utils.py index 5d0aff10..413b36f4 100644 --- a/taskflow/utils/iter_utils.py +++ b/taskflow/utils/iter_utils.py @@ -60,6 +60,37 @@ def count(it): return sum(1 for _value in it) +def generate_delays(delay, max_delay, multiplier=2): + """Generator/iterator that provides back delays values. + + The values it generates increments by a given multiple after each + iteration (using the max delay as a upper bound). Negative values + will never be generated... and it will iterate forever (ie it will never + stop generating values). + """ + if max_delay < 0: + raise ValueError("Provided delay (max) must be greater" + " than or equal to zero") + if delay < 0: + raise ValueError("Provided delay must start off greater" + " than or equal to zero") + if multiplier < 1.0: + raise ValueError("Provided multiplier must be greater than" + " or equal to 1.0") + + def _gen_it(): + # NOTE(harlowja): Generation is delayed so that validation + # can happen before generation/iteration... (instead of + # during generation/iteration) + curr_delay = delay + while True: + curr_delay = max(0, min(max_delay, curr_delay)) + yield curr_delay + curr_delay = curr_delay * multiplier + + return _gen_it() + + def unique_seen(it, *its): """Yields unique values from iterator(s) (and retains order)."""