Merge "reconstructor: purge meta files in pure handoffs"
This commit is contained in:
commit
407af63349
|
@ -1488,21 +1488,22 @@ class BaseDiskFileManager(object):
|
|||
self, audit_location.path, dev_path,
|
||||
audit_location.partition, policy=audit_location.policy)
|
||||
|
||||
def get_diskfile_from_hash(self, device, partition, object_hash,
|
||||
policy, **kwargs):
|
||||
def get_diskfile_and_filenames_from_hash(self, device, partition,
|
||||
object_hash, policy, **kwargs):
|
||||
"""
|
||||
Returns a DiskFile instance for an object at the given
|
||||
object_hash. Just in case someone thinks of refactoring, be
|
||||
sure DiskFileDeleted is *not* raised, but the DiskFile
|
||||
instance representing the tombstoned object is returned
|
||||
instead.
|
||||
Returns a tuple of (a DiskFile instance for an object at the given
|
||||
object_hash, the basenames of the files in the object's hash dir).
|
||||
Just in case someone thinks of refactoring, be sure DiskFileDeleted is
|
||||
*not* raised, but the DiskFile instance representing the tombstoned
|
||||
object is returned instead.
|
||||
|
||||
:param device: name of target device
|
||||
:param partition: partition on the device in which the object lives
|
||||
:param object_hash: the hash of an object path
|
||||
:param policy: the StoragePolicy instance
|
||||
:raises DiskFileNotExist: if the object does not exist
|
||||
:returns: an instance of BaseDiskFile
|
||||
:returns: a tuple comprising (an instance of BaseDiskFile, a list of
|
||||
file basenames)
|
||||
"""
|
||||
dev_path = self.get_dev_path(device)
|
||||
if not dev_path:
|
||||
|
@ -1541,9 +1542,27 @@ class BaseDiskFileManager(object):
|
|||
metadata.get('name', ''), 3, 3, True)
|
||||
except ValueError:
|
||||
raise DiskFileNotExist()
|
||||
return self.diskfile_cls(self, dev_path,
|
||||
partition, account, container, obj,
|
||||
policy=policy, **kwargs)
|
||||
df = self.diskfile_cls(self, dev_path, partition, account, container,
|
||||
obj, policy=policy, **kwargs)
|
||||
return df, filenames
|
||||
|
||||
def get_diskfile_from_hash(self, device, partition, object_hash, policy,
|
||||
**kwargs):
|
||||
"""
|
||||
Returns a DiskFile instance for an object at the given object_hash.
|
||||
Just in case someone thinks of refactoring, be sure DiskFileDeleted is
|
||||
*not* raised, but the DiskFile instance representing the tombstoned
|
||||
object is returned instead.
|
||||
|
||||
:param device: name of target device
|
||||
:param partition: partition on the device in which the object lives
|
||||
:param object_hash: the hash of an object path
|
||||
:param policy: the StoragePolicy instance
|
||||
:raises DiskFileNotExist: if the object does not exist
|
||||
:returns: an instance of BaseDiskFile
|
||||
"""
|
||||
return self.get_diskfile_and_filenames_from_hash(
|
||||
device, partition, object_hash, policy, **kwargs)[0]
|
||||
|
||||
def get_hashes(self, device, partition, suffixes, policy,
|
||||
skip_rehash=False):
|
||||
|
@ -3325,7 +3344,8 @@ class ECDiskFile(BaseDiskFile):
|
|||
frag_prefs=self._frag_prefs, policy=policy)
|
||||
return self._ondisk_info
|
||||
|
||||
def purge(self, timestamp, frag_index, nondurable_purge_delay=0):
|
||||
def purge(self, timestamp, frag_index, nondurable_purge_delay=0,
|
||||
meta_timestamp=None):
|
||||
"""
|
||||
Remove a tombstone file matching the specified timestamp or
|
||||
datafile matching the specified timestamp and fragment index
|
||||
|
@ -3344,12 +3364,20 @@ class ECDiskFile(BaseDiskFile):
|
|||
a whole number or None.
|
||||
:param nondurable_purge_delay: only remove a non-durable data file if
|
||||
it's been on disk longer than this many seconds.
|
||||
:param meta_timestamp: if not None then remove any meta file with this
|
||||
timestamp
|
||||
"""
|
||||
purge_file = self.manager.make_on_disk_filename(
|
||||
timestamp, ext='.ts')
|
||||
purge_path = os.path.join(self._datadir, purge_file)
|
||||
remove_file(purge_path)
|
||||
|
||||
if meta_timestamp is not None:
|
||||
purge_file = self.manager.make_on_disk_filename(
|
||||
meta_timestamp, ext='.meta')
|
||||
purge_path = os.path.join(self._datadir, purge_file)
|
||||
remove_file(purge_path)
|
||||
|
||||
if frag_index is not None:
|
||||
# data file may or may not be durable so try removing both filename
|
||||
# possibilities
|
||||
|
|
|
@ -961,7 +961,7 @@ class ObjectReconstructor(Daemon):
|
|||
self.suffix_count += len(suffixes)
|
||||
return suffixes, node
|
||||
|
||||
def delete_reverted_objs(self, job, objects, frag_index):
|
||||
def delete_reverted_objs(self, job, objects):
|
||||
"""
|
||||
For EC we can potentially revert only some of a partition
|
||||
so we'll delete reverted objects here. Note that we delete
|
||||
|
@ -970,24 +970,37 @@ class ObjectReconstructor(Daemon):
|
|||
:param job: the job being processed
|
||||
:param objects: a dict of objects to be deleted, each entry maps
|
||||
hash=>timestamp
|
||||
:param frag_index: (int) the fragment index of data files to be deleted
|
||||
"""
|
||||
df_mgr = self._df_router[job['policy']]
|
||||
suffixes_to_delete = set()
|
||||
for object_hash, timestamps in objects.items():
|
||||
try:
|
||||
df = df_mgr.get_diskfile_from_hash(
|
||||
df, filenames = df_mgr.get_diskfile_and_filenames_from_hash(
|
||||
job['local_dev']['device'], job['partition'],
|
||||
object_hash, job['policy'],
|
||||
frag_index=frag_index)
|
||||
frag_index=job['frag_index'])
|
||||
# legacy durable data files look like modern nondurable data
|
||||
# files; we therefore override nondurable_purge_delay when we
|
||||
# know the data file is durable so that legacy durable data
|
||||
# files get purged
|
||||
nondurable_purge_delay = (0 if timestamps.get('durable')
|
||||
else df_mgr.commit_window)
|
||||
df.purge(timestamps['ts_data'], frag_index,
|
||||
nondurable_purge_delay)
|
||||
data_files = [
|
||||
f for f in filenames
|
||||
if f.endswith('.data')]
|
||||
purgable_data_files = [
|
||||
f for f in data_files
|
||||
if f.startswith(timestamps['ts_data'].internal)]
|
||||
if (job['primary_frag_index'] is None
|
||||
and len(purgable_data_files) == len(data_files) <= 1):
|
||||
# pure handoff node, and we're about to purge the last
|
||||
# .data file, so it's ok to remove any meta file that may
|
||||
# have been reverted
|
||||
meta_timestamp = timestamps.get('ts_meta')
|
||||
else:
|
||||
meta_timestamp = None
|
||||
df.purge(timestamps['ts_data'], job['frag_index'],
|
||||
nondurable_purge_delay, meta_timestamp)
|
||||
except DiskFileNotExist:
|
||||
# may have passed reclaim age since being reverted, or may have
|
||||
# raced with another reconstructor process trying the same
|
||||
|
@ -995,7 +1008,7 @@ class ObjectReconstructor(Daemon):
|
|||
except DiskFileError:
|
||||
self.logger.exception(
|
||||
'Unable to purge DiskFile (%r %r %r)',
|
||||
object_hash, timestamps['ts_data'], frag_index)
|
||||
object_hash, timestamps['ts_data'], job['frag_index'])
|
||||
suffixes_to_delete.add(object_hash[-3:])
|
||||
|
||||
for suffix in suffixes_to_delete:
|
||||
|
@ -1080,8 +1093,7 @@ class ObjectReconstructor(Daemon):
|
|||
syncd_with += 1
|
||||
reverted_objs.update(in_sync_objs)
|
||||
if syncd_with >= len(job['sync_to']):
|
||||
self.delete_reverted_objs(
|
||||
job, reverted_objs, job['frag_index'])
|
||||
self.delete_reverted_objs(job, reverted_objs)
|
||||
else:
|
||||
self.handoffs_remaining += 1
|
||||
except PartitionLockTimeout:
|
||||
|
@ -1150,7 +1162,8 @@ class ObjectReconstructor(Daemon):
|
|||
data_fi_to_suffixes[fi].append(suffix)
|
||||
|
||||
# helper to ensure consistent structure of jobs
|
||||
def build_job(job_type, frag_index, suffixes, sync_to):
|
||||
def build_job(job_type, frag_index, suffixes, sync_to,
|
||||
primary_frag_index):
|
||||
return {
|
||||
'job_type': job_type,
|
||||
'frag_index': frag_index,
|
||||
|
@ -1163,28 +1176,33 @@ class ObjectReconstructor(Daemon):
|
|||
'local_dev': local_dev,
|
||||
# ssync likes to have it handy
|
||||
'device': local_dev['device'],
|
||||
# provide a hint to revert jobs that the node is a primary for
|
||||
# one of the frag indexes
|
||||
'primary_frag_index': primary_frag_index,
|
||||
}
|
||||
|
||||
# aggregate jobs for all the fragment index in this part
|
||||
jobs = []
|
||||
|
||||
# check the primary nodes - to see if the part belongs here
|
||||
primary_frag_index = None
|
||||
part_nodes = policy.object_ring.get_part_nodes(partition)
|
||||
for node in part_nodes:
|
||||
if node['id'] == local_dev['id']:
|
||||
# this partition belongs here, we'll need a sync job
|
||||
frag_index = policy.get_backend_index(node['index'])
|
||||
primary_frag_index = policy.get_backend_index(node['index'])
|
||||
try:
|
||||
suffixes = data_fi_to_suffixes.pop(frag_index)
|
||||
suffixes = data_fi_to_suffixes.pop(primary_frag_index)
|
||||
except KeyError:
|
||||
# N.B. If this function ever returns an empty list of jobs
|
||||
# the entire partition will be deleted.
|
||||
suffixes = []
|
||||
sync_job = build_job(
|
||||
job_type=SYNC,
|
||||
frag_index=frag_index,
|
||||
frag_index=primary_frag_index,
|
||||
suffixes=suffixes,
|
||||
sync_to=_get_partners(node['index'], part_nodes),
|
||||
primary_frag_index=primary_frag_index
|
||||
)
|
||||
# ssync callback to rebuild missing fragment_archives
|
||||
sync_job['sync_diskfile_builder'] = self.reconstruct_fa
|
||||
|
@ -1215,6 +1233,7 @@ class ObjectReconstructor(Daemon):
|
|||
frag_index=fi,
|
||||
suffixes=data_fi_to_suffixes[fi],
|
||||
sync_to=nodes_sync_to,
|
||||
primary_frag_index=primary_frag_index
|
||||
)
|
||||
jobs.append(revert_job)
|
||||
|
||||
|
@ -1241,7 +1260,8 @@ class ObjectReconstructor(Daemon):
|
|||
job_type=REVERT,
|
||||
frag_index=None,
|
||||
suffixes=non_data_fragment_suffixes,
|
||||
sync_to=random.sample(part_nodes, nsample)
|
||||
sync_to=random.sample(part_nodes, nsample),
|
||||
primary_frag_index=primary_frag_index
|
||||
))
|
||||
# return a list of jobs for this part
|
||||
return jobs
|
||||
|
|
|
@ -1678,13 +1678,15 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
|||
|
||||
def test_get_diskfile_from_hash(self):
|
||||
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
|
||||
with mock.patch(self._manager_mock('diskfile_cls')) as dfclass, \
|
||||
mock_return = object()
|
||||
with mock.patch(self._manager_mock('diskfile_cls'),
|
||||
return_value=mock_return) as dfclass, \
|
||||
mock.patch(self._manager_mock(
|
||||
'cleanup_ondisk_files')) as cleanup, \
|
||||
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
|
||||
cleanup.return_value = {'files': ['1381679759.90941.data']}
|
||||
readmeta.return_value = {'name': '/a/c/o'}
|
||||
self.df_mgr.get_diskfile_from_hash(
|
||||
actual = self.df_mgr.get_diskfile_from_hash(
|
||||
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
|
||||
dfclass.assert_called_once_with(
|
||||
self.df_mgr, '/srv/dev/', '9',
|
||||
|
@ -1694,6 +1696,30 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
|||
readmeta.assert_called_once_with(
|
||||
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
|
||||
'1381679759.90941.data')
|
||||
self.assertEqual(mock_return, actual)
|
||||
|
||||
def test_get_diskfile_and_filenames_from_hash(self):
|
||||
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
|
||||
mock_return = object()
|
||||
with mock.patch(self._manager_mock('diskfile_cls'),
|
||||
return_value=mock_return) as dfclass, \
|
||||
mock.patch(self._manager_mock(
|
||||
'cleanup_ondisk_files')) as cleanup, \
|
||||
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
|
||||
cleanup.return_value = {'files': ['1381679759.90941.data']}
|
||||
readmeta.return_value = {'name': '/a/c/o'}
|
||||
actual, names = self.df_mgr.get_diskfile_and_filenames_from_hash(
|
||||
'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
|
||||
dfclass.assert_called_once_with(
|
||||
self.df_mgr, '/srv/dev/', '9',
|
||||
'a', 'c', 'o', policy=POLICIES[0])
|
||||
cleanup.assert_called_once_with(
|
||||
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
|
||||
readmeta.assert_called_once_with(
|
||||
'/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
|
||||
'1381679759.90941.data')
|
||||
self.assertEqual(mock_return, actual)
|
||||
self.assertEqual(['1381679759.90941.data'], names)
|
||||
|
||||
def test_listdir_enoent(self):
|
||||
oserror = OSError()
|
||||
|
@ -1919,7 +1945,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
|||
expected = sorted(expected_items)
|
||||
actual = sorted(hash_items)
|
||||
# default list diff easiest to debug
|
||||
self.assertEqual(actual, expected)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_yield_hashes_tombstones(self):
|
||||
ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
|
||||
|
@ -2127,6 +2153,9 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||
'9333a92d072897b136b3fc06595b4abc': [
|
||||
ts1.internal + '.ts',
|
||||
ts2.internal + '.meta'],
|
||||
# dangling .meta is not yielded because it cannot be sync'd
|
||||
'9222a92d072897b136b3fc06595b4abc': [
|
||||
ts3.internal + '.meta'],
|
||||
},
|
||||
'456': {
|
||||
# only latest metadata timestamp
|
||||
|
@ -6116,15 +6145,61 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||
for frag_index in (1, 2):
|
||||
df = self._simple_get_diskfile(frag_index=frag_index)
|
||||
write_diskfile(df, ts)
|
||||
ts_meta = self.ts()
|
||||
df.write_metadata({
|
||||
'X-Timestamp': ts_meta.internal,
|
||||
'X-Object-Meta-Delete': 'me'
|
||||
})
|
||||
|
||||
# sanity
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts.internal + '#1#d.data',
|
||||
ts.internal + '#2#d.data',
|
||||
ts_meta.internal + '.meta',
|
||||
])
|
||||
df.purge(ts, 2)
|
||||
self.assertEqual(os.listdir(df._datadir), [
|
||||
# by default .meta file is not purged
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts.internal + '#1#d.data',
|
||||
ts_meta.internal + '.meta',
|
||||
])
|
||||
|
||||
def test_purge_final_fragment_index_and_meta(self):
|
||||
ts = self.ts()
|
||||
df = self._simple_get_diskfile(frag_index=1)
|
||||
write_diskfile(df, ts)
|
||||
ts_meta = self.ts()
|
||||
df.write_metadata({
|
||||
'X-Timestamp': ts_meta.internal,
|
||||
'X-Object-Meta-Delete': 'me',
|
||||
})
|
||||
|
||||
# sanity
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts.internal + '#1#d.data',
|
||||
ts_meta.internal + '.meta',
|
||||
])
|
||||
df.purge(ts, 1, meta_timestamp=ts_meta)
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
def test_purge_final_fragment_index_and_not_meta(self):
|
||||
ts = self.ts()
|
||||
df = self._simple_get_diskfile(frag_index=1)
|
||||
write_diskfile(df, ts)
|
||||
ts_meta = self.ts()
|
||||
df.write_metadata({
|
||||
'X-Timestamp': ts_meta.internal,
|
||||
'X-Object-Meta-Delete': 'me',
|
||||
})
|
||||
|
||||
# sanity
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts.internal + '#1#d.data',
|
||||
ts_meta.internal + '.meta',
|
||||
])
|
||||
df.purge(ts, 1, meta_timestamp=ts)
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts_meta.internal + '.meta',
|
||||
])
|
||||
|
||||
def test_purge_last_fragment_index(self):
|
||||
|
|
|
@ -36,7 +36,7 @@ from six.moves.urllib.parse import unquote
|
|||
from swift.common import utils
|
||||
from swift.common.exceptions import DiskFileError, DiskFileQuarantined
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.utils import dump_recon_cache, md5, Timestamp
|
||||
from swift.common.utils import dump_recon_cache, md5, Timestamp, mkdirs
|
||||
from swift.obj import diskfile, reconstructor as object_reconstructor
|
||||
from swift.common import ring
|
||||
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
|
||||
|
@ -332,6 +332,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['061'],
|
||||
'partition': 0,
|
||||
'frag_index': 2,
|
||||
'primary_frag_index': 1,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -395,6 +396,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['061', '3c1'],
|
||||
'partition': 0,
|
||||
'frag_index': 1,
|
||||
'primary_frag_index': 1,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -440,6 +442,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['061', '3c1'],
|
||||
'partition': 1,
|
||||
'frag_index': 1,
|
||||
'primary_frag_index': 4,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -481,6 +484,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['3c1'],
|
||||
'partition': 1,
|
||||
'frag_index': 0,
|
||||
'primary_frag_index': 4,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -544,6 +548,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': [],
|
||||
'partition': 1,
|
||||
'frag_index': 4,
|
||||
'primary_frag_index': 4,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -589,6 +594,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['061'],
|
||||
'partition': 2,
|
||||
'frag_index': 0,
|
||||
'primary_frag_index': None,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -628,6 +634,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
'suffixes': ['3c1'],
|
||||
'partition': 2,
|
||||
'frag_index': 2,
|
||||
'primary_frag_index': None,
|
||||
'device': 'sda1',
|
||||
'local_dev': {
|
||||
'replication_port': 6200,
|
||||
|
@ -1166,9 +1173,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
# may not be for the reverted frag index
|
||||
self.assertTrue(files)
|
||||
n_files += len(files)
|
||||
self.assertEqual(context['job']['frag_index'],
|
||||
context['node']['index'])
|
||||
expected_calls.append(mock.call(context['job'],
|
||||
context['available_map'],
|
||||
context['node']['index']))
|
||||
context['available_map']))
|
||||
else:
|
||||
self.assertFalse(context.get('include_non_durable'))
|
||||
|
||||
|
@ -4642,6 +4650,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
|||
job = {
|
||||
'job_type': object_reconstructor.REVERT,
|
||||
'frag_index': frag_index,
|
||||
'primary_frag_index': None,
|
||||
'suffixes': [suffix],
|
||||
'sync_to': sync_to,
|
||||
'partition': partition,
|
||||
|
@ -4722,6 +4731,193 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
|||
self.assertEqual(
|
||||
[], self.reconstructor.logger.logger.get_lines_for_level('error'))
|
||||
|
||||
def _make_frag(self, df, fi, ts_data):
|
||||
with df.create() as writer:
|
||||
test_data = b'test data'
|
||||
writer.write(test_data)
|
||||
metadata = {
|
||||
'X-Timestamp': ts_data.internal,
|
||||
'Content-Length': len(test_data),
|
||||
'Etag': md5(test_data, usedforsecurity=False).hexdigest(),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': fi,
|
||||
}
|
||||
writer.put(metadata)
|
||||
writer.commit(ts_data)
|
||||
|
||||
def _do_test_process_job_revert_cleanup_with_meta(self, frag_indexes,
|
||||
primary_frag_index):
|
||||
sync_to = [[dict(random.choice([n for n in self.policy.object_ring.devs
|
||||
if n != self.local_dev]),
|
||||
index=frag_index)] for frag_index in frag_indexes]
|
||||
partition = 0
|
||||
|
||||
part_path = os.path.join(self.devices, self.local_dev['device'],
|
||||
diskfile.get_data_dir(self.policy),
|
||||
str(partition))
|
||||
mkdirs(part_path)
|
||||
df_mgr = self.reconstructor._df_router[self.policy]
|
||||
df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
|
||||
'c', 'data-obj', policy=self.policy)
|
||||
|
||||
ts_data = self.ts()
|
||||
for frag_index in frag_indexes:
|
||||
self._make_frag(df, frag_index, ts_data)
|
||||
if primary_frag_index is not None:
|
||||
self._make_frag(df, primary_frag_index, ts_data)
|
||||
ts_meta = self.ts()
|
||||
df.write_metadata({'X-Timestamp': ts_meta.internal,
|
||||
'X-Object-Meta-Test': 'testing'})
|
||||
|
||||
ohash = os.path.basename(df._datadir)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
|
||||
jobs = [{
|
||||
'job_type': object_reconstructor.REVERT,
|
||||
'frag_index': frag_index,
|
||||
'primary_frag_index': primary_frag_index,
|
||||
'suffixes': [suffix],
|
||||
'sync_to': sync_to[i],
|
||||
'partition': partition,
|
||||
'path': part_path,
|
||||
'hashes': {},
|
||||
'policy': self.policy,
|
||||
'local_dev': self.local_dev,
|
||||
'device': self.local_dev['device'],
|
||||
} for i, frag_index in enumerate(frag_indexes)]
|
||||
|
||||
ondisk_files_during_sync = []
|
||||
|
||||
def ssync_response_callback(*args):
|
||||
ondisk_files_during_sync.append(os.listdir(df._datadir))
|
||||
# success should not increment handoffs_remaining
|
||||
return True, {ohash: {'ts_data': ts_data, 'ts_meta': ts_meta}}
|
||||
|
||||
ssync_calls = []
|
||||
with mock_ssync_sender(ssync_calls,
|
||||
response_callback=ssync_response_callback):
|
||||
for job in jobs:
|
||||
self.reconstructor.process_job(job)
|
||||
|
||||
self.assertEqual(self.reconstructor.handoffs_remaining, 0)
|
||||
self.assertEqual(len(jobs), len(ssync_calls))
|
||||
self.assertEqual(len(jobs), len(ondisk_files_during_sync))
|
||||
# verify that the meta file is intact at startof every job/ssync call:
|
||||
# if it is removed at all, it should be removed in the *last* call
|
||||
for fileset in ondisk_files_during_sync:
|
||||
self.assertIn(ts_meta.internal + '.meta', fileset)
|
||||
return df
|
||||
|
||||
def test_process_job_revert_does_cleanup_meta_pure_handoff(self):
|
||||
# verify that danging meta files are cleaned up if the revert job is
|
||||
# for a pure handoff partition
|
||||
frag_index = random.randint(
|
||||
0, self.policy.ec_n_unique_fragments - 1)
|
||||
df = self._do_test_process_job_revert_cleanup_with_meta(
|
||||
frag_indexes=[frag_index], primary_frag_index=None)
|
||||
# hashpath has been removed
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
extra_index = frag_index
|
||||
while extra_index == frag_index:
|
||||
extra_index = random.randint(
|
||||
0, self.policy.ec_n_unique_fragments - 1)
|
||||
df = self._do_test_process_job_revert_cleanup_with_meta(
|
||||
frag_indexes=[frag_index, extra_index], primary_frag_index=None)
|
||||
# hashpath has been removed
|
||||
self.assertFalse(os.path.exists(df._datadir))
|
||||
|
||||
def test_process_job_revert_does_not_cleanup_meta_also_primary(self):
|
||||
# verify that danging meta files are not cleaned up if the revert job
|
||||
# is for a handoff partition that is also a primary for another frag
|
||||
# index
|
||||
frag_index = random.randint(
|
||||
0, self.policy.ec_n_unique_fragments - 1)
|
||||
primary_frag_index = frag_index
|
||||
while primary_frag_index == frag_index:
|
||||
primary_frag_index = random.randint(
|
||||
0, self.policy.ec_n_unique_fragments - 1)
|
||||
df = self._do_test_process_job_revert_cleanup_with_meta(
|
||||
frag_indexes=[frag_index], primary_frag_index=primary_frag_index)
|
||||
# hashpath has not been removed
|
||||
self.assertTrue(os.path.exists(df._datadir))
|
||||
file_info = df._manager.cleanup_ondisk_files(df._datadir)
|
||||
self.maxDiff = None
|
||||
self.assertTrue('meta_file' in file_info)
|
||||
self.assertTrue(os.path.exists(file_info['meta_file']))
|
||||
self.assertTrue('data_info' in file_info)
|
||||
self.assertEqual(primary_frag_index,
|
||||
file_info['data_info']['frag_index'])
|
||||
self.assertTrue(os.path.exists(file_info['data_file']))
|
||||
# only the primary frag and meta file remain
|
||||
self.assertEqual(2, len(os.listdir(df._datadir)))
|
||||
|
||||
def test_process_job_revert_does_not_cleanup_meta_new_data(self):
|
||||
# verify that danging meta files are not cleaned up if the revert job
|
||||
# is for a pure handoff partition that has a newer data frag in
|
||||
# addition to the frag that was sync'd
|
||||
frag_index = 0
|
||||
extra_frag_index = 1
|
||||
sync_to = [dict(random.choice([n for n in self.policy.object_ring.devs
|
||||
if n != self.local_dev]),
|
||||
index=frag_index)]
|
||||
partition = 0
|
||||
|
||||
part_path = os.path.join(self.devices, self.local_dev['device'],
|
||||
diskfile.get_data_dir(self.policy),
|
||||
str(partition))
|
||||
mkdirs(part_path)
|
||||
df_mgr = self.reconstructor._df_router[self.policy]
|
||||
df = df_mgr.get_diskfile(self.local_dev['device'], partition, 'a',
|
||||
'c', 'data-obj', policy=self.policy)
|
||||
|
||||
ts_data0 = self.ts() # original frag
|
||||
ts_data1 = self.ts() # new one written during ssync
|
||||
self._make_frag(df, frag_index, ts_data0)
|
||||
ts_meta = self.ts()
|
||||
df.write_metadata({'X-Timestamp': ts_meta.internal,
|
||||
'X-Object-Meta-Test': 'testing'})
|
||||
|
||||
ohash = os.path.basename(df._datadir)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
|
||||
job = {
|
||||
'job_type': object_reconstructor.REVERT,
|
||||
'frag_index': frag_index,
|
||||
'primary_frag_index': None,
|
||||
'suffixes': [suffix],
|
||||
'sync_to': sync_to,
|
||||
'partition': partition,
|
||||
'path': part_path,
|
||||
'hashes': {},
|
||||
'policy': self.policy,
|
||||
'local_dev': self.local_dev,
|
||||
'device': self.local_dev['device'],
|
||||
}
|
||||
|
||||
def ssync_response_callback(*args):
|
||||
# pretend that during the ssync call the original frag is replaced
|
||||
# by a newer one
|
||||
self._make_frag(df, extra_frag_index, ts_data1)
|
||||
return True, {ohash: {'ts_data': ts_data0, 'ts_meta': ts_meta}}
|
||||
|
||||
ssync_calls = []
|
||||
with mock_ssync_sender(ssync_calls,
|
||||
response_callback=ssync_response_callback):
|
||||
self.reconstructor.process_job(job)
|
||||
|
||||
self.assertEqual(1, len(ssync_calls))
|
||||
# hashpath has not been removed
|
||||
self.assertTrue(os.path.exists(df._datadir))
|
||||
file_info = df._manager.cleanup_ondisk_files(df._datadir)
|
||||
self.maxDiff = None
|
||||
self.assertIsNotNone(file_info['meta_file'])
|
||||
self.assertTrue(os.path.exists(file_info['meta_file']))
|
||||
self.assertTrue('data_info' in file_info)
|
||||
self.assertTrue(os.path.exists(file_info['data_file']))
|
||||
# only the newer frag and meta file remain
|
||||
self.assertEqual(2, len(os.listdir(df._datadir)))
|
||||
self.assertEqual(ts_data1, file_info['data_info']['timestamp'])
|
||||
|
||||
def test_process_job_revert_cleanup_tombstone(self):
|
||||
partition = 0
|
||||
sync_to = [random.choice([
|
||||
|
@ -4744,6 +4940,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
|||
job = {
|
||||
'job_type': object_reconstructor.REVERT,
|
||||
'frag_index': None,
|
||||
'primary_frag_index': None,
|
||||
'suffixes': [suffix],
|
||||
'sync_to': sync_to,
|
||||
'partition': partition,
|
||||
|
|
Loading…
Reference in New Issue