Merge "relinker: make cleanup checks more robust"
@@ -22,8 +22,6 @@ import logging
 import os
 from functools import partial
 from swift.common.storage_policy import POLICIES
-from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
-    DiskFileQuarantined
 from swift.common.utils import replace_partition_in_path, config_true_value, \
     audit_location_generator, get_logger, readconf, drop_privileges, \
     RateLimitedIterator, lock_path
@@ -215,8 +213,9 @@ def hook_post_partition(logger, states, step, policy, diskfile_manager,
 def hashes_filter(next_part_power, suff_path, hashes):
     hashes = list(hashes)
     for hsh in hashes:
-        fname = os.path.join(suff_path, hsh, 'fake-file-name')
-        if replace_partition_in_path(fname, next_part_power) == fname:
+        fname = os.path.join(suff_path, hsh)
+        if fname == replace_partition_in_path(
+                fname, next_part_power, is_hash_dir=True):
             hashes.remove(hsh)
     return hashes
 
@@ -363,79 +362,109 @@ def cleanup(conf, logger, device):
         partitions_filter=cleanup_partition_filter,
         hook_post_partition=cleanup_hook_post_partition,
         hashes_filter=cleanup_hashes_filter,
-        logger=logger, error_counter=error_counter)
+        logger=logger,
+        error_counter=error_counter,
+        yield_hash_dirs=True)
     if conf['files_per_second'] > 0:
         locations = RateLimitedIterator(
             locations, conf['files_per_second'])
-    for fname, device, partition in locations:
-        # Relinking will take a while; we'll likely have some tombstones
-        # transition to being reapable during the process. When we open
-        # them in the new partition space, they'll get cleaned up and
-        # raise DiskFileNotExist. Without replicators running, this is
-        # likely the first opportunity for clean-up. To avoid a false-
-        # positive error below, open up in the old space *first* -- if
-        # that raises DiskFileNotExist, ignore it and move on.
-        loc = diskfile.AuditLocation(
-            os.path.dirname(fname), device, partition, policy)
-        df = diskfile_mgr.get_diskfile_from_audit_location(loc)
-        try:
-            with df.open():
-                pass
-        except DiskFileQuarantined as exc:
-            logger.warning('ERROR Object %(obj)s failed audit and was'
-                           ' quarantined: %(err)r',
-                           {'obj': loc, 'err': exc})
-            errors += 1
-            continue
-        except DiskFileNotExist:
-            logger.debug('Found reapable on-disk file: %s', fname)
+    for hash_path, device, partition in locations:
+        # Compare the contents of each hash dir with contents of same hash
+        # dir in its new partition to verify that the new location has the
+        # most up to date set of files. The new location may have newer
+        # files if it has been updated since relinked.
+        new_hash_path = replace_partition_in_path(
+            hash_path, part_power, is_hash_dir=True)
+
+        if new_hash_path == hash_path:
             continue
 
-        expected_fname = replace_partition_in_path(fname, part_power)
-        if fname == expected_fname:
-            continue
-        # Make sure there is a valid object file in the expected new
-        # location. Note that this could be newer than the original one
-        # (which happens if there is another PUT after partition power
-        # has been increased, but cleanup did not yet run)
-        loc = diskfile.AuditLocation(
-            os.path.dirname(expected_fname), device, partition, policy)
-        df = diskfile_mgr.get_diskfile_from_audit_location(loc)
-        try:
-            with df.open():
-                pass
-        except DiskFileQuarantined as exc:
-            logger.warning('ERROR Object %(obj)s failed audit and was'
-                           ' quarantined: %(err)r',
-                           {'obj': loc, 'err': exc})
-            errors += 1
-            continue
-        except DiskFileDeleted:
-            pass
-        except DiskFileNotExist as exc:
-            err = False
-            if policy.policy_type == 'erasure_coding':
-                # Might be a non-durable fragment - check that there is
-                # a fragment in the new path. Will be fixed by the
-                # reconstructor then
-                if not os.path.isfile(expected_fname):
-                    err = True
-            else:
-                err = True
-            if err:
+        # Get on disk data for new and old locations, cleaning up any
+        # reclaimable or obsolete files in each. The new location is
+        # cleaned up *before* the old location to prevent false negatives
+        # where the old still has a file that has been cleaned up in the
+        # new; cleaning up the new location first ensures that the old will
+        # always be 'cleaner' than the new.
+        new_df_data = diskfile_mgr.cleanup_ondisk_files(new_hash_path)
+        old_df_data = diskfile_mgr.cleanup_ondisk_files(hash_path)
+        # Now determine the most up to date set of on disk files would be
+        # given the content of old and new locations...
+        new_files = set(new_df_data['files'])
+        old_files = set(old_df_data['files'])
+        union_files = new_files.union(old_files)
+        union_data = diskfile_mgr.get_ondisk_files(
+            union_files, '', verify=False)
+        obsolete_files = set(info['filename']
+                             for info in union_data.get('obsolete', []))
+        required_files = union_files.difference(obsolete_files)
+        required_links = required_files.intersection(old_files)
+
+        missing_links = 0
+        created_links = 0
+        for filename in required_links:
+            # Before removing old files, be sure that the corresponding
+            # required new files exist by calling relink_paths again. There
+            # are several possible outcomes:
+            #  - The common case is that the new file exists, in which case
+            #    relink_paths checks that the new file has the same inode
+            #    as the old file. An exception is raised if the inode of
+            #    the new file is not the same as the old file.
+            #  - The new file may not exist because the relinker failed to
+            #    create the link to the old file and has erroneously moved
+            #    on to cleanup. In this case the relink_paths will create
+            #    the link now or raise an exception if that fails.
+            #  - The new file may not exist because some other process,
+            #    such as an object server handling a request, has cleaned
+            #    it up since we called cleanup_ondisk_files(new_hash_path).
+            #    In this case a new link will be created to the old file.
+            #    This is unnecessary but simpler than repeating the
+            #    evaluation of what links are now required and safer than
+            #    assuming that a non-existent file that *was* required is
+            #    no longer required. The new file will eventually be
+            #    cleaned up again.
+            old_file = os.path.join(hash_path, filename)
+            new_file = os.path.join(new_hash_path, filename)
+            try:
+                if diskfile.relink_paths(old_file, new_file):
+                    logger.debug(
+                        "Relinking (cleanup) created link: %s to %s",
+                        old_file, new_file)
+                    created_links += 1
+            except OSError as exc:
                 logger.warning(
-                    'Error cleaning up %s: %r', fname, exc)
+                    "Error relinking (cleanup): failed to relink %s to "
+                    "%s: %s", old_file, new_file, exc)
                 errors += 1
-                continue
+                missing_links += 1
+        if created_links:
+            diskfile.invalidate_hash(os.path.dirname(new_hash_path))
+        if missing_links:
+            continue
+
+        # the new partition hash dir has the most up to date set of on
+        # disk files so it is safe to delete the old location...
+        rehash = False
         try:
-            os.remove(fname)
-            cleaned_up += 1
-            logger.debug("Removed %s", fname)
-            suffix_dir = os.path.dirname(os.path.dirname(fname))
-            diskfile.invalidate_hash(suffix_dir)
+            for filename in old_files:
+                os.remove(os.path.join(hash_path, filename))
+                rehash = True
         except OSError as exc:
-            logger.warning('Error cleaning up %s: %r', fname, exc)
+            logger.warning('Error cleaning up %s: %r', hash_path, exc)
             errors += 1
+        else:
+            cleaned_up += 1
+            logger.debug("Removed %s", hash_path)
+
+        if rehash:
+            try:
+                diskfile.invalidate_hash(os.path.dirname(hash_path))
+            except Exception as exc:
+                # note: not counted as an error
+                logger.warning(
+                    'Error invalidating suffix for %s: %r',
+                    hash_path, exc)
 
     return determine_exit_code(
         logger=logger,
         found_policy=found_policy,
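
A minimal sketch of the file-set arithmetic used by the new cleanup() loop above, with invented file names and plain Python sets standing in for the diskfile manager results (illustration only, not Swift code):

# Invented example: the old hash dir holds a superseded .data plus the
# current .data; the new hash dir already has the current .data and a .meta.
old_files = {'1614007200.00000.data', '1614007100.00000.data'}
new_files = {'1614007200.00000.data', '1614007300.00000.meta'}
union_files = new_files.union(old_files)
# Suppose get_ondisk_files() reports the older .data as obsolete.
obsolete_files = {'1614007100.00000.data'}
required_files = union_files.difference(obsolete_files)
required_links = required_files.intersection(old_files)
print(required_links)  # {'1614007200.00000.data'} - must be linked in the new dir
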
@@ -3224,7 +3224,7 @@ def audit_location_generator(devices, datadir, suffix='',
                              hook_pre_partition=None, hook_post_partition=None,
                              hook_pre_suffix=None, hook_post_suffix=None,
                              hook_pre_hash=None, hook_post_hash=None,
-                             error_counter=None):
+                             error_counter=None, yield_hash_dirs=False):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory
@@ -3243,6 +3243,7 @@ def audit_location_generator(devices, datadir, suffix='',
                     one of the DATADIR constants defined in the account,
                     container, and object servers.
     :param suffix: path name suffix required for all names returned
+                   (ignored if yield_hash_dirs is True)
     :param mount_check: Flag to check if a mount check should be performed
                         on devices
     :param logger: a logger object
@@ -3264,6 +3265,8 @@ def audit_location_generator(devices, datadir, suffix='',
     :param hook_post_hash: a callable taking hash_path as parameter
     :param error_counter: a dictionary used to accumulate error counts; may
                           add keys 'unmounted' and 'unlistable_partitions'
+    :param yield_hash_dirs: if True, yield hash dirs instead of individual
+                            files
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
@@ -3323,17 +3326,21 @@ def audit_location_generator(devices, datadir, suffix='',
                     hash_path = os.path.join(suff_path, hsh)
                     if hook_pre_hash:
                         hook_pre_hash(hash_path)
-                    try:
-                        files = sorted(listdir(hash_path), reverse=True)
-                    except OSError as e:
-                        if e.errno != errno.ENOTDIR:
-                            raise
-                        continue
-                    for fname in files:
-                        if suffix and not fname.endswith(suffix):
-                            continue
-                        path = os.path.join(hash_path, fname)
-                        yield path, device, partition
+                    if yield_hash_dirs:
+                        if os.path.isdir(hash_path):
+                            yield hash_path, device, partition
+                    else:
+                        try:
+                            files = sorted(listdir(hash_path), reverse=True)
+                        except OSError as e:
+                            if e.errno != errno.ENOTDIR:
+                                raise
+                            continue
+                        for fname in files:
+                            if suffix and not fname.endswith(suffix):
+                                continue
+                            path = os.path.join(hash_path, fname)
+                            yield path, device, partition
                     if hook_post_hash:
                         hook_post_hash(hash_path)
                 if hook_post_suffix:
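
A hedged usage sketch of the new yield_hash_dirs flag (assumes a patched Swift tree; the devices path and datadir below are illustrative placeholders). By default the generator yields one (file path, device, partition) tuple per file; with yield_hash_dirs=True it yields one tuple per hash directory, which is what the relinker cleanup pass above now consumes:

from swift.common.utils import audit_location_generator

# Hypothetical invocation; '/srv/node' and 'objects' are illustrative.
# Default mode: one (file path, device, partition) tuple per file.
for path, device, partition in audit_location_generator(
        '/srv/node', 'objects', '.data', mount_check=False):
    print(path)      # .../objects/<part>/<suffix>/<hash>/<timestamp>.data

# New mode used by relinker cleanup: one tuple per hash dir.
for hash_dir, device, partition in audit_location_generator(
        '/srv/node', 'objects', mount_check=False, yield_hash_dirs=True):
    print(hash_dir)  # .../objects/<part>/<suffix>/<hash>
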
@@ -5799,19 +5806,21 @@ def get_partition_for_hash(hex_hash, part_power):
     return struct.unpack_from('>I', raw_hash)[0] >> part_shift
 
 
-def replace_partition_in_path(path, part_power):
+def replace_partition_in_path(path, part_power, is_hash_dir=False):
     """
-    Takes a full path to a file and a partition power and returns
-    the same path, but with the correct partition number. Most useful when
-    increasing the partition power.
+    Takes a path and a partition power and returns the same path, but with the
+    correct partition number. Most useful when increasing the partition power.
 
     :param path: full path to a file, for example object .data file
     :param part_power: partition power to compute correct partition number
+    :param is_hash_dir: if True then ``path`` is the path to a hash dir,
+        otherwise ``path`` is the path to a file in a hash dir.
     :returns: Path with re-computed partition power
     """
     path_components = path.split(os.sep)
-    part = get_partition_for_hash(path_components[-2], part_power)
-    path_components[-4] = "%d" % part
+    part = get_partition_for_hash(path_components[-1 if is_hash_dir else -2],
+                                  part_power)
+    path_components[-3 if is_hash_dir else -4] = "%d" % part
     return os.sep.join(path_components)
 
 
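
To make the new is_hash_dir flag concrete, here is a small standalone re-implementation (illustration only; it mirrors the helper above rather than importing Swift). The example path and the expected result are the ones asserted by the test added near the end of this change:

import binascii
import os
import struct


def replace_partition_in_path(path, part_power, is_hash_dir=False):
    # The hash is the last path component of a hash dir, or the second-to-last
    # component of a file path; the partition component sits two levels above
    # the hash.
    components = path.split(os.sep)
    hex_hash = components[-1 if is_hash_dir else -2]
    raw_hash = binascii.unhexlify(hex_hash)
    part = struct.unpack_from('>I', raw_hash)[0] >> (32 - part_power)
    components[-3 if is_hash_dir else -4] = "%d" % part
    return os.sep.join(components)


old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77'
print(replace_partition_in_path(old, 11, is_hash_dir=True))
# /s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77 (as the test below expects)
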
@@ -935,7 +935,9 @@ class BaseDiskFileManager(object):
                   valid fileset, or None.
 
         :param files: a list of file names.
-        :param datadir: directory name files are from.
+        :param datadir: directory name files are from; this is used to
+            construct file paths in the results, but the datadir is
+            not modified by this method.
         :param verify: if True verify that the ondisk file contract has not
                        been violated, otherwise do not verify.
         :param policy: storage policy used to store the files. Used to
@@ -31,7 +31,7 @@ import uuid
 from six.moves import cStringIO as StringIO
 
 from swift.cli import relinker
-from swift.common import exceptions, ring, utils
+from swift.common import ring, utils
 from swift.common import storage_policy
 from swift.common.exceptions import PathNotDir
 from swift.common.storage_policy import (
@@ -81,7 +81,7 @@ class TestRelinker(unittest.TestCase):
             digest = binascii.unhexlify(self._hash)
             self.part = struct.unpack_from('>I', digest)[0] >> 24
             self.next_part = struct.unpack_from('>I', digest)[0] >> 23
-            path = os.path.join(os.path.sep, account, container, obj)
+            self.obj_path = os.path.join(os.path.sep, account, container, obj)
             # There's 1/512 chance that both old and new parts will be 0;
             # that's not a terribly interesting case, as there's nothing to do
             attempts.append((self.part, self.next_part, 2**PART_POWER))
@@ -97,21 +97,23 @@ class TestRelinker(unittest.TestCase):
         self.objdir = os.path.join(
             self.objects, str(self.part), self._hash[-3:], self._hash)
         os.makedirs(self.objdir)
-        self.object_fname = utils.Timestamp.now().internal + ".data"
+        self.obj_ts = utils.Timestamp.now()
+        self.object_fname = self.obj_ts.internal + ".data"
 
         self.objname = os.path.join(self.objdir, self.object_fname)
         with open(self.objname, "wb") as dummy:
             dummy.write(b"Hello World!")
-            write_metadata(dummy, {'name': path, 'Content-Length': '12'})
+            write_metadata(dummy,
+                           {'name': self.obj_path, 'Content-Length': '12'})
 
         self.policy = StoragePolicy(0, 'platinum', True)
         storage_policy._POLICIES = StoragePolicyCollection([self.policy])
 
         self.part_dir = os.path.join(self.objects, str(self.part))
-        self.suffix_dir = os.path.join(self.part_dir, self._hash[-3:])
+        self.suffix = self._hash[-3:]
+        self.suffix_dir = os.path.join(self.part_dir, self.suffix)
         self.next_part_dir = os.path.join(self.objects, str(self.next_part))
-        self.next_suffix_dir = os.path.join(
-            self.next_part_dir, self._hash[-3:])
+        self.next_suffix_dir = os.path.join(self.next_part_dir, self.suffix)
         self.expected_dir = os.path.join(self.next_suffix_dir, self._hash)
         self.expected_file = os.path.join(self.expected_dir, self.object_fname)
 
@@ -599,7 +601,7 @@ class TestRelinker(unittest.TestCase):
         # partition!
         self._setup_object(lambda part: part < 2 ** (PART_POWER - 1))
         with mock.patch('swift.cli.relinker.replace_partition_in_path',
-                        lambda *args: args[0]):
+                        lambda *args, **kwargs: args[0]):
             self.assertEqual(0, relinker.main([
                 'cleanup',
                 '--swift-dir', self.testdir,
@@ -611,11 +613,11 @@ class TestRelinker(unittest.TestCase):
     def test_cleanup_second_quartile_no_rehash(self):
         # we need a part in upper half of current part power
         self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
-        self.assertGreater(self.part, 2 ** (PART_POWER - 1))
+        self.assertGreaterEqual(self.part, 2 ** (PART_POWER - 1))
         self._common_test_cleanup()
 
         def fake_hash_suffix(suffix_dir, policy):
-            # check that the suffix dir is empty and remove it just like the
+            # check that the hash dir is empty and remove it just like the
             # real _hash_suffix
             self.assertEqual([self._hash], os.listdir(suffix_dir))
             hash_dir = os.path.join(suffix_dir, self._hash)
@@ -984,21 +986,120 @@ class TestRelinker(unittest.TestCase):
         os.close(locks[0])  # Release the lock
 
     def test_cleanup_not_yet_relinked(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup(relink=False)
-        self.assertEqual(1, relinker.main([
-            'cleanup',
-            '--swift-dir', self.testdir,
-            '--devices', self.devices,
-            '--skip-mount',
-        ]))
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            self.assertEqual(0, relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ]))
 
-        self.assertTrue(os.path.isfile(
-            os.path.join(self.objdir, self.object_fname)))
+        self.assertFalse(os.path.isfile(self.objname))  # old file removed
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual(str(self.suffix), fd.read().strip())
+
+    def test_cleanup_same_object_different_inode_in_new_partition(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        # new file in the new partition but different inode
+        os.makedirs(self.expected_dir)
+        with open(self.expected_file, 'w') as fd:
+            fd.write('same but different')
+
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            res = relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ])
+
+        self.assertEqual(1, res)
+        self.assertTrue(os.path.isfile(self.objname))
+        with open(self.objname, 'r') as fd:
+            self.assertEqual('Hello World!', fd.read())
+        self.assertTrue(os.path.isfile(self.expected_file))
+        with open(self.expected_file, 'r') as fd:
+            self.assertEqual('same but different', fd.read())
+        warning_lines = self.logger.get_lines_for_level('warning')
+        self.assertEqual(1, len(warning_lines), warning_lines)
+        self.assertIn('Error relinking (cleanup): failed to relink %s to %s'
+                      % (self.objname, self.expected_file), warning_lines[0])
+        # suffix should not be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertFalse(os.path.exists(hashes_invalid))
+
+    def test_cleanup_older_object_in_new_partition(self):
+        # relink of the current object failed, but there is an older version of
+        # same object in the new partition
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        os.makedirs(self.expected_dir)
+        older_obj_file = os.path.join(
+            self.expected_dir,
+            utils.Timestamp(int(self.obj_ts) - 1).internal + '.data')
+        with open(older_obj_file, "wb") as fd:
+            fd.write(b"Hello Olde Worlde!")
+            write_metadata(fd, {'name': self.obj_path, 'Content-Length': '18'})
+
+        with mock.patch.object(relinker.logging, 'getLogger',
+                               return_value=self.logger):
+            res = relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ])
+
+        self.assertEqual(0, res)
+        self.assertFalse(os.path.isfile(self.objname))  # old file removed
+        self.assertTrue(os.path.isfile(older_obj_file))
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should be invalidated in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual(str(self.suffix), fd.read().strip())
 
     def test_cleanup_deleted(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup()
+        # rehash during relink creates hashes.invalid...
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
 
         # Pretend the object got deleted in between and there is a tombstone
+        # note: the tombstone would normally be at a newer timestamp but here
+        # we make the tombstone at same timestamp - it is treated as the
+        # 'required' file in the new partition, so the .data is deleted in the
+        # old partition
         fname_ts = self.expected_file[:-4] + "ts"
         os.rename(self.expected_file, fname_ts)
 
@@ -1008,6 +1109,14 @@ class TestRelinker(unittest.TestCase):
             '--devices', self.devices,
             '--skip-mount',
         ]))
+        self.assertTrue(os.path.isfile(fname_ts))
+        self.assertFalse(os.path.exists(self.objname))
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        # suffix should not be invalidated in new partition
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
 
     def test_cleanup_reapable(self):
         # relink a tombstone
@@ -1029,32 +1138,68 @@ class TestRelinker(unittest.TestCase):
             ]))
         self.assertEqual(self.logger.get_lines_for_level('error'), [])
         self.assertEqual(self.logger.get_lines_for_level('warning'), [])
-        self.assertIn(
-            "Found reapable on-disk file: %s" % self.objname,
-            self.logger.get_lines_for_level('debug'))
-        # self.expected_file may or may not exist; it depends on whether the
-        # object was in the upper-half of the partition space. ultimately,
-        # that part doesn't really matter much -- but we definitely *don't*
-        # want self.objname around polluting the old partition space.
+        # reclaimed during relinker cleanup...
         self.assertFalse(os.path.exists(self.objname))
+        # reclaimed during relinker relink or relinker cleanup, depending on
+        # which quartile the partition is in ...
+        self.assertFalse(os.path.exists(self.expected_file))
 
-    def test_cleanup_doesnotexist(self):
+    def test_cleanup_new_does_not_exist(self):
         self._common_test_cleanup()
-        # Pretend the file in the new place got deleted inbetween
+        # Pretend the file in the new place got deleted in between relink and
+        # cleanup: cleanup should re-create the link
        os.remove(self.expected_file)
 
         with mock.patch.object(relinker.logging, 'getLogger',
                                return_value=self.logger):
-            self.assertEqual(1, relinker.main([
+            self.assertEqual(0, relinker.main([
                 'cleanup',
                 '--swift-dir', self.testdir,
                 '--devices', self.devices,
                 '--skip-mount',
             ]))
-        self.assertEqual(self.logger.get_lines_for_level('warning'),
-                         ['Error cleaning up %s: %s' % (self.objname,
-                          repr(exceptions.DiskFileNotExist()))])
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        self.assertFalse(os.path.exists(self.objname))  # link created
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+
+    def test_cleanup_new_does_not_exist_and_relink_fails(self):
+        # force rehash of new partition to not happen during cleanup
+        self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
+        self._common_test_cleanup()
+        # rehash during relink creates hashes.invalid...
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        # Pretend the file in the new place got deleted in between relink and
+        # cleanup: cleanup attempts to re-create the link but fails
+        os.remove(self.expected_file)
+
+        with mock.patch('swift.obj.diskfile.os.link', side_effect=OSError):
+            with mock.patch.object(relinker.logging, 'getLogger',
+                                   return_value=self.logger):
+                self.assertEqual(1, relinker.main([
+                    'cleanup',
+                    '--swift-dir', self.testdir,
+                    '--devices', self.devices,
+                    '--skip-mount',
+                ]))
+        self.assertFalse(os.path.isfile(self.expected_file))
+        self.assertTrue(os.path.isfile(self.objname))  # old file intact
+        self.assertEqual(
+            self.logger.get_lines_for_level('warning'),
+            ['Error relinking (cleanup): failed to relink %s to %s: '
+             % (self.objname, self.expected_file)]
+        )
+        # suffix should not be invalidated in new partition
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
+        # nor in the old partition
+        old_hashes_invalid = os.path.join(self.part_dir, 'hashes.invalid')
+        self.assertFalse(os.path.exists(old_hashes_invalid))
 
     @patch_policies(
         [ECStoragePolicy(
@@ -1062,7 +1207,6 @@ class TestRelinker(unittest.TestCase):
             ec_ndata=4, ec_nparity=2)])
     def test_cleanup_diskfile_error(self):
         self._common_test_cleanup()
-
         # Switch the policy type so all fragments raise DiskFileError.
         with mock.patch.object(relinker.logging, 'getLogger',
                                return_value=self.logger):
@@ -1073,36 +1217,12 @@ class TestRelinker(unittest.TestCase):
                 '--skip-mount',
             ]))
         log_lines = self.logger.get_lines_for_level('warning')
-        self.assertEqual(3, len(log_lines),
-                         'Expected 3 log lines, got %r' % log_lines)
-        # Once to check the old partition space...
-        self.assertIn('Bad fragment index: None', log_lines[0])
-        # ... again for the new partition ...
-        self.assertIn('Bad fragment index: None', log_lines[0])
-        # ... and one last time for the rehash
-        self.assertIn('Bad fragment index: None', log_lines[1])
-
-    def test_cleanup_quarantined(self):
-        self._common_test_cleanup()
-        # Pretend the object in the new place got corrupted
-        with open(self.expected_file, "wb") as obj:
-            obj.write(b'trash')
-
-        with mock.patch.object(relinker.logging, 'getLogger',
-                               return_value=self.logger):
-            self.assertEqual(1, relinker.main([
-                'cleanup',
-                '--swift-dir', self.testdir,
-                '--devices', self.devices,
-                '--skip-mount',
-            ]))
-
-        log_lines = self.logger.get_lines_for_level('warning')
-        self.assertEqual(2, len(log_lines),
-                         'Expected 2 log lines, got %r' % log_lines)
-        self.assertIn('metadata content-length 12 does not match '
-                      'actual object size 5', log_lines[0])
-        self.assertIn('failed audit and was quarantined', log_lines[1])
+        # once for cleanup_ondisk_files in old and new location, once for
+        # get_ondisk_files of union of files, once for the rehash
+        self.assertEqual(4, len(log_lines),
+                         'Expected 5 log lines, got %r' % log_lines)
+        for line in log_lines:
+            self.assertIn('Bad fragment index: None', line, log_lines)
 
     def test_rehashing(self):
         calls = []
@@ -4382,6 +4382,14 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertEqual(utils.replace_partition_in_path(old, 10), old)
         self.assertEqual(utils.replace_partition_in_path(new, 11), new)
 
+        # check hash_dir option
+        old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77'
+        exp = '/s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77'
+        actual = utils.replace_partition_in_path(old, 11, is_hash_dir=True)
+        self.assertEqual(exp, actual)
+        actual = utils.replace_partition_in_path(exp, 11, is_hash_dir=True)
+        self.assertEqual(exp, actual)
+
     def test_round_robin_iter(self):
         it1 = iter([1, 2, 3])
         it2 = iter([4, 5])