diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 5f627972..d4772c7c 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -21,8 +21,6 @@ import errno import logging import os import shutil -import threading -import weakref import six @@ -35,38 +33,18 @@ from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) -# The lock storage is not thread safe to set items in, so this lock is used to -# protect that access. -_LOCK_STORAGE_MUTATE = threading.RLock() - -# Currently in use paths -> in-process locks are maintained here. -# -# NOTE(harlowja): Values in this dictionary will be automatically released once -# the objects referencing those objects have been garbage collected. -_LOCK_STORAGE = weakref.WeakValueDictionary() - class DirBackend(base.Backend): """A backend that writes logbooks, flow details, and task details to a provided directory. This backend does *not* provide transactional semantics although it does guarantee that there will be no race conditions when - writing/reading by using file level locking and in-process locking. - - NOTE(harlowja): this is more of an example/testing backend and likely - should *not* be used in production, since this backend lacks transactional - semantics. + writing/reading by using file level locking. """ def __init__(self, conf): super(DirBackend, self).__init__(conf) self._path = os.path.abspath(conf['path']) self._lock_path = os.path.join(self._path, 'locks') self._file_cache = {} - # Ensure that multiple threads are not accessing the same storage at - # the same time, the file lock mechanism doesn't protect against this - # so we must do in-process locking as well. - with _LOCK_STORAGE_MUTATE: - self._lock = _LOCK_STORAGE.setdefault(self._path, - threading.RLock()) @property def lock_path(self): @@ -90,10 +68,6 @@ class Connection(base.Connection): self._flow_path = os.path.join(self._backend.base_path, 'flows') self._task_path = os.path.join(self._backend.base_path, 'tasks') self._book_path = os.path.join(self._backend.base_path, 'books') - # Share the backends lock so that all threads using the given backend - # are restricted in writing, since the per-process lock we are using - # to restrict the multi-process access does not work inside a process. - self._lock = backend._lock def validate(self): # Verify key paths exist. @@ -155,10 +129,7 @@ class Connection(base.Connection): pass def get_logbooks(self): - # NOTE(harlowja): don't hold the lock while iterating. - with self._lock: - books = list(self._get_logbooks()) - for b in books: + for b in list(self._get_logbooks()): yield b @property @@ -184,7 +155,6 @@ class Connection(base.Connection): self._write_to(td_path, jsonutils.dumps(td_data)) return task_detail - @lock_utils.locked def update_task_details(self, task_detail): return self._run_with_process_lock("task", self._save_task_details, @@ -266,7 +236,6 @@ class Connection(base.Connection): list(flow_detail), task_path) return flow_detail - @lock_utils.locked def update_flow_details(self, flow_detail): return self._run_with_process_lock("flow", self._save_flow_details, @@ -312,12 +281,10 @@ class Connection(base.Connection): list(book), flow_path) return book - @lock_utils.locked def save_logbook(self, book): return self._run_with_process_lock("book", self._save_logbook, book) - @lock_utils.locked def upgrade(self): def _step_create(): @@ -328,7 +295,6 @@ class Connection(base.Connection): misc.ensure_tree(self._backend.lock_path) self._run_with_process_lock("init", _step_create) - @lock_utils.locked def clear_all(self): def _step_clear(): @@ -348,7 +314,6 @@ class Connection(base.Connection): # Acquire all locks by going through this little hierarchy. self._run_with_process_lock("init", _step_book) - @lock_utils.locked def destroy_logbook(self, book_uuid): def _destroy_tasks(task_details): @@ -406,7 +371,6 @@ class Connection(base.Connection): lb.add(self._get_flow_details(fd_uuid)) return lb - @lock_utils.locked def get_logbook(self, book_uuid): return self._run_with_process_lock("book", self._get_logbook, book_uuid) diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index db9ba97f..0c74675d 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -20,27 +20,25 @@ """Implementation of in-memory backend.""" import logging -import threading from taskflow import exceptions as exc from taskflow.openstack.common import timeutils from taskflow.persistence.backends import base from taskflow.persistence import logbook -from taskflow.utils import lock_utils from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) class MemoryBackend(base.Backend): + """A backend that writes logbooks, flow details, and task details to in + memory dictionaries. + """ def __init__(self, conf): super(MemoryBackend, self).__init__(conf) self._log_books = {} self._flow_details = {} self._task_details = {} - self._save_lock = threading.RLock() - self._read_lock = threading.RLock() - self._read_save_order = (self._read_lock, self._save_lock) @property def log_books(self): @@ -54,14 +52,6 @@ class MemoryBackend(base.Backend): def task_details(self): return self._task_details - @property - def read_locks(self): - return (self._read_lock,) - - @property - def save_locks(self): - return self._read_save_order - def get_connection(self): return Connection(self) @@ -71,8 +61,6 @@ class MemoryBackend(base.Backend): class Connection(base.Connection): def __init__(self, backend): - self._read_locks = backend.read_locks - self._save_locks = backend.save_locks self._backend = backend def upgrade(self): @@ -88,7 +76,6 @@ class Connection(base.Connection): def close(self): pass - @lock_utils.locked(lock="_save_locks") def clear_all(self): count = 0 for uuid in list(self.backend.log_books.keys()): @@ -96,7 +83,6 @@ class Connection(base.Connection): count += 1 return count - @lock_utils.locked(lock="_save_locks") def destroy_logbook(self, book_uuid): try: # Do the same cascading delete that the sql layer does. @@ -108,7 +94,6 @@ class Connection(base.Connection): except KeyError: raise exc.NotFound("No logbook found with id: %s" % book_uuid) - @lock_utils.locked(lock="_save_locks") def update_task_details(self, task_detail): try: e_td = self.backend.task_details[task_detail.uuid] @@ -128,7 +113,6 @@ class Connection(base.Connection): self.backend.task_details[task_detail.uuid] = e_td p_utils.task_details_merge(e_td, task_detail, deep_copy=True) - @lock_utils.locked(lock="_save_locks") def update_flow_details(self, flow_detail): try: e_fd = self.backend.flow_details[flow_detail.uuid] @@ -139,7 +123,6 @@ class Connection(base.Connection): self._save_flowdetail_tasks(e_fd, flow_detail) return e_fd - @lock_utils.locked(lock="_save_locks") def save_logbook(self, book): # Get a existing logbook model (or create it if it isn't there). try: @@ -169,18 +152,15 @@ class Connection(base.Connection): self._save_flowdetail_tasks(e_fd, flow_detail) return e_lb - @lock_utils.locked(lock='_read_locks') def get_logbook(self, book_uuid): try: return self.backend.log_books[book_uuid] except KeyError: raise exc.NotFound("No logbook found with id: %s" % book_uuid) - @lock_utils.locked(lock='_read_locks') def _get_logbooks(self): return list(self.backend.log_books.values()) def get_logbooks(self): - # NOTE(harlowja): don't hold the lock while iterating. for lb in self._get_logbooks(): yield lb