Merge "Unify the zookeeper/redis jobboard iterators"

This commit is contained in:
Jenkins
2015-07-21 04:26:33 +00:00
committed by Gerrit Code Review
3 changed files with 105 additions and 96 deletions

View File

@@ -753,9 +753,11 @@ return cmsgpack.pack(result)
while True:
jc = self.job_count
if jc > 0:
it = self.iterjobs()
return it
else:
curr_jobs = self._fetch_jobs()
if curr_jobs:
return base.JobBoardIterator(
self, LOG,
board_fetch_func=lambda ensure_fresh: curr_jobs)
if w.expired():
raise exc.NotFound("Expired waiting for jobs to"
" arrive; waited %s seconds"
@@ -768,7 +770,7 @@ return cmsgpack.pack(result)
delay = min(delay * 2, max_delay)
sleep_func(delay)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
def _fetch_jobs(self):
with _translate_failures():
raw_postings = self._client.hgetall(self.listings_key)
postings = []
@@ -782,13 +784,13 @@ return cmsgpack.pack(result)
book_data=posting.get('book'),
backend=self._persistence)
postings.append(job)
postings = sorted(postings)
for job in postings:
if only_unclaimed:
if job.state == states.UNCLAIMED:
yield job
else:
yield job
return sorted(postings)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh,
board_fetch_func=lambda ensure_fresh: self._fetch_jobs())
@base.check_who
def consume(self, job, who):

View File

@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import functools
import sys
@@ -180,74 +179,6 @@ class ZookeeperJob(base.Job):
return hash(self.path)
class ZookeeperJobBoardIterator(six.Iterator):
"""Iterator over a zookeeper jobboard that iterates over potential jobs.
It supports the following attributes/constructor arguments:
* ``ensure_fresh``: boolean that requests that during every fetch of a new
set of jobs this will cause the iterator to force the backend to
refresh (ensuring that the jobboard has the most recent job listings).
* ``only_unclaimed``: boolean that indicates whether to only iterate
over unclaimed jobs.
"""
_UNCLAIMED_JOB_STATES = (
states.UNCLAIMED,
)
_JOB_STATES = (
states.UNCLAIMED,
states.COMPLETE,
states.CLAIMED,
)
def __init__(self, board, only_unclaimed=False, ensure_fresh=False):
self._board = board
self._jobs = collections.deque()
self._fetched = False
self.ensure_fresh = ensure_fresh
self.only_unclaimed = only_unclaimed
@property
def board(self):
"""The board this iterator was created from."""
return self._board
def __iter__(self):
return self
def _next_job(self):
if self.only_unclaimed:
allowed_states = self._UNCLAIMED_JOB_STATES
else:
allowed_states = self._JOB_STATES
job = None
while self._jobs and job is None:
maybe_job = self._jobs.popleft()
try:
if maybe_job.state in allowed_states:
job = maybe_job
except excp.JobFailure:
LOG.warn("Failed determining the state of job: %s (%s)",
maybe_job.uuid, maybe_job.path, exc_info=True)
except excp.NotFound:
self._board._remove_job(maybe_job.path)
return job
def __next__(self):
if not self._jobs:
if not self._fetched:
jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh)
self._jobs.extend(jobs)
self._fetched = True
job = self._next_job()
if job is None:
raise StopIteration
else:
return job
class ZookeeperJobBoard(base.NotifyingJobBoard):
"""A jobboard backed by `zookeeper`_.
@@ -388,9 +319,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._on_job_posting(children, delayed=False)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
return ZookeeperJobBoardIterator(self,
only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh)
return base.JobBoardIterator(
self, LOG, only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh, board_fetch_func=self._fetch_jobs,
board_removal_func=lambda a_job: self._remove_job(a_job.path))
def _remove_job(self, path):
if path not in self._known_jobs:
@@ -698,10 +630,12 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
# must recalculate the amount of time we really have left.
self._job_cond.wait(watch.leftover(return_none=True))
else:
it = ZookeeperJobBoardIterator(self)
it._jobs.extend(self._fetch_jobs())
it._fetched = True
return it
curr_jobs = self._fetch_jobs()
fetch_func = lambda ensure_fresh: curr_jobs
removal_func = lambda a_job: self._remove_job(a_job.path)
return base.JobBoardIterator(
self, LOG, board_fetch_func=fetch_func,
board_removal_func=removal_func)
@property
def connected(self):

View File

@@ -16,11 +16,14 @@
# under the License.
import abc
import collections
import contextlib
from oslo_utils import uuidutils
import six
from taskflow import exceptions as excp
from taskflow import states
from taskflow.types import notifier
@@ -147,6 +150,76 @@ class Job(object):
self.details)
class JobBoardIterator(six.Iterator):
"""Iterator over a jobboard that iterates over potential jobs.
It provides the following attributes:
* ``only_unclaimed``: boolean that indicates whether to only iterate
over unclaimed jobs
* ``ensure_fresh``: boolean that requests that during every fetch of a new
set of jobs this will cause the iterator to force the backend to
refresh (ensuring that the jobboard has the most recent job listings)
* ``board``: the board this iterator was created from
"""
_UNCLAIMED_JOB_STATES = (states.UNCLAIMED,)
_JOB_STATES = (states.UNCLAIMED, states.COMPLETE, states.CLAIMED)
def __init__(self, board, logger,
board_fetch_func=None, board_removal_func=None,
only_unclaimed=False, ensure_fresh=False):
self._board = board
self._logger = logger
self._board_removal_func = board_removal_func
self._board_fetch_func = board_fetch_func
self._fetched = False
self._jobs = collections.deque()
self.only_unclaimed = only_unclaimed
self.ensure_fresh = ensure_fresh
@property
def board(self):
"""The board this iterator was created from."""
return self._board
def __iter__(self):
return self
def _next_job(self):
if self.only_unclaimed:
allowed_states = self._UNCLAIMED_JOB_STATES
else:
allowed_states = self._JOB_STATES
job = None
while self._jobs and job is None:
maybe_job = self._jobs.popleft()
try:
if maybe_job.state in allowed_states:
job = maybe_job
except excp.JobFailure:
self._logger.warn("Failed determining the state of"
" job '%s'", maybe_job, exc_info=True)
except excp.NotFound:
if self._board_removal_func is not None:
self._board_removal_func(maybe_job)
return job
def __next__(self):
if not self._jobs:
if not self._fetched:
if self._board_fetch_func is not None:
self._jobs.extend(
self._board_fetch_func(
ensure_fresh=self.ensure_fresh))
self._fetched = True
job = self._next_job()
if job is None:
raise StopIteration
else:
return job
@six.add_metaclass(abc.ABCMeta)
class JobBoard(object):
"""A place where jobs can be posted, reposted, claimed and transferred.