From 07b9c2eb0a1338888454fc5437b86da9fbdf6490 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 10 Jul 2015 17:45:33 -0700 Subject: [PATCH] Unify the zookeeper/redis jobboard iterators To make the zookeeper jobboard and redis jobboard iterjobs and wait functions that much more similar, have both return iterator objects of the same shared iterator class. This makes the iterator code and the jobboard code that much easier to follow and understand. Change-Id: Ia772cde881c2631002140e06684521fd42441534 --- taskflow/jobs/backends/impl_redis.py | 42 ++++++------ taskflow/jobs/backends/impl_zookeeper.py | 86 +++--------------------- taskflow/jobs/base.py | 73 ++++++++++++++++++++ 3 files changed, 105 insertions(+), 96 deletions(-) diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 67542cd6..92a13dae 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -769,22 +769,24 @@ return cmsgpack.pack(result) while True: jc = self.job_count if jc > 0: - it = self.iterjobs() - return it + curr_jobs = self._fetch_jobs() + if curr_jobs: + return base.JobBoardIterator( + self, LOG, + board_fetch_func=lambda ensure_fresh: curr_jobs) + if w.expired(): + raise exc.NotFound("Expired waiting for jobs to" + " arrive; waited %s seconds" + % w.elapsed()) else: - if w.expired(): - raise exc.NotFound("Expired waiting for jobs to" - " arrive; waited %s seconds" - % w.elapsed()) + remaining = w.leftover(return_none=True) + if remaining is not None: + delay = min(delay * 2, remaining, max_delay) else: - remaining = w.leftover(return_none=True) - if remaining is not None: - delay = min(delay * 2, remaining, max_delay) - else: - delay = min(delay * 2, max_delay) - sleep_func(delay) + delay = min(delay * 2, max_delay) + sleep_func(delay) - def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + def _fetch_jobs(self): with _translate_failures(): raw_postings = self._client.hgetall(self.listings_key) postings = [] @@ 
-798,13 +800,13 @@ return cmsgpack.pack(result) book_data=posting.get('book'), backend=self._persistence) postings.append(job) - postings = sorted(postings) - for job in postings: - if only_unclaimed: - if job.state == states.UNCLAIMED: - yield job - else: - yield job + return sorted(postings) + + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + return base.JobBoardIterator( + self, LOG, only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh, + board_fetch_func=lambda ensure_fresh: self._fetch_jobs()) @base.check_who def consume(self, job, who): diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6644a33e..b6f85465 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import contextlib import functools import sys @@ -189,74 +188,6 @@ class ZookeeperJob(base.Job): return hash(self.path) -class ZookeeperJobBoardIterator(six.Iterator): - """Iterator over a zookeeper jobboard that iterates over potential jobs. - - It supports the following attributes/constructor arguments: - - * ``ensure_fresh``: boolean that requests that during every fetch of a new - set of jobs this will cause the iterator to force the backend to - refresh (ensuring that the jobboard has the most recent job listings). - * ``only_unclaimed``: boolean that indicates whether to only iterate - over unclaimed jobs. 
- """ - - _UNCLAIMED_JOB_STATES = ( - states.UNCLAIMED, - ) - - _JOB_STATES = ( - states.UNCLAIMED, - states.COMPLETE, - states.CLAIMED, - ) - - def __init__(self, board, only_unclaimed=False, ensure_fresh=False): - self._board = board - self._jobs = collections.deque() - self._fetched = False - self.ensure_fresh = ensure_fresh - self.only_unclaimed = only_unclaimed - - @property - def board(self): - """The board this iterator was created from.""" - return self._board - - def __iter__(self): - return self - - def _next_job(self): - if self.only_unclaimed: - allowed_states = self._UNCLAIMED_JOB_STATES - else: - allowed_states = self._JOB_STATES - job = None - while self._jobs and job is None: - maybe_job = self._jobs.popleft() - try: - if maybe_job.state in allowed_states: - job = maybe_job - except excp.JobFailure: - LOG.warn("Failed determining the state of job: %s (%s)", - maybe_job.uuid, maybe_job.path, exc_info=True) - except excp.NotFound: - self._board._remove_job(maybe_job.path) - return job - - def __next__(self): - if not self._jobs: - if not self._fetched: - jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh) - self._jobs.extend(jobs) - self._fetched = True - job = self._next_job() - if job is None: - raise StopIteration - else: - return job - - class ZookeeperJobBoard(base.NotifyingJobBoard): """A jobboard backed by `zookeeper`_. 
@@ -387,9 +318,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._on_job_posting(children, delayed=False) def iterjobs(self, only_unclaimed=False, ensure_fresh=False): - return ZookeeperJobBoardIterator(self, - only_unclaimed=only_unclaimed, - ensure_fresh=ensure_fresh) + return base.JobBoardIterator( + self, LOG, only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh, board_fetch_func=self._fetch_jobs, + board_removal_func=lambda a_job: self._remove_job(a_job.path)) def _remove_job(self, path): if path not in self._known_jobs: @@ -697,10 +629,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): # must recalculate the amount of time we really have left. self._job_cond.wait(watch.leftover(return_none=True)) else: - it = ZookeeperJobBoardIterator(self) - it._jobs.extend(self._fetch_jobs()) - it._fetched = True - return it + curr_jobs = self._fetch_jobs() + fetch_func = lambda ensure_fresh: curr_jobs + removal_func = lambda a_job: self._remove_job(a_job.path) + return base.JobBoardIterator( + self, LOG, board_fetch_func=fetch_func, + board_removal_func=removal_func) @property def connected(self): diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index a9ff0274..81e4a574 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -16,11 +16,14 @@ # under the License. import abc +import collections import contextlib from oslo_utils import uuidutils import six +from taskflow import exceptions as excp +from taskflow import states from taskflow.types import notifier @@ -147,6 +150,76 @@ class Job(object): self.details) +class JobBoardIterator(six.Iterator): + """Iterator over a jobboard that iterates over potential jobs. 
+ + It provides the following attributes: + + * ``only_unclaimed``: boolean that indicates whether to only iterate + over unclaimed jobs + * ``ensure_fresh``: boolean that requests that during every fetch of a new + set of jobs this will cause the iterator to force the backend to + refresh (ensuring that the jobboard has the most recent job listings) + * ``board``: the board this iterator was created from + """ + + _UNCLAIMED_JOB_STATES = (states.UNCLAIMED,) + _JOB_STATES = (states.UNCLAIMED, states.COMPLETE, states.CLAIMED) + + def __init__(self, board, logger, + board_fetch_func=None, board_removal_func=None, + only_unclaimed=False, ensure_fresh=False): + self._board = board + self._logger = logger + self._board_removal_func = board_removal_func + self._board_fetch_func = board_fetch_func + self._fetched = False + self._jobs = collections.deque() + self.only_unclaimed = only_unclaimed + self.ensure_fresh = ensure_fresh + + @property + def board(self): + """The board this iterator was created from.""" + return self._board + + def __iter__(self): + return self + + def _next_job(self): + if self.only_unclaimed: + allowed_states = self._UNCLAIMED_JOB_STATES + else: + allowed_states = self._JOB_STATES + job = None + while self._jobs and job is None: + maybe_job = self._jobs.popleft() + try: + if maybe_job.state in allowed_states: + job = maybe_job + except excp.JobFailure: + self._logger.warn("Failed determining the state of" + " job '%s'", maybe_job, exc_info=True) + except excp.NotFound: + if self._board_removal_func is not None: + self._board_removal_func(maybe_job) + return job + + def __next__(self): + if not self._jobs: + if not self._fetched: + if self._board_fetch_func is not None: + self._jobs.extend( + self._board_fetch_func( + ensure_fresh=self.ensure_fresh)) + self._fetched = True + job = self._next_job() + if job is None: + raise StopIteration + else: + return job + + @six.add_metaclass(abc.ABCMeta) class JobBoard(object): """A place where jobs can be 
posted, reposted, claimed and transferred.