Merge "Storage protects lower level backend against thread safety"

This commit is contained in:
Jenkins
2014-02-13 11:49:41 +00:00
committed by Gerrit Code Review
2 changed files with 5 additions and 61 deletions

View File

@@ -21,8 +21,6 @@ import errno
import logging import logging
import os import os
import shutil import shutil
import threading
import weakref
import six import six
@@ -35,38 +33,18 @@ from taskflow.utils import persistence_utils as p_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# The lock storage is not thread safe to set items in, so this lock is used to
# protect that access.
_LOCK_STORAGE_MUTATE = threading.RLock()
# Currently in use paths -> in-process locks are maintained here.
#
# NOTE(harlowja): Values in this dictionary will be automatically released once
# the objects referencing those objects have been garbage collected.
_LOCK_STORAGE = weakref.WeakValueDictionary()
class DirBackend(base.Backend): class DirBackend(base.Backend):
"""A backend that writes logbooks, flow details, and task details to a """A backend that writes logbooks, flow details, and task details to a
provided directory. This backend does *not* provide transactional semantics provided directory. This backend does *not* provide transactional semantics
although it does guarantee that there will be no race conditions when although it does guarantee that there will be no race conditions when
writing/reading by using file level locking and in-process locking. writing/reading by using file level locking.
NOTE(harlowja): this is more of an example/testing backend and likely
should *not* be used in production, since this backend lacks transactional
semantics.
""" """
def __init__(self, conf): def __init__(self, conf):
super(DirBackend, self).__init__(conf) super(DirBackend, self).__init__(conf)
self._path = os.path.abspath(conf['path']) self._path = os.path.abspath(conf['path'])
self._lock_path = os.path.join(self._path, 'locks') self._lock_path = os.path.join(self._path, 'locks')
self._file_cache = {} self._file_cache = {}
# Ensure that multiple threads are not accessing the same storage at
# the same time, the file lock mechanism doesn't protect against this
# so we must do in-process locking as well.
with _LOCK_STORAGE_MUTATE:
self._lock = _LOCK_STORAGE.setdefault(self._path,
threading.RLock())
@property @property
def lock_path(self): def lock_path(self):
@@ -90,10 +68,6 @@ class Connection(base.Connection):
self._flow_path = os.path.join(self._backend.base_path, 'flows') self._flow_path = os.path.join(self._backend.base_path, 'flows')
self._task_path = os.path.join(self._backend.base_path, 'tasks') self._task_path = os.path.join(self._backend.base_path, 'tasks')
self._book_path = os.path.join(self._backend.base_path, 'books') self._book_path = os.path.join(self._backend.base_path, 'books')
# Share the backends lock so that all threads using the given backend
# are restricted in writing, since the per-process lock we are using
# to restrict the multi-process access does not work inside a process.
self._lock = backend._lock
def validate(self): def validate(self):
# Verify key paths exist. # Verify key paths exist.
@@ -155,10 +129,7 @@ class Connection(base.Connection):
pass pass
def get_logbooks(self): def get_logbooks(self):
# NOTE(harlowja): don't hold the lock while iterating. for b in list(self._get_logbooks()):
with self._lock:
books = list(self._get_logbooks())
for b in books:
yield b yield b
@property @property
@@ -184,7 +155,6 @@ class Connection(base.Connection):
self._write_to(td_path, jsonutils.dumps(td_data)) self._write_to(td_path, jsonutils.dumps(td_data))
return task_detail return task_detail
@lock_utils.locked
def update_task_details(self, task_detail): def update_task_details(self, task_detail):
return self._run_with_process_lock("task", return self._run_with_process_lock("task",
self._save_task_details, self._save_task_details,
@@ -266,7 +236,6 @@ class Connection(base.Connection):
list(flow_detail), task_path) list(flow_detail), task_path)
return flow_detail return flow_detail
@lock_utils.locked
def update_flow_details(self, flow_detail): def update_flow_details(self, flow_detail):
return self._run_with_process_lock("flow", return self._run_with_process_lock("flow",
self._save_flow_details, self._save_flow_details,
@@ -312,12 +281,10 @@ class Connection(base.Connection):
list(book), flow_path) list(book), flow_path)
return book return book
@lock_utils.locked
def save_logbook(self, book): def save_logbook(self, book):
return self._run_with_process_lock("book", return self._run_with_process_lock("book",
self._save_logbook, book) self._save_logbook, book)
@lock_utils.locked
def upgrade(self): def upgrade(self):
def _step_create(): def _step_create():
@@ -328,7 +295,6 @@ class Connection(base.Connection):
misc.ensure_tree(self._backend.lock_path) misc.ensure_tree(self._backend.lock_path)
self._run_with_process_lock("init", _step_create) self._run_with_process_lock("init", _step_create)
@lock_utils.locked
def clear_all(self): def clear_all(self):
def _step_clear(): def _step_clear():
@@ -348,7 +314,6 @@ class Connection(base.Connection):
# Acquire all locks by going through this little hierarchy. # Acquire all locks by going through this little hierarchy.
self._run_with_process_lock("init", _step_book) self._run_with_process_lock("init", _step_book)
@lock_utils.locked
def destroy_logbook(self, book_uuid): def destroy_logbook(self, book_uuid):
def _destroy_tasks(task_details): def _destroy_tasks(task_details):
@@ -406,7 +371,6 @@ class Connection(base.Connection):
lb.add(self._get_flow_details(fd_uuid)) lb.add(self._get_flow_details(fd_uuid))
return lb return lb
@lock_utils.locked
def get_logbook(self, book_uuid): def get_logbook(self, book_uuid):
return self._run_with_process_lock("book", return self._run_with_process_lock("book",
self._get_logbook, book_uuid) self._get_logbook, book_uuid)

View File

@@ -20,27 +20,25 @@
"""Implementation of in-memory backend.""" """Implementation of in-memory backend."""
import logging import logging
import threading
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base from taskflow.persistence.backends import base
from taskflow.persistence import logbook from taskflow.persistence import logbook
from taskflow.utils import lock_utils
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class MemoryBackend(base.Backend): class MemoryBackend(base.Backend):
"""A backend that writes logbooks, flow details, and task details to in
memory dictionaries.
"""
def __init__(self, conf): def __init__(self, conf):
super(MemoryBackend, self).__init__(conf) super(MemoryBackend, self).__init__(conf)
self._log_books = {} self._log_books = {}
self._flow_details = {} self._flow_details = {}
self._task_details = {} self._task_details = {}
self._save_lock = threading.RLock()
self._read_lock = threading.RLock()
self._read_save_order = (self._read_lock, self._save_lock)
@property @property
def log_books(self): def log_books(self):
@@ -54,14 +52,6 @@ class MemoryBackend(base.Backend):
def task_details(self): def task_details(self):
return self._task_details return self._task_details
@property
def read_locks(self):
return (self._read_lock,)
@property
def save_locks(self):
return self._read_save_order
def get_connection(self): def get_connection(self):
return Connection(self) return Connection(self)
@@ -71,8 +61,6 @@ class MemoryBackend(base.Backend):
class Connection(base.Connection): class Connection(base.Connection):
def __init__(self, backend): def __init__(self, backend):
self._read_locks = backend.read_locks
self._save_locks = backend.save_locks
self._backend = backend self._backend = backend
def upgrade(self): def upgrade(self):
@@ -88,7 +76,6 @@ class Connection(base.Connection):
def close(self): def close(self):
pass pass
@lock_utils.locked(lock="_save_locks")
def clear_all(self): def clear_all(self):
count = 0 count = 0
for uuid in list(self.backend.log_books.keys()): for uuid in list(self.backend.log_books.keys()):
@@ -96,7 +83,6 @@ class Connection(base.Connection):
count += 1 count += 1
return count return count
@lock_utils.locked(lock="_save_locks")
def destroy_logbook(self, book_uuid): def destroy_logbook(self, book_uuid):
try: try:
# Do the same cascading delete that the sql layer does. # Do the same cascading delete that the sql layer does.
@@ -108,7 +94,6 @@ class Connection(base.Connection):
except KeyError: except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid) raise exc.NotFound("No logbook found with id: %s" % book_uuid)
@lock_utils.locked(lock="_save_locks")
def update_task_details(self, task_detail): def update_task_details(self, task_detail):
try: try:
e_td = self.backend.task_details[task_detail.uuid] e_td = self.backend.task_details[task_detail.uuid]
@@ -128,7 +113,6 @@ class Connection(base.Connection):
self.backend.task_details[task_detail.uuid] = e_td self.backend.task_details[task_detail.uuid] = e_td
p_utils.task_details_merge(e_td, task_detail, deep_copy=True) p_utils.task_details_merge(e_td, task_detail, deep_copy=True)
@lock_utils.locked(lock="_save_locks")
def update_flow_details(self, flow_detail): def update_flow_details(self, flow_detail):
try: try:
e_fd = self.backend.flow_details[flow_detail.uuid] e_fd = self.backend.flow_details[flow_detail.uuid]
@@ -139,7 +123,6 @@ class Connection(base.Connection):
self._save_flowdetail_tasks(e_fd, flow_detail) self._save_flowdetail_tasks(e_fd, flow_detail)
return e_fd return e_fd
@lock_utils.locked(lock="_save_locks")
def save_logbook(self, book): def save_logbook(self, book):
# Get a existing logbook model (or create it if it isn't there). # Get a existing logbook model (or create it if it isn't there).
try: try:
@@ -169,18 +152,15 @@ class Connection(base.Connection):
self._save_flowdetail_tasks(e_fd, flow_detail) self._save_flowdetail_tasks(e_fd, flow_detail)
return e_lb return e_lb
@lock_utils.locked(lock='_read_locks')
def get_logbook(self, book_uuid): def get_logbook(self, book_uuid):
try: try:
return self.backend.log_books[book_uuid] return self.backend.log_books[book_uuid]
except KeyError: except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid) raise exc.NotFound("No logbook found with id: %s" % book_uuid)
@lock_utils.locked(lock='_read_locks')
def _get_logbooks(self): def _get_logbooks(self):
return list(self.backend.log_books.values()) return list(self.backend.log_books.values())
def get_logbooks(self): def get_logbooks(self):
# NOTE(harlowja): don't hold the lock while iterating.
for lb in self._get_logbooks(): for lb in self._get_logbooks():
yield lb yield lb