diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 38b5b990..360714b3 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -187,7 +187,24 @@ class ZookeeperJobBoard(jobboard.JobBoard): count += 1 return count - def iterjobs(self, only_unclaimed=False): + def _force_refresh(self, delayed=False): + try: + children = self._client.get_children(self.path) + except self._client.handler.timeout_exception as e: + raise excp.JobFailure("Refreshing failure, connection timed out", + e) + except k_exceptions.SessionExpiredError as e: + raise excp.JobFailure("Refreshing failure, session expired", e) + except k_exceptions.NoNodeError: + pass + except k_exceptions.KazooException as e: + raise excp.JobFailure("Refreshing failure, internal error", e) + else: + self._on_job_posting(children, delayed=delayed) + + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): + if ensure_fresh: + self._force_refresh() ok_states = ALL_JOB_STATES if only_unclaimed: ok_states = UNCLAIMED_JOB_STATES @@ -236,7 +253,7 @@ class ZookeeperJobBoard(jobboard.JobBoard): LOG.warn("Internal error fetching job data from path: %s", path, exc_info=True) - def _on_job_posting(self, children): + def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) child_paths = [k_paths.join(self.path, c) for c in children] @@ -259,12 +276,15 @@ class ZookeeperJobBoard(jobboard.JobBoard): if path not in self._known_jobs: # Fire off the request to populate this job asynchronously. # - # This method is called from a asynchronous handler so it's - # better to exit from this quickly to allow other - # asynchronous handlers to be executed. + # This method is *usually* called from a asynchronous + # handler so it's better to exit from this quickly to + # allow other asynchronous handlers to be executed. + request = self._client.get_async(path) child_proc = functools.partial(self._process_child, path) - result = self._client.get_async(path) - result.rawlink(child_proc) + if delayed: + request.rawlink(child_proc) + else: + child_proc(request) def _format_job(self, job): posting = { diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 50af583e..a2833550 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -32,9 +32,17 @@ class JobBoard(object): self._name = name @abc.abstractmethod - def iterjobs(self, only_unclaimed=False): + def iterjobs(self, only_unclaimed=False, ensure_fresh=False): """Yields back jobs that are currently on this jobboard (claimed or not claimed). + + :param only_unclaimed: boolean that indicates whether to only iteration + over unclaimed jobs. + :param ensure_fresh: boolean that requests to only iterate over the + most recent jobs available, where the definition of what is recent + is backend specific. It is allowable that a backend may ignore this + value if the backends internal semantics/capabilities can not + support this argument. """ @abc.abstractproperty diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 37fa87d1..3074c4a6 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -66,6 +66,15 @@ class TestZookeeperJobs(test.TestCase): self.client.flush() self.assertTrue(self.board.connected) + def test_fresh_iter(self): + with connect_close(self.board): + book = p_utils.temporary_log_book() + self.board.post('test', book) + self.client.flush() + + jobs = list(self.board.iterjobs(ensure_fresh=True)) + self.assertEqual(1, len(jobs)) + def test_posting_received_raw(self): book = p_utils.temporary_log_book() @@ -77,7 +86,7 @@ class TestZookeeperJobs(test.TestCase): self.client.flush() self.assertEqual(self.board, posted_job.board) - self.assertTrue(1, self.board.job_count) + self.assertEqual(1, self.board.job_count) self.assertIn(posted_job.uuid, [j.uuid for j in self.board.iterjobs()])