From 54bb34bd4377bc699b7e5b82aa509fcfa2799b08 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 25 Apr 2014 11:44:48 -0700 Subject: [PATCH] Skip loading (and failing to load) lock files Ignore lock files (and other hidden files) that zookeeper will notify the jobboard about and avoid reading these as potential jobs (which they are not). Fixes bug 1312843 Change-Id: Ifa1ed31e22aed838f9baf883a6faaec00187663a --- taskflow/jobs/backends/impl_zookeeper.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index b0a53ecc..eb555a36 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -49,6 +49,8 @@ ALL_JOB_STATES = ( # Transaction support was added in 3.4.0 MIN_ZK_VERSION = (3, 4, 0) +LOCK_POSTFIX = ".lock" +JOB_PREFIX = 'job' def _check_who(who): @@ -74,7 +76,7 @@ class ZookeeperJob(base_job.Job): raise ValueError("Only one of 'book_data' or 'book'" " can be provided") self._path = path - self._lock_path = "%s.lock" % (path) + self._lock_path = path + LOCK_POSTFIX self._created_on = created_on self._node_not_found = False @@ -232,7 +234,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._job_watcher = None # Since we use sequenced ids this will be the path that the sequences # are prefixed with, for example, job0000000001, job0000000002, ... - self._job_base = k_paths.join(path, "job") + self._job_base = k_paths.join(path, JOB_PREFIX) self._worker = None self._emit_notifications = bool(emit_notifications) @@ -331,7 +333,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) - child_paths = [k_paths.join(self.path, c) for c in children] + child_paths = [] + for c in children: + if c.endswith(LOCK_POSTFIX) or not c.startswith(JOB_PREFIX): + # Skip lock paths or non-job-paths (these are not valid jobs) + continue + child_paths.append(k_paths.join(self.path, c)) # Remove jobs that we know about but which are no longer children with self._job_mutate: