reconstructor: Delay purging reverted non-durable datafiles
The reconstructor may revert a non-durable datafile on a handoff concurrently with an object server PUT that is about to make the datafile durable. This could previously lead to the reconstructor deleting the recently written datafile before the object server attempts to rename it to a durable datafile, and consequently a traceback in the object server.

The reconstructor will now only remove reverted non-durable datafiles that are older (according to mtime) than a period set by a new nondurable_purge_delay option (defaults to 60 seconds). More recent non-durable datafiles may be made durable or will remain on the handoff until a subsequent reconstructor cycle.

Change-Id: I0d519ebaaade35249fb7b17bd5f419ffdaa616c0
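For illustration only (not part of the change itself): a minimal sketch of the new purge decision, using simplified, hypothetical names. A reverted non-durable datafile is only removed once its mtime is older than the configured delay, while durable datafiles, including legacy durable ones, are still purged immediately.

    import os
    import time

    def should_purge(datafile_path, is_durable, nondurable_purge_delay=60.0):
        # Durable (and legacy durable) data files keep the old behaviour and
        # are purged right away; only non-durable revert candidates get the
        # grace period.
        delay = 0 if is_durable else nondurable_purge_delay
        if delay <= 0:
            return True
        try:
            # A too-recent non-durable file is kept for a later cycle.
            return time.time() - os.stat(datafile_path).st_mtime > delay
        except OSError:
            # If the file cannot be stat'ed, err on the side of keeping it.
            return False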
@@ -423,6 +423,12 @@ use = egg:swift#recon
 # to be rebuilt). The minimum is only exceeded if request_node_count is
 # greater, and only for the purposes of quarantining.
 # request_node_count = 2 * replicas
+#
+# Sets a delay, in seconds, before the reconstructor removes non-durable data
+# files from a handoff node after reverting them to a primary. This gives the
+# object-server a window in which to finish a concurrent PUT on a handoff and
+# mark the data durable.
+# nondurable_purge_delay = 60.0
 
 [object-updater]
 # You can override the default log routing for this app here (don't use set!):
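Operator note (illustrative, not part of the change): the comment block above documents the option at the end of the [object-reconstructor] section of the sample object-server config, so a deployment wanting a longer grace period than the 60 second default could set, for example:

    [object-reconstructor]
    nondurable_purge_delay = 120.0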
@@ -3277,6 +3277,25 @@ def remove_directory(path):
             raise
 
 
+def is_file_older(path, age):
+    """
+    Test if a file mtime is older than the given age, suppressing any OSErrors.
+
+    :param path: first and only argument passed to os.stat
+    :param age: age in seconds
+    :return: True if age is less than or equal to zero or if the file mtime is
+        more than ``age`` in the past; False if age is greater than zero and
+        the file mtime is less than or equal to ``age`` in the past or if there
+        is an OSError while stat'ing the file.
+    """
+    if age <= 0:
+        return True
+    try:
+        return time.time() - os.stat(path).st_mtime > age
+    except OSError:
+        return False
+
+
 def audit_location_generator(devices, datadir, suffix='',
                              mount_check=True, logger=None,
                              devices_filter=None, partitions_filter=None,
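For context, a quick illustration of how the new is_file_older() helper behaves; the temporary file and thresholds below are made up for the example:

    import os
    import tempfile
    import time

    from swift.common.utils import is_file_older

    fd, path = tempfile.mkstemp()
    os.close(fd)
    print(is_file_older(path, 60))            # False: mtime is ~now
    print(is_file_older(path, 0))             # True: age <= 0 always passes
    print(is_file_older(path + '.gone', 60))  # False: stat fails, so keep it
    time.sleep(0.05)
    print(is_file_older(path, 0.01))          # True once the mtime is old enough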
@@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \
     get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
     MD5_OF_EMPTY_STRING, link_fd_to_path, \
     O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \
-    md5
+    md5, is_file_older
 from swift.common.splice import splice, tee
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
     DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@@ -3308,7 +3308,7 @@ class ECDiskFile(BaseDiskFile):
             frag_prefs=self._frag_prefs, policy=policy)
         return self._ondisk_info
 
-    def purge(self, timestamp, frag_index):
+    def purge(self, timestamp, frag_index, nondurable_purge_delay=0):
         """
         Remove a tombstone file matching the specified timestamp or
         datafile matching the specified timestamp and fragment index
@@ -3325,6 +3325,8 @@ class ECDiskFile(BaseDiskFile):
             :class:`~swift.common.utils.Timestamp`
         :param frag_index: fragment archive index, must be
             a whole number or None.
+        :param nondurable_purge_delay: only remove a non-durable data file if
+            it's been on disk longer than this many seconds.
         """
         purge_file = self.manager.make_on_disk_filename(
             timestamp, ext='.ts')
@@ -3334,7 +3336,8 @@ class ECDiskFile(BaseDiskFile):
             # possibilities
             purge_file = self.manager.make_on_disk_filename(
                 timestamp, ext='.data', frag_index=frag_index)
-            remove_file(os.path.join(self._datadir, purge_file))
+            if is_file_older(purge_file, nondurable_purge_delay):
+                remove_file(os.path.join(self._datadir, purge_file))
             purge_file = self.manager.make_on_disk_filename(
                 timestamp, ext='.data', frag_index=frag_index, durable=True)
             remove_file(os.path.join(self._datadir, purge_file))
@@ -34,7 +34,7 @@ from swift.common.utils import (
     GreenAsyncPile, Timestamp, remove_file,
     load_recon_cache, parse_override_options, distribute_evenly,
     PrefixLoggerAdapter, remove_directory, config_request_node_count_value,
-    non_negative_int)
+    non_negative_int, non_negative_float)
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
@@ -237,6 +237,8 @@ class ObjectReconstructor(Daemon):
             conf.get('quarantine_threshold', 0))
         self.request_node_count = config_request_node_count_value(
             conf.get('request_node_count', '2 * replicas'))
+        self.nondurable_purge_delay = non_negative_float(
+            conf.get('nondurable_purge_delay', '60'))
 
         # When upgrading from liberasurecode<=1.5.0, you may want to continue
         # writing legacy CRCs until all nodes are upgraded and capabale of
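For reference, nondurable_purge_delay is parsed with non_negative_float, so zero and fractional values are accepted while negative or non-numeric values raise ValueError at startup. A small sketch mirroring the config tests near the end of this change:

    from swift.obj.reconstructor import ObjectReconstructor

    reconstructor = ObjectReconstructor({})
    assert reconstructor.nondurable_purge_delay == 60  # default

    reconstructor = ObjectReconstructor({'nondurable_purge_delay': '3.2'})
    assert reconstructor.nondurable_purge_delay == 3.2

    try:
        ObjectReconstructor({'nondurable_purge_delay': '-1'})
    except ValueError:
        pass  # rejected before the daemon starts working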
@@ -975,7 +977,14 @@ class ObjectReconstructor(Daemon):
                     job['local_dev']['device'], job['partition'],
                     object_hash, job['policy'],
                     frag_index=frag_index)
-                df.purge(timestamps['ts_data'], frag_index)
+                # legacy durable data files look like modern nondurable data
+                # files; we therefore override nondurable_purge_delay when we
+                # know the data file is durable so that legacy durable data
+                # files get purged
+                nondurable_purge_delay = (0 if timestamps.get('durable')
+                                          else self.nondurable_purge_delay)
+                df.purge(timestamps['ts_data'], frag_index,
+                         nondurable_purge_delay)
             except DiskFileError:
                 self.logger.exception(
                     'Unable to purge DiskFile (%r %r %r)',
@@ -20,6 +20,7 @@ import random
 import shutil
 from collections import defaultdict
 
+from swift.obj.reconstructor import ObjectReconstructor
 from test.probe.common import ECProbeTest, Body
 
 from swift.common import direct_client
@@ -395,9 +396,12 @@ class TestReconstructorRevert(ECProbeTest):
         # fix the 507'ing primary
         self.revive_drive(pdevs[0])
 
-        # fire up reconstructor on handoff node only
+        # fire up reconstructor on handoff node only; nondurable_purge_delay is
+        # set to zero to ensure the nondurable handoff frag is purged
         hnode_id = (hnodes[0]['port'] % 100) // 10
-        self.reconstructor.once(number=hnode_id)
+        self.run_custom_daemon(
+            ObjectReconstructor, 'object-reconstructor', hnode_id,
+            {'nondurable_purge_delay': '0'})
 
         # primary now has only the newer non-durable frag
         self.assert_direct_get_fails(onodes[0], opart, 404)
@@ -2869,6 +2869,24 @@ log_name = %(yarr)s'''
         with mock.patch('swift.common.utils.os.rmdir', _m_rmdir):
             self.assertRaises(OSError, utils.remove_directory, dir_name)
 
+    @with_tempdir
+    def test_is_file_older(self, tempdir):
+        ts = utils.Timestamp(time.time() - 100000)
+        file_name = os.path.join(tempdir, '%s.data' % ts.internal)
+        # assert no raise
+        self.assertFalse(os.path.exists(file_name))
+        self.assertTrue(utils.is_file_older(file_name, 0))
+        self.assertFalse(utils.is_file_older(file_name, 1))
+
+        with open(file_name, 'w') as f:
+            f.write('1')
+        self.assertTrue(os.path.exists(file_name))
+        self.assertTrue(utils.is_file_older(file_name, 0))
+        # check that timestamp in file name is not relevant
+        self.assertFalse(utils.is_file_older(file_name, 50000))
+        time.sleep(0.01)
+        self.assertTrue(utils.is_file_older(file_name, 0.009))
+
     def test_human_readable(self):
         self.assertEqual(utils.human_readable(0), '0')
         self.assertEqual(utils.human_readable(1), '1')
@@ -36,7 +36,7 @@ from six.moves.urllib.parse import unquote
 from swift.common import utils
 from swift.common.exceptions import DiskFileError, DiskFileQuarantined
 from swift.common.header_key_dict import HeaderKeyDict
-from swift.common.utils import dump_recon_cache, md5
+from swift.common.utils import dump_recon_cache, md5, Timestamp
 from swift.obj import diskfile, reconstructor as object_reconstructor
 from swift.common import ring
 from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
@@ -245,18 +245,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             '1': part_1,
             '2': part_2}
 
-        def _create_df(obj_num, part_num):
-            self._create_diskfile(
-                part=part_num, object_name='o' + str(obj_set),
-                policy=policy, frag_index=scenarios[part_num](obj_set),
-                timestamp=utils.Timestamp(t))
-
         for part_num in self.part_nums:
             # create 3 unique objects per part, each part
             # will then have a unique mix of FIs for the
             # possible scenarios
             for obj_num in range(0, 3):
-                _create_df(obj_num, part_num)
+                self._create_diskfile(
+                    part=part_num, object_name='o' + str(obj_set),
+                    policy=policy, frag_index=scenarios[part_num](obj_set),
+                    timestamp=utils.Timestamp(t))
 
         ips = utils.whataremyips(self.reconstructor.bind_ip)
         for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]:
@@ -293,7 +290,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         rmtree(self.testdir, ignore_errors=1)
 
     def _create_diskfile(self, policy=None, part=0, object_name='o',
-                         frag_index=0, timestamp=None, test_data=None):
+                         frag_index=0, timestamp=None, test_data=None,
+                         commit=True):
         policy = policy or self.policy
         df_mgr = self.reconstructor._df_router[policy]
         df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
@@ -301,7 +299,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         timestamp = timestamp or utils.Timestamp.now()
         test_data = test_data or b'test data'
         write_diskfile(df, timestamp, data=test_data, frag_index=frag_index,
-                       legacy_durable=self.legacy_durable)
+                       commit=commit, legacy_durable=self.legacy_durable)
         return df
 
     def assert_expected_jobs(self, part_num, jobs):
@@ -1092,7 +1090,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
        matches a failure dict will return success == False.
        """
        class _fake_ssync(object):
-            def __init__(self, daemon, node, job, suffixes, **kwargs):
+            def __init__(self, daemon, node, job, suffixes,
+                         include_non_durable=False, **kwargs):
                # capture context and generate an available_map of objs
                context = {}
                context['node'] = node
@@ -1101,10 +1100,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                 self.suffixes = suffixes
                 self.daemon = daemon
                 self.job = job
+                frag_prefs = [] if include_non_durable else None
                 hash_gen = self.daemon._df_router[job['policy']].yield_hashes(
                     self.job['device'], self.job['partition'],
                     self.job['policy'], self.suffixes,
-                    frag_index=self.job.get('frag_index'))
+                    frag_index=self.job.get('frag_index'),
+                    frag_prefs=frag_prefs)
                 self.available_map = {}
                 for hash_, timestamps in hash_gen:
                     self.available_map[hash_] = timestamps
@@ -1116,7 +1117,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                         self.success = False
                         break
                 context['success'] = self.success
-                context.update(kwargs)
+                context['include_non_durable'] = include_non_durable
 
             def __call__(self, *args, **kwargs):
                 return self.success, self.available_map if self.success else {}
@@ -1191,6 +1192,66 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         # sanity check that some files should were deleted
         self.assertGreater(n_files, n_files_after)
 
+    def test_delete_reverted_nondurable(self):
+        # verify reconstructor only deletes reverted nondurable fragments after
+        # nondurable_purge_delay
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = (local_devs[0]['id'] + 1) % 3
+        # recent non-durable
+        df_recent = self._create_diskfile(
+            object_name='recent', part=partition, commit=False)
+        datafile_recent = df_recent.manager.cleanup_ondisk_files(
+            df_recent._datadir, frag_prefs=[])['data_file']
+        # older non-durable but with recent mtime
+        df_older = self._create_diskfile(
+            object_name='older', part=partition, commit=False,
+            timestamp=Timestamp(time.time() - 61))
+        datafile_older = df_older.manager.cleanup_ondisk_files(
+            df_older._datadir, frag_prefs=[])['data_file']
+        # durable
+        df_durable = self._create_diskfile(
+            object_name='durable', part=partition, commit=True)
+        datafile_durable = df_durable.manager.cleanup_ondisk_files(
+            df_durable._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+        self.assertTrue(os.path.exists(datafile_durable))
+
+        ssync_calls = []
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.handoffs_only = True
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertTrue(True, context.get('include_non_durable'))
+        # neither nondurable should be removed yet with default purge delay
+        # because their mtimes are too recent
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+        # but durable is purged
+        self.assertFalse(os.path.exists(datafile_durable))
+
+        ssync_calls = []
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.handoffs_only = True
+            # turn down the purge delay...
+            self.reconstructor.nondurable_purge_delay = 0
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertTrue(True, context.get('include_non_durable'))
+
+        # ...now the nondurables get purged
+        self.assertFalse(os.path.exists(datafile_recent))
+        self.assertFalse(os.path.exists(datafile_older))
+
     def test_no_delete_failed_revert(self):
         # test will only process revert jobs
         self.reconstructor.handoffs_only = True
@@ -1314,8 +1375,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         # part 2 should be totally empty
         hash_gen = self.reconstructor._df_router[self.policy].yield_hashes(
             'sda1', '2', self.policy, suffixes=stub_data.keys())
-        for path, hash_, ts in hash_gen:
-            self.fail('found %s with %s in %s' % (hash_, ts, path))
+        for hash_, ts in hash_gen:
+            self.fail('found %s : %s' % (hash_, ts))
 
         new_hashes = self.reconstructor._get_hashes(
             'sda1', 2, self.policy, do_listdir=True)
@@ -5328,6 +5389,24 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
                     object_reconstructor.ObjectReconstructor(
                         {'quarantine_threshold': bad})
 
+    def test_nondurable_purge_delay_conf(self):
+        reconstructor = object_reconstructor.ObjectReconstructor({})
+        self.assertEqual(60, reconstructor.nondurable_purge_delay)
+
+        reconstructor = object_reconstructor.ObjectReconstructor(
+            {'nondurable_purge_delay': '0'})
+        self.assertEqual(0, reconstructor.nondurable_purge_delay)
+
+        reconstructor = object_reconstructor.ObjectReconstructor(
+            {'nondurable_purge_delay': '3.2'})
+        self.assertEqual(3.2, reconstructor.nondurable_purge_delay)
+
+        for bad in ('-1', -1, 'auto', 'bad'):
+            with annotate_failure(bad):
+                with self.assertRaises(ValueError):
+                    object_reconstructor.ObjectReconstructor(
+                        {'nondurable_purge_delay': bad})
+
     def test_request_node_count_conf(self):
         # default is 1 * replicas
         reconstructor = object_reconstructor.ObjectReconstructor({})