Unify the zookeeper/redis jobboard iterators

To make the zookeeper jobboard and redis jobboard iterjobs and wait
functions that much similar have both return iterator objects from
the same class iterator.

This makes the iterator code and the jobboard code that much easier
to follow and understand.

Change-Id: Ia772cde881c2631002140e06684521fd42441534
This commit is contained in:
Joshua Harlow
2015-07-10 17:45:33 -07:00
committed by Thomas Goirand
parent 88898570ad
commit 07b9c2eb0a
3 changed files with 105 additions and 96 deletions

View File

@@ -769,22 +769,24 @@ return cmsgpack.pack(result)
while True:
jc = self.job_count
if jc > 0:
it = self.iterjobs()
return it
curr_jobs = self._fetch_jobs()
if curr_jobs:
return base.JobBoardIterator(
self, LOG,
board_fetch_func=lambda ensure_fresh: curr_jobs)
if w.expired():
raise exc.NotFound("Expired waiting for jobs to"
" arrive; waited %s seconds"
% w.elapsed())
else:
if w.expired():
raise exc.NotFound("Expired waiting for jobs to"
" arrive; waited %s seconds"
% w.elapsed())
remaining = w.leftover(return_none=True)
if remaining is not None:
delay = min(delay * 2, remaining, max_delay)
else:
remaining = w.leftover(return_none=True)
if remaining is not None:
delay = min(delay * 2, remaining, max_delay)
else:
delay = min(delay * 2, max_delay)
sleep_func(delay)
delay = min(delay * 2, max_delay)
sleep_func(delay)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
def _fetch_jobs(self):
with _translate_failures():
raw_postings = self._client.hgetall(self.listings_key)
postings = []
@@ -798,13 +800,13 @@ return cmsgpack.pack(result)
book_data=posting.get('book'),
backend=self._persistence)
postings.append(job)
postings = sorted(postings)
for job in postings:
if only_unclaimed:
if job.state == states.UNCLAIMED:
yield job
else:
yield job
return sorted(postings)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh,
board_fetch_func=lambda ensure_fresh: self._fetch_jobs())
@base.check_who
def consume(self, job, who):

View File

@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import functools
import sys
@@ -189,74 +188,6 @@ class ZookeeperJob(base.Job):
return hash(self.path)
class ZookeeperJobBoardIterator(six.Iterator):
"""Iterator over a zookeeper jobboard that iterates over potential jobs.
It supports the following attributes/constructor arguments:
* ``ensure_fresh``: boolean that requests that during every fetch of a new
set of jobs this will cause the iterator to force the backend to
refresh (ensuring that the jobboard has the most recent job listings).
* ``only_unclaimed``: boolean that indicates whether to only iterate
over unclaimed jobs.
"""
_UNCLAIMED_JOB_STATES = (
states.UNCLAIMED,
)
_JOB_STATES = (
states.UNCLAIMED,
states.COMPLETE,
states.CLAIMED,
)
def __init__(self, board, only_unclaimed=False, ensure_fresh=False):
self._board = board
self._jobs = collections.deque()
self._fetched = False
self.ensure_fresh = ensure_fresh
self.only_unclaimed = only_unclaimed
@property
def board(self):
"""The board this iterator was created from."""
return self._board
def __iter__(self):
return self
def _next_job(self):
if self.only_unclaimed:
allowed_states = self._UNCLAIMED_JOB_STATES
else:
allowed_states = self._JOB_STATES
job = None
while self._jobs and job is None:
maybe_job = self._jobs.popleft()
try:
if maybe_job.state in allowed_states:
job = maybe_job
except excp.JobFailure:
LOG.warn("Failed determining the state of job: %s (%s)",
maybe_job.uuid, maybe_job.path, exc_info=True)
except excp.NotFound:
self._board._remove_job(maybe_job.path)
return job
def __next__(self):
if not self._jobs:
if not self._fetched:
jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh)
self._jobs.extend(jobs)
self._fetched = True
job = self._next_job()
if job is None:
raise StopIteration
else:
return job
class ZookeeperJobBoard(base.NotifyingJobBoard):
"""A jobboard backed by `zookeeper`_.
@@ -387,9 +318,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._on_job_posting(children, delayed=False)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
return ZookeeperJobBoardIterator(self,
only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh)
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh, board_fetch_func=self._fetch_jobs,
board_removal_func=lambda a_job: self._remove_job(a_job.path))
def _remove_job(self, path):
if path not in self._known_jobs:
@@ -697,10 +629,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
# must recalculate the amount of time we really have left.
self._job_cond.wait(watch.leftover(return_none=True))
else:
it = ZookeeperJobBoardIterator(self)
it._jobs.extend(self._fetch_jobs())
it._fetched = True
return it
curr_jobs = self._fetch_jobs()
fetch_func = lambda ensure_fresh: curr_jobs
removal_func = lambda a_job: self._remove_job(a_job.path)
return base.JobBoardIterator(
self, LOG, board_fetch_func=fetch_func,
board_removal_func=removal_func)
@property
def connected(self):

View File

@@ -16,11 +16,14 @@
# under the License.
import abc
import collections
import contextlib
from oslo_utils import uuidutils
import six
from taskflow import exceptions as excp
from taskflow import states
from taskflow.types import notifier
@@ -147,6 +150,76 @@ class Job(object):
self.details)
class JobBoardIterator(six.Iterator):
"""Iterator over a jobboard that iterates over potential jobs.
It provides the following attributes:
* ``only_unclaimed``: boolean that indicates whether to only iterate
over unclaimed jobs
* ``ensure_fresh``: boolean that requests that during every fetch of a new
set of jobs this will cause the iterator to force the backend to
refresh (ensuring that the jobboard has the most recent job listings)
* ``board``: the board this iterator was created from
"""
_UNCLAIMED_JOB_STATES = (states.UNCLAIMED,)
_JOB_STATES = (states.UNCLAIMED, states.COMPLETE, states.CLAIMED)
def __init__(self, board, logger,
board_fetch_func=None, board_removal_func=None,
only_unclaimed=False, ensure_fresh=False):
self._board = board
self._logger = logger
self._board_removal_func = board_removal_func
self._board_fetch_func = board_fetch_func
self._fetched = False
self._jobs = collections.deque()
self.only_unclaimed = only_unclaimed
self.ensure_fresh = ensure_fresh
@property
def board(self):
"""The board this iterator was created from."""
return self._board
def __iter__(self):
return self
def _next_job(self):
if self.only_unclaimed:
allowed_states = self._UNCLAIMED_JOB_STATES
else:
allowed_states = self._JOB_STATES
job = None
while self._jobs and job is None:
maybe_job = self._jobs.popleft()
try:
if maybe_job.state in allowed_states:
job = maybe_job
except excp.JobFailure:
self._logger.warn("Failed determining the state of"
" job '%s'", maybe_job, exc_info=True)
except excp.NotFound:
if self._board_removal_func is not None:
self._board_removal_func(maybe_job)
return job
def __next__(self):
if not self._jobs:
if not self._fetched:
if self._board_fetch_func is not None:
self._jobs.extend(
self._board_fetch_func(
ensure_fresh=self.ensure_fresh))
self._fetched = True
job = self._next_job()
if job is None:
raise StopIteration
else:
return job
@six.add_metaclass(abc.ABCMeta)
class JobBoard(object):
"""A place where jobs can be posted, reposted, claimed and transferred.