diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index b0a53ecc..eb555a36 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -49,6 +49,8 @@ ALL_JOB_STATES = ( # Transaction support was added in 3.4.0 MIN_ZK_VERSION = (3, 4, 0) +LOCK_POSTFIX = ".lock" +JOB_PREFIX = 'job' def _check_who(who): @@ -74,7 +76,7 @@ class ZookeeperJob(base_job.Job): raise ValueError("Only one of 'book_data' or 'book'" " can be provided") self._path = path - self._lock_path = "%s.lock" % (path) + self._lock_path = path + LOCK_POSTFIX self._created_on = created_on self._node_not_found = False @@ -232,7 +234,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._job_watcher = None # Since we use sequenced ids this will be the path that the sequences # are prefixed with, for example, job0000000001, job0000000002, ... - self._job_base = k_paths.join(path, "job") + self._job_base = k_paths.join(path, JOB_PREFIX) self._worker = None self._emit_notifications = bool(emit_notifications) @@ -331,7 +333,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) - child_paths = [k_paths.join(self.path, c) for c in children] + child_paths = [] + for c in children: + if c.endswith(LOCK_POSTFIX) or not c.startswith(JOB_PREFIX): + # Skip lock paths or non-job-paths (these are not valid jobs) + continue + child_paths.append(k_paths.join(self.path, c)) # Remove jobs that we know about but which are no longer children with self._job_mutate: