diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index d6c131d6..db873011 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -367,9 +367,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): ensure_fresh=ensure_fresh) def _remove_job(self, path): - LOG.debug("Removing job that was at path: %s", path) - job = self._known_jobs.pop(path, None) + if path not in self._known_jobs: + return + with self._job_cond: + job = self._known_jobs.pop(path, None) if job is not None: + LOG.debug("Removed job that was at path '%s'", path) self._emit(jobboard.REMOVAL, details={'job': job}) def _process_child(self, path, request): @@ -425,10 +428,11 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): # exist in the children anymore) and accumulate all paths that we # need to trigger population of (without holding the job lock). investigate_paths = [] + removals = [] with self._job_cond: for path in six.iterkeys(self._known_jobs): if path not in child_paths: - self._remove_job(path) + removals.append(path) for path in child_paths: if path in self._bad_paths: continue @@ -439,6 +443,10 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): continue if path not in investigate_paths: investigate_paths.append(path) + if removals: + with self._job_cond: + for path in removals: + self._remove_job(path) for path in investigate_paths: # Fire off the request to populate this job. #