Use condition variables using 'with'

Instead of doing [acquire, try, finally, release]
just use the condition variable as a context manager
to achieve the same effect with less code and with
less verbosity.

Change-Id: I0a3bb80a932a3dc6623ba2378afa0341e9e06e5a
This commit is contained in:
Joshua Harlow
2014-12-07 17:16:13 -08:00
committed by Joshua Harlow
parent cd664bdd3b
commit 4eb0ca21b3
3 changed files with 7 additions and 28 deletions

View File

@@ -107,12 +107,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
# Add worker info to the cache
LOG.debug("Received that tasks %s can be processed by topic '%s'",
tasks, topic)
self._workers_arrival.acquire()
try:
with self._workers_arrival:
self._workers_cache[topic] = tasks
self._workers_arrival.notify_all()
finally:
self._workers_arrival.release()
# Publish waiting requests
for request in self._requests_cache.get_waiting_requests(tasks):
@@ -255,8 +252,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
w = None
if timeout is not None:
w = tt.StopWatch(timeout).start()
self._workers_arrival.acquire()
try:
with self._workers_arrival:
while len(self._workers_cache) < workers:
if w is not None and w.expired():
return workers - len(self._workers_cache)
@@ -265,8 +261,6 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
timeout = w.leftover()
self._workers_arrival.wait(timeout)
return 0
finally:
self._workers_arrival.release()
def start(self):
"""Starts proxy thread and associated topic notification thread."""

View File

@@ -395,8 +395,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
LOG.warn("Internal error fetching job data from path: %s",
path, exc_info=True)
else:
self._job_cond.acquire()
try:
with self._job_cond:
# Now we can offically check if someone already placed this
# jobs information into the known job set (if it's already
# existing then just leave it alone).
@@ -409,8 +408,6 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
created_on=created_on)
self._known_jobs[path] = job
self._job_cond.notify_all()
finally:
self._job_cond.release()
if job is not None:
self._emit(jobboard.POSTED, details={'job': job})
@@ -488,12 +485,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
self._persistence, job_path,
book=book, details=details,
uuid=job_uuid)
self._job_cond.acquire()
try:
with self._job_cond:
self._known_jobs[job_path] = job
self._job_cond.notify_all()
finally:
self._job_cond.release()
self._emit(jobboard.POSTED, details={'job': job})
return job
@@ -634,8 +628,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
watch = None
if timeout is not None:
watch = tt.StopWatch(duration=float(timeout)).start()
self._job_cond.acquire()
try:
with self._job_cond:
while True:
if not self._known_jobs:
if watch is not None and watch.expired():
@@ -656,8 +649,6 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
it._jobs.extend(self._fetch_jobs())
it._fetched = True
return it
finally:
self._job_cond.release()
@property
def connected(self):

View File

@@ -41,13 +41,10 @@ class Latch(object):
def countdown(self):
"""Decrements the internal counter due to an arrival."""
self._cond.acquire()
try:
with self._cond:
self._count -= 1
if self._count <= 0:
self._cond.notify_all()
finally:
self._cond.release()
def wait(self, timeout=None):
"""Waits until the latch is released.
@@ -60,8 +57,7 @@ class Latch(object):
w = None
if timeout is not None:
w = tt.StopWatch(timeout).start()
self._cond.acquire()
try:
with self._cond:
while self._count > 0:
if w is not None:
if w.expired():
@@ -70,5 +66,3 @@ class Latch(object):
timeout = w.leftover()
self._cond.wait(timeout)
return True
finally:
self._cond.release()