From 95b30d60cd1be5cbdf953e30c379822aad08ab07 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 12 Sep 2014 17:29:45 -0700 Subject: [PATCH] Refactor parts of the job lock/job condition zookeeper usage This reduces the necessary locks to operations which really only need to be locked, removing some of the ones which are not needed around read only operations or operations which are thread safe when used with dictionaries (popping a single item for example) or checking if a *string* key is in a dictionary, or fetching a dictionaries length... Change-Id: I28a9d66afa7f7b733b2963b8cee3edd45696561b --- taskflow/jobs/backends/impl_zookeeper.py | 31 ++++++++++-------------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 96307790..f1decbbc 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -298,8 +298,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._persistence = persistence # Misc. internal details self._known_jobs = {} - self._job_lock = threading.RLock() - self._job_cond = threading.Condition(self._job_lock) + self._job_cond = threading.Condition() self._open_close_lock = threading.RLock() self._client.add_listener(self._state_change_listener) self._bad_paths = frozenset([path]) @@ -325,13 +324,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): @property def job_count(self): - with self._job_lock: - return len(self._known_jobs) + return len(self._known_jobs) def _fetch_jobs(self, ensure_fresh=False): if ensure_fresh: self._force_refresh() - with self._job_lock: + with self._job_cond: return sorted(six.itervalues(self._known_jobs)) def _force_refresh(self): @@ -356,8 +354,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _remove_job(self, path): LOG.debug("Removing job that was at path: %s", path) - with self._job_lock: - job = self._known_jobs.pop(path, None) + job = self._known_jobs.pop(path, None) if job is not None: self._emit(jobboard.REMOVAL, details={'job': job}) @@ -413,15 +410,14 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): child_paths.append(k_paths.join(self.path, c)) # Figure out what we really should be investigating and what we - # shouldn't... + # shouldn't (remove jobs that exist in our local version, but don't + # exist in the children anymore) and accumulate all paths that we + # need to trigger population of (without holding the job lock). investigate_paths = [] - with self._job_lock: - removals = set() + with self._job_cond: for path in six.iterkeys(self._known_jobs): if path not in child_paths: - removals.add(path) - for path in removals: - self._remove_job(path) + self._remove_job(path) for path in child_paths: if path in self._bad_paths: continue @@ -543,10 +539,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): if not job_path: raise ValueError("Unable to check if %r is a known path" % (job_path)) - with self._job_lock: - if job_path not in self._known_jobs: - fail_msg_tpl += ", unknown job" - raise excp.NotFound(fail_msg_tpl % (job_uuid)) + if job_path not in self._known_jobs: + fail_msg_tpl += ", unknown job" + raise excp.NotFound(fail_msg_tpl % (job_uuid)) try: yield except self._client.handler.timeout_exception as e: @@ -663,7 +658,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): LOG.debug("Shutting down the notifier") self._worker.shutdown() self._worker = None - with self._job_lock: + with self._job_cond: self._known_jobs.clear() LOG.debug("Stopped & cleared local state")