diff --git a/taskflow/engines/worker_based/cache.py b/taskflow/engines/worker_based/cache.py index 9da7f12c..3b00890d 100644 --- a/taskflow/engines/worker_based/cache.py +++ b/taskflow/engines/worker_based/cache.py @@ -28,7 +28,7 @@ class RequestsCache(base.ExpiringCache): def get_waiting_requests(self, tasks): """Get list of waiting requests by tasks.""" waiting_requests = [] - with self._lock.read_lock(): + with self._lock: for request in six.itervalues(self._data): if request.state == pr.WAITING and request.task_cls in tasks: waiting_requests.append(request) @@ -41,7 +41,7 @@ class WorkersCache(base.ExpiringCache): def get_topic_by_task(self, task): """Get topic for a given task.""" available_topics = [] - with self._lock.read_lock(): + with self._lock: for topic, tasks in six.iteritems(self._data): if task in tasks: available_topics.append(topic) diff --git a/taskflow/types/cache.py b/taskflow/types/cache.py index 72214fed..61511e12 100644 --- a/taskflow/types/cache.py +++ b/taskflow/types/cache.py @@ -14,9 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + import six -from taskflow.utils import lock_utils as lu from taskflow.utils import reflection @@ -30,41 +31,38 @@ class ExpiringCache(object): def __init__(self): self._data = {} - self._lock = lu.ReaderWriterLock() + self._lock = threading.Lock() def __setitem__(self, key, value): """Set a value in the cache.""" - with self._lock.write_lock(): + with self._lock: self._data[key] = value def __len__(self): """Returns how many items are in this cache.""" - with self._lock.read_lock(): - return len(self._data) + return len(self._data) def get(self, key, default=None): """Retrieve a value from the cache (returns default if not found).""" - with self._lock.read_lock(): - return self._data.get(key, default) + return self._data.get(key, default) def __getitem__(self, key): """Retrieve a value from the cache.""" - with self._lock.read_lock(): - return self._data[key] + return self._data[key] def __delitem__(self, key): """Delete a key & value from the cache.""" - with self._lock.write_lock(): + with self._lock: del self._data[key] def cleanup(self, on_expired_callback=None): """Delete out-dated keys & values from the cache.""" - with self._lock.write_lock(): + with self._lock: expired_values = [(k, v) for k, v in six.iteritems(self._data) if v.expired] for (k, _v) in expired_values: del self._data[k] - if on_expired_callback: + if on_expired_callback is not None: arg_c = len(reflection.get_callable_args(on_expired_callback)) for (k, v) in expired_values: if arg_c == 2: