From 4eb0ca21b3cdf9b468a241bb18cfb38f3c7e9f83 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sun, 7 Dec 2014 17:16:13 -0800 Subject: [PATCH] Use condition variables using 'with' Instead of doing [acquire, try, finally, release] just use the condition variable as a context manager to achieve the same effect with less code and with less verbosity. Change-Id: I0a3bb80a932a3dc6623ba2378afa0341e9e06e5a --- taskflow/engines/worker_based/executor.py | 10 ++-------- taskflow/jobs/backends/impl_zookeeper.py | 15 +++------------ taskflow/types/latch.py | 10 ++-------- 3 files changed, 7 insertions(+), 28 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 5982a463..092d4038 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -107,12 +107,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): # Add worker info to the cache LOG.debug("Received that tasks %s can be processed by topic '%s'", tasks, topic) - self._workers_arrival.acquire() - try: + with self._workers_arrival: self._workers_cache[topic] = tasks self._workers_arrival.notify_all() - finally: - self._workers_arrival.release() # Publish waiting requests for request in self._requests_cache.get_waiting_requests(tasks): @@ -255,8 +252,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): w = None if timeout is not None: w = tt.StopWatch(timeout).start() - self._workers_arrival.acquire() - try: + with self._workers_arrival: while len(self._workers_cache) < workers: if w is not None and w.expired(): return workers - len(self._workers_cache) @@ -265,8 +261,6 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): timeout = w.leftover() self._workers_arrival.wait(timeout) return 0 - finally: - self._workers_arrival.release() def start(self): """Starts proxy thread and associated topic notification thread.""" diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 05b519ec..ea485f8b 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -395,8 +395,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): LOG.warn("Internal error fetching job data from path: %s", path, exc_info=True) else: - self._job_cond.acquire() - try: + with self._job_cond: # Now we can offically check if someone already placed this # jobs information into the known job set (if it's already # existing then just leave it alone). @@ -409,8 +408,6 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): created_on=created_on) self._known_jobs[path] = job self._job_cond.notify_all() - finally: - self._job_cond.release() if job is not None: self._emit(jobboard.POSTED, details={'job': job}) @@ -488,12 +485,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._persistence, job_path, book=book, details=details, uuid=job_uuid) - self._job_cond.acquire() - try: + with self._job_cond: self._known_jobs[job_path] = job self._job_cond.notify_all() - finally: - self._job_cond.release() self._emit(jobboard.POSTED, details={'job': job}) return job @@ -634,8 +628,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): watch = None if timeout is not None: watch = tt.StopWatch(duration=float(timeout)).start() - self._job_cond.acquire() - try: + with self._job_cond: while True: if not self._known_jobs: if watch is not None and watch.expired(): @@ -656,8 +649,6 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): it._jobs.extend(self._fetch_jobs()) it._fetched = True return it - finally: - self._job_cond.release() @property def connected(self): diff --git a/taskflow/types/latch.py b/taskflow/types/latch.py index 0945a286..db6e56f3 100644 --- a/taskflow/types/latch.py +++ b/taskflow/types/latch.py @@ -41,13 +41,10 @@ class Latch(object): def countdown(self): """Decrements the internal counter due to an arrival.""" - self._cond.acquire() - try: + with self._cond: self._count -= 1 if self._count <= 0: self._cond.notify_all() - finally: - self._cond.release() def wait(self, timeout=None): """Waits until the latch is released. @@ -60,8 +57,7 @@ class Latch(object): w = None if timeout is not None: w = tt.StopWatch(timeout).start() - self._cond.acquire() - try: + with self._cond: while self._count > 0: if w is not None: if w.expired(): @@ -70,5 +66,3 @@ class Latch(object): timeout = w.leftover() self._cond.wait(timeout) return True - finally: - self._cond.release()