From 35529ff37d6663f1eb53ff724345f06b46ba618b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 30 Apr 2014 18:41:33 -0700 Subject: [PATCH] Add a new wait() method that waits for jobs to arrive Using the new iterator object add a new wait method that will wait until jobs have arrived before giving back an iterator that can be used to analyze those jobs. It also supports a timeout which can be used to avoid waiting for a very long time. Change-Id: I3d53120948d3d466ebc921a8be0a66b78732f09b --- taskflow/jobs/backends/impl_zookeeper.py | 117 +++++++++++++++-------- taskflow/jobs/jobboard.py | 15 +++ taskflow/tests/unit/jobs/test_zk_job.py | 32 +++++++ taskflow/utils/misc.py | 10 ++ 4 files changed, 135 insertions(+), 39 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index b9331404..6b80f99b 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -18,6 +18,7 @@ import collections import contextlib import functools import logging +import threading from concurrent import futures from kazoo import exceptions as k_exceptions @@ -228,12 +229,6 @@ class ZookeeperJobBoardIterator(six.Iterator): def __iter__(self): return self - def _fetch_jobs(self): - if self.ensure_fresh: - self._board._force_refresh() - with self._board._job_mutate: - return sorted(six.itervalues(self._board._known_jobs)) - def _next_job(self): if self.only_unclaimed: allowed_states = UNCLAIMED_JOB_STATES @@ -249,14 +244,14 @@ class ZookeeperJobBoardIterator(six.Iterator): LOG.warn("Failed determining the state of job: %s (%s)", maybe_job.uuid, maybe_job.path, exc_info=True) except excp.NotFound: - with self._board._job_mutate: - self._board._remove_job(maybe_job.path) + self._board._remove_job(maybe_job.path) return job def __next__(self): if not self._jobs: if not self._fetched: - self._jobs.extend(self._fetch_jobs()) + jobs = self._board._fetch_jobs(ensure_fresh=self.ensure_fresh) + self._jobs.extend(jobs) self._fetched = True job = self._next_job() if job is None: @@ -289,8 +284,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._persistence = persistence # Misc. internal details self._known_jobs = {} - self._job_mutate = self._client.handler.rlock_object() - self._open_close_lock = self._client.handler.rlock_object() + self._job_lock = threading.RLock() + self._job_cond = threading.Condition(self._job_lock) + self._open_close_lock = threading.RLock() self._client.add_listener(self._state_change_listener) self._bad_paths = frozenset([path]) self._job_watcher = None @@ -311,9 +307,15 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): @property def job_count(self): - with self._job_mutate: + with self._job_lock: return len(self._known_jobs) + def _fetch_jobs(self, ensure_fresh=False): + if ensure_fresh: + self._force_refresh() + with self._job_lock: + return sorted(six.itervalues(self._known_jobs)) + def _force_refresh(self): try: children = self._client.get_children(self.path) @@ -336,32 +338,18 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) - job = self._known_jobs.pop(path, None) + with self._job_lock: + job = self._known_jobs.pop(path, None) if job is not None: - self._emit(jobboard.REMOVAL, - details={ - 'job': job, - }) + self._emit(jobboard.REMOVAL, details={'job': job}) def _process_child(self, path, request): """Receives the result of a child data fetch request.""" + job = None try: raw_data, node_stat = request.get() job_data = misc.decode_json(raw_data) created_on = misc.millis_to_datetime(node_stat.ctime) - with self._job_mutate: - if path not in self._known_jobs: - job = ZookeeperJob(job_data['name'], self, - self._client, self._persistence, path, - uuid=job_data['uuid'], - book_data=job_data.get("book"), - details=job_data.get("details", {}), - created_on=created_on) - self._known_jobs[path] = job - self._emit(jobboard.POSTED, - details={ - 'job': job, - }) except (ValueError, TypeError, KeyError): LOG.warn("Incorrectly formatted job data found at path: %s", path, exc_info=True) @@ -377,13 +365,29 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): except k_exceptions.KazooException: LOG.warn("Internal error fetching job data from path: %s", path, exc_info=True) + else: + self._job_cond.acquire() + try: + if path not in self._known_jobs: + job = ZookeeperJob(job_data['name'], self, + self._client, self._persistence, path, + uuid=job_data['uuid'], + book_data=job_data.get("book"), + details=job_data.get("details", {}), + created_on=created_on) + self._known_jobs[path] = job + self._job_cond.notify_all() + finally: + self._job_cond.release() + if job is not None: + self._emit(jobboard.POSTED, details={'job': job}) def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) child_paths = [k_paths.join(self.path, c) for c in children] # Remove jobs that we know about but which are no longer children - with self._job_mutate: + with self._job_lock: removals = set() for path, _job in six.iteritems(self._known_jobs): if path not in child_paths: @@ -395,7 +399,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): for path in child_paths: if path in self._bad_paths: continue - with self._job_mutate: + with self._job_lock: if path not in self._known_jobs: # Fire off the request to populate this job asynchronously. # @@ -443,8 +447,13 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._persistence, job_path, book=book, details=details, uuid=job_uuid) - with self._job_mutate: + self._job_cond.acquire() + try: self._known_jobs[job_path] = job + self._job_cond.notify_all() + finally: + self._job_cond.release() + self._emit(jobboard.POSTED, details={'job': job}) return job def claim(self, job, who): @@ -480,7 +489,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): if not job_path: raise ValueError("Unable to check if %r is a known path" % (job_path)) - with self._job_mutate: + with self._job_lock: if job_path not in self._known_jobs: fail_msg_tpl += ", unknown job" raise excp.NotFound(fail_msg_tpl % (job_uuid)) @@ -533,8 +542,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): with self._client.transaction() as txn: txn.delete(job.lock_path, version=lock_stat.version) txn.delete(job.path, version=data_stat.version) - with self._job_mutate: - self._remove_job(job.path) + self._remove_job(job.path) def abandon(self, job, who): _check_who(who) @@ -557,9 +565,40 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): LOG.debug("Kazoo client has changed to state: %s", state) def _clear(self): - with self._job_mutate: - self._known_jobs = {} - self._job_watcher = None + with self._job_lock: + self._known_jobs.clear() + self._job_watcher = None + + def wait(self, timeout=None): + # Wait until timeout expires (or forever) for jobs to appear. + watch = None + if timeout is not None: + watch = misc.StopWatch(duration=float(timeout)) + watch.start() + self._job_cond.acquire() + try: + while True: + if not self._known_jobs: + if watch and watch.expired(): + raise excp.NotFound("Expired waiting for jobs to" + " arrive; waited %s seconds" + % watch.elapsed()) + # This is done since the given timeout can not be provided + # to the condition variable, since we can not ensure that + # when we acquire the condition that there will actually + # be jobs (especially if we are spuriously awaken), so we + # must recalculate the amount of time we really have left. + timeout = None + if watch is not None: + timeout = watch.leftover() + self._job_cond.wait(timeout) + else: + it = ZookeeperJobBoardIterator(self) + it._jobs.extend(self._fetch_jobs()) + it._fetched = True + return it + finally: + self._job_cond.release() @property def connected(self): diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 0aa533ea..662a3232 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -58,6 +58,21 @@ class JobBoard(object): support this argument. """ + @abc.abstractmethod + def wait(self, timeout=None): + """Waits a given amount of time for job/s to be posted, when jobs are + found then an iterator will be returned that contains the jobs at + the given point in time. + + NOTE(harlowja): since a jobboard can be mutated on by multiple external + entities at the *same* time the iterator that can be returned *may* + still be empty due to other entities removing those jobs after the + iterator has been created (be aware of this when using it). + + :param timeout: float that indicates how long to wait for a job to + appear (if None then waits forever). + """ + @abc.abstractproperty def job_count(self): """Returns how many jobs are on this jobboard (this count may change as diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 9b7c54c4..9154994d 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -16,6 +16,8 @@ import contextlib import mock +import threading +import time import six @@ -102,6 +104,36 @@ class TestZookeeperJobs(test.TestCase): jobs = list(self.board.iterjobs(ensure_fresh=True)) self.assertEqual(1, len(jobs)) + def test_wait_timeout(self): + with connect_close(self.board): + self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1) + + def test_wait_arrival(self): + ev = threading.Event() + jobs = [] + + def poster(wait_post=0.2): + ev.wait() # wait until the waiter is active + time.sleep(wait_post) + self.board.post('test', p_utils.temporary_log_book()) + + def waiter(): + ev.set() + it = self.board.wait() + jobs.extend(it) + + with connect_close(self.board): + t1 = threading.Thread(target=poster) + t1.daemon = True + t1.start() + t2 = threading.Thread(target=waiter) + t2.daemon = True + t2.start() + for t in (t1, t2): + t.join() + + self.assertEqual(1, len(jobs)) + def test_posting_received_raw(self): book = p_utils.temporary_log_book() diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 0e3a1c3d..6c20fe12 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -373,6 +373,16 @@ class StopWatch(object): # NOTE(harlowja): don't silence the exception. return False + def leftover(self): + if self._duration is None: + raise RuntimeError("Can not get the leftover time of a watch that" + " has no duration") + if self._state != self._STARTED: + raise RuntimeError("Can not get the leftover time of a stopwatch" + " that has not been started") + end_time = self._started_at + self._duration + return max(0.0, end_time - wallclock()) + def expired(self): if self._duration is None: return False