Storage protects the lower-level backend against thread-safety issues
The storage changes to use a read/write lock provide locking guarantees that we do not need to duplicate in the underlying storage. This is predicated on the following assumptions:

1. A user upgrades the schema of their backend before giving the logbook/backend to the engine. This means that the backend upgrade() method does not need to be thread safe.
2. Only one engine works on a given logbook at a time.
3. An engine accesses and uses the logbook and its flow/task details by accessing data via the storage accessor interface.
4. Tasks do not currently get direct access to storage or the underlying logbook while running (in the future we could provide access to a storage object, but we would need a contract that ensures a task cannot hijack an engine's usage of storage by deadlocking the engine).

Change-Id: I0efd5333928698802d25e1ff124e781f9a4907d9
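For orientation, the guarantee described above can be pictured as the storage layer wrapping every backend call in a lock, so the backend below never sees concurrent readers and writers. The sketch below is illustrative only; the class name, method names, and the plain RLock standing in for a true multi-reader/single-writer lock are assumptions, not the actual taskflow storage code:

    import threading


    class LockedStorage(object):
        """Sketch: storage serializes backend access (hypothetical)."""

        def __init__(self, connection):
            self._connection = connection
            # A real implementation would use a multi-reader/single-writer
            # lock; an RLock is enough to show the serialization contract.
            self._lock = threading.RLock()

        def get_logbook(self, book_uuid):
            with self._lock:  # "read" side
                return self._connection.get_logbook(book_uuid)

        def save_logbook(self, book):
            with self._lock:  # "write" side
                return self._connection.save_logbook(book)

Because storage holds this lock around every call, the per-method and per-path locks inside the backends below become redundant, which is what this commit removes.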
taskflow/persistence/backends/impl_dir.py

@@ -21,8 +21,6 @@ import errno
import logging
import os
import shutil
import threading
import weakref

import six

@@ -35,38 +33,18 @@ from taskflow.utils import persistence_utils as p_utils

LOG = logging.getLogger(__name__)

# The lock storage is not thread safe to set items in, so this lock is used to
# protect that access.
_LOCK_STORAGE_MUTATE = threading.RLock()

# Currently in use paths -> in-process locks are maintained here.
#
# NOTE(harlowja): Values in this dictionary will be automatically released once
# the objects referencing those objects have been garbage collected.
_LOCK_STORAGE = weakref.WeakValueDictionary()


class DirBackend(base.Backend):
    """A backend that writes logbooks, flow details, and task details to a
    provided directory. This backend does *not* provide transactional semantics
    although it does guarantee that there will be no race conditions when
    writing/reading by using file level locking and in-process locking.

    NOTE(harlowja): this is more of an example/testing backend and likely
    should *not* be used in production, since this backend lacks transactional
    semantics.
    writing/reading by using file level locking.
    """
    def __init__(self, conf):
        super(DirBackend, self).__init__(conf)
        self._path = os.path.abspath(conf['path'])
        self._lock_path = os.path.join(self._path, 'locks')
        self._file_cache = {}
        # Ensure that multiple threads are not accessing the same storage at
        # the same time, the file lock mechanism doesn't protect against this
        # so we must do in-process locking as well.
        with _LOCK_STORAGE_MUTATE:
            self._lock = _LOCK_STORAGE.setdefault(self._path,
                                                  threading.RLock())

    @property
    def lock_path(self):
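The `_LOCK_STORAGE` machinery deleted in the hunk above implemented per-path lock sharing with automatic cleanup via weak references. As a standalone sketch of that pattern (the function name and the wrapper class are illustrative; the wrapper is needed because a bare `_thread.RLock` in Python 3 cannot be weakly referenced, whereas the Python 2-era lock the original code stored could be):

    import threading
    import weakref


    class SharedRLock(object):
        """Plain object wrapper so the lock can live in a weak-value dict."""

        def __init__(self):
            self._lock = threading.RLock()

        def __enter__(self):
            self._lock.acquire()
            return self

        def __exit__(self, *exc_info):
            self._lock.release()


    _MUTATE = threading.RLock()
    _LOCKS = weakref.WeakValueDictionary()


    def shared_lock_for(path):
        # Mutating a WeakValueDictionary is not thread safe, so guard it.
        with _MUTATE:
            lock = _LOCKS.setdefault(path, SharedRLock())
        # The entry lives only while some caller keeps a strong reference
        # to the returned lock; after that it is dropped automatically.
        return lock

Two backends created with the same path would then coordinate through one lock (`shared_lock_for('/tmp/x') is shared_lock_for('/tmp/x')`), which is exactly what the deleted `__init__` code arranged.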
@@ -90,10 +68,6 @@ class Connection(base.Connection):
        self._flow_path = os.path.join(self._backend.base_path, 'flows')
        self._task_path = os.path.join(self._backend.base_path, 'tasks')
        self._book_path = os.path.join(self._backend.base_path, 'books')
        # Share the backends lock so that all threads using the given backend
        # are restricted in writing, since the per-process lock we are using
        # to restrict the multi-process access does not work inside a process.
        self._lock = backend._lock

    def validate(self):
        # Verify key paths exist.
@@ -155,10 +129,7 @@ class Connection(base.Connection):
        pass

    def get_logbooks(self):
        # NOTE(harlowja): don't hold the lock while iterating.
        with self._lock:
            books = list(self._get_logbooks())
        for b in books:
        for b in list(self._get_logbooks()):
            yield b

    @property
@@ -184,7 +155,6 @@ class Connection(base.Connection):
        self._write_to(td_path, jsonutils.dumps(td_data))
        return task_detail

    @lock_utils.locked
    def update_task_details(self, task_detail):
        return self._run_with_process_lock("task",
                                           self._save_task_details,
@@ -266,7 +236,6 @@ class Connection(base.Connection):
                                            list(flow_detail), task_path)
        return flow_detail

    @lock_utils.locked
    def update_flow_details(self, flow_detail):
        return self._run_with_process_lock("flow",
                                           self._save_flow_details,
@@ -312,12 +281,10 @@ class Connection(base.Connection):
                                           list(book), flow_path)
        return book

    @lock_utils.locked
    def save_logbook(self, book):
        return self._run_with_process_lock("book",
                                           self._save_logbook, book)

    @lock_utils.locked
    def upgrade(self):

        def _step_create():
@@ -328,7 +295,6 @@ class Connection(base.Connection):
            misc.ensure_tree(self._backend.lock_path)
        self._run_with_process_lock("init", _step_create)

    @lock_utils.locked
    def clear_all(self):

        def _step_clear():
@@ -348,7 +314,6 @@ class Connection(base.Connection):
        # Acquire all locks by going through this little hierarchy.
        self._run_with_process_lock("init", _step_book)

    @lock_utils.locked
    def destroy_logbook(self, book_uuid):

        def _destroy_tasks(task_details):
@@ -406,7 +371,6 @@ class Connection(base.Connection):
            lb.add(self._get_flow_details(fd_uuid))
        return lb

    @lock_utils.locked
    def get_logbook(self, book_uuid):
        return self._run_with_process_lock("book",
                                           self._get_logbook, book_uuid)
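Note that only the in-process `@lock_utils.locked` guards are deleted in the dir-backend hunks above; every method still funnels through `self._run_with_process_lock`, so cross-process exclusion via lock files under `lock_path` is unchanged. A rough, POSIX-only sketch of what such a helper does (the name and the flock-based locking are assumptions, not the taskflow implementation):

    import fcntl
    import os


    def run_with_process_lock(lock_dir, lock_name, functor, *args, **kwargs):
        # One lock file per logical resource ("book", "flow", "task", ...),
        # so unrelated resources do not serialize against each other.
        lock_file = os.path.join(lock_dir, "%s.lock" % lock_name)
        with open(lock_file, "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                return functor(*args, **kwargs)
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)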
taskflow/persistence/backends/impl_memory.py

@@ -20,27 +20,25 @@
"""Implementation of in-memory backend."""

import logging
import threading

from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base
from taskflow.persistence import logbook
from taskflow.utils import lock_utils
from taskflow.utils import persistence_utils as p_utils

LOG = logging.getLogger(__name__)


class MemoryBackend(base.Backend):
    """A backend that writes logbooks, flow details, and task details to in
    memory dictionaries.
    """
    def __init__(self, conf):
        super(MemoryBackend, self).__init__(conf)
        self._log_books = {}
        self._flow_details = {}
        self._task_details = {}
        self._save_lock = threading.RLock()
        self._read_lock = threading.RLock()
        self._read_save_order = (self._read_lock, self._save_lock)

    @property
    def log_books(self):
@@ -54,14 +52,6 @@ class MemoryBackend(base.Backend):
    def task_details(self):
        return self._task_details

    @property
    def read_locks(self):
        return (self._read_lock,)

    @property
    def save_locks(self):
        return self._read_save_order

    def get_connection(self):
        return Connection(self)
@@ -71,8 +61,6 @@ class MemoryBackend(base.Backend):

class Connection(base.Connection):
    def __init__(self, backend):
        self._read_locks = backend.read_locks
        self._save_locks = backend.save_locks
        self._backend = backend

    def upgrade(self):
@@ -88,7 +76,6 @@ class Connection(base.Connection):
    def close(self):
        pass

    @lock_utils.locked(lock="_save_locks")
    def clear_all(self):
        count = 0
        for uuid in list(self.backend.log_books.keys()):
@@ -96,7 +83,6 @@ class Connection(base.Connection):
            count += 1
        return count

    @lock_utils.locked(lock="_save_locks")
    def destroy_logbook(self, book_uuid):
        try:
            # Do the same cascading delete that the sql layer does.
@@ -108,7 +94,6 @@ class Connection(base.Connection):
        except KeyError:
            raise exc.NotFound("No logbook found with id: %s" % book_uuid)

    @lock_utils.locked(lock="_save_locks")
    def update_task_details(self, task_detail):
        try:
            e_td = self.backend.task_details[task_detail.uuid]
@@ -128,7 +113,6 @@ class Connection(base.Connection):
            self.backend.task_details[task_detail.uuid] = e_td
        p_utils.task_details_merge(e_td, task_detail, deep_copy=True)

    @lock_utils.locked(lock="_save_locks")
    def update_flow_details(self, flow_detail):
        try:
            e_fd = self.backend.flow_details[flow_detail.uuid]
@@ -139,7 +123,6 @@ class Connection(base.Connection):
            self._save_flowdetail_tasks(e_fd, flow_detail)
        return e_fd

    @lock_utils.locked(lock="_save_locks")
    def save_logbook(self, book):
        # Get a existing logbook model (or create it if it isn't there).
        try:
@@ -169,18 +152,15 @@ class Connection(base.Connection):
            self._save_flowdetail_tasks(e_fd, flow_detail)
        return e_lb

    @lock_utils.locked(lock='_read_locks')
    def get_logbook(self, book_uuid):
        try:
            return self.backend.log_books[book_uuid]
        except KeyError:
            raise exc.NotFound("No logbook found with id: %s" % book_uuid)

    @lock_utils.locked(lock='_read_locks')
    def _get_logbooks(self):
        return list(self.backend.log_books.values())

    def get_logbooks(self):
        # NOTE(harlowja): don't hold the lock while iterating.
        for lb in self._get_logbooks():
            yield lb
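Most hunks in this commit delete a `@lock_utils.locked` line. Judging from its usage above (bare, and as `@lock_utils.locked(lock="_save_locks")` naming an attribute that may hold one lock or an ordered tuple of locks like `_read_save_order`), the decorator behaves roughly like the sketch below; this is inferred behavior, not the actual taskflow.utils.lock_utils source:

    import functools


    def locked(*args, **kwargs):
        """Acquire the lock(s) stored on ``self`` around a method call."""
        attr_name = kwargs.get("lock", "_lock")

        def decorator(func):
            @functools.wraps(func)
            def wrapper(self, *f_args, **f_kwargs):
                attr = getattr(self, attr_name)
                # Accept a single lock or an ordered sequence of locks
                # that must always be taken in the same order (this fixed
                # ordering is what avoids deadlock between readers/writers).
                locks = attr if isinstance(attr, (list, tuple)) else [attr]
                for lock in locks:
                    lock.acquire()
                try:
                    return func(self, *f_args, **f_kwargs)
                finally:
                    for lock in reversed(locks):
                        lock.release()
            return wrapper

        # Support both @locked and @locked(lock="_save_locks") usage.
        if len(args) == 1 and callable(args[0]):
            return decorator(args[0])
        return decorator

With storage now holding its own read/write lock around every backend call, these per-method guards became redundant, which is what lets the diff drop them wholesale.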