Merge "Move from generator to iterator for iterjobs"

This commit is contained in:
Jenkins
2014-05-07 21:12:53 +00:00
committed by Gerrit Code Review
3 changed files with 91 additions and 23 deletions

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import collections
import contextlib import contextlib
import functools import functools
import logging import logging
@@ -203,6 +204,69 @@ class ZookeeperJob(base_job.Job):
return self._book return self._book
class ZookeeperJobBoardIterator(six.Iterator):
"""Iterator over a zookeeper jobboard.
It supports the following attributes/constructor arguments:
* ensure_fresh: boolean that requests that during every fetch of a new
set of jobs this will cause the iterator to force the backend to
refresh (ensuring that the jobboard has the most recent job listings).
* only_unclaimed: boolean that indicates whether to only iterate
over unclaimed jobs.
"""
def __init__(self, board, only_unclaimed=False, ensure_fresh=False):
self._board = board
self._jobs = collections.deque()
self._fetched = False
self.ensure_fresh = ensure_fresh
self.only_unclaimed = only_unclaimed
@property
def board(self):
return self._board
def __iter__(self):
return self
def _fetch_jobs(self):
if self.ensure_fresh:
self._board._force_refresh()
with self._board._job_mutate:
return sorted(six.itervalues(self._board._known_jobs))
def _next_job(self):
if self.only_unclaimed:
allowed_states = UNCLAIMED_JOB_STATES
else:
allowed_states = ALL_JOB_STATES
job = None
while self._jobs and job is None:
maybe_job = self._jobs.popleft()
try:
if maybe_job.state in allowed_states:
job = maybe_job
except excp.JobFailure:
LOG.warn("Failed determining the state of job: %s (%s)",
maybe_job.uuid, maybe_job.path, exc_info=True)
except excp.NotFound:
with self._board._job_mutate:
self._board._remove_job(maybe_job.path)
return job
def __next__(self):
if not self._jobs:
if not self._fetched:
self._jobs.extend(self._fetch_jobs())
self._fetched = True
job = self._next_job()
if job is None:
raise StopIteration
else:
return job
class ZookeeperJobBoard(jobboard.NotifyingJobBoard): class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
def __init__(self, name, conf, def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True): client=None, persistence=None, emit_notifications=True):
@@ -252,7 +316,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
with self._job_mutate: with self._job_mutate:
return len(self._known_jobs) return len(self._known_jobs)
def _force_refresh(self, delayed=False): def _force_refresh(self):
try: try:
children = self._client.get_children(self.path) children = self._client.get_children(self.path)
except self._client.handler.timeout_exception as e: except self._client.handler.timeout_exception as e:
@@ -265,27 +329,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
except k_exceptions.KazooException as e: except k_exceptions.KazooException as e:
raise excp.JobFailure("Refreshing failure, internal error", e) raise excp.JobFailure("Refreshing failure, internal error", e)
else: else:
self._on_job_posting(children, delayed=delayed) self._on_job_posting(children, delayed=False)
def iterjobs(self, only_unclaimed=False, ensure_fresh=False): def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
if ensure_fresh: return ZookeeperJobBoardIterator(self,
self._force_refresh() only_unclaimed=only_unclaimed,
ok_states = ALL_JOB_STATES ensure_fresh=ensure_fresh)
if only_unclaimed:
ok_states = UNCLAIMED_JOB_STATES
with self._job_mutate:
known_jobs = list(six.itervalues(self._known_jobs))
for job in sorted(known_jobs):
try:
if job.state in ok_states:
yield job
except excp.JobFailure:
LOG.warn("Failed determining the state of job: %s (%s)",
job.uuid, job.path, exc_info=True)
except excp.NotFound:
# Someone destroyed it while we are iterating.
with self._job_mutate:
self._remove_job(job.path)
def _remove_job(self, path): def _remove_job(self, path):
LOG.debug("Removing job that was at path: %s", path) LOG.debug("Removing job that was at path: %s", path)

View File

@@ -36,14 +36,19 @@ class JobBoard(object):
@abc.abstractmethod @abc.abstractmethod
def iterjobs(self, only_unclaimed=False, ensure_fresh=False): def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
"""Yields back jobs that are currently on this jobboard (claimed """Returns an iterator that will provide back jobs that are currently
or not claimed). on this jobboard.
NOTE(harlowja): the ordering of this iteration should be by posting NOTE(harlowja): the ordering of this iteration should be by posting
order (oldest to newest) if possible, but it is left up to the backing order (oldest to newest) if possible, but it is left up to the backing
implementation to provide the order that best suits it (so don't depend implementation to provide the order that best suits it (so don't depend
on it always being oldest to newest). on it always being oldest to newest).
NOTE(harlowja): the iterator that is returned may support other
attributes which can be used to further customize how iteration can
be accomplished; check with the backends iterator object to determine
what other attributes are supported.
:param only_unclaimed: boolean that indicates whether to only iteration :param only_unclaimed: boolean that indicates whether to only iteration
over unclaimed jobs. over unclaimed jobs.
:param ensure_fresh: boolean that requests to only iterate over the :param ensure_fresh: boolean that requests to only iterate over the
@@ -55,7 +60,9 @@ class JobBoard(object):
@abc.abstractproperty @abc.abstractproperty
def job_count(self): def job_count(self):
"""Returns how many jobs are on this jobboard.""" """Returns how many jobs are on this jobboard (this count may change as
new jobs appear or are removed).
"""
@abc.abstractmethod @abc.abstractmethod
def find_owner(self, job): def find_owner(self, job):

View File

@@ -81,6 +81,18 @@ class TestZookeeperJobs(test.TestCase):
self.assertTrue(mock_dt.called) self.assertTrue(mock_dt.called)
def test_board_iter(self):
with connect_close(self.board):
it = self.board.iterjobs()
self.assertEqual(it.board, self.board)
self.assertFalse(it.only_unclaimed)
self.assertFalse(it.ensure_fresh)
def test_board_iter_empty(self):
with connect_close(self.board):
jobs_found = list(self.board.iterjobs())
self.assertEqual([], jobs_found)
def test_fresh_iter(self): def test_fresh_iter(self):
with connect_close(self.board): with connect_close(self.board):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()