From 2934818d608e6cedd30ecb81900d02969476275c Mon Sep 17 00:00:00 2001
From: Alistair Coles
Date: Tue, 22 Jun 2021 16:48:59 +0100
Subject: [PATCH] reconstructor: Delay purging reverted non-durable datafiles

The reconstructor may revert a non-durable datafile on a handoff
concurrently with an object server PUT that is about to make the
datafile durable. This could previously lead to the reconstructor
deleting the recently written datafile before the object-server
attempts to rename it to a durable datafile, and consequently a
traceback in the object server.

The reconstructor will now only remove reverted non-durable datafiles
that are older (according to mtime) than a period set by a new
nondurable_purge_delay option (defaults to 60 seconds). More recent
non-durable datafiles may be made durable or will remain on the
handoff until a subsequent reconstructor cycle.

Change-Id: I0d519ebaaade35249fb7b17bd5f419ffdaa616c0
---
 etc/object-server.conf-sample           |   6 ++
 swift/common/utils.py                   |  19 +++++
 swift/obj/diskfile.py                   |  10 +-
 swift/obj/reconstructor.py              |  13 ++-
 test/probe/test_reconstructor_revert.py |   8 +-
 test/unit/common/test_utils.py          |  18 ++++
 test/unit/obj/test_reconstructor.py     | 109 ++++++++++++++++++++----
 7 files changed, 161 insertions(+), 22 deletions(-)

diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 58ea877c5f..bc9a975634 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -423,6 +423,12 @@ use = egg:swift#recon
 # to be rebuilt). The minimum is only exceeded if request_node_count is
 # greater, and only for the purposes of quarantining.
 # request_node_count = 2 * replicas
+#
+# Sets a delay, in seconds, before the reconstructor removes non-durable data
+# files from a handoff node after reverting them to a primary. This gives the
+# object-server a window in which to finish a concurrent PUT on a handoff and
+# mark the data durable.
+# nondurable_purge_delay = 60.0

 [object-updater]
 # You can override the default log routing for this app here (don't use set!):

diff --git a/swift/common/utils.py b/swift/common/utils.py
index d00cbde4fd..2a0aecc389 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3277,6 +3277,25 @@ def remove_directory(path):
         raise


+def is_file_older(path, age):
+    """
+    Test if a file mtime is older than the given age, suppressing any
+    OSErrors.
+
+    :param path: first and only argument passed to os.stat
+    :param age: age in seconds
+    :return: True if ``age`` is less than or equal to zero or if the file
+        mtime is more than ``age`` seconds in the past; False otherwise,
+        including when an OSError occurs while stat'ing the file.
+    """
+    if age <= 0:
+        return True
+    try:
+        return time.time() - os.stat(path).st_mtime > age
+    except OSError:
+        return False
+
+
 def audit_location_generator(devices, datadir, suffix='',
                              mount_check=True, logger=None,
                              devices_filter=None, partitions_filter=None,
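For illustration, a minimal sketch of the new helper's semantics once this
patch is applied (standalone Python; the file path is hypothetical and not
part of the patch):

    import os
    import time

    from swift.common.utils import is_file_older  # added by this patch

    path = '/tmp/demo.data'  # hypothetical file
    with open(path, 'w') as f:
        f.write('x')

    print(is_file_older(path, 60))    # False: mtime is only seconds old
    print(is_file_older(path, 0))     # True: age <= 0 short-circuits
    os.utime(path, (0, time.time() - 120))    # backdate mtime by ~120s
    print(is_file_older(path, 60))    # True: mtime now exceeds the age
    print(is_file_older(path + '.gone', 60))  # False: OSError suppressed

Note that a missing or unstat'able file reads as "not older" for any
positive age, which biases the purge use case below toward not removing
anything when in doubt.
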
+ """ + if age <= 0: + return True + try: + return time.time() - os.stat(path).st_mtime > age + except OSError: + return False + + def audit_location_generator(devices, datadir, suffix='', mount_check=True, logger=None, devices_filter=None, partitions_filter=None, diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index a786c800bc..95a2d42bdd 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \ get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \ MD5_OF_EMPTY_STRING, link_fd_to_path, \ O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \ - md5 + md5, is_file_older from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ @@ -3308,7 +3308,7 @@ class ECDiskFile(BaseDiskFile): frag_prefs=self._frag_prefs, policy=policy) return self._ondisk_info - def purge(self, timestamp, frag_index): + def purge(self, timestamp, frag_index, nondurable_purge_delay=0): """ Remove a tombstone file matching the specified timestamp or datafile matching the specified timestamp and fragment index @@ -3325,6 +3325,8 @@ class ECDiskFile(BaseDiskFile): :class:`~swift.common.utils.Timestamp` :param frag_index: fragment archive index, must be a whole number or None. + :param nondurable_purge_delay: only remove a non-durable data file if + it's been on disk longer than this many seconds. """ purge_file = self.manager.make_on_disk_filename( timestamp, ext='.ts') @@ -3334,7 +3336,8 @@ class ECDiskFile(BaseDiskFile): # possibilities purge_file = self.manager.make_on_disk_filename( timestamp, ext='.data', frag_index=frag_index) - remove_file(os.path.join(self._datadir, purge_file)) + if is_file_older(purge_file, nondurable_purge_delay): + remove_file(os.path.join(self._datadir, purge_file)) purge_file = self.manager.make_on_disk_filename( timestamp, ext='.data', frag_index=frag_index, durable=True) remove_file(os.path.join(self._datadir, purge_file)) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index b53b8ba5e5..30fe0a703b 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -34,7 +34,7 @@ from swift.common.utils import ( GreenAsyncPile, Timestamp, remove_file, load_recon_cache, parse_override_options, distribute_evenly, PrefixLoggerAdapter, remove_directory, config_request_node_count_value, - non_negative_int) + non_negative_int, non_negative_float) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -237,6 +237,8 @@ class ObjectReconstructor(Daemon): conf.get('quarantine_threshold', 0)) self.request_node_count = config_request_node_count_value( conf.get('request_node_count', '2 * replicas')) + self.nondurable_purge_delay = non_negative_float( + conf.get('nondurable_purge_delay', '60')) # When upgrading from liberasurecode<=1.5.0, you may want to continue # writing legacy CRCs until all nodes are upgraded and capabale of @@ -975,7 +977,14 @@ class ObjectReconstructor(Daemon): job['local_dev']['device'], job['partition'], object_hash, job['policy'], frag_index=frag_index) - df.purge(timestamps['ts_data'], frag_index) + # legacy durable data files look like modern nondurable data + # files; we therefore override nondurable_purge_delay when we + # know the data file is durable so that legacy durable data + # files get purged + 
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index b53b8ba5e5..30fe0a703b 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -34,7 +34,7 @@ from swift.common.utils import (
     GreenAsyncPile, Timestamp, remove_file, load_recon_cache,
     parse_override_options, distribute_evenly, PrefixLoggerAdapter,
     remove_directory, config_request_node_count_value,
-    non_negative_int)
+    non_negative_int, non_negative_float)
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
@@ -237,6 +237,8 @@ class ObjectReconstructor(Daemon):
             conf.get('quarantine_threshold', 0))
         self.request_node_count = config_request_node_count_value(
             conf.get('request_node_count', '2 * replicas'))
+        self.nondurable_purge_delay = non_negative_float(
+            conf.get('nondurable_purge_delay', '60'))

         # When upgrading from liberasurecode<=1.5.0, you may want to continue
         # writing legacy CRCs until all nodes are upgraded and capable of
@@ -975,7 +977,14 @@
                         job['local_dev']['device'], job['partition'],
                         object_hash, job['policy'],
                         frag_index=frag_index)
-                    df.purge(timestamps['ts_data'], frag_index)
+                    # legacy durable data files look like modern nondurable
+                    # data files; we therefore override nondurable_purge_delay
+                    # when we know the data file is durable so that legacy
+                    # durable data files get purged
+                    nondurable_purge_delay = (0 if timestamps.get('durable')
+                                              else self.nondurable_purge_delay)
+                    df.purge(timestamps['ts_data'], frag_index,
+                             nondurable_purge_delay)
                 except DiskFileError:
                     self.logger.exception(
                         'Unable to purge DiskFile (%r %r %r)',
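To make the delay selection above concrete, here is the same decision in
isolation (the timestamps dict is a hypothetical stand-in for an entry
yielded by yield_hashes):

    configured_delay = 60.0  # self.nondurable_purge_delay
    # hypothetical yield_hashes entry for a durable frag
    timestamps = {'ts_data': '1624375739.00000', 'durable': True}
    # durable data, including legacy durable files, is purged immediately;
    # only genuinely non-durable data gets the grace period
    delay = 0 if timestamps.get('durable') else configured_delay
    assert delay == 0
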
diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py
index c90de6b8a6..a58778606a 100644
--- a/test/probe/test_reconstructor_revert.py
+++ b/test/probe/test_reconstructor_revert.py
@@ -20,6 +20,7 @@ import random
 import shutil
 from collections import defaultdict

+from swift.obj.reconstructor import ObjectReconstructor
 from test.probe.common import ECProbeTest, Body
 from swift.common import direct_client

@@ -395,9 +396,12 @@ class TestReconstructorRevert(ECProbeTest):
         # fix the 507'ing primary
         self.revive_drive(pdevs[0])

-        # fire up reconstructor on handoff node only
+        # fire up reconstructor on handoff node only; nondurable_purge_delay
+        # is set to zero to ensure the non-durable handoff frag is purged
         hnode_id = (hnodes[0]['port'] % 100) // 10
-        self.reconstructor.once(number=hnode_id)
+        self.run_custom_daemon(
+            ObjectReconstructor, 'object-reconstructor', hnode_id,
+            {'nondurable_purge_delay': '0'})

         # primary now has only the newer non-durable frag
         self.assert_direct_get_fails(onodes[0], opart, 404)

diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 4f2cabc5d2..583905245b 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -2869,6 +2869,24 @@ log_name = %(yarr)s'''
         with mock.patch('swift.common.utils.os.rmdir', _m_rmdir):
             self.assertRaises(OSError, utils.remove_directory, dir_name)

+    @with_tempdir
+    def test_is_file_older(self, tempdir):
+        ts = utils.Timestamp(time.time() - 100000)
+        file_name = os.path.join(tempdir, '%s.data' % ts.internal)
+        # assert no raise
+        self.assertFalse(os.path.exists(file_name))
+        self.assertTrue(utils.is_file_older(file_name, 0))
+        self.assertFalse(utils.is_file_older(file_name, 1))
+
+        with open(file_name, 'w') as f:
+            f.write('1')
+        self.assertTrue(os.path.exists(file_name))
+        self.assertTrue(utils.is_file_older(file_name, 0))
+        # check that the timestamp in the file name is not relevant
+        self.assertFalse(utils.is_file_older(file_name, 50000))
+        time.sleep(0.01)
+        self.assertTrue(utils.is_file_older(file_name, 0.009))
+
     def test_human_readable(self):
         self.assertEqual(utils.human_readable(0), '0')
         self.assertEqual(utils.human_readable(1), '1')

diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 86ddddb712..a897b4ae1d 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -36,7 +36,7 @@ from six.moves.urllib.parse import unquote
 from swift.common import utils
 from swift.common.exceptions import DiskFileError, DiskFileQuarantined
 from swift.common.header_key_dict import HeaderKeyDict
-from swift.common.utils import dump_recon_cache, md5
+from swift.common.utils import dump_recon_cache, md5, Timestamp
 from swift.obj import diskfile, reconstructor as object_reconstructor
 from swift.common import ring
 from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
@@ -245,18 +245,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                      '1': part_1,
                      '2': part_2}

-        def _create_df(obj_num, part_num):
-            self._create_diskfile(
-                part=part_num, object_name='o' + str(obj_set),
-                policy=policy, frag_index=scenarios[part_num](obj_set),
-                timestamp=utils.Timestamp(t))
-
         for part_num in self.part_nums:
             # create 3 unique objects per part, each part
             # will then have a unique mix of FIs for the
             # possible scenarios
             for obj_num in range(0, 3):
-                _create_df(obj_num, part_num)
+                self._create_diskfile(
+                    part=part_num, object_name='o' + str(obj_set),
+                    policy=policy, frag_index=scenarios[part_num](obj_set),
+                    timestamp=utils.Timestamp(t))

         ips = utils.whataremyips(self.reconstructor.bind_ip)
         for policy in [p for p in POLICIES if p.policy_type == EC_POLICY]:
@@ -293,7 +290,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         rmtree(self.testdir, ignore_errors=1)

     def _create_diskfile(self, policy=None, part=0, object_name='o',
-                         frag_index=0, timestamp=None, test_data=None):
+                         frag_index=0, timestamp=None, test_data=None,
+                         commit=True):
         policy = policy or self.policy
         df_mgr = self.reconstructor._df_router[policy]
         df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
@@ -301,7 +299,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         timestamp = timestamp or utils.Timestamp.now()
         test_data = test_data or b'test data'
         write_diskfile(df, timestamp, data=test_data, frag_index=frag_index,
-                       legacy_durable=self.legacy_durable)
+                       commit=commit, legacy_durable=self.legacy_durable)
         return df

     def assert_expected_jobs(self, part_num, jobs):
@@ -1092,7 +1090,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             matches a failure dict will return success == False.
         """
         class _fake_ssync(object):
-            def __init__(self, daemon, node, job, suffixes, **kwargs):
+            def __init__(self, daemon, node, job, suffixes,
+                         include_non_durable=False, **kwargs):
                 # capture context and generate an available_map of objs
                 context = {}
                 context['node'] = node
@@ -1101,10 +1100,12 @@
                 self.suffixes = suffixes
                 self.daemon = daemon
                 self.job = job
+                frag_prefs = [] if include_non_durable else None
                 hash_gen = self.daemon._df_router[job['policy']].yield_hashes(
                     self.job['device'], self.job['partition'],
                     self.job['policy'], self.suffixes,
-                    frag_index=self.job.get('frag_index'))
+                    frag_index=self.job.get('frag_index'),
+                    frag_prefs=frag_prefs)
                 self.available_map = {}
                 for hash_, timestamps in hash_gen:
                     self.available_map[hash_] = timestamps
@@ -1116,7 +1117,7 @@
                         self.success = False
                         break
                 context['success'] = self.success
-                context.update(kwargs)
+                context['include_non_durable'] = include_non_durable

             def __call__(self, *args, **kwargs):
                 return self.success, self.available_map if self.success else {}
@@ -1191,6 +1192,66 @@
         # sanity check that some files were deleted
         self.assertGreater(n_files, n_files_after)

+    def test_delete_reverted_nondurable(self):
+        # verify reconstructor only deletes reverted nondurable fragments
+        # after nondurable_purge_delay
+        shutil.rmtree(self.ec_obj_path)
+        ips = utils.whataremyips(self.reconstructor.bind_ip)
+        local_devs = [dev for dev in self.ec_obj_ring.devs
+                      if dev and dev['replication_ip'] in ips and
+                      dev['replication_port'] ==
+                      self.reconstructor.port]
+        partition = (local_devs[0]['id'] + 1) % 3
+        # recent non-durable
+        df_recent = self._create_diskfile(
+            object_name='recent', part=partition, commit=False)
+        datafile_recent = df_recent.manager.cleanup_ondisk_files(
+            df_recent._datadir, frag_prefs=[])['data_file']
+        # older non-durable, but with a recent mtime
+        df_older = self._create_diskfile(
+            object_name='older', part=partition, commit=False,
+            timestamp=Timestamp(time.time() - 61))
+        datafile_older = df_older.manager.cleanup_ondisk_files(
+            df_older._datadir, frag_prefs=[])['data_file']
+        # durable
+        df_durable = self._create_diskfile(
+            object_name='durable', part=partition, commit=True)
+        datafile_durable = df_durable.manager.cleanup_ondisk_files(
+            df_durable._datadir, frag_prefs=[])['data_file']
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+        self.assertTrue(os.path.exists(datafile_durable))
+
+        ssync_calls = []
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.handoffs_only = True
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertTrue(context.get('include_non_durable'))
+        # neither nondurable should be removed yet with default purge delay
+        # because their mtimes are too recent
+        self.assertTrue(os.path.exists(datafile_recent))
+        self.assertTrue(os.path.exists(datafile_older))
+        # but durable is purged
+        self.assertFalse(os.path.exists(datafile_durable))
+
+        ssync_calls = []
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.handoffs_only = True
+            # turn down the purge delay...
+            self.reconstructor.nondurable_purge_delay = 0
+            self.reconstructor.reconstruct()
+        for context in ssync_calls:
+            self.assertEqual(REVERT, context['job']['job_type'])
+            self.assertTrue(context.get('include_non_durable'))
+
+        # ...now the nondurables get purged
+        self.assertFalse(os.path.exists(datafile_recent))
+        self.assertFalse(os.path.exists(datafile_older))
+
     def test_no_delete_failed_revert(self):
         # test will only process revert jobs
         self.reconstructor.handoffs_only = True
@@ -1314,8 +1375,8 @@
         # part 2 should be totally empty
         hash_gen = self.reconstructor._df_router[self.policy].yield_hashes(
             'sda1', '2', self.policy, suffixes=stub_data.keys())
-        for path, hash_, ts in hash_gen:
-            self.fail('found %s with %s in %s' % (hash_, ts, path))
+        for hash_, ts in hash_gen:
+            self.fail('found %s : %s' % (hash_, ts))

         new_hashes = self.reconstructor._get_hashes(
             'sda1', 2, self.policy, do_listdir=True)
@@ -5328,6 +5389,24 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
                 object_reconstructor.ObjectReconstructor(
                     {'quarantine_threshold': bad})

+    def test_nondurable_purge_delay_conf(self):
+        reconstructor = object_reconstructor.ObjectReconstructor({})
+        self.assertEqual(60, reconstructor.nondurable_purge_delay)
+
+        reconstructor = object_reconstructor.ObjectReconstructor(
+            {'nondurable_purge_delay': '0'})
+        self.assertEqual(0, reconstructor.nondurable_purge_delay)
+
+        reconstructor = object_reconstructor.ObjectReconstructor(
+            {'nondurable_purge_delay': '3.2'})
+        self.assertEqual(3.2, reconstructor.nondurable_purge_delay)
+
+        for bad in ('-1', -1, 'auto', 'bad'):
+            with annotate_failure(bad):
+                with self.assertRaises(ValueError):
+                    object_reconstructor.ObjectReconstructor(
+                        {'nondurable_purge_delay': bad})
+
     def test_request_node_count_conf(self):
         # default is 1 * replicas
         reconstructor = object_reconstructor.ObjectReconstructor({})