tests: Use @with_tempdir more

Change-Id: I33e71f6c201bb4f2cf3481afd40cf489eb1fcd1f
Tim Burke 2024-04-30 13:51:27 -07:00
parent 0a3c46e6c4
commit b4dddb7406
1 changed file with 197 additions and 256 deletions
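For reference, `with_tempdir` is a small helper decorator used throughout Swift's unit tests: it hands the test method a freshly created temporary directory and removes it when the test finishes, which is what lets each converted test below drop its own mkdtemp()/rmtree() and try/finally scaffolding. A minimal sketch of how such a decorator typically works (illustrative only; the real helper lives in Swift's test utilities and may differ in detail):

import functools
from shutil import rmtree
from tempfile import mkdtemp

def with_tempdir(f):
    # Run the wrapped test with a fresh temp dir appended as the last
    # positional argument, and always clean the directory up afterwards.
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        tempdir = mkdtemp()
        try:
            return f(*args, tempdir, **kwargs)
        finally:
            rmtree(tempdir)
    return wrapped

After decoration, the test receives the directory as its last positional argument, e.g. `def test_mkdirs(self, testdir_base):` in the diff below.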


@@ -678,31 +678,28 @@ class TestUtils(unittest.TestCase):
         with TemporaryFile('r') as f:
             self.assertEqual([], list(utils.backward(f)))

-    def test_mkdirs(self):
-        testdir_base = mkdtemp()
+    @with_tempdir
+    def test_mkdirs(self, testdir_base):
         testroot = os.path.join(testdir_base, 'mkdirs')
-        try:
-            self.assertTrue(not os.path.exists(testroot))
-            utils.mkdirs(testroot)
-            self.assertTrue(os.path.exists(testroot))
-            utils.mkdirs(testroot)
-            self.assertTrue(os.path.exists(testroot))
-            rmtree(testroot, ignore_errors=1)
+        self.assertTrue(not os.path.exists(testroot))
+        utils.mkdirs(testroot)
+        self.assertTrue(os.path.exists(testroot))
+        utils.mkdirs(testroot)
+        self.assertTrue(os.path.exists(testroot))
+        rmtree(testroot, ignore_errors=1)

-            testdir = os.path.join(testroot, 'one/two/three')
-            self.assertTrue(not os.path.exists(testdir))
-            utils.mkdirs(testdir)
-            self.assertTrue(os.path.exists(testdir))
-            utils.mkdirs(testdir)
-            self.assertTrue(os.path.exists(testdir))
-            rmtree(testroot, ignore_errors=1)
+        testdir = os.path.join(testroot, 'one/two/three')
+        self.assertTrue(not os.path.exists(testdir))
+        utils.mkdirs(testdir)
+        self.assertTrue(os.path.exists(testdir))
+        utils.mkdirs(testdir)
+        self.assertTrue(os.path.exists(testdir))
+        rmtree(testroot, ignore_errors=1)

-            open(testroot, 'wb').close()
-            self.assertTrue(not os.path.exists(testdir))
-            self.assertRaises(OSError, utils.mkdirs, testdir)
-            os.unlink(testroot)
-        finally:
-            rmtree(testdir_base)
+        open(testroot, 'wb').close()
+        self.assertTrue(not os.path.exists(testdir))
+        self.assertRaises(OSError, utils.mkdirs, testdir)
+        os.unlink(testroot)

     def test_split_path(self):
         # Test swift.common.utils.split_account_path
@@ -928,94 +925,88 @@ class TestUtils(unittest.TestCase):
             utils.sys.stdout = orig_stdout
             utils.sys.stderr = orig_stderr

-    def test_dump_recon_cache(self):
-        testdir_base = mkdtemp()
+    @with_tempdir
+    def test_dump_recon_cache(self, testdir_base):
         testcache_file = os.path.join(testdir_base, 'cache.recon')
         logger = utils.get_logger(None, 'server', log_route='server')
-        try:
-            submit_dict = {'key0': 99,
-                           'key1': {'value1': 1, 'value2': 2}}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(submit_dict, file_dict)
-            # Use a nested entry
-            submit_dict = {'key0': 101,
-                           'key1': {'key2': {'value1': 1, 'value2': 2}}}
-            expect_dict = {'key0': 101,
-                           'key1': {'key2': {'value1': 1, 'value2': 2},
-                                    'value1': 1, 'value2': 2}}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # nested dict items are not sticky
-            submit_dict = {'key1': {'key2': {'value3': 3}}}
-            expect_dict = {'key0': 101,
-                           'key1': {'key2': {'value3': 3},
-                                    'value1': 1, 'value2': 2}}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # cached entries are sticky
-            submit_dict = {}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # nested dicts can be erased...
-            submit_dict = {'key1': {'key2': {}}}
-            expect_dict = {'key0': 101,
-                           'key1': {'value1': 1, 'value2': 2}}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # ... and erasure is idempotent
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # top level dicts can be erased...
-            submit_dict = {'key1': {}}
-            expect_dict = {'key0': 101}
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-            # ... and erasure is idempotent
-            utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            with open(testcache_file) as fd:
-                file_dict = json.loads(fd.readline())
-            self.assertEqual(expect_dict, file_dict)
-        finally:
-            rmtree(testdir_base)
+        submit_dict = {'key0': 99,
+                       'key1': {'value1': 1, 'value2': 2}}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(submit_dict, file_dict)
+        # Use a nested entry
+        submit_dict = {'key0': 101,
+                       'key1': {'key2': {'value1': 1, 'value2': 2}}}
+        expect_dict = {'key0': 101,
+                       'key1': {'key2': {'value1': 1, 'value2': 2},
+                                'value1': 1, 'value2': 2}}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # nested dict items are not sticky
+        submit_dict = {'key1': {'key2': {'value3': 3}}}
+        expect_dict = {'key0': 101,
+                       'key1': {'key2': {'value3': 3},
+                                'value1': 1, 'value2': 2}}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # cached entries are sticky
+        submit_dict = {}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # nested dicts can be erased...
+        submit_dict = {'key1': {'key2': {}}}
+        expect_dict = {'key0': 101,
+                       'key1': {'value1': 1, 'value2': 2}}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # ... and erasure is idempotent
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # top level dicts can be erased...
+        submit_dict = {'key1': {}}
+        expect_dict = {'key0': 101}
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)
+        # ... and erasure is idempotent
+        utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        with open(testcache_file) as fd:
+            file_dict = json.loads(fd.readline())
+        self.assertEqual(expect_dict, file_dict)

-    def test_dump_recon_cache_set_owner(self):
-        testdir_base = mkdtemp()
+    @with_tempdir
+    def test_dump_recon_cache_set_owner(self, testdir_base):
         testcache_file = os.path.join(testdir_base, 'cache.recon')
         logger = utils.get_logger(None, 'server', log_route='server')
-        try:
-            submit_dict = {'key1': {'value1': 1, 'value2': 2}}
+        submit_dict = {'key1': {'value1': 1, 'value2': 2}}

-            _ret = lambda: None
-            _ret.pw_uid = 100
-            _mock_getpwnam = MagicMock(return_value=_ret)
-            _mock_chown = mock.Mock()
+        _ret = lambda: None
+        _ret.pw_uid = 100
+        _mock_getpwnam = MagicMock(return_value=_ret)
+        _mock_chown = mock.Mock()

-            with patch('os.chown', _mock_chown), \
-                    patch('pwd.getpwnam', _mock_getpwnam):
-                utils.dump_recon_cache(submit_dict, testcache_file,
-                                       logger, set_owner="swift")
+        with patch('os.chown', _mock_chown), \
+                patch('pwd.getpwnam', _mock_getpwnam):
+            utils.dump_recon_cache(submit_dict, testcache_file,
+                                   logger, set_owner="swift")

-            _mock_getpwnam.assert_called_once_with("swift")
-            self.assertEqual(_mock_chown.call_args[0][1], 100)
-        finally:
-            rmtree(testdir_base)
+        _mock_getpwnam.assert_called_once_with("swift")
+        self.assertEqual(_mock_chown.call_args[0][1], 100)

-    def test_dump_recon_cache_permission_denied(self):
-        testdir_base = mkdtemp()
+    @with_tempdir
+    def test_dump_recon_cache_permission_denied(self, testdir_base):
         testcache_file = os.path.join(testdir_base, 'cache.recon')

         class MockLogger(object):
@@ -1027,15 +1018,12 @@ class TestUtils(unittest.TestCase):
                 self._excs.append(exc)

         logger = MockLogger()
-        try:
-            submit_dict = {'key1': {'value1': 1, 'value2': 2}}
-            with mock.patch(
-                    'swift.common.utils.NamedTemporaryFile',
-                    side_effect=IOError(13, 'Permission Denied')):
-                utils.dump_recon_cache(submit_dict, testcache_file, logger)
-            self.assertIsInstance(logger._excs[0], IOError)
-        finally:
-            rmtree(testdir_base)
+        submit_dict = {'key1': {'value1': 1, 'value2': 2}}
+        with mock.patch(
+                'swift.common.utils.NamedTemporaryFile',
+                side_effect=IOError(13, 'Permission Denied')):
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+        self.assertIsInstance(logger._excs[0], IOError)

     def test_load_recon_cache(self):
         stub_data = {'test': 'foo'}
@@ -2957,65 +2945,51 @@ cluster_dfw1 = http://dfw1.host/v1/
             self.assertTrue(timedout)
             self.assertTrue(os.path.exists(nt.name))

-    def test_ismount_path_does_not_exist(self):
-        tmpdir = mkdtemp()
-        try:
-            self.assertFalse(utils.ismount(os.path.join(tmpdir, 'bar')))
-        finally:
-            shutil.rmtree(tmpdir)
+    @with_tempdir
+    def test_ismount_path_does_not_exist(self, tmpdir):
+        self.assertFalse(utils.ismount(os.path.join(tmpdir, 'bar')))

-    def test_ismount_path_not_mount(self):
-        tmpdir = mkdtemp()
-        try:
-            self.assertFalse(utils.ismount(tmpdir))
-        finally:
-            shutil.rmtree(tmpdir)
+    @with_tempdir
+    def test_ismount_path_not_mount(self, tmpdir):
+        self.assertFalse(utils.ismount(tmpdir))

-    def test_ismount_path_error(self):
+    @with_tempdir
+    def test_ismount_path_error(self, tmpdir):
         def _mock_os_lstat(path):
             raise OSError(13, "foo")

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                # Raises exception with _raw -- see next test.
-                utils.ismount(tmpdir)
-        finally:
-            shutil.rmtree(tmpdir)
+        with patch("os.lstat", _mock_os_lstat):
+            # Raises exception with _raw -- see next test.
+            utils.ismount(tmpdir)

-    def test_ismount_raw_path_error(self):
+    @with_tempdir
+    def test_ismount_raw_path_error(self, tmpdir):
         def _mock_os_lstat(path):
             raise OSError(13, "foo")

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                self.assertRaises(OSError, utils.ismount_raw, tmpdir)
-        finally:
-            shutil.rmtree(tmpdir)
+        with patch("os.lstat", _mock_os_lstat):
+            self.assertRaises(OSError, utils.ismount_raw, tmpdir)

-    def test_ismount_path_is_symlink(self):
-        tmpdir = mkdtemp()
-        try:
-            link = os.path.join(tmpdir, "tmp")
-            rdir = os.path.join(tmpdir, "realtmp")
-            os.mkdir(rdir)
-            os.symlink(rdir, link)
-            self.assertFalse(utils.ismount(link))
+    @with_tempdir
+    def test_ismount_path_is_symlink(self, tmpdir):
+        link = os.path.join(tmpdir, "tmp")
+        rdir = os.path.join(tmpdir, "realtmp")
+        os.mkdir(rdir)
+        os.symlink(rdir, link)
+        self.assertFalse(utils.ismount(link))

-            # Can add a stubfile to make it pass
-            with open(os.path.join(link, ".ismount"), "w"):
-                pass
-            self.assertTrue(utils.ismount(link))
-        finally:
-            shutil.rmtree(tmpdir)
+        # Can add a stubfile to make it pass
+        with open(os.path.join(link, ".ismount"), "w"):
+            pass
+        self.assertTrue(utils.ismount(link))

     def test_ismount_path_is_root(self):
         self.assertTrue(utils.ismount('/'))

-    def test_ismount_parent_path_error(self):
+    @with_tempdir
+    def test_ismount_parent_path_error(self, tmpdir):
         _os_lstat = os.lstat
@@ -3025,15 +2999,12 @@ cluster_dfw1 = http://dfw1.host/v1/
             else:
                 return _os_lstat(path)

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                # Raises exception with _raw -- see next test.
-                utils.ismount(tmpdir)
-        finally:
-            shutil.rmtree(tmpdir)
+        with patch("os.lstat", _mock_os_lstat):
+            # Raises exception with _raw -- see next test.
+            utils.ismount(tmpdir)

-    def test_ismount_raw_parent_path_error(self):
+    @with_tempdir
+    def test_ismount_raw_parent_path_error(self, tmpdir):
         _os_lstat = os.lstat
@@ -3043,14 +3014,11 @@ cluster_dfw1 = http://dfw1.host/v1/
             else:
                 return _os_lstat(path)

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                self.assertRaises(OSError, utils.ismount_raw, tmpdir)
-        finally:
-            shutil.rmtree(tmpdir)
+        with patch("os.lstat", _mock_os_lstat):
+            self.assertRaises(OSError, utils.ismount_raw, tmpdir)

-    def test_ismount_successes_dev(self):
+    @with_tempdir
+    def test_ismount_successes_dev(self, tmpdir):
         _os_lstat = os.lstat
@@ -3068,14 +3036,11 @@ cluster_dfw1 = http://dfw1.host/v1/
             else:
                 return _os_lstat(path)

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                self.assertTrue(utils.ismount(tmpdir))
-        finally:
-            shutil.rmtree(tmpdir)
+        with patch("os.lstat", _mock_os_lstat):
+            self.assertTrue(utils.ismount(tmpdir))

-    def test_ismount_successes_ino(self):
+    @with_tempdir
+    def test_ismount_successes_ino(self, tmpdir):
         _os_lstat = os.lstat
@@ -3095,22 +3060,15 @@ cluster_dfw1 = http://dfw1.host/v1/
                 return MockStat(child.st_mode, parent.st_ino,
                                 child.st_dev)

-        tmpdir = mkdtemp()
-        try:
-            with patch("os.lstat", _mock_os_lstat):
-                self.assertTrue(utils.ismount(tmpdir))
-        finally:
-            shutil.rmtree(tmpdir)
-
-    def test_ismount_successes_stubfile(self):
-        tmpdir = mkdtemp()
-        fname = os.path.join(tmpdir, ".ismount")
-        try:
-            with open(fname, "w") as stubfile:
-                stubfile.write("")
+        with patch("os.lstat", _mock_os_lstat):
             self.assertTrue(utils.ismount(tmpdir))
-        finally:
-            shutil.rmtree(tmpdir)
+
+    @with_tempdir
+    def test_ismount_successes_stubfile(self, tmpdir):
+        fname = os.path.join(tmpdir, ".ismount")
+        with open(fname, "w") as stubfile:
+            stubfile.write("")
+        self.assertTrue(utils.ismount(tmpdir))

     def test_parse_content_type(self):
         self.assertEqual(utils.parse_content_type('text/plain'),
@@ -3460,12 +3418,11 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertIsNone(utils.cache_from_env(env, True))
         self.assertEqual(0, len(logger.get_lines_for_level('error')))

-    def test_fsync_dir(self):
-        tempdir = None
+    @with_tempdir
+    def test_fsync_dir(self, tempdir):
         fd = None
         try:
-            tempdir = mkdtemp()
             fd, temppath = tempfile.mkstemp(dir=tempdir)

             _mock_fsync = mock.Mock()
@@ -3497,41 +3454,34 @@ cluster_dfw1 = http://dfw1.host/v1/
             if fd is not None:
                 os.close(fd)
                 os.unlink(temppath)
-            if tempdir:
-                os.rmdir(tempdir)

-    def test_renamer_with_fsync_dir(self):
-        tempdir = None
-        try:
-            tempdir = mkdtemp()
-            # Simulate part of object path already existing
-            part_dir = os.path.join(tempdir, 'objects/1234/')
-            os.makedirs(part_dir)
-            obj_dir = os.path.join(part_dir, 'aaa', 'a' * 32)
-            obj_path = os.path.join(obj_dir, '1425276031.12345.data')
+    @with_tempdir
+    def test_renamer_with_fsync_dir(self, tempdir):
+        # Simulate part of object path already existing
+        part_dir = os.path.join(tempdir, 'objects/1234/')
+        os.makedirs(part_dir)
+        obj_dir = os.path.join(part_dir, 'aaa', 'a' * 32)
+        obj_path = os.path.join(obj_dir, '1425276031.12345.data')

-            # Object dir had to be created
-            _m_os_rename = mock.Mock()
-            _m_fsync_dir = mock.Mock()
-            with patch('os.rename', _m_os_rename):
-                with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
-                    utils.renamer("fake_path", obj_path)
-            _m_os_rename.assert_called_once_with('fake_path', obj_path)
-            # fsync_dir on parents of all newly create dirs
-            self.assertEqual(_m_fsync_dir.call_count, 3)
+        # Object dir had to be created
+        _m_os_rename = mock.Mock()
+        _m_fsync_dir = mock.Mock()
+        with patch('os.rename', _m_os_rename):
+            with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
+                utils.renamer("fake_path", obj_path)
+        _m_os_rename.assert_called_once_with('fake_path', obj_path)
+        # fsync_dir on parents of all newly create dirs
+        self.assertEqual(_m_fsync_dir.call_count, 3)

-            # Object dir existed
-            _m_os_rename.reset_mock()
-            _m_fsync_dir.reset_mock()
-            with patch('os.rename', _m_os_rename):
-                with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
-                    utils.renamer("fake_path", obj_path)
-            _m_os_rename.assert_called_once_with('fake_path', obj_path)
-            # fsync_dir only on the leaf dir
-            self.assertEqual(_m_fsync_dir.call_count, 1)
-        finally:
-            if tempdir:
-                shutil.rmtree(tempdir)
+        # Object dir existed
+        _m_os_rename.reset_mock()
+        _m_fsync_dir.reset_mock()
+        with patch('os.rename', _m_os_rename):
+            with patch('swift.common.utils.fsync_dir', _m_fsync_dir):
+                utils.renamer("fake_path", obj_path)
+        _m_os_rename.assert_called_once_with('fake_path', obj_path)
+        # fsync_dir only on the leaf dir
+        self.assertEqual(_m_fsync_dir.call_count, 1)

     def test_renamer_when_fsync_is_false(self):
         _m_os_rename = mock.Mock()
@@ -3546,26 +3496,20 @@ cluster_dfw1 = http://dfw1.host/v1/
         _m_os_rename.assert_called_once_with('fake_path', "/a/b/c.data")
         self.assertFalse(_m_fsync_dir.called)

-    def test_makedirs_count(self):
-        tempdir = None
-        fd = None
-        try:
-            tempdir = mkdtemp()
-            os.makedirs(os.path.join(tempdir, 'a/b'))
-            # 4 new dirs created
-            dirpath = os.path.join(tempdir, 'a/b/1/2/3/4')
-            ret = utils.makedirs_count(dirpath)
-            self.assertEqual(ret, 4)
-            # no new dirs created - dir already exists
-            ret = utils.makedirs_count(dirpath)
-            self.assertEqual(ret, 0)
-            # path exists and is a file
-            fd, temppath = tempfile.mkstemp(dir=dirpath)
-            os.close(fd)
-            self.assertRaises(OSError, utils.makedirs_count, temppath)
-        finally:
-            if tempdir:
-                shutil.rmtree(tempdir)
+    @with_tempdir
+    def test_makedirs_count(self, tempdir):
+        os.makedirs(os.path.join(tempdir, 'a/b'))
+        # 4 new dirs created
+        dirpath = os.path.join(tempdir, 'a/b/1/2/3/4')
+        ret = utils.makedirs_count(dirpath)
+        self.assertEqual(ret, 4)
+        # no new dirs created - dir already exists
+        ret = utils.makedirs_count(dirpath)
+        self.assertEqual(ret, 0)
+        # path exists and is a file
+        fd, temppath = tempfile.mkstemp(dir=dirpath)
+        os.close(fd)
+        self.assertRaises(OSError, utils.makedirs_count, temppath)

     def test_find_namespace(self):
         ts = utils.Timestamp.now().internal
@@ -3669,8 +3613,8 @@ cluster_dfw1 = http://dfw1.host/v1/
                           '/path/to/hash.db', 'bad epoch')

     @requires_o_tmpfile_support_in_tmp
-    def test_link_fd_to_path_linkat_success(self):
-        tempdir = mkdtemp()
+    @with_tempdir
+    def test_link_fd_to_path_linkat_success(self, tempdir):
         fd = os.open(tempdir, utils.O_TMPFILE | os.O_WRONLY)
         data = b"I'm whatever Gotham needs me to be"
         _m_fsync_dir = mock.Mock()
@@ -3686,11 +3630,10 @@ cluster_dfw1 = http://dfw1.host/v1/
             self.assertEqual(_m_fsync_dir.call_count, 2)
         finally:
             os.close(fd)
-            shutil.rmtree(tempdir)

     @requires_o_tmpfile_support_in_tmp
-    def test_link_fd_to_path_target_exists(self):
-        tempdir = mkdtemp()
+    @with_tempdir
+    def test_link_fd_to_path_target_exists(self, tempdir):
         # Create and write to a file
         fd, path = tempfile.mkstemp(dir=tempdir)
         os.write(fd, b"hello world")
@@ -3708,7 +3651,6 @@ cluster_dfw1 = http://dfw1.host/v1/
                 self.assertEqual(f.read(), b"bye world")
         finally:
             os.close(fd)
-            shutil.rmtree(tempdir)

     def test_link_fd_to_path_errno_not_EEXIST_or_ENOENT(self):
         _m_linkat = mock.Mock(
@@ -3723,22 +3665,21 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertTrue(_m_linkat.called)

     @requires_o_tmpfile_support_in_tmp
-    def test_linkat_race_dir_not_exists(self):
-        tempdir = mkdtemp()
+    @with_tempdir
+    def test_linkat_race_dir_not_exists(self, tempdir):
         target_dir = os.path.join(tempdir, uuid4().hex)
         target_path = os.path.join(target_dir, uuid4().hex)
         os.mkdir(target_dir)
         fd = os.open(target_dir, utils.O_TMPFILE | os.O_WRONLY)
+        # Simulating directory deletion by other backend process
+        os.rmdir(target_dir)
+        self.assertFalse(os.path.exists(target_dir))
         try:
-            # Simulating directory deletion by other backend process
-            os.rmdir(target_dir)
-            self.assertFalse(os.path.exists(target_dir))
             utils.link_fd_to_path(fd, target_path, 1)
             self.assertTrue(os.path.exists(target_dir))
             self.assertTrue(os.path.exists(target_path))
         finally:
             os.close(fd)
-            shutil.rmtree(tempdir)

     def test_safe_json_loads(self):
         expectations = {