Merge "Make ECDiskFile report all fragments found on disk"

This commit is contained in:
Jenkins
2015-12-19 17:15:46 +00:00
committed by Gerrit Code Review
6 changed files with 615 additions and 338 deletions

View File

@@ -832,6 +832,9 @@ class Timestamp(object):
other = Timestamp(other)
return cmp(self.internal, other.internal)
def __hash__(self):
return hash(self.internal)
def normalize_timestamp(timestamp):
"""

View File

@@ -460,92 +460,175 @@ class BaseDiskFileManager(object):
""" """
raise NotImplementedError raise NotImplementedError
- def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
- **kwargs):
- """
- Called by gather_ondisk_files() for each file in an object
- datadir in reverse sorted order. If a file is considered part of a
- valid on-disk file set it will be added to the context dict, keyed by
- its extension. If a file is considered to be obsolete it will be added
- to a list stored under the key 'obsolete' in the context dict.
- :param filename: name of file to be accepted or not
- :param ext: extension part of filename
- :param context: a context dict that may have been populated by previous
- calls to this method
- :returns: True if a valid file set has been found, False otherwise
- """
- raise NotImplementedError
+ def _process_ondisk_files(self, exts, results, **kwargs):
+ """
+ Called by get_ondisk_files(). Should be over-ridden to implement
+ subclass specific handling of files.
+ :param exts: dict of lists of file info, keyed by extension
+ :param results: a dict that may be updated with results
+ """
+ raise NotImplementedError
- def _verify_on_disk_files(self, accepted_files, **kwargs):
+ def _verify_ondisk_files(self, results, **kwargs):
"""
Verify that the final combination of on disk files complies with the
diskfile contract.
- :param accepted_files: files that have been found and accepted
+ :param results: files that have been found and accepted
:returns: True if the file combination is compliant, False otherwise
"""
- raise NotImplementedError
+ data_file, meta_file, ts_file = tuple(
+ [results[key]
+ for key in ('data_file', 'meta_file', 'ts_file')])
+ return ((data_file is None and meta_file is None and ts_file is None)
+ or (ts_file is not None and data_file is None
+ and meta_file is None)
+ or (data_file is not None and ts_file is None))
- def gather_ondisk_files(self, files, include_obsolete=False,
- verify=False, **kwargs):
- """
- Given a simple list of files names, iterate over them to determine the
- files that constitute a valid object, and optionally determine the
- files that are obsolete and could be deleted. Note that some files may
- fall into neither category.
+ def _split_list(self, original_list, condition):
+ """
+ Split a list into two lists. The first list contains the first N items
+ of the original list, in their original order, where 0 < N <=
+ len(original list). The second list contains the remaining items of the
+ original list, in their original order.
The index, N, at which the original list is split is the index of the
first item in the list that does not satisfy the given condition. Note
that the original list should be appropriately sorted if the second
list is to contain no items that satisfy the given condition.
:param original_list: the list to be split.
:param condition: a single argument function that will be used to test
for the list item to split on.
:return: a tuple of two lists.
"""
for i, item in enumerate(original_list):
if not condition(item):
return original_list[:i], original_list[i:]
return original_list, []
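An illustrative, standalone sketch of the split helpers' behaviour (re-implemented here for clarity; the real methods live on BaseDiskFileManager):

    def split_list(original_list, condition):
        # split at the first item that does not satisfy condition
        for i, item in enumerate(original_list):
            if not condition(item):
                return original_list[:i], original_list[i:]
        return original_list, []

    # file_info dicts reverse-sorted by timestamp, as get_ondisk_files builds them
    infos = [{'timestamp': 4}, {'timestamp': 3}, {'timestamp': 3}, {'timestamp': 1}]
    newer, older_or_equal = split_list(infos, lambda x: x['timestamp'] > 3)
    assert newer == [{'timestamp': 4}]
    assert older_or_equal == [{'timestamp': 3}, {'timestamp': 3}, {'timestamp': 1}]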
def _split_gt_timestamp(self, file_info_list, timestamp):
"""
Given a list of file info dicts, reverse sorted by timestamp, split the
list into two: items newer than timestamp, and items at same time or
older than timestamp.
:param file_info_list: a list of file_info dicts.
:param timestamp: a Timestamp.
:return: a tuple of two lists.
"""
return self._split_list(
file_info_list, lambda x: x['timestamp'] > timestamp)
def _split_gte_timestamp(self, file_info_list, timestamp):
"""
Given a list of file info dicts, reverse sorted by timestamp, split the
list into two: items newer than or at same time as the timestamp, and
items older than timestamp.
:param file_info_list: a list of file_info dicts.
:param timestamp: a Timestamp.
:return: a tuple of two lists.
"""
return self._split_list(
file_info_list, lambda x: x['timestamp'] >= timestamp)
def get_ondisk_files(self, files, datadir, verify=True, **kwargs):
"""
Given a simple list of files names, determine the files that constitute
a valid fileset i.e. a set of files that defines the state of an
object, and determine the files that are obsolete and could be deleted.
Note that some files may fall into neither category.
If a file is considered part of a valid fileset then its info dict will
be added to the results dict, keyed by <extension>_info. Any files that
are no longer required will have their info dicts added to a list
stored under the key 'obsolete'.
The results dict will always contain entries with keys 'ts_file',
'data_file' and 'meta_file'. Their values will be the fully qualified
path to a file of the corresponding type if there is such a file in the
valid fileset, or None.
:param files: a list of file names.
- :param include_obsolete: By default the iteration will stop when a
- valid file set has been found. Setting this
- argument to True will cause the iteration to
- continue in order to find all obsolete files.
+ :param datadir: directory name files are from.
:param verify: if True verify that the ondisk file contract has not
been violated, otherwise do not verify.
- :returns: a dict that may contain: valid on disk files keyed by their
- filename extension; a list of obsolete files stored under the
- key 'obsolete'.
+ :returns: a dict that will contain keys:
+ ts_file -> path to a .ts file or None
+ data_file -> path to a .data file or None
meta_file -> path to a .meta file or None
and may contain keys:
ts_info -> a file info dict for a .ts file
data_info -> a file info dict for a .data file
meta_info -> a file info dict for a .meta file
obsolete -> a list of file info dicts for obsolete files
""" """
files.sort(reverse=True) # Build the exts data structure:
results = {} # exts is a dict that maps file extensions to a list of file_info
# dicts for the files having that extension. The file_info dicts are of
# the form returned by parse_on_disk_filename, with the filename added.
# Each list is sorted in reverse timestamp order.
#
# The exts dict will be modified during subsequent processing as files
# are removed to be discarded or ignored.
exts = defaultdict(list)
for afile in files:
- ts_file = results.get('.ts')
- data_file = results.get('.data')
- if not include_obsolete:
- assert ts_file is None, "On-disk file search loop" \
- " continuing after tombstone, %s, encountered" % ts_file
- assert data_file is None, "On-disk file search loop" \
- " continuing after data file, %s, encountered" % data_file
- ext = splitext(afile)[1]
- if self._gather_on_disk_file(
- afile, ext, results, **kwargs):
- if not include_obsolete:
- break
+ # Categorize files by extension
+ try:
+ file_info = self.parse_on_disk_filename(afile)
+ file_info['filename'] = afile
+ exts[file_info['ext']].append(file_info)
+ except DiskFileError as e:
+ self.logger.warning('Unexpected file %s: %s' %
+ (os.path.join(datadir or '', afile), e))
+ for ext in exts:
+ # For each extension sort files into reverse chronological order.
+ exts[ext] = sorted(
+ exts[ext], key=lambda info: info['timestamp'], reverse=True)
+ # the results dict is used to collect results of file filtering
+ results = {}
+ # non-tombstones older than or equal to latest tombstone are obsolete
+ if exts.get('.ts'):
for ext in filter(lambda ext: ext != '.ts', exts.keys()):
exts[ext], older = self._split_gt_timestamp(
exts[ext], exts['.ts'][0]['timestamp'])
results.setdefault('obsolete', []).extend(older)
# all but most recent .meta and .ts are obsolete
for ext in ('.meta', '.ts'):
if ext in exts:
results.setdefault('obsolete', []).extend(exts[ext][1:])
exts[ext] = exts[ext][:1]
# delegate to subclass handler
self._process_ondisk_files(exts, results, **kwargs)
# set final choice of files
if exts.get('.ts'):
results['ts_info'] = exts['.ts'][0]
if 'data_info' in results and exts.get('.meta'):
# only report a meta file if there is a data file
results['meta_info'] = exts['.meta'][0]
# set ts_file, data_file and meta_file with path to chosen file or None
for info_key in ('data_info', 'meta_info', 'ts_info'):
info = results.get(info_key)
key = info_key[:-5] + '_file'
results[key] = join(datadir, info['filename']) if info else None
if verify:
- assert self._verify_on_disk_files(
+ assert self._verify_ondisk_files(
results, **kwargs), \
"On-disk file search algorithm contract is broken: %s" \
- % results.values()
+ % str(results)
return results
- def get_ondisk_files(self, files, datadir, **kwargs):
- """
- Given a simple list of files names, determine the files to use.
- :param files: simple set of files as a python list
- :param datadir: directory name files are from for convenience
- :returns: dict of files to use having keys 'data_file', 'ts_file',
- 'meta_file' and optionally other policy specific keys
- """
- file_info = self.gather_ondisk_files(files, verify=True, **kwargs)
- for ext in ('.data', '.meta', '.ts'):
- filename = file_info.get(ext)
- key = '%s_file' % ext[1:]
- file_info[key] = join(datadir, filename) if filename else None
- return file_info
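An illustrative sketch of the result shape the new get_ondisk_files() returns for a replication-policy manager (mgr, the datadir path and the file names below are made up for the example):

    files = ['1402444821.00000.ts',    # old tombstone
             '1402444822.00000.data',  # current data
             '1402444823.00000.meta']  # newer metadata
    results = mgr.get_ondisk_files(files, '/srv/node/sda1/objects/0/123/abc')
    # results['data_file'] -> full path to the .data file
    # results['meta_file'] -> full path to the .meta file
    # results['ts_file']   -> None (the tombstone is older than the data)
    # results['obsolete']  -> [file info dict for the old .ts file]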
def cleanup_ondisk_files(self, hsh_path, reclaim_age=ONE_WEEK, **kwargs):
"""
Clean up on-disk files that are obsolete and gather the set of valid
@@ -560,27 +643,24 @@ class BaseDiskFileManager(object):
key 'obsolete'; a list of files remaining in the directory,
reverse sorted, stored under the key 'files'.
"""
- def is_reclaimable(filename):
- timestamp = self.parse_on_disk_filename(filename)['timestamp']
+ def is_reclaimable(timestamp):
return (time.time() - float(timestamp)) > reclaim_age
files = listdir(hsh_path)
files.sort(reverse=True)
- results = self.gather_ondisk_files(files, include_obsolete=True,
- **kwargs)
- # TODO ref to durables here
- if '.durable' in results and not results.get('fragments'):
- # a .durable with no .data is deleted as soon as it is found
- results.setdefault('obsolete', []).append(results.pop('.durable'))
- if '.ts' in results and is_reclaimable(results['.ts']):
- results.setdefault('obsolete', []).append(results.pop('.ts'))
- for filename in results.get('fragments_without_durable', []):
+ results = self.get_ondisk_files(
+ files, hsh_path, verify=False, **kwargs)
+ if 'ts_info' in results and is_reclaimable(
+ results['ts_info']['timestamp']):
+ remove_file(join(hsh_path, results['ts_info']['filename']))
+ files.remove(results.pop('ts_info')['filename'])
+ for file_info in results.get('possible_reclaim', []):
# stray fragments are not deleted until reclaim-age
- if is_reclaimable(filename):
- results.setdefault('obsolete', []).append(filename)
- for filename in results.get('obsolete', []):
- remove_file(join(hsh_path, filename))
- files.remove(filename)
+ if is_reclaimable(file_info['timestamp']):
+ results.setdefault('obsolete', []).append(file_info)
+ for file_info in results.get('obsolete', []):
+ remove_file(join(hsh_path, file_info['filename']))
+ files.remove(file_info['filename'])
results['files'] = files
return results
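An illustrative sketch of the reclaim-age test that cleanup_ondisk_files() applies to tombstones and stray fragments (assuming the default one-week reclaim age):

    import time

    ONE_WEEK = 7 * 24 * 60 * 60

    def is_reclaimable(timestamp, reclaim_age=ONE_WEEK):
        # reclaimable only once the timestamp is older than reclaim_age
        return (time.time() - float(timestamp)) > reclaim_age

    assert is_reclaimable(time.time() - 10 * 24 * 60 * 60)   # ten days old
    assert not is_reclaimable(time.time() - 3600)            # one hour old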
@@ -915,9 +995,9 @@ class BaseDiskFileManager(object):
(os.path.join(partition_path, suffix), suffix)
for suffix in suffixes)
key_preference = (
- ('ts_meta', '.meta'),
- ('ts_data', '.data'),
- ('ts_data', '.ts'),
+ ('ts_meta', 'meta_info'),
+ ('ts_data', 'data_info'),
+ ('ts_data', 'ts_info'),
)
for suffix_path, suffix in suffixes:
for object_hash in self._listdir(suffix_path):
@@ -926,14 +1006,13 @@ class BaseDiskFileManager(object):
results = self.cleanup_ondisk_files(
object_path, self.reclaim_age, **kwargs)
timestamps = {}
- for ts_key, ext in key_preference:
- if ext not in results:
+ for ts_key, info_key in key_preference:
+ if info_key not in results:
continue
- timestamps[ts_key] = self.parse_on_disk_filename(
- results[ext])['timestamp']
+ timestamps[ts_key] = results[info_key]['timestamp']
if 'ts_data' not in timestamps:
# file sets that do not include a .data or .ts
- # file can not be opened and therefore can not
+ # file cannot be opened and therefore cannot
# be ssync'd
continue
yield (object_path, object_hash, timestamps)
@@ -1430,6 +1509,7 @@ class BaseDiskFile(object):
self._obj = None
self._datadir = None
self._tmpdir = join(device_path, get_tmp_dir(policy))
self._ondisk_info = None
self._metadata = None
self._datafile_metadata = None
self._metafile_metadata = None
@@ -1479,6 +1559,26 @@ class BaseDiskFile(object):
raise DiskFileNotOpen()
return Timestamp(self._datafile_metadata.get('X-Timestamp'))
@property
def durable_timestamp(self):
"""
Provides the timestamp of the newest data file found in the object
directory.
:return: A Timestamp instance, or None if no data file was found.
:raises DiskFileNotOpen: if the open() method has not been previously
called on this instance.
"""
if self._ondisk_info is None:
raise DiskFileNotOpen()
if self._datafile_metadata:
return Timestamp(self._datafile_metadata.get('X-Timestamp'))
return None
@property
def fragments(self):
return None
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy):
return cls(mgr, device_path, None, partition, _datadir=hash_dir_path,
@@ -1524,8 +1624,8 @@ class BaseDiskFile(object):
# The data directory does not exist, so the object cannot exist.
files = []
- # gather info about the valid files to us to open the DiskFile
- file_info = self._get_ondisk_file(files)
+ # gather info about the valid files to use to open the DiskFile
+ file_info = self._get_ondisk_files(files)
self._data_file = file_info.get('data_file')
if not self._data_file:
@@ -1579,7 +1679,7 @@ class BaseDiskFile(object):
self._logger.increment('quarantines')
return DiskFileQuarantined(msg)
- def _get_ondisk_file(self, files):
+ def _get_ondisk_files(self, files):
"""
Determine the on-disk files to use.
@@ -1950,8 +2050,9 @@ class DiskFile(BaseDiskFile):
reader_cls = DiskFileReader
writer_cls = DiskFileWriter
- def _get_ondisk_file(self, files):
- return self.manager.get_ondisk_files(files, self._datadir)
+ def _get_ondisk_files(self, files):
+ self._ondisk_info = self.manager.get_ondisk_files(files, self._datadir)
+ return self._ondisk_info
@DiskFileRouter.register(REPL_POLICY)
@@ -1967,89 +2068,44 @@ class DiskFileManager(BaseDiskFileManager):
* timestamp is a :class:`~swift.common.utils.Timestamp`
* ext is a string, the file extension including the leading dot or
- the empty string if the filename has no extenstion.
+ the empty string if the filename has no extension.
:raises DiskFileError: if any part of the filename is not able to be
validated.
"""
- filename, ext = splitext(filename)
+ float_part, ext = splitext(filename)
+ try:
+ timestamp = Timestamp(float_part)
+ except ValueError:
+ raise DiskFileError('Invalid Timestamp value in filename %r'
+ % filename)
return {
- 'timestamp': Timestamp(filename),
+ 'timestamp': timestamp,
'ext': ext,
}
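An illustrative sketch of the replication-policy parser's new behaviour (mgr is an assumed DiskFileManager instance):

    info = mgr.parse_on_disk_filename('1402444821.72589.data')
    # info['timestamp'] == Timestamp('1402444821.72589')
    # info['ext'] == '.data'

    mgr.parse_on_disk_filename('junk')
    # raises DiskFileError("Invalid Timestamp value in filename 'junk'")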
- def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
- **kwargs):
- """
- Called by gather_ondisk_files() for each file in an object
- datadir in reverse sorted order. If a file is considered part of a
- valid on-disk file set it will be added to the context dict, keyed by
- its extension. If a file is considered to be obsolete it will be added
- to a list stored under the key 'obsolete' in the context dict.
- :param filename: name of file to be accepted or not
- :param ext: extension part of filename
- :param context: a context dict that may have been populated by previous
- calls to this method
- :returns: True if a valid file set has been found, False otherwise
- """
- # if first file with given extension then add filename to context
- # dict and return True
- accept_first = lambda: context.setdefault(ext, filename) == filename
- # add the filename to the list of obsolete files in context dict
- discard = lambda: context.setdefault('obsolete', []).append(filename)
- # set a flag in the context dict indicating that a valid fileset has
- # been found
- set_valid_fileset = lambda: context.setdefault('found_valid', True)
- # return True if the valid fileset flag is set in the context dict
- have_valid_fileset = lambda: context.get('found_valid')
- if ext == '.data':
- if have_valid_fileset():
- # valid fileset means we must have a newer
- # .data or .ts, so discard the older .data file
- discard()
- else:
- accept_first()
- set_valid_fileset()
- elif ext == '.ts':
- if have_valid_fileset() or not accept_first():
- # newer .data or .ts already found so discard this
- discard()
- if not have_valid_fileset():
- # remove any .meta that may have been previously found
- context.pop('.meta', None)
- set_valid_fileset()
- elif ext == '.meta':
- if have_valid_fileset() or not accept_first():
- # newer .data, .durable or .ts already found so discard this
- discard()
- else:
- # ignore unexpected files
- pass
- return have_valid_fileset()
- def _verify_on_disk_files(self, accepted_files, **kwargs):
- """
- Verify that the final combination of on disk files complies with the
- replicated diskfile contract.
- :param accepted_files: files that have been found and accepted
- :returns: True if the file combination is compliant, False otherwise
- """
- # mimic legacy behavior - .meta is ignored when .ts is found
- if accepted_files.get('.ts'):
- accepted_files.pop('.meta', None)
- data_file, meta_file, ts_file, durable_file = tuple(
- [accepted_files.get(ext)
- for ext in ('.data', '.meta', '.ts', '.durable')])
- return ((data_file is None and meta_file is None and ts_file is None)
- or (ts_file is not None and data_file is None
- and meta_file is None)
- or (data_file is not None and ts_file is None))
+ def _process_ondisk_files(self, exts, results, **kwargs):
+ """
+ Implement replication policy specific handling of .data files.
+ :param exts: dict of lists of file info, keyed by extension
+ :param results: a dict that may be updated with results
+ """
+ if exts.get('.data'):
+ for ext in exts.keys():
+ if ext == '.data':
+ # older .data's are obsolete
+ exts[ext], obsolete = self._split_gte_timestamp(
+ exts[ext], exts['.data'][0]['timestamp'])
+ else:
+ # other files at same or older timestamp as most recent
+ # data are obsolete
+ exts[ext], obsolete = self._split_gt_timestamp(
+ exts[ext], exts['.data'][0]['timestamp'])
+ results.setdefault('obsolete', []).extend(obsolete)
+ # set results
+ results['data_info'] = exts['.data'][0]
def _hash_suffix(self, path, reclaim_age): def _hash_suffix(self, path, reclaim_age):
""" """
@@ -2153,14 +2209,47 @@ class ECDiskFile(BaseDiskFile):
if frag_index is not None:
self._frag_index = self.manager.validate_fragment_index(frag_index)
- def _get_ondisk_file(self, files):
+ @property
def durable_timestamp(self):
"""
Provides the timestamp of the newest durable file found in the object
directory.
:return: A Timestamp instance, or None if no durable file was found.
:raises DiskFileNotOpen: if the open() method has not been previously
called on this instance.
"""
if self._ondisk_info is None:
raise DiskFileNotOpen()
if self._ondisk_info.get('durable_frag_set'):
return self._ondisk_info['durable_frag_set'][0]['timestamp']
return None
@property
def fragments(self):
"""
Provides information about all fragments that were found in the object
directory, including fragments without a matching durable file, and
including any fragment chosen to construct the opened diskfile.
:return: A dict mapping <Timestamp instance> -> <list of frag indexes>,
or None if the diskfile has not been opened or no fragments
were found.
"""
if self._ondisk_info:
frag_sets = self._ondisk_info['frag_sets']
return dict([(ts, [info['frag_index'] for info in frag_set])
for ts, frag_set in frag_sets.items()])
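An illustrative sketch of how a caller might use the two new ECDiskFile properties (df is an assumed ECDiskFile instance; the tests further below exercise this behaviour):

    try:
        df.open()
    except DiskFileNotExist:
        # open() can fail (e.g. no .durable), but fragment info is still reported
        pass
    if df.fragments:
        for ts, frag_indexes in df.fragments.items():
            # e.g. Timestamp('1402444821.72589') -> [0, 2]
            print(ts.internal, frag_indexes)
    print(df.durable_timestamp)  # newest durable timestamp, or None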
def _get_ondisk_files(self, files):
""" """
The only difference between this method and the replication policy The only difference between this method and the replication policy
DiskFile method is passing in the frag_index kwarg to our manager's DiskFile method is passing in the frag_index kwarg to our manager's
get_ondisk_files method. get_ondisk_files method.
""" """
return self.manager.get_ondisk_files( self._ondisk_info = self.manager.get_ondisk_files(
files, self._datadir, frag_index=self._frag_index) files, self._datadir, frag_index=self._frag_index)
return self._ondisk_info
def purge(self, timestamp, frag_index):
"""
@@ -2254,9 +2343,13 @@ class ECDiskFileManager(BaseDiskFileManager):
validated.
"""
frag_index = None
- filename, ext = splitext(filename)
- parts = filename.split('#', 1)
- timestamp = parts[0]
+ float_frag, ext = splitext(filename)
+ parts = float_frag.split('#', 1)
+ try:
+ timestamp = Timestamp(parts[0])
+ except ValueError:
+ raise DiskFileError('Invalid Timestamp value in filename %r'
+ % filename)
if ext == '.data':
# it is an error for an EC data file to not have a valid
# fragment index
@@ -2267,137 +2360,94 @@ class ECDiskFileManager(BaseDiskFileManager):
pass
frag_index = self.validate_fragment_index(frag_index)
return {
- 'timestamp': Timestamp(timestamp),
+ 'timestamp': timestamp,
'frag_index': frag_index,
'ext': ext,
}
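An illustrative sketch of the EC parser's output (mgr is an assumed ECDiskFileManager instance):

    info = mgr.parse_on_disk_filename('1402444821.72589#3.data')
    # info['timestamp']  == Timestamp('1402444821.72589')
    # info['frag_index'] == 3
    # info['ext']        == '.data'

    mgr.parse_on_disk_filename('1402444821.72589.data')
    # raises DiskFileError: an EC .data file must carry a valid fragment index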
- def is_obsolete(self, filename, other_filename):
- """
- Test if a given file is considered to be obsolete with respect to
- another file in an object storage dir.
- Implements EC policy specific behavior when comparing files against a
- .durable file.
- A simple string comparison would consider t2#1.data to be older than
- t2.durable (since t2#1.data < t2.durable). By stripping off the file
- extensions we get the desired behavior: t2#1 > t2 without compromising
- the detection of t1#1 < t2.
- :param filename: a string representing an absolute filename
- :param other_filename: a string representing an absolute filename
- :returns: True if filename is considered obsolete, False otherwise.
- """
- if other_filename.endswith('.durable'):
- return splitext(filename)[0] < splitext(other_filename)[0]
- return filename < other_filename
- def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
- **kwargs):
- """
- Called by gather_ondisk_files() for each file in an object
- datadir in reverse sorted order. If a file is considered part of a
- valid on-disk file set it will be added to the context dict, keyed by
- its extension. If a file is considered to be obsolete it will be added
- to a list stored under the key 'obsolete' in the context dict.
- :param filename: name of file to be accepted or not
- :param ext: extension part of filename
- :param context: a context dict that may have been populated by previous
- calls to this method
- :param frag_index: if set, search for a specific fragment index .data
- file, otherwise accept the first valid .data file.
- :returns: True if a valid file set has been found, False otherwise
- """
- # if first file with given extension then add filename to context
- # dict and return True
- accept_first = lambda: context.setdefault(ext, filename) == filename
- # add the filename to the list of obsolete files in context dict
- discard = lambda: context.setdefault('obsolete', []).append(filename)
- # set a flag in the context dict indicating that a valid fileset has
- # been found
- set_valid_fileset = lambda: context.setdefault('found_valid', True)
- # return True if the valid fileset flag is set in the context dict
- have_valid_fileset = lambda: context.get('found_valid')
- if context.get('.durable'):
- # a .durable file has been found
- if ext == '.data':
- if self.is_obsolete(filename, context.get('.durable')):
- # this and remaining data files are older than durable
- discard()
- set_valid_fileset()
- else:
- # accept the first .data file if it matches requested
- # frag_index, or if no specific frag_index is requested
- fi = self.parse_on_disk_filename(filename)['frag_index']
- if frag_index is None or frag_index == int(fi):
- accept_first()
- set_valid_fileset()
- # else: keep searching for a .data file to match frag_index
- context.setdefault('fragments', []).append(filename)
- else:
- # there can no longer be a matching .data file so mark what has
- # been found so far as the valid fileset
- discard()
- set_valid_fileset()
- elif ext == '.data':
- # not yet found a .durable
- if have_valid_fileset():
- # valid fileset means we must have a newer
- # .ts, so discard the older .data file
- discard()
- else:
- # .data newer than a .durable or .ts, don't discard yet
- context.setdefault('fragments_without_durable', []).append(
- filename)
- elif ext == '.ts':
- if have_valid_fileset() or not accept_first():
- # newer .data, .durable or .ts already found so discard this
- discard()
- if not have_valid_fileset():
- # remove any .meta that may have been previously found
- context.pop('.meta', None)
- set_valid_fileset()
- elif ext in ('.meta', '.durable'):
- if have_valid_fileset() or not accept_first():
- # newer .data, .durable or .ts already found so discard this
- discard()
- else:
- # ignore unexpected files
- pass
- return have_valid_fileset()
+ def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs):
+ """
+ Implement EC policy specific handling of .data and .durable files.
+ :param exts: dict of lists of file info, keyed by extension
+ :param results: a dict that may be updated with results
+ :param frag_index: if set, search for a specific fragment index .data
+ file, otherwise accept the first valid .data file.
+ """
+ durable_info = None
+ if exts.get('.durable'):
+ durable_info = exts['.durable'][0]
+ # Mark everything older than most recent .durable as obsolete
+ # and remove from the exts dict.
+ for ext in exts.keys():
+ exts[ext], older = self._split_gte_timestamp(
+ exts[ext], durable_info['timestamp'])
+ results.setdefault('obsolete', []).extend(older)
+ # Split the list of .data files into sets of frags having the same
+ # timestamp, identifying the durable and newest sets (if any) as we go.
+ # To do this we can take advantage of the list of .data files being
+ # reverse-time ordered. Keep the resulting per-timestamp frag sets in
+ # a frag_sets dict mapping a Timestamp instance -> frag_set.
+ all_frags = exts.get('.data')
+ frag_sets = {}
+ durable_frag_set = None
+ while all_frags:
+ frag_set, all_frags = self._split_gte_timestamp(
+ all_frags, all_frags[0]['timestamp'])
+ # sort the frag set into ascending frag_index order
+ frag_set.sort(key=lambda info: info['frag_index'])
+ timestamp = frag_set[0]['timestamp']
+ frag_sets[timestamp] = frag_set
+ if durable_info and durable_info['timestamp'] == timestamp:
+ durable_frag_set = frag_set
+ # Select a single chosen frag from the chosen frag_set, by either
+ # matching against a specified frag_index or taking the highest index.
+ chosen_frag = None
+ if durable_frag_set:
+ if frag_index is not None:
+ # search the frag set to find the exact frag_index
+ for info in durable_frag_set:
+ if info['frag_index'] == frag_index:
+ chosen_frag = info
+ break
+ else:
+ chosen_frag = durable_frag_set[-1]
+ # If we successfully found a frag then set results
+ if chosen_frag:
+ results['data_info'] = chosen_frag
+ results['durable_frag_set'] = durable_frag_set
+ results['frag_sets'] = frag_sets
+ # Mark any isolated .durable as obsolete
+ if exts.get('.durable') and not durable_frag_set:
+ results.setdefault('obsolete', []).extend(exts['.durable'])
+ exts.pop('.durable')
+ # Fragments *may* be ready for reclaim, unless they are durable or
+ # at the timestamp we have just chosen for constructing the diskfile.
+ for frag_set in frag_sets.values():
+ if frag_set == durable_frag_set:
+ continue
+ results.setdefault('possible_reclaim', []).extend(frag_set)
- def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs):
- """
- Verify that the final combination of on disk files complies with the
- erasure-coded diskfile contract.
- :param accepted_files: files that have been found and accepted
- :param frag_index: specifies a specific fragment index .data file
- :returns: True if the file combination is compliant, False otherwise
- """
- if not accepted_files.get('.data'):
- # We may find only a .meta, which doesn't mean the on disk
- # contract is broken. So we clear it to comply with
- # superclass assertions.
- accepted_files.pop('.meta', None)
- return False
- data_file, meta_file, ts_file, durable_file = tuple(
- [accepted_files.get(ext)
- for ext in ('.data', '.meta', '.ts', '.durable')])
- return ((data_file is None or durable_file is not None)
- and (data_file is None and meta_file is None
- and ts_file is None and durable_file is None)
- or (ts_file is not None and data_file is None
- and meta_file is None and durable_file is None)
- or (data_file is not None and durable_file is not None
- and ts_file is None)
- or (durable_file is not None and meta_file is None
- and ts_file is None))
+ def _verify_ondisk_files(self, results, frag_index=None, **kwargs):
+ """
+ Verify that the final combination of on disk files complies with the
+ erasure-coded diskfile contract.
+ :param results: files that have been found and accepted
+ :param frag_index: specifies a specific fragment index .data file
+ :returns: True if the file combination is compliant, False otherwise
+ """
+ if super(ECDiskFileManager, self)._verify_ondisk_files(
+ results, **kwargs):
+ have_data_file = results['data_file'] is not None
+ have_durable = results.get('durable_frag_set') is not None
+ return have_data_file == have_durable
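An illustrative walk-through of the EC selection logic above for a hypothetical directory listing (timestamps abbreviated, t1 older than t2):

    # files on disk:
    #   t1#0.data, t1#2.data, t1.durable   <- durable frag set at t1
    #   t2#3.data                          <- newer frags, no .durable yet
    #
    # _process_ondisk_files(exts, results, frag_index=None) would leave:
    #   results['durable_frag_set'] -> [info for t1#0, info for t1#2]
    #   results['data_info']        -> info for t1#2 (highest index is chosen)
    #   results['frag_sets']        -> {t1: [#0, #2], t2: [#3]}
    #   results['possible_reclaim'] -> [info for t2#3] (kept until reclaim age)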
def _hash_suffix(self, path, reclaim_age):
"""
@@ -2412,12 +2462,12 @@ class ECDiskFileManager(BaseDiskFileManager):
# here we flatten out the hashers hexdigest into a dictionary instead
# of just returning the one hexdigest for the whole suffix
def mapper(filename):
info = self.parse_on_disk_filename(filename)
fi = info['frag_index']
if fi is None:
return None, filename
else:
return fi, info['timestamp'].internal
hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age)
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())

View File

@@ -254,6 +254,7 @@ class DiskFile(object):
self._metadata = None
self._fp = None
self._filesystem = fs
self.fragments = None
def open(self):
"""
@@ -421,3 +422,5 @@ class DiskFile(object):
return Timestamp(self._metadata.get('X-Timestamp'))
data_timestamp = timestamp
durable_timestamp = timestamp

View File

@@ -779,6 +779,15 @@ class TestTimestamp(unittest.TestCase):
self.assertEqual(
sorted([t.internal for t in timestamps]), expected)
def test_hashable(self):
ts_0 = utils.Timestamp('1402444821.72589')
ts_0_also = utils.Timestamp('1402444821.72589')
self.assertEqual(ts_0, ts_0_also) # sanity
self.assertEqual(hash(ts_0), hash(ts_0_also))
d = {ts_0: 'whatever'}
self.assertIn(ts_0, d) # sanity
self.assertIn(ts_0_also, d)
class TestUtils(unittest.TestCase):
"""Tests for swift.common.utils """

View File

@@ -20,6 +20,7 @@ import six.moves.cPickle as pickle
import os
import errno
import itertools
from unittest.util import safe_repr
import mock
import unittest
import email
@@ -462,6 +463,35 @@ class BaseDiskFileTestMixin(object):
return '.'.join([
mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name])
def _assertDictContainsSubset(self, subset, dictionary, msg=None):
"""Checks whether dictionary is a superset of subset."""
# This is almost identical to the method in python3.4 version of
# unitest.case.TestCase.assertDictContainsSubset, reproduced here to
# avoid the deprecation warning in the original when using python3.
missing = []
mismatched = []
for key, value in subset.items():
if key not in dictionary:
missing.append(key)
elif value != dictionary[key]:
mismatched.append('%s, expected: %s, actual: %s' %
(safe_repr(key), safe_repr(value),
safe_repr(dictionary[key])))
if not (missing or mismatched):
return
standardMsg = ''
if missing:
standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in
missing)
if mismatched:
if standardMsg:
standardMsg += '; '
standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
self.fail(self._formatMessage(msg, standardMsg))
class DiskFileManagerMixin(BaseDiskFileTestMixin):
"""
@@ -516,8 +546,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
for _order in ('ordered', 'shuffled', 'shuffled'):
class_under_test = self._get_diskfile(policy, frag_index)
try:
- actual = class_under_test._get_ondisk_file(files)
- self.assertDictContainsSubset(
+ actual = class_under_test._get_ondisk_files(files)
+ self._assertDictContainsSubset(
expected, actual,
'Expected %s from %s but got %s'
% (expected, files, actual))
@@ -593,14 +623,38 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
df_mgr = self.df_router[policy]
datadir = os.path.join('/srv/node/sdb1/',
diskfile.get_data_dir(policy))
- self.assertEqual(expected, df_mgr.get_ondisk_files(
- files, datadir))
+ actual = df_mgr.get_ondisk_files(files, datadir)
+ self._assertDictContainsSubset(expected, actual)
# check diskfile under the hood
df = self._get_diskfile(policy, frag_index=frag_index)
- self.assertEqual(expected, df._get_ondisk_file(files))
+ actual = df._get_ondisk_files(files)
+ self._assertDictContainsSubset(expected, actual)
# check diskfile open
self.assertRaises(DiskFileNotExist, df.open)
def test_get_ondisk_files_with_unexpected_file(self):
unexpected_files = ['junk', 'junk.data', '.junk']
timestamp = next(make_timestamp_iter())
tomb_file = timestamp.internal + '.ts'
for policy in POLICIES:
for unexpected in unexpected_files:
files = [unexpected, tomb_file]
df_mgr = self.df_router[policy]
df_mgr.logger = FakeLogger()
datadir = os.path.join('/srv/node/sdb1/',
diskfile.get_data_dir(policy))
results = df_mgr.get_ondisk_files(files, datadir)
expected = {'ts_file': os.path.join(datadir, tomb_file)}
self._assertDictContainsSubset(expected, results)
log_lines = df_mgr.logger.get_lines_for_level('warning')
self.assertTrue(
log_lines[0].startswith(
'Unexpected file %s'
% os.path.join(datadir, unexpected)))
def test_construct_dev_path(self):
res_path = self.df_mgr.construct_dev_path('abc')
self.assertEqual(os.path.join(self.df_mgr.devices, 'abc'), res_path)
@@ -1014,14 +1068,59 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
self._test_yield_hashes_cleanup(scenarios, POLICIES[0])
def test_get_ondisk_files_with_stray_meta(self):
- # get_ondisk_files does not tolerate a stray .meta file
+ # get_ondisk_files ignores a stray .meta file
class_under_test = self._get_diskfile(POLICIES[0])
files = ['0000000007.00000.meta']
- self.assertRaises(AssertionError,
- class_under_test.manager.get_ondisk_files, files,
- self.testdir)
+ with mock.patch('swift.obj.diskfile.os.listdir', lambda *args: files):
+ self.assertRaises(DiskFileNotExist, class_under_test.open)
def test_verify_ondisk_files(self):
# ._verify_ondisk_files should only return False if get_ondisk_files
# has produced a bad set of files due to a bug, so to test it we need
# to probe it directly.
mgr = self.df_router[POLICIES.default]
ok_scenarios = (
{'ts_file': None, 'data_file': None, 'meta_file': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file'},
{'ts_file': 'a_file', 'data_file': None, 'meta_file': None},
)
for scenario in ok_scenarios:
self.assertTrue(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
# construct every possible invalid combination of results
vals = (None, 'a_file')
for ts_file, data_file, meta_file in [
(a, b, c) for a in vals for b in vals for c in vals]:
scenario = {
'ts_file': ts_file,
'data_file': data_file,
'meta_file': meta_file}
if scenario in ok_scenarios:
continue
self.assertFalse(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
for ext in ('.meta', '.data', '.ts'):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
def test_parse_on_disk_filename_errors(self):
mgr = self.df_router[POLICIES.default]
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename('junk')
self.assertEqual("Invalid Timestamp value in filename 'junk'",
str(cm.exception))
def test_hash_cleanup_listdir_reclaim(self):
# Each scenario specifies a list of (filename, extension, [survives])
@@ -1187,6 +1286,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# data with no durable is ignored
[('0000000007.00000#0.data', False, True)],
# data newer than tombstone with no durable is ignored
[('0000000007.00000#0.data', False, True),
('0000000006.00000.ts', '.ts', True)],
# data newer than durable is ignored
[('0000000008.00000#1.data', False, True),
('0000000007.00000.durable', '.durable'),
@@ -1365,7 +1468,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
reclaim_age=1000)
def test_get_ondisk_files_with_stray_meta(self):
- # get_ondisk_files does not tolerate a stray .meta file
+ # get_ondisk_files ignores a stray .meta file
class_under_test = self._get_diskfile(POLICIES.default)
@contextmanager
@@ -1408,6 +1511,41 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
self.fail('expected DiskFileNotExist opening %s with %r' % (
class_under_test.__class__.__name__, files))
def test_verify_ondisk_files(self):
# _verify_ondisk_files should only return False if get_ondisk_files
# has produced a bad set of files due to a bug, so to test it we need
# to probe it directly.
mgr = self.df_router[POLICIES.default]
ok_scenarios = (
{'ts_file': None, 'data_file': None, 'meta_file': None,
'durable_frag_set': None},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': None,
'durable_frag_set': ['a_file']},
{'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file',
'durable_frag_set': ['a_file']},
{'ts_file': 'a_file', 'data_file': None, 'meta_file': None,
'durable_frag_set': None},
)
for scenario in ok_scenarios:
self.assertTrue(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
# construct every possible invalid combination of results
vals = (None, 'a_file')
for ts_file, data_file, meta_file, durable_frag in [
(a, b, c, d)
for a in vals for b in vals for c in vals for d in vals]:
scenario = {
'ts_file': ts_file,
'data_file': data_file,
'meta_file': meta_file,
'durable_frag_set': [durable_frag] if durable_frag else None}
if scenario in ok_scenarios:
continue
self.assertFalse(mgr._verify_ondisk_files(scenario),
'Unexpected result for scenario %s' % scenario)
def test_parse_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
@@ -1416,6 +1554,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
fname = '%s#%s.data' % (ts.internal, frag)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual('.data', info['ext'])
self.assertEqual(frag, info['frag_index'])
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
@@ -1423,6 +1562,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
self.assertEqual(None, info['frag_index'])
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
@@ -1431,12 +1571,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
fname = '%s.data' % ts.internal
- try:
- mgr.parse_on_disk_filename(fname)
- msg = 'Expected DiskFileError for filename %s' % fname
- self.fail(msg)
- except DiskFileError:
- pass
+ with self.assertRaises(DiskFileError) as cm:
+ mgr.parse_on_disk_filename(fname)
+ self.assertTrue(str(cm.exception).startswith("Bad fragment index"))
expected = {
'': 'bad',
@@ -1451,13 +1588,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for frag, msg in expected.items():
fname = '%s#%s.data' % (ts.internal, frag)
- try:
- mgr.parse_on_disk_filename(fname)
- except DiskFileError as e:
- self.assertTrue(msg in str(e).lower())
- else:
- msg = 'Expected DiskFileError for filename %s' % fname
- self.fail(msg)
+ with self.assertRaises(DiskFileError) as cm:
+ mgr.parse_on_disk_filename(fname)
+ self.assertTrue(msg in str(cm.exception).lower())
+ with self.assertRaises(DiskFileError) as cm:
+ mgr.parse_on_disk_filename('junk')
+ self.assertEqual("Invalid Timestamp value in filename 'junk'",
+ str(cm.exception))
def test_make_on_disk_filename(self):
mgr = self.df_router[POLICIES.default]
@@ -1524,34 +1662,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
actual = mgr.make_on_disk_filename(ts, ext, frag_index=frag)
self.assertEqual(expected, actual)
- def test_is_obsolete(self):
- mgr = self.df_router[POLICIES.default]
- for ts in (Timestamp('1234567890.00001'),
- Timestamp('1234567890.00001', offset=17)):
- for ts2 in (Timestamp('1234567890.99999'),
- Timestamp('1234567890.99999', offset=17),
- ts):
- f_2 = mgr.make_on_disk_filename(ts, '.durable')
- for fi in (0, 2):
- for ext in ('.data', '.meta', '.durable', '.ts'):
- f_1 = mgr.make_on_disk_filename(
- ts2, ext, frag_index=fi)
- self.assertFalse(mgr.is_obsolete(f_1, f_2),
- '%s should not be obsolete w.r.t. %s'
- % (f_1, f_2))
- for ts2 in (Timestamp('1234567890.00000'),
- Timestamp('1234500000.00000', offset=0),
- Timestamp('1234500000.00000', offset=17)):
- f_2 = mgr.make_on_disk_filename(ts, '.durable')
- for fi in (0, 2):
- for ext in ('.data', '.meta', '.durable', '.ts'):
- f_1 = mgr.make_on_disk_filename(
- ts2, ext, frag_index=fi)
- self.assertTrue(mgr.is_obsolete(f_1, f_2),
- '%s should not be w.r.t. %s'
- % (f_1, f_2))
def test_yield_hashes(self):
old_ts = '1383180000.12345'
fresh_ts = Timestamp(time() - 10).internal
@@ -1724,6 +1834,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# missing frag index
'9444a92d072897b136b3fc06595b7456': [
ts1.internal + '.data'],
# junk
'9555a92d072897b136b3fc06595b8456': [
'junk_file'],
# missing .durable
@@ -1733,6 +1844,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# .meta files w/o .data files can't be opened, and are ignored
'9777a92d072897b136b3fc06595ba456': [
ts1.internal + '.meta'],
# multiple meta files with no data
'9888a92d072897b136b3fc06595bb456': [
ts1.internal + '.meta',
ts2.internal + '.meta'],
@@ -2259,12 +2371,13 @@ class DiskFileMixin(BaseDiskFileTestMixin):
def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024,
csize=8, mark_deleted=False, prealloc=False,
ts=None, mount_check=False, extra_metadata=None,
- policy=None, frag_index=None):
+ policy=None, frag_index=None, data=None,
+ commit=True):
'''returns a DiskFile'''
policy = policy or POLICIES.legacy
df = self._simple_get_diskfile(obj=obj_name, policy=policy,
frag_index=frag_index)
- data = '0' * fsize
+ data = data or '0' * fsize
etag = md5()
if ts:
timestamp = Timestamp(ts)
@@ -2304,7 +2417,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
elif invalid_type == 'Bad-X-Delete-At':
metadata['X-Delete-At'] = 'bad integer'
diskfile.write_metadata(writer._fd, metadata)
- writer.commit(timestamp)
+ if commit:
+ writer.commit(timestamp)
if mark_deleted:
df.delete(timestamp)
@@ -3181,6 +3295,33 @@ class DiskFileMixin(BaseDiskFileTestMixin):
with self.assertRaises(DiskFileNotOpen):
df.data_timestamp
def test_durable_timestamp(self):
ts_1 = self.ts()
df = self._get_open_disk_file(ts=ts_1.internal)
with df.open():
self.assertEqual(df.durable_timestamp, ts_1.internal)
# verify durable timestamp does not change when metadata is written
ts_2 = self.ts()
df.write_metadata({'X-Timestamp': ts_2.internal})
with df.open():
self.assertEqual(df.durable_timestamp, ts_1.internal)
def test_durable_timestamp_not_open(self):
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotOpen):
df.durable_timestamp
def test_durable_timestamp_no_data_file(self):
df = self._get_open_disk_file(self.ts().internal)
for f in os.listdir(df._datadir):
if f.endswith('.data'):
os.unlink(os.path.join(df._datadir, f))
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no data file so expect None
self.assertIsNone(df.durable_timestamp)
def test_error_in_hash_cleanup_listdir(self):
def mock_hcl(*args, **kwargs):
@@ -3914,6 +4055,72 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
'a', 'c', 'o', policy=policy)
self.assertRaises(DiskFileNotExist, df.read_metadata)
def test_fragments(self):
ts_1 = self.ts()
self._get_open_disk_file(ts=ts_1.internal, frag_index=0)
df = self._get_open_disk_file(ts=ts_1.internal, frag_index=2)
self.assertEqual(df.fragments, {ts_1: [0, 2]})
# now add a newer datafile for frag index 3 but don't write a
# durable with it (so ignore the error when we try to open)
ts_2 = self.ts()
try:
df = self._get_open_disk_file(ts=ts_2.internal, frag_index=3,
commit=False)
except DiskFileNotExist:
pass
# sanity check: should have 2* .data, .durable, .data
files = os.listdir(df._datadir)
self.assertEqual(4, len(files))
with df.open():
self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]})
# verify frags available even if open fails e.g. if .durable missing
for f in filter(lambda f: f.endswith('.durable'), files):
os.remove(os.path.join(df._datadir, f))
self.assertRaises(DiskFileNotExist, df.open)
self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]})
def test_fragments_not_open(self):
df = self._simple_get_diskfile()
self.assertIsNone(df.fragments)
def test_durable_timestamp_no_durable_file(self):
try:
self._get_open_disk_file(self.ts().internal, commit=False)
except DiskFileNotExist:
pass
df = self._simple_get_diskfile()
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no durable file so expect None
self.assertIsNone(df.durable_timestamp)
def test_durable_timestamp_missing_frag_index(self):
ts1 = self.ts()
self._get_open_disk_file(ts=ts1.internal, frag_index=1)
df = self._simple_get_diskfile(frag_index=2)
with self.assertRaises(DiskFileNotExist):
df.open()
# open() was attempted, but no data file for frag index so expect None
self.assertIsNone(df.durable_timestamp)
def test_durable_timestamp_newer_non_durable_data_file(self):
ts1 = self.ts()
self._get_open_disk_file(ts=ts1.internal)
ts2 = self.ts()
try:
self._get_open_disk_file(ts=ts2.internal, commit=False)
except DiskFileNotExist:
pass
df = self._simple_get_diskfile()
# sanity check - one .durable file, two .data files
self.assertEqual(3, len(os.listdir(df._datadir)))
df.open()
self.assertEqual(ts1, df.durable_timestamp)
@patch_policies(with_ec_default=True)
class TestSuffixHashes(unittest.TestCase):
@@ -4493,15 +4700,19 @@ class TestSuffixHashes(unittest.TestCase):
filename += '#%s' % df._frag_index
filename += suff
open(os.path.join(df._datadir, filename), 'w').close()
meta_timestamp = Timestamp(now)
metadata_filename = meta_timestamp.internal + '.meta'
open(os.path.join(df._datadir, metadata_filename), 'w').close()
# call get_hashes and it should clean things up
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
data_filename = timestamp.internal
if policy.policy_type == EC_POLICY:
data_filename += '#%s' % df._frag_index
data_filename += '.data'
- metadata_filename = timestamp.internal + '.meta'
- durable_filename = timestamp.internal + '.durable'
if policy.policy_type == EC_POLICY:
+ durable_filename = timestamp.internal + '.durable'
hasher = md5()
hasher.update(metadata_filename)
hasher.update(durable_filename)

View File

@@ -1499,15 +1499,16 @@ class TestSender(BaseTest):
'%(body)s\r\n' % expected)
def test_send_post(self):
ts_iter = make_timestamp_iter()
# create .data file
extra_metadata = {'X-Object-Meta-Foo': 'old_value',
'X-Object-Sysmeta-Test': 'test_sysmeta',
'Content-Type': 'test_content_type'}
- ts_0 = next(make_timestamp_iter())
+ ts_0 = next(ts_iter)
df = self._make_open_diskfile(extra_metadata=extra_metadata,
timestamp=ts_0)
# create .meta file
- ts_1 = next(make_timestamp_iter())
+ ts_1 = next(ts_iter)
newer_metadata = {'X-Object-Meta-Foo': 'new_value',
'X-Timestamp': ts_1.internal}
df.write_metadata(newer_metadata)