From df6987e0538561c4b4c05b23c32d22b83a57e2d7 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 4 Apr 2014 15:04:33 -0700 Subject: [PATCH] Allow for only iterating over the most 'fresh' jobs Instead of allowing the backend to determine how fresh are the jobs yielded back (in the zookeeper case the jobs are async populated so freshness depends on the freshness of the watches established) allow the iteration method to support a way to request (if backend supported) that the jobs iterated be as fresh as possible. Change-Id: I8a7c9d7d086ad5fa85dc842fb36e3356f781f057 --- taskflow/jobs/backends/impl_zookeeper.py | 34 +++++++++++++++++++----- taskflow/jobs/jobboard.py | 10 ++++++- taskflow/tests/unit/jobs/test_zk_job.py | 11 +++++++- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 38b5b990..360714b3 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -187,7 +187,24 @@ class ZookeeperJobBoard(jobboard.JobBoard): count += 1 return count - def iterjobs(self, only_unclaimed=False): + def _force_refresh(self, delayed=False): + try: + children = self._client.get_children(self.path) + except self._client.handler.timeout_exception as e: + raise excp.JobFailure("Refreshing failure, connection timed out", + e) + except k_exceptions.SessionExpiredError as e: + raise excp.JobFailure("Refreshing failure, session expired", e) + except k_exceptions.NoNodeError: + pass + except k_exceptions.KazooException as e: + raise excp.JobFailure("Refreshing failure, internal error", e) + else: + self._on_job_posting(children, delayed=delayed) + + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + if ensure_fresh: + self._force_refresh() ok_states = ALL_JOB_STATES if only_unclaimed: ok_states = UNCLAIMED_JOB_STATES @@ -236,7 +253,7 @@ class ZookeeperJobBoard(jobboard.JobBoard): LOG.warn("Internal error fetching job data from 
path: %s", path, exc_info=True) - def _on_job_posting(self, children): + def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) child_paths = [k_paths.join(self.path, c) for c in children] @@ -259,12 +276,15 @@ class ZookeeperJobBoard(jobboard.JobBoard): if path not in self._known_jobs: # Fire off the request to populate this job asynchronously. # - # This method is called from a asynchronous handler so it's - # better to exit from this quickly to allow other - # asynchronous handlers to be executed. + # This method is *usually* called from an asynchronous + # handler so it's better to exit from this quickly to + # allow other asynchronous handlers to be executed. + request = self._client.get_async(path) child_proc = functools.partial(self._process_child, path) - result = self._client.get_async(path) - result.rawlink(child_proc) + if delayed: + request.rawlink(child_proc) + else: + child_proc(request) def _format_job(self, job): posting = { diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 50af583e..a2833550 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -32,9 +32,17 @@ class JobBoard(object): self._name = name @abc.abstractmethod - def iterjobs(self, only_unclaimed=False): + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): """Yields back jobs that are currently on this jobboard (claimed or not claimed). + + :param only_unclaimed: boolean that indicates whether to only iterate + over unclaimed jobs. + :param ensure_fresh: boolean that requests to only iterate over the + most recent jobs available, where the definition of what is recent + is backend specific. It is allowable that a backend may ignore this + value if the backend's internal semantics/capabilities cannot + support this argument.
""" @abc.abstractproperty diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 37fa87d1..3074c4a6 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -66,6 +66,15 @@ class TestZookeeperJobs(test.TestCase): self.client.flush() self.assertTrue(self.board.connected) + def test_fresh_iter(self): + with connect_close(self.board): + book = p_utils.temporary_log_book() + self.board.post('test', book) + self.client.flush() + + jobs = list(self.board.iterjobs(ensure_fresh=True)) + self.assertEqual(1, len(jobs)) + def test_posting_received_raw(self): book = p_utils.temporary_log_book() @@ -77,7 +86,7 @@ class TestZookeeperJobs(test.TestCase): self.client.flush() self.assertEqual(self.board, posted_job.board) - self.assertTrue(1, self.board.job_count) + self.assertEqual(1, self.board.job_count) self.assertIn(posted_job.uuid, [j.uuid for j in self.board.iterjobs()])