From 34b358a137b307eb11bb362baa4459613a56b3e0 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 25 Oct 2014 21:58:48 -0700 Subject: [PATCH] Use standard threading locks in the cache types Instead of using a reader/writer lock in the cache types just take advantage of the fact that single, non-mutating operations on dictionaries are thread-safe in python. This means we can remove the need to have simple read-only operations using read-locks and can just use a simpler lock around write-operations or around read/write operations that span multiple dictionary operations (like iteration). Change-Id: I4679275d7fe25fac12a6b05e8aa38da95649f4f6 --- taskflow/engines/worker_based/cache.py | 4 ++-- taskflow/types/cache.py | 22 ++++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/taskflow/engines/worker_based/cache.py b/taskflow/engines/worker_based/cache.py index 9da7f12c..3b00890d 100644 --- a/taskflow/engines/worker_based/cache.py +++ b/taskflow/engines/worker_based/cache.py @@ -28,7 +28,7 @@ class RequestsCache(base.ExpiringCache): def get_waiting_requests(self, tasks): """Get list of waiting requests by tasks.""" waiting_requests = [] - with self._lock.read_lock(): + with self._lock: for request in six.itervalues(self._data): if request.state == pr.WAITING and request.task_cls in tasks: waiting_requests.append(request) @@ -41,7 +41,7 @@ class WorkersCache(base.ExpiringCache): def get_topic_by_task(self, task): """Get topic for a given task.""" available_topics = [] - with self._lock.read_lock(): + with self._lock: for topic, tasks in six.iteritems(self._data): if task in tasks: available_topics.append(topic) diff --git a/taskflow/types/cache.py b/taskflow/types/cache.py index 72214fed..61511e12 100644 --- a/taskflow/types/cache.py +++ b/taskflow/types/cache.py @@ -14,9 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + import six -from taskflow.utils import lock_utils as lu from taskflow.utils import reflection @@ -30,41 +31,38 @@ class ExpiringCache(object): def __init__(self): self._data = {} - self._lock = lu.ReaderWriterLock() + self._lock = threading.Lock() def __setitem__(self, key, value): """Set a value in the cache.""" - with self._lock.write_lock(): + with self._lock: self._data[key] = value def __len__(self): """Returns how many items are in this cache.""" - with self._lock.read_lock(): - return len(self._data) + return len(self._data) def get(self, key, default=None): """Retrieve a value from the cache (returns default if not found).""" - with self._lock.read_lock(): - return self._data.get(key, default) + return self._data.get(key, default) def __getitem__(self, key): """Retrieve a value from the cache.""" - with self._lock.read_lock(): - return self._data[key] + return self._data[key] def __delitem__(self, key): """Delete a key & value from the cache.""" - with self._lock.write_lock(): + with self._lock: del self._data[key] def cleanup(self, on_expired_callback=None): """Delete out-dated keys & values from the cache.""" - with self._lock.write_lock(): + with self._lock: expired_values = [(k, v) for k, v in six.iteritems(self._data) if v.expired] for (k, _v) in expired_values: del self._data[k] - if on_expired_callback: + if on_expired_callback is not None: arg_c = len(reflection.get_callable_args(on_expired_callback)) for (k, v) in expired_values: if arg_c == 2: