diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 1778474e9d..802193f546 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -1488,21 +1488,22 @@ class BaseDiskFileManager(object): self, audit_location.path, dev_path, audit_location.partition, policy=audit_location.policy) - def get_diskfile_from_hash(self, device, partition, object_hash, - policy, **kwargs): + def get_diskfile_and_filenames_from_hash(self, device, partition, + object_hash, policy, **kwargs): """ - Returns a DiskFile instance for an object at the given - object_hash. Just in case someone thinks of refactoring, be - sure DiskFileDeleted is *not* raised, but the DiskFile - instance representing the tombstoned object is returned - instead. + Returns a tuple of (a DiskFile instance for an object at the given + object_hash, the basenames of the files in the object's hash dir). + Just in case someone thinks of refactoring, be sure DiskFileDeleted is + *not* raised, but the DiskFile instance representing the tombstoned + object is returned instead. :param device: name of target device :param partition: partition on the device in which the object lives :param object_hash: the hash of an object path :param policy: the StoragePolicy instance :raises DiskFileNotExist: if the object does not exist - :returns: an instance of BaseDiskFile + :returns: a tuple comprising (an instance of BaseDiskFile, a list of + file basenames) """ dev_path = self.get_dev_path(device) if not dev_path: @@ -1541,9 +1542,27 @@ class BaseDiskFileManager(object): metadata.get('name', ''), 3, 3, True) except ValueError: raise DiskFileNotExist() - return self.diskfile_cls(self, dev_path, - partition, account, container, obj, - policy=policy, **kwargs) + df = self.diskfile_cls(self, dev_path, partition, account, container, + obj, policy=policy, **kwargs) + return df, filenames + + def get_diskfile_from_hash(self, device, partition, object_hash, policy, + **kwargs): + """ + Returns a DiskFile instance for an object at the given object_hash. + Just in case someone thinks of refactoring, be sure DiskFileDeleted is + *not* raised, but the DiskFile instance representing the tombstoned + object is returned instead. + + :param device: name of target device + :param partition: partition on the device in which the object lives + :param object_hash: the hash of an object path + :param policy: the StoragePolicy instance + :raises DiskFileNotExist: if the object does not exist + :returns: an instance of BaseDiskFile + """ + return self.get_diskfile_and_filenames_from_hash( + device, partition, object_hash, policy, **kwargs)[0] def get_hashes(self, device, partition, suffixes, policy, skip_rehash=False): @@ -3325,7 +3344,8 @@ class ECDiskFile(BaseDiskFile): frag_prefs=self._frag_prefs, policy=policy) return self._ondisk_info - def purge(self, timestamp, frag_index, nondurable_purge_delay=0): + def purge(self, timestamp, frag_index, nondurable_purge_delay=0, + meta_timestamp=None): """ Remove a tombstone file matching the specified timestamp or datafile matching the specified timestamp and fragment index @@ -3344,12 +3364,20 @@ class ECDiskFile(BaseDiskFile): a whole number or None. :param nondurable_purge_delay: only remove a non-durable data file if it's been on disk longer than this many seconds. + :param meta_timestamp: if not None then remove any meta file with this + timestamp """ purge_file = self.manager.make_on_disk_filename( timestamp, ext='.ts') purge_path = os.path.join(self._datadir, purge_file) remove_file(purge_path) + if meta_timestamp is not None: + purge_file = self.manager.make_on_disk_filename( + meta_timestamp, ext='.meta') + purge_path = os.path.join(self._datadir, purge_file) + remove_file(purge_path) + if frag_index is not None: # data file may or may not be durable so try removing both filename # possibilities diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index cc4d42f6f8..accda48510 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -961,7 +961,7 @@ class ObjectReconstructor(Daemon): self.suffix_count += len(suffixes) return suffixes, node - def delete_reverted_objs(self, job, objects, frag_index): + def delete_reverted_objs(self, job, objects): """ For EC we can potentially revert only some of a partition so we'll delete reverted objects here. Note that we delete @@ -970,24 +970,37 @@ class ObjectReconstructor(Daemon): :param job: the job being processed :param objects: a dict of objects to be deleted, each entry maps hash=>timestamp - :param frag_index: (int) the fragment index of data files to be deleted """ df_mgr = self._df_router[job['policy']] suffixes_to_delete = set() for object_hash, timestamps in objects.items(): try: - df = df_mgr.get_diskfile_from_hash( + df, filenames = df_mgr.get_diskfile_and_filenames_from_hash( job['local_dev']['device'], job['partition'], object_hash, job['policy'], - frag_index=frag_index) + frag_index=job['frag_index']) # legacy durable data files look like modern nondurable data # files; we therefore override nondurable_purge_delay when we # know the data file is durable so that legacy durable data # files get purged nondurable_purge_delay = (0 if timestamps.get('durable') else df_mgr.commit_window) - df.purge(timestamps['ts_data'], frag_index, - nondurable_purge_delay) + data_files = [ + f for f in filenames + if f.endswith('.data')] + purgable_data_files = [ + f for f in data_files + if f.startswith(timestamps['ts_data'].internal)] + if (job['primary_frag_index'] is None + and len(purgable_data_files) == len(data_files) <= 1): + # pure handoff node, and we're about to purge the last + # .data file, so it's ok to remove any meta file that may + # have been reverted + meta_timestamp = timestamps.get('ts_meta') + else: + meta_timestamp = None + df.purge(timestamps['ts_data'], job['frag_index'], + nondurable_purge_delay, meta_timestamp) except DiskFileNotExist: # may have passed reclaim age since being reverted, or may have # raced with another reconstructor process trying the same @@ -995,7 +1008,7 @@ class ObjectReconstructor(Daemon): except DiskFileError: self.logger.exception( 'Unable to purge DiskFile (%r %r %r)', - object_hash, timestamps['ts_data'], frag_index) + object_hash, timestamps['ts_data'], job['frag_index']) suffixes_to_delete.add(object_hash[-3:]) for suffix in suffixes_to_delete: @@ -1080,8 +1093,7 @@ class ObjectReconstructor(Daemon): syncd_with += 1 reverted_objs.update(in_sync_objs) if syncd_with >= len(job['sync_to']): - self.delete_reverted_objs( - job, reverted_objs, job['frag_index']) + self.delete_reverted_objs(job, reverted_objs) else: self.handoffs_remaining += 1 except PartitionLockTimeout: @@ -1150,7 +1162,8 @@ class ObjectReconstructor(Daemon): data_fi_to_suffixes[fi].append(suffix) # helper to ensure consistent structure of jobs - def build_job(job_type, frag_index, suffixes, sync_to): + def build_job(job_type, frag_index, suffixes, sync_to, + primary_frag_index): return { 'job_type': job_type, 'frag_index': frag_index, @@ -1163,28 +1176,33 @@ class ObjectReconstructor(Daemon): 'local_dev': local_dev, # ssync likes to have it handy 'device': local_dev['device'], + # provide a hint to revert jobs that the node is a primary for + # one of the frag indexes + 'primary_frag_index': primary_frag_index, } # aggregate jobs for all the fragment index in this part jobs = [] # check the primary nodes - to see if the part belongs here + primary_frag_index = None part_nodes = policy.object_ring.get_part_nodes(partition) for node in part_nodes: if node['id'] == local_dev['id']: # this partition belongs here, we'll need a sync job - frag_index = policy.get_backend_index(node['index']) + primary_frag_index = policy.get_backend_index(node['index']) try: - suffixes = data_fi_to_suffixes.pop(frag_index) + suffixes = data_fi_to_suffixes.pop(primary_frag_index) except KeyError: # N.B. If this function ever returns an empty list of jobs # the entire partition will be deleted. suffixes = [] sync_job = build_job( job_type=SYNC, - frag_index=frag_index, + frag_index=primary_frag_index, suffixes=suffixes, sync_to=_get_partners(node['index'], part_nodes), + primary_frag_index=primary_frag_index ) # ssync callback to rebuild missing fragment_archives sync_job['sync_diskfile_builder'] = self.reconstruct_fa @@ -1215,6 +1233,7 @@ class ObjectReconstructor(Daemon): frag_index=fi, suffixes=data_fi_to_suffixes[fi], sync_to=nodes_sync_to, + primary_frag_index=primary_frag_index ) jobs.append(revert_job) @@ -1241,7 +1260,8 @@ class ObjectReconstructor(Daemon): job_type=REVERT, frag_index=None, suffixes=non_data_fragment_suffixes, - sync_to=random.sample(part_nodes, nsample) + sync_to=random.sample(part_nodes, nsample), + primary_frag_index=primary_frag_index )) # return a list of jobs for this part return jobs diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 30111b238a..8fdb9b1a7d 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -1678,13 +1678,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): def test_get_diskfile_from_hash(self): self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') - with mock.patch(self._manager_mock('diskfile_cls')) as dfclass, \ + mock_return = object() + with mock.patch(self._manager_mock('diskfile_cls'), + return_value=mock_return) as dfclass, \ mock.patch(self._manager_mock( 'cleanup_ondisk_files')) as cleanup, \ mock.patch('swift.obj.diskfile.read_metadata') as readmeta: cleanup.return_value = {'files': ['1381679759.90941.data']} readmeta.return_value = {'name': '/a/c/o'} - self.df_mgr.get_diskfile_from_hash( + actual = self.df_mgr.get_diskfile_from_hash( 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) dfclass.assert_called_once_with( self.df_mgr, '/srv/dev/', '9', @@ -1694,6 +1696,30 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): readmeta.assert_called_once_with( '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/' '1381679759.90941.data') + self.assertEqual(mock_return, actual) + + def test_get_diskfile_and_filenames_from_hash(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + mock_return = object() + with mock.patch(self._manager_mock('diskfile_cls'), + return_value=mock_return) as dfclass, \ + mock.patch(self._manager_mock( + 'cleanup_ondisk_files')) as cleanup, \ + mock.patch('swift.obj.diskfile.read_metadata') as readmeta: + cleanup.return_value = {'files': ['1381679759.90941.data']} + readmeta.return_value = {'name': '/a/c/o'} + actual, names = self.df_mgr.get_diskfile_and_filenames_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + dfclass.assert_called_once_with( + self.df_mgr, '/srv/dev/', '9', + 'a', 'c', 'o', policy=POLICIES[0]) + cleanup.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900') + readmeta.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/' + '1381679759.90941.data') + self.assertEqual(mock_return, actual) + self.assertEqual(['1381679759.90941.data'], names) def test_listdir_enoent(self): oserror = OSError() @@ -1919,7 +1945,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): expected = sorted(expected_items) actual = sorted(hash_items) # default list diff easiest to debug - self.assertEqual(actual, expected) + self.assertEqual(expected, actual) def test_yield_hashes_tombstones(self): ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) @@ -2127,6 +2153,9 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): '9333a92d072897b136b3fc06595b4abc': [ ts1.internal + '.ts', ts2.internal + '.meta'], + # dangling .meta is not yielded because it cannot be sync'd + '9222a92d072897b136b3fc06595b4abc': [ + ts3.internal + '.meta'], }, '456': { # only latest metadata timestamp @@ -6116,15 +6145,61 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase): for frag_index in (1, 2): df = self._simple_get_diskfile(frag_index=frag_index) write_diskfile(df, ts) + ts_meta = self.ts() + df.write_metadata({ + 'X-Timestamp': ts_meta.internal, + 'X-Object-Meta-Delete': 'me' + }) # sanity self.assertEqual(sorted(os.listdir(df._datadir)), [ ts.internal + '#1#d.data', ts.internal + '#2#d.data', + ts_meta.internal + '.meta', ]) df.purge(ts, 2) - self.assertEqual(os.listdir(df._datadir), [ + # by default .meta file is not purged + self.assertEqual(sorted(os.listdir(df._datadir)), [ ts.internal + '#1#d.data', + ts_meta.internal + '.meta', + ]) + + def test_purge_final_fragment_index_and_meta(self): + ts = self.ts() + df = self._simple_get_diskfile(frag_index=1) + write_diskfile(df, ts) + ts_meta = self.ts() + df.write_metadata({ + 'X-Timestamp': ts_meta.internal, + 'X-Object-Meta-Delete': 'me', + }) + + # sanity + self.assertEqual(sorted(os.listdir(df._datadir)), [ + ts.internal + '#1#d.data', + ts_meta.internal + '.meta', + ]) + df.purge(ts, 1, meta_timestamp=ts_meta) + self.assertFalse(os.path.exists(df._datadir)) + + def test_purge_final_fragment_index_and_not_meta(self): + ts = self.ts() + df = self._simple_get_diskfile(frag_index=1) + write_diskfile(df, ts) + ts_meta = self.ts() + df.write_metadata({ + 'X-Timestamp': ts_meta.internal, + 'X-Object-Meta-Delete': 'me', + }) + + # sanity + self.assertEqual(sorted(os.listdir(df._datadir)), [ + ts.internal + '#1#d.data', + ts_meta.internal + '.meta', + ]) + df.purge(ts, 1, meta_timestamp=ts) + self.assertEqual(sorted(os.listdir(df._datadir)), [ + ts_meta.internal + '.meta', ]) def test_purge_last_fragment_index(self): diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 1035f74169..ef3999296f 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -36,7 +36,7 @@ from six.moves.urllib.parse import unquote from swift.common import utils from swift.common.exceptions import DiskFileError, DiskFileQuarantined from swift.common.header_key_dict import HeaderKeyDict -from swift.common.utils import dump_recon_cache, md5, Timestamp +from swift.common.utils import dump_recon_cache, md5, Timestamp, mkdirs from swift.obj import diskfile, reconstructor as object_reconstructor from swift.common import ring from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, @@ -332,6 +332,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['061'], 'partition': 0, 'frag_index': 2, + 'primary_frag_index': 1, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -395,6 +396,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['061', '3c1'], 'partition': 0, 'frag_index': 1, + 'primary_frag_index': 1, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -440,6 +442,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['061', '3c1'], 'partition': 1, 'frag_index': 1, + 'primary_frag_index': 4, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -481,6 +484,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['3c1'], 'partition': 1, 'frag_index': 0, + 'primary_frag_index': 4, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -544,6 +548,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': [], 'partition': 1, 'frag_index': 4, + 'primary_frag_index': 4, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -589,6 +594,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['061'], 'partition': 2, 'frag_index': 0, + 'primary_frag_index': None, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -628,6 +634,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): 'suffixes': ['3c1'], 'partition': 2, 'frag_index': 2, + 'primary_frag_index': None, 'device': 'sda1', 'local_dev': { 'replication_port': 6200, @@ -1166,9 +1173,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): # may not be for the reverted frag index self.assertTrue(files) n_files += len(files) + self.assertEqual(context['job']['frag_index'], + context['node']['index']) expected_calls.append(mock.call(context['job'], - context['available_map'], - context['node']['index'])) + context['available_map'])) else: self.assertFalse(context.get('include_non_durable')) @@ -4642,6 +4650,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): job = { 'job_type': object_reconstructor.REVERT, 'frag_index': frag_index, + 'primary_frag_index': None, 'suffixes': [suffix], 'sync_to': sync_to, 'partition': partition, @@ -4722,6 +4731,193 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( [], self.reconstructor.logger.logger.get_lines_for_level('error')) + def _make_frag(self, df, fi, ts_data): + with df.create() as writer: + test_data = b'test data' + writer.write(test_data) + metadata = { + 'X-Timestamp': ts_data.internal, + 'Content-Length': len(test_data), + 'Etag': md5(test_data, usedforsecurity=False).hexdigest(), + 'X-Object-Sysmeta-Ec-Frag-Index': fi, + } + writer.put(metadata) + writer.commit(ts_data) + + def _do_test_process_job_revert_cleanup_with_meta(self, frag_indexes, + primary_frag_index): + sync_to = [[dict(random.choice([n for n in self.policy.object_ring.devs + if n != self.local_dev]), + index=frag_index)] for frag_index in frag_indexes] + partition = 0 + + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), + str(partition)) + mkdirs(part_path) + df_mgr = self.reconstructor._df_router[self.policy] + df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a', + 'c', 'data-obj', policy=self.policy) + + ts_data = self.ts() + for frag_index in frag_indexes: + self._make_frag(df, frag_index, ts_data) + if primary_frag_index is not None: + self._make_frag(df, primary_frag_index, ts_data) + ts_meta = self.ts() + df.write_metadata({'X-Timestamp': ts_meta.internal, + 'X-Object-Meta-Test': 'testing'}) + + ohash = os.path.basename(df._datadir) + suffix = os.path.basename(os.path.dirname(df._datadir)) + + jobs = [{ + 'job_type': object_reconstructor.REVERT, + 'frag_index': frag_index, + 'primary_frag_index': primary_frag_index, + 'suffixes': [suffix], + 'sync_to': sync_to[i], + 'partition': partition, + 'path': part_path, + 'hashes': {}, + 'policy': self.policy, + 'local_dev': self.local_dev, + 'device': self.local_dev['device'], + } for i, frag_index in enumerate(frag_indexes)] + + ondisk_files_during_sync = [] + + def ssync_response_callback(*args): + ondisk_files_during_sync.append(os.listdir(df._datadir)) + # success should not increment handoffs_remaining + return True, {ohash: {'ts_data': ts_data, 'ts_meta': ts_meta}} + + ssync_calls = [] + with mock_ssync_sender(ssync_calls, + response_callback=ssync_response_callback): + for job in jobs: + self.reconstructor.process_job(job) + + self.assertEqual(self.reconstructor.handoffs_remaining, 0) + self.assertEqual(len(jobs), len(ssync_calls)) + self.assertEqual(len(jobs), len(ondisk_files_during_sync)) + # verify that the meta file is intact at startof every job/ssync call: + # if it is removed at all, it should be removed in the *last* call + for fileset in ondisk_files_during_sync: + self.assertIn(ts_meta.internal + '.meta', fileset) + return df + + def test_process_job_revert_does_cleanup_meta_pure_handoff(self): + # verify that danging meta files are cleaned up if the revert job is + # for a pure handoff partition + frag_index = random.randint( + 0, self.policy.ec_n_unique_fragments - 1) + df = self._do_test_process_job_revert_cleanup_with_meta( + frag_indexes=[frag_index], primary_frag_index=None) + # hashpath has been removed + self.assertFalse(os.path.exists(df._datadir)) + + extra_index = frag_index + while extra_index == frag_index: + extra_index = random.randint( + 0, self.policy.ec_n_unique_fragments - 1) + df = self._do_test_process_job_revert_cleanup_with_meta( + frag_indexes=[frag_index, extra_index], primary_frag_index=None) + # hashpath has been removed + self.assertFalse(os.path.exists(df._datadir)) + + def test_process_job_revert_does_not_cleanup_meta_also_primary(self): + # verify that danging meta files are not cleaned up if the revert job + # is for a handoff partition that is also a primary for another frag + # index + frag_index = random.randint( + 0, self.policy.ec_n_unique_fragments - 1) + primary_frag_index = frag_index + while primary_frag_index == frag_index: + primary_frag_index = random.randint( + 0, self.policy.ec_n_unique_fragments - 1) + df = self._do_test_process_job_revert_cleanup_with_meta( + frag_indexes=[frag_index], primary_frag_index=primary_frag_index) + # hashpath has not been removed + self.assertTrue(os.path.exists(df._datadir)) + file_info = df._manager.cleanup_ondisk_files(df._datadir) + self.maxDiff = None + self.assertTrue('meta_file' in file_info) + self.assertTrue(os.path.exists(file_info['meta_file'])) + self.assertTrue('data_info' in file_info) + self.assertEqual(primary_frag_index, + file_info['data_info']['frag_index']) + self.assertTrue(os.path.exists(file_info['data_file'])) + # only the primary frag and meta file remain + self.assertEqual(2, len(os.listdir(df._datadir))) + + def test_process_job_revert_does_not_cleanup_meta_new_data(self): + # verify that danging meta files are not cleaned up if the revert job + # is for a pure handoff partition that has a newer data frag in + # addition to the frag that was sync'd + frag_index = 0 + extra_frag_index = 1 + sync_to = [dict(random.choice([n for n in self.policy.object_ring.devs + if n != self.local_dev]), + index=frag_index)] + partition = 0 + + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), + str(partition)) + mkdirs(part_path) + df_mgr = self.reconstructor._df_router[self.policy] + df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a', + 'c', 'data-obj', policy=self.policy) + + ts_data0 = self.ts() # original frag + ts_data1 = self.ts() # new one written during ssync + self._make_frag(df, frag_index, ts_data0) + ts_meta = self.ts() + df.write_metadata({'X-Timestamp': ts_meta.internal, + 'X-Object-Meta-Test': 'testing'}) + + ohash = os.path.basename(df._datadir) + suffix = os.path.basename(os.path.dirname(df._datadir)) + + job = { + 'job_type': object_reconstructor.REVERT, + 'frag_index': frag_index, + 'primary_frag_index': None, + 'suffixes': [suffix], + 'sync_to': sync_to, + 'partition': partition, + 'path': part_path, + 'hashes': {}, + 'policy': self.policy, + 'local_dev': self.local_dev, + 'device': self.local_dev['device'], + } + + def ssync_response_callback(*args): + # pretend that during the ssync call the original frag is replaced + # by a newer one + self._make_frag(df, extra_frag_index, ts_data1) + return True, {ohash: {'ts_data': ts_data0, 'ts_meta': ts_meta}} + + ssync_calls = [] + with mock_ssync_sender(ssync_calls, + response_callback=ssync_response_callback): + self.reconstructor.process_job(job) + + self.assertEqual(1, len(ssync_calls)) + # hashpath has not been removed + self.assertTrue(os.path.exists(df._datadir)) + file_info = df._manager.cleanup_ondisk_files(df._datadir) + self.maxDiff = None + self.assertIsNotNone(file_info['meta_file']) + self.assertTrue(os.path.exists(file_info['meta_file'])) + self.assertTrue('data_info' in file_info) + self.assertTrue(os.path.exists(file_info['data_file'])) + # only the newer frag and meta file remain + self.assertEqual(2, len(os.listdir(df._datadir))) + self.assertEqual(ts_data1, file_info['data_info']['timestamp']) + def test_process_job_revert_cleanup_tombstone(self): partition = 0 sync_to = [random.choice([ @@ -4744,6 +4940,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): job = { 'job_type': object_reconstructor.REVERT, 'frag_index': None, + 'primary_frag_index': None, 'suffixes': [suffix], 'sync_to': sync_to, 'partition': partition,