From 479b3c84302066d58b2b16e784a3ebb438912631 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 29 Apr 2014 13:43:53 -0700 Subject: [PATCH] Move from generator to iterator for iterjobs Instead of yielding back valid jobs, alter the function to return an iterator which can support various capabilities when iterated over. These capabilities can be changed during iteration and are more flexible in allowing new jobboard implementations to change their supported iteration capabilities. Change-Id: Ibcd47d881a5c8689b44bc444402f51030649c0be --- taskflow/jobs/backends/impl_zookeeper.py | 89 ++++++++++++++++++------ taskflow/jobs/jobboard.py | 13 +++- taskflow/tests/unit/jobs/test_zk_job.py | 12 ++++ 3 files changed, 91 insertions(+), 23 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index b0a53ecc..b9331404 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import functools import logging @@ -201,6 +202,69 @@ class ZookeeperJob(base_job.Job): return self._book +class ZookeeperJobBoardIterator(six.Iterator): + """Iterator over a zookeeper jobboard. + + It supports the following attributes/constructor arguments: + + * ensure_fresh: boolean that requests that during every fetch of a new + set of jobs this will cause the iterator to force the backend to + refresh (ensuring that the jobboard has the most recent job listings). + * only_unclaimed: boolean that indicates whether to only iterate + over unclaimed jobs. 
+ """ + + def __init__(self, board, only_unclaimed=False, ensure_fresh=False): + self._board = board + self._jobs = collections.deque() + self._fetched = False + self.ensure_fresh = ensure_fresh + self.only_unclaimed = only_unclaimed + + @property + def board(self): + return self._board + + def __iter__(self): + return self + + def _fetch_jobs(self): + if self.ensure_fresh: + self._board._force_refresh() + with self._board._job_mutate: + return sorted(six.itervalues(self._board._known_jobs)) + + def _next_job(self): + if self.only_unclaimed: + allowed_states = UNCLAIMED_JOB_STATES + else: + allowed_states = ALL_JOB_STATES + job = None + while self._jobs and job is None: + maybe_job = self._jobs.popleft() + try: + if maybe_job.state in allowed_states: + job = maybe_job + except excp.JobFailure: + LOG.warn("Failed determining the state of job: %s (%s)", + maybe_job.uuid, maybe_job.path, exc_info=True) + except excp.NotFound: + with self._board._job_mutate: + self._board._remove_job(maybe_job.path) + return job + + def __next__(self): + if not self._jobs: + if not self._fetched: + self._jobs.extend(self._fetch_jobs()) + self._fetched = True + job = self._next_job() + if job is None: + raise StopIteration + else: + return job + + class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def __init__(self, name, conf, client=None, persistence=None, emit_notifications=True): @@ -250,7 +314,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): with self._job_mutate: return len(self._known_jobs) - def _force_refresh(self, delayed=False): + def _force_refresh(self): try: children = self._client.get_children(self.path) except self._client.handler.timeout_exception as e: @@ -263,27 +327,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): except k_exceptions.KazooException as e: raise excp.JobFailure("Refreshing failure, internal error", e) else: - self._on_job_posting(children, delayed=delayed) + self._on_job_posting(children, delayed=False) def iterjobs(self, 
only_unclaimed=False, ensure_fresh=False): - if ensure_fresh: - self._force_refresh() - ok_states = ALL_JOB_STATES - if only_unclaimed: - ok_states = UNCLAIMED_JOB_STATES - with self._job_mutate: - known_jobs = list(six.itervalues(self._known_jobs)) - for job in sorted(known_jobs): - try: - if job.state in ok_states: - yield job - except excp.JobFailure: - LOG.warn("Failed determining the state of job: %s (%s)", - job.uuid, job.path, exc_info=True) - except excp.NotFound: - # Someone destroyed it while we are iterating. - with self._job_mutate: - self._remove_job(job.path) + return ZookeeperJobBoardIterator(self, + only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh) def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 737251bd..0aa533ea 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -36,14 +36,19 @@ class JobBoard(object): @abc.abstractmethod def iterjobs(self, only_unclaimed=False, ensure_fresh=False): - """Yields back jobs that are currently on this jobboard (claimed - or not claimed). + """Returns an iterator that will provide back jobs that are currently + on this jobboard. NOTE(harlowja): the ordering of this iteration should be by posting order (oldest to newest) if possible, but it is left up to the backing implementation to provide the order that best suits it (so don't depend on it always being oldest to newest). + NOTE(harlowja): the iterator that is returned may support other + attributes which can be used to further customize how iteration can + be accomplished; check with the backends iterator object to determine + what other attributes are supported. + :param only_unclaimed: boolean that indicates whether to only iteration over unclaimed jobs. 
:param ensure_fresh: boolean that requests to only iterate over the @@ -55,7 +60,9 @@ class JobBoard(object): @abc.abstractproperty def job_count(self): - """Returns how many jobs are on this jobboard.""" + """Returns how many jobs are on this jobboard (this count may change as + new jobs appear or are removed). + """ @abc.abstractmethod def find_owner(self, job): diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 26a0fec5..9b7c54c4 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -81,6 +81,18 @@ class TestZookeeperJobs(test.TestCase): self.assertTrue(mock_dt.called) + def test_board_iter(self): + with connect_close(self.board): + it = self.board.iterjobs() + self.assertEqual(it.board, self.board) + self.assertFalse(it.only_unclaimed) + self.assertFalse(it.ensure_fresh) + + def test_board_iter_empty(self): + with connect_close(self.board): + jobs_found = list(self.board.iterjobs()) + self.assertEqual([], jobs_found) + def test_fresh_iter(self): with connect_close(self.board): book = p_utils.temporary_log_book()