diff --git a/swift/common/utils.py b/swift/common/utils.py index d6cc5d7afb..ab80487a02 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -832,6 +832,9 @@ class Timestamp(object): other = Timestamp(other) return cmp(self.internal, other.internal) + def __hash__(self): + return hash(self.internal) + def normalize_timestamp(timestamp): """ diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index f8b7b72a7c..ebb849a9e9 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -460,92 +460,175 @@ class BaseDiskFileManager(object): """ raise NotImplementedError - def _gather_on_disk_file(self, filename, ext, context, frag_index=None, - **kwargs): + def _process_ondisk_files(self, exts, results, **kwargs): """ - Called by gather_ondisk_files() for each file in an object - datadir in reverse sorted order. If a file is considered part of a - valid on-disk file set it will be added to the context dict, keyed by - its extension. If a file is considered to be obsolete it will be added - to a list stored under the key 'obsolete' in the context dict. + Called by get_ondisk_files(). Should be over-ridden to implement + subclass specific handling of files. - :param filename: name of file to be accepted or not - :param ext: extension part of filename - :param context: a context dict that may have been populated by previous - calls to this method - :returns: True if a valid file set has been found, False otherwise + :param exts: dict of lists of file info, keyed by extension + :param results: a dict that may be updated with results """ raise NotImplementedError - def _verify_on_disk_files(self, accepted_files, **kwargs): + def _verify_ondisk_files(self, results, **kwargs): """ Verify that the final combination of on disk files complies with the diskfile contract. - :param accepted_files: files that have been found and accepted + :param results: files that have been found and accepted :returns: True if the file combination is compliant, False otherwise """ - raise NotImplementedError + data_file, meta_file, ts_file = tuple( + [results[key] + for key in ('data_file', 'meta_file', 'ts_file')]) - def gather_ondisk_files(self, files, include_obsolete=False, - verify=False, **kwargs): + return ((data_file is None and meta_file is None and ts_file is None) + or (ts_file is not None and data_file is None + and meta_file is None) + or (data_file is not None and ts_file is None)) + + def _split_list(self, original_list, condition): """ - Given a simple list of files names, iterate over them to determine the - files that constitute a valid object, and optionally determine the - files that are obsolete and could be deleted. Note that some files may - fall into neither category. + Split a list into two lists. The first list contains the first N items + of the original list, in their original order, where 0 < N <= + len(original list). The second list contains the remaining items of the + original list, in their original order. + + The index, N, at which the original list is split is the index of the + first item in the list that does not satisfy the given condition. Note + that the original list should be appropriately sorted if the second + list is to contain no items that satisfy the given condition. + + :param original_list: the list to be split. + :param condition: a single argument function that will be used to test + for the list item to split on. + :return: a tuple of two lists. 
+        """
+        for i, item in enumerate(original_list):
+            if not condition(item):
+                return original_list[:i], original_list[i:]
+        return original_list, []
+
+    def _split_gt_timestamp(self, file_info_list, timestamp):
+        """
+        Given a list of file info dicts, reverse sorted by timestamp, split the
+        list into two: items newer than timestamp, and items at same time or
+        older than timestamp.
+
+        :param file_info_list: a list of file_info dicts.
+        :param timestamp: a Timestamp.
+        :return: a tuple of two lists.
+        """
+        return self._split_list(
+            file_info_list, lambda x: x['timestamp'] > timestamp)
+
+    def _split_gte_timestamp(self, file_info_list, timestamp):
+        """
+        Given a list of file info dicts, reverse sorted by timestamp, split the
+        list into two: items newer than or at same time as the timestamp, and
+        items older than timestamp.
+
+        :param file_info_list: a list of file_info dicts.
+        :param timestamp: a Timestamp.
+        :return: a tuple of two lists.
+        """
+        return self._split_list(
+            file_info_list, lambda x: x['timestamp'] >= timestamp)
+
+    def get_ondisk_files(self, files, datadir, verify=True, **kwargs):
+        """
+        Given a simple list of file names, determine the files that constitute
+        a valid fileset, i.e. a set of files that defines the state of an
+        object, and determine the files that are obsolete and could be deleted.
+        Note that some files may fall into neither category.
+
+        If a file is considered part of a valid fileset then its info dict will
+        be added to the results dict, keyed by <extension>_info. Any files that
+        are no longer required will have their info dicts added to a list
+        stored under the key 'obsolete'.
+
+        The results dict will always contain entries with keys 'ts_file',
+        'data_file' and 'meta_file'. Their values will be the fully qualified
+        path to a file of the corresponding type if there is such a file in the
+        valid fileset, or None.
 
         :param files: a list of file names.
-        :param include_obsolete: By default the iteration will stop when a
-                                 valid file set has been found. Setting this
-                                 argument to True will cause the iteration to
-                                 continue in order to find all obsolete files.
+        :param datadir: directory name files are from.
         :param verify: if True verify that the ondisk file contract has not
                        been violated, otherwise do not verify.
-        :returns: a dict that may contain: valid on disk files keyed by their
-                  filename extension; a list of obsolete files stored under the
-                  key 'obsolete'.
+        :returns: a dict that will contain keys:
+                      ts_file   -> path to a .ts file or None
+                      data_file -> path to a .data file or None
+                      meta_file -> path to a .meta file or None
+                  and may contain keys:
+                      ts_info   -> a file info dict for a .ts file
+                      data_info -> a file info dict for a .data file
+                      meta_info -> a file info dict for a .meta file
+                      obsolete  -> a list of file info dicts for obsolete files
         """
-        files.sort(reverse=True)
-        results = {}
+        # Build the exts data structure:
+        # exts is a dict that maps file extensions to a list of file_info
+        # dicts for the files having that extension. The file_info dicts are of
+        # the form returned by parse_on_disk_filename, with the filename added.
+        # Each list is sorted in reverse timestamp order.
+        #
+        # The exts dict will be modified during subsequent processing as files
+        # are removed to be discarded or ignored.
+ exts = defaultdict(list) for afile in files: - ts_file = results.get('.ts') - data_file = results.get('.data') - if not include_obsolete: - assert ts_file is None, "On-disk file search loop" \ - " continuing after tombstone, %s, encountered" % ts_file - assert data_file is None, "On-disk file search loop" \ - " continuing after data file, %s, encountered" % data_file + # Categorize files by extension + try: + file_info = self.parse_on_disk_filename(afile) + file_info['filename'] = afile + exts[file_info['ext']].append(file_info) + except DiskFileError as e: + self.logger.warning('Unexpected file %s: %s' % + (os.path.join(datadir or '', afile), e)) + for ext in exts: + # For each extension sort files into reverse chronological order. + exts[ext] = sorted( + exts[ext], key=lambda info: info['timestamp'], reverse=True) - ext = splitext(afile)[1] - if self._gather_on_disk_file( - afile, ext, results, **kwargs): - if not include_obsolete: - break + # the results dict is used to collect results of file filtering + results = {} + + # non-tombstones older than or equal to latest tombstone are obsolete + if exts.get('.ts'): + for ext in filter(lambda ext: ext != '.ts', exts.keys()): + exts[ext], older = self._split_gt_timestamp( + exts[ext], exts['.ts'][0]['timestamp']) + results.setdefault('obsolete', []).extend(older) + + # all but most recent .meta and .ts are obsolete + for ext in ('.meta', '.ts'): + if ext in exts: + results.setdefault('obsolete', []).extend(exts[ext][1:]) + exts[ext] = exts[ext][:1] + + # delegate to subclass handler + self._process_ondisk_files(exts, results, **kwargs) + + # set final choice of files + if exts.get('.ts'): + results['ts_info'] = exts['.ts'][0] + if 'data_info' in results and exts.get('.meta'): + # only report a meta file if there is a data file + results['meta_info'] = exts['.meta'][0] + + # set ts_file, data_file and meta_file with path to chosen file or None + for info_key in ('data_info', 'meta_info', 'ts_info'): + info = results.get(info_key) + key = info_key[:-5] + '_file' + results[key] = join(datadir, info['filename']) if info else None if verify: - assert self._verify_on_disk_files( + assert self._verify_ondisk_files( results, **kwargs), \ "On-disk file search algorithm contract is broken: %s" \ - % results.values() + % str(results) + return results - def get_ondisk_files(self, files, datadir, **kwargs): - """ - Given a simple list of files names, determine the files to use. - - :param files: simple set of files as a python list - :param datadir: directory name files are from for convenience - :returns: dict of files to use having keys 'data_file', 'ts_file', - 'meta_file' and optionally other policy specific keys - """ - file_info = self.gather_ondisk_files(files, verify=True, **kwargs) - for ext in ('.data', '.meta', '.ts'): - filename = file_info.get(ext) - key = '%s_file' % ext[1:] - file_info[key] = join(datadir, filename) if filename else None - return file_info - def cleanup_ondisk_files(self, hsh_path, reclaim_age=ONE_WEEK, **kwargs): """ Clean up on-disk files that are obsolete and gather the set of valid @@ -560,27 +643,24 @@ class BaseDiskFileManager(object): key 'obsolete'; a list of files remaining in the directory, reverse sorted, stored under the key 'files'. 
""" - def is_reclaimable(filename): - timestamp = self.parse_on_disk_filename(filename)['timestamp'] + def is_reclaimable(timestamp): return (time.time() - float(timestamp)) > reclaim_age files = listdir(hsh_path) files.sort(reverse=True) - results = self.gather_ondisk_files(files, include_obsolete=True, - **kwargs) - # TODO ref to durables here - if '.durable' in results and not results.get('fragments'): - # a .durable with no .data is deleted as soon as it is found - results.setdefault('obsolete', []).append(results.pop('.durable')) - if '.ts' in results and is_reclaimable(results['.ts']): - results.setdefault('obsolete', []).append(results.pop('.ts')) - for filename in results.get('fragments_without_durable', []): + results = self.get_ondisk_files( + files, hsh_path, verify=False, **kwargs) + if 'ts_info' in results and is_reclaimable( + results['ts_info']['timestamp']): + remove_file(join(hsh_path, results['ts_info']['filename'])) + files.remove(results.pop('ts_info')['filename']) + for file_info in results.get('possible_reclaim', []): # stray fragments are not deleted until reclaim-age - if is_reclaimable(filename): - results.setdefault('obsolete', []).append(filename) - for filename in results.get('obsolete', []): - remove_file(join(hsh_path, filename)) - files.remove(filename) + if is_reclaimable(file_info['timestamp']): + results.setdefault('obsolete', []).append(file_info) + for file_info in results.get('obsolete', []): + remove_file(join(hsh_path, file_info['filename'])) + files.remove(file_info['filename']) results['files'] = files return results @@ -915,9 +995,9 @@ class BaseDiskFileManager(object): (os.path.join(partition_path, suffix), suffix) for suffix in suffixes) key_preference = ( - ('ts_meta', '.meta'), - ('ts_data', '.data'), - ('ts_data', '.ts'), + ('ts_meta', 'meta_info'), + ('ts_data', 'data_info'), + ('ts_data', 'ts_info'), ) for suffix_path, suffix in suffixes: for object_hash in self._listdir(suffix_path): @@ -926,14 +1006,13 @@ class BaseDiskFileManager(object): results = self.cleanup_ondisk_files( object_path, self.reclaim_age, **kwargs) timestamps = {} - for ts_key, ext in key_preference: - if ext not in results: + for ts_key, info_key in key_preference: + if info_key not in results: continue - timestamps[ts_key] = self.parse_on_disk_filename( - results[ext])['timestamp'] + timestamps[ts_key] = results[info_key]['timestamp'] if 'ts_data' not in timestamps: # file sets that do not include a .data or .ts - # file can not be opened and therefore can not + # file cannot be opened and therefore cannot # be ssync'd continue yield (object_path, object_hash, timestamps) @@ -1430,6 +1509,7 @@ class BaseDiskFile(object): self._obj = None self._datadir = None self._tmpdir = join(device_path, get_tmp_dir(policy)) + self._ondisk_info = None self._metadata = None self._datafile_metadata = None self._metafile_metadata = None @@ -1479,6 +1559,26 @@ class BaseDiskFile(object): raise DiskFileNotOpen() return Timestamp(self._datafile_metadata.get('X-Timestamp')) + @property + def durable_timestamp(self): + """ + Provides the timestamp of the newest data file found in the object + directory. + + :return: A Timestamp instance, or None if no data file was found. + :raises DiskFileNotOpen: if the open() method has not been previously + called on this instance. 
+ """ + if self._ondisk_info is None: + raise DiskFileNotOpen() + if self._datafile_metadata: + return Timestamp(self._datafile_metadata.get('X-Timestamp')) + return None + + @property + def fragments(self): + return None + @classmethod def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition, policy): return cls(mgr, device_path, None, partition, _datadir=hash_dir_path, @@ -1524,8 +1624,8 @@ class BaseDiskFile(object): # The data directory does not exist, so the object cannot exist. files = [] - # gather info about the valid files to us to open the DiskFile - file_info = self._get_ondisk_file(files) + # gather info about the valid files to use to open the DiskFile + file_info = self._get_ondisk_files(files) self._data_file = file_info.get('data_file') if not self._data_file: @@ -1579,7 +1679,7 @@ class BaseDiskFile(object): self._logger.increment('quarantines') return DiskFileQuarantined(msg) - def _get_ondisk_file(self, files): + def _get_ondisk_files(self, files): """ Determine the on-disk files to use. @@ -1950,8 +2050,9 @@ class DiskFile(BaseDiskFile): reader_cls = DiskFileReader writer_cls = DiskFileWriter - def _get_ondisk_file(self, files): - return self.manager.get_ondisk_files(files, self._datadir) + def _get_ondisk_files(self, files): + self._ondisk_info = self.manager.get_ondisk_files(files, self._datadir) + return self._ondisk_info @DiskFileRouter.register(REPL_POLICY) @@ -1967,89 +2068,44 @@ class DiskFileManager(BaseDiskFileManager): * timestamp is a :class:`~swift.common.utils.Timestamp` * ext is a string, the file extension including the leading dot or - the empty string if the filename has no extenstion. + the empty string if the filename has no extension. :raises DiskFileError: if any part of the filename is not able to be validated. """ - filename, ext = splitext(filename) + float_part, ext = splitext(filename) + try: + timestamp = Timestamp(float_part) + except ValueError: + raise DiskFileError('Invalid Timestamp value in filename %r' + % filename) return { - 'timestamp': Timestamp(filename), + 'timestamp': timestamp, 'ext': ext, } - def _gather_on_disk_file(self, filename, ext, context, frag_index=None, - **kwargs): + def _process_ondisk_files(self, exts, results, **kwargs): """ - Called by gather_ondisk_files() for each file in an object - datadir in reverse sorted order. If a file is considered part of a - valid on-disk file set it will be added to the context dict, keyed by - its extension. If a file is considered to be obsolete it will be added - to a list stored under the key 'obsolete' in the context dict. + Implement replication policy specific handling of .data files. 
- :param filename: name of file to be accepted or not - :param ext: extension part of filename - :param context: a context dict that may have been populated by previous - calls to this method - :returns: True if a valid file set has been found, False otherwise + :param exts: dict of lists of file info, keyed by extension + :param results: a dict that may be updated with results """ + if exts.get('.data'): + for ext in exts.keys(): + if ext == '.data': + # older .data's are obsolete + exts[ext], obsolete = self._split_gte_timestamp( + exts[ext], exts['.data'][0]['timestamp']) + else: + # other files at same or older timestamp as most recent + # data are obsolete + exts[ext], obsolete = self._split_gt_timestamp( + exts[ext], exts['.data'][0]['timestamp']) + results.setdefault('obsolete', []).extend(obsolete) - # if first file with given extension then add filename to context - # dict and return True - accept_first = lambda: context.setdefault(ext, filename) == filename - # add the filename to the list of obsolete files in context dict - discard = lambda: context.setdefault('obsolete', []).append(filename) - # set a flag in the context dict indicating that a valid fileset has - # been found - set_valid_fileset = lambda: context.setdefault('found_valid', True) - # return True if the valid fileset flag is set in the context dict - have_valid_fileset = lambda: context.get('found_valid') - - if ext == '.data': - if have_valid_fileset(): - # valid fileset means we must have a newer - # .data or .ts, so discard the older .data file - discard() - else: - accept_first() - set_valid_fileset() - elif ext == '.ts': - if have_valid_fileset() or not accept_first(): - # newer .data or .ts already found so discard this - discard() - if not have_valid_fileset(): - # remove any .meta that may have been previously found - context.pop('.meta', None) - set_valid_fileset() - elif ext == '.meta': - if have_valid_fileset() or not accept_first(): - # newer .data, .durable or .ts already found so discard this - discard() - else: - # ignore unexpected files - pass - return have_valid_fileset() - - def _verify_on_disk_files(self, accepted_files, **kwargs): - """ - Verify that the final combination of on disk files complies with the - replicated diskfile contract. - - :param accepted_files: files that have been found and accepted - :returns: True if the file combination is compliant, False otherwise - """ - # mimic legacy behavior - .meta is ignored when .ts is found - if accepted_files.get('.ts'): - accepted_files.pop('.meta', None) - - data_file, meta_file, ts_file, durable_file = tuple( - [accepted_files.get(ext) - for ext in ('.data', '.meta', '.ts', '.durable')]) - - return ((data_file is None and meta_file is None and ts_file is None) - or (ts_file is not None and data_file is None - and meta_file is None) - or (data_file is not None and ts_file is None)) + # set results + results['data_info'] = exts['.data'][0] def _hash_suffix(self, path, reclaim_age): """ @@ -2153,14 +2209,47 @@ class ECDiskFile(BaseDiskFile): if frag_index is not None: self._frag_index = self.manager.validate_fragment_index(frag_index) - def _get_ondisk_file(self, files): + @property + def durable_timestamp(self): + """ + Provides the timestamp of the newest durable file found in the object + directory. + + :return: A Timestamp instance, or None if no durable file was found. + :raises DiskFileNotOpen: if the open() method has not been previously + called on this instance. 
+        """
+        if self._ondisk_info is None:
+            raise DiskFileNotOpen()
+        if self._ondisk_info.get('durable_frag_set'):
+            return self._ondisk_info['durable_frag_set'][0]['timestamp']
+        return None
+
+    @property
+    def fragments(self):
+        """
+        Provides information about all fragments that were found in the object
+        directory, including fragments without a matching durable file, and
+        including any fragment chosen to construct the opened diskfile.
+
+        :return: A dict mapping <Timestamp instance> -> <list of frag indexes>,
+            or None if the diskfile has not been opened or no fragments
+            were found.
+        """
+        if self._ondisk_info:
+            frag_sets = self._ondisk_info['frag_sets']
+            return dict([(ts, [info['frag_index'] for info in frag_set])
+                         for ts, frag_set in frag_sets.items()])
+
+    def _get_ondisk_files(self, files):
         """
         The only difference between this method and the replication policy
         DiskFile method is passing in the frag_index kwarg to our manager's
         get_ondisk_files method.
         """
-        return self.manager.get_ondisk_files(
+        self._ondisk_info = self.manager.get_ondisk_files(
             files, self._datadir, frag_index=self._frag_index)
+        return self._ondisk_info
 
     def purge(self, timestamp, frag_index):
         """
@@ -2254,9 +2343,13 @@ class ECDiskFileManager(BaseDiskFileManager):
             validated.
         """
         frag_index = None
-        filename, ext = splitext(filename)
-        parts = filename.split('#', 1)
-        timestamp = parts[0]
+        float_frag, ext = splitext(filename)
+        parts = float_frag.split('#', 1)
+        try:
+            timestamp = Timestamp(parts[0])
+        except ValueError:
+            raise DiskFileError('Invalid Timestamp value in filename %r'
+                                % filename)
         if ext == '.data':
             # it is an error for an EC data file to not have a valid
             # fragment index
@@ -2267,137 +2360,94 @@ class ECDiskFileManager(BaseDiskFileManager):
                 pass
             frag_index = self.validate_fragment_index(frag_index)
         return {
-            'timestamp': Timestamp(timestamp),
+            'timestamp': timestamp,
             'frag_index': frag_index,
             'ext': ext,
         }
 
-    def is_obsolete(self, filename, other_filename):
+    def _process_ondisk_files(self, exts, results, frag_index=None, **kwargs):
         """
-        Test if a given file is considered to be obsolete with respect to
-        another file in an object storage dir.
+        Implement EC policy specific handling of .data and .durable files.
 
-        Implements EC policy specific behavior when comparing files against a
-        .durable file.
-
-        A simple string comparison would consider t2#1.data to be older than
-        t2.durable (since t2#1.data < t2.durable). By stripping off the file
-        extensions we get the desired behavior: t2#1 > t2 without compromising
-        the detection of t1#1 < t2.
-
-        :param filename: a string representing an absolute filename
-        :param other_filename: a string representing an absolute filename
-        :returns: True if filename is considered obsolete, False otherwise.
-        """
-        if other_filename.endswith('.durable'):
-            return splitext(filename)[0] < splitext(other_filename)[0]
-        return filename < other_filename
-
-    def _gather_on_disk_file(self, filename, ext, context, frag_index=None,
-                             **kwargs):
-        """
-        Called by gather_ondisk_files() for each file in an object
-        datadir in reverse sorted order. If a file is considered part of a
-        valid on-disk file set it will be added to the context dict, keyed by
-        its extension. If a file is considered to be obsolete it will be added
-        to a list stored under the key 'obsolete' in the context dict.
- - :param filename: name of file to be accepted or not - :param ext: extension part of filename - :param context: a context dict that may have been populated by previous - calls to this method + :param exts: dict of lists of file info, keyed by extension + :param results: a dict that may be updated with results :param frag_index: if set, search for a specific fragment index .data file, otherwise accept the first valid .data file. - :returns: True if a valid file set has been found, False otherwise """ + durable_info = None + if exts.get('.durable'): + durable_info = exts['.durable'][0] + # Mark everything older than most recent .durable as obsolete + # and remove from the exts dict. + for ext in exts.keys(): + exts[ext], older = self._split_gte_timestamp( + exts[ext], durable_info['timestamp']) + results.setdefault('obsolete', []).extend(older) - # if first file with given extension then add filename to context - # dict and return True - accept_first = lambda: context.setdefault(ext, filename) == filename - # add the filename to the list of obsolete files in context dict - discard = lambda: context.setdefault('obsolete', []).append(filename) - # set a flag in the context dict indicating that a valid fileset has - # been found - set_valid_fileset = lambda: context.setdefault('found_valid', True) - # return True if the valid fileset flag is set in the context dict - have_valid_fileset = lambda: context.get('found_valid') + # Split the list of .data files into sets of frags having the same + # timestamp, identifying the durable and newest sets (if any) as we go. + # To do this we can take advantage of the list of .data files being + # reverse-time ordered. Keep the resulting per-timestamp frag sets in + # a frag_sets dict mapping a Timestamp instance -> frag_set. + all_frags = exts.get('.data') + frag_sets = {} + durable_frag_set = None + while all_frags: + frag_set, all_frags = self._split_gte_timestamp( + all_frags, all_frags[0]['timestamp']) + # sort the frag set into ascending frag_index order + frag_set.sort(key=lambda info: info['frag_index']) + timestamp = frag_set[0]['timestamp'] + frag_sets[timestamp] = frag_set + if durable_info and durable_info['timestamp'] == timestamp: + durable_frag_set = frag_set - if context.get('.durable'): - # a .durable file has been found - if ext == '.data': - if self.is_obsolete(filename, context.get('.durable')): - # this and remaining data files are older than durable - discard() - set_valid_fileset() - else: - # accept the first .data file if it matches requested - # frag_index, or if no specific frag_index is requested - fi = self.parse_on_disk_filename(filename)['frag_index'] - if frag_index is None or frag_index == int(fi): - accept_first() - set_valid_fileset() - # else: keep searching for a .data file to match frag_index - context.setdefault('fragments', []).append(filename) + # Select a single chosen frag from the chosen frag_set, by either + # matching against a specified frag_index or taking the highest index. 
+ chosen_frag = None + if durable_frag_set: + if frag_index is not None: + # search the frag set to find the exact frag_index + for info in durable_frag_set: + if info['frag_index'] == frag_index: + chosen_frag = info + break else: - # there can no longer be a matching .data file so mark what has - # been found so far as the valid fileset - discard() - set_valid_fileset() - elif ext == '.data': - # not yet found a .durable - if have_valid_fileset(): - # valid fileset means we must have a newer - # .ts, so discard the older .data file - discard() - else: - # .data newer than a .durable or .ts, don't discard yet - context.setdefault('fragments_without_durable', []).append( - filename) - elif ext == '.ts': - if have_valid_fileset() or not accept_first(): - # newer .data, .durable or .ts already found so discard this - discard() - if not have_valid_fileset(): - # remove any .meta that may have been previously found - context.pop('.meta', None) - set_valid_fileset() - elif ext in ('.meta', '.durable'): - if have_valid_fileset() or not accept_first(): - # newer .data, .durable or .ts already found so discard this - discard() - else: - # ignore unexpected files - pass - return have_valid_fileset() + chosen_frag = durable_frag_set[-1] - def _verify_on_disk_files(self, accepted_files, frag_index=None, **kwargs): + # If we successfully found a frag then set results + if chosen_frag: + results['data_info'] = chosen_frag + results['durable_frag_set'] = durable_frag_set + results['frag_sets'] = frag_sets + + # Mark any isolated .durable as obsolete + if exts.get('.durable') and not durable_frag_set: + results.setdefault('obsolete', []).extend(exts['.durable']) + exts.pop('.durable') + + # Fragments *may* be ready for reclaim, unless they are durable or + # at the timestamp we have just chosen for constructing the diskfile. + for frag_set in frag_sets.values(): + if frag_set == durable_frag_set: + continue + results.setdefault('possible_reclaim', []).extend(frag_set) + + def _verify_ondisk_files(self, results, frag_index=None, **kwargs): """ Verify that the final combination of on disk files complies with the erasure-coded diskfile contract. - :param accepted_files: files that have been found and accepted + :param results: files that have been found and accepted :param frag_index: specifies a specific fragment index .data file :returns: True if the file combination is compliant, False otherwise """ - if not accepted_files.get('.data'): - # We may find only a .meta, which doesn't mean the on disk - # contract is broken. So we clear it to comply with - # superclass assertions. 
- accepted_files.pop('.meta', None) - - data_file, meta_file, ts_file, durable_file = tuple( - [accepted_files.get(ext) - for ext in ('.data', '.meta', '.ts', '.durable')]) - - return ((data_file is None or durable_file is not None) - and (data_file is None and meta_file is None - and ts_file is None and durable_file is None) - or (ts_file is not None and data_file is None - and meta_file is None and durable_file is None) - or (data_file is not None and durable_file is not None - and ts_file is None) - or (durable_file is not None and meta_file is None - and ts_file is None)) + if super(ECDiskFileManager, self)._verify_ondisk_files( + results, **kwargs): + have_data_file = results['data_file'] is not None + have_durable = results.get('durable_frag_set') is not None + return have_data_file == have_durable + return False def _hash_suffix(self, path, reclaim_age): """ @@ -2412,12 +2462,12 @@ class ECDiskFileManager(BaseDiskFileManager): # here we flatten out the hashers hexdigest into a dictionary instead # of just returning the one hexdigest for the whole suffix def mapper(filename): - info = self.parse_on_disk_filename(filename) - fi = info['frag_index'] - if fi is None: - return None, filename - else: - return fi, info['timestamp'].internal + info = self.parse_on_disk_filename(filename) + fi = info['frag_index'] + if fi is None: + return None, filename + else: + return fi, info['timestamp'].internal hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age) return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items()) diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py index 277a9f1faf..fe1dc5e496 100644 --- a/swift/obj/mem_diskfile.py +++ b/swift/obj/mem_diskfile.py @@ -254,6 +254,7 @@ class DiskFile(object): self._metadata = None self._fp = None self._filesystem = fs + self.fragments = None def open(self): """ @@ -421,3 +422,5 @@ class DiskFile(object): return Timestamp(self._metadata.get('X-Timestamp')) data_timestamp = timestamp + + durable_timestamp = timestamp diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 1de31aa438..a336e78b60 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -779,6 +779,15 @@ class TestTimestamp(unittest.TestCase): self.assertEqual( sorted([t.internal for t in timestamps]), expected) + def test_hashable(self): + ts_0 = utils.Timestamp('1402444821.72589') + ts_0_also = utils.Timestamp('1402444821.72589') + self.assertEqual(ts_0, ts_0_also) # sanity + self.assertEqual(hash(ts_0), hash(ts_0_also)) + d = {ts_0: 'whatever'} + self.assertIn(ts_0, d) # sanity + self.assertIn(ts_0_also, d) + class TestUtils(unittest.TestCase): """Tests for swift.common.utils """ diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 991f38a496..534882bec3 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -20,6 +20,7 @@ import six.moves.cPickle as pickle import os import errno import itertools +from unittest.util import safe_repr import mock import unittest import email @@ -462,6 +463,35 @@ class BaseDiskFileTestMixin(object): return '.'.join([ mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name]) + def _assertDictContainsSubset(self, subset, dictionary, msg=None): + """Checks whether dictionary is a superset of subset.""" + # This is almost identical to the method in python3.4 version of + # unitest.case.TestCase.assertDictContainsSubset, reproduced here to + # avoid the deprecation warning in the original when using 
python3. + missing = [] + mismatched = [] + for key, value in subset.items(): + if key not in dictionary: + missing.append(key) + elif value != dictionary[key]: + mismatched.append('%s, expected: %s, actual: %s' % + (safe_repr(key), safe_repr(value), + safe_repr(dictionary[key]))) + + if not (missing or mismatched): + return + + standardMsg = '' + if missing: + standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in + missing) + if mismatched: + if standardMsg: + standardMsg += '; ' + standardMsg += 'Mismatched values: %s' % ','.join(mismatched) + + self.fail(self._formatMessage(msg, standardMsg)) + class DiskFileManagerMixin(BaseDiskFileTestMixin): """ @@ -516,8 +546,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): for _order in ('ordered', 'shuffled', 'shuffled'): class_under_test = self._get_diskfile(policy, frag_index) try: - actual = class_under_test._get_ondisk_file(files) - self.assertDictContainsSubset( + actual = class_under_test._get_ondisk_files(files) + self._assertDictContainsSubset( expected, actual, 'Expected %s from %s but got %s' % (expected, files, actual)) @@ -593,14 +623,38 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): df_mgr = self.df_router[policy] datadir = os.path.join('/srv/node/sdb1/', diskfile.get_data_dir(policy)) - self.assertEqual(expected, df_mgr.get_ondisk_files( - files, datadir)) + actual = df_mgr.get_ondisk_files(files, datadir) + self._assertDictContainsSubset(expected, actual) # check diskfile under the hood df = self._get_diskfile(policy, frag_index=frag_index) - self.assertEqual(expected, df._get_ondisk_file(files)) + actual = df._get_ondisk_files(files) + self._assertDictContainsSubset(expected, actual) # check diskfile open self.assertRaises(DiskFileNotExist, df.open) + def test_get_ondisk_files_with_unexpected_file(self): + unexpected_files = ['junk', 'junk.data', '.junk'] + timestamp = next(make_timestamp_iter()) + tomb_file = timestamp.internal + '.ts' + for policy in POLICIES: + for unexpected in unexpected_files: + files = [unexpected, tomb_file] + df_mgr = self.df_router[policy] + df_mgr.logger = FakeLogger() + datadir = os.path.join('/srv/node/sdb1/', + diskfile.get_data_dir(policy)) + + results = df_mgr.get_ondisk_files(files, datadir) + + expected = {'ts_file': os.path.join(datadir, tomb_file)} + self._assertDictContainsSubset(expected, results) + + log_lines = df_mgr.logger.get_lines_for_level('warning') + self.assertTrue( + log_lines[0].startswith( + 'Unexpected file %s' + % os.path.join(datadir, unexpected))) + def test_construct_dev_path(self): res_path = self.df_mgr.construct_dev_path('abc') self.assertEqual(os.path.join(self.df_mgr.devices, 'abc'), res_path) @@ -1014,14 +1068,59 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): self._test_yield_hashes_cleanup(scenarios, POLICIES[0]) def test_get_ondisk_files_with_stray_meta(self): - # get_ondisk_files does not tolerate a stray .meta file + # get_ondisk_files ignores a stray .meta file class_under_test = self._get_diskfile(POLICIES[0]) files = ['0000000007.00000.meta'] - self.assertRaises(AssertionError, - class_under_test.manager.get_ondisk_files, files, - self.testdir) + with mock.patch('swift.obj.diskfile.os.listdir', lambda *args: files): + self.assertRaises(DiskFileNotExist, class_under_test.open) + + def test_verify_ondisk_files(self): + # ._verify_ondisk_files should only return False if get_ondisk_files + # has produced a bad set of files due to a bug, so to test it we need + # to probe it directly. 
+ mgr = self.df_router[POLICIES.default] + ok_scenarios = ( + {'ts_file': None, 'data_file': None, 'meta_file': None}, + {'ts_file': None, 'data_file': 'a_file', 'meta_file': None}, + {'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file'}, + {'ts_file': 'a_file', 'data_file': None, 'meta_file': None}, + ) + + for scenario in ok_scenarios: + self.assertTrue(mgr._verify_ondisk_files(scenario), + 'Unexpected result for scenario %s' % scenario) + + # construct every possible invalid combination of results + vals = (None, 'a_file') + for ts_file, data_file, meta_file in [ + (a, b, c) for a in vals for b in vals for c in vals]: + scenario = { + 'ts_file': ts_file, + 'data_file': data_file, + 'meta_file': meta_file} + if scenario in ok_scenarios: + continue + self.assertFalse(mgr._verify_ondisk_files(scenario), + 'Unexpected result for scenario %s' % scenario) + + def test_parse_on_disk_filename(self): + mgr = self.df_router[POLICIES.default] + for ts in (Timestamp('1234567890.00001'), + Timestamp('1234567890.00001', offset=17)): + for ext in ('.meta', '.data', '.ts'): + fname = '%s%s' % (ts.internal, ext) + info = mgr.parse_on_disk_filename(fname) + self.assertEqual(ts, info['timestamp']) + self.assertEqual(ext, info['ext']) + + def test_parse_on_disk_filename_errors(self): + mgr = self.df_router[POLICIES.default] + with self.assertRaises(DiskFileError) as cm: + mgr.parse_on_disk_filename('junk') + self.assertEqual("Invalid Timestamp value in filename 'junk'", + str(cm.exception)) def test_hash_cleanup_listdir_reclaim(self): # Each scenario specifies a list of (filename, extension, [survives]) @@ -1187,6 +1286,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): # data with no durable is ignored [('0000000007.00000#0.data', False, True)], + # data newer than tombstone with no durable is ignored + [('0000000007.00000#0.data', False, True), + ('0000000006.00000.ts', '.ts', True)], + # data newer than durable is ignored [('0000000008.00000#1.data', False, True), ('0000000007.00000.durable', '.durable'), @@ -1365,7 +1468,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): reclaim_age=1000) def test_get_ondisk_files_with_stray_meta(self): - # get_ondisk_files does not tolerate a stray .meta file + # get_ondisk_files ignores a stray .meta file class_under_test = self._get_diskfile(POLICIES.default) @contextmanager @@ -1408,6 +1511,41 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): self.fail('expected DiskFileNotExist opening %s with %r' % ( class_under_test.__class__.__name__, files)) + def test_verify_ondisk_files(self): + # _verify_ondisk_files should only return False if get_ondisk_files + # has produced a bad set of files due to a bug, so to test it we need + # to probe it directly. 
+ mgr = self.df_router[POLICIES.default] + ok_scenarios = ( + {'ts_file': None, 'data_file': None, 'meta_file': None, + 'durable_frag_set': None}, + {'ts_file': None, 'data_file': 'a_file', 'meta_file': None, + 'durable_frag_set': ['a_file']}, + {'ts_file': None, 'data_file': 'a_file', 'meta_file': 'a_file', + 'durable_frag_set': ['a_file']}, + {'ts_file': 'a_file', 'data_file': None, 'meta_file': None, + 'durable_frag_set': None}, + ) + + for scenario in ok_scenarios: + self.assertTrue(mgr._verify_ondisk_files(scenario), + 'Unexpected result for scenario %s' % scenario) + + # construct every possible invalid combination of results + vals = (None, 'a_file') + for ts_file, data_file, meta_file, durable_frag in [ + (a, b, c, d) + for a in vals for b in vals for c in vals for d in vals]: + scenario = { + 'ts_file': ts_file, + 'data_file': data_file, + 'meta_file': meta_file, + 'durable_frag_set': [durable_frag] if durable_frag else None} + if scenario in ok_scenarios: + continue + self.assertFalse(mgr._verify_ondisk_files(scenario), + 'Unexpected result for scenario %s' % scenario) + def test_parse_on_disk_filename(self): mgr = self.df_router[POLICIES.default] for ts in (Timestamp('1234567890.00001'), @@ -1416,6 +1554,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): fname = '%s#%s.data' % (ts.internal, frag) info = mgr.parse_on_disk_filename(fname) self.assertEqual(ts, info['timestamp']) + self.assertEqual('.data', info['ext']) self.assertEqual(frag, info['frag_index']) self.assertEqual(mgr.make_on_disk_filename(**info), fname) @@ -1423,6 +1562,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): fname = '%s%s' % (ts.internal, ext) info = mgr.parse_on_disk_filename(fname) self.assertEqual(ts, info['timestamp']) + self.assertEqual(ext, info['ext']) self.assertEqual(None, info['frag_index']) self.assertEqual(mgr.make_on_disk_filename(**info), fname) @@ -1431,12 +1571,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): for ts in (Timestamp('1234567890.00001'), Timestamp('1234567890.00001', offset=17)): fname = '%s.data' % ts.internal - try: + with self.assertRaises(DiskFileError) as cm: mgr.parse_on_disk_filename(fname) - msg = 'Expected DiskFileError for filename %s' % fname - self.fail(msg) - except DiskFileError: - pass + self.assertTrue(str(cm.exception).startswith("Bad fragment index")) expected = { '': 'bad', @@ -1451,13 +1588,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): for frag, msg in expected.items(): fname = '%s#%s.data' % (ts.internal, frag) - try: + with self.assertRaises(DiskFileError) as cm: mgr.parse_on_disk_filename(fname) - except DiskFileError as e: - self.assertTrue(msg in str(e).lower()) - else: - msg = 'Expected DiskFileError for filename %s' % fname - self.fail(msg) + self.assertTrue(msg in str(cm.exception).lower()) + + with self.assertRaises(DiskFileError) as cm: + mgr.parse_on_disk_filename('junk') + self.assertEqual("Invalid Timestamp value in filename 'junk'", + str(cm.exception)) def test_make_on_disk_filename(self): mgr = self.df_router[POLICIES.default] @@ -1524,34 +1662,6 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): actual = mgr.make_on_disk_filename(ts, ext, frag_index=frag) self.assertEqual(expected, actual) - def test_is_obsolete(self): - mgr = self.df_router[POLICIES.default] - for ts in (Timestamp('1234567890.00001'), - Timestamp('1234567890.00001', offset=17)): - for ts2 in (Timestamp('1234567890.99999'), - 
Timestamp('1234567890.99999', offset=17), - ts): - f_2 = mgr.make_on_disk_filename(ts, '.durable') - for fi in (0, 2): - for ext in ('.data', '.meta', '.durable', '.ts'): - f_1 = mgr.make_on_disk_filename( - ts2, ext, frag_index=fi) - self.assertFalse(mgr.is_obsolete(f_1, f_2), - '%s should not be obsolete w.r.t. %s' - % (f_1, f_2)) - - for ts2 in (Timestamp('1234567890.00000'), - Timestamp('1234500000.00000', offset=0), - Timestamp('1234500000.00000', offset=17)): - f_2 = mgr.make_on_disk_filename(ts, '.durable') - for fi in (0, 2): - for ext in ('.data', '.meta', '.durable', '.ts'): - f_1 = mgr.make_on_disk_filename( - ts2, ext, frag_index=fi) - self.assertTrue(mgr.is_obsolete(f_1, f_2), - '%s should not be w.r.t. %s' - % (f_1, f_2)) - def test_yield_hashes(self): old_ts = '1383180000.12345' fresh_ts = Timestamp(time() - 10).internal @@ -1724,6 +1834,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): # missing frag index '9444a92d072897b136b3fc06595b7456': [ ts1.internal + '.data'], + # junk '9555a92d072897b136b3fc06595b8456': [ 'junk_file'], # missing .durable @@ -1733,6 +1844,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): # .meta files w/o .data files can't be opened, and are ignored '9777a92d072897b136b3fc06595ba456': [ ts1.internal + '.meta'], + # multiple meta files with no data '9888a92d072897b136b3fc06595bb456': [ ts1.internal + '.meta', ts2.internal + '.meta'], @@ -2259,12 +2371,13 @@ class DiskFileMixin(BaseDiskFileTestMixin): def _get_open_disk_file(self, invalid_type=None, obj_name='o', fsize=1024, csize=8, mark_deleted=False, prealloc=False, ts=None, mount_check=False, extra_metadata=None, - policy=None, frag_index=None): + policy=None, frag_index=None, data=None, + commit=True): '''returns a DiskFile''' policy = policy or POLICIES.legacy df = self._simple_get_diskfile(obj=obj_name, policy=policy, frag_index=frag_index) - data = '0' * fsize + data = data or '0' * fsize etag = md5() if ts: timestamp = Timestamp(ts) @@ -2304,7 +2417,8 @@ class DiskFileMixin(BaseDiskFileTestMixin): elif invalid_type == 'Bad-X-Delete-At': metadata['X-Delete-At'] = 'bad integer' diskfile.write_metadata(writer._fd, metadata) - writer.commit(timestamp) + if commit: + writer.commit(timestamp) if mark_deleted: df.delete(timestamp) @@ -3181,6 +3295,33 @@ class DiskFileMixin(BaseDiskFileTestMixin): with self.assertRaises(DiskFileNotOpen): df.data_timestamp + def test_durable_timestamp(self): + ts_1 = self.ts() + df = self._get_open_disk_file(ts=ts_1.internal) + with df.open(): + self.assertEqual(df.durable_timestamp, ts_1.internal) + # verify durable timestamp does not change when metadata is written + ts_2 = self.ts() + df.write_metadata({'X-Timestamp': ts_2.internal}) + with df.open(): + self.assertEqual(df.durable_timestamp, ts_1.internal) + + def test_durable_timestamp_not_open(self): + df = self._simple_get_diskfile() + with self.assertRaises(DiskFileNotOpen): + df.durable_timestamp + + def test_durable_timestamp_no_data_file(self): + df = self._get_open_disk_file(self.ts().internal) + for f in os.listdir(df._datadir): + if f.endswith('.data'): + os.unlink(os.path.join(df._datadir, f)) + df = self._simple_get_diskfile() + with self.assertRaises(DiskFileNotExist): + df.open() + # open() was attempted, but no data file so expect None + self.assertIsNone(df.durable_timestamp) + def test_error_in_hash_cleanup_listdir(self): def mock_hcl(*args, **kwargs): @@ -3914,6 +4055,72 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase): 'a', 'c', 'o', 
policy=policy) self.assertRaises(DiskFileNotExist, df.read_metadata) + def test_fragments(self): + ts_1 = self.ts() + self._get_open_disk_file(ts=ts_1.internal, frag_index=0) + df = self._get_open_disk_file(ts=ts_1.internal, frag_index=2) + self.assertEqual(df.fragments, {ts_1: [0, 2]}) + + # now add a newer datafile for frag index 3 but don't write a + # durable with it (so ignore the error when we try to open) + ts_2 = self.ts() + try: + df = self._get_open_disk_file(ts=ts_2.internal, frag_index=3, + commit=False) + except DiskFileNotExist: + pass + + # sanity check: should have 2* .data, .durable, .data + files = os.listdir(df._datadir) + self.assertEqual(4, len(files)) + with df.open(): + self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]}) + + # verify frags available even if open fails e.g. if .durable missing + for f in filter(lambda f: f.endswith('.durable'), files): + os.remove(os.path.join(df._datadir, f)) + + self.assertRaises(DiskFileNotExist, df.open) + self.assertEqual(df.fragments, {ts_1: [0, 2], ts_2: [3]}) + + def test_fragments_not_open(self): + df = self._simple_get_diskfile() + self.assertIsNone(df.fragments) + + def test_durable_timestamp_no_durable_file(self): + try: + self._get_open_disk_file(self.ts().internal, commit=False) + except DiskFileNotExist: + pass + df = self._simple_get_diskfile() + with self.assertRaises(DiskFileNotExist): + df.open() + # open() was attempted, but no durable file so expect None + self.assertIsNone(df.durable_timestamp) + + def test_durable_timestamp_missing_frag_index(self): + ts1 = self.ts() + self._get_open_disk_file(ts=ts1.internal, frag_index=1) + df = self._simple_get_diskfile(frag_index=2) + with self.assertRaises(DiskFileNotExist): + df.open() + # open() was attempted, but no data file for frag index so expect None + self.assertIsNone(df.durable_timestamp) + + def test_durable_timestamp_newer_non_durable_data_file(self): + ts1 = self.ts() + self._get_open_disk_file(ts=ts1.internal) + ts2 = self.ts() + try: + self._get_open_disk_file(ts=ts2.internal, commit=False) + except DiskFileNotExist: + pass + df = self._simple_get_diskfile() + # sanity check - one .durable file, two .data files + self.assertEqual(3, len(os.listdir(df._datadir))) + df.open() + self.assertEqual(ts1, df.durable_timestamp) + @patch_policies(with_ec_default=True) class TestSuffixHashes(unittest.TestCase): @@ -4493,15 +4700,19 @@ class TestSuffixHashes(unittest.TestCase): filename += '#%s' % df._frag_index filename += suff open(os.path.join(df._datadir, filename), 'w').close() + meta_timestamp = Timestamp(now) + metadata_filename = meta_timestamp.internal + '.meta' + open(os.path.join(df._datadir, metadata_filename), 'w').close() + # call get_hashes and it should clean things up hashes = df_mgr.get_hashes('sda1', '0', [], policy) + data_filename = timestamp.internal if policy.policy_type == EC_POLICY: data_filename += '#%s' % df._frag_index data_filename += '.data' - metadata_filename = timestamp.internal + '.meta' - durable_filename = timestamp.internal + '.durable' if policy.policy_type == EC_POLICY: + durable_filename = timestamp.internal + '.durable' hasher = md5() hasher.update(metadata_filename) hasher.update(durable_filename) diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 60c42855b9..b7286527fd 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -1499,15 +1499,16 @@ class TestSender(BaseTest): '%(body)s\r\n' % expected) def test_send_post(self): + ts_iter = 
make_timestamp_iter() # create .data file extra_metadata = {'X-Object-Meta-Foo': 'old_value', 'X-Object-Sysmeta-Test': 'test_sysmeta', 'Content-Type': 'test_content_type'} - ts_0 = next(make_timestamp_iter()) + ts_0 = next(ts_iter) df = self._make_open_diskfile(extra_metadata=extra_metadata, timestamp=ts_0) # create .meta file - ts_1 = next(make_timestamp_iter()) + ts_1 = next(ts_iter) newer_metadata = {'X-Object-Meta-Foo': 'new_value', 'X-Timestamp': ts_1.internal} df.write_metadata(newer_metadata)
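
The refactored get_ondisk_files() in the diskfile.py hunks above works by grouping file info dicts per extension in reverse timestamp order and then repeatedly splitting those lists with the _split_list / _split_gt_timestamp / _split_gte_timestamp helpers. The standalone sketch below is illustrative only; it is independent of Swift's classes, and the split_list name, sample timestamps and file names are not part of the patch. It shows how a reverse-sorted list splits at a timestamp and why files at or below a tombstone's timestamp end up under 'obsolete':

# Illustrative sketch only: mimics the list-splitting idea used by the
# refactored BaseDiskFileManager above. Names and data here are hypothetical
# and not part of the Swift patch.

def split_list(original_list, condition):
    """Split into (leading items satisfying condition, the remaining items)."""
    for i, item in enumerate(original_list):
        if not condition(item):
            return original_list[:i], original_list[i:]
    return original_list, []


def split_gt_timestamp(file_infos, timestamp):
    """file_infos must be reverse sorted by 'timestamp'."""
    return split_list(file_infos, lambda info: info['timestamp'] > timestamp)


if __name__ == '__main__':
    # Three kinds of files for one object: a tombstone at t=8 plus older
    # .data and a mix of newer/older .meta files.
    exts = {
        '.ts':   [{'timestamp': 8, 'filename': '8.ts'}],
        '.data': [{'timestamp': 7, 'filename': '7#1.data'}],
        '.meta': [{'timestamp': 9, 'filename': '9.meta'},
                  {'timestamp': 6, 'filename': '6.meta'}],
    }
    obsolete = []
    newest_ts = exts['.ts'][0]['timestamp']
    for ext in ('.data', '.meta'):
        # Keep only entries strictly newer than the tombstone; the rest are
        # obsolete, mirroring the tombstone handling in get_ondisk_files.
        exts[ext], older = split_gt_timestamp(exts[ext], newest_ts)
        obsolete.extend(older)
    print(exts['.data'])   # [] - the old .data is superseded by the tombstone
    print(exts['.meta'])   # [{'timestamp': 9, ...}] - newer than the tombstone
    print([i['filename'] for i in obsolete])  # ['7#1.data', '6.meta']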
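The small swift/common/utils.py hunk at the top adds Timestamp.__hash__ because the patch starts using Timestamp instances as dict keys (the frag_sets mapping and the ECDiskFile.fragments property, exercised by test_fragments and test_hashable), and dict lookup requires equal keys to hash equally. A minimal sketch of that requirement, using a stand-in class rather than Swift's actual Timestamp:

# Minimal illustration of why __hash__ is added alongside equality:
# objects used as dict keys must hash equally when they compare equal.
# 'Stamp' is a hypothetical stand-in, not Swift's Timestamp.

class Stamp(object):
    def __init__(self, internal):
        self.internal = internal

    def __eq__(self, other):
        return self.internal == other.internal

    def __hash__(self):
        # mirrors the patch: hash the canonical string form
        return hash(self.internal)


frag_sets = {Stamp('1402444821.72589_0000000000000000'): [0, 2]}
lookup = Stamp('1402444821.72589_0000000000000000')
assert lookup in frag_sets          # works because hash and eq agree
print(frag_sets[lookup])            # [0, 2]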