diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 67542cd6..92a13dae 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -769,22 +769,24 @@ return cmsgpack.pack(result) while True: jc = self.job_count if jc > 0: - it = self.iterjobs() - return it + curr_jobs = self._fetch_jobs() + if curr_jobs: + return base.JobBoardIterator( + self, LOG, + board_fetch_func=lambda ensure_fresh: curr_jobs) + if w.expired(): + raise exc.NotFound("Expired waiting for jobs to" + " arrive; waited %s seconds" + % w.elapsed()) else: - if w.expired(): - raise exc.NotFound("Expired waiting for jobs to" - " arrive; waited %s seconds" - % w.elapsed()) + remaining = w.leftover(return_none=True) + if remaining is not None: + delay = min(delay * 2, remaining, max_delay) else: - remaining = w.leftover(return_none=True) - if remaining is not None: - delay = min(delay * 2, remaining, max_delay) - else: - delay = min(delay * 2, max_delay) - sleep_func(delay) + delay = min(delay * 2, max_delay) + sleep_func(delay) - def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + def _fetch_jobs(self): with _translate_failures(): raw_postings = self._client.hgetall(self.listings_key) postings = [] @@ -798,13 +800,13 @@ return cmsgpack.pack(result) book_data=posting.get('book'), backend=self._persistence) postings.append(job) - postings = sorted(postings) - for job in postings: - if only_unclaimed: - if job.state == states.UNCLAIMED: - yield job - else: - yield job + return sorted(postings) + + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + return base.JobBoardIterator( + self, LOG, only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh, + board_fetch_func=lambda ensure_fresh: self._fetch_jobs()) @base.check_who def consume(self, job, who): diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6644a33e..b6f85465 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import contextlib import functools import sys @@ -189,74 +188,6 @@ class ZookeeperJob(base.Job): return hash(self.path) -class ZookeeperJobBoardIterator(six.Iterator): - """Iterator over a zookeeper jobboard that iterates over potential jobs. - - It supports the following attributes/constructor arguments: - - * ``ensure_fresh``: boolean that requests that during every fetch of a new - set of jobs this will cause the iterator to force the backend to - refresh (ensuring that the jobboard has the most recent job listings). - * ``only_unclaimed``: boolean that indicates whether to only iterate - over unclaimed jobs. - """ - - _UNCLAIMED_JOB_STATES = ( - states.UNCLAIMED, - ) - - _JOB_STATES = ( - states.UNCLAIMED, - states.COMPLETE, - states.CLAIMED, - ) - - def __init__(self, board, only_unclaimed=False, ensure_fresh=False): - self._board = board - self._jobs = collections.deque() - self._fetched = False - self.ensure_fresh = ensure_fresh - self.only_unclaimed = only_unclaimed - - @property - def board(self): - """The board this iterator was created from.""" - return self._board - - def __iter__(self): - return self - - def _next_job(self): - if self.only_unclaimed: - allowed_states = self._UNCLAIMED_JOB_STATES - else: - allowed_states = self._JOB_STATES - job = None - while self._jobs and job is None: - maybe_job = self._jobs.popleft() - try: - if maybe_job.state in allowed_states: - job = maybe_job - except excp.JobFailure: - LOG.warn("Failed determining the state of job: %s (%s)", - maybe_job.uuid, maybe_job.path, exc_info=True) - except excp.NotFound: - self._board._remove_job(maybe_job.path) - return job - - def __next__(self): - if not self._jobs: - if not self._fetched: - jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh) - self._jobs.extend(jobs) - self._fetched = True - job = self._next_job() - if job is None: - raise StopIteration - else: - return job - - class ZookeeperJobBoard(base.NotifyingJobBoard): """A jobboard backed by `zookeeper`_. @@ -387,9 +318,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._on_job_posting(children, delayed=False) def iterjobs(self, only_unclaimed=False, ensure_fresh=False): - return ZookeeperJobBoardIterator(self, - only_unclaimed=only_unclaimed, - ensure_fresh=ensure_fresh) + return base.JobBoardIterator( + self, LOG, only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh, board_fetch_func=self._fetch_jobs, + board_removal_func=lambda a_job: self._remove_job(a_job.path)) def _remove_job(self, path): if path not in self._known_jobs: @@ -697,10 +629,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): # must recalculate the amount of time we really have left. self._job_cond.wait(watch.leftover(return_none=True)) else: - it = ZookeeperJobBoardIterator(self) - it._jobs.extend(self._fetch_jobs()) - it._fetched = True - return it + curr_jobs = self._fetch_jobs() + fetch_func = lambda ensure_fresh: curr_jobs + removal_func = lambda a_job: self._remove_job(a_job.path) + return base.JobBoardIterator( + self, LOG, board_fetch_func=fetch_func, + board_removal_func=removal_func) @property def connected(self): diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index a9ff0274..81e4a574 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -16,11 +16,14 @@ # under the License. import abc +import collections import contextlib from oslo_utils import uuidutils import six +from taskflow import exceptions as excp +from taskflow import states from taskflow.types import notifier @@ -147,6 +150,76 @@ class Job(object): self.details) +class JobBoardIterator(six.Iterator): + """Iterator over a jobboard that iterates over potential jobs. + + It provides the following attributes: + + * ``only_unclaimed``: boolean that indicates whether to only iterate + over unclaimed jobs + * ``ensure_fresh``: boolean that requests that during every fetch of a new + set of jobs this will cause the iterator to force the backend to + refresh (ensuring that the jobboard has the most recent job listings) + * ``board``: the board this iterator was created from + """ + + _UNCLAIMED_JOB_STATES = (states.UNCLAIMED,) + _JOB_STATES = (states.UNCLAIMED, states.COMPLETE, states.CLAIMED) + + def __init__(self, board, logger, + board_fetch_func=None, board_removal_func=None, + only_unclaimed=False, ensure_fresh=False): + self._board = board + self._logger = logger + self._board_removal_func = board_removal_func + self._board_fetch_func = board_fetch_func + self._fetched = False + self._jobs = collections.deque() + self.only_unclaimed = only_unclaimed + self.ensure_fresh = ensure_fresh + + @property + def board(self): + """The board this iterator was created from.""" + return self._board + + def __iter__(self): + return self + + def _next_job(self): + if self.only_unclaimed: + allowed_states = self._UNCLAIMED_JOB_STATES + else: + allowed_states = self._JOB_STATES + job = None + while self._jobs and job is None: + maybe_job = self._jobs.popleft() + try: + if maybe_job.state in allowed_states: + job = maybe_job + except excp.JobFailure: + self._logger.warn("Failed determining the state of" + " job '%s'", maybe_job, exc_info=True) + except excp.NotFound: + if self._board_removal_func is not None: + self._board_removal_func(maybe_job) + return job + + def __next__(self): + if not self._jobs: + if not self._fetched: + if self._board_fetch_func is not None: + self._jobs.extend( + self._board_fetch_func( + ensure_fresh=self.ensure_fresh)) + self._fetched = True + job = self._next_job() + if job is None: + raise StopIteration + else: + return job + + @six.add_metaclass(abc.ABCMeta) class JobBoard(object): """A place where jobs can be posted, reposted, claimed and transferred.