Merge "Refactor parts of the job lock/job condition zookeeper usage"
This commit is contained in:
@@ -312,8 +312,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
self._persistence = persistence
|
||||
# Misc. internal details
|
||||
self._known_jobs = {}
|
||||
self._job_lock = threading.RLock()
|
||||
self._job_cond = threading.Condition(self._job_lock)
|
||||
self._job_cond = threading.Condition()
|
||||
self._open_close_lock = threading.RLock()
|
||||
self._client.add_listener(self._state_change_listener)
|
||||
self._bad_paths = frozenset([path])
|
||||
@@ -339,13 +338,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
|
||||
@property
|
||||
def job_count(self):
|
||||
with self._job_lock:
|
||||
return len(self._known_jobs)
|
||||
return len(self._known_jobs)
|
||||
|
||||
def _fetch_jobs(self, ensure_fresh=False):
|
||||
if ensure_fresh:
|
||||
self._force_refresh()
|
||||
with self._job_lock:
|
||||
with self._job_cond:
|
||||
return sorted(six.itervalues(self._known_jobs))
|
||||
|
||||
def _force_refresh(self):
|
||||
@@ -370,8 +368,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
|
||||
def _remove_job(self, path):
|
||||
LOG.debug("Removing job that was at path: %s", path)
|
||||
with self._job_lock:
|
||||
job = self._known_jobs.pop(path, None)
|
||||
job = self._known_jobs.pop(path, None)
|
||||
if job is not None:
|
||||
self._emit(jobboard.REMOVAL, details={'job': job})
|
||||
|
||||
@@ -427,15 +424,14 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
child_paths.append(k_paths.join(self.path, c))
|
||||
|
||||
# Figure out what we really should be investigating and what we
|
||||
# shouldn't...
|
||||
# shouldn't (remove jobs that exist in our local version, but don't
|
||||
# exist in the children anymore) and accumulate all paths that we
|
||||
# need to trigger population of (without holding the job lock).
|
||||
investigate_paths = []
|
||||
with self._job_lock:
|
||||
removals = set()
|
||||
with self._job_cond:
|
||||
for path in six.iterkeys(self._known_jobs):
|
||||
if path not in child_paths:
|
||||
removals.add(path)
|
||||
for path in removals:
|
||||
self._remove_job(path)
|
||||
self._remove_job(path)
|
||||
for path in child_paths:
|
||||
if path in self._bad_paths:
|
||||
continue
|
||||
@@ -557,10 +553,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
if not job_path:
|
||||
raise ValueError("Unable to check if %r is a known path"
|
||||
% (job_path))
|
||||
with self._job_lock:
|
||||
if job_path not in self._known_jobs:
|
||||
fail_msg_tpl += ", unknown job"
|
||||
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
||||
if job_path not in self._known_jobs:
|
||||
fail_msg_tpl += ", unknown job"
|
||||
raise excp.NotFound(fail_msg_tpl % (job_uuid))
|
||||
try:
|
||||
yield
|
||||
except self._client.handler.timeout_exception as e:
|
||||
@@ -677,7 +672,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
|
||||
LOG.debug("Shutting down the notifier")
|
||||
self._worker.shutdown()
|
||||
self._worker = None
|
||||
with self._job_lock:
|
||||
with self._job_cond:
|
||||
self._known_jobs.clear()
|
||||
LOG.debug("Stopped & cleared local state")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user