diff --git a/bin/swift-object-relinker b/bin/swift-object-relinker new file mode 100755 index 00000000..8b79bfd8 --- /dev/null +++ b/bin/swift-object-relinker @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import argparse +import sys + +from swift.cli.relinker import main + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Relink and cleanup objects to increase partition power') + parser.add_argument('action', choices=['relink', 'cleanup']) + parser.add_argument('--swift-dir', default='/etc/swift', + dest='swift_dir', help='Path to swift directory') + parser.add_argument('--devices', default='/srv/node', + dest='devices', help='Path to swift device directory') + parser.add_argument('--skip-mount-check', default=False, + action="store_true", dest='skip_mount_check') + parser.add_argument('--logfile', default=None, + dest='logfile') + parser.add_argument('--debug', default=False, action='store_true') + + args = parser.parse_args() + + sys.exit(main(args)) diff --git a/doc/source/index.rst b/doc/source/index.rst index 27afd5b1..44867466 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -62,6 +62,7 @@ Overview and Concepts overview_encryption overview_backing_store ring_background + ring_partpower associated_projects Developer Documentation diff --git a/doc/source/ring_partpower.rst b/doc/source/ring_partpower.rst new file mode 100644 index 00000000..17e689aa --- /dev/null +++ b/doc/source/ring_partpower.rst @@ -0,0 +1,184 @@ +============================== +Modifying Ring Partition Power +============================== + +The ring partition power determines the on-disk location of data files and is +selected when creating a new ring. In normal operation, it is a fixed value. +This is because a different partition power results in a different on-disk +location for all data files. + +However, increasing the partition power by 1 can be done by choosing locations +that are on the same disk. As a result, we can create hard-links for both the +new and old locations, avoiding data movement without impacting availability. + +To enable a partition power change without interrupting user access, object +servers need to be aware of it in advance. Therefore a partition power change +needs to be done in multiple steps. + +.. note:: + + Do not increase the partition power on account and container rings. + Increasing the partition power is *only* supported for object rings. + Trying to increase the part_power for account and container rings *will* + result in unavailability, maybe even data loss. + + +------- +Caveats +------- + +Before increasing the partition power, consider the possible drawbacks. +There are a few caveats when increasing the partition power: + +* All hashes.pkl files will become invalid once hard links are created, and the + replicators will need significantly more time on the first run after finishing + the partition power increase. +* Object replicators will skip partitions during the partition power increase. + Replicators are not aware of hard-links, and would simply copy the content; + this would result in heavy data movement and the worst case would be that all + data is stored twice. +* Due to the fact that each object will now be hard linked from two locations, + many more inodes will be used - expect around twice the amount. You need to + check the free inode count *before* increasing the partition power. +* Also, object auditors might read each object twice before cleanup removes the + second hard link. +* Due to the new inodes more memory is needed to cache them, and your + object servers should have plenty of available memory to avoid running out of + inode cache. Setting ``vfs_cache_pressure`` to 1 might help with that. +* All nodes in the cluster *must* run at least Swift version 2.13.0 or later. + +Due to these caveats you should only increase the partition power if really +needed, i.e. if the number of partitions per disk is extremely low and the data +is distributed unevenly across disks. + +----------------------------------- +1. Prepare partition power increase +----------------------------------- + +The swift-ring-builder is used to prepare the ring for an upcoming partition +power increase. It will store a new variable ``next_part_power`` with the current +partition power + 1. Object servers recognize this, and hard links to the new +location will be created (or deleted) on every PUT or DELETE. This will make +it possible to access newly written objects using the future partition power:: + + swift-ring-builder prepare_increase_partition_power + swift-ring-builder write_ring + +Now you need to copy the updated .ring.gz to all nodes. Already existing data +needs to be relinked too; therefore an operator has to run a relinker command +on all object servers in this phase:: + + swift-object-relinker relink + +.. note:: + + Start relinking after *all* the servers re-read the modified ring files, + which normally happens within 15 seconds after writing a modified ring. + Also, make sure the modified rings are pushed to all nodes running object + services (replicators, reconstructors and reconcilers)- they have to skip + partitions during relinking. + +Relinking might take some time; while there is no data copied or actually +moved, the tool still needs to walk the whole file system and create new hard +links as required. + +--------------------------- +2. Increase partition power +--------------------------- + +Now that all existing data can be found using the new location, it's time to +actually increase the partition power itself:: + + swift-ring-builder increase_partition_power + swift-ring-builder write_ring + +Now you need to copy the updated .ring.gz again to all nodes. Object servers +are now using the new, increased partition power and no longer create +additional hard links. + + +.. note:: + + The object servers will create additional hard links for each modified or + new object, and this requires more inodes. + +.. note:: + + If you decide you don't want to increase the partition power, you should + instead cancel the increase. It is not possible to revert this operation + once started. To abort the partition power increase, execute the following + commands, copy the updated .ring.gz files to all nodes and continue with + `3. Cleanup`_ afterwards:: + + swift-ring-builder cancel_increase_partition_power + swift-ring-builder write_ring + + +---------- +3. Cleanup +---------- + +Existing hard links in the old locations need to be removed, and a cleanup tool +is provided to do this. Run the following command on each storage node:: + + swift-object-relinker cleanup + +.. note:: + + The cleanup must be finished within your object servers reclaim_age period + (which is by default 1 week). Otherwise objects that have been overwritten + between step #1 and step #2 and deleted afterwards can't be cleaned up + anymore. + +Afterwards it is required to update the rings one last +time to inform servers that all steps to increase the partition power are done, +and replicators should resume their job:: + + swift-ring-builder finish_increase_partition_power + swift-ring-builder write_ring + +Now you need to copy the updated .ring.gz again to all nodes. + +---------- +Background +---------- + +An existing object that is currently located on partition X will be placed +either on partition 2*X or 2*X+1 after the partition power is increased. The +reason for this is the Ring.get_part() method, that does a bitwise shift to the +right. + +To avoid actual data movement to different disks or even nodes, the allocation +of partitions to nodes needs to be changed. The allocation is pairwise due to +the above mentioned new partition scheme. Therefore devices are allocated like +this, with the partition being the index and the value being the device id:: + + old new + part dev part dev + ---- --- ---- --- + 0 0 0 0 + 1 0 + 1 3 2 3 + 3 3 + 2 7 4 7 + 5 7 + 3 5 6 5 + 7 5 + 4 2 8 2 + 9 2 + 5 1 10 1 + 11 1 + +There is a helper method to compute the new path, and the following example +shows the mapping between old and new location:: + + >>> from swift.common.utils import replace_partition_in_path + >>> old='objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data' + >>> replace_partition_in_path(old, 14) + 'objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data' + >>> replace_partition_in_path(old, 15) + 'objects/32007/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data' + +Using the original partition power (14) it returned the same path; however +after an increase to 15 it returns the new path, and the new partition is 2*X+1 +in this case. diff --git a/setup.cfg b/setup.cfg index ea73b2c7..938bd82d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,7 @@ scripts = bin/swift-object-info bin/swift-object-replicator bin/swift-object-reconstructor + bin/swift-object-relinker bin/swift-object-server bin/swift-object-updater bin/swift-oldies diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py new file mode 100644 index 00000000..b7b4aaf7 --- /dev/null +++ b/swift/cli/relinker.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import os +from swift.common.storage_policy import POLICIES +from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \ + DiskFileQuarantined +from swift.common.utils import replace_partition_in_path, \ + audit_location_generator, get_logger +from swift.obj import diskfile + + +def relink(swift_dir='/etc/swift', + devices='/srv/node', + skip_mount_check=False, + logger=logging.getLogger()): + mount_check = not skip_mount_check + run = False + relinked = errors = 0 + for policy in POLICIES: + policy.object_ring = None # Ensure it will be reloaded + policy.load_ring(swift_dir) + part_power = policy.object_ring.part_power + next_part_power = policy.object_ring.next_part_power + if not next_part_power or next_part_power == part_power: + continue + logging.info('Relinking files for policy %s under %s', + policy.name, devices) + run = True + locations = audit_location_generator( + devices, + diskfile.get_data_dir(policy), + mount_check=mount_check) + for fname, _, _ in locations: + newfname = replace_partition_in_path(fname, next_part_power) + try: + diskfile.relink_paths(fname, newfname, check_existing=True) + relinked += 1 + except OSError as exc: + errors += 1 + logger.warning("Relinking %s to %s failed: %s", + fname, newfname, exc) + + if not run: + logger.warning("No policy found to increase the partition power.") + return 2 + logging.info('Relinked %d diskfiles (%d errors)', relinked, errors) + if errors > 0: + return 1 + return 0 + + +def cleanup(swift_dir='/etc/swift', + devices='/srv/node', + skip_mount_check=False, + logger=logging.getLogger()): + mount_check = not skip_mount_check + conf = {'devices': devices, 'mount_check': mount_check} + diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf)) + errors = cleaned_up = 0 + run = False + for policy in POLICIES: + policy.object_ring = None # Ensure it will be reloaded + policy.load_ring(swift_dir) + part_power = policy.object_ring.part_power + next_part_power = policy.object_ring.next_part_power + if not next_part_power or next_part_power != part_power: + continue + logging.info('Cleaning up files for policy %s under %s', + policy.name, devices) + run = True + locations = audit_location_generator( + devices, + diskfile.get_data_dir(policy), + mount_check=mount_check) + for fname, device, partition in locations: + expected_fname = replace_partition_in_path(fname, part_power) + if fname == expected_fname: + continue + # Make sure there is a valid object file in the expected new + # location. Note that this could be newer than the original one + # (which happens if there is another PUT after partition power + # has been increased, but cleanup did not yet run) + loc = diskfile.AuditLocation( + os.path.dirname(expected_fname), device, partition, policy) + diskfile_mgr = diskfile_router[policy] + df = diskfile_mgr.get_diskfile_from_audit_location(loc) + try: + with df.open(): + pass + except DiskFileQuarantined as exc: + logger.warning('ERROR Object %(obj)s failed audit and was' + ' quarantined: %(err)r', + {'obj': loc, 'err': exc}) + errors += 1 + continue + except DiskFileDeleted: + pass + except DiskFileNotExist as exc: + err = False + if policy.policy_type == 'erasure_coding': + # Might be a non-durable fragment - check that there is + # a fragment in the new path. Will be fixed by the + # reconstructor then + if not os.path.isfile(expected_fname): + err = True + else: + err = True + if err: + logger.warning( + 'Error cleaning up %s: %r', fname, exc) + errors += 1 + continue + try: + os.remove(fname) + cleaned_up += 1 + logging.debug("Removed %s", fname) + except OSError as exc: + logger.warning('Error cleaning up %s: %r', fname, exc) + errors += 1 + + if not run: + logger.warning("No policy found to increase the partition power.") + return 2 + logging.info('Cleaned up %d diskfiles (%d errors)', cleaned_up, errors) + if errors > 0: + return 1 + return 0 + + +def main(args): + logging.basicConfig( + format='%(message)s', + level=logging.DEBUG if args.debug else logging.INFO, + filename=args.logfile) + + logger = logging.getLogger() + + if args.action == 'relink': + return relink( + args.swift_dir, args.devices, args.skip_mount_check, logger) + + if args.action == 'cleanup': + return cleanup( + args.swift_dir, args.devices, args.skip_mount_check, logger) diff --git a/swift/cli/ringbuilder.py b/swift/cli/ringbuilder.py index 1e1f2d97..4c594cd5 100644 --- a/swift/cli/ringbuilder.py +++ b/swift/cli/ringbuilder.py @@ -497,12 +497,14 @@ swift-ring-builder print('The overload factor is %0.2f%% (%.6f)' % ( builder.overload * 100, builder.overload)) + ring_dict = None + builder_dict = builder.get_ring().to_dict() + # compare ring file against builder file if not exists(ring_file): print('Ring file %s not found, ' 'probably it hasn\'t been written yet' % ring_file) else: - builder_dict = builder.get_ring().to_dict() try: ring_dict = RingData.load(ring_file).to_dict() except Exception as exc: @@ -520,6 +522,24 @@ swift-ring-builder for dev in builder._iter_devs(): flags = 'DEL' if dev in builder._remove_devs else '' print_dev_f(dev, balance_per_dev[dev['id']], flags) + + # Print some helpful info if partition power increase in progress + if (builder.next_part_power and + builder.next_part_power == (builder.part_power + 1)): + print('\nPreparing increase of partition power (%d -> %d)' % ( + builder.part_power, builder.next_part_power)) + print('Run "swift-object-relinker relink" on all nodes before ' + 'moving on to increase_partition_power.') + if (builder.next_part_power and + builder.part_power == builder.next_part_power): + print('\nIncreased partition power (%d -> %d)' % ( + builder.part_power, builder.next_part_power)) + if builder_dict != ring_dict: + print('First run "swift-ring-builder write_ring"' + ' now and copy the updated .ring.gz file to all nodes.') + print('Run "swift-object-relinker cleanup" on all nodes before ' + 'moving on to finish_increase_partition_power.') + exit(EXIT_SUCCESS) @staticmethod @@ -650,6 +670,11 @@ swift-ring-builder add print(Commands.add.__doc__.strip()) exit(EXIT_ERROR) + if builder.next_part_power: + print('Partition power increase in progress. You need ') + print('to finish the increase first before adding devices.') + exit(EXIT_WARNING) + try: for new_dev in _parse_add_values(argv[3:]): for dev in builder.devs: @@ -795,6 +820,11 @@ swift-ring-builder remove print(parse_search_value.__doc__.strip()) exit(EXIT_ERROR) + if builder.next_part_power: + print('Partition power increase in progress. You need ') + print('to finish the increase first before removing devices.') + exit(EXIT_WARNING) + devs, opts = _parse_remove_values(argv[3:]) input_question = 'Are you sure you want to remove these ' \ @@ -857,6 +887,11 @@ swift-ring-builder rebalance [options] handler.setFormatter(formatter) logger.addHandler(handler) + if builder.next_part_power: + print('Partition power increase in progress.') + print('You need to finish the increase first before rebalancing.') + exit(EXIT_WARNING) + devs_changed = builder.devs_changed min_part_seconds_left = builder.min_part_seconds_left try: @@ -1221,6 +1256,159 @@ swift-ring-builder set_overload [%] builder.save(builder_file) exit(status) + @staticmethod + def prepare_increase_partition_power(): + """ +swift-ring-builder prepare_increase_partition_power + Prepare the ring to increase the partition power by one. + + A write_ring command is needed to make the change take effect. + + Once the updated rings have been deployed to all servers you need to run + the swift-object-relinker tool to relink existing data. + + ***************************** + USE THIS WITH EXTREME CAUTION + ***************************** + + If you increase the partition power and deploy changed rings, you may + introduce unavailability in your cluster. This has an end-user impact. Make + sure you execute required operations to increase the partition power + accurately. + + """ + if len(argv) < 3: + print(Commands.prepare_increase_partition_power.__doc__.strip()) + exit(EXIT_ERROR) + + if "object" not in basename(builder_file): + print( + 'Partition power increase is only supported for object rings.') + exit(EXIT_ERROR) + + if not builder.prepare_increase_partition_power(): + print('Ring is already prepared for partition power increase.') + exit(EXIT_ERROR) + + builder.save(builder_file) + + print('The next partition power is now %d.' % builder.next_part_power) + print('The change will take effect after the next write_ring.') + print('Ensure your proxy-servers, object-replicators and ') + print('reconstructors are using the changed rings and relink ') + print('(using swift-object-relinker) your existing data') + print('before the partition power increase') + exit(EXIT_SUCCESS) + + @staticmethod + def increase_partition_power(): + """ +swift-ring-builder increase_partition_power + Increases the partition power by one. Needs to be run after + prepare_increase_partition_power has been run and all existing data has + been relinked using the swift-object-relinker tool. + + A write_ring command is needed to make the change take effect. + + Once the updated rings have been deployed to all servers you need to run + the swift-object-relinker tool to cleanup old data. + + ***************************** + USE THIS WITH EXTREME CAUTION + ***************************** + + If you increase the partition power and deploy changed rings, you may + introduce unavailability in your cluster. This has an end-user impact. Make + sure you execute required operations to increase the partition power + accurately. + + """ + if len(argv) < 3: + print(Commands.increase_partition_power.__doc__.strip()) + exit(EXIT_ERROR) + + if builder.increase_partition_power(): + print('The partition power is now %d.' % builder.part_power) + print('The change will take effect after the next write_ring.') + + builder._update_last_part_moves() + builder.save(builder_file) + + exit(EXIT_SUCCESS) + else: + print('Ring partition power cannot be increased. Either the ring') + print('was not prepared yet, or this operation has already run.') + exit(EXIT_ERROR) + + @staticmethod + def cancel_increase_partition_power(): + """ +swift-ring-builder cancel_increase_partition_power + Cancel the increase of the partition power. + + A write_ring command is needed to make the change take effect. + + Once the updated rings have been deployed to all servers you need to run + the swift-object-relinker tool to cleanup unneeded links. + + ***************************** + USE THIS WITH EXTREME CAUTION + ***************************** + + If you increase the partition power and deploy changed rings, you may + introduce unavailability in your cluster. This has an end-user impact. Make + sure you execute required operations to increase the partition power + accurately. + + """ + if len(argv) < 3: + print(Commands.cancel_increase_partition_power.__doc__.strip()) + exit(EXIT_ERROR) + + if not builder.cancel_increase_partition_power(): + print('Ring partition power increase cannot be canceled.') + exit(EXIT_ERROR) + + builder.save(builder_file) + + print('The next partition power is now %d.' % builder.next_part_power) + print('The change will take effect after the next write_ring.') + print('Ensure your object-servers are using the changed rings and') + print('cleanup (using swift-object-relinker) the hard links') + exit(EXIT_SUCCESS) + + @staticmethod + def finish_increase_partition_power(): + """ +swift-ring-builder finish_increase_partition_power + Finally removes the next_part_power flag. Has to be run after the + swift-object-relinker tool has been used to cleanup old existing data. + + A write_ring command is needed to make the change take effect. + + ***************************** + USE THIS WITH EXTREME CAUTION + ***************************** + + If you increase the partition power and deploy changed rings, you may + introduce unavailability in your cluster. This has an end-user impact. Make + sure you execute required operations to increase the partition power + accurately. + + """ + if len(argv) < 3: + print(Commands.finish_increase_partition_power.__doc__.strip()) + exit(EXIT_ERROR) + + if not builder.finish_increase_partition_power(): + print('Ring partition power increase cannot be finished.') + exit(EXIT_ERROR) + + print('The change will take effect after the next write_ring.') + builder.save(builder_file) + + exit(EXIT_SUCCESS) + def main(arguments=None): global argv, backup_dir, builder, builder_file, ring_file diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py index 61def270..44787708 100644 --- a/swift/common/ring/builder.py +++ b/swift/common/ring/builder.py @@ -86,6 +86,7 @@ class RingBuilder(object): % (min_part_hours,)) self.part_power = part_power + self.next_part_power = None self.replicas = replicas self.min_part_hours = min_part_hours self.parts = 2 ** self.part_power @@ -210,6 +211,7 @@ class RingBuilder(object): """ if hasattr(builder, 'devs'): self.part_power = builder.part_power + self.next_part_power = builder.next_part_power self.replicas = builder.replicas self.min_part_hours = builder.min_part_hours self.parts = builder.parts @@ -225,6 +227,7 @@ class RingBuilder(object): self._id = getattr(builder, '_id', None) else: self.part_power = builder['part_power'] + self.next_part_power = builder.get('next_part_power') self.replicas = builder['replicas'] self.min_part_hours = builder['min_part_hours'] self.parts = builder['parts'] @@ -261,6 +264,7 @@ class RingBuilder(object): copy_from. """ return {'part_power': self.part_power, + 'next_part_power': self.next_part_power, 'replicas': self.replicas, 'min_part_hours': self.min_part_hours, 'parts': self.parts, @@ -341,7 +345,8 @@ class RingBuilder(object): self._ring = \ RingData([array('H', p2d) for p2d in self._replica2part2dev], - devs, self.part_shift) + devs, self.part_shift, + self.next_part_power) return self._ring def add_dev(self, dev): @@ -1751,6 +1756,26 @@ class RingBuilder(object): matched_devs.append(dev) return matched_devs + def prepare_increase_partition_power(self): + """ + Prepares a ring for partition power increase. + + This makes it possible to compute the future location of any object + based on the next partition power. + + In this phase object servers should create hard links when finalizing a + write to the new location as well. A relinker will be run after + restarting object-servers, creating hard links to all existing objects + in their future location. + + :returns: False if next_part_power was not set, otherwise True. + """ + if self.next_part_power: + return False + self.next_part_power = self.part_power + 1 + self.version += 1 + return True + def increase_partition_power(self): """ Increases ring partition power by one. @@ -1759,8 +1784,17 @@ class RingBuilder(object): OLD: 0, 3, 7, 5, 2, 1, ... NEW: 0, 0, 3, 3, 7, 7, 5, 5, 2, 2, 1, 1, ... + + :returns: False if next_part_power was not set or is equal to current + part_power, None if something went wrong, otherwise True. """ + if not self.next_part_power: + return False + + if self.next_part_power != (self.part_power + 1): + return False + new_replica2part2dev = [] for replica in self._replica2part2dev: new_replica = array('H') @@ -1775,13 +1809,47 @@ class RingBuilder(object): # We need to update the time when a partition has been moved the last # time. Since this is an array of all partitions, we need to double it - # two + # too new_last_part_moves = [] for partition in self._last_part_moves: new_last_part_moves.append(partition) new_last_part_moves.append(partition) self._last_part_moves = new_last_part_moves - self.part_power += 1 + self.part_power = self.next_part_power self.parts *= 2 self.version += 1 + return True + + def cancel_increase_partition_power(self): + """ + Cancels a ring partition power increasement. + + This sets the next_part_power to the current part_power. Object + replicators will still skip replication, and a cleanup is still + required. Finally, a finish_increase_partition_power needs to be run. + + :returns: False if next_part_power was not set or is equal to current + part_power, otherwise True. + """ + + if not self.next_part_power: + return False + + if self.next_part_power != (self.part_power + 1): + return False + + self.next_part_power = self.part_power + self.version += 1 + return True + + def finish_increase_partition_power(self): + """Finish the partition power increase. + + The hard links from the old object locations should be removed by now. + """ + if self.next_part_power and self.next_part_power == self.part_power: + self.next_part_power = None + self.version += 1 + return True + return False diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 884aa923..9b4df35b 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -38,10 +38,12 @@ from swift.common.ring.utils import tiers_for_dev class RingData(object): """Partitioned consistent hashing ring data (used for serialization).""" - def __init__(self, replica2part2dev_id, devs, part_shift): + def __init__(self, replica2part2dev_id, devs, part_shift, + next_part_power=None): self.devs = devs self._replica2part2dev_id = replica2part2dev_id self._part_shift = part_shift + self.next_part_power = next_part_power for dev in self.devs: if dev is not None: @@ -113,18 +115,27 @@ class RingData(object): if not hasattr(ring_data, 'devs'): ring_data = RingData(ring_data['replica2part2dev_id'], - ring_data['devs'], ring_data['part_shift']) + ring_data['devs'], ring_data['part_shift'], + ring_data.get('next_part_power')) return ring_data def serialize_v1(self, file_obj): # Write out new-style serialization magic and version: file_obj.write(struct.pack('!4sH', 'R1NG', 1)) ring = self.to_dict() + + # Only include next_part_power if it is set in the + # builder, otherwise just ignore it + _text = {'devs': ring['devs'], 'part_shift': ring['part_shift'], + 'replica_count': len(ring['replica2part2dev_id']), + 'byteorder': sys.byteorder} + + next_part_power = ring.get('next_part_power') + if next_part_power is not None: + _text['next_part_power'] = next_part_power + json_encoder = json.JSONEncoder(sort_keys=True) - json_text = json_encoder.encode( - {'devs': ring['devs'], 'part_shift': ring['part_shift'], - 'replica_count': len(ring['replica2part2dev_id']), - 'byteorder': sys.byteorder}) + json_text = json_encoder.encode(_text) json_len = len(json_text) file_obj.write(struct.pack('!I', json_len)) file_obj.write(json_text) @@ -155,7 +166,8 @@ class RingData(object): def to_dict(self): return {'devs': self.devs, 'replica2part2dev_id': self._replica2part2dev_id, - 'part_shift': self._part_shift} + 'part_shift': self._part_shift, + 'next_part_power': self.next_part_power} class Ring(object): @@ -244,6 +256,15 @@ class Ring(object): self._num_regions = len(regions) self._num_zones = len(zones) self._num_ips = len(ips) + self._next_part_power = ring_data.next_part_power + + @property + def next_part_power(self): + return self._next_part_power + + @property + def part_power(self): + return 32 - self._part_shift def _rebuild_tier_data(self): self.tier2devs = defaultdict(list) diff --git a/swift/common/utils.py b/swift/common/utils.py index fb95ea2c..79dca27f 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -17,6 +17,7 @@ from __future__ import print_function +import binascii import errno import fcntl import grp @@ -27,6 +28,7 @@ import operator import os import pwd import re +import struct import sys import time import uuid @@ -4238,3 +4240,25 @@ def md5_hash_for_file(fname): for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''): md5sum.update(block) return md5sum.hexdigest() + + +def replace_partition_in_path(path, part_power): + """ + Takes a full path to a file and a partition power and returns + the same path, but with the correct partition number. Most useful when + increasing the partition power. + + :param path: full path to a file, for example object .data file + :param part_power: partition power to compute correct partition number + :returns: Path with re-computed partition power + """ + + path_components = path.split(os.sep) + digest = binascii.unhexlify(path_components[-2]) + + part_shift = 32 - int(part_power) + part = struct.unpack_from('>I', digest)[0] >> part_shift + + path_components[-4] = "%d" % part + + return os.sep.join(path_components) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index b26dd74d..362b3a3f 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -65,7 +65,7 @@ from swift.common.utils import mkdirs, Timestamp, \ config_true_value, listdir, split_path, ismount, remove_file, \ get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \ tpool_reraise, MD5_OF_EMPTY_STRING, link_fd_to_path, o_tmpfile_supported, \ - O_TMPFILE, makedirs_count + O_TMPFILE, makedirs_count, replace_partition_in_path from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ @@ -345,6 +345,37 @@ def invalidate_hash(suffix_dir): inv_fh.write(suffix + "\n") +def relink_paths(target_path, new_target_path, check_existing=False): + """ + Hard-links a file located in target_path using the second path + new_target_path. Creates intermediate directories if required. + + :param target_path: current absolute filename + :param new_target_path: new absolute filename for the hardlink + :param check_existing: if True, check whether the link is already present + before attempting to create a new one + """ + + if target_path != new_target_path: + logging.debug('Relinking %s to %s due to next_part_power set', + target_path, new_target_path) + new_target_dir = os.path.dirname(new_target_path) + if not os.path.isdir(new_target_dir): + os.makedirs(new_target_dir) + + link_exists = False + if check_existing: + try: + new_stat = os.stat(new_target_path) + orig_stat = os.stat(target_path) + link_exists = (new_stat.st_ino == orig_stat.st_ino) + except OSError: + pass # if anything goes wrong, try anyway + + if not link_exists: + os.link(target_path, new_target_path) + + def get_part_path(dev_path, policy, partition): """ Given the device path, policy, and partition, returns the full @@ -1455,9 +1486,11 @@ class BaseDiskFileWriter(object): :param tmppath: full path name of the opened file descriptor :param bytes_per_sync: number bytes written between sync calls :param diskfile: the diskfile creating this DiskFileWriter instance + :param next_part_power: the next partition power to be used """ - def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile): + def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile, + next_part_power): # Parameter tracking self._name = name self._datadir = datadir @@ -1465,6 +1498,7 @@ class BaseDiskFileWriter(object): self._tmppath = tmppath self._bytes_per_sync = bytes_per_sync self._diskfile = diskfile + self.next_part_power = next_part_power # Internal attributes self._upload_size = 0 @@ -1528,6 +1562,21 @@ class BaseDiskFileWriter(object): # It was an unnamed temp file created by open() with O_TMPFILE link_fd_to_path(self._fd, target_path, self._diskfile._dirs_created) + + # Check if the partition power will/has been increased + new_target_path = None + if self.next_part_power: + new_target_path = replace_partition_in_path( + target_path, self.next_part_power) + if target_path != new_target_path: + try: + fsync_dir(os.path.dirname(target_path)) + relink_paths(target_path, new_target_path) + except OSError as exc: + self.manager.logger.exception( + 'Relinking %s to %s failed: %s', + target_path, new_target_path, exc) + # If rename is successful, flag put as succeeded. This is done to avoid # unnecessary os.unlink() of tempfile later. As renamer() has # succeeded, the tempfile would no longer exist at its original path. @@ -1538,6 +1587,8 @@ class BaseDiskFileWriter(object): except OSError: logging.exception(_('Problem cleaning up %s'), self._datadir) + self._part_power_cleanup(target_path, new_target_path) + def _put(self, metadata, cleanup=True, *a, **kw): """ Helper method for subclasses. @@ -1584,6 +1635,43 @@ class BaseDiskFileWriter(object): """ pass + def _part_power_cleanup(self, cur_path, new_path): + """ + Cleanup relative DiskFile directories. + + If the partition power is increased soon or has just been increased but + the relinker didn't yet cleanup the old files, an additional cleanup of + the relative dirs has to be done. Otherwise there might be some unused + files left if a PUT or DELETE is done in the meantime + :param cur_path: current full path to an object file + :param new_path: recomputed path to an object file, based on the + next_part_power set in the ring + + """ + if new_path is None: + return + + # Partition power will be increased soon + if new_path != cur_path: + new_target_dir = os.path.dirname(new_path) + try: + self.manager.cleanup_ondisk_files(new_target_dir) + except OSError: + logging.exception( + _('Problem cleaning up %s'), new_target_dir) + + # Partition power has been increased, cleanup not yet finished + else: + prev_part_power = int(self.next_part_power) - 1 + old_target_path = replace_partition_in_path( + cur_path, prev_part_power) + old_target_dir = os.path.dirname(old_target_path) + try: + self.manager.cleanup_ondisk_files(old_target_dir) + except OSError: + logging.exception( + _('Problem cleaning up %s'), old_target_dir) + class BaseDiskFileReader(object): """ @@ -1922,6 +2010,7 @@ class BaseDiskFile(object): :param use_linkat: if True, use open() with linkat() to create obj file :param open_expired: if True, open() will not raise a DiskFileExpired if object is expired + :param next_part_power: the next partition power to be used """ reader_cls = None # must be set by subclasses writer_cls = None # must be set by subclasses @@ -1929,7 +2018,8 @@ class BaseDiskFile(object): def __init__(self, mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, - use_linkat=False, open_expired=False, **kwargs): + use_linkat=False, open_expired=False, next_part_power=None, + **kwargs): self._manager = mgr self._device_path = device_path self._logger = mgr.logger @@ -1947,6 +2037,7 @@ class BaseDiskFile(object): # in all entry fops being carried out synchronously. self._dirs_created = 0 self.policy = policy + self.next_part_power = next_part_power if account and container and obj: self._name = '/' + '/'.join((account, container, obj)) self._account = account @@ -2491,7 +2582,8 @@ class BaseDiskFile(object): raise dfw = self.writer_cls(self._name, self._datadir, fd, tmppath, bytes_per_sync=self._bytes_per_sync, - diskfile=self) + diskfile=self, + next_part_power=self.next_part_power) yield dfw finally: try: @@ -2712,10 +2804,27 @@ class ECDiskFileWriter(BaseDiskFileWriter): def _finalize_durable(self, data_file_path, durable_data_file_path): exc = None + new_data_file_path = new_durable_data_file_path = None + if self.next_part_power: + new_data_file_path = replace_partition_in_path( + data_file_path, self.next_part_power) + new_durable_data_file_path = replace_partition_in_path( + durable_data_file_path, self.next_part_power) try: try: os.rename(data_file_path, durable_data_file_path) fsync_dir(self._datadir) + if self.next_part_power and \ + data_file_path != new_data_file_path: + try: + os.rename(new_data_file_path, + new_durable_data_file_path) + except OSError as exc: + self.manager.logger.exception( + 'Renaming new path %s to %s failed: %s', + new_data_file_path, new_durable_data_file_path, + exc) + except (OSError, IOError) as err: if err.errno not in (errno.ENOSPC, errno.EDQUOT): # re-raise to catch all handler @@ -2733,6 +2842,9 @@ class ECDiskFileWriter(BaseDiskFileWriter): self.manager.logger.exception( _('Problem cleaning up %(datadir)s (%(err)s)'), {'datadir': self._datadir, 'err': os_err}) + self._part_power_cleanup( + durable_data_file_path, new_durable_data_file_path) + except Exception as err: params = {'file': durable_data_file_path, 'err': err} self.manager.logger.exception( diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index f5e6fe48..2d327d40 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -916,6 +916,19 @@ class ObjectReconstructor(Daemon): all_parts = [] for policy, local_devices in policy2devices.items(): + # Skip replication if next_part_power is set. In this case + # every object is hard-linked twice, but the replicator + # can't detect them and would create a second copy of the + # file if not yet existing - and this might double the + # actual transferred and stored data + next_part_power = getattr( + policy.object_ring, 'next_part_power', None) + if next_part_power is not None: + self.logger.warning( + _("next_part_power set in policy '%s'. Skipping"), + policy.name) + continue + df_mgr = self._df_router[policy] for local_dev in local_devices: dev_path = df_mgr.get_dev_path(local_dev['device']) @@ -1018,6 +1031,7 @@ class ObjectReconstructor(Daemon): self.logger.info(_("Ring change detected. Aborting " "current reconstruction pass.")) return + self.reconstruction_part_count += 1 jobs = self.build_reconstruction_jobs(part_info) if not jobs: diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 4f1d43c6..c7731549 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -676,6 +676,19 @@ class ObjectReplicator(Daemon): jobs = [] ips = whataremyips(self.bind_ip) for policy in POLICIES: + # Skip replication if next_part_power is set. In this case + # every object is hard-linked twice, but the replicator can't + # detect them and would create a second copy of the file if not + # yet existing - and this might double the actual transferred + # and stored data + next_part_power = getattr( + policy.object_ring, 'next_part_power', None) + if next_part_power is not None: + self.logger.warning( + _("next_part_power set in policy '%s'. Skipping"), + policy.name) + continue + if policy.policy_type == REPL_POLICY: if (override_policies is not None and str(policy.idx) not in override_policies): @@ -744,6 +757,7 @@ class ObjectReplicator(Daemon): self.logger.info(_("Ring change detected. Aborting " "current replication pass.")) return + try: if isfile(job['path']): # Clean up any (probably zero-byte) files where a diff --git a/swift/obj/server.py b/swift/obj/server.py index 1efa3997..0cf9e7d1 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -513,11 +513,13 @@ class ObjectController(BaseStorageServer): if new_delete_at and new_delete_at < time.time(): return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain') + next_part_power = request.headers.get('X-Backend-Next-Part-Power') try: disk_file = self.get_diskfile( device, partition, account, container, obj, policy=policy, open_expired=config_true_value( - request.headers.get('x-backend-replication', 'false'))) + request.headers.get('x-backend-replication', 'false')), + next_part_power=next_part_power) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -675,10 +677,12 @@ class ObjectController(BaseStorageServer): # nodes; handoff nodes should 409 subrequests to over-write an # existing data fragment until they offloaded the existing fragment frag_index = request.headers.get('X-Backend-Ssync-Frag-Index') + next_part_power = request.headers.get('X-Backend-Next-Part-Power') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy, frag_index=frag_index) + policy=policy, frag_index=frag_index, + next_part_power=next_part_power) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: @@ -987,10 +991,11 @@ class ObjectController(BaseStorageServer): device, partition, account, container, obj, policy = \ get_name_and_placement(request, 5, 5, True) req_timestamp = valid_timestamp(request) + next_part_power = request.headers.get('X-Backend-Next-Part-Power') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, next_part_power=next_part_power) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index e9c9b5e1..86bb2996 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -235,6 +235,9 @@ class BaseObjectController(Controller): container_info['storage_policy']) obj_ring = self.app.get_object_ring(policy_index) req.headers['X-Backend-Storage-Policy-Index'] = policy_index + next_part_power = getattr(obj_ring, 'next_part_power', None) + if next_part_power: + req.headers['X-Backend-Next-Part-Power'] = next_part_power partition, nodes = obj_ring.get_nodes( self.account_name, self.container_name, self.object_name) @@ -643,6 +646,9 @@ class BaseObjectController(Controller): # pass the policy index to storage nodes via req header req.headers['X-Backend-Storage-Policy-Index'] = policy_index + next_part_power = getattr(obj_ring, 'next_part_power', None) + if next_part_power: + req.headers['X-Backend-Next-Part-Power'] = next_part_power req.acl = container_info['write_acl'] req.environ['swift_sync_key'] = container_info['sync_key'] @@ -701,6 +707,9 @@ class BaseObjectController(Controller): obj_ring = self.app.get_object_ring(policy_index) # pass the policy index to storage nodes via req header req.headers['X-Backend-Storage-Policy-Index'] = policy_index + next_part_power = getattr(obj_ring, 'next_part_power', None) + if next_part_power: + req.headers['X-Backend-Next-Part-Power'] = next_part_power container_partition = container_info['partition'] container_nodes = container_info['nodes'] req.acl = container_info['write_acl'] diff --git a/test/probe/test_object_partpower_increase.py b/test/probe/test_object_partpower_increase.py new file mode 100755 index 00000000..e0eb5499 --- /dev/null +++ b/test/probe/test_object_partpower_increase.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from errno import EEXIST +from shutil import copyfile +from tempfile import mkstemp +from time import time +from unittest import main +from uuid import uuid4 + +from swiftclient import client + +from swift.cli.relinker import relink, cleanup +from swift.common.manager import Manager +from swift.common.ring import RingBuilder +from swift.common.utils import replace_partition_in_path +from swift.obj.diskfile import get_data_dir +from test.probe.common import ECProbeTest, ProbeTest, ReplProbeTest + + +class TestPartPowerIncrease(ProbeTest): + def setUp(self): + super(TestPartPowerIncrease, self).setUp() + _, self.ring_file_backup = mkstemp() + _, self.builder_file_backup = mkstemp() + self.ring_file = self.object_ring.serialized_path + self.builder_file = self.ring_file.replace('ring.gz', 'builder') + copyfile(self.ring_file, self.ring_file_backup) + copyfile(self.builder_file, self.builder_file_backup) + # In case the test user is not allowed to write rings + self.assertTrue(os.access('/etc/swift', os.W_OK)) + self.assertTrue(os.access('/etc/swift/backups', os.W_OK)) + self.assertTrue(os.access('/etc/swift/object.builder', os.W_OK)) + self.assertTrue(os.access('/etc/swift/object.ring.gz', os.W_OK)) + # Ensure the test object will be erasure coded + self.data = ' ' * getattr(self.policy, 'ec_segment_size', 1) + + self.devices = [ + self.device_dir('object', {'ip': ip, 'port': port, 'device': ''}) + for ip, port in set((dev['ip'], dev['port']) + for dev in self.object_ring.devs)] + + def tearDown(self): + # Keep a backup copy of the modified .builder file + backup_dir = os.path.join( + os.path.dirname(self.builder_file), 'backups') + try: + os.mkdir(backup_dir) + except OSError as err: + if err.errno != EEXIST: + raise + backup_name = (os.path.join( + backup_dir, + '%d.probe.' % time() + os.path.basename(self.builder_file))) + copyfile(self.builder_file, backup_name) + + # Restore original ring + os.system('sudo mv %s %s' % ( + self.ring_file_backup, self.ring_file)) + os.system('sudo mv %s %s' % ( + self.builder_file_backup, self.builder_file)) + + def _find_objs_ondisk(self, container, obj): + locations = [] + opart, onodes = self.object_ring.get_nodes( + self.account, container, obj) + for node in onodes: + start_dir = os.path.join( + self.device_dir('object', node), + get_data_dir(self.policy), + str(opart)) + for root, dirs, files in os.walk(start_dir): + for filename in files: + if filename.endswith('.data'): + locations.append(os.path.join(root, filename)) + return locations + + def _test_main(self, cancel=False): + container = 'container-%s' % uuid4() + obj = 'object-%s' % uuid4() + obj2 = 'object-%s' % uuid4() + + # Create container + headers = {'X-Storage-Policy': self.policy.name} + client.put_container(self.url, self.token, container, headers=headers) + + # Create a new object + client.put_object(self.url, self.token, container, obj, self.data) + client.head_object(self.url, self.token, container, obj) + + # Prepare partition power increase + builder = RingBuilder.load(self.builder_file) + builder.prepare_increase_partition_power() + builder.save(self.builder_file) + ring_data = builder.get_ring() + ring_data.save(self.ring_file) + + # Ensure the proxy uses the changed ring + Manager(['proxy']).restart() + + # Ensure object is still accessible + client.head_object(self.url, self.token, container, obj) + + # Relink existing objects + for device in self.devices: + self.assertEqual(0, relink(skip_mount_check=True, devices=device)) + + # Create second object after relinking and ensure it is accessible + client.put_object(self.url, self.token, container, obj2, self.data) + client.head_object(self.url, self.token, container, obj2) + + # Remember the original object locations + org_locations = self._find_objs_ondisk(container, obj) + org_locations += self._find_objs_ondisk(container, obj2) + + # Remember the new object locations + new_locations = [] + for loc in org_locations: + new_locations.append(replace_partition_in_path( + str(loc), self.object_ring.part_power + 1)) + + # Overwrite existing object - to ensure that older timestamp files + # will be cleaned up properly later + client.put_object(self.url, self.token, container, obj, self.data) + + # Ensure objects are still accessible + client.head_object(self.url, self.token, container, obj) + client.head_object(self.url, self.token, container, obj2) + + # Increase partition power + builder = RingBuilder.load(self.builder_file) + if not cancel: + builder.increase_partition_power() + else: + builder.cancel_increase_partition_power() + builder.save(self.builder_file) + ring_data = builder.get_ring() + ring_data.save(self.ring_file) + + # Ensure the proxy uses the changed ring + Manager(['proxy']).restart() + + # Ensure objects are still accessible + client.head_object(self.url, self.token, container, obj) + client.head_object(self.url, self.token, container, obj2) + + # Overwrite existing object - to ensure that older timestamp files + # will be cleaned up properly later + client.put_object(self.url, self.token, container, obj, self.data) + + # Cleanup old objects in the wrong location + for device in self.devices: + self.assertEqual(0, cleanup(skip_mount_check=True, devices=device)) + + # Ensure objects are still accessible + client.head_object(self.url, self.token, container, obj) + client.head_object(self.url, self.token, container, obj2) + + # Ensure data in old or relinked object locations is removed + if not cancel: + for fn in org_locations: + self.assertFalse(os.path.exists(fn)) + else: + for fn in new_locations: + self.assertFalse(os.path.exists(fn)) + + +class TestReplPartPowerIncrease(TestPartPowerIncrease, ReplProbeTest): + def test_main(self): + self._test_main() + + def test_canceled(self): + self._test_main(cancel=True) + + +class TestECPartPowerIncrease(TestPartPowerIncrease, ECProbeTest): + def test_main(self): + self._test_main() + + def test_canceled(self): + self._test_main(cancel=True) + + +if __name__ == '__main__': + main() diff --git a/test/unit/__init__.py b/test/unit/__init__.py index c94ae073..23f387d0 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -300,8 +300,7 @@ class FabricatedRing(Ring): self.nodes = nodes self.port = port self.replicas = replicas - self.part_power = part_power - self._part_shift = 32 - self.part_power + self._part_shift = 32 - part_power self._reload() def _reload(self, *args, **kwargs): diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py new file mode 100644 index 00000000..108866a8 --- /dev/null +++ b/test/unit/cli/test_relinker.py @@ -0,0 +1,172 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import binascii +import os +import shutil +import struct +import tempfile +import unittest + +from swift.cli import relinker +from swift.common import exceptions, ring, utils +from swift.common import storage_policy +from swift.common.storage_policy import ( + StoragePolicy, StoragePolicyCollection, POLICIES) + +from swift.obj.diskfile import write_metadata + +from test.unit import FakeLogger + + +class TestRelinker(unittest.TestCase): + def setUp(self): + self.logger = FakeLogger() + self.testdir = tempfile.mkdtemp() + self.devices = os.path.join(self.testdir, 'node') + shutil.rmtree(self.testdir, ignore_errors=1) + os.mkdir(self.testdir) + os.mkdir(self.devices) + + self.rb = ring.RingBuilder(8, 6.0, 1) + + for i in range(6): + ip = "127.0.0.%s" % i + self.rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1, + 'ip': ip, 'port': 10000, 'device': 'sda1'}) + self.rb.rebalance(seed=1) + + self.existing_device = 'sda1' + os.mkdir(os.path.join(self.devices, self.existing_device)) + self.objects = os.path.join(self.devices, self.existing_device, + 'objects') + os.mkdir(self.objects) + self._hash = utils.hash_path('a/c/o') + digest = binascii.unhexlify(self._hash) + part = struct.unpack_from('>I', digest)[0] >> 24 + self.next_part = struct.unpack_from('>I', digest)[0] >> 23 + self.objdir = os.path.join( + self.objects, str(part), self._hash[-3:], self._hash) + os.makedirs(self.objdir) + self.object_fname = "1278553064.00000.data" + self.objname = os.path.join(self.objdir, self.object_fname) + with open(self.objname, "wb") as dummy: + dummy.write("Hello World!") + write_metadata(dummy, {'name': '/a/c/o', 'Content-Length': '12'}) + + test_policies = [StoragePolicy(0, 'platin', True)] + storage_policy._POLICIES = StoragePolicyCollection(test_policies) + + self.expected_dir = os.path.join( + self.objects, str(self.next_part), self._hash[-3:], self._hash) + self.expected_file = os.path.join(self.expected_dir, self.object_fname) + + def _save_ring(self): + rd = self.rb.get_ring() + for policy in POLICIES: + rd.save(os.path.join( + self.testdir, '%s.ring.gz' % policy.ring_name)) + # Enforce ring reloading in relinker + policy.object_ring = None + + def tearDown(self): + shutil.rmtree(self.testdir, ignore_errors=1) + storage_policy.reload_storage_policies() + + def test_relink(self): + self.rb.prepare_increase_partition_power() + self._save_ring() + relinker.relink(self.testdir, self.devices, True) + + self.assertTrue(os.path.isdir(self.expected_dir)) + self.assertTrue(os.path.isfile(self.expected_file)) + + stat_old = os.stat(os.path.join(self.objdir, self.object_fname)) + stat_new = os.stat(self.expected_file) + self.assertEqual(stat_old.st_ino, stat_new.st_ino) + + def _common_test_cleanup(self, relink=True): + # Create a ring that has prev_part_power set + self.rb.prepare_increase_partition_power() + self.rb.increase_partition_power() + self._save_ring() + + os.makedirs(self.expected_dir) + + if relink: + # Create a hardlink to the original object name. This is expected + # after a normal relinker run + os.link(os.path.join(self.objdir, self.object_fname), + self.expected_file) + + def test_cleanup(self): + self._common_test_cleanup() + self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True)) + + # Old objectname should be removed, new should still exist + self.assertTrue(os.path.isdir(self.expected_dir)) + self.assertTrue(os.path.isfile(self.expected_file)) + self.assertFalse(os.path.isfile( + os.path.join(self.objdir, self.object_fname))) + + def test_cleanup_not_yet_relinked(self): + self._common_test_cleanup(relink=False) + self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True)) + + self.assertTrue(os.path.isfile( + os.path.join(self.objdir, self.object_fname))) + + def test_cleanup_deleted(self): + self._common_test_cleanup() + + # Pretend the object got deleted inbetween and there is a tombstone + fname_ts = self.expected_file[:-4] + "ts" + os.rename(self.expected_file, fname_ts) + + self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True)) + + def test_cleanup_doesnotexist(self): + self._common_test_cleanup() + + # Pretend the file in the new place got deleted inbetween + os.remove(self.expected_file) + + self.assertEqual( + 1, relinker.cleanup(self.testdir, self.devices, True, self.logger)) + self.assertEqual(self.logger.get_lines_for_level('warning'), + ['Error cleaning up %s: %s' % (self.objname, + repr(exceptions.DiskFileNotExist()))]) + + def test_cleanup_non_durable_fragment(self): + self._common_test_cleanup() + + # Actually all fragments are non-durable and raise and DiskFileNotExist + # in EC in this test. However, if the counterpart exists in the new + # location, this is ok - it will be fixed by the reconstructor later on + storage_policy._POLICIES[0].policy_type = 'erasure_coding' + + self.assertEqual( + 0, relinker.cleanup(self.testdir, self.devices, True, self.logger)) + self.assertEqual(self.logger.get_lines_for_level('warning'), []) + + def test_cleanup_quarantined(self): + self._common_test_cleanup() + # Pretend the object in the new place got corrupted + with open(self.expected_file, "wb") as obj: + obj.write('trash') + + self.assertEqual( + 1, relinker.cleanup(self.testdir, self.devices, True, self.logger)) + + self.assertIn('failed audit and was quarantined', + self.logger.get_lines_for_level('warning')[0]) diff --git a/test/unit/cli/test_ringbuilder.py b/test/unit/cli/test_ringbuilder.py index 568286c0..9e355952 100644 --- a/test/unit/cli/test_ringbuilder.py +++ b/test/unit/cli/test_ringbuilder.py @@ -489,6 +489,16 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): dev = ring.devs[-1] self.assertGreater(dev['region'], 0) + def test_add_device_part_power_increase(self): + self.create_sample_ring() + ring = RingBuilder.load(self.tmpfile) + ring.next_part_power = 1 + ring.save(self.tmpfile) + + argv = ["", self.tmpfile, "add", + "r0z0-127.0.1.1:6200/sda1_some meta data", "100"] + self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv) + def test_remove_device(self): for search_value in self.search_values: self.create_sample_ring() @@ -762,6 +772,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): "--ip", "unknown"] self.assertSystemExit(EXIT_ERROR, ringbuilder.main, argv) + def test_remove_device_part_power_increase(self): + self.create_sample_ring() + ring = RingBuilder.load(self.tmpfile) + ring.next_part_power = 1 + ring.save(self.tmpfile) + + argv = ["", self.tmpfile, "remove", "d0"] + self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv) + def test_set_weight(self): for search_value in self.search_values: self.create_sample_ring() @@ -1988,6 +2007,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): ring = RingBuilder.load(self.tmpfile) self.assertEqual(last_replica2part2dev, ring._replica2part2dev) + def test_rebalance_part_power_increase(self): + self.create_sample_ring() + ring = RingBuilder.load(self.tmpfile) + ring.next_part_power = 1 + ring.save(self.tmpfile) + + argv = ["", self.tmpfile, "rebalance", "3"] + self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv) + def test_write_ring(self): self.create_sample_ring() argv = ["", self.tmpfile, "rebalance"] diff --git a/test/unit/common/ring/test_builder.py b/test/unit/common/ring/test_builder.py index 6fdc3e1c..8f244df8 100644 --- a/test/unit/common/ring/test_builder.py +++ b/test/unit/common/ring/test_builder.py @@ -2604,6 +2604,40 @@ class TestRingBuilder(unittest.TestCase): except exceptions.DuplicateDeviceError: self.fail("device hole not reused") + def test_prepare_increase_partition_power(self): + ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz') + + rb = ring.RingBuilder(8, 3.0, 1) + self.assertEqual(rb.part_power, 8) + + # add more devices than replicas to the ring + for i in range(10): + dev = "sdx%s" % i + rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1, + 'ip': '127.0.0.1', 'port': 10000, 'device': dev}) + rb.rebalance(seed=1) + + self.assertFalse(rb.cancel_increase_partition_power()) + self.assertEqual(rb.part_power, 8) + self.assertIsNone(rb.next_part_power) + + self.assertFalse(rb.finish_increase_partition_power()) + self.assertEqual(rb.part_power, 8) + self.assertIsNone(rb.next_part_power) + + self.assertTrue(rb.prepare_increase_partition_power()) + self.assertEqual(rb.part_power, 8) + self.assertEqual(rb.next_part_power, 9) + + # Save .ring.gz, and load ring from it to ensure prev/next is set + rd = rb.get_ring() + rd.save(ring_file) + + r = ring.Ring(ring_file) + expected_part_shift = 32 - 8 + self.assertEqual(expected_part_shift, r._part_shift) + self.assertEqual(9, r.next_part_power) + def test_increase_partition_power(self): rb = ring.RingBuilder(8, 3.0, 1) self.assertEqual(rb.part_power, 8) @@ -2623,13 +2657,18 @@ class TestRingBuilder(unittest.TestCase): old_part, old_nodes = r.get_nodes("acc", "cont", "obj") old_version = rb.version - rb.increase_partition_power() + self.assertTrue(rb.prepare_increase_partition_power()) + self.assertTrue(rb.increase_partition_power()) rb.validate() changed_parts, _balance, removed_devs = rb.rebalance() self.assertEqual(changed_parts, 0) self.assertEqual(removed_devs, 0) + # Make sure cancellation is not possible + # after increasing the partition power + self.assertFalse(rb.cancel_increase_partition_power()) + old_ring = r rd = rb.get_ring() rd.save(ring_file) @@ -2637,8 +2676,9 @@ class TestRingBuilder(unittest.TestCase): new_part, new_nodes = r.get_nodes("acc", "cont", "obj") # sanity checks - self.assertEqual(rb.part_power, 9) - self.assertEqual(rb.version, old_version + 2) + self.assertEqual(9, rb.part_power) + self.assertEqual(9, rb.next_part_power) + self.assertEqual(rb.version, old_version + 3) # make sure there is always the same device assigned to every pair of # partitions @@ -2670,6 +2710,107 @@ class TestRingBuilder(unittest.TestCase): # nodes after increasing the partition power self.assertEqual(old_nodes, new_nodes) + def test_finalize_increase_partition_power(self): + ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz') + + rb = ring.RingBuilder(8, 3.0, 1) + self.assertEqual(rb.part_power, 8) + + # add more devices than replicas to the ring + for i in range(10): + dev = "sdx%s" % i + rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1, + 'ip': '127.0.0.1', 'port': 10000, 'device': dev}) + rb.rebalance(seed=1) + + self.assertTrue(rb.prepare_increase_partition_power()) + + # Make sure this doesn't do any harm before actually increasing the + # partition power + self.assertFalse(rb.finish_increase_partition_power()) + self.assertEqual(rb.next_part_power, 9) + + self.assertTrue(rb.increase_partition_power()) + + self.assertFalse(rb.prepare_increase_partition_power()) + self.assertEqual(rb.part_power, 9) + self.assertEqual(rb.next_part_power, 9) + + self.assertTrue(rb.finish_increase_partition_power()) + + self.assertEqual(rb.part_power, 9) + self.assertIsNone(rb.next_part_power) + + # Save .ring.gz, and load ring from it to ensure prev/next is set + rd = rb.get_ring() + rd.save(ring_file) + + r = ring.Ring(ring_file) + expected_part_shift = 32 - 9 + self.assertEqual(expected_part_shift, r._part_shift) + self.assertIsNone(r.next_part_power) + + def test_prepare_increase_partition_power_failed(self): + rb = ring.RingBuilder(8, 3.0, 1) + self.assertEqual(rb.part_power, 8) + + self.assertTrue(rb.prepare_increase_partition_power()) + self.assertEqual(rb.next_part_power, 9) + + # next_part_power is still set, do not increase again + self.assertFalse(rb.prepare_increase_partition_power()) + self.assertEqual(rb.next_part_power, 9) + + def test_increase_partition_power_failed(self): + rb = ring.RingBuilder(8, 3.0, 1) + self.assertEqual(rb.part_power, 8) + + # add more devices than replicas to the ring + for i in range(10): + dev = "sdx%s" % i + rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1, + 'ip': '127.0.0.1', 'port': 10000, 'device': dev}) + rb.rebalance(seed=1) + + # next_part_power not set, can't increase the part power + self.assertFalse(rb.increase_partition_power()) + self.assertEqual(rb.part_power, 8) + + self.assertTrue(rb.prepare_increase_partition_power()) + + self.assertTrue(rb.increase_partition_power()) + self.assertEqual(rb.part_power, 9) + + # part_power already increased + self.assertFalse(rb.increase_partition_power()) + self.assertEqual(rb.part_power, 9) + + def test_cancel_increase_partition_power(self): + rb = ring.RingBuilder(8, 3.0, 1) + self.assertEqual(rb.part_power, 8) + + # add more devices than replicas to the ring + for i in range(10): + dev = "sdx%s" % i + rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1, + 'ip': '127.0.0.1', 'port': 10000, 'device': dev}) + rb.rebalance(seed=1) + + old_version = rb.version + self.assertTrue(rb.prepare_increase_partition_power()) + + # sanity checks + self.assertEqual(8, rb.part_power) + self.assertEqual(9, rb.next_part_power) + self.assertEqual(rb.version, old_version + 1) + + self.assertTrue(rb.cancel_increase_partition_power()) + rb.validate() + + self.assertEqual(8, rb.part_power) + self.assertEqual(8, rb.next_part_power) + self.assertEqual(rb.version, old_version + 2) + class TestGetRequiredOverload(unittest.TestCase): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 25926f2e..e4cf63e1 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -3809,6 +3809,28 @@ cluster_dfw1 = http://dfw1.host/v1/ self.fail('Invalid results from pure function:\n%s' % '\n'.join(failures)) + def test_replace_partition_in_path(self): + # Check for new part = part * 2 + old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f' + new = '/s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77/f' + # Expected outcome + self.assertEqual(utils.replace_partition_in_path(old, 11), new) + + # Make sure there is no change if the part power didn't change + self.assertEqual(utils.replace_partition_in_path(old, 10), old) + self.assertEqual(utils.replace_partition_in_path(new, 11), new) + + # Check for new part = part * 2 + 1 + old = '/s/n/d/o/693/c77/ad708baea4806dcaba30bf07d9e64c77/f' + new = '/s/n/d/o/1387/c77/ad708baea4806dcaba30bf07d9e64c77/f' + + # Expected outcome + self.assertEqual(utils.replace_partition_in_path(old, 11), new) + + # Make sure there is no change if the part power didn't change + self.assertEqual(utils.replace_partition_in_path(old, 10), old) + self.assertEqual(utils.replace_partition_in_path(new, 11), new) + class ResellerConfReader(unittest.TestCase): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 3490ab94..d9c8d616 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -28,6 +28,7 @@ import uuid import xattr import re import six +import struct from collections import defaultdict from random import shuffle, randint from shutil import rmtree @@ -3839,16 +3840,25 @@ class DiskFileMixin(BaseDiskFileTestMixin): DiskFileNoSpace, diskfile.write_metadata, 'n/a', metadata) - def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False): + def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False, + partition=0, next_part_power=None, + expect_error=False): timestamp = Timestamp(timestamp) df = self._simple_get_diskfile(account='a', container='c', obj='o_%s' % policy, - policy=policy) + policy=policy, + partition=partition, + next_part_power=next_part_power) frag_index = None if policy.policy_type == EC_POLICY: frag_index = df._frag_index or 7 - write_diskfile(df, timestamp, frag_index=frag_index, - legacy_durable=legacy_durable) + if expect_error: + with self.assertRaises(Exception): + write_diskfile(df, timestamp, frag_index=frag_index, + legacy_durable=legacy_durable) + else: + write_diskfile(df, timestamp, frag_index=frag_index, + legacy_durable=legacy_durable) return df._datadir def test_commit(self): @@ -3892,7 +3902,77 @@ class DiskFileMixin(BaseDiskFileTestMixin): for policy in POLICIES: self._do_test_write_cleanup(policy, legacy_durable=True) - def test_commit_no_extra_fsync(self): + @mock.patch("swift.obj.diskfile.BaseDiskFileManager.cleanup_ondisk_files") + def test_write_cleanup_part_power_increase(self, mock_cleanup): + # Without next_part_power set we expect only one cleanup per DiskFile + # and no linking + for policy in POLICIES: + timestamp = Timestamp(time()).internal + df_dir = self._create_diskfile_dir(timestamp, policy) + self.assertEqual(1, mock_cleanup.call_count) + mock_cleanup.assert_called_once_with(df_dir) + mock_cleanup.reset_mock() + + # With next_part_power set to part_power + 1 we expect two cleanups per + # DiskFile: first cleanup the current directory, but also cleanup the + # future directory where hardlinks are created + for policy in POLICIES: + timestamp = Timestamp(time()).internal + df_dir = self._create_diskfile_dir( + timestamp, policy, next_part_power=11) + + self.assertEqual(2, mock_cleanup.call_count) + mock_cleanup.assert_any_call(df_dir) + + # Make sure the translated path is also cleaned up + expected_fname = utils.replace_partition_in_path( + os.path.join(df_dir, "dummy"), 11) + expected_dir = os.path.dirname(expected_fname) + mock_cleanup.assert_any_call(expected_dir) + + mock_cleanup.reset_mock() + + # With next_part_power set to part_power we expect two cleanups per + # DiskFile: first cleanup the current directory, but also cleanup the + # previous old directory + for policy in POLICIES: + digest = utils.hash_path( + 'a', 'c', 'o_%s' % policy, raw_digest=True) + partition = struct.unpack_from('>I', digest)[0] >> (32 - 10) + timestamp = Timestamp(time()).internal + df_dir = self._create_diskfile_dir( + timestamp, policy, partition=partition, next_part_power=10) + + self.assertEqual(2, mock_cleanup.call_count) + mock_cleanup.assert_any_call(df_dir) + + # Make sure the path using the old part power is also cleaned up + expected_fname = utils.replace_partition_in_path( + os.path.join(df_dir, "dummy"), 9) + expected_dir = os.path.dirname(expected_fname) + mock_cleanup.assert_any_call(expected_dir) + + mock_cleanup.reset_mock() + + @mock.patch.object(diskfile.BaseDiskFileManager, 'cleanup_ondisk_files', + side_effect=Exception) + def test_killed_before_cleanup(self, mock_cleanup): + for policy in POLICIES: + timestamp = Timestamp(time()).internal + digest = utils.hash_path( + 'a', 'c', 'o_%s' % policy, raw_digest=True) + partition = struct.unpack_from('>I', digest)[0] >> (32 - 10) + df_dir = self._create_diskfile_dir(timestamp, policy, + partition=partition, + next_part_power=11, + expect_error=True) + expected_fname = utils.replace_partition_in_path( + os.path.join(df_dir, "dummy"), 11) + expected_dir = os.path.dirname(expected_fname) + + self.assertEqual(os.listdir(df_dir), os.listdir(expected_dir)) + + def test_commit_fsync(self): for policy in POLICIES: df = self._simple_get_diskfile(account='a', container='c', obj='o', policy=policy) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index f3d7851b..0c78573a 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -65,7 +65,26 @@ def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs): yield fake_ssync -def _create_test_rings(path): +def make_ec_archive_bodies(policy, test_body): + segment_size = policy.ec_segment_size + # split up the body into buffers + chunks = [test_body[x:x + segment_size] + for x in range(0, len(test_body), segment_size)] + # encode the buffers into fragment payloads + fragment_payloads = [] + for chunk in chunks: + fragments = \ + policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor + if not fragments: + break + fragment_payloads.append(fragments) + + # join up the fragment payloads per node + ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)] + return ec_archive_bodies + + +def _create_test_rings(path, next_part_power=None): testgz = os.path.join(path, 'object.ring.gz') intended_replica2part2dev_id = [ [0, 1, 2], @@ -87,14 +106,16 @@ def _create_test_rings(path): with closing(GzipFile(testgz, 'wb')) as f: pickle.dump( ring.RingData(intended_replica2part2dev_id, - intended_devs, intended_part_shift), + intended_devs, intended_part_shift, + next_part_power), f) testgz = os.path.join(path, 'object-1.ring.gz') with closing(GzipFile(testgz, 'wb')) as f: pickle.dump( ring.RingData(intended_replica2part2dev_id, - intended_devs, intended_part_shift), + intended_devs, intended_part_shift, + next_part_power), f) @@ -1218,6 +1239,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(self.reconstructor.suffix_count, 0) self.assertEqual(len(found_jobs), 6) + def test_reconstructor_skipped_partpower_increase(self): + self.reconstructor._reset_stats() + _create_test_rings(self.testdir, 10) + # Enforce re-reading the EC ring + POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1') + + self.reconstructor.reconstruct() + + self.assertEqual(0, self.reconstructor.reconstruction_count) + warnings = self.reconstructor.logger.get_lines_for_level('warning') + self.assertIn( + "next_part_power set in policy 'one'. Skipping", warnings) + class TestGlobalSetupObjectReconstructorLegacyDurable( TestGlobalSetupObjectReconstructor): diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 29c1d142..4913ad50 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -131,7 +131,7 @@ def _mock_process(ret): object_replicator.subprocess.Popen = orig_process -def _create_test_rings(path, devs=None): +def _create_test_rings(path, devs=None, next_part_power=None): testgz = os.path.join(path, 'object.ring.gz') intended_replica2part2dev_id = [ [0, 1, 2, 3, 4, 5, 6], @@ -159,14 +159,14 @@ def _create_test_rings(path, devs=None): with closing(GzipFile(testgz, 'wb')) as f: pickle.dump( ring.RingData(intended_replica2part2dev_id, - intended_devs, intended_part_shift), + intended_devs, intended_part_shift, next_part_power), f) testgz = os.path.join(path, 'object-1.ring.gz') with closing(GzipFile(testgz, 'wb')) as f: pickle.dump( ring.RingData(intended_replica2part2dev_id, - intended_devs, intended_part_shift), + intended_devs, intended_part_shift, next_part_power), f) for policy in POLICIES: policy.object_ring = None # force reload @@ -1959,6 +1959,15 @@ class TestObjectReplicator(unittest.TestCase): # After 10 cycles every partition is seen exactly once self.assertEqual(sorted(range(partitions)), sorted(seen)) + def test_replicate_skipped_partpower_increase(self): + _create_test_rings(self.testdir, next_part_power=4) + self.replicator.replicate() + self.assertEqual(0, self.replicator.job_count) + self.assertEqual(0, self.replicator.replication_count) + warnings = self.logger.get_lines_for_level('warning') + self.assertIn( + "next_part_power set in policy 'one'. Skipping", warnings) + if __name__ == '__main__': unittest.main()