From 84b387f8bb56d7c03dbe56a84ddbad94bf46262e Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 25 Sep 2014 18:27:05 -0700 Subject: [PATCH] Rework the in-memory backend This avoids storing direct copies of incoming objects and makes sure that we always merge incoming objects (if a saved object already exists) or create a copy of the incoming object if it does not exist when storing. On retrieval we also always return copies instead of returning the data that is stored internally to avoid the problems that can be hard to detect when users (engine or other) modify those source objects. Fixes bug 1365830 Also fixes a retry test case issue that was discovered due to this more easily useable/understandable memory backend changes... Change-Id: I2afdda7beb71e35f7e12d9fd7ccf90b6c5447274 --- taskflow/persistence/backends/impl_memory.py | 223 +++++++++++++------ taskflow/persistence/logbook.py | 57 ++++- taskflow/tests/unit/test_retries.py | 2 +- taskflow/types/failure.py | 2 +- 4 files changed, 211 insertions(+), 73 deletions(-) diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 6c58718aa..d266c17f4 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -15,16 +15,97 @@ # License for the specific language governing permissions and limitations # under the License. +import functools + import six from taskflow import exceptions as exc from taskflow import logging from taskflow.persistence.backends import base from taskflow.persistence import logbook +from taskflow.utils import lock_utils LOG = logging.getLogger(__name__) +class _Memory(object): + """Where the data is really stored.""" + + def __init__(self): + self.log_books = {} + self.flow_details = {} + self.atom_details = {} + + def clear_all(self): + self.log_books.clear() + self.flow_details.clear() + self.atom_details.clear() + + +class _MemoryHelper(object): + """Helper functionality for the memory backends & connections.""" + + def __init__(self, memory): + self._memory = memory + + @staticmethod + def _fetch_clone_args(incoming): + if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)): + # We keep our own copy of the added contents of the following + # types so we don't need the clone to retain them directly... + return { + 'retain_contents': False, + } + return {} + + def construct(self, uuid, container): + """Reconstructs a object from the given uuid and storage container.""" + source = container[uuid] + clone_kwargs = self._fetch_clone_args(source) + clone = source['object'].copy(**clone_kwargs) + rebuilder = source.get('rebuilder') + if rebuilder: + for component in map(rebuilder, source['components']): + clone.add(component) + return clone + + def merge(self, incoming, saved_info=None): + """Merges the incoming object into the local memories copy.""" + if saved_info is None: + if isinstance(incoming, logbook.LogBook): + saved_info = self._memory.log_books.setdefault( + incoming.uuid, {}) + elif isinstance(incoming, logbook.FlowDetail): + saved_info = self._memory.flow_details.setdefault( + incoming.uuid, {}) + elif isinstance(incoming, logbook.AtomDetail): + saved_info = self._memory.atom_details.setdefault( + incoming.uuid, {}) + else: + raise TypeError("Unknown how to merge type '%s'" + % type(incoming)) + try: + saved_info['object'].merge(incoming) + except KeyError: + clone_kwargs = self._fetch_clone_args(incoming) + saved_info['object'] = incoming.copy(**clone_kwargs) + if isinstance(incoming, logbook.LogBook): + flow_details = saved_info.setdefault('components', set()) + if 'rebuilder' not in saved_info: + saved_info['rebuilder'] = functools.partial( + self.construct, container=self._memory.flow_details) + for flow_detail in incoming: + flow_details.add(self.merge(flow_detail)) + elif isinstance(incoming, logbook.FlowDetail): + atom_details = saved_info.setdefault('components', set()) + if 'rebuilder' not in saved_info: + saved_info['rebuilder'] = functools.partial( + self.construct, container=self._memory.atom_details) + for atom_detail in incoming: + atom_details.add(self.merge(atom_detail)) + return incoming.uuid + + class MemoryBackend(base.Backend): """A in-memory (non-persistent) backend. @@ -33,21 +114,28 @@ class MemoryBackend(base.Backend): """ def __init__(self, conf=None): super(MemoryBackend, self).__init__(conf) - self._log_books = {} - self._flow_details = {} - self._atom_details = {} + self._memory = _Memory() + self._helper = _MemoryHelper(self._memory) + self._lock = lock_utils.ReaderWriterLock() + + def _construct_from(self, container): + return dict((uuid, self._helper.construct(uuid, container)) + for uuid in six.iterkeys(container)) @property def log_books(self): - return self._log_books + with self._lock.read_lock(): + return self._construct_from(self._memory.log_books) @property def flow_details(self): - return self._flow_details + with self._lock.read_lock(): + return self._construct_from(self._memory.flow_details) @property def atom_details(self): - return self._atom_details + with self._lock.read_lock(): + return self._construct_from(self._memory.atom_details) def get_connection(self): return Connection(self) @@ -57,8 +145,13 @@ class MemoryBackend(base.Backend): class Connection(base.Connection): + """A connection to an in-memory backend.""" + def __init__(self, backend): self._backend = backend + self._helper = backend._helper + self._memory = backend._memory + self._lock = backend._lock def upgrade(self): pass @@ -74,78 +167,70 @@ class Connection(base.Connection): pass def clear_all(self): - count = 0 - for book_uuid in list(six.iterkeys(self.backend.log_books)): - self.destroy_logbook(book_uuid) - count += 1 - return count + with self._lock.write_lock(): + self._memory.clear_all() def destroy_logbook(self, book_uuid): - try: - # Do the same cascading delete that the sql layer does. - lb = self.backend.log_books.pop(book_uuid) - for fd in lb: - self.backend.flow_details.pop(fd.uuid, None) - for ad in fd: - self.backend.atom_details.pop(ad.uuid, None) - except KeyError: - raise exc.NotFound("No logbook found with id: %s" % book_uuid) + with self._lock.write_lock(): + try: + # Do the same cascading delete that the sql layer does. + book_info = self._memory.log_books.pop(book_uuid) + except KeyError: + raise exc.NotFound("No logbook found with uuid '%s'" + % book_uuid) + else: + while book_info['components']: + flow_uuid = book_info['components'].pop() + flow_info = self._memory.flow_details.pop(flow_uuid) + while flow_info['components']: + atom_uuid = flow_info['components'].pop() + self._memory.atom_details.pop(atom_uuid) def update_atom_details(self, atom_detail): - try: - e_ad = self.backend.atom_details[atom_detail.uuid] - except KeyError: - raise exc.NotFound("No atom details found with id: %s" - % atom_detail.uuid) - return e_ad.merge(atom_detail, deep_copy=True) - - def _save_flowdetail_atoms(self, e_fd, flow_detail): - for atom_detail in flow_detail: - e_ad = e_fd.find(atom_detail.uuid) - if e_ad is None: - e_fd.add(atom_detail) - self.backend.atom_details[atom_detail.uuid] = atom_detail - else: - e_ad.merge(atom_detail, deep_copy=True) + with self._lock.write_lock(): + try: + atom_info = self._memory.atom_details[atom_detail.uuid] + return self._helper.construct( + self._helper.merge(atom_detail, saved_info=atom_info), + self._memory.atom_details) + except KeyError: + raise exc.NotFound("No atom details found with uuid '%s'" + % atom_detail.uuid) def update_flow_details(self, flow_detail): - try: - e_fd = self.backend.flow_details[flow_detail.uuid] - except KeyError: - raise exc.NotFound("No flow details found with id: %s" - % flow_detail.uuid) - e_fd.merge(flow_detail, deep_copy=True) - self._save_flowdetail_atoms(e_fd, flow_detail) - return e_fd + with self._lock.write_lock(): + try: + flow_info = self._memory.flow_details[flow_detail.uuid] + return self._helper.construct( + self._helper.merge(flow_detail, saved_info=flow_info), + self._memory.flow_details) + except KeyError: + raise exc.NotFound("No flow details found with uuid '%s'" + % flow_detail.uuid) def save_logbook(self, book): - # Get a existing logbook model (or create it if it isn't there). - try: - e_lb = self.backend.log_books[book.uuid] - except KeyError: - e_lb = logbook.LogBook(book.name, uuid=book.uuid) - self.backend.log_books[e_lb.uuid] = e_lb - - e_lb.merge(book, deep_copy=True) - # Add anything in to the new logbook that isn't already in the existing - # logbook. - for flow_detail in book: - try: - e_fd = self.backend.flow_details[flow_detail.uuid] - except KeyError: - e_fd = logbook.FlowDetail(flow_detail.name, flow_detail.uuid) - e_lb.add(e_fd) - self.backend.flow_details[e_fd.uuid] = e_fd - e_fd.merge(flow_detail, deep_copy=True) - self._save_flowdetail_atoms(e_fd, flow_detail) - return e_lb + with self._lock.write_lock(): + return self._helper.construct(self._helper.merge(book), + self._memory.log_books) def get_logbook(self, book_uuid): - try: - return self.backend.log_books[book_uuid] - except KeyError: - raise exc.NotFound("No logbook found with id: %s" % book_uuid) + with self._lock.read_lock(): + try: + return self._helper.construct(book_uuid, + self._memory.log_books) + except KeyError: + raise exc.NotFound("No logbook found with uuid '%s'" + % book_uuid) def get_logbooks(self): - for lb in list(six.itervalues(self.backend.log_books)): - yield lb + # Don't hold locks while iterating... + with self._lock.read_lock(): + book_uuids = set(six.iterkeys(self._memory.log_books)) + for book_uuid in book_uuids: + try: + with self._lock.read_lock(): + book = self._helper.construct(book_uuid, + self._memory.log_books) + yield book + except KeyError: + pass diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index ea6de4d00..f84f37c30 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -137,7 +137,7 @@ class LogBook(object): @classmethod def from_dict(cls, data, unmarshal_time=False): - """Translates the given data into an instance of this class.""" + """Translates the given dictionary into an instance of this class.""" if not unmarshal_time: unmarshal_fn = lambda x: x else: @@ -163,6 +163,17 @@ class LogBook(object): def __len__(self): return len(self._flowdetails_by_id) + def copy(self, retain_contents=True): + """Copies/clones this log book.""" + clone = copy.copy(self) + if not retain_contents: + clone._flowdetails_by_id = {} + else: + clone._flowdetails_by_id = self._flowdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + class FlowDetail(object): """A container of atom details, a name and associated metadata. @@ -186,7 +197,7 @@ class FlowDetail(object): """Updates the objects state to be the same as the given one.""" if fd is self: return self - self._atomdetails_by_id = dict(fd._atomdetails_by_id) + self._atomdetails_by_id = fd._atomdetails_by_id self.state = fd.state self.meta = fd.meta return self @@ -206,6 +217,17 @@ class FlowDetail(object): self.state = fd.state return self + def copy(self, retain_contents=True): + """Copies/clones this flow detail.""" + clone = copy.copy(self) + if not retain_contents: + clone._atomdetails_by_id = {} + else: + clone._atomdetails_by_id = self._atomdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + def to_dict(self): """Translates the internal state of this object to a dictionary. @@ -380,6 +402,7 @@ class AtomDetail(object): class TaskDetail(AtomDetail): """This class represents a task detail for flow task object.""" + def __init__(self, name, uuid): super(TaskDetail, self).__init__(name, uuid) @@ -410,6 +433,7 @@ class TaskDetail(AtomDetail): return self._to_dict_shared() def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state.""" if not isinstance(other, TaskDetail): raise exc.NotImplementedError("Can only merge with other" " task details") @@ -421,6 +445,16 @@ class TaskDetail(AtomDetail): self.results = copy_fn(other.results) return self + def copy(self): + """Copies/clones this task detail.""" + clone = copy.copy(self) + clone.results = copy.copy(self.results) + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + class RetryDetail(AtomDetail): """This class represents a retry detail for retry controller object.""" @@ -434,6 +468,24 @@ class RetryDetail(AtomDetail): self.state = state self.intention = states.EXECUTE + def copy(self): + """Copies/clones this retry detail.""" + clone = copy.copy(self) + results = [] + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + for (data, failures) in self.results: + copied_failures = {} + for (key, failure) in six.iteritems(failures): + copied_failures[key] = failure + results.append((data, copied_failures)) + clone.results = results + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + @property def last_results(self): try: @@ -496,6 +548,7 @@ class RetryDetail(AtomDetail): return base def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state.""" if not isinstance(other, RetryDetail): raise exc.NotImplementedError("Can only merge with other" " retry details") diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 544004351..095320103 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -559,7 +559,7 @@ class RetryTest(utils.EngineTestBase): # we execute retry engine.storage.save('flow-1_retry', 1) # task fails - fail = failure.Failure.from_exception(RuntimeError('foo')), + fail = failure.Failure.from_exception(RuntimeError('foo')) engine.storage.save('task1', fail, state=st.FAILURE) if when == 'task fails': return engine diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index b9d7a399f..87ff1bf6f 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -74,7 +74,7 @@ def _are_equal_exc_info_tuples(ei1, ei2): class Failure(object): - """Object that represents failure. + """An immutable object that represents failure. Failure objects encapsulate exception information so that they can be re-used later to re-raise, inspect, examine, log, print, serialize,