diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 6c58718aa..d266c17f4 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -15,16 +15,97 @@ # License for the specific language governing permissions and limitations # under the License. +import functools + import six from taskflow import exceptions as exc from taskflow import logging from taskflow.persistence.backends import base from taskflow.persistence import logbook +from taskflow.utils import lock_utils LOG = logging.getLogger(__name__) +class _Memory(object): + """Where the data is really stored.""" + + def __init__(self): + self.log_books = {} + self.flow_details = {} + self.atom_details = {} + + def clear_all(self): + self.log_books.clear() + self.flow_details.clear() + self.atom_details.clear() + + +class _MemoryHelper(object): + """Helper functionality for the memory backends & connections.""" + + def __init__(self, memory): + self._memory = memory + + @staticmethod + def _fetch_clone_args(incoming): + if isinstance(incoming, (logbook.LogBook, logbook.FlowDetail)): + # We keep our own copy of the added contents of the following + # types so we don't need the clone to retain them directly... + return { + 'retain_contents': False, + } + return {} + + def construct(self, uuid, container): + """Reconstructs a object from the given uuid and storage container.""" + source = container[uuid] + clone_kwargs = self._fetch_clone_args(source) + clone = source['object'].copy(**clone_kwargs) + rebuilder = source.get('rebuilder') + if rebuilder: + for component in map(rebuilder, source['components']): + clone.add(component) + return clone + + def merge(self, incoming, saved_info=None): + """Merges the incoming object into the local memories copy.""" + if saved_info is None: + if isinstance(incoming, logbook.LogBook): + saved_info = self._memory.log_books.setdefault( + incoming.uuid, {}) + elif isinstance(incoming, logbook.FlowDetail): + saved_info = self._memory.flow_details.setdefault( + incoming.uuid, {}) + elif isinstance(incoming, logbook.AtomDetail): + saved_info = self._memory.atom_details.setdefault( + incoming.uuid, {}) + else: + raise TypeError("Unknown how to merge type '%s'" + % type(incoming)) + try: + saved_info['object'].merge(incoming) + except KeyError: + clone_kwargs = self._fetch_clone_args(incoming) + saved_info['object'] = incoming.copy(**clone_kwargs) + if isinstance(incoming, logbook.LogBook): + flow_details = saved_info.setdefault('components', set()) + if 'rebuilder' not in saved_info: + saved_info['rebuilder'] = functools.partial( + self.construct, container=self._memory.flow_details) + for flow_detail in incoming: + flow_details.add(self.merge(flow_detail)) + elif isinstance(incoming, logbook.FlowDetail): + atom_details = saved_info.setdefault('components', set()) + if 'rebuilder' not in saved_info: + saved_info['rebuilder'] = functools.partial( + self.construct, container=self._memory.atom_details) + for atom_detail in incoming: + atom_details.add(self.merge(atom_detail)) + return incoming.uuid + + class MemoryBackend(base.Backend): """A in-memory (non-persistent) backend. @@ -33,21 +114,28 @@ class MemoryBackend(base.Backend): """ def __init__(self, conf=None): super(MemoryBackend, self).__init__(conf) - self._log_books = {} - self._flow_details = {} - self._atom_details = {} + self._memory = _Memory() + self._helper = _MemoryHelper(self._memory) + self._lock = lock_utils.ReaderWriterLock() + + def _construct_from(self, container): + return dict((uuid, self._helper.construct(uuid, container)) + for uuid in six.iterkeys(container)) @property def log_books(self): - return self._log_books + with self._lock.read_lock(): + return self._construct_from(self._memory.log_books) @property def flow_details(self): - return self._flow_details + with self._lock.read_lock(): + return self._construct_from(self._memory.flow_details) @property def atom_details(self): - return self._atom_details + with self._lock.read_lock(): + return self._construct_from(self._memory.atom_details) def get_connection(self): return Connection(self) @@ -57,8 +145,13 @@ class MemoryBackend(base.Backend): class Connection(base.Connection): + """A connection to an in-memory backend.""" + def __init__(self, backend): self._backend = backend + self._helper = backend._helper + self._memory = backend._memory + self._lock = backend._lock def upgrade(self): pass @@ -74,78 +167,70 @@ class Connection(base.Connection): pass def clear_all(self): - count = 0 - for book_uuid in list(six.iterkeys(self.backend.log_books)): - self.destroy_logbook(book_uuid) - count += 1 - return count + with self._lock.write_lock(): + self._memory.clear_all() def destroy_logbook(self, book_uuid): - try: - # Do the same cascading delete that the sql layer does. - lb = self.backend.log_books.pop(book_uuid) - for fd in lb: - self.backend.flow_details.pop(fd.uuid, None) - for ad in fd: - self.backend.atom_details.pop(ad.uuid, None) - except KeyError: - raise exc.NotFound("No logbook found with id: %s" % book_uuid) + with self._lock.write_lock(): + try: + # Do the same cascading delete that the sql layer does. + book_info = self._memory.log_books.pop(book_uuid) + except KeyError: + raise exc.NotFound("No logbook found with uuid '%s'" + % book_uuid) + else: + while book_info['components']: + flow_uuid = book_info['components'].pop() + flow_info = self._memory.flow_details.pop(flow_uuid) + while flow_info['components']: + atom_uuid = flow_info['components'].pop() + self._memory.atom_details.pop(atom_uuid) def update_atom_details(self, atom_detail): - try: - e_ad = self.backend.atom_details[atom_detail.uuid] - except KeyError: - raise exc.NotFound("No atom details found with id: %s" - % atom_detail.uuid) - return e_ad.merge(atom_detail, deep_copy=True) - - def _save_flowdetail_atoms(self, e_fd, flow_detail): - for atom_detail in flow_detail: - e_ad = e_fd.find(atom_detail.uuid) - if e_ad is None: - e_fd.add(atom_detail) - self.backend.atom_details[atom_detail.uuid] = atom_detail - else: - e_ad.merge(atom_detail, deep_copy=True) + with self._lock.write_lock(): + try: + atom_info = self._memory.atom_details[atom_detail.uuid] + return self._helper.construct( + self._helper.merge(atom_detail, saved_info=atom_info), + self._memory.atom_details) + except KeyError: + raise exc.NotFound("No atom details found with uuid '%s'" + % atom_detail.uuid) def update_flow_details(self, flow_detail): - try: - e_fd = self.backend.flow_details[flow_detail.uuid] - except KeyError: - raise exc.NotFound("No flow details found with id: %s" - % flow_detail.uuid) - e_fd.merge(flow_detail, deep_copy=True) - self._save_flowdetail_atoms(e_fd, flow_detail) - return e_fd + with self._lock.write_lock(): + try: + flow_info = self._memory.flow_details[flow_detail.uuid] + return self._helper.construct( + self._helper.merge(flow_detail, saved_info=flow_info), + self._memory.flow_details) + except KeyError: + raise exc.NotFound("No flow details found with uuid '%s'" + % flow_detail.uuid) def save_logbook(self, book): - # Get a existing logbook model (or create it if it isn't there). - try: - e_lb = self.backend.log_books[book.uuid] - except KeyError: - e_lb = logbook.LogBook(book.name, uuid=book.uuid) - self.backend.log_books[e_lb.uuid] = e_lb - - e_lb.merge(book, deep_copy=True) - # Add anything in to the new logbook that isn't already in the existing - # logbook. - for flow_detail in book: - try: - e_fd = self.backend.flow_details[flow_detail.uuid] - except KeyError: - e_fd = logbook.FlowDetail(flow_detail.name, flow_detail.uuid) - e_lb.add(e_fd) - self.backend.flow_details[e_fd.uuid] = e_fd - e_fd.merge(flow_detail, deep_copy=True) - self._save_flowdetail_atoms(e_fd, flow_detail) - return e_lb + with self._lock.write_lock(): + return self._helper.construct(self._helper.merge(book), + self._memory.log_books) def get_logbook(self, book_uuid): - try: - return self.backend.log_books[book_uuid] - except KeyError: - raise exc.NotFound("No logbook found with id: %s" % book_uuid) + with self._lock.read_lock(): + try: + return self._helper.construct(book_uuid, + self._memory.log_books) + except KeyError: + raise exc.NotFound("No logbook found with uuid '%s'" + % book_uuid) def get_logbooks(self): - for lb in list(six.itervalues(self.backend.log_books)): - yield lb + # Don't hold locks while iterating... + with self._lock.read_lock(): + book_uuids = set(six.iterkeys(self._memory.log_books)) + for book_uuid in book_uuids: + try: + with self._lock.read_lock(): + book = self._helper.construct(book_uuid, + self._memory.log_books) + yield book + except KeyError: + pass diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index ea6de4d00..f84f37c30 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -137,7 +137,7 @@ class LogBook(object): @classmethod def from_dict(cls, data, unmarshal_time=False): - """Translates the given data into an instance of this class.""" + """Translates the given dictionary into an instance of this class.""" if not unmarshal_time: unmarshal_fn = lambda x: x else: @@ -163,6 +163,17 @@ class LogBook(object): def __len__(self): return len(self._flowdetails_by_id) + def copy(self, retain_contents=True): + """Copies/clones this log book.""" + clone = copy.copy(self) + if not retain_contents: + clone._flowdetails_by_id = {} + else: + clone._flowdetails_by_id = self._flowdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + class FlowDetail(object): """A container of atom details, a name and associated metadata. @@ -186,7 +197,7 @@ class FlowDetail(object): """Updates the objects state to be the same as the given one.""" if fd is self: return self - self._atomdetails_by_id = dict(fd._atomdetails_by_id) + self._atomdetails_by_id = fd._atomdetails_by_id self.state = fd.state self.meta = fd.meta return self @@ -206,6 +217,17 @@ class FlowDetail(object): self.state = fd.state return self + def copy(self, retain_contents=True): + """Copies/clones this flow detail.""" + clone = copy.copy(self) + if not retain_contents: + clone._atomdetails_by_id = {} + else: + clone._atomdetails_by_id = self._atomdetails_by_id.copy() + if self.meta: + clone.meta = self.meta.copy() + return clone + def to_dict(self): """Translates the internal state of this object to a dictionary. @@ -380,6 +402,7 @@ class AtomDetail(object): class TaskDetail(AtomDetail): """This class represents a task detail for flow task object.""" + def __init__(self, name, uuid): super(TaskDetail, self).__init__(name, uuid) @@ -410,6 +433,7 @@ class TaskDetail(AtomDetail): return self._to_dict_shared() def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state.""" if not isinstance(other, TaskDetail): raise exc.NotImplementedError("Can only merge with other" " task details") @@ -421,6 +445,16 @@ class TaskDetail(AtomDetail): self.results = copy_fn(other.results) return self + def copy(self): + """Copies/clones this task detail.""" + clone = copy.copy(self) + clone.results = copy.copy(self.results) + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + class RetryDetail(AtomDetail): """This class represents a retry detail for retry controller object.""" @@ -434,6 +468,24 @@ class RetryDetail(AtomDetail): self.state = state self.intention = states.EXECUTE + def copy(self): + """Copies/clones this retry detail.""" + clone = copy.copy(self) + results = [] + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + for (data, failures) in self.results: + copied_failures = {} + for (key, failure) in six.iteritems(failures): + copied_failures[key] = failure + results.append((data, copied_failures)) + clone.results = results + if self.meta: + clone.meta = self.meta.copy() + if self.version: + clone.version = copy.copy(self.version) + return clone + @property def last_results(self): try: @@ -496,6 +548,7 @@ class RetryDetail(AtomDetail): return base def merge(self, other, deep_copy=False): + """Merges the current object state with the given ones state.""" if not isinstance(other, RetryDetail): raise exc.NotImplementedError("Can only merge with other" " retry details") diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 544004351..095320103 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -559,7 +559,7 @@ class RetryTest(utils.EngineTestBase): # we execute retry engine.storage.save('flow-1_retry', 1) # task fails - fail = failure.Failure.from_exception(RuntimeError('foo')), + fail = failure.Failure.from_exception(RuntimeError('foo')) engine.storage.save('task1', fail, state=st.FAILURE) if when == 'task fails': return engine diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index b9d7a399f..87ff1bf6f 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -74,7 +74,7 @@ def _are_equal_exc_info_tuples(ei1, ei2): class Failure(object): - """Object that represents failure. + """An immutable object that represents failure. Failure objects encapsulate exception information so that they can be re-used later to re-raise, inspect, examine, log, print, serialize,