diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 7fc22e6d..0f5e22b4 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -25,11 +25,125 @@ from taskflow.types import tree from taskflow.utils import lock_utils +class Filesystem(object): + """An in-memory tree filesystem-like structure.""" + + @staticmethod + def _normpath(path): + if not path.startswith(os.sep): + raise ValueError("This filesystem can only normalize absolute" + " paths: '%s' is not valid" % path) + return os.path.normpath(path) + + def __init__(self): + self._root = tree.Node(os.sep) + + def ensure_path(self, path): + path = self._normpath(path) + # Ignore the root path as we already checked for that; and it + # will always exist/can't be removed anyway... + if path == self._root.item: + return + node = self._root + for piece in self._iter_pieces(path): + child_node = node.find(piece, only_direct=True, + include_self=False) + if child_node is None: + child_node = tree.Node(piece) + node.add(child_node) + node = child_node + + def _fetch_node(self, path): + node = self._root + path = self._normpath(path) + if path == self._root.item: + return node + for piece in self._iter_pieces(path): + node = node.find(piece, only_direct=True, + include_self=False) + if node is None: + raise exc.NotFound("Item not found %s" % path) + return node + + def _get_item(self, path, links=None): + node = self._fetch_node(path) + if 'target' in node.metadata: + # Follow the link (and watch out for loops)... + path = node.metadata['target'] + if links is None: + links = [] + if path in links: + raise ValueError("Recursive link following not" + " allowed (loop %s detected)" + % (links + [path])) + else: + links.append(path) + return self._get_item(path, links=links) + else: + return copy.deepcopy(node.metadata['value']) + + def ls(self, path): + return [node.item for node in self._fetch_node(path)] + + def _iter_pieces(self, path, include_root=False): + if path == self._root.item: + # Check for this directly as the following doesn't work with + # split correctly: + # + # >>> path = "/" + # path.split(os.sep) + # ['', ''] + parts = [] + else: + parts = path.split(os.sep)[1:] + if include_root: + parts.insert(0, self._root.item) + for piece in parts: + yield piece + + def __delitem__(self, path): + node = self._fetch_node(path) + if node is self._root: + raise ValueError("Can not delete '%s'" % self._root.item) + node.disassociate() + + def pformat(self): + return self._root.pformat() + + def symlink(self, src_path, dest_path): + dest_path = self._normpath(dest_path) + src_path = self._normpath(src_path) + dirname, basename = os.path.split(dest_path) + parent_node = self._fetch_node(dirname) + child_node = parent_node.find(basename, + only_direct=True, + include_self=False) + if child_node is None: + child_node = tree.Node(basename) + parent_node.add(child_node) + child_node.metadata['target'] = src_path + + def __getitem__(self, path): + return self._get_item(path) + + def __setitem__(self, path, value): + path = self._normpath(path) + value = copy.deepcopy(value) + try: + item_node = self._fetch_node(path) + item_node.metadata.update(value=value) + except exc.NotFound: + dirname, basename = os.path.split(path) + parent_node = self._fetch_node(dirname) + parent_node.add(tree.Node(basename, value=value)) + + class MemoryBackend(path_based.PathBasedBackend): """A in-memory (non-persistent) backend. - This backend writes logbooks, flow details, and atom details to in-memory - dictionaries and retrieves from those dictionaries as needed. + This backend writes logbooks, flow details, and atom details to a + in-memory filesystem-like structure (rooted by the ``memory`` + instance variable). This backend does *not* provide true transactional semantics. It does guarantee that there will be no inter-thread race conditions when @@ -39,7 +153,7 @@ class MemoryBackend(path_based.PathBasedBackend): super(MemoryBackend, self).__init__(conf) if self._path is None: self._path = os.sep - self.memory = tree.Node(self._path) + self.memory = Filesystem() self.lock = lock_utils.ReaderWriterLock() def get_connection(self): @@ -60,7 +174,6 @@ class Connection(path_based.PathBasedConnection): lock = self.backend.lock.write_lock else: lock = self.backend.lock.read_lock - with lock(): try: yield @@ -69,53 +182,29 @@ class Connection(path_based.PathBasedConnection): except Exception as e: raise exc.StorageFailure("Storage backend internal error", e) - def _fetch_node(self, path): - node = self.backend.memory.find(path) - if node is None: - raise exc.NotFound("Item not found %s" % path) - return node - def _join_path(self, *parts): return os.path.join(*parts) def _get_item(self, path): with self._memory_lock(): - return copy.deepcopy(self._fetch_node(path).metadata['value']) + return self.backend.memory[path] def _set_item(self, path, value, transaction): - value = copy.deepcopy(value) - try: - item_node = self._fetch_node(path) - item_node.metadata.update(value=value) - except exc.NotFound: - dirname, basename = os.path.split(path) - parent_node = self._fetch_node(dirname) - parent_node.add(tree.Node(path, name=basename, value=value)) + self.backend.memory[path] = value def _del_tree(self, path, transaction): - node = self._fetch_node(path) - node.disassociate() + del self.backend.memory[path] def _get_children(self, path): with self._memory_lock(): - return [node.metadata['name'] for node in self._fetch_node(path)] + return self.backend.memory.ls(path) def _ensure_path(self, path): with self._memory_lock(write=True): - path = os.path.normpath(path) - parts = path.split(os.sep) - node = self.backend.memory - for p in range(len(parts) - 1): - node_path = os.sep.join(parts[:p + 2]) - try: - node = self._fetch_node(node_path) - except exc.NotFound: - node.add(tree.Node(node_path, name=parts[p + 1])) + self.backend.memory.ensure_path(path) def _create_link(self, src_path, dest_path, transaction): - dirname, basename = os.path.split(dest_path) - parent_node = self._fetch_node(dirname) - parent_node.add(tree.Node(dest_path, name=basename, target=src_path)) + self.backend.memory.symlink(src_path, dest_path) @contextlib.contextmanager def _transaction(self):