Refactor parts of the job lock/job condition zookeeper usage
This reduces the necessary locks to operations which really only need to be locked, removing some of the ones which are not needed around read only operations or operations which are thread safe when used with dictionaries (popping a single item for example) or checking if a *string* key is in a dictionary, or fetching a dictionaries length... Change-Id: I28a9d66afa7f7b733b2963b8cee3edd45696561b
This commit is contained in:
@@ -298,8 +298,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
self._persistence = persistence
|
||||
# Misc. internal details
|
||||
self._known_jobs = {}
|
||||
self._job_lock = threading.RLock()
|
||||
self._job_cond = threading.Condition(self._job_lock)
|
||||
self._job_cond = threading.Condition()
|
||||
self._open_close_lock = threading.RLock()
|
||||
self._client.add_listener(self._state_change_listener)
|
||||
self._bad_paths = frozenset([path])
|
||||
@@ -325,13 +324,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
|
||||
@property
|
||||
def job_count(self):
|
||||
with self._job_lock:
|
||||
return len(self._known_jobs)
|
||||
return len(self._known_jobs)
|
||||
|
||||
def _fetch_jobs(self, ensure_fresh=False):
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_lock:
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs))
|
||||
|
||||
def _force_refresh(self):
|
||||
@@ -356,8 +354,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
|
||||
def _remove_job(self, path):
|
||||
LOG.debug("Removing job that was at path: %s", path)
|
||||
with self._job_lock:
|
||||
job = self._known_jobs.pop(path, None)
|
||||
job = self._known_jobs.pop(path, None)
|
||||
if job is not None:
|
||||
self._emit(jobboard.REMOVAL, details={'job': job})
|
||||
|
||||
@@ -413,15 +410,14 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
child_paths.append(k_paths.join(self.path, c))
|
||||
|
||||
# Figure out what we really should be investigating and what we
|
||||
# shouldn't...
|
||||
# shouldn't (remove jobs that exist in our local version, but don't
|
||||
# exist in the children anymore) and accumulate all paths that we
|
||||
# need to trigger population of (without holding the job lock).
|
||||
investigate_paths = []
|
||||
with self._job_lock:
|
||||
removals = set()
|
||||
with self._job_cond:
|
||||
for path in six.iterkeys(self._known_jobs):
|
||||
if path not in child_paths:
|
||||
removals.add(path)
|
||||
for path in removals:
|
||||
self._remove_job(path)
|
||||
self._remove_job(path)
|
||||
for path in child_paths:
|
||||
if path in self._bad_paths:
|
||||
continue
|
||||
@@ -543,10 +539,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
if not job_path:
|
||||
raise ValueError("Unable to check if %r is a known path"
|
||||
% (job_path))
|
||||
with self._job_lock:
|
||||
if job_path not in self._known_jobs:
|
||||
fail_msg_tpl += ", unknown job"
|
||||
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
||||
if job_path not in self._known_jobs:
|
||||
fail_msg_tpl += ", unknown job"
|
||||
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
||||
try:
|
||||
yield
|
||||
except self._client.handler.timeout_exception as e:
|
||||
@@ -663,7 +658,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
LOG.debug("Shutting down the notifier")
|
||||
self._worker.shutdown()
|
||||
self._worker = None
|
||||
with self._job_lock:
|
||||
with self._job_cond:
|
||||
self._known_jobs.clear()
|
||||
LOG.debug("Stopped & cleared local state")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user