Adjust on_job_posting to not hold the lock while investigating

To avoid issues where jobs are being investigated in one thread while a
dispatcher thread gets blocked on that same investigation lock, avoid this
entirely by not holding onto that lock while investigating.
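
For illustration, a minimal standalone sketch of the pattern this change
moves toward (module-level stand-ins for the job board's state and a generic
investigate callback, not the actual taskflow code): decide what needs
investigating while briefly holding the lock, then do the potentially slow
investigation after releasing it, so a dispatcher thread waiting on the same
lock is never stuck behind an in-progress investigation.

import threading

# Stand-ins for the job board state used in the real code.
_job_lock = threading.Lock()
_known_jobs = {}      # path -> already-populated job record
_bad_paths = set()    # paths previously found to be unusable


def _on_job_posting(child_paths, investigate):
    # Decide what to investigate while the lock is held (cheap)...
    investigate_paths = []
    with _job_lock:
        for path in child_paths:
            if path in _bad_paths or path in _known_jobs:
                continue
            investigate_paths.append(path)
    # ...then investigate with the lock released (potentially slow), so
    # threads contending on _job_lock are not blocked behind this work.
    for path in investigate_paths:
        investigate(path)

In the real method the investigation step issues an asynchronous get for each
path, and the job record is only added to the known-job set later, after the
lock/condition is re-taken once the data arrives.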

This also adds a small cleanup around the notifier thread pool and ensures
on submission that it actually exists in a usable state (handling the
exceptions that arise when it doesn't).
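
As a rough, self-contained illustration of that guard (a plain
concurrent.futures executor and made-up helper names, not the actual notifier
worker used here): submitting to an executor that has already been shut down
raises RuntimeError, and a worker object without a usable submit() raises
AttributeError; the guarded submit below simply skips the notification in
those cases, mirroring what the first hunk below adds around
self._worker.submit.

from concurrent import futures


def log_notification(state, details):
    print("notified: %s -> %s" % (state, details))


def emit(worker, state, details):
    # Only try to notify when a worker exists; a shut-down executor raises
    # RuntimeError from submit() and a half-constructed worker may raise
    # AttributeError, and in either case the notification is just skipped.
    if worker is not None:
        try:
            worker.submit(log_notification, state, details)
        except (AttributeError, RuntimeError):
            pass


worker = futures.ThreadPoolExecutor(max_workers=1)
emit(worker, "POSTED", {"job": "example"})   # dispatched to the worker
worker.shutdown()
emit(worker, "POSTED", {"job": "example"})   # RuntimeError -> skipped
emit(None, "POSTED", {"job": "example"})     # no worker -> nothing submitted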

Change-Id: I12e71f029726ec54ec0aadbdf71cc3c7a959f047
Joshua Harlow authored 2014-07-16 19:53:45 -07:00
committed by Joshua Harlow
parent d43cc4f9c3
commit f8d69ffc31

@@ -312,8 +312,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
     def _emit(self, state, details):
         # Submit the work to the executor to avoid blocking the kazoo queue.
+        if self._worker is not None:
+            try:
+                self._worker.submit(self.notifier.notify, state, details)
+            except (AttributeError, RuntimeError):
+                # Notification thread is shutdown or non-existent, either
+                # case we just want to skip submitting a notification...
+                pass

     @property
     def path(self):
@@ -382,6 +386,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
         else:
             self._job_cond.acquire()
             try:
+                # Now we can offically check if someone already placed this
+                # jobs information into the known job set (if it's already
+                # existing then just leave it alone).
                 if path not in self._known_jobs:
                     job = ZookeeperJob(job_data['name'], self,
                                        self._client, self._persistence, path,
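
The comment added in this hunk describes a re-check once the lock is actually
held; as a generic sketch (plain threading primitives and illustrative names,
not the job board's own attributes), the earlier unlocked pre-check only
reduces duplicate work, so the authoritative membership test has to be
repeated under the lock before inserting:

import threading

_job_cond = threading.Condition()
_known_jobs = {}  # path -> job record


def register_job(path, make_job):
    with _job_cond:
        # Someone may have populated this path while we were fetching its
        # data without the lock held, so check again before inserting.
        if path not in _known_jobs:
            _known_jobs[path] = make_job(path)
            _job_cond.notify_all()
        return _known_jobs[path]


job = register_job("/taskflow/jobs/job0000000001",
                   make_job=lambda path: {"path": path})
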
@@ -405,32 +412,37 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
                 continue
             child_paths.append(k_paths.join(self.path, c))
-        # Remove jobs that we know about but which are no longer children
+        # Figure out what we really should be investigating and what we
+        # shouldn't...
+        investigate_paths = []
         with self._job_lock:
             removals = set()
-            for path, _job in six.iteritems(self._known_jobs):
+            for path in six.iterkeys(self._known_jobs):
                 if path not in child_paths:
                     removals.add(path)
             for path in removals:
                 self._remove_job(path)
-        # Ensure that we have a job record for each new job that has appeared
-        for path in child_paths:
-            if path in self._bad_paths:
-                continue
-            with self._job_lock:
-                if path not in self._known_jobs:
-                    # Fire off the request to populate this job asynchronously.
-                    #
-                    # This method is *usually* called from a asynchronous
-                    # handler so it's better to exit from this quickly to
-                    # allow other asynchronous handlers to be executed.
-                    request = self._client.get_async(path)
-                    child_proc = functools.partial(self._process_child, path)
-                    if delayed:
-                        request.rawlink(child_proc)
-                    else:
-                        child_proc(request)
+            for path in child_paths:
+                if path in self._bad_paths:
+                    continue
+                # This pre-check will not guarantee that we will not already
+                # have the job (if it's being populated elsewhere) but it will
+                # reduce the amount of duplicated requests in general.
+                if path in self._known_jobs:
+                    continue
+                if path not in investigate_paths:
+                    investigate_paths.append(path)
+        for path in investigate_paths:
+            # Fire off the request to populate this job.
+            #
+            # This method is *usually* called from a asynchronous handler so
+            # it's better to exit from this quickly to allow other asynchronous
+            # handlers to be executed.
+            request = self._client.get_async(path)
+            if delayed:
+                request.rawlink(functools.partial(self._process_child, path))
+            else:
+                self._process_child(path, request)

     def post(self, name, book=None, details=None):
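
For context on the delayed/immediate branch at the end of the new loop, here
is a hedged sketch of the kazoo calls in isolation (the ZooKeeper address and
job znode path are made up for the example): get_async() returns an
IAsyncResult, rawlink() registers a callback that is handed that result once
it completes, and calling the processing function directly instead simply
blocks the current handler until the data is available.

import functools

from kazoo.client import KazooClient


def process_child(path, request):
    # 'request' is a kazoo IAsyncResult; get() returns the znode's
    # (data, stat) tuple or re-raises whatever error the call hit.
    data, _stat = request.get()
    print("populating job from %s (%d bytes)" % (path, len(data)))


def investigate(client, paths, delayed=True):
    for path in paths:
        request = client.get_async(path)
        if delayed:
            # Let kazoo run the callback when the result is ready, keeping
            # the calling (watch/handler) thread free.
            request.rawlink(functools.partial(process_child, path))
        else:
            # Process inline; this blocks until the znode data arrives.
            process_child(path, request)


client = KazooClient(hosts="127.0.0.1:2181")  # assumed local ZooKeeper
client.start()
investigate(client, ["/taskflow/jobs/job0000000001"], delayed=False)
client.stop()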