From ae062f8b09aed2f7bad9581396607045ea217fa8 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Thu, 17 Mar 2022 22:30:40 -0700 Subject: [PATCH] ring: Introduce a v2 ring format There's a bunch of moving pieces here: - Add a new RingWriter class. Stick it in a new swift.common.ring.io module. You *can* use it like the old gzip file, but you can also define named sections which can be referenced later on read. Section names may be arbitrary strings, but the "swift/" prefix is reserved for upstream use. Sections must contain a single length-value encoded BLOB. If sections are used, an additional BLOB is written at the end containing a JSON section-index, followed by an uncompressed offset for the index. Move RingReader to ring/io.py, too. - Clean up some ring metadata handling: - Drop MD5 tracking in RingReader. It was brittle at best anyway, and nothing uses it. YAGNI - Fix size/raw_size attributes when loading only metadata. - Add the ability to seek within RingReaders, though you need to know what you're doing and only seek to flush points. - Let RingBuilder objects change how wide their replica2part2dev_id arrays are. Add a dev_id_bytes key to serialized ring metadata. dev_id_bytes may be either 2 or 4, but 4 requires v2 rings. We considered allowing dev_id_bytes of 1, but dropped it as unnecessary complexity for a niche use case. - swift-ring-builder version subcommand added, which takes a ring. This lets operators see the serialization format of a ring on disk: $ swift-ring-builder object.ring.gz version object.ring.gz: Serialization version: 2 (2-byte IDs), build version: 54 Signed-off-by: Tim Burke Change-Id: Ia0ac4ea2006d8965d7fdb6659d355c77386adb70 --- .gitignore | 1 + doc/source/index.rst | 1 + doc/source/overview_ring_format.rst | 253 +++++++ doc/source/ring.rst | 10 + doc/source/ring_partpower.rst | 2 + etc/magic | 20 + swift/cli/ringbuilder.py | 77 +- swift/common/exceptions.py | 4 + swift/common/ring/builder.py | 87 ++- swift/common/ring/composite_builder.py | 9 +- swift/common/ring/io.py | 657 ++++++++++++++++++ swift/common/ring/ring.py | 285 +++++--- swift/common/ring/utils.py | 68 ++ test/unit/cli/test_default_output.stub | 2 +- .../cli/test_default_output_id_assigned.stub | 2 +- test/unit/cli/test_default_sorted_output.stub | 10 +- test/unit/cli/test_ipv6_output.stub | 2 +- test/unit/cli/test_ringbuilder.py | 173 ++++- test/unit/common/ring/test_builder.py | 74 +- .../common/ring/test_composite_builder.py | 7 +- test/unit/common/ring/test_io.py | 284 ++++++++ test/unit/common/ring/test_ring.py | 497 +++++++++++-- 22 files changed, 2325 insertions(+), 200 deletions(-) create mode 100644 doc/source/overview_ring_format.rst create mode 100644 etc/magic create mode 100644 swift/common/ring/io.py create mode 100644 test/unit/common/ring/test_io.py diff --git a/.gitignore b/.gitignore index f6a236df14..46ae0a33b4 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ test/probe/.noseids RELEASENOTES.rst releasenotes/notes/reno.cache /tools/playbooks/**/*.retry +.vscode/* diff --git a/doc/source/index.rst b/doc/source/index.rst index bddb243ce7..6332d396e5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -47,6 +47,7 @@ Overview and Concepts overview_architecture overview_wsgi_management overview_ring + overview_ring_format overview_policies overview_reaper overview_auth diff --git a/doc/source/overview_ring_format.rst b/doc/source/overview_ring_format.rst new file mode 100644 index 0000000000..7f108c3d8c --- /dev/null +++ b/doc/source/overview_ring_format.rst @@ -0,0 +1,253 @@ +================= +Ring File Formats +================= + +The ring is the most important data structure in Swift. How this data structure +been serialized to disk has changed over the years. + +Initially ring files contain three key pieces of information: + +* the part_power value (often stored as ``part_shift := 32 - part_power``) + + * which determines how many partitions are in the ring, + +* the device list + + * which includes all the disks participating in the ring, and + +* the replica-to-part-to-device table + + * which has all ``replica_count * (2 ** part_power)`` partition assignments. + +But the ability to extend the serialization format to add more data structures +to the ring serialization format has meant a new ring v2 format has been created. + +Ring files have always been gzipped when serialized, though the inner, +raw format has evolved over the years. + +Ring v0 +------- + +Initially, rings were simply pickle dumps of the RingData object. `With +Swift 1.3.0 `__, this +changed to pickling a pure-stdlib data structure, but the core concept +was the same. + +.. note: + + Swift 2.36.0 dropped support for v0 rings. + +Ring v1 +------- + +Pickle presented some problems, however. While `there are security +concerns `__ around unpickling +untrusted data, security boundaries are generally drawn such that rings are +assumed to be trusted. Ultimately, what pushed us to a new format were +`performance considerations `__. + +Starting in `Swift 1.7.0 `__, +Swift began using a new format (while still being willing to read the old one). +The new format starts with some magic so we may identify it as such:: + + +---------------+-------+ + |'R' '1' 'N' 'G'| | + +---------------+-------+ + +where ```` is a network-order two-byte version number (which is always 1). +After that, a JSON object is serialized as:: + + +---------------+-------...---+ + | | | + +---------------+-------...---+ + +where ```` is the network-order four-byte length (in bytes) of +````, which is the ASCII-encoded JSON-serialized object. This object +has at minimum three keys: + +* ``devs`` for the device list +* ``part_shift`` (i.e., ``32 - part_power``) +* ``replica_count`` for the integer number of part-to-device rows to read + +The replica-to-part-to-device table then follows:: + + +-------+-------+...+-------+-------+ + | | |...| | | + +-------+-------+...+-------+-------+ + | | |...| | | + +-------+-------+...+-------+-------+ + | ... | + +-------+-------+...+-------+-------+ + | | |...| + +-------+-------+...+ + +Each ```` is a host-order two-byte index into the ``devs`` list. Every row +except the last has exactly ``2 ** part_power`` entries; the last row may +have the same or fewer. + +The metadata object has proven quite versatile: new keys have been added +to provide additional information while remaining backwards-compatible. +In order, the following new fields have been added: + +* ``byteorder`` specifies whether the host-order for the + replica-to-part-to-device table is "big" or "little" endian. Added in + `Swift 2.12.0 `__, + this allows rings written on big-endian machines to be read on + little-endian machines and vice-versa. +* ``next_part_power`` indicates whether a partition-power increase is in + progress. Added in `Swift 2.15.0 `__, + this will have one of two values, if present: the ring's current + ``part_power``, indicating that there may be hardlinks to clean up, + or ``part_power + 1`` indicating that hardlinks may need to be created. + See :ref:`the documentation` + for more information. +* ``version`` specifies the version number of the ring-builder that was used + to write this ring. Added in `Swift 2.24.0 `__, + this allows the comparing of rings from different machines to determine + which is newer. + +Ring v2 +------- + +The way that v1 rings dealt with fractional replicas made it impossible +to reliably serialize additional large data structures after the +replica-to-part-to-device table. The v2 format has been designed to be +extensable. + +The new format starts with magic similar to v1:: + + +---------------+-------+ + |'R' '1' 'N' 'G'| | + +---------------+-------+ + +where is again a network-order two-byte version number (which is now 2). +By bumping the version number, we ensure that old versions of Swift refuse to +read the ring, rather than misinterpret the content. + +After that, a series of BLOBs are serialized, each as:: + + +-------------------------------+-------...---+ + | | | + +-------------------------------+-------...---+ + +where ```` is the network-order eight-byte length (in bytes) of +````. Each BLOB is preceded by a ``Z_FULL_FLUSH`` to allow it to be +decompressed without reading the whole file. + +The order of the BLOBs isn't important, although they do tend to be written +in the order Swift will read them while loading. This reduces the disk seeks +necessary to load. + +The final BLOB is an index: a JSON object mapping named sections to an array +of offsets within the file, like + +.. code:: + + { + section: [ + compressed start, + uncompressed start, + compressed end, + uncompressed end, + checksum method, + checksum value + ], + ... + } + +Section names may be arbitrary strings, but the "swift/" prefix is reserved +for upstream use. The start/end values mark the beginning and ending of the +section's BLOB. Note that some end values may be ``null`` if they were not +known when the index was written -- in particular, this *will* be true for +the index itself. The checksum method should be one of ``"md5"``, ``"sha1"``, +``"sha256"``, or ``"sha512"``; other values will be ignored in anticipation +of a need to support further algorithms. The checksum value will be the +hex-encoded digest of the uncompressed section's bytes. Like end values, +checksum data may be ``null`` if not known when the index is written. + +Finally, a "tail" is written: + +* the gzip stream is flushed with another ``Z_FULL_FLUSH``, +* the stream is switched to uncompressed, +* the eight-byte offset of the uncompressed start of the index is written, +* the gzip stream is flushed with another ``Z_FULL_FLUSH``, +* the eight-byte offset of the compressed start of the index is written, +* the gzip stream is flushed with another ``Z_FULL_FLUSH``, and +* the gzip stream is closed; this involves: + + * flushing the underlying deflate stream with ``Z_FINISH`` + * writing ``CRC32`` (of the full uncompressed data) + * writing ``ISIZE`` (the length of the full uncompressed data ``mod 2 ** 32``) + +By switching to uncompressed, we can know exactly how many bytes will be +written in the tail, so that when reading we can quickly seek to and read the +index offset, seek to the index start, and read the index. From there we +can do similar things for any other section. + + +* Seek to the end of the file +* Go back 31 bytes in the underlying file; this should leave us at the start of + the deflate block containing the offset for the compressed start +* Decompress 8 bytes from the deflate stream to get the location of the + compressed start of the index BLOB +* Seek to that location +* Read/decompress the size of the index BLOB +* Read/decompress the json serialized index. + +.. note:: This 31 bytes is the deflate block containing the 8 byte location, + a ``Z_FULL_FLUSH`` block, the ``Z_FINISH`` block, and the ``CRC32`` and + ``ISIZE``. For more information, see `RFC 1951`_ (for the deflate stream) + and `RFC 1952`_ (for the gzip format). + +The currently defined section and section names upstream are as follows: + +* ``swift/index`` - The swift index +* ``swift/ring/metadata`` - Ring metadata serialized as json +* ``swift/ring/devices`` - Devices json serialized data structure. + + * This has been seperated from the ring metadata structure in v1 as it + gets large + +* ``swift/ring/assignments`` - The ring replica2part2dev_id data structure + +.. note:: + Third-parties may find it useful to add their own sections; however, + the ``swift/`` prefix is reserved for future upstream enhancements. + +swift/ring/metadata +~~~~~~~~~~~~~~~~~~~ +This BLOB is an ASCII-encoded JSON object full of metadata, similar +to v1 rings. It has the following required keys: + +* ``part_shift`` +* ``dev_id_bytes`` specifies the number of bytes used for each ```` in the + replica-to-part-to-device table; will be one of 2, 4, or 8 + +Additionally, there are several optional keys which may be present: + +* ``next_part_power`` +* ``version`` + +Notice that two keys are no longer present: ``replica_count`` is no longer +needed as the size of the replica-to-part-to-device table is explicit, and +``byteorder`` is not needed as all data in v2 rings should be written using +network-order. + +swift/ring/devices +~~~~~~~~~~~~~~~~~~ +This BLOB contains a list of swift device dictionarys. And was seperated out +from the metadata BLOB as this can become a large structure in it's own right. + +swift/ring/assignments +~~~~~~~~~~~~~~~~~~~~~~ +This BLOB is the replica-to-part-to-device table. It's length will be +``replicas * (2 ** part_power) * dev_id_bytes``, where ``replicas`` is the exact +(potentially fractional) replica count for the ring. Unlike in v1, each +```` is written using network-order. + +Note that this is why we increased the size of ```` as compared to +the v1 format -- otherwise, we may not be able to represent rings with both +high ``replica_count`` and high ``part_power``. + +.. _RFC 1952: https://rfc-editor.org/rfc/rfc1952 +.. _RFC 1951: https://rfc-editor.org/rfc/rfc1951 diff --git a/doc/source/ring.rst b/doc/source/ring.rst index c65290d2be..196a0b88a3 100644 --- a/doc/source/ring.rst +++ b/doc/source/ring.rst @@ -4,6 +4,16 @@ Partitioned Consistent Hash Ring ******************************** +.. _ring-io: + +Ring IO +======= + +.. automodule:: swift.common.ring.io + :members: + :undoc-members: + :show-inheritance: + .. _ring: Ring diff --git a/doc/source/ring_partpower.rst b/doc/source/ring_partpower.rst index 42b94c5c1e..2e22bec5ed 100644 --- a/doc/source/ring_partpower.rst +++ b/doc/source/ring_partpower.rst @@ -1,3 +1,5 @@ +.. _modify_part_power: + ============================== Modifying Ring Partition Power ============================== diff --git a/etc/magic b/etc/magic new file mode 100644 index 0000000000..0bd2a65064 --- /dev/null +++ b/etc/magic @@ -0,0 +1,20 @@ +#------------------------------------------------------------------------------- +# Openstack swift +# Note: add this snippet to either /etc/magic or ~/.magic +#------------------------------------------------------------------------------- +# gzip compressed +0 beshort 0x1f8b +# compress method: deflate, flags: FNAME +>&0 beshort 0x0808 +# skip ahead another 6 (MTIME, XLF, OS); read FNAME +>>&6 search/0x40 \0 +# Skip ahead five; should cover +# 00 -- uncompressed block +# 06 00 -- ... of length 6 +# f9 ff -- (one's complement of length) +>>>&5 string/4 R1NG Swift ring, +>>>>&0 clear x +>>>>&0 beshort 1 version 1 +>>>>&0 beshort 2 version 2 +>>>>&0 default x +>>>>>&0 beshort x unknown version (0x%04x) diff --git a/swift/cli/ringbuilder.py b/swift/cli/ringbuilder.py index 1ddd66c4dd..2f609ad018 100644 --- a/swift/cli/ringbuilder.py +++ b/swift/cli/ringbuilder.py @@ -34,6 +34,7 @@ from swift.common import exceptions from swift.common.ring import RingBuilder, Ring, RingData from swift.common.ring.builder import MAX_BALANCE from swift.common.ring.composite_builder import CompositeRingBuilder +from swift.common.ring.ring import RING_CODECS, DEFAULT_RING_FORMAT_VERSION from swift.common.ring.utils import validate_args, \ validate_and_normalize_ip, build_dev_from_opts, \ parse_builder_ring_filename_args, parse_search_value, \ @@ -47,6 +48,8 @@ EXIT_SUCCESS = 0 EXIT_WARNING = 1 EXIT_ERROR = 2 +FORMAT_CHOICES = [str(v) for v in RING_CODECS] + global argv, backup_dir, builder, builder_file, ring_file argv = backup_dir = builder = builder_file = ring_file = None @@ -594,9 +597,9 @@ swift-ring-builder dispersion_trailer = '' if builder.dispersion is None else ( ', %.02f dispersion' % (builder.dispersion)) print('%d partitions, %.6f replicas, %d regions, %d zones, ' - '%d devices, %.02f balance%s' % ( + '%d devices, %d-byte IDs, %.02f balance%s' % ( builder.parts, builder.replicas, regions, zones, dev_count, - balance, dispersion_trailer)) + builder.dev_id_bytes, balance, dispersion_trailer)) print('The minimum number of hours before a partition can be ' 'reassigned is %s (%s remaining)' % ( builder.min_part_hours, @@ -617,6 +620,9 @@ swift-ring-builder except Exception as exc: print('Ring file %s is invalid: %r' % (ring_file, exc)) else: + # mostly just an implementation detail + builder_dict.pop('dev_id_bytes', None) + ring_dict.pop('dev_id_bytes', None) if builder_dict == ring_dict: print('Ring file %s is up-to-date' % ring_file) else: @@ -656,6 +662,24 @@ swift-ring-builder print(ring_empty_error) exit(EXIT_SUCCESS) + @staticmethod + def version(): + """ +swift-ring-builder version + """ + if len(argv) < 3: + print(Commands.create.__doc__.strip()) + exit(EXIT_ERROR) + try: + rd = RingData.load(ring_file, metadata_only=True) + except ValueError as e: + print(e) + exit(EXIT_ERROR) + print('%s: Serialization version: %d (%d-byte IDs), ' + 'build version: %d' % + (ring_file, rd.format_version, rd.dev_id_bytes, rd.version)) + exit(EXIT_SUCCESS) + @staticmethod def search(): """ @@ -1051,7 +1075,19 @@ swift-ring-builder rebalance [options] parser.add_option('-s', '--seed', help="seed to use for rebalance") parser.add_option('-d', '--debug', action='store_true', help="print debug information") + parser.add_option('--format-version', + choices=FORMAT_CHOICES, default=None, + help="specify ring format version") options, args = parser.parse_args(argv) + if options.format_version is None: + print("Defaulting to --format-version=1. This ensures the ring\n" + "written will be readable by older versions of Swift.\n" + "In a future release, the default will change to\n" + "--format-version=2\n") + options.format_version = DEFAULT_RING_FORMAT_VERSION + else: + # N.B. choices doesn't work with type=int + options.format_version = int(options.format_version) def get_seed(index): if options.seed: @@ -1166,9 +1202,11 @@ swift-ring-builder rebalance [options] status = EXIT_WARNING ts = time() builder.get_ring().save( - pathjoin(backup_dir, '%d.' % ts + basename(ring_file))) + pathjoin(backup_dir, '%d.' % ts + basename(ring_file)), + format_version=options.format_version) builder.save(pathjoin(backup_dir, '%d.' % ts + basename(builder_file))) - builder.get_ring().save(ring_file) + builder.get_ring().save( + ring_file, format_version=options.format_version) builder.save(builder_file) exit(status) @@ -1293,6 +1331,22 @@ swift-ring-builder write_ring 'set_info' calls when no rebalance is needed but you want to send out the new device information. """ + usage = Commands.write_ring.__doc__.strip() + parser = optparse.OptionParser(usage) + parser.add_option('--format-version', + choices=FORMAT_CHOICES, default=None, + help="specify ring format version") + options, args = parser.parse_args(argv) + if options.format_version is None: + print("Defaulting to --format-version=1. This ensures the ring\n" + "written will be readable by older versions of Swift.\n" + "In a future release, the default will change to\n" + "--format-version=2\n") + options.format_version = DEFAULT_RING_FORMAT_VERSION + else: + # N.B. choices doesn't work with type=int + options.format_version = int(options.format_version) + if not builder.devs: print('Unable to write empty ring.') exit(EXIT_ERROR) @@ -1304,8 +1358,9 @@ swift-ring-builder write_ring 'assignments but with devices; did you forget to run ' '"rebalance"?', file=sys.stderr) ring_data.save( - pathjoin(backup_dir, '%d.' % time() + basename(ring_file))) - ring_data.save(ring_file) + pathjoin(backup_dir, '%d.' % time() + basename(ring_file)), + format_version=options.format_version) + ring_data.save(ring_file, format_version=options.format_version) exit(EXIT_SUCCESS) @staticmethod @@ -1653,8 +1708,11 @@ def main(arguments=None): builder_file, ring_file = parse_builder_ring_filename_args(argv) if builder_file != argv[1]: - print('Note: using %s instead of %s as builder file' % ( - builder_file, argv[1])) + if len(argv) > 2 and argv[2] in ('write_builder', 'version'): + pass + else: + print('Note: using %s instead of %s as builder file' % ( + builder_file, argv[1])) try: builder = RingBuilder.load(builder_file) @@ -1668,7 +1726,8 @@ def main(arguments=None): print(msg) exit(EXIT_ERROR) except (exceptions.FileNotFoundError, exceptions.PermissionError) as e: - if len(argv) < 3 or argv[2] not in ('create', 'write_builder'): + if len(argv) < 3 or argv[2] not in ('create', 'write_builder', + 'version'): print(e) exit(EXIT_ERROR) except Exception as e: diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index b5cc199157..a3e1f2b68a 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -133,6 +133,10 @@ class PathNotDir(OSError): pass +class DevIdBytesTooSmall(ValueError): + pass + + class ChunkReadError(SwiftException): pass diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py index 77b00bcd97..e4ea373909 100644 --- a/swift/common/ring/builder.py +++ b/swift/common/ring/builder.py @@ -33,12 +33,12 @@ from time import time from swift.common import exceptions from swift.common.ring.ring import RingData from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \ - validate_and_normalize_address, validate_replicas_by_tier, pretty_dev + validate_and_normalize_address, validate_replicas_by_tier, pretty_dev, \ + none_dev_id, calc_dev_id_bytes, BYTES_TO_TYPE_CODE, resize_array # we can't store None's in the replica2part2dev array, so we high-jack # the max value for magic to represent the part is not currently # assigned to any device. -NONE_DEV = 2 ** 16 - 1 MAX_BALANCE = 999.99 MAX_BALANCE_GATHER_COUNT = 3 @@ -156,6 +156,31 @@ class RingBuilder(object): def part_shift(self): return 32 - self.part_power + @property + def dev_id_bytes(self): + if not self._replica2part2dev: + max_dev_id = len(self.devs) - 1 if self.devs else 0 + return calc_dev_id_bytes(max_dev_id) + return self._replica2part2dev[0].itemsize + + def set_dev_id_bytes(self, new_dev_id_bytes): + if self._replica2part2dev: + self._replica2part2dev = [ + resize_array(p2d, new_dev_id_bytes) + for p2d in self._replica2part2dev] + + @property + def dev_id_type_code(self): + return BYTES_TO_TYPE_CODE[self.dev_id_bytes] + + @property + def max_dev_id(self): + return none_dev_id(self.dev_id_bytes) - 1 + + @property + def none_dev_id(self): + return none_dev_id(self.dev_id_bytes) + @property def ever_rebalanced(self): return self._replica2part2dev is not None @@ -295,6 +320,7 @@ class RingBuilder(object): 'parts': self.parts, 'devs': self.devs, 'devs_changed': self.devs_changed, + 'dev_id_bytes': self.dev_id_bytes, 'version': self.version, 'overload': self.overload, '_replica2part2dev': self._replica2part2dev, @@ -369,8 +395,8 @@ class RingBuilder(object): version=self.version) else: self._ring = \ - RingData([array('H', p2d) for p2d in - self._replica2part2dev], + RingData([array(self.dev_id_type_code, p2d) + for p2d in self._replica2part2dev], devs, self.part_shift, self.next_part_power, self.version) @@ -417,6 +443,9 @@ class RingBuilder(object): if dev['id'] < len(self.devs) and self.devs[dev['id']] is not None: raise exceptions.DuplicateDeviceError( 'Duplicate device id: %d' % dev['id']) + if dev['id'] > self.max_dev_id: + self.set_dev_id_bytes(calc_dev_id_bytes(dev['id'])) + # Add holes to self.devs to ensure self.devs[dev['id']] will be the dev while dev['id'] >= len(self.devs): self.devs.append(None) @@ -559,10 +588,11 @@ class RingBuilder(object): # gather parts from replica count adjustment self._adjust_replica2part2dev_size(assign_parts) # gather parts from failed devices - removed_devs = self._gather_parts_from_failed_devices(assign_parts) + self._gather_parts_from_failed_devices(assign_parts) # gather parts for dispersion (N.B. this only picks up parts that # *must* disperse according to the replica plan) self._gather_parts_for_dispersion(assign_parts, replica_plan) + removed_devs = self._remove_failed_devices() # we'll gather a few times, or until we archive the plan for gather_count in range(MAX_BALANCE_GATHER_COUNT): @@ -747,7 +777,8 @@ class RingBuilder(object): )) break dev_id = self._replica2part2dev[replica][part] - if dev_id >= dev_len or not self.devs[dev_id]: + if dev_id == self.none_dev_id or dev_id >= dev_len or \ + self.devs[dev_id] is None: raise exceptions.RingValidationError( "Partition %d, replica %d was not allocated " "to a device." % @@ -987,24 +1018,45 @@ class RingBuilder(object): # reassign these partitions. However, we mark them as moved so later # choices will skip other replicas of the same partition if possible. + gathered_parts = 0 if self._remove_devs: dev_ids = [d['id'] for d in self._remove_devs if d['parts']] if dev_ids: for part, replica in self._each_part_replica(): dev_id = self._replica2part2dev[replica][part] if dev_id in dev_ids: - self._replica2part2dev[replica][part] = NONE_DEV + self._replica2part2dev[replica][part] = \ + self.none_dev_id self._set_part_moved(part) assign_parts[part].append(replica) + gathered_parts += 1 self.logger.debug( "Gathered %d/%d from dev %d [dev removed]", part, replica, dev_id) + return gathered_parts + + def _remove_failed_devices(self): removed_devs = 0 while self._remove_devs: remove_dev_id = self._remove_devs.pop()['id'] self.logger.debug("Removing dev %d", remove_dev_id) self.devs[remove_dev_id] = None removed_devs += 1 + + # Trim the dev list + while self.devs and self.devs[-1] is None: + self.devs.pop() + + if self.dev_id_bytes > 2: + # Consider shrinking the device IDs themselves + new_dev_id_bytes = self.dev_id_bytes // 2 + new_none_dev_id = none_dev_id(new_dev_id_bytes) + # Only shrink if the IDs all fit in the lower half of the next size + # down; this avoids excess churn when adding/removing devices near + # the limit of a particular dev_id_bytes + if len(self.devs) < new_none_dev_id // 2: + self.set_dev_id_bytes(new_dev_id_bytes) + return removed_devs def _adjust_replica2part2dev_size(self, to_assign): @@ -1052,7 +1104,7 @@ class RingBuilder(object): # newly-added pieces assigned to devices. for part in range(len(part2dev), desired_length): to_assign[part].append(replica) - part2dev.append(NONE_DEV) + part2dev.append(self.none_dev_id) new_parts += 1 elif len(part2dev) > desired_length: # Too long: truncate this mapping. @@ -1068,7 +1120,8 @@ class RingBuilder(object): to_assign[part].append(replica) new_parts += 1 self._replica2part2dev.append( - array('H', itertools.repeat(NONE_DEV, desired_length))) + array(self.dev_id_type_code, + itertools.repeat(self.none_dev_id, desired_length))) self.logger.debug( "%d new parts and %d removed parts from replica-count change", @@ -1095,7 +1148,7 @@ class RingBuilder(object): undispersed_dev_replicas = [] for replica in self._replicas_for_part(part): dev_id = self._replica2part2dev[replica][part] - if dev_id == NONE_DEV: + if dev_id == self.none_dev_id: continue dev = self.devs[dev_id] if all(replicas_at_tier[tier] <= @@ -1123,7 +1176,7 @@ class RingBuilder(object): self.logger.debug( "Gathered %d/%d from dev %s [dispersion]", part, replica, pretty_dev(dev)) - self._replica2part2dev[replica][part] = NONE_DEV + self._replica2part2dev[replica][part] = self.none_dev_id for tier in dev['tiers']: replicas_at_tier[tier] -= 1 self._set_part_moved(part) @@ -1158,7 +1211,7 @@ class RingBuilder(object): replicas_at_tier = defaultdict(int) for replica in self._replicas_for_part(part): dev_id = self._replica2part2dev[replica][part] - if dev_id == NONE_DEV: + if dev_id == self.none_dev_id: continue dev = self.devs[dev_id] for tier in dev['tiers']: @@ -1195,7 +1248,7 @@ class RingBuilder(object): self.logger.debug( "Gathered %d/%d from dev %s [weight disperse]", part, replica, pretty_dev(dev)) - self._replica2part2dev[replica][part] = NONE_DEV + self._replica2part2dev[replica][part] = self.none_dev_id for tier in dev['tiers']: replicas_at_tier[tier] -= 1 parts_wanted_in_tier[tier] -= 1 @@ -1249,7 +1302,7 @@ class RingBuilder(object): overweight_dev_replica = [] for replica in self._replicas_for_part(part): dev_id = self._replica2part2dev[replica][part] - if dev_id == NONE_DEV: + if dev_id == self.none_dev_id: continue dev = self.devs[dev_id] if dev['parts_wanted'] < 0: @@ -1271,7 +1324,7 @@ class RingBuilder(object): self.logger.debug( "Gathered %d/%d from dev %s [weight forced]", part, replica, pretty_dev(dev)) - self._replica2part2dev[replica][part] = NONE_DEV + self._replica2part2dev[replica][part] = self.none_dev_id self._set_part_moved(part) def _reassign_parts(self, reassign_parts, replica_plan): @@ -1692,7 +1745,7 @@ class RingBuilder(object): if part >= len(part2dev): continue dev_id = part2dev[part] - if dev_id == NONE_DEV: + if dev_id == self.none_dev_id: continue devs.append(self.devs[dev_id]) return devs @@ -1863,7 +1916,7 @@ class RingBuilder(object): new_replica2part2dev = [] for replica in self._replica2part2dev: - new_replica = array('H') + new_replica = array(self.dev_id_type_code) for device in replica: new_replica.append(device) new_replica.append(device) # append device a second time diff --git a/swift/common/ring/composite_builder.py b/swift/common/ring/composite_builder.py index 2b36c0804a..929e03b105 100644 --- a/swift/common/ring/composite_builder.py +++ b/swift/common/ring/composite_builder.py @@ -98,6 +98,8 @@ from random import shuffle from swift.common.exceptions import RingBuilderError from swift.common.ring import RingBuilder from swift.common.ring import RingData +from swift.common.ring.utils import calc_dev_id_bytes +from swift.common.ring.utils import resize_array from collections import defaultdict from itertools import combinations @@ -198,6 +200,9 @@ def _make_composite_ring(builders): :return: a new RingData instance built from the component builders :raises ValueError: if the builders are invalid with respect to each other """ + total_devices = sum(len(builder.devs) for builder in builders) + dev_id_bytes = calc_dev_id_bytes(total_devices) + composite_r2p2d = [] composite_devs = [] device_offset = 0 @@ -205,7 +210,9 @@ def _make_composite_ring(builders): # copy all devs list and replica2part2dev table to be able # to modify the id for each dev devs = copy.deepcopy(builder.devs) - r2p2d = copy.deepcopy(builder._replica2part2dev) + # Note that resize_array() always makes a copy + r2p2d = [resize_array(p2d, dev_id_bytes) + for p2d in builder._replica2part2dev] for part2dev in r2p2d: for part, dev in enumerate(part2dev): part2dev[part] += device_offset diff --git a/swift/common/ring/io.py b/swift/common/ring/io.py new file mode 100644 index 0000000000..0e5adabfb9 --- /dev/null +++ b/swift/common/ring/io.py @@ -0,0 +1,657 @@ +# Copyright (c) 2022 NVIDIA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import contextlib +import dataclasses +import gzip +import hashlib +import json +import logging +import os +import string +import struct +import tempfile +from typing import Optional +import zlib + +from swift.common.ring.utils import BYTES_TO_TYPE_CODE, network_order_array, \ + read_network_order_array + +ZLIB_FLUSH_MARKER = b"\x00\x00\xff\xff" +# we could pull from io.DEFAULT_BUFFER_SIZE, but... 8k seems small +DEFAULT_BUFFER_SIZE = 2 ** 16 +# v2 rings have sizes written with each section, as well as offsets at the end +# We *hope* we never need to go past 2**32-1 for those, but just in case... +V2_SIZE_FORMAT = "!Q" + + +class GzipReader(object): + chunk_size = DEFAULT_BUFFER_SIZE + + def __init__(self, fileobj): + self.fp = fileobj + self.reset_decompressor() + + @property + def name(self): + return self.fp.name + + def close(self): + self.fp.close() + + def read_sizes(self): + """ + Read the uncompressed and compressed sizes of the whole file. + + Gzip writes the uncompressed length (mod 2**32) write at the end. + Then we just need to ``tell()`` to get the compressed length. + """ + self.fp.seek(-4, os.SEEK_END) + uncompressed_size, = struct.unpack("= 0: + self._decompress_from_buffer(x + len(ZLIB_FLUSH_MARKER)) + return False + + chunk = self.fp.read(self.chunk_size) + if not chunk: + self._decompress_from_buffer(len(self.compressed_buffer)) + return True + self.compressed_buffer += chunk + + # if we found a flush marker in the new chunk, only go that far + x = self.compressed_buffer.find(ZLIB_FLUSH_MARKER) + if x >= 0: + self._decompress_from_buffer(x + len(ZLIB_FLUSH_MARKER)) + return False + + # we may have *almost* found the flush marker; + # gotta keep some of the tail + keep = len(ZLIB_FLUSH_MARKER) - 1 + # note that there's no guarantee that buffer will actually grow -- + # but we don't want to have more in compressed_buffer than strictly + # necessary + self._decompress_from_buffer(len(self.compressed_buffer) - keep) + return False + + def read(self, amount=-1): + """ + Read ``amount`` uncompressed bytes. + + :raises IOError: if you try to read everything + :raises zlib.error: if ``seek()`` was last called with a position + not at a flush boundary + """ + if amount < 0: + raise IOError("don't be greedy") + + while amount > len(self.buffer): + if self._buffer_chunk(): + break + + data, self.buffer = self.buffer[:amount], self.buffer[amount:] + return data + + +class SectionReader(object): + """ + A file-like wrapper that limits how many bytes may be read. + + Optionally, also verify data integrity. + + :param fp: a file-like object opened with mode "rb" + :param length: the maximum number of bytes that should be read + :param digest: optional hex digest of the expected bytes + :param checksum: checksumming instance to be fed bytes and later compared + against ``digest``; e.g. ``hashlib.sha256()`` + """ + def __init__(self, fp, length, digest=None, checksum=None): + self._fp = fp + self._remaining = length + self._digest = digest + self._checksum = checksum + + def read(self, amt=None): + """ + Read ``amt`` bytes, defaulting to "all remaining available bytes". + """ + if amt is None or amt < 0: + amt = self._remaining + amt = min(amt, self._remaining) + data = self._fp.read(amt) + self._remaining -= len(data) + if self._checksum: + self._checksum.update(data) + return data + + def read_ring_table(self, itemsize, partition_count): + max_row_len = itemsize * partition_count + type_code = BYTES_TO_TYPE_CODE[itemsize] + return [ + read_network_order_array(type_code, row) + for row in iter(lambda: self.read(max_row_len), b'') + ] + + def close(self): + """ + Verify that all bytes were read. + + If a digest was provided, also verify that the bytes read match + the digest. Does *not* close the underlying file-like. + + :raises ValueError: if verification fails + """ + if self._remaining: + raise ValueError('Incomplete read; expected %d more bytes ' + 'to be read' % self._remaining) + if self._digest and self._checksum.hexdigest() != self._digest: + raise ValueError('Hash mismatch in block: %r found; %r expected' % + (self._checksum.hexdigest(), self._digest)) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +@dataclasses.dataclass(frozen=True) +class IndexEntry: + compressed_start: int + uncompressed_start: int + compressed_end: Optional[int] = None + uncompressed_end: Optional[int] = None + checksum_method: Optional[str] = None + checksum_value: Optional[str] = None + + @property + def uncompressed_length(self) -> Optional[int]: + if self.uncompressed_end is None: + return None + return self.uncompressed_end - self.uncompressed_start + + @property + def compressed_length(self) -> Optional[int]: + if self.compressed_end is None: + return None + return self.compressed_end - self.compressed_start + + @property + def compression_ratio(self) -> Optional[float]: + if self.uncompressed_end is None: + return None + return 1 - self.compressed_length / self.uncompressed_length + + +class RingReader(GzipReader): + """ + Helper for reading ring files. + + Provides format-version detection, and loads the index for v2 rings. + """ + chunk_size = DEFAULT_BUFFER_SIZE + + def __init__(self, fileobj): + super(RingReader, self).__init__(fileobj) + self.index = {} + + magic = self.read(4) + if magic != b"R1NG": + raise ValueError(f"Bad ring magic: {magic!r}") + + self.version, = struct.unpack("!H", self.read(2)) + if self.version not in (1, 2): + msg = f"Unsupported ring version: {self.version}" + if hasattr(fileobj, "name"): + msg += f" for {fileobj.name!r}" + raise ValueError(msg) + + # NB: In a lot of places, "raw" implies "file on disk", i.e., the + # compressed stream -- but here it's actually the uncompressed stream. + self.raw_size, self.size = self.read_sizes() + + self.load_index() + + self.seek(0) + + def load_index(self): + """ + If this is a v2 ring, load the index stored at the end. + + This will be done as part of initialization; users shouldn't need to + do this themselves. + """ + if self.version != 2: + return + + # See notes in RingWriter.write_index and RingWriter.__exit__ for + # where this 31 (= 18 + 13) came from. + self.seek(-31, os.SEEK_END) + try: + index_start, = struct.unpack(V2_SIZE_FORMAT, self.read(8)) + except zlib.error: + # TODO: we can still fix this if we're willing to read everything + raise IOError("Could not read index offset " + "(was the file recompressed?)") + self.seek(index_start) + # ensure index entries are sorted by position + self.index = collections.OrderedDict(sorted( + ((section, IndexEntry(*entry)) + for section, entry in json.loads(self.read_blob()).items()), + key=lambda x: x[1].compressed_start)) + + def __contains__(self, section): + if self.version != 2: + return False + return section in self.index + + def read_blob(self, fmt=V2_SIZE_FORMAT): + """ + Read a length-value encoded BLOB + + Note that the RingReader needs to already be positioned correctly. + + :param fmt: the format code used to write the length of the BLOB. + All v2 BLOBs use ``!Q``, but v1 may require ``!I`` + :returns: the BLOB value + """ + prefix = self.read(struct.calcsize(fmt)) + blob_length, = struct.unpack(fmt, prefix) + return self.read(blob_length) + + def read_section(self, section): + """ + Seek to a section and read all its data + """ + with self.open_section(section) as reader: + return reader.read() + + @contextlib.contextmanager + def open_section(self, section): + """ + Open up a section without buffering the whole thing in memory + + :raises ValueError: if there is no index + :raises KeyError: if ``section`` is not in the index + :raises IOError: if there is a conflict between the section size in + the index and the length at the start of the blob + + :returns: a ``SectionReader`` wrapping the section + """ + if not self.index: + raise ValueError("No index loaded") + entry = self.index[section] + self.seek(entry.compressed_start) + size_len = struct.calcsize(V2_SIZE_FORMAT) + prefix = self.read(size_len) + blob_length, = struct.unpack(V2_SIZE_FORMAT, prefix) + if entry.compressed_end is not None and \ + size_len + blob_length != entry.uncompressed_length: + raise IOError("Inconsistent section size") + + if entry.checksum_method in ('md5', 'sha1', 'sha256', 'sha512'): + checksum = getattr(hashlib, entry.checksum_method)(prefix) + checksum_value = entry.checksum_value + else: + if entry.checksum_method is not None: + logging.getLogger('swift.ring').warning( + "Ignoring unsupported checksum %s:%s for section %s", + entry.checksum_method, entry.checksum_value, section) + checksum = checksum_value = None + + with SectionReader( + self, + blob_length, + digest=checksum_value, + checksum=checksum, + ) as reader: + yield reader + + +class GzipWriter(object): + def __init__(self, fileobj, filename='', mtime=1300507380.0): + self.raw_fp = fileobj + self.gzip_fp = gzip.GzipFile( + filename, + mode='wb', + fileobj=self.raw_fp, + mtime=mtime) + self.flushed = True + self.pos = 0 + + @classmethod + @contextlib.contextmanager + def open(cls, filename, *a, **kw): + """ + Open a compressed writer for ``filename`` + + Note that this also guarantees atomic writes using a temporary file + + :returns: a context manager that provides a ``GzipWriter`` instance + """ + fp = tempfile.NamedTemporaryFile( + dir=os.path.dirname(filename), + prefix=os.path.basename(filename), + delete=False) + try: + with cls(fp, filename, *a, **kw) as writer: + yield writer + except BaseException: + fp.close() + os.unlink(fp.name) + raise + else: + fp.flush() + os.fsync(fp.fileno()) + fp.close() + os.chmod(fp.name, 0o644) + os.rename(fp.name, filename) + + def __enter__(self): + return self + + def __exit__(self, e, v, t): + if e is None: + # only finalize if there was no error + self.close() + + def close(self): + # This does three things: + # * Flush the underlying compressobj (with Z_FINISH) and write + # the result + # * Write the (4-byte) CRC + # * Write the (4-byte) uncompressed length + # NB: if we wrote an index, the flush writes exactly 5 bytes, + # for 13 bytes total + self.gzip_fp.close() + + def write(self, data): + if not data: + return 0 + self.flushed = False + self.pos += len(data) + return self.gzip_fp.write(data) + + def flush(self): + """ + Ensure the gzip stream has been flushed using Z_FULL_FLUSH. + + By default, the gzip module uses Z_SYNC_FLUSH; this ensures that all + data is compressed and written to the stream, but retains some state + in the compressor. A full flush, by contrast, ensures no state may + carry over, allowing a reader to seek to the end of the flush and + start reading with a fresh decompressor. + """ + if not self.flushed: + # always use full flushes; this allows us to just start reading + # at the start of any section + self.gzip_fp.flush(zlib.Z_FULL_FLUSH) + self.flushed = True + + def tell(self): + """ + Return the position in the underlying (compressed) stream. + + Since this is primarily useful to get a position you may seek to later + and start reading, flush the writer first. + + If you want the position within the *uncompressed* stream, use the + ``pos`` attribute. + """ + self.flush() + return self.raw_fp.tell() + + def _set_compression_level(self, lvl): + # two valid deflate streams may be concatenated to produce another + # valid deflate stream, so finish the one stream... + self.flush() + # ... so we can start up another with whatever level we want + self.gzip_fp.compress = zlib.compressobj( + lvl, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) + + +class RingWriter(GzipWriter): + """ + Helper for writing ring files to later be read by a ``RingReader`` + + This has a few key features on top of a standard ``GzipFile``: + + * Helpers for writing length-value encoded BLOBs + * The ability to define named sections which will be written as + an index at the end of the file + * Flushes always use Z_FULL_FLUSH to support seeking. + + Note that the index will only be written if named sections were defined. + """ + checksum_method = 'sha256' + + def __init__(self, *a, **kw): + super(RingWriter, self).__init__(*a, **kw) + # index entries look like + # section: [ + # compressed start, + # uncompressed start, + # compressed end, + # uncompressed end, + # checksum_method, + # checksum_value + # ] + self.index = {} + self.current_section = None + self.checksum = None + + @contextlib.contextmanager + def section(self, name): + """ + Define a named section. + + Return a context manager; the section contains whatever data is written + within that context. + + The index will be updated to include the section and its starting + positions upon entering the context; upon exiting normally, the index + will be updated again with the ending positions and checksum + information. + """ + if self.current_section: + raise ValueError('Cannot create new section; currently writing %r' + % self.current_section) + allowed = string.ascii_letters + string.digits + '/-' + if any(c not in allowed for c in name): + raise ValueError('Section has invalid name: %s' % name) + if name in self.index: + raise ValueError('Cannot write duplicate section: %s' % name) + self.flush() + self.current_section = name + self.index[name] = IndexEntry(self.tell(), self.pos) + checksum_class = getattr(hashlib, self.checksum_method) + self.checksum = checksum_class() + try: + yield self + self.flush() + self.index[name] = dataclasses.replace( + self.index[name], + compressed_end=self.tell(), + uncompressed_end=self.pos, + checksum_method=self.checksum_method, + checksum_value=self.checksum.hexdigest(), + ) + finally: + self.flush() + self.checksum = None + self.current_section = None + + def write(self, data): + if self.checksum: + self.checksum.update(data) + return super().write(data) + + def close(self): + if self.index: + # only write index if we made use of any sections + self.write_index() + super().close() + + def write_magic(self, version): + """ + Write our file magic for identifying Swift rings. + + :param version: the ring version; should be 1 or 2 + """ + if self.pos != 0: + raise IOError("Magic must be written at the start of the file") + # switch to uncompressed, so libmagic can know what to expect + self._set_compression_level(0) + self.write(struct.pack("!4sH", b"R1NG", version)) + self._set_compression_level(9) + + def write_size(self, size, fmt=V2_SIZE_FORMAT): + """ + Write a size (often a BLOB-length, but sometimes a file offset). + + :param data: the size to write + :param fmt: the struct format to use when writing the length. + All v2 BLOBs should use ``!Q``. + """ + self.write(struct.pack(fmt, size)) + + def write_blob(self, data, fmt=V2_SIZE_FORMAT): + """ + Write a length-value encoded BLOB. + + :param data: the bytes to write + :param fmt: the struct format to use when writing the length. + All v2 BLOBs should use ``!Q``. + """ + self.write_size(len(data), fmt) + self.write(data) + + def write_json(self, data, fmt=V2_SIZE_FORMAT): + """ + Write a length-value encoded JSON BLOB. + + :param data: the JSON-serializable data to write + :param fmt: the struct format to use when writing the length. + All v2 BLOBs should use ``!Q``. + """ + json_data = json.dumps(data, sort_keys=True, ensure_ascii=True) + self.write_blob(json_data.encode('ascii'), fmt) + + def write_ring_table(self, table): + """ + Write a length-value encoded replica2part2dev table, or similar. + Should *not* be used for v1 rings, as there's always a ``!Q`` size + prefix, and values are written in network order. + :param table: list of arrays + """ + dev_id_bytes = table[0].itemsize if table else 0 + assignments = sum(len(a) for a in table) + self.write_size(assignments * dev_id_bytes) + for row in table: + with network_order_array(row): + row.tofile(self) + + def write_index(self): + """ + Write the index and its starting position at the end of the file. + + Callers should not need to use this themselves; it will be done + automatically when using the writer as a context manager. + """ + with self.section('swift/index'): + self.write_json({ + k: dataclasses.astuple(v) + for k, v in self.index.items() + }) + # switch to uncompressed + self._set_compression_level(0) + # ... which allows us to know that each of these write_size/flush pairs + # will write exactly 18 bytes to disk + self.write_size(self.index['swift/index'].uncompressed_start) + self.flush() + # This is the one we really care about in Swift code, but sometimes + # ops write their own tools and sometimes those just buffer all the + # decoded content + self.write_size(self.index['swift/index'].compressed_start) + self.flush() diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index e96270ca22..42c3d279e5 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -14,26 +14,37 @@ # limitations under the License. import array -import contextlib import json from collections import defaultdict -from gzip import GzipFile from os.path import getmtime import struct from time import time import os from itertools import chain, count -from tempfile import NamedTemporaryFile import sys -import zlib -from swift.common.exceptions import RingLoadError +from swift.common.exceptions import RingLoadError, DevIdBytesTooSmall from swift.common.utils import hash_path, validate_configuration, md5 -from swift.common.ring.utils import tiers_for_dev +from swift.common.ring.io import RingReader, RingWriter +from swift.common.ring.utils import tiers_for_dev, BYTES_TO_TYPE_CODE DEFAULT_RELOAD_TIME = 15 +RING_CODECS = { + 1: { + "serialize": lambda ring_data, writer: ring_data.serialize_v1(writer), + "deserialize": lambda cls, reader, metadata_only, _include_devices: + cls.deserialize_v1(reader, metadata_only=metadata_only), + }, + 2: { + "serialize": lambda ring_data, writer: ring_data.serialize_v2(writer), + "deserialize": lambda cls, reader, metadata_only, include_devices: + cls.deserialize_v2(reader, metadata_only=metadata_only, + include_devices=include_devices), + }, +} +DEFAULT_RING_FORMAT_VERSION = 1 def calc_replica_count(replica2part2dev_id): @@ -59,57 +70,6 @@ def normalize_devices(devs): dev.setdefault('replication_port', dev['port']) -class RingReader(object): - chunk_size = 2 ** 16 - - def __init__(self, filename): - self.fp = open(filename, 'rb') - self._reset() - - def _reset(self): - self._buffer = b'' - self.size = 0 - self.raw_size = 0 - self._md5 = md5(usedforsecurity=False) - self._decomp = zlib.decompressobj(32 + zlib.MAX_WBITS) - - @property - def close(self): - return self.fp.close - - def seek(self, pos, ref=0): - if (pos, ref) != (0, 0): - raise NotImplementedError - self._reset() - return self.fp.seek(pos, ref) - - def _buffer_chunk(self): - chunk = self.fp.read(self.chunk_size) - if not chunk: - return False - self.size += len(chunk) - self._md5.update(chunk) - chunk = self._decomp.decompress(chunk) - self.raw_size += len(chunk) - self._buffer += chunk - return True - - def read(self, amount=-1): - if amount < 0: - raise IOError("don't be greedy") - - while amount > len(self._buffer): - if not self._buffer_chunk(): - break - - result, self._buffer = self._buffer[:amount], self._buffer[amount:] - return result - - @property - def md5(self): - return self._md5.hexdigest() - - class RingData(object): """Partitioned consistent hashing ring data (used for serialization).""" @@ -124,15 +84,37 @@ class RingData(object): self._part_shift = part_shift self.next_part_power = next_part_power self.version = version - self.md5 = self.size = self.raw_size = None + self.format_version = None + self.size = self.raw_size = None + # Next two are used when replica2part2dev is empty + self._dev_id_bytes = 2 + self._replica_count = 0 + self._num_devs = sum(1 if dev is not None else 0 for dev in self.devs) @property def replica_count(self): """Number of replicas (full or partial) used in the ring.""" - return calc_replica_count(self._replica2part2dev_id) + if self._replica2part2dev_id: + return calc_replica_count(self._replica2part2dev_id) + else: + return self._replica_count + + @property + def part_power(self): + return 32 - self._part_shift + + @property + def dev_id_bytes(self): + if self._replica2part2dev_id: + # There's an assumption that these will all have the same itemsize, + # but just in case... + return max(part2dev_id.itemsize + for part2dev_id in self._replica2part2dev_id) + else: + return self._dev_id_bytes @classmethod - def deserialize_v1(cls, gz_file, metadata_only=False): + def deserialize_v1(cls, reader, metadata_only=False): """ Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`, and `replica2part2dev_id` keys. @@ -141,25 +123,32 @@ class RingData(object): `replica2part2dev_id` is not loaded and that key in the returned dictionary just has the value `[]`. - :param file gz_file: An opened file-like object which has already - consumed the 6 bytes of magic and version. + :param RingReader reader: An opened RingReader which has already + loaded the index at the end, gone back to the + front, and consumed the 6 bytes of magic and + version. :param bool metadata_only: If True, only load `devs` and `part_shift` :returns: A dict containing `devs`, `part_shift`, and `replica2part2dev_id` """ + if reader.tell() == 0: + magic = reader.read(6) + if magic != b'R1NG\x00\x01': + raise ValueError('unexpected magic: %r' % magic) - json_len, = struct.unpack('!I', gz_file.read(4)) - ring_dict = json.loads(gz_file.read(json_len)) + ring_dict = json.loads(reader.read_blob('!I')) ring_dict['replica2part2dev_id'] = [] + ring_dict['dev_id_bytes'] = 2 if metadata_only: return ring_dict byteswap = (ring_dict.get('byteorder', sys.byteorder) != sys.byteorder) + type_code = BYTES_TO_TYPE_CODE[ring_dict['dev_id_bytes']] partition_count = 1 << (32 - ring_dict['part_shift']) for x in range(ring_dict['replica_count']): - part2dev = array.array('H', gz_file.read(2 * partition_count)) + part2dev = array.array(type_code, reader.read(2 * partition_count)) if byteswap: part2dev.byteswap() ring_dict['replica2part2dev_id'].append(part2dev) @@ -167,7 +156,50 @@ class RingData(object): return ring_dict @classmethod - def load(cls, filename, metadata_only=False): + def deserialize_v2(cls, reader, metadata_only=False, include_devices=True): + """ + Deserialize a v2 ring file into a dictionary with ``devs``, + ``part_shift``, and ``replica2part2dev_id`` keys. + + If the optional kwarg ``metadata_only`` is True, then the + ``replica2part2dev_id`` is not loaded and that key in the returned + dictionary just has the value ``[]``. + + If the optional kwarg ``include_devices`` is False, then the ``devs`` + list is not loaded and that key in the returned dictionary just has + the value ``[]``. + + :param file reader: An opened file-like object which has already + consumed the 6 bytes of magic and version. + :param bool metadata_only: If True, skip loading + ``replica2part2dev_id`` + :param bool include_devices: If False and ``metadata_only`` is True, + skip loading ``devs`` + :returns: A dict containing ``devs``, ``part_shift``, + ``dev_id_bytes``, and ``replica2part2dev_id`` + """ + + ring_dict = json.loads(reader.read_section('swift/ring/metadata')) + ring_dict['replica2part2dev_id'] = [] + ring_dict['devs'] = [] + + if not metadata_only or include_devices: + ring_dict['devs'] = json.loads( + reader.read_section('swift/ring/devices')) + + if metadata_only: + return ring_dict + + partition_count = 1 << (32 - ring_dict['part_shift']) + + with reader.open_section('swift/ring/assignments') as section: + ring_dict['replica2part2dev_id'] = section.read_ring_table( + ring_dict['dev_id_bytes'], partition_count) + + return ring_dict + + @classmethod + def load(cls, filename, metadata_only=False, include_devices=True): """ Load ring data from a file. @@ -175,32 +207,37 @@ class RingData(object): :param bool metadata_only: If True, only load `devs` and `part_shift`. :returns: A RingData instance containing the loaded data. """ - with contextlib.closing(RingReader(filename)) as gz_file: - # See if the file is in the new format - magic = gz_file.read(4) - if magic != b'R1NG': - raise Exception('Bad ring magic %r for %r' % ( - magic, filename)) + with RingReader.open(filename) as reader: + if reader.version not in RING_CODECS: + raise Exception('Unknown ring format version %d for %r' % ( + reader.version, filename)) + ring_data = RING_CODECS[reader.version]['deserialize']( + cls, reader, metadata_only, include_devices) - format_version, = struct.unpack('!H', gz_file.read(2)) - if format_version == 1: - ring_data = cls.deserialize_v1( - gz_file, metadata_only=metadata_only) - else: - raise Exception('Unknown ring format version %d for %r' % - (format_version, filename)) - - ring_data = RingData(ring_data['replica2part2dev_id'], - ring_data['devs'], ring_data['part_shift'], - ring_data.get('next_part_power'), - ring_data.get('version')) - for attr in ('md5', 'size', 'raw_size'): - setattr(ring_data, attr, getattr(gz_file, attr)) + ring_data = cls.from_dict(ring_data) + ring_data.format_version = reader.version + for attr in ('size', 'raw_size'): + setattr(ring_data, attr, getattr(reader, attr)) return ring_data - def serialize_v1(self, file_obj): + @classmethod + def from_dict(cls, ring_data): + ring = cls(ring_data['replica2part2dev_id'], + ring_data['devs'], ring_data['part_shift'], + ring_data.get('next_part_power'), + ring_data.get('version')) + # For loading with metadata_only=True + if 'replica_count' in ring_data: + ring._replica_count = ring_data['replica_count'] + # dev_id_bytes only written down in v2 and above + ring._dev_id_bytes = ring_data.get('dev_id_bytes', 2) + return ring + + def serialize_v1(self, writer): + if self.dev_id_bytes != 2: + raise DevIdBytesTooSmall('Ring v1 only supports 2-byte dev IDs') # Write out new-style serialization magic and version: - file_obj.write(struct.pack('!4sH', b'R1NG', 1)) + writer.write_magic(version=1) ring = self.to_dict() # Only include next_part_power if it is set in the @@ -216,40 +253,62 @@ class RingData(object): if next_part_power is not None: _text['next_part_power'] = next_part_power - json_text = json.dumps(_text, sort_keys=True, - ensure_ascii=True).encode('ascii') - json_len = len(json_text) - file_obj.write(struct.pack('!I', json_len)) - file_obj.write(json_text) - for part2dev_id in ring['replica2part2dev_id']: - part2dev_id.tofile(file_obj) + writer.write_json(_text, '!I') - def save(self, filename, mtime=1300507380.0): + for part2dev_id in ring['replica2part2dev_id']: + part2dev_id.tofile(writer) + + def serialize_v2(self, writer): + writer.write_magic(version=2) + ring = self.to_dict() + + # Only include next_part_power if it is set in the + # builder, otherwise just ignore it + _text = { + 'part_shift': ring['part_shift'], + 'dev_id_bytes': ring['dev_id_bytes'], + 'replica_count': calc_replica_count(ring['replica2part2dev_id']), + 'version': ring['version']} + + next_part_power = ring.get('next_part_power') + if next_part_power is not None: + _text['next_part_power'] = next_part_power + + with writer.section('swift/ring/metadata'): + writer.write_json(_text) + + with writer.section('swift/ring/devices'): + writer.write_json(ring['devs']) + + with writer.section('swift/ring/assignments'): + writer.write_ring_table(ring['replica2part2dev_id']) + + def save(self, filename, mtime=1300507380.0, + format_version=DEFAULT_RING_FORMAT_VERSION): """ Serialize this RingData instance to disk. :param filename: File into which this instance should be serialized. :param mtime: time used to override mtime for gzip, default or None if the caller wants to include time + :param format_version: one of 0, 1, or 2. Older versions are retained + for the sake of clusters on older versions """ + if format_version not in RING_CODECS: + raise ValueError("format_version must be one of %r" % (tuple( + RING_CODECS.keys()),)) # Override the timestamp so that the same ring data creates # the same bytes on disk. This makes a checksum comparison a # good way to see if two rings are identical. - tempf = NamedTemporaryFile(dir=".", prefix=filename, delete=False) - gz_file = GzipFile(filename, mode='wb', fileobj=tempf, mtime=mtime) - self.serialize_v1(gz_file) - gz_file.close() - tempf.flush() - os.fsync(tempf.fileno()) - tempf.close() - os.chmod(tempf.name, 0o644) - os.rename(tempf.name, filename) + with RingWriter.open(filename, mtime) as writer: + RING_CODECS[format_version]['serialize'](self, writer) def to_dict(self): return {'devs': self.devs, 'replica2part2dev_id': self._replica2part2dev_id, 'part_shift': self._part_shift, 'next_part_power': self.next_part_power, + 'dev_id_bytes': self.dev_id_bytes, 'version': self.version} @@ -296,13 +355,13 @@ class Ring(object): self._mtime = getmtime(self.serialized_path) self._devs = ring_data.devs + self._dev_id_bytes = ring_data._dev_id_bytes self._replica2part2dev_id = ring_data._replica2part2dev_id self._part_shift = ring_data._part_shift self._rebuild_tier_data() self._update_bookkeeping() self._next_part_power = ring_data.next_part_power self._version = ring_data.version - self._md5 = ring_data.md5 self._size = ring_data.size self._raw_size = ring_data.raw_size @@ -340,6 +399,16 @@ class Ring(object): self._num_zones = len(zones) self._num_ips = len(ips) + @property + def dev_id_bytes(self): + if self._replica2part2dev_id: + # There's an assumption that these will all have the same itemsize, + # but just in case... + return max(part2dev_id.itemsize + for part2dev_id in self._replica2part2dev_id) + else: + return self._dev_id_bytes + @property def next_part_power(self): if time() > self._rtime: @@ -354,10 +423,6 @@ class Ring(object): def version(self): return self._version - @property - def md5(self): - return self._md5 - @property def size(self): return self._size diff --git a/swift/common/ring/utils.py b/swift/common/ring/utils.py index ee29363bad..bdb0e9692d 100644 --- a/swift/common/ring/utils.py +++ b/swift/common/ring/utils.py @@ -12,16 +12,84 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import array from collections import defaultdict +import contextlib import optparse import re import socket +import sys from swift.common import exceptions from swift.common.utils import expand_ipv6, is_valid_ip, is_valid_ipv4, \ is_valid_ipv6 +BYTES_TO_TYPE_CODE = { + # We don't support 1 byte arrays. For backwards compatibility reasons. + 2: 'H', + # Note that on some platforms, array.array('I') will be limited to 2-byte + # values. At the same time, however, using 'L' would get us 8-byte values + # on many platforms we care about. Use 'I' for now; hold off on writing + # custom array (de)serialization methods until someone actually complains. + 4: 'I', + # This just seems excessive; besides, array.array() only takes it on py33+ + # 8: 'Q', +} + + +def none_dev_id(dev_id_bytes): + ''' + we can't store None's in the replica2part2dev array, so we high-jack + the max value for magic to represent the part is not currently + assigned to any device. + ''' + return 2 ** (8 * dev_id_bytes) - 1 + + +def calc_dev_id_bytes(max_dev_id): + if max_dev_id < 0: + raise ValueError("Can't have negative device IDs") + for x in sorted(BYTES_TO_TYPE_CODE): + if max_dev_id < none_dev_id(x): + return x + else: + # > 4B devices?? + raise exceptions.DevIdBytesTooSmall('Way too many devices!') + + +def resize_array(old_arr, new_dev_id_bytes): + """ + Copy an array to use a new itemsize, while preserving none_dev_id values + """ + old_none_dev = none_dev_id(old_arr.itemsize) + new_none_dev = none_dev_id(new_dev_id_bytes) + return array.array( + BYTES_TO_TYPE_CODE[new_dev_id_bytes], + (new_none_dev if dev_id == old_none_dev else dev_id + for dev_id in old_arr)) + + +@contextlib.contextmanager +def network_order_array(arr): + if sys.byteorder == 'little': + # Switch to network-order for serialization + arr.byteswap() + try: + yield arr + finally: + if sys.byteorder == 'little': + # Didn't make a copy; switch it back + arr.byteswap() + + +def read_network_order_array(type_code, data): + arr = array.array(type_code, data) + if sys.byteorder == 'little': + arr.byteswap() + return arr + + def tiers_for_dev(dev): """ Returns a tuple of tiers for a given device in ascending order by diff --git a/test/unit/cli/test_default_output.stub b/test/unit/cli/test_default_output.stub index e2daa77d38..3bb9f5e44c 100644 --- a/test/unit/cli/test_default_output.stub +++ b/test/unit/cli/test_default_output.stub @@ -1,5 +1,5 @@ __RINGFILE__, build version 4, id (not assigned) -64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion +64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining) The overload factor is 0.00% (0.000000) Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet diff --git a/test/unit/cli/test_default_output_id_assigned.stub b/test/unit/cli/test_default_output_id_assigned.stub index 9f60c03b50..b5a4abcfea 100644 --- a/test/unit/cli/test_default_output_id_assigned.stub +++ b/test/unit/cli/test_default_output_id_assigned.stub @@ -1,5 +1,5 @@ __RINGFILE__, build version 4, id __BUILDER_ID__ -64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion +64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining) The overload factor is 0.00% (0.000000) Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet diff --git a/test/unit/cli/test_default_sorted_output.stub b/test/unit/cli/test_default_sorted_output.stub index ab510fd3d3..55af7c5306 100644 --- a/test/unit/cli/test_default_sorted_output.stub +++ b/test/unit/cli/test_default_sorted_output.stub @@ -1,10 +1,10 @@ __RINGFILE__, build version 9, id __BUILDER_ID__ -64 partitions, 3.000000 replicas, 2 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion +64 partitions, 3.000000 replicas, 2 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion The minimum number of hours before a partition can be reassigned is 1 (1:00:00 remaining) The overload factor is 0.00% (0.000000) Ring file __RINGFILE__.ring.gz is obsolete Devices: id region zone ip address:port replication ip:port name weight partitions balance flags meta - 1 1 1 127.0.0.2:6201 127.0.0.2:6201 sda2 100.00 64 33.33 - 4 1 2 127.0.0.5:6004 127.0.0.5:6004 sda5 100.00 64 33.33 - 0 2 1 127.0.0.6:6005 127.0.0.6:6005 sdb6 100.00 0 -100.00 - 2 2 2 127.0.0.3:6202 127.0.0.3:6202 sdc3 100.00 64 33.33 + 1 1 1 127.0.0.2:6201 127.0.0.2:6201 sda2 100.00 64 33.33 + 4 1 2 127.0.0.5:6004 127.0.0.5:6004 sda5 100.00 64 33.33 + 0 2 1 127.0.0.6:6005 127.0.0.6:6005 sdb6 100.00 0 -100.00 + 2 2 2 127.0.0.3:6202 127.0.0.3:6202 sdc3 100.00 64 33.33 diff --git a/test/unit/cli/test_ipv6_output.stub b/test/unit/cli/test_ipv6_output.stub index 30be348618..8571b79db7 100644 --- a/test/unit/cli/test_ipv6_output.stub +++ b/test/unit/cli/test_ipv6_output.stub @@ -1,5 +1,5 @@ __RINGFILE__, build version 4, id __BUILDER_ID__ -256 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion +256 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining) The overload factor is 0.00% (0.000000) Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet diff --git a/test/unit/cli/test_ringbuilder.py b/test/unit/cli/test_ringbuilder.py index 385fe9b71d..82ec9d301f 100644 --- a/test/unit/cli/test_ringbuilder.py +++ b/test/unit/cli/test_ringbuilder.py @@ -31,6 +31,7 @@ from swift.cli import ringbuilder from swift.cli.ringbuilder import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR from swift.common import exceptions from swift.common.ring import RingBuilder +from swift.common.ring.io import RingReader from swift.common.ring.composite_builder import CompositeRingBuilder from test.unit import Timeout, write_stub_builder @@ -2121,7 +2122,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): expected = "%s, build version 6, id %s\n" \ "64 partitions, 3.000000 replicas, 4 regions, 4 zones, " \ - "4 devices, 100.00 balance, 0.00 dispersion\n" \ + "4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion\n" \ "The minimum number of hours before a partition can be " \ "reassigned is 1 (0:00:00 remaining)\n" \ "The overload factor is 0.00%% (0.000000)\n" \ @@ -2395,6 +2396,23 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): self.assertSystemExit(EXIT_ERROR, ringbuilder.main, argv) def test_rebalance_remove_zero_weighted_device(self): + self.create_sample_ring() + ring = RingBuilder.load(self.tmpfile) + ring.set_dev_weight(2, 0.0) + ring.rebalance() + ring.pretend_min_part_hours_passed() + ring.remove_dev(2) + ring.save(self.tmpfile) + + # Test rebalance after remove 0 weighted device + argv = ["", self.tmpfile, "rebalance", "3"] + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + ring = RingBuilder.load(self.tmpfile) + self.assertTrue(ring.validate()) + self.assertEqual(len(ring.devs), 4) + self.assertIsNone(ring.devs[2]) + + def test_rebalance_remove_off_end_trims_dev_list(self): self.create_sample_ring() ring = RingBuilder.load(self.tmpfile) ring.set_dev_weight(3, 0.0) @@ -2408,7 +2426,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) ring = RingBuilder.load(self.tmpfile) self.assertTrue(ring.validate()) - self.assertIsNone(ring.devs[3]) + self.assertEqual(len(ring.devs), 3) def test_rebalance_resets_time_remaining(self): self.create_sample_ring() @@ -2546,12 +2564,32 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): argv = ["", self.tmpfile, "write_ring"] self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + for version in ("1", "2"): + argv = ["", self.tmpfile, "write_ring", "--format-version", + version] + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + with RingReader.open("%s.ring.gz" % self.tmpfile) as reader: + self.assertEqual(int(version), reader.version) + + exp_results = {'valid_exit_codes': [EXIT_ERROR]} + out, err = self.run_srb("write_ring", "--format-version", "3", + exp_results=exp_results) + self.assertIn('invalid choice', err) + def test_write_empty_ring(self): ring = RingBuilder(6, 3, 1) ring.save(self.tmpfile) - exp_results = {'valid_exit_codes': [2]} + exp_results = {'valid_exit_codes': [EXIT_ERROR]} out, err = self.run_srb("write_ring", exp_results=exp_results) - self.assertEqual('Unable to write empty ring.\n', out) + exp_out = 'Unable to write empty ring.\n' + self.assertEqual(exp_out, out[-len(exp_out):]) + self.assertIn("Defaulting to --format-version=1", out) + + for version in (1, 2): + out, err = self.run_srb("write_ring", + "--format-version={}".format(version), + exp_results=exp_results) + self.assertEqual(exp_out, out) def test_write_builder(self): # Test builder file already exists @@ -2637,6 +2675,133 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin): argv = ["", self.tmpfile + '.builder', "rebalance"] self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv) + def test_version_serialization_default(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + rd = rb.get_ring() + rd.save(self.tmpfile + ".ring.gz") + + ring_file = os.path.join(os.path.dirname(self.tmpfile), + os.path.basename(self.tmpfile) + ".ring.gz") + + argv = ["", ring_file, "version"] + mock_stdout = io.StringIO() + with mock.patch("sys.stdout", mock_stdout): + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + + expected = ("%s.ring.gz: Serialization version: 1 (2-byte IDs), " + "build version: 5\n" % self.tmpfile) + self.assertEqual(expected, mock_stdout.getvalue()) + + def test_version_serialization_1(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + rd = rb.get_ring() + rd.save(self.tmpfile + ".ring.gz", format_version=1) + + ring_file = os.path.join(os.path.dirname(self.tmpfile), + os.path.basename(self.tmpfile) + ".ring.gz") + + argv = ["", ring_file, "version"] + mock_stdout = io.StringIO() + with mock.patch("sys.stdout", mock_stdout): + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + + expected = ("%s.ring.gz: Serialization version: 1 (2-byte IDs), " + "build version: 5\n" % self.tmpfile) + self.assertEqual(expected, mock_stdout.getvalue()) + + def test_version_serialization_2(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + rd = rb.get_ring() + rd.save(self.tmpfile + ".ring.gz", format_version=2) + + ring_file = os.path.join(os.path.dirname(self.tmpfile), + os.path.basename(self.tmpfile) + ".ring.gz") + + argv = ["", ring_file, "version"] + mock_stdout = io.StringIO() + with mock.patch("sys.stdout", mock_stdout): + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + + expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), " + "build version: 5\n" % self.tmpfile) + self.assertEqual(expected, mock_stdout.getvalue()) + + def test_version_from_builder_file(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + rd = rb.get_ring() + rd.save(self.tmpfile + ".ring.gz", format_version=2) + + # read version from ring when builder file given as argument + argv = ["", self.tmpfile, "version"] + mock_stdout = io.StringIO() + with mock.patch("sys.stdout", mock_stdout): + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + + # output still reports ring file + expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), " + "build version: 5\n" % self.tmpfile) + self.assertEqual(expected, mock_stdout.getvalue()) + + def test_version_with_builder_file_missing(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + rd = rb.get_ring() + rd.save(self.tmpfile + ".ring.gz", format_version=2) + + # remove the builder to hit some interesting except blocks in main + os.unlink(self.tmpfile) + + test_args = [ + # explicit ring file version of course works when builder missing + self.tmpfile + ".ring.gz", + # even when builder file is missing you can still implicitly + # identify the ring file and read the version + self.tmpfile, + ] + + for path in test_args: + argv = ["", path, "version"] + mock_stdout = io.StringIO() + with mock.patch("sys.stdout", mock_stdout): + self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv) + + expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), " + "build version: 5\n" % self.tmpfile) + self.assertEqual(expected, mock_stdout.getvalue()) + + # but of course if the path is nonsensical we get an error + argv = ["", self.tmpfile + ".nonsense", "version"] + with self.assertRaises(FileNotFoundError): + ringbuilder.main(argv) + + def test_version_from_builder_file_with_ring_missing(self): + self.create_sample_ring() + rb = RingBuilder.load(self.tmpfile) + rb.rebalance() + # Don't even bother to write the ring + + test_args = [ + self.tmpfile + ".ring.gz", + # If provided with the (existing) builder, we can infer the + # (nonexisting) ring + self.tmpfile, + ] + + for path in test_args: + argv = ["", path, "version"] + # Gotta have a ring to get the version info + with self.assertRaises(FileNotFoundError): + ringbuilder.main(argv) + def test_warn_at_risk(self): # check that warning is generated when rebalance does not achieve # satisfactory balance diff --git a/test/unit/common/ring/test_builder.py b/test/unit/common/ring/test_builder.py index 4b58b8f4f9..bec767bc02 100644 --- a/test/unit/common/ring/test_builder.py +++ b/test/unit/common/ring/test_builder.py @@ -865,7 +865,7 @@ class TestRingBuilder(unittest.TestCase): rb.add_dev({'id': 2, 'region': 0, 'zone': 2, 'weight': 1, 'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'}) self.assertFalse(rb.ever_rebalanced) - builder_file = os.path.join(self.testdir, 'test.buider') + builder_file = os.path.join(self.testdir, 'test.builder') rb.save(builder_file) rb = ring.RingBuilder.load(builder_file) self.assertFalse(rb.ever_rebalanced) @@ -2055,12 +2055,18 @@ class TestRingBuilder(unittest.TestCase): for d in devs: rb.add_dev(d) rb.rebalance() + # There are so few devs, they should fit into 1 byte dev_ids but we + # store in a minimum of 2 for backwards compat. + self.assertEqual(rb.dev_id_bytes, 2) + self.assertEqual(rb._replica2part2dev[0].itemsize, 2) builder_file = os.path.join(self.testdir, 'test_save.builder') rb.save(builder_file) loaded_rb = ring.RingBuilder.load(builder_file) self.maxDiff = None self.assertEqual(loaded_rb.to_dict(), rb.to_dict()) self.assertEqual(loaded_rb.overload, 3.14159) + self.assertEqual(loaded_rb.dev_id_bytes, 2) + self.assertEqual(loaded_rb._replica2part2dev[0].itemsize, 2) @mock.patch('builtins.open', autospec=True) @mock.patch('swift.common.ring.builder.pickle.dump', autospec=True) @@ -2718,13 +2724,14 @@ class TestRingBuilder(unittest.TestCase): # try with contiguous holes at beginning add_dev_count = 6 rb = self._add_dev_delete_first_n(add_dev_count, add_dev_count - 3) + self.assertEqual([None, None, None, 3, 4, 5], [ + None if d is None else d['id'] for d in rb.devs]) new_dev_id = rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1', 'port': 6200, 'weight': 1.0, 'device': 'sda'}) self.assertLess(new_dev_id, add_dev_count) # try with non-contiguous holes - # [0, 1, None, 3, 4, None] rb2 = ring.RingBuilder(8, 3, 1) for i in range(6): rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1', @@ -2735,23 +2742,33 @@ class TestRingBuilder(unittest.TestCase): rb2.remove_dev(5) rb2.pretend_min_part_hours_passed() rb2.rebalance() + # List gets trimmed during rebalance + self.assertEqual([0, 1, None, 3, 4], [ + None if d is None else d['id'] for d in rb2.devs]) first = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1', 'port': 6200, 'weight': 1.0, 'device': 'sda'}) + self.assertEqual(first, 2) + self.assertEqual([0, 1, 2, 3, 4], [ + None if d is None else d['id'] for d in rb2.devs]) second = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1', 'port': 6200, 'weight': 1.0, 'device': 'sda'}) + self.assertEqual(second, 5) + self.assertEqual([0, 1, 2, 3, 4, 5], [ + None if d is None else d['id'] for d in rb2.devs]) # add a new one (without reusing a hole) third = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1', 'port': 6200, 'weight': 1.0, 'device': 'sda'}) - self.assertEqual(first, 2) - self.assertEqual(second, 5) self.assertEqual(third, 6) + self.assertEqual([0, 1, 2, 3, 4, 5, 6], [ + None if d is None else d['id'] for d in rb2.devs]) def test_reuse_of_dev_holes_with_id(self): add_dev_count = 6 rb = self._add_dev_delete_first_n(add_dev_count, add_dev_count - 3) + self.assertEqual([None, None, None, 3, 4, 5], [ + None if d is None else d['id'] for d in rb.devs]) # add specifying id exp_new_dev_id = 2 - # [dev, dev, None, dev, dev, None] try: new_dev_id = rb.add_dev({'id': exp_new_dev_id, 'region': 0, 'zone': 0, 'ip': '127.0.0.1', @@ -2760,6 +2777,41 @@ class TestRingBuilder(unittest.TestCase): self.assertEqual(new_dev_id, exp_new_dev_id) except exceptions.DuplicateDeviceError: self.fail("device hole not reused") + self.assertEqual([None, None, 2, 3, 4, 5], [ + None if d is None else d['id'] for d in rb.devs]) + + def test_wide_device_limits(self): + rb = ring.RingBuilder(8, 2, 1) + rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'ip': '127.0.0.1', + 'port': 6200, 'weight': 1.0, 'device': 'sda'}) + new_id = 2 ** 16 - 2 + rb.add_dev({'id': new_id, 'region': 0, 'zone': 0, 'ip': '127.0.0.1', + 'port': 6200, 'weight': 1.0, 'device': 'sdb'}) + rb.rebalance() + self.assertEqual(rb._replica2part2dev[0].itemsize, 2) + self.assertEqual([0] + [None] * (new_id - 1) + [new_id], [ + None if d is None else d['id'] for d in rb.devs]) + + # Special value used for removed devices in 2-byte-dev-id rings + new_id = 2 ** 16 - 1 + rb.add_dev({'id': new_id, 'region': 0, 'zone': 0, 'ip': '127.0.0.1', + 'port': 6200, 'weight': 1.0, 'device': 'sdc'}) + rb.rebalance() + # so we get kicked over to 4 + self.assertEqual(rb._replica2part2dev[0].itemsize, 4) + self.assertEqual([0] + [None] * (new_id - 2) + [new_id - 1, new_id], [ + None if d is None else d['id'] for d in rb.devs]) + + +class TestPartPowerIncrease(unittest.TestCase): + + FORMAT_VERSION = 1 + + def setUp(self): + self.testdir = mkdtemp() + + def tearDown(self): + rmtree(self.testdir, ignore_errors=1) def test_prepare_increase_partition_power(self): ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz') @@ -2788,7 +2840,7 @@ class TestRingBuilder(unittest.TestCase): # Save .ring.gz, and load ring from it to ensure prev/next is set rd = rb.get_ring() - rd.save(ring_file) + rd.save(ring_file, format_version=self.FORMAT_VERSION) r = ring.Ring(ring_file) expected_part_shift = 32 - 8 @@ -2809,7 +2861,7 @@ class TestRingBuilder(unittest.TestCase): # Let's save the ring, and get the nodes for an object ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz') rd = rb.get_ring() - rd.save(ring_file) + rd.save(ring_file, format_version=self.FORMAT_VERSION) r = ring.Ring(ring_file) old_part, old_nodes = r.get_nodes("acc", "cont", "obj") old_version = rb.version @@ -2828,7 +2880,7 @@ class TestRingBuilder(unittest.TestCase): old_ring = r rd = rb.get_ring() - rd.save(ring_file) + rd.save(ring_file, format_version=self.FORMAT_VERSION) r = ring.Ring(ring_file) new_part, new_nodes = r.get_nodes("acc", "cont", "obj") @@ -2900,7 +2952,7 @@ class TestRingBuilder(unittest.TestCase): # Save .ring.gz, and load ring from it to ensure prev/next is set rd = rb.get_ring() - rd.save(ring_file) + rd.save(ring_file, format_version=self.FORMAT_VERSION) r = ring.Ring(ring_file) expected_part_shift = 32 - 9 @@ -2969,6 +3021,10 @@ class TestRingBuilder(unittest.TestCase): self.assertEqual(rb.version, old_version + 2) +class TestPartPowerIncreaseV2(TestPartPowerIncrease): + FORMAT_VERSION = 2 + + class TestGetRequiredOverload(unittest.TestCase): maxDiff = None diff --git a/test/unit/common/ring/test_composite_builder.py b/test/unit/common/ring/test_composite_builder.py index 99d51f16dc..956eb5ff14 100644 --- a/test/unit/common/ring/test_composite_builder.py +++ b/test/unit/common/ring/test_composite_builder.py @@ -36,7 +36,8 @@ def make_device_iter(): x = 0 base_port = 6000 while True: - yield {'region': 0, # Note that region may be replaced on the tests + yield {'id': 200 + x, + 'region': 0, # Note that region may be replaced on the tests 'zone': 0, 'ip': '10.0.0.%s' % x, 'replication_ip': '10.0.0.%s' % x, @@ -242,7 +243,7 @@ class TestCompositeBuilder(BaseTestCompositeBuilder): def test_composite_same_device_in_the_different_rings_error(self): builders = self.create_sample_ringbuilders(2) - same_device = copy.deepcopy(builders[0].devs[0]) + same_device = copy.deepcopy(builders[0].devs[200]) # create one more ring which duplicates a device in the first ring builder = RingBuilder(6, 3, 1) @@ -987,7 +988,7 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder): c = Counter(builder.devs[dev_id]['id'] for part2dev_id in builder._replica2part2dev for dev_id in part2dev_id) - return [c[d['id']] for d in builder.devs] + return [c[d['id']] for d in builder.devs if d] def get_moved_parts(self, after, before): def uniqueness(dev): diff --git a/test/unit/common/ring/test_io.py b/test/unit/common/ring/test_io.py new file mode 100644 index 0000000000..58615a9233 --- /dev/null +++ b/test/unit/common/ring/test_io.py @@ -0,0 +1,284 @@ +# Copyright (c) 2022 NVIDIA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import dataclasses +import io +import json +import os.path +import unittest +from unittest import mock +import zlib + +from swift.common.ring.io import IndexEntry, RingReader, RingWriter + +from test.unit import with_tempdir + + +class TestRoundTrip(unittest.TestCase): + def assertRepeats(self, data, pattern, n): + l = len(pattern) + self.assertEqual(len(data), n * l) + actual = collections.Counter( + data[x * l:(x + 1) * l] + for x in range(n)) + self.assertEqual(actual, {pattern: n}) + + @with_tempdir + def test_write_failure(self, tempd): + tempf = os.path.join(tempd, 'not-persisted') + try: + with RingWriter.open(tempf): + self.assertEqual(1, len(os.listdir(tempd))) + raise RuntimeError + except RuntimeError: + pass + self.assertEqual(0, len(os.listdir(tempd))) + + def test_arbitrary_bytes(self): + buf = io.BytesIO() + with RingWriter(buf) as writer: + # Still need to write good magic, or we won't be able to read + writer.write_magic(1) + # but after that, we can kinda do whatever + writer.write(b'\xde\xad\xbe\xef' * 10240) + writer.write(b'\xda\x7a\xda\x7a' * 10240) + good_pos = writer.tell() + + self.assertTrue(writer.flushed) + pos = writer.raw_fp.tell() + writer.write(b'') + self.assertTrue(writer.flushed) + self.assertEqual(pos, writer.raw_fp.tell()) + + writer.write(b'more' * 10240) + self.assertFalse(writer.flushed) + + buf.seek(0) + reader = RingReader(buf) + self.assertEqual(reader.version, 1) + self.assertEqual(reader.raw_size, 6 + 12 * 10240) + self.assertEqual(reader.read(6), b'R1NG\x00\x01') + self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240) + self.assertRepeats(reader.read(40960), b'\xda\x7a\xda\x7a', 10240) + self.assertRepeats(reader.read(40960), b'more', 10240) + # Can seek backwards + reader.seek(good_pos) + self.assertRepeats(reader.read(40960), b'more', 10240) + # Even all the way to the beginning + reader.seek(0) + self.assertEqual(reader.read(6), b'R1NG\x00\x01') + self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240) + # but not arbitrarily + reader.seek(good_pos - 100) + with self.assertRaises(zlib.error): + reader.read(1) + + def test_sections(self): + buf = io.BytesIO() + with RingWriter(buf) as writer: + writer.write_magic(2) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + with writer.section('bar'): + # Sometimes you might not want to get the whole section into + # memory as a byte-string all at once (eg, when writing ring + # assignments) + writer.write_size(40960) + for _ in range(10): + writer.write(b'\xda\x7a\xda\x7a' * 1024) + + with writer.section('baz'): + writer.write_blob(b'more' * 10240) + + # Can't nest sections + with self.assertRaises(ValueError): + with writer.section('inner'): + pass + self.assertNotIn('inner', writer.index) + + writer.write(b'can add arbitrary bytes') + # ...though accessing them on read may be difficult; see below. + # This *is not* a recommended pattern -- write proper length-value + # blobs instead (even if you don't include them as sections in the + # index). + + with writer.section('quux'): + writer.write_blob(b'data' * 10240) + + # Gotta do this at the start + with self.assertRaises(IOError): + writer.write_magic(2) + + # Can't write duplicate sections + with self.assertRaises(ValueError): + with writer.section('foo'): + pass + + # We're reserving globs, so we can later support something like + # reader.load_sections('swift/ring/*') + with self.assertRaises(ValueError): + with writer.section('foo*'): + pass + + buf.seek(0) + reader = RingReader(buf) + self.assertEqual(reader.version, 2) + # Order matters! + self.assertEqual(list(reader.index), [ + 'foo', 'bar', 'baz', 'quux', 'swift/index']) + self.assertEqual({ + k: (v.uncompressed_start, v.uncompressed_end, v.checksum_method) + for k, v in reader.index.items() + }, { + 'foo': (6, 40974, 'sha256'), + 'bar': (40974, 81942, 'sha256'), + 'baz': (81942, 122910, 'sha256'), + # note the gap between baz and quux for the raw bytes + 'quux': (122933, 163901, 'sha256'), + 'swift/index': (163901, None, None), + }) + + self.assertIn('foo', reader) + self.assertNotIn('inner', reader) + + self.assertRepeats(reader.read_section('foo'), + b'\xde\xad\xbe\xef', 10240) + with reader.open_section('bar') as s: + for _ in range(10): + self.assertEqual(s.read(4), b'\xda\x7a\xda\x7a') + self.assertRepeats(s.read(), b'\xda\x7a\xda\x7a', 10230) + # If you know that one section follows another, you don't *have* + # to "open" the next one + self.assertRepeats(reader.read_blob(), b'more', 10240) + self.assertRepeats(reader.read_section('quux'), + b'data', 10240) + index_dict = json.loads(reader.read_section('swift/index')) + self.assertEqual(reader.index, { + section: IndexEntry(*entry) + for section, entry in index_dict.items()}) + + # Missing section + with self.assertRaises(KeyError) as caught: + with reader.open_section('foobar'): + pass + self.assertEqual("'foobar'", str(caught.exception)) + + # seek to the end of baz + reader.seek(reader.index['baz'].compressed_end) + # so we can read the raw bytes we stuffed in + gap_length = (reader.index['quux'].uncompressed_start - + reader.index['baz'].uncompressed_end) + self.assertGreater(gap_length, 0) + self.assertEqual(b'can add arbitrary bytes', + reader.read(gap_length)) + + def test_sections_with_corruption(self): + buf = io.BytesIO() + with RingWriter(buf) as writer: + writer.write_magic(2) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + buf.seek(0) + reader = RingReader(buf) + # if you open a section, you better read it all! + read_bytes = b'' + with self.assertRaises(ValueError) as caught: + with reader.open_section('foo') as s: + read_bytes = s.read(4) + self.assertEqual( + 'Incomplete read; expected 40956 more bytes to be read', + str(caught.exception)) + self.assertEqual(b'\xde\xad\xbe\xef', read_bytes) + + # if there's a digest mismatch, you can read data, but it'll + # throw an error on close + self.assertEqual('sha256', reader.index['foo'].checksum_method) + self.assertEqual( + 'c51d6703d54cd7cf57b4d4b7ecfcca60' + '56dbd41ebf1c1e83c0e8e48baeff629a', + reader.index['foo'].checksum_value) + reader.index['foo'] = dataclasses.replace( + writer.index['foo'], + checksum_value='not-the-sha', + ) + read_bytes = b'' + with self.assertRaises(ValueError) as caught: + with reader.open_section('foo') as s: + read_bytes = s.read() + self.assertIn('Hash mismatch in block: ', str(caught.exception)) + self.assertRepeats(read_bytes, b'\xde\xad\xbe\xef', 10240) + + @mock.patch('logging.getLogger') + def test_sections_with_unsupported_checksum(self, mock_logging): + buf = io.BytesIO() + with RingWriter(buf) as writer: + writer.write_magic(2) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef') + writer.index['foo'] = dataclasses.replace( + writer.index['foo'], + checksum_method='not_a_digest', + checksum_value='do not care', + ) + + buf.seek(0) + reader = RingReader(buf) + with reader.open_section('foo') as s: + read_bytes = s.read(4) + self.assertEqual(b'\xde\xad\xbe\xef', read_bytes) + self.assertEqual(mock_logging.mock_calls, [ + mock.call('swift.ring'), + mock.call('swift.ring').warning( + 'Ignoring unsupported checksum %s:%s for section %s', + 'not_a_digest', mock.ANY, 'foo'), + ]) + + def test_recompressed(self): + buf = io.BytesIO() + with RingWriter(buf) as writer: + writer.write_magic(2) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + buf.seek(0) + reader = RingReader(buf) + with self.assertRaises(IOError): + reader.read(-1) # don't be greedy + uncompressed_bytes = reader.read(2 ** 20) + + buf = io.BytesIO() + with RingWriter(buf) as writer: + writer.write(uncompressed_bytes) + + buf.seek(0) + with self.assertRaises(IOError): + # ...but we can't read it + RingReader(buf) + + def test_version_too_high(self): + buf = io.BytesIO() + with RingWriter(buf) as writer: + # you can write it... + writer.write_magic(3) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + buf.seek(0) + with self.assertRaises(ValueError): + # ...but we can't read it + RingReader(buf) diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index a2b0bf6432..b28507d419 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -15,19 +15,23 @@ import array import collections +from gzip import GzipFile +import json import os import unittest import stat +import struct from tempfile import mkdtemp from shutil import rmtree from time import sleep, time import sys import copy from unittest import mock +import zlib +from swift.common.exceptions import DevIdBytesTooSmall from swift.common import ring, utils -from swift.common.ring import utils as ring_utils -from swift.common.utils import md5 +from swift.common.ring import io, utils as ring_utils class TestRingBase(unittest.TestCase): @@ -52,13 +56,19 @@ class TestRingData(unittest.TestCase): def tearDown(self): rmtree(self.testdir, ignore_errors=1) - def assert_ring_data_equal(self, rd_expected, rd_got): - self.assertEqual(rd_expected._replica2part2dev_id, - rd_got._replica2part2dev_id) + def assert_ring_data_equal(self, rd_expected, rd_got, metadata_only=False): self.assertEqual(rd_expected.devs, rd_got.devs) self.assertEqual(rd_expected._part_shift, rd_got._part_shift) self.assertEqual(rd_expected.next_part_power, rd_got.next_part_power) self.assertEqual(rd_expected.version, rd_got.version) + self.assertEqual(rd_expected.dev_id_bytes, rd_got.dev_id_bytes) + self.assertEqual(rd_expected.replica_count, rd_got.replica_count) + + if metadata_only: + self.assertEqual([], rd_got._replica2part2dev_id) + else: + self.assertEqual(rd_expected._replica2part2dev_id, + rd_got._replica2part2dev_id) def test_attrs(self): r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]] @@ -82,12 +92,10 @@ class TestRingData(unittest.TestCase): ], 30) rd.save(ring_fname) + meta_only = ring.RingData.load(ring_fname, metadata_only=True) - self.assertEqual([ - {'id': 0, 'zone': 0, 'region': 1}, - {'id': 1, 'zone': 1, 'region': 1}, - ], meta_only.devs) - self.assertEqual([], meta_only._replica2part2dev_id) + self.assert_ring_data_equal(rd, meta_only, metadata_only=True) + rd2 = ring.RingData.load(ring_fname) self.assert_ring_data_equal(rd, rd2) @@ -98,19 +106,11 @@ class TestRingData(unittest.TestCase): [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) rd.save(ring_fname) - class MockReader(ring.ring.RingReader): - calls = [] - - def close(self): - self.calls.append(('close', self.fp)) - return super(MockReader, self).close() - - with mock.patch('swift.common.ring.ring.RingReader', - MockReader) as mock_reader: + with mock.patch('swift.common.ring.io.open', + return_value=open(ring_fname, 'rb')) as mock_open: + self.assertFalse(mock_open.return_value.closed) # sanity ring.RingData.load(ring_fname) - - self.assertEqual([('close', mock.ANY)], mock_reader.calls) - self.assertTrue(mock_reader.calls[0][1].closed) + self.assertTrue(mock_open.return_value.closed) def test_byteswapped_serialization(self): # Manually byte swap a ring and write it out, claiming it was written @@ -129,7 +129,9 @@ class TestRingData(unittest.TestCase): rds = ring.RingData(swapped_data, [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) - rds.save(ring_fname) + # note that this can only be an issue for v1 rings; + # v2 rings always write network order + rds.save(ring_fname, format_version=1) rd1 = ring.RingData(data, [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) @@ -183,8 +185,263 @@ class TestRingData(unittest.TestCase): 30) self.assertEqual(rd.replica_count, 1.75) + def test_deserialize_v1(self): + # First save it as a ring v2 and then try and load it using + # deserialize_v1 + ring_fname = os.path.join(self.testdir, 'foo.ring.gz') + rd = ring.RingData( + [[0, 1, 0, 1], [0, 1, 0, 1]], + [{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0', + 'port': 7000}, + {'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1', + 'port': 7000}], + 30) + rd.save(ring_fname, format_version=2) + + with self.assertRaises(ValueError) as err: + ring.RingData.deserialize_v1(io.RingReader(open(ring_fname, 'rb'))) + self.assertIn("unexpected magic:", str(err.exception)) + + # Now let's save it as v1 then load it up metadata_only + rd.save(ring_fname, format_version=1) + loaded_rd = ring.RingData.deserialize_v1( + io.RingReader(open(ring_fname, 'rb')), + metadata_only=True) + self.assertTrue(loaded_rd['byteorder']) + expected_devs = [ + {'id': 0, 'ip': '10.1.1.0', 'port': 7000, 'region': 1, 'zone': 0, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'ip': '10.1.1.1', 'port': 7000, 'region': 1, 'zone': 1, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}] + self.assertEqual(loaded_rd['devs'], expected_devs) + self.assertEqual(loaded_rd['part_shift'], 30) + self.assertEqual(loaded_rd['replica_count'], 2) + self.assertEqual(loaded_rd['dev_id_bytes'], 2) + + # but there is no replica2part2dev table + self.assertFalse(loaded_rd['replica2part2dev_id']) + + # But if we load it up with metadata_only = false + loaded_rd = ring.RingData.deserialize_v1( + io.RingReader(open(ring_fname, 'rb'))) + self.assertTrue(loaded_rd['byteorder']) + self.assertEqual(loaded_rd['devs'], expected_devs) + self.assertEqual(loaded_rd['part_shift'], 30) + self.assertEqual(loaded_rd['replica_count'], 2) + self.assertEqual(loaded_rd['dev_id_bytes'], 2) + self.assertTrue(loaded_rd['replica2part2dev_id']) + + def test_deserialize_v2(self): + # First save it as a ring v1 and then try and load it using + # deserialize_v2 + ring_fname = os.path.join(self.testdir, 'foo.ring.gz') + rd = ring.RingData( + [[0, 1, 0, 1], [0, 1, 0, 1]], + [{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0', + 'port': 7000}, + {'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1', + 'port': 7000}], + 30) + rd.save(ring_fname, format_version=2) + loaded_rd = ring.RingData.deserialize_v2( + io.RingReader(open(ring_fname, 'rb')), + metadata_only=True, + include_devices=False) + self.assertEqual(loaded_rd['part_shift'], 30) + self.assertEqual(loaded_rd['replica_count'], 2) + # minimum size we use is 2 byte dev ids + self.assertEqual(loaded_rd['dev_id_bytes'], 2) + + # but there is no replica2part2dev table or devs + self.assertFalse(loaded_rd['devs']) + self.assertFalse(loaded_rd['replica2part2dev_id']) + + # Next we load it up with metadata and devs only + loaded_rd = ring.RingData.deserialize_v2( + io.RingReader(open(ring_fname, 'rb')), + metadata_only=True) + expected_devs = [ + {'id': 0, 'ip': '10.1.1.0', 'port': 7000, 'region': 1, 'zone': 0, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'ip': '10.1.1.1', 'port': 7000, 'region': 1, 'zone': 1, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}] + self.assertEqual(loaded_rd['devs'], expected_devs) + self.assertEqual(loaded_rd['part_shift'], 30) + self.assertEqual(loaded_rd['replica_count'], 2) + self.assertEqual(loaded_rd['dev_id_bytes'], 2) + self.assertFalse(loaded_rd['replica2part2dev_id']) + + # But if we load it up with metadata_only = false + loaded_rd = ring.RingData.deserialize_v2( + io.RingReader(open(ring_fname, 'rb'))) + self.assertEqual(loaded_rd['devs'], expected_devs) + self.assertEqual(loaded_rd['part_shift'], 30) + self.assertEqual(loaded_rd['replica_count'], 2) + self.assertEqual(loaded_rd['dev_id_bytes'], 2) + self.assertTrue(loaded_rd['replica2part2dev_id']) + + def test_load(self): + rd = ring.RingData( + [[0, 1, 0, 1], [0, 1, 0, 1]], + [{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0', + 'port': 7000}, + {'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1', + 'port': 7000}], + 30) + ring_fname_1 = os.path.join(self.testdir, 'foo-1.ring.gz') + ring_fname_2 = os.path.join(self.testdir, 'foo-2.ring.gz') + ring_fname_bad_version = os.path.join(self.testdir, 'foo-bar.ring.gz') + rd.save(ring_fname_1, format_version=1) + rd.save(ring_fname_2, format_version=2) + with io.RingWriter.open(ring_fname_bad_version) as writer: + writer.write_magic(5) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + # Loading the bad ring will fail because it's an unknown version + with self.assertRaises(Exception) as ex: + ring.RingData.load(ring_fname_bad_version) + self.assertEqual( + f'Unsupported ring version: 5 for {ring_fname_bad_version!r}', + str(ex.exception)) + + orig_load_index = io.RingReader.load_index + + def mock_load_index(cls): + cls.version = 5 + orig_load_index(cls) + + with mock.patch('swift.common.ring.io.RingReader.load_index', + mock_load_index): + with self.assertRaises(Exception) as ex: + ring.RingData.load(ring_fname_1) + self.assertEqual( + f'Unknown ring format version 5 for {ring_fname_1!r}', + str(ex.exception)) + + expected_r2p2d = [ + array.array('H', [0, 1, 0, 1]), + array.array('H', [0, 1, 0, 1])] + expected_rd_dict = { + 'devs': [ + {'id': 0, 'region': 1, 'zone': 0, + 'ip': '10.1.1.0', 'port': 7000, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, + 'ip': '10.1.1.1', 'port': 7000, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}], + 'replica2part2dev_id': expected_r2p2d, + 'part_shift': 30, + 'next_part_power': None, + 'dev_id_bytes': 2, + 'version': None} + + # version 2 + loaded_rd = ring.RingData.load(ring_fname_2) + self.assertEqual(loaded_rd.to_dict(), expected_rd_dict) + + # version 1 + loaded_rd = ring.RingData.load(ring_fname_1) + self.assertEqual(loaded_rd.to_dict(), expected_rd_dict) + + def test_load_metadata_only(self): + rd = ring.RingData( + [[0, 1, 0, 1], [0, 1, 0, 1]], + [{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0', + 'port': 7000}, + {'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1', + 'port': 7000}], + 30) + ring_fname_1 = os.path.join(self.testdir, 'foo-1.ring.gz') + ring_fname_2 = os.path.join(self.testdir, 'foo-2.ring.gz') + ring_fname_bad_version = os.path.join(self.testdir, 'foo-bar.ring.gz') + rd.save(ring_fname_1, format_version=1) + rd.save(ring_fname_2, format_version=2) + with io.RingWriter.open(ring_fname_bad_version) as writer: + writer.write_magic(5) + with writer.section('foo'): + writer.write_blob(b'\xde\xad\xbe\xef' * 10240) + + # Loading the bad ring will fail because it's an unknown version + with self.assertRaises(Exception) as ex: + ring.RingData.load(ring_fname_bad_version) + self.assertEqual( + f'Unsupported ring version: 5 for {ring_fname_bad_version!r}', + str(ex.exception)) + + orig_load_index = io.RingReader.load_index + + def mock_load_index(cls): + cls.version = 5 + orig_load_index(cls) + + with mock.patch('swift.common.ring.io.RingReader.load_index', + mock_load_index): + with self.assertRaises(Exception) as ex: + ring.RingData.load(ring_fname_1) + self.assertEqual( + f'Unknown ring format version 5 for {ring_fname_1!r}', + str(ex.exception)) + + expected_rd_dict = { + 'devs': [ + {'id': 0, 'region': 1, 'zone': 0, + 'ip': '10.1.1.0', 'port': 7000, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, + 'ip': '10.1.1.1', 'port': 7000, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}], + 'replica2part2dev_id': [], + 'part_shift': 30, + 'next_part_power': None, + 'dev_id_bytes': 2, + 'version': None} + + # version 2 + loaded_rd = ring.RingData.load(ring_fname_2, metadata_only=True) + self.assertEqual(loaded_rd.to_dict(), expected_rd_dict) + + # version 1 + loaded_rd = ring.RingData.load(ring_fname_1, metadata_only=True) + self.assertEqual(loaded_rd.to_dict(), expected_rd_dict) + + def test_save(self): + ring_fname = os.path.join(self.testdir, 'foo.ring.gz') + rd = ring.RingData( + [[0, 1, 0, 1], [0, 1, 0, 1]], + [{'id': 0, 'zone': 0, 'ip': '10.1.1.0', 'port': 7000}, + {'id': 1, 'zone': 1, 'ip': '10.1.1.1', 'port': 7000}], + 30) + + # First test the supported versions + for version in (1, 2): + rd.save(ring_fname, format_version=version) + + # Now try an unknown version + with self.assertRaises(ValueError) as err: + for version in (3, None, "some version"): + rd.save(ring_fname, format_version=version) + self.assertEqual("format_version must be one of (1, 2)", + str(err.exception)) + # re-serialisation is already handled in test_load. + + def test_save_bad_dev_id_bytes(self): + ring_fname = os.path.join(self.testdir, 'foo.ring.gz') + rd = ring.RingData( + [array.array('I', [0, 1, 0, 1]), array.array('I', [0, 1, 0, 1])], + [{'id': 0, 'zone': 0, 'ip': '10.1.1.0', 'port': 7000}, + {'id': 1, 'zone': 1, 'ip': '10.1.1.1', 'port': 7000}], + 30) + + # v2 ring can handle wide devices fine + rd.save(ring_fname, format_version=2) + # but not v1! Only 2-byte dev ids there! + with self.assertRaises(DevIdBytesTooSmall): + rd.save(ring_fname, format_version=1) + class TestRing(TestRingBase): + FORMAT_VERSION = 1 def setUp(self): super(TestRing, self).setUp() @@ -213,9 +470,10 @@ class TestRing(TestRingBase): 'replication_port': 6066}] self.intended_part_shift = 30 self.intended_reload_time = 15 - ring.RingData( + rd = ring.RingData( self.intended_replica2part2dev_id, - self.intended_devs, self.intended_part_shift).save(self.testgz) + self.intended_devs, self.intended_part_shift) + rd.save(self.testgz, format_version=self.FORMAT_VERSION) self.ring = ring.Ring( self.testdir, reload_time=self.intended_reload_time, ring_name='whatever') @@ -234,12 +492,9 @@ class TestRing(TestRingBase): self.assertIsNone(self.ring.version) with open(self.testgz, 'rb') as fp: - expected_md5 = md5(usedforsecurity=False) expected_size = 0 for chunk in iter(lambda: fp.read(2 ** 16), b''): - expected_md5.update(chunk) expected_size += len(chunk) - self.assertEqual(self.ring.md5, expected_md5.hexdigest()) self.assertEqual(self.ring.size, expected_size) # test invalid endcap @@ -269,7 +524,8 @@ class TestRing(TestRingBase): 'ip': '10.1.1.1', 'port': 9876}) ring.RingData( self.intended_replica2part2dev_id, - self.intended_devs, self.intended_part_shift).save(self.testgz) + self.intended_devs, self.intended_part_shift, + ).save(self.testgz, format_version=self.FORMAT_VERSION) sleep(0.1) self.ring.get_nodes('a') self.assertEqual(len(self.ring.devs), 6) @@ -285,7 +541,8 @@ class TestRing(TestRingBase): 'ip': '10.5.5.5', 'port': 9876}) ring.RingData( self.intended_replica2part2dev_id, - self.intended_devs, self.intended_part_shift).save(self.testgz) + self.intended_devs, self.intended_part_shift, + ).save(self.testgz, format_version=self.FORMAT_VERSION) sleep(0.1) self.ring.get_part_nodes(0) self.assertEqual(len(self.ring.devs), 7) @@ -302,7 +559,8 @@ class TestRing(TestRingBase): 'ip': '10.6.6.6', 'port': 6200}) ring.RingData( self.intended_replica2part2dev_id, - self.intended_devs, self.intended_part_shift).save(self.testgz) + self.intended_devs, self.intended_part_shift, + ).save(self.testgz, format_version=self.FORMAT_VERSION) sleep(0.1) next(self.ring.get_more_nodes(part)) self.assertEqual(len(self.ring.devs), 8) @@ -318,7 +576,8 @@ class TestRing(TestRingBase): 'ip': '10.5.5.5', 'port': 6200}) ring.RingData( self.intended_replica2part2dev_id, - self.intended_devs, self.intended_part_shift).save(self.testgz) + self.intended_devs, self.intended_part_shift, + ).save(self.testgz, format_version=self.FORMAT_VERSION) sleep(0.1) self.assertEqual(len(self.ring.devs), 9) self.assertNotEqual(self.ring._mtime, orig_mtime) @@ -357,7 +616,8 @@ class TestRing(TestRingBase): testgz = os.path.join(self.testdir, 'without_replication.ring.gz') ring.RingData( self.intended_replica2part2dev_id, - replication_less_devs, self.intended_part_shift).save(testgz) + replication_less_devs, self.intended_part_shift, + ).save(testgz, format_version=self.FORMAT_VERSION) self.ring = ring.Ring( self.testdir, reload_time=self.intended_reload_time, @@ -508,7 +768,7 @@ class TestRing(TestRingBase): 'device': "d%s" % device}) next_dev_id += 1 rb.rebalance(seed=43) - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') # every part has the same number of handoffs @@ -555,7 +815,7 @@ class TestRing(TestRingBase): next_dev_id += 1 rb.pretend_min_part_hours_passed() num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=43) - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') # so now we expect the device list to be longer by one device @@ -603,7 +863,7 @@ class TestRing(TestRingBase): # Remove a device - no need to fluff min_part_hours. rb.remove_dev(0) num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=87) - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') # so now we expect the device list to be shorter by one device @@ -673,7 +933,7 @@ class TestRing(TestRingBase): # Add a partial replica rb.set_replicas(3.5) num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=164) - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') # Change expectations @@ -791,7 +1051,7 @@ class TestRing(TestRingBase): rb.rebalance(seed=1) rb.pretend_min_part_hours_passed() rb.rebalance(seed=1) - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') # There's 5 regions now, so the primary nodes + first 2 handoffs @@ -861,7 +1121,7 @@ class TestRing(TestRingBase): dev['weight'] = 1.0 rb.add_dev(dev) rb.rebalance() - rb.get_ring().save(self.testgz) + rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION) r = ring.Ring(self.testdir, ring_name='whatever') self.assertEqual(r.version, rb.version) @@ -921,5 +1181,164 @@ class TestRing(TestRingBase): histogram) +class TestRingV2(TestRing): + FORMAT_VERSION = 2 + + def test_4_byte_dev_ids(self): + ring_file = os.path.join(self.testdir, 'test.ring.gz') + index = {} + with GzipFile(ring_file, 'wb') as fp: + fp.write(b'R1NG\x00\x02') + fp.flush(zlib.Z_FULL_FLUSH) + + index['swift/ring/metadata'] = [ + os.fstat(fp.fileno()).st_size, fp.tell(), + None, None, None, None] + meta = json.dumps({ + "dev_id_bytes": 4, + "part_shift": 29, + "replica_count": 1.5, + }).encode('ascii') + fp.write(struct.pack('!Q', len(meta)) + meta) + fp.flush(zlib.Z_FULL_FLUSH) + + index['swift/ring/devices'] = [ + os.fstat(fp.fileno()).st_size, fp.tell(), + None, None, None, None] + devs = json.dumps([ + {"id": 0, "region": 1, "zone": 1, "ip": "127.0.0.1", + "port": 6200, "device": "sda", "weight": 1}, + None, + {"id": 2, "region": 1, "zone": 1, "ip": "127.0.0.1", + "port": 6201, "device": "sdb", "weight": 1}, + {"id": 3, "region": 1, "zone": 1, "ip": "127.0.0.1", + "port": 6202, "device": "sdc", "weight": 1}, + ]).encode('ascii') + fp.write(struct.pack('!Q', len(devs)) + devs) + fp.flush(zlib.Z_FULL_FLUSH) + + index['swift/ring/assignments'] = [ + os.fstat(fp.fileno()).st_size, fp.tell(), + None, None, None, None] + fp.write(struct.pack('!Q', 48) + 4 * ( + b'\x00\x00\x00\x03' + b'\x00\x00\x00\x02' + b'\x00\x00\x00\x00')) + fp.flush(zlib.Z_FULL_FLUSH) + + index['swift/index'] = [ + os.fstat(fp.fileno()).st_size, fp.tell(), + None, None, None, None] + blob = json.dumps(index).encode('ascii') + fp.write(struct.pack('!Q', len(blob)) + blob) + fp.flush(zlib.Z_FULL_FLUSH) + + fp.compress = zlib.compressobj( + 0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) + fp.write(struct.pack('!Q', index['swift/index'][0])) + fp.flush(zlib.Z_FULL_FLUSH) + + r = ring.Ring(ring_file) + self.assertEqual( + [[d['id'] for d in r.get_part_nodes(p)] for p in range(8)], + [[3, 0], [2, 3], [0, 2], [3, 0], [2], [0], [3], [2]]) + + +class ExtendedRingData(ring.RingData): + extra = b'some super-specific data' + + def to_dict(self): + ring_data = super().to_dict() + ring_data.setdefault('extra', self.extra) + return ring_data + + def serialize_v2(self, writer): + super().serialize_v2(writer) + with writer.section('my-custom-section') as s: + s.write_blob(self.extra) + + @classmethod + def deserialize_v2(cls, reader, *args, **kwargs): + ring_data = super().deserialize_v2(reader, *args, **kwargs) + # If you're adding custom data to your rings, you probably want an + # upgrade story that includes that data not being present + if 'my-custom-section' in reader.index: + with reader.open_section('my-custom-section') as s: + ring_data['extra'] = s.read() + return ring_data + + @classmethod + def from_dict(cls, ring_data): + obj = super().from_dict(ring_data) + obj.extra = ring_data.get('extra') + return obj + + +class TestRingExtensibility(unittest.TestCase): + def test(self): + r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]] + d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}] + s = 30 + rd = ExtendedRingData(r2p2d, d, s) + self.assertEqual(rd._replica2part2dev_id, r2p2d) + self.assertEqual(rd.devs, d) + self.assertEqual(rd._part_shift, s) + self.assertEqual(rd.extra, b'some super-specific data') + + # Can update it and round-trip to disk and back + rd.extra = b'some other value' + testdir = mkdtemp() + try: + ring_fname = os.path.join(testdir, 'foo.ring.gz') + rd.save(ring_fname, format_version=2) + bytes_written = os.path.getsize(ring_fname) + rd2 = ExtendedRingData.load(ring_fname) + # Vanilla Swift can also read the custom ring + vanilla_ringdata = ring.RingData.load(ring_fname) + finally: + rmtree(testdir, ignore_errors=1) + + self.assertEqual(rd2._replica2part2dev_id, r2p2d) + self.assertEqual(rd2.devs, d) + self.assertEqual(rd2._part_shift, s) + self.assertEqual(rd2.extra, b'some other value') + self.assertEqual(rd2.size, bytes_written) + + self.assertEqual(vanilla_ringdata._replica2part2dev_id, r2p2d) + self.assertEqual(vanilla_ringdata.devs, d) + self.assertEqual(vanilla_ringdata._part_shift, s) + self.assertFalse(hasattr(vanilla_ringdata, 'extra')) + self.assertEqual(vanilla_ringdata.size, bytes_written) + + def test_missing_custom_data(self): + r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]] + d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}] + s = 30 + rd = ring.RingData(r2p2d, d, s) + self.assertEqual(rd._replica2part2dev_id, r2p2d) + self.assertEqual(rd.devs, d) + self.assertEqual(rd._part_shift, s) + self.assertFalse(hasattr(rd, 'extra')) + + # Can load a vanilla ring and get some default behavior based on the + # overridden from_dict + testdir = mkdtemp() + try: + ring_fname = os.path.join(testdir, 'foo.ring.gz') + rd.save(ring_fname, format_version=2) + bytes_written = os.path.getsize(ring_fname) + rd2 = ExtendedRingData.load(ring_fname) + finally: + rmtree(testdir, ignore_errors=1) + + self.assertEqual(rd2._replica2part2dev_id, r2p2d) + self.assertEqual(rd2.devs, d) + self.assertEqual(rd2._part_shift, s) + self.assertIsNone(rd2.extra) + self.assertEqual(rd2.size, bytes_written) + + if __name__ == '__main__': unittest.main()