diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 96307790..f1decbbc 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -298,8 +298,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._persistence = persistence # Misc. internal details self._known_jobs = {} - self._job_lock = threading.RLock() - self._job_cond = threading.Condition(self._job_lock) + self._job_cond = threading.Condition() self._open_close_lock = threading.RLock() self._client.add_listener(self._state_change_listener) self._bad_paths = frozenset([path]) @@ -325,13 +324,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): @property def job_count(self): - with self._job_lock: - return len(self._known_jobs) + return len(self._known_jobs) def _fetch_jobs(self, ensure_fresh=False): if ensure_fresh: self._force_refresh() - with self._job_lock: + with self._job_cond: return sorted(six.itervalues(self._known_jobs)) def _force_refresh(self): @@ -356,8 +354,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) - with self._job_lock: - job = self._known_jobs.pop(path, None) + job = self._known_jobs.pop(path, None) if job is not None: self._emit(jobboard.REMOVAL, details={'job': job}) @@ -413,15 +410,14 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): child_paths.append(k_paths.join(self.path, c)) # Figure out what we really should be investigating and what we - # shouldn't... + # shouldn't (remove jobs that exist in our local version, but don't + # exist in the children anymore) and accumulate all paths that we + # need to trigger population of (without holding the job lock). investigate_paths = [] - with self._job_lock: - removals = set() + with self._job_cond: for path in six.iterkeys(self._known_jobs): if path not in child_paths: - removals.add(path) - for path in removals: - self._remove_job(path) + self._remove_job(path) for path in child_paths: if path in self._bad_paths: continue @@ -543,10 +539,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): if not job_path: raise ValueError("Unable to check if %r is a known path" % (job_path)) - with self._job_lock: - if job_path not in self._known_jobs: - fail_msg_tpl += ", unknown job" - raise excp.NotFound(fail_msg_tpl % (job_uuid)) + if job_path not in self._known_jobs: + fail_msg_tpl += ", unknown job" + raise excp.NotFound(fail_msg_tpl % (job_uuid)) try: yield except self._client.handler.timeout_exception as e: @@ -663,7 +658,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): LOG.debug("Shutting down the notifier") self._worker.shutdown() self._worker = None - with self._job_lock: + with self._job_cond: self._known_jobs.clear() LOG.debug("Stopped & cleared local state")