relinker: Rehash as we complete partitions

Previously, we would rely on replication/reconstruction to rehash
once the part power increase was complete. This would lead to large
I/O spikes if operators didn't proactively tune down replication.

Now, do the rehashing in the relinker as it completes each partition.
Operators should already be mindful of the relinker's I/O usage and are
likely limiting it through some combination of cgroups, ionice, and/or
--files-per-second.
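
For illustration only, a minimal sketch of driving the relinker through its
Python entry point with the built-in rate limit (the paths and the
50 files-per-second figure are placeholders, not recommendations), mirroring
how the unit tests invoke relinker.main():

    # Hypothetical invocation; tune --files-per-second to your hardware.
    from swift.cli import relinker

    exit_code = relinker.main([
        'relink',
        '--swift-dir', '/etc/swift',
        '--devices', '/srv/node',
        '--files-per-second', '50',
    ])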

Also remove now-empty partition directories during the cleanup phase.
Operators with metrics on cluster-wide primary/handoff partition counts can
use them to monitor relinking/cleanup progress:

 P  3N            .----------.
 a               /            '.
 r              /               '.
 t  2N         /       .-----------------
 i            /        |
 t           /         |
 i   N -----'---------'
 o
 n
 s   0 ----------------------------------
       t0   t1    t2   t3   t4    t5

At t0, prior to relinking, there are

    N := <replica count> * 2 ** <old part power>

primary partitions throughout the cluster and a negligible number of
handoffs.
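
For example (numbers purely illustrative), a 3-replica policy at an old part
power of 10 would start with

    N = 3 * 2 ** 10 = 3072

primary partition directories cluster-wide.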

At t1, relinking begins. In any non-trivial, low-replica-count cluster,
the probability of a new-part-power partition being assigned to the same
device as its old-part-power equivalent is low, so handoffs grow while
primaries remain constant.
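
For reference, a sketch of the numeric relationship behind this (pure bit
arithmetic, ignoring Swift's hash-path salting): with a one-bit part power
increase, each old partition P maps to new partitions 2P and 2P+1, matching
the arithmetic used by the relinker (2 * partition, 2 * partition + 1) and
the test setup below:

    # Sketch only: derive the old and new partition numbers from an
    # object's md5 hex digest, using the same shifts as the test setup.
    import binascii
    import struct

    def old_and_new_parts(hexdigest, old_part_power):
        top32 = struct.unpack_from('>I', binascii.unhexlify(hexdigest))[0]
        old_part = top32 >> (32 - old_part_power)
        new_part = top32 >> (32 - (old_part_power + 1))
        assert new_part in (2 * old_part, 2 * old_part + 1)
        return old_part, new_part

    # e.g. old_and_new_parts('d41d8cd98f00b204e9800998ecf8427e', 8)
    # returns (212, 424)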

At t2, relinking is complete. The total number of partitions is now 3N
(N primaries + 2N handoffs).

At t3, the ring with increased part power is distributed. The notion of
what's a handoff and what's a primary inverts.

At t4, cleanup begins. The old-location hard links in those "handoffs" are
removed, and now-empty partition directories are deleted.

At t5, cleanup is complete and there are now 2N total partitions.
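
As a sanity check (illustrative numbers, assuming negligible handoff data
when relinking starts), the cluster-wide partition-directory totals an
operator would expect to see at each stage:

    # Hypothetical 3-replica policy at old part power 10.
    REPLICAS, OLD_PART_POWER = 3, 10
    N = REPLICAS * 2 ** OLD_PART_POWER

    expected_totals = {
        't0 (before relinking)': N,     # old primaries only
        't2 (relinking done)': 3 * N,   # N old dirs + 2N new-part-power dirs
        't3 (new ring pushed)': 3 * N,  # same dirs; primary/handoff roles flip
        't5 (cleanup done)': 2 * N,     # only new-part-power dirs remain
    }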

Change-Id: Ib5bf426cf38559091917f2d25f4f60183cd16354
Tim Burke 2021-02-02 15:16:43 -08:00 committed by Alistair Coles
parent a90515bbbe
commit d24884450f
3 changed files with 213 additions and 43 deletions


@@ -30,16 +30,21 @@ Caveats
Before increasing the partition power, consider the possible drawbacks.
There are a few caveats when increasing the partition power:
* All hashes.pkl files will become invalid once hard links are created, and the
replicators will need significantly more time on the first run after finishing
the partition power increase.
* Object replicators will skip partitions during the partition power increase.
Replicators are not aware of hard-links, and would simply copy the content;
this would result in heavy data movement and the worst case would be that all
data is stored twice.
* Almost all diskfiles in the cluster need to be relinked then cleaned up,
and all partition directories need to be rehashed. This imposes significant
I/O load on object servers, which may impact client requests. Consider using
cgroups, ``ionice``, or even just the built-in ``--files-per-second``
rate-limiting to reduce client impact.
* Object replicators and reconstructors will skip affected policies during the
partition power increase. Replicators are not aware of hard-links, and would
simply copy the content; this would result in heavy data movement and the
worst case would be that all data is stored twice.
* Due to the fact that each object will now be hard linked from two locations,
many more inodes will be used - expect around twice the amount. You need to
check the free inode count *before* increasing the partition power.
many more inodes will be used temporarily - expect around twice the amount.
You need to check the free inode count *before* increasing the partition
power. Even after the increase is complete and extra hardlinks are cleaned
up, expect increased inode usage since there will be twice as many partition
and suffix directories.
* Also, object auditors might read each object twice before cleanup removes the
second hard link.
* Due to the new inodes more memory is needed to cache them, and your
@@ -76,13 +81,14 @@ on all object servers in this phase::
which normally happens within 15 seconds after writing a modified ring.
Also, make sure the modified rings are pushed to all nodes running object
services (replicators, reconstructors and reconcilers)- they have to skip
partitions during relinking.
the policy during relinking.
.. note::
The relinking command must run as the same user as the daemon processes
(usually swift). It will create files and directories that must be
manipulable by the daemon processes (server, auditor, replicator, ...).
If necessary, the ``--user`` option may be used to drop privileges.
Relinking might take some time; while there is no data copied or actually
moved, the tool still needs to walk the whole file system and create new hard
@@ -131,10 +137,11 @@ is provided to do this. Run the following command on each storage node::
.. note::
The cleanup must be finished within your object servers reclaim_age period
(which is by default 1 week). Otherwise objects that have been overwritten
between step #1 and step #2 and deleted afterwards can't be cleaned up
anymore.
The cleanup must be finished within your object servers ``reclaim_age``
period (which is by default 1 week). Otherwise objects that have been
overwritten between step #1 and step #2 and deleted afterwards can't be
cleaned up anymore. You may want to increase your ``reclaim_age`` before
or during relinking.
Afterwards it is required to update the rings one last
time to inform servers that all steps to increase the partition power are done,


@@ -26,7 +26,7 @@ from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
DiskFileQuarantined
from swift.common.utils import replace_partition_in_path, config_true_value, \
audit_location_generator, get_logger, readconf, drop_privileges, \
RateLimitedIterator
RateLimitedIterator, lock_path
from swift.obj import diskfile
@@ -136,19 +136,73 @@ def partitions_filter(states, part_power, next_part_power,
# Save states when a partition is done
def hook_post_partition(states, step,
def hook_post_partition(states, step, policy, diskfile_manager,
partition_path):
part = os.path.basename(os.path.abspath(partition_path))
datadir_path = os.path.dirname(os.path.abspath(partition_path))
device_path = os.path.dirname(os.path.abspath(datadir_path))
datadir_name = os.path.basename(os.path.abspath(datadir_path))
datadir_path, part = os.path.split(os.path.abspath(partition_path))
device_path, datadir_name = os.path.split(datadir_path)
device = os.path.basename(device_path)
state_tmp_file = os.path.join(device_path,
STATE_TMP_FILE.format(datadir=datadir_name))
state_file = os.path.join(device_path,
STATE_FILE.format(datadir=datadir_name))
if step in (STEP_RELINK, STEP_CLEANUP):
states["state"][part] = True
# We started with a partition space like
# |0 N|
# |ABCDEFGHIJKLMNOP|
#
# After relinking, it will be more like
# |0 2N|
# |AABBCCDDEEFFGGHHIIJJKKLLMMNNOOPP|
#
# We want to hold off on rehashing until after cleanup, since that is the
# point at which we've finished with filesystem manipulations. But there's
# a slight complication: we know the upper half has nothing to clean up,
# so the cleanup phase only looks at
# |0 2N|
# |AABBCCDDEEFFGGHH |
#
# To ensure that the upper half gets rehashed, too, do it as part of
# relinking; as we finish
# |0 N|
# | IJKLMNOP|
# shift to the new partition space and rehash
# |0 2N|
# | IIJJKKLLMMNNOOPP|
partition = int(part)
if step == STEP_RELINK and partition >= 2 ** (states['part_power'] - 1):
for new_part in (2 * partition, 2 * partition + 1):
diskfile_manager.get_hashes(device, new_part, [], policy)
elif step == STEP_CLEANUP:
hashes = diskfile_manager.get_hashes(device, partition, [], policy)
# In any reasonably-large cluster, we'd expect all old partitions P
# to be empty after cleanup (i.e., it's unlikely that there's another
# partition Q := P//2 that also has data on this device).
#
# Try to clean up empty partitions now, so operators can use existing
# rebalance-complete metrics to monitor relinking progress (provided
# there are few/no handoffs when relinking starts and little data is
# written to handoffs during the increase).
if not hashes:
with lock_path(partition_path):
# Same lock used by invalidate_hashes, consolidate_hashes,
# get_hashes
try:
os.unlink(os.path.join(partition_path, 'hashes.pkl'))
os.unlink(os.path.join(partition_path, 'hashes.invalid'))
os.unlink(os.path.join(partition_path, '.lock'))
except OSError:
pass
try:
os.rmdir(partition_path)
except OSError:
# Most likely, some data landed in here or we hit an error
# above. Let the replicator deal with things; it was worth
# a shot.
pass
# Then mark this part as done, in case the process is interrupted and
# needs to resume.
states["state"][part] = True
with open(state_tmp_file, 'wt') as f:
json.dump(states, f)
os.fsync(f.fileno())
@@ -198,10 +252,13 @@ def relink(swift_dir='/etc/swift',
device=None,
files_per_second=0):
mount_check = not skip_mount_check
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, logger)
found_policy = False
relinked = errors = 0
error_counter = {}
for policy in POLICIES:
diskfile_mgr = diskfile_router[policy]
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
part_power = policy.object_ring.part_power
@@ -225,8 +282,8 @@ def relink(swift_dir='/etc/swift',
relink_hook_post_device = partial(hook_post_device, locks)
relink_partition_filter = partial(partitions_filter,
states, part_power, next_part_power)
relink_hook_post_partition = partial(hook_post_partition,
states, STEP_RELINK)
relink_hook_post_partition = partial(
hook_post_partition, states, STEP_RELINK, policy, diskfile_mgr)
relink_hashes_filter = partial(hashes_filter, next_part_power)
locations = audit_location_generator(
@@ -247,6 +304,8 @@ def relink(swift_dir='/etc/swift',
try:
diskfile.relink_paths(fname, newfname, check_existing=True)
relinked += 1
suffix_dir = os.path.dirname(os.path.dirname(newfname))
diskfile.invalidate_hash(suffix_dir)
except OSError as exc:
errors += 1
logger.warning("Relinking %s to %s failed: %s",
@@ -273,6 +332,7 @@ def cleanup(swift_dir='/etc/swift',
error_counter = {}
found_policy = False
for policy in POLICIES:
diskfile_mgr = diskfile_router[policy]
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
part_power = policy.object_ring.part_power
@@ -296,8 +356,8 @@ def cleanup(swift_dir='/etc/swift',
cleanup_hook_post_device = partial(hook_post_device, locks)
cleanup_partition_filter = partial(partitions_filter,
states, part_power, next_part_power)
cleanup_hook_post_partition = partial(hook_post_partition,
states, STEP_CLEANUP)
cleanup_hook_post_partition = partial(
hook_post_partition, states, STEP_CLEANUP, policy, diskfile_mgr)
cleanup_hashes_filter = partial(hashes_filter, next_part_power)
locations = audit_location_generator(
@@ -323,7 +383,6 @@ def cleanup(swift_dir='/etc/swift',
# has been increased, but cleanup did not yet run)
loc = diskfile.AuditLocation(
os.path.dirname(expected_fname), device, partition, policy)
diskfile_mgr = diskfile_router[policy]
df = diskfile_mgr.get_diskfile_from_audit_location(loc)
try:
with df.open():
@@ -355,6 +414,8 @@ def cleanup(swift_dir='/etc/swift',
os.remove(fname)
cleaned_up += 1
logger.debug("Removed %s", fname)
suffix_dir = os.path.dirname(os.path.dirname(fname))
diskfile.invalidate_hash(suffix_dir)
except OSError as exc:
logger.warning('Error cleaning up %s: %r', fname, exc)
errors += 1


@@ -21,10 +21,12 @@ from textwrap import dedent
import mock
import os
import pickle
import shutil
import struct
import tempfile
import unittest
import uuid
from swift.cli import relinker
from swift.common import exceptions, ring, utils
@@ -32,7 +34,7 @@ from swift.common import storage_policy
from swift.common.storage_policy import (
StoragePolicy, StoragePolicyCollection, POLICIES, ECStoragePolicy)
from swift.obj.diskfile import write_metadata
from swift.obj.diskfile import write_metadata, DiskFileRouter, DiskFileManager
from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
patch_policies
@@ -64,18 +66,24 @@ class TestRelinker(unittest.TestCase):
self.objects = os.path.join(self.devices, self.existing_device,
'objects')
os.mkdir(self.objects)
self._hash = utils.hash_path('a/c/o')
digest = binascii.unhexlify(self._hash)
self.part = struct.unpack_from('>I', digest)[0] >> 24
self.next_part = struct.unpack_from('>I', digest)[0] >> 23
for _ in range(10):
obj_path = '/a/c/o-' + str(uuid.uuid4())
self._hash = utils.hash_path(obj_path[1:])
digest = binascii.unhexlify(self._hash)
self.part = struct.unpack_from('>I', digest)[0] >> 24
self.next_part = struct.unpack_from('>I', digest)[0] >> 23
# There's 1/512 chance that both old and new parts will be 0;
# that's not a terribly interesting case, as there's nothing to do
if self.part != self.next_part:
break
self.objdir = os.path.join(
self.objects, str(self.part), self._hash[-3:], self._hash)
os.makedirs(self.objdir)
self.object_fname = "1278553064.00000.data"
self.object_fname = utils.Timestamp.now().internal + ".data"
self.objname = os.path.join(self.objdir, self.object_fname)
with open(self.objname, "wb") as dummy:
dummy.write(b"Hello World!")
write_metadata(dummy, {'name': '/a/c/o', 'Content-Length': '12'})
write_metadata(dummy, {'name': obj_path, 'Content-Length': '12'})
test_policies = [StoragePolicy(0, 'platinum', True)]
storage_policy._POLICIES = StoragePolicyCollection(test_policies)
@@ -255,6 +263,23 @@ class TestRelinker(unittest.TestCase):
stat_new = os.stat(self.expected_file)
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
if self.next_part < 2 ** PART_POWER:
# Invalidated now, rehashed during cleanup
with open(os.path.join(self.objects, str(self.next_part),
'hashes.invalid')) as fp:
self.assertEqual(fp.read(), self._hash[-3:] + '\n')
else:
# Invalidated and rehashed during relinking
with open(os.path.join(self.objects, str(self.next_part),
'hashes.invalid')) as fp:
self.assertEqual(fp.read(), '')
with open(os.path.join(self.objects, str(self.next_part),
'hashes.pkl'), 'rb') as fp:
self.assertIn(self._hash[-3:], pickle.load(fp))
self.assertFalse(os.path.exists(os.path.join(
self.objects, str(self.part), 'hashes.invalid')))
def test_relink_no_applicable_policy(self):
# NB do not prepare part power increase
self._save_ring()
@@ -358,6 +383,8 @@ class TestRelinker(unittest.TestCase):
self.assertTrue(os.path.isfile(self.expected_file))
self.assertFalse(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
self.assertFalse(os.path.exists(
os.path.join(self.objects, str(self.part))))
def test_cleanup_no_applicable_policy(self):
# NB do not prepare part power increase
@@ -444,12 +471,13 @@ class TestRelinker(unittest.TestCase):
'--devices', self.devices,
'--skip-mount',
]))
state = {str(self.part): True}
with open(state_file, 'rt') as f:
orig_inode = os.stat(state_file).st_ino
self.assertEqual(json.load(f), {
"part_power": PART_POWER,
"next_part_power": PART_POWER + 1,
"state": {str(self.part): True}})
"state": state})
self.rb.increase_partition_power()
self.rb._ring = None # Force builder to reload ring
@@ -465,6 +493,8 @@ class TestRelinker(unittest.TestCase):
'--skip-mount',
]))
self.assertNotEqual(orig_inode, os.stat(state_file).st_ino)
if self.next_part < 2 ** PART_POWER:
state[str(self.next_part)] = True
with open(state_file, 'rt') as f:
# NB: part_power/next_part_power tuple changed, so state was reset
# (though we track prev_part_power for an efficient clean up)
@@ -472,8 +502,7 @@ class TestRelinker(unittest.TestCase):
"prev_part_power": PART_POWER,
"part_power": PART_POWER + 1,
"next_part_power": PART_POWER + 1,
"state": {str(self.part): True,
str(self.next_part): True}})
"state": state})
def test_devices_filter_filtering(self):
# With no filtering, returns all devices
@@ -546,8 +575,12 @@ class TestRelinker(unittest.TestCase):
'auditor_status.json']))
self.assertEqual(states["state"], {'96': False, '227': False})
pol = POLICIES[0]
mgr = DiskFileRouter({'devices': self.devices,
'mount_check': False}, self.logger)[pol]
# Ack partition 96
relinker.hook_post_partition(states, relinker.STEP_RELINK,
relinker.hook_post_partition(states, relinker.STEP_RELINK, pol, mgr,
os.path.join(datadir_path, '96'))
self.assertEqual(states["state"], {'96': True, '227': False})
with open(state_file, 'rt') as f:
@@ -563,7 +596,7 @@ class TestRelinker(unittest.TestCase):
self.assertEqual(states["state"], {'96': True, '227': False})
# Ack partition 227
relinker.hook_post_partition(states, relinker.STEP_RELINK,
relinker.hook_post_partition(states, relinker.STEP_RELINK, pol, mgr,
os.path.join(datadir_path, '227'))
self.assertEqual(states["state"], {'96': True, '227': True})
with open(state_file, 'rt') as f:
@@ -605,7 +638,7 @@ class TestRelinker(unittest.TestCase):
call_partition_filter(PART_POWER + 1, PART_POWER + 1,
['96', '227', '312']))
# Ack partition 227
relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
relinker.hook_post_partition(states, relinker.STEP_CLEANUP, pol, mgr,
os.path.join(datadir_path, '227'))
self.assertEqual(states["state"],
{'96': False, '227': True})
@@ -624,7 +657,7 @@ class TestRelinker(unittest.TestCase):
{'96': False, '227': True})
# Ack partition 96
relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
relinker.hook_post_partition(states, relinker.STEP_CLEANUP, pol, mgr,
os.path.join(datadir_path, '96'))
self.assertEqual(states["state"],
{'96': True, '227': True})
@@ -732,9 +765,12 @@ class TestRelinker(unittest.TestCase):
'--skip-mount',
]))
log_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(log_lines),
'Expected 1 log line, got %r' % log_lines)
self.assertEqual(2, len(log_lines),
'Expected 2 log lines, got %r' % log_lines)
# Once for the cleanup...
self.assertIn('Bad fragment index: None', log_lines[0])
# ... then again for the rehash
self.assertIn('Bad fragment index: None', log_lines[1])
def test_cleanup_quarantined(self):
self._common_test_cleanup()
@@ -758,6 +794,72 @@ class TestRelinker(unittest.TestCase):
'actual object size 5', log_lines[0])
self.assertIn('failed audit and was quarantined', log_lines[1])
def test_rehashing(self):
calls = []
@contextmanager
def do_mocks():
orig_invalidate = relinker.diskfile.invalidate_hash
orig_get_hashes = DiskFileManager.get_hashes
def mock_invalidate(suffix_dir):
calls.append(('invalidate', suffix_dir))
return orig_invalidate(suffix_dir)
def mock_get_hashes(self, *args):
calls.append(('get_hashes', ) + args)
return orig_get_hashes(self, *args)
with mock.patch.object(relinker.diskfile, 'invalidate_hash',
mock_invalidate), \
mock.patch.object(DiskFileManager, 'get_hashes',
mock_get_hashes):
yield
old_suffix_dir = os.path.join(
self.objects, str(self.part), self._hash[-3:])
new_suffix_dir = os.path.join(
self.objects, str(self.next_part), self._hash[-3:])
with do_mocks():
self.rb.prepare_increase_partition_power()
self._save_ring()
self.assertEqual(0, relinker.main([
'relink',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
]))
expected = [('invalidate', new_suffix_dir)]
if self.part >= 2 ** (PART_POWER - 1):
expected.extend([
('get_hashes', self.existing_device, self.next_part & ~1,
[], POLICIES[0]),
('get_hashes', self.existing_device, self.next_part | 1,
[], POLICIES[0]),
])
self.assertEqual(calls, expected)
# Depending on partition, there may or may not be a get_hashes here
self.rb._ring = None # Force builder to reload ring
self.rb.increase_partition_power()
self._save_ring()
self.assertEqual(0, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
]))
if self.part < 2 ** (PART_POWER - 1):
expected.append(('get_hashes', self.existing_device,
self.next_part, [], POLICIES[0]))
expected.extend([
('invalidate', old_suffix_dir),
('get_hashes', self.existing_device, self.part, [],
POLICIES[0]),
])
self.assertEqual(calls, expected)
if __name__ == '__main__':
unittest.main()