Allow iterating over only the most 'fresh' jobs

Instead of letting the backend determine how fresh the jobs it
yields are (in the zookeeper case jobs are populated asynchronously,
so freshness depends on how up to date the established watches are),
allow the iteration method to accept a request (honored only if the
backend supports it) that the jobs iterated over be as fresh as
possible.

Change-Id: I8a7c9d7d086ad5fa85dc842fb36e3356f781f057
Author: Joshua Harlow
Date: 2014-04-04 15:04:33 -07:00
Committed by: Thomas Goirand
parent 1aa7e49eda
commit df6987e053
3 changed files with 46 additions and 9 deletions
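For orientation, a minimal usage sketch of the new flag (not part of the change itself). It assumes an already connected jobboard instance named `board` (for example a ZookeeperJobBoard); how the board is built and connected is not shown, and only the `ensure_fresh=True` argument reflects what this commit adds:

# Sketch only: `board` is assumed to be an already connected jobboard
# (e.g. a ZookeeperJobBoard); its construction is elided here.
def unclaimed_job_uuids(board, fresh=False):
    # ensure_fresh=True asks the backend to refresh its view of the
    # posted jobs before yielding them; a backend that cannot do this
    # is allowed to ignore the flag.
    return [job.uuid
            for job in board.iterjobs(only_unclaimed=True,
                                      ensure_fresh=fresh)]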

View File

@@ -187,7 +187,24 @@ class ZookeeperJobBoard(jobboard.JobBoard):
             count += 1
         return count
 
-    def iterjobs(self, only_unclaimed=False):
+    def _force_refresh(self, delayed=False):
+        try:
+            children = self._client.get_children(self.path)
+        except self._client.handler.timeout_exception as e:
+            raise excp.JobFailure("Refreshing failure, connection timed out",
+                                  e)
+        except k_exceptions.SessionExpiredError as e:
+            raise excp.JobFailure("Refreshing failure, session expired", e)
+        except k_exceptions.NoNodeError:
+            pass
+        except k_exceptions.KazooException as e:
+            raise excp.JobFailure("Refreshing failure, internal error", e)
+        else:
+            self._on_job_posting(children, delayed=delayed)
+
+    def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
+        if ensure_fresh:
+            self._force_refresh()
         ok_states = ALL_JOB_STATES
         if only_unclaimed:
             ok_states = UNCLAIMED_JOB_STATES
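The new _force_refresh() re-lists the job znodes and feeds them straight into _on_job_posting() instead of waiting for a watch callback to do so. To see why an explicit listing is "fresher" than the watch-driven path, here is a small standalone kazoo sketch (not taskflow code; the host and path are assumptions and a local ZooKeeper server must be running):

from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch

client = KazooClient(hosts="127.0.0.1:2181")  # assumed local server
client.start()
client.ensure_path("/taskflow/jobs")          # assumed path

observed = []

# ChildrenWatch pushes updates through watch events; between events the
# locally observed listing can lag behind what the server holds.  This
# mirrors the jobboard's normal, asynchronously populated view.
ChildrenWatch(client, "/taskflow/jobs", func=observed.append)

# An explicit listing reflects the server state at the moment of the
# call; _force_refresh() uses this to honor ensure_fresh=True.
fresh_children = client.get_children("/taskflow/jobs")
print(fresh_children)

client.stop()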
@@ -236,7 +253,7 @@ class ZookeeperJobBoard(jobboard.JobBoard):
             LOG.warn("Internal error fetching job data from path: %s",
                      path, exc_info=True)
 
-    def _on_job_posting(self, children):
+    def _on_job_posting(self, children, delayed=True):
         LOG.debug("Got children %s under path %s", children, self.path)
         child_paths = [k_paths.join(self.path, c) for c in children]
@@ -259,12 +276,15 @@ class ZookeeperJobBoard(jobboard.JobBoard):
             if path not in self._known_jobs:
                 # Fire off the request to populate this job asynchronously.
                 #
-                # This method is called from a asynchronous handler so it's
-                # better to exit from this quickly to allow other
-                # asynchronous handlers to be executed.
+                # This method is *usually* called from an asynchronous
+                # handler so it's better to exit from this quickly to
+                # allow other asynchronous handlers to be executed.
+                request = self._client.get_async(path)
                 child_proc = functools.partial(self._process_child, path)
-                result = self._client.get_async(path)
-                result.rawlink(child_proc)
+                if delayed:
+                    request.rawlink(child_proc)
+                else:
+                    child_proc(request)
 
     def _format_job(self, job):
         posting = {
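The delayed flag only decides when the per-child processing runs: deferred until the asynchronous get completes (via rawlink), or invoked immediately with the request that was just issued. Below is a standalone sketch of the same dispatch pattern using concurrent.futures.Future, whose add_done_callback plays the role that rawlink plays on kazoo's async result (an analogy, not taskflow code):

import functools
from concurrent import futures


def process_child(path, result):
    # In the jobboard this would parse the znode data into a job.
    print(path, result.result())


def fetch(pool, path, delayed=True):
    request = pool.submit(lambda: "data for %s" % path)
    child_proc = functools.partial(process_child, path)
    if delayed:
        # Defer processing until the async request completes (the
        # normal, watch/handler driven path).
        request.add_done_callback(child_proc)
    else:
        # Process inline right now, using the request that was just
        # issued (the ensure_fresh path).
        child_proc(request)


with futures.ThreadPoolExecutor(max_workers=1) as pool:
    fetch(pool, "/jobs/job-1", delayed=True)
    fetch(pool, "/jobs/job-2", delayed=False)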

View File

@@ -32,9 +32,17 @@ class JobBoard(object):
         self._name = name
 
     @abc.abstractmethod
-    def iterjobs(self, only_unclaimed=False):
+    def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
         """Yields back jobs that are currently on this jobboard (claimed
         or not claimed).
+
+        :param only_unclaimed: boolean that indicates whether to only iterate
+            over unclaimed jobs.
+        :param ensure_fresh: boolean that requests to only iterate over the
+            most recent jobs available, where the definition of what is recent
+            is backend specific. It is allowable that a backend may ignore this
+            value if the backend's internal semantics/capabilities can not
+            support this argument.
         """
 
     @abc.abstractproperty
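To make the "a backend may ignore this value" wording concrete, a hypothetical in-memory board whose listing is always current can accept the flag and do nothing with it. This is an illustration only, deliberately not a JobBoard subclass so it stays self-contained:

class InMemoryBoard(object):
    """Hypothetical board whose job listing is always current."""

    def __init__(self):
        self._jobs = []

    def post(self, job):
        self._jobs.append(job)

    def iterjobs(self, only_unclaimed=False, ensure_fresh=False):
        # Nothing here is cached or populated asynchronously, so
        # ensure_fresh is accepted (to honor the interface) but has no
        # effect; only_unclaimed filtering is elided for brevity.
        for job in list(self._jobs):
            yield job


board = InMemoryBoard()
board.post("job-1")
print(list(board.iterjobs(ensure_fresh=True)))  # flag safely ignored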

View File

@@ -66,6 +66,15 @@ class TestZookeeperJobs(test.TestCase):
             self.client.flush()
             self.assertTrue(self.board.connected)
 
+    def test_fresh_iter(self):
+        with connect_close(self.board):
+            book = p_utils.temporary_log_book()
+            self.board.post('test', book)
+            self.client.flush()
+            jobs = list(self.board.iterjobs(ensure_fresh=True))
+            self.assertEqual(1, len(jobs))
+
     def test_posting_received_raw(self):
         book = p_utils.temporary_log_book()
@@ -77,7 +86,7 @@ class TestZookeeperJobs(test.TestCase):
             self.client.flush()
 
             self.assertEqual(self.board, posted_job.board)
-            self.assertTrue(1, self.board.job_count)
+            self.assertEqual(1, self.board.job_count)
             self.assertIn(posted_job.uuid, [j.uuid
                                             for j in self.board.iterjobs()])
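The final hunk also fixes a latent test bug: assertTrue(1, self.board.job_count) can never fail, because unittest treats the second positional argument as the failure message rather than as a value to compare. A tiny standalone demonstration of the difference (not taskflow code):

import unittest


class Demo(unittest.TestCase):
    def test_always_passes(self):
        # The second argument is only the failure message, so this
        # "assertion" cannot fail even though 1 != 2.
        self.assertTrue(1, 2)

    def test_actually_compares(self):
        # assertEqual really compares the two values and fails here.
        self.assertEqual(1, 2)


if __name__ == "__main__":
    unittest.main()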