Merge "Add in-memory backend delete() in recursive/non-recursive modes"

This commit is contained in:
Jenkins
2015-12-15 03:00:32 +00:00
committed by Gerrit Code Review
2 changed files with 65 additions and 9 deletions

View File

@@ -233,6 +233,32 @@ class FakeFilesystem(object):
for node in list(self._root.reverse_iter()):
node.disassociate()
def delete(self, path, recursive=False):
"""Deletes a node (optionally its children) from this filesystem."""
path = self.normpath(path)
node = self._fetch_node(path, normalized=True)
if node is self._root and not recursive:
raise ValueError("Can not delete '%s'" % self._root.item)
if recursive:
child_paths = (child.metadata['path'] for child in node.bfs_iter())
else:
node_child_count = node.child_count()
if node_child_count:
raise ValueError("Can not delete '%s', it has %s children"
% (path, node_child_count))
child_paths = []
if node is self._root:
# Don't drop/pop the root...
paths = child_paths
drop_nodes = []
else:
paths = itertools.chain([path], child_paths)
drop_nodes = [node]
for path in paths:
self._reverse_mapping.pop(path, None)
for node in drop_nodes:
node.disassociate()
def _iter_pieces(self, path, include_root=False):
if path == self._root.item:
# Check for this directly as the following doesn't work with
@@ -250,14 +276,7 @@ class FakeFilesystem(object):
yield piece
def __delitem__(self, path):
path = self.normpath(path)
node = self._fetch_node(path, normalized=True)
if node is self._root:
raise ValueError("Can not delete '%s'" % self._root.item)
child_gen = (child.metadata['path'] for child in node.bfs_iter())
for path in itertools.chain([path], child_gen):
self._reverse_mapping.pop(path, None)
node.disassociate()
self.delete(path, recursive=True)
@staticmethod
def _stringify_node(node):

View File

@@ -194,7 +194,44 @@ class MemoryFilesystemTest(test.TestCase):
def test_del_root_not_allowed(self):
fs = impl_memory.FakeFilesystem()
self.assertRaises(ValueError, self._del_item_path, fs, '/')
self.assertRaises(ValueError, fs.delete, "/", recursive=False)
def test_del_no_children_allowed(self):
fs = impl_memory.FakeFilesystem()
fs['/a'] = 'a'
self.assertEqual(1, len(fs.ls_r("/")))
fs.delete("/a")
self.assertEqual(0, len(fs.ls("/")))
def test_del_many_children_not_allowed(self):
fs = impl_memory.FakeFilesystem()
fs['/a'] = 'a'
fs['/a/b'] = 'b'
self.assertRaises(ValueError, fs.delete, "/", recursive=False)
def test_del_with_children_not_allowed(self):
fs = impl_memory.FakeFilesystem()
fs['/a'] = 'a'
fs['/a/b'] = 'b'
self.assertRaises(ValueError, fs.delete, "/a", recursive=False)
def test_del_many_children_allowed(self):
fs = impl_memory.FakeFilesystem()
fs['/a'] = 'a'
fs['/a/b'] = 'b'
self.assertEqual(2, len(fs.ls_r("/")))
fs.delete("/a", recursive=True)
self.assertEqual(0, len(fs.ls("/")))
def test_del_many_children_allowed_not_recursive(self):
fs = impl_memory.FakeFilesystem()
fs['/a'] = 'a'
fs['/a/b'] = 'b'
self.assertEqual(2, len(fs.ls_r("/")))
fs.delete("/a/b", recursive=False)
self.assertEqual(1, len(fs.ls("/")))
fs.delete("/a", recursive=False)
self.assertEqual(0, len(fs.ls("/")))
def test_link_loop_raises(self):
fs = impl_memory.FakeFilesystem()