ring: Introduce a v2 ring format

There's a bunch of moving pieces here:

- Add a new RingWriter class.

  Stick it in a new swift.common.ring.io module. You *can* use it like
  the old gzip file, but you can also define named sections which can
  be referenced later on read. Section names may be arbitrary strings,
  but the "swift/" prefix is reserved for upstream use. Sections must
  contain a single length-value encoded BLOB. If sections are used, an
  additional BLOB is written at the end containing a JSON section-index,
  followed by an uncompressed offset for the index.

  Move RingReader to ring/io.py, too.

- Clean up some ring metadata handling:

  - Drop MD5 tracking in RingReader. It was brittle at best anyway, and
    nothing uses it. YAGNI

  - Fix size/raw_size attributes when loading only metadata.

- Add the ability to seek within RingReaders, though you need to know
  what you're doing and only seek to flush points.

- Let RingBuilder objects change how wide their replica2part2dev_id
  arrays are. Add a dev_id_bytes key to serialized ring metadata.

  dev_id_bytes may be either 2 or 4, but 4 requires v2 rings. We
  considered allowing dev_id_bytes of 1, but dropped it as unnecessary
  complexity for a niche use case.

- swift-ring-builder version subcommand added, which takes a ring. This
  lets operators see the serialization format of a ring on disk:

  $ swift-ring-builder object.ring.gz version
  object.ring.gz: Serialization version: 2 (2-byte IDs), build version: 54

Signed-off-by: Tim Burke <tim.burke@gmail.com>
Change-Id: Ia0ac4ea2006d8965d7fdb6659d355c77386adb70
Tim Burke
2022-03-17 22:30:40 -07:00
parent e75e93f11c
commit ae062f8b09
22 changed files with 2325 additions and 200 deletions

1
.gitignore vendored
View File

@@ -24,3 +24,4 @@ test/probe/.noseids
RELEASENOTES.rst
releasenotes/notes/reno.cache
/tools/playbooks/**/*.retry
.vscode/*

View File

@@ -47,6 +47,7 @@ Overview and Concepts
overview_architecture
overview_wsgi_management
overview_ring
overview_ring_format
overview_policies
overview_reaper
overview_auth

View File

@@ -0,0 +1,253 @@
=================
Ring File Formats
=================
The ring is the most important data structure in Swift. How this data structure
has been serialized to disk has changed over the years.
Initially, ring files contained three key pieces of information:
* the part_power value (often stored as ``part_shift := 32 - part_power``)
* which determines how many partitions are in the ring,
* the device list
* which includes all the disks participating in the ring, and
* the replica-to-part-to-device table
* which has all ``replica_count * (2 ** part_power)`` partition assignments.
However, the need to add more data structures to the ring serialization format
has led to the creation of a new v2 ring format.
Ring files have always been gzipped when serialized, though the inner,
raw format has evolved over the years.
Ring v0
-------
Initially, rings were simply pickle dumps of the RingData object. `With
Swift 1.3.0 <https://opendev.org/openstack/swift/commit/fc6391ea>`__, this
changed to pickling a pure-stdlib data structure, but the core concept
was the same.
.. note::
Swift 2.36.0 dropped support for v0 rings.
Ring v1
-------
Pickle presented some problems, however. While `there are security
concerns <https://docs.python.org/3/library/pickle.html>`__ around unpickling
untrusted data, security boundaries are generally drawn such that rings are
assumed to be trusted. Ultimately, what pushed us to a new format were
`performance considerations <https://bugs.launchpad.net/swift/+bug/1031954>`__.
Starting in `Swift 1.7.0 <https://opendev.org/openstack/swift/commit/f8ce43a2>`__,
Swift began using a new format (while still being willing to read the old one).
The new format starts with some magic so we may identify it as such::
+---------------+-------+
|'R' '1' 'N' 'G'| <vrs> |
+---------------+-------+
where ``<vrs>`` is a network-order two-byte version number (which is always 1).
After that, a JSON object is serialized as::
+---------------+-------...---+
| <data-length> | <data ... > |
+---------------+-------...---+
where ``<data-length>`` is the network-order four-byte length (in bytes) of
``<data>``, which is the ASCII-encoded JSON-serialized object. This object
has at minimum three keys:
* ``devs`` for the device list
* ``part_shift`` (i.e., ``32 - part_power``)
* ``replica_count`` for the integer number of part-to-device rows to read
The replica-to-part-to-device table then follows::
+-------+-------+...+-------+-------+
| <dev> | <dev> |...| <dev> | <dev> |
+-------+-------+...+-------+-------+
| <dev> | <dev> |...| <dev> | <dev> |
+-------+-------+...+-------+-------+
| ... |
+-------+-------+...+-------+-------+
| <dev> | <dev> |...|
+-------+-------+...+
Each ``<dev>`` is a host-order two-byte index into the ``devs`` list. Every row
except the last has exactly ``2 ** part_power`` entries; the last row may
have the same or fewer.
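The v1 layout described above can be sketched as a small round trip. This is a minimal illustration and not Swift's actual reader or writer: the function names ``write_v1_ring`` and ``read_v1_ring`` and the toy device list are assumptions made for the example, and the ``byteorder`` key it checks is the optional field described later in this section.

```python
import gzip
import io
import json
import struct
import sys
from array import array


def write_v1_ring(fp, metadata, rows):
    """Serialize a toy v1 ring (hypothetical helper, not Swift's API)."""
    with gzip.GzipFile(fileobj=fp, mode="wb") as gz:
        gz.write(b"R1NG" + struct.pack("!H", 1))  # magic + two-byte version
        data = json.dumps(metadata, sort_keys=True).encode("ascii")
        gz.write(struct.pack("!I", len(data)) + data)  # four-byte length + JSON
        for row in rows:
            gz.write(array("H", row).tobytes())  # host-order two-byte dev IDs


def read_v1_ring(fp):
    """Parse the toy v1 ring back into (metadata, rows)."""
    with gzip.GzipFile(fileobj=fp, mode="rb") as gz:
        magic, version = struct.unpack("!4sH", gz.read(6))
        if magic != b"R1NG" or version != 1:
            raise ValueError("not a v1 ring")
        (length,) = struct.unpack("!I", gz.read(4))
        metadata = json.loads(gz.read(length))
        part_count = 2 ** (32 - metadata["part_shift"])
        rows = []
        for _ in range(metadata["replica_count"]):
            row = array("H")
            # The last row may be shorter than part_count entries.
            row.frombytes(gz.read(2 * part_count))
            if metadata.get("byteorder", sys.byteorder) != sys.byteorder:
                row.byteswap()
            rows.append(row)
        return metadata, rows


# A ring with part_power 2 (four partitions) and 1.5 replicas.
buf = io.BytesIO()
meta = {"devs": [{"id": 0}, {"id": 1}], "part_shift": 30,
        "replica_count": 2, "byteorder": sys.byteorder}
write_v1_ring(buf, meta, [[0, 1, 0, 1], [1, 0]])
buf.seek(0)
parsed_meta, parsed_rows = read_v1_ring(buf)
```

Note how the short final row is only discoverable by running out of data, which is the limitation the v2 format addresses.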
The metadata object has proven quite versatile: new keys have been added
to provide additional information while remaining backwards-compatible.
In order, the following new fields have been added:
* ``byteorder`` specifies whether the host-order for the
replica-to-part-to-device table is "big" or "little" endian. Added in
`Swift 2.12.0 <https://opendev.org/openstack/swift/commit/1ec6e2bb>`__,
this allows rings written on big-endian machines to be read on
little-endian machines and vice-versa.
* ``next_part_power`` indicates whether a partition-power increase is in
progress. Added in `Swift 2.15.0 <https://opendev.org/openstack/swift/commit/e1140666>`__,
this will have one of two values, if present: the ring's current
``part_power``, indicating that there may be hardlinks to clean up,
or ``part_power + 1`` indicating that hardlinks may need to be created.
See :ref:`the documentation<modify_part_power>`
for more information.
* ``version`` specifies the version number of the ring-builder that was used
to write this ring. Added in `Swift 2.24.0 <https://opendev.org/openstack/swift/commit/6853616a>`__,
this allows rings from different machines to be compared to determine
which is newer.
Ring v2
-------
The way that v1 rings dealt with fractional replicas made it impossible
to reliably serialize additional large data structures after the
replica-to-part-to-device table. The v2 format has been designed to be
extensible.
The new format starts with magic similar to v1::
+---------------+-------+
|'R' '1' 'N' 'G'| <vrs> |
+---------------+-------+
where ``<vrs>`` is again a network-order two-byte version number (which is now 2).
By bumping the version number, we ensure that old versions of Swift refuse to
read the ring, rather than misinterpret the content.
After that, a series of BLOBs are serialized, each as::
+-------------------------------+-------...---+
| <data-length> | <data ... > |
+-------------------------------+-------...---+
where ``<data-length>`` is the network-order eight-byte length (in bytes) of
``<data>``. Each BLOB is preceded by a ``Z_FULL_FLUSH`` to allow it to be
decompressed without reading the whole file.
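To illustrate why the ``Z_FULL_FLUSH`` before each BLOB matters, here is a minimal sketch using only the standard ``zlib`` module. The helper names ``write_blobs`` and ``read_blob`` are hypothetical and do not reflect the ``RingWriter``/``RingReader`` API; the sketch only shows that a reader can seek to a flush point and start a bare deflate decompressor there.

```python
import io
import struct
import zlib


def write_blobs(fp, blobs):
    """Write length-prefixed BLOBs into one gzip stream.

    Returns the compressed offset at which each BLOB starts.
    """
    compressor = zlib.compressobj(9, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
    offsets = []
    for data in blobs:
        # Flush first, so the BLOB starts on a byte-aligned boundary with
        # no back-references into earlier data.
        fp.write(compressor.flush(zlib.Z_FULL_FLUSH))
        offsets.append(fp.tell())
        fp.write(compressor.compress(struct.pack("!Q", len(data)) + data))
    fp.write(compressor.flush(zlib.Z_FINISH))
    return offsets


def read_blob(fp, offset, chunk_size=4096):
    """Read one BLOB starting at a flush point, ignoring everything before."""
    fp.seek(offset)
    decompressor = zlib.decompressobj(-zlib.MAX_WBITS)  # bare deflate stream
    buf = b""
    wanted = 8  # the eight-byte length comes first
    have_length = False
    while len(buf) < wanted:
        chunk = fp.read(chunk_size)
        if not chunk:
            raise EOFError("truncated BLOB")
        buf += decompressor.decompress(chunk)
        if not have_length and len(buf) >= 8:
            (length,) = struct.unpack("!Q", buf[:8])
            wanted = 8 + length
            have_length = True
    return buf[8:wanted]


blobs = [b'{"part_shift": 30}', b"\x00\x01" * 8]
buf = io.BytesIO()
offsets = write_blobs(buf, blobs)
second = read_blob(buf, offsets[1])  # read the second BLOB directly
first = read_blob(buf, offsets[0])
```

The file as a whole remains an ordinary gzip stream, so tools that decompress it from the beginning still work.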
The order of the BLOBs isn't important, although they do tend to be written
in the order Swift will read them while loading. This reduces the disk seeks
necessary to load.
The final BLOB is an index: a JSON object mapping named sections to an array
of offsets within the file, like
.. code::
{
section: [
compressed start,
uncompressed start,
compressed end,
uncompressed end,
checksum method,
checksum value
],
...
}
Section names may be arbitrary strings, but the "swift/" prefix is reserved
for upstream use. The start/end values mark the beginning and ending of the
section's BLOB. Note that some end values may be ``null`` if they were not
known when the index was written -- in particular, this *will* be true for
the index itself. The checksum method should be one of ``"md5"``, ``"sha1"``,
``"sha256"``, or ``"sha512"``; other values will be ignored in anticipation
of a need to support further algorithms. The checksum value will be the
hex-encoded digest of the uncompressed section's bytes. Like end values,
checksum data may be ``null`` if not known when the index is written.
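As a rough sketch of how a reader might use the checksum fields of an index entry, consider the following. The function name ``verify_section``, the section name ``example/section``, and the offsets in the sample index are invented for illustration; which exact bytes Swift feeds to the checksum is not restated here, so the caller is assumed to pass whatever bytes were checksummed.

```python
import hashlib
import json

KNOWN_METHODS = ("md5", "sha1", "sha256", "sha512")


def verify_section(index, name, section_bytes):
    """Check section_bytes against the index entry for ``name``.

    Returns True or False when a checksum is available, or None when the
    entry has no usable checksum (null value or unrecognized method).
    """
    method, digest = index[name][4:6]
    if digest is None or method not in KNOWN_METHODS:
        return None
    return hashlib.new(method, section_bytes).hexdigest() == digest


payload = b'{"part_shift": 30}'
# Offsets below are made up; JSON null becomes None after loading.
index = json.loads(json.dumps({
    "example/section": [15, 0, 40, 26, "sha256",
                        hashlib.sha256(payload).hexdigest()],
    "swift/index": [40, 26, None, None, None, None],
}))

ok = verify_section(index, "example/section", payload)
tampered = verify_section(index, "example/section", payload + b" ")
skipped = verify_section(index, "swift/index", b"")
```

Returning ``None`` for unknown methods mirrors the rule that unrecognized checksum methods are ignored rather than treated as errors.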
Finally, a "tail" is written:
* the gzip stream is flushed with another ``Z_FULL_FLUSH``,
* the stream is switched to uncompressed,
* the eight-byte offset of the uncompressed start of the index is written,
* the gzip stream is flushed with another ``Z_FULL_FLUSH``,
* the eight-byte offset of the compressed start of the index is written,
* the gzip stream is flushed with another ``Z_FULL_FLUSH``, and
* the gzip stream is closed; this involves:
* flushing the underlying deflate stream with ``Z_FINISH``
* writing ``CRC32`` (of the full uncompressed data)
* writing ``ISIZE`` (the length of the full uncompressed data ``mod 2 ** 32``)
By switching to uncompressed, we can know exactly how many bytes will be
written in the tail, so that when reading we can quickly seek to and read the
index offset, seek to the index start, and read the index. From there we
can do similar things for any other section. Reading the index proceeds as
follows:
* Seek to the end of the file
* Go back 31 bytes in the underlying file; this should leave us at the start of
the deflate block containing the offset for the compressed start
* Decompress 8 bytes from the deflate stream to get the location of the
compressed start of the index BLOB
* Seek to that location
* Read/decompress the size of the index BLOB
* Read/decompress the JSON-serialized index.
.. note:: These 31 bytes comprise the deflate block containing the 8-byte location,
a ``Z_FULL_FLUSH`` block, the ``Z_FINISH`` block, and the ``CRC32`` and
``ISIZE``. For more information, see `RFC 1951`_ (for the deflate stream)
and `RFC 1952`_ (for the gzip format).
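The 31-byte figure can be checked with simple arithmetic. The sketch below assumes that every block in the tail is a stored (uncompressed) deflate block with the five-byte header defined in RFC 1951, and that the ``Z_FINISH`` block is an empty stored block; the constant and function names are illustrative only.

```python
import os
import struct

# RFC 1951 stored block: one byte holding BFINAL/BTYPE (padded to a byte
# boundary), then two-byte LEN and two-byte NLEN.
STORED_BLOCK_HEADER = 1 + 2 + 2

offset_size = struct.calcsize("!Q")               # eight-byte index offset
offset_block = STORED_BLOCK_HEADER + offset_size  # 13 bytes
full_flush_block = STORED_BLOCK_HEADER            # empty block: 00 00 00 ff ff
finish_block = STORED_BLOCK_HEADER                # assumed: empty final block
gzip_trailer = 4 + 4                              # CRC32 + ISIZE (RFC 1952)

tail_length = offset_block + full_flush_block + finish_block + gzip_trailer


def seek_to_offset_block(fp):
    """Position fp at the start of the deflate block holding the offset."""
    fp.seek(-tail_length, os.SEEK_END)
    return fp.tell()
```

Because none of these blocks are compressed, their sizes do not depend on the data written, which is what makes the fixed seek possible.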
The sections currently defined upstream are as follows:
* ``swift/index`` - The Swift index
* ``swift/ring/metadata`` - Ring metadata, serialized as JSON
* ``swift/ring/devices`` - The devices data structure, serialized as JSON
* This was part of the ring metadata in v1; it has been separated out as it
gets large
* ``swift/ring/assignments`` - The ring replica2part2dev_id data structure
.. note::
Third-parties may find it useful to add their own sections; however,
the ``swift/`` prefix is reserved for future upstream enhancements.
swift/ring/metadata
~~~~~~~~~~~~~~~~~~~
This BLOB is an ASCII-encoded JSON object full of metadata, similar
to v1 rings. It has the following required keys:
* ``part_shift``
* ``dev_id_bytes`` specifies the number of bytes used for each ``<dev>`` in the
replica-to-part-to-device table; will be one of 2, 4, or 8
Additionally, there are several optional keys which may be present:
* ``next_part_power``
* ``version``
Notice that two keys are no longer present: ``replica_count`` is no longer
needed as the size of the replica-to-part-to-device table is explicit, and
``byteorder`` is not needed as all data in v2 rings should be written using
network-order.
swift/ring/devices
~~~~~~~~~~~~~~~~~~
This BLOB contains a list of Swift device dictionaries. It was separated out
from the metadata BLOB as this can become a large structure in its own right.
swift/ring/assignments
~~~~~~~~~~~~~~~~~~~~~~
This BLOB is the replica-to-part-to-device table. Its length will be
``replicas * (2 ** part_power) * dev_id_bytes``, where ``replicas`` is the exact
(potentially fractional) replica count for the ring. Unlike in v1, each
``<dev>`` is written using network-order.
Note that this is why we increased the size of ``<data-length>`` as compared to
the v1 format -- otherwise, we may not be able to represent rings with both
high ``replica_count`` and high ``part_power``.
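A short worked example of the length formula follows. The function name ``assignments_length`` and the sample values are assumptions for illustration; the calculation truncates a fractional entry count to a whole number of entries, which is an assumption about how partial rows are sized.

```python
MAX_FOUR_BYTE_LENGTH = 2 ** 32 - 1  # largest value a v1-style length can hold


def assignments_length(replicas, part_power, dev_id_bytes):
    """Size in bytes of the replica-to-part-to-device table."""
    entries = int(replicas * (2 ** part_power))
    return entries * dev_id_bytes


# 3.25 replicas, 1024 partitions, two-byte device IDs.
small = assignments_length(3.25, 10, 2)

# Three full replicas at part_power 30 with two-byte IDs already overflow
# a four-byte length field.
large = assignments_length(3, 30, 2)
needs_eight_bytes = large > MAX_FOUR_BYTE_LENGTH
```

Here ``small`` works out to 3328 entries of two bytes each, while ``large`` exceeds what a four-byte ``<data-length>`` could represent.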
.. _RFC 1952: https://rfc-editor.org/rfc/rfc1952
.. _RFC 1951: https://rfc-editor.org/rfc/rfc1951

View File

@@ -4,6 +4,16 @@
Partitioned Consistent Hash Ring
********************************
.. _ring-io:
Ring IO
=======
.. automodule:: swift.common.ring.io
:members:
:undoc-members:
:show-inheritance:
.. _ring:
Ring

View File

@@ -1,3 +1,5 @@
.. _modify_part_power:
==============================
Modifying Ring Partition Power
==============================

20
etc/magic Normal file
View File

@@ -0,0 +1,20 @@
#-------------------------------------------------------------------------------
# Openstack swift
# Note: add this snippet to either /etc/magic or ~/.magic
#-------------------------------------------------------------------------------
# gzip compressed
0 beshort 0x1f8b
# compress method: deflate, flags: FNAME
>&0 beshort 0x0808
# skip ahead another 6 (MTIME, XLF, OS); read FNAME
>>&6 search/0x40 \0
# Skip ahead five; should cover
# 00 -- uncompressed block
# 06 00 -- ... of length 6
# f9 ff -- (one's complement of length)
>>>&5 string/4 R1NG Swift ring,
>>>>&0 clear x
>>>>&0 beshort 1 version 1
>>>>&0 beshort 2 version 2
>>>>&0 default x
>>>>>&0 beshort x unknown version (0x%04x)

View File

@@ -34,6 +34,7 @@ from swift.common import exceptions
from swift.common.ring import RingBuilder, Ring, RingData
from swift.common.ring.builder import MAX_BALANCE
from swift.common.ring.composite_builder import CompositeRingBuilder
from swift.common.ring.ring import RING_CODECS, DEFAULT_RING_FORMAT_VERSION
from swift.common.ring.utils import validate_args, \
validate_and_normalize_ip, build_dev_from_opts, \
parse_builder_ring_filename_args, parse_search_value, \
@@ -47,6 +48,8 @@ EXIT_SUCCESS = 0
EXIT_WARNING = 1
EXIT_ERROR = 2
FORMAT_CHOICES = [str(v) for v in RING_CODECS]
global argv, backup_dir, builder, builder_file, ring_file
argv = backup_dir = builder = builder_file = ring_file = None
@@ -594,9 +597,9 @@ swift-ring-builder <builder_file>
dispersion_trailer = '' if builder.dispersion is None else (
', %.02f dispersion' % (builder.dispersion))
print('%d partitions, %.6f replicas, %d regions, %d zones, '
'%d devices, %.02f balance%s' % (
'%d devices, %d-byte IDs, %.02f balance%s' % (
builder.parts, builder.replicas, regions, zones, dev_count,
balance, dispersion_trailer))
builder.dev_id_bytes, balance, dispersion_trailer))
print('The minimum number of hours before a partition can be '
'reassigned is %s (%s remaining)' % (
builder.min_part_hours,
@@ -617,6 +620,9 @@ swift-ring-builder <builder_file>
except Exception as exc:
print('Ring file %s is invalid: %r' % (ring_file, exc))
else:
# mostly just an implementation detail
builder_dict.pop('dev_id_bytes', None)
ring_dict.pop('dev_id_bytes', None)
if builder_dict == ring_dict:
print('Ring file %s is up-to-date' % ring_file)
else:
@@ -656,6 +662,24 @@ swift-ring-builder <builder_file>
print(ring_empty_error)
exit(EXIT_SUCCESS)
@staticmethod
def version():
"""
swift-ring-builder <ring_file> version
"""
if len(argv) < 3:
print(Commands.version.__doc__.strip())
exit(EXIT_ERROR)
try:
rd = RingData.load(ring_file, metadata_only=True)
except ValueError as e:
print(e)
exit(EXIT_ERROR)
print('%s: Serialization version: %d (%d-byte IDs), '
'build version: %d' %
(ring_file, rd.format_version, rd.dev_id_bytes, rd.version))
exit(EXIT_SUCCESS)
@staticmethod
def search():
"""
@@ -1051,7 +1075,19 @@ swift-ring-builder <builder_file> rebalance [options]
parser.add_option('-s', '--seed', help="seed to use for rebalance")
parser.add_option('-d', '--debug', action='store_true',
help="print debug information")
parser.add_option('--format-version',
choices=FORMAT_CHOICES, default=None,
help="specify ring format version")
options, args = parser.parse_args(argv)
if options.format_version is None:
print("Defaulting to --format-version=1. This ensures the ring\n"
"written will be readable by older versions of Swift.\n"
"In a future release, the default will change to\n"
"--format-version=2\n")
options.format_version = DEFAULT_RING_FORMAT_VERSION
else:
# N.B. choices doesn't work with type=int
options.format_version = int(options.format_version)
def get_seed(index):
if options.seed:
@@ -1166,9 +1202,11 @@ swift-ring-builder <builder_file> rebalance [options]
status = EXIT_WARNING
ts = time()
builder.get_ring().save(
pathjoin(backup_dir, '%d.' % ts + basename(ring_file)))
pathjoin(backup_dir, '%d.' % ts + basename(ring_file)),
format_version=options.format_version)
builder.save(pathjoin(backup_dir, '%d.' % ts + basename(builder_file)))
builder.get_ring().save(ring_file)
builder.get_ring().save(
ring_file, format_version=options.format_version)
builder.save(builder_file)
exit(status)
@@ -1293,6 +1331,22 @@ swift-ring-builder <builder_file> write_ring
'set_info' calls when no rebalance is needed but you want to send out the
new device information.
"""
usage = Commands.write_ring.__doc__.strip()
parser = optparse.OptionParser(usage)
parser.add_option('--format-version',
choices=FORMAT_CHOICES, default=None,
help="specify ring format version")
options, args = parser.parse_args(argv)
if options.format_version is None:
print("Defaulting to --format-version=1. This ensures the ring\n"
"written will be readable by older versions of Swift.\n"
"In a future release, the default will change to\n"
"--format-version=2\n")
options.format_version = DEFAULT_RING_FORMAT_VERSION
else:
# N.B. choices doesn't work with type=int
options.format_version = int(options.format_version)
if not builder.devs:
print('Unable to write empty ring.')
exit(EXIT_ERROR)
@@ -1304,8 +1358,9 @@ swift-ring-builder <builder_file> write_ring
'assignments but with devices; did you forget to run '
'"rebalance"?', file=sys.stderr)
ring_data.save(
pathjoin(backup_dir, '%d.' % time() + basename(ring_file)))
ring_data.save(ring_file)
pathjoin(backup_dir, '%d.' % time() + basename(ring_file)),
format_version=options.format_version)
ring_data.save(ring_file, format_version=options.format_version)
exit(EXIT_SUCCESS)
@staticmethod
@@ -1653,8 +1708,11 @@ def main(arguments=None):
builder_file, ring_file = parse_builder_ring_filename_args(argv)
if builder_file != argv[1]:
print('Note: using %s instead of %s as builder file' % (
builder_file, argv[1]))
if len(argv) > 2 and argv[2] in ('write_builder', 'version'):
pass
else:
print('Note: using %s instead of %s as builder file' % (
builder_file, argv[1]))
try:
builder = RingBuilder.load(builder_file)
@@ -1668,7 +1726,8 @@ def main(arguments=None):
print(msg)
exit(EXIT_ERROR)
except (exceptions.FileNotFoundError, exceptions.PermissionError) as e:
if len(argv) < 3 or argv[2] not in ('create', 'write_builder'):
if len(argv) < 3 or argv[2] not in ('create', 'write_builder',
'version'):
print(e)
exit(EXIT_ERROR)
except Exception as e:

View File

@@ -133,6 +133,10 @@ class PathNotDir(OSError):
pass
class DevIdBytesTooSmall(ValueError):
pass
class ChunkReadError(SwiftException):
pass

View File

@@ -33,12 +33,12 @@ from time import time
from swift.common import exceptions
from swift.common.ring.ring import RingData
from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \
validate_and_normalize_address, validate_replicas_by_tier, pretty_dev
validate_and_normalize_address, validate_replicas_by_tier, pretty_dev, \
none_dev_id, calc_dev_id_bytes, BYTES_TO_TYPE_CODE, resize_array
# we can't store None's in the replica2part2dev array, so we high-jack
# the max value for magic to represent the part is not currently
# assigned to any device.
NONE_DEV = 2 ** 16 - 1
MAX_BALANCE = 999.99
MAX_BALANCE_GATHER_COUNT = 3
@@ -156,6 +156,31 @@ class RingBuilder(object):
def part_shift(self):
return 32 - self.part_power
@property
def dev_id_bytes(self):
if not self._replica2part2dev:
max_dev_id = len(self.devs) - 1 if self.devs else 0
return calc_dev_id_bytes(max_dev_id)
return self._replica2part2dev[0].itemsize
def set_dev_id_bytes(self, new_dev_id_bytes):
if self._replica2part2dev:
self._replica2part2dev = [
resize_array(p2d, new_dev_id_bytes)
for p2d in self._replica2part2dev]
@property
def dev_id_type_code(self):
return BYTES_TO_TYPE_CODE[self.dev_id_bytes]
@property
def max_dev_id(self):
return none_dev_id(self.dev_id_bytes) - 1
@property
def none_dev_id(self):
return none_dev_id(self.dev_id_bytes)
@property
def ever_rebalanced(self):
return self._replica2part2dev is not None
@@ -295,6 +320,7 @@ class RingBuilder(object):
'parts': self.parts,
'devs': self.devs,
'devs_changed': self.devs_changed,
'dev_id_bytes': self.dev_id_bytes,
'version': self.version,
'overload': self.overload,
'_replica2part2dev': self._replica2part2dev,
@@ -369,8 +395,8 @@ class RingBuilder(object):
version=self.version)
else:
self._ring = \
RingData([array('H', p2d) for p2d in
self._replica2part2dev],
RingData([array(self.dev_id_type_code, p2d)
for p2d in self._replica2part2dev],
devs, self.part_shift,
self.next_part_power,
self.version)
@@ -417,6 +443,9 @@ class RingBuilder(object):
if dev['id'] < len(self.devs) and self.devs[dev['id']] is not None:
raise exceptions.DuplicateDeviceError(
'Duplicate device id: %d' % dev['id'])
if dev['id'] > self.max_dev_id:
self.set_dev_id_bytes(calc_dev_id_bytes(dev['id']))
# Add holes to self.devs to ensure self.devs[dev['id']] will be the dev
while dev['id'] >= len(self.devs):
self.devs.append(None)
@@ -559,10 +588,11 @@ class RingBuilder(object):
# gather parts from replica count adjustment
self._adjust_replica2part2dev_size(assign_parts)
# gather parts from failed devices
removed_devs = self._gather_parts_from_failed_devices(assign_parts)
self._gather_parts_from_failed_devices(assign_parts)
# gather parts for dispersion (N.B. this only picks up parts that
# *must* disperse according to the replica plan)
self._gather_parts_for_dispersion(assign_parts, replica_plan)
removed_devs = self._remove_failed_devices()
# we'll gather a few times, or until we archive the plan
for gather_count in range(MAX_BALANCE_GATHER_COUNT):
@@ -747,7 +777,8 @@ class RingBuilder(object):
))
break
dev_id = self._replica2part2dev[replica][part]
if dev_id >= dev_len or not self.devs[dev_id]:
if dev_id == self.none_dev_id or dev_id >= dev_len or \
self.devs[dev_id] is None:
raise exceptions.RingValidationError(
"Partition %d, replica %d was not allocated "
"to a device." %
@@ -987,24 +1018,45 @@ class RingBuilder(object):
# reassign these partitions. However, we mark them as moved so later
# choices will skip other replicas of the same partition if possible.
gathered_parts = 0
if self._remove_devs:
dev_ids = [d['id'] for d in self._remove_devs if d['parts']]
if dev_ids:
for part, replica in self._each_part_replica():
dev_id = self._replica2part2dev[replica][part]
if dev_id in dev_ids:
self._replica2part2dev[replica][part] = NONE_DEV
self._replica2part2dev[replica][part] = \
self.none_dev_id
self._set_part_moved(part)
assign_parts[part].append(replica)
gathered_parts += 1
self.logger.debug(
"Gathered %d/%d from dev %d [dev removed]",
part, replica, dev_id)
return gathered_parts
def _remove_failed_devices(self):
removed_devs = 0
while self._remove_devs:
remove_dev_id = self._remove_devs.pop()['id']
self.logger.debug("Removing dev %d", remove_dev_id)
self.devs[remove_dev_id] = None
removed_devs += 1
# Trim the dev list
while self.devs and self.devs[-1] is None:
self.devs.pop()
if self.dev_id_bytes > 2:
# Consider shrinking the device IDs themselves
new_dev_id_bytes = self.dev_id_bytes // 2
new_none_dev_id = none_dev_id(new_dev_id_bytes)
# Only shrink if the IDs all fit in the lower half of the next size
# down; this avoids excess churn when adding/removing devices near
# the limit of a particular dev_id_bytes
if len(self.devs) < new_none_dev_id // 2:
self.set_dev_id_bytes(new_dev_id_bytes)
return removed_devs
def _adjust_replica2part2dev_size(self, to_assign):
@@ -1052,7 +1104,7 @@ class RingBuilder(object):
# newly-added pieces assigned to devices.
for part in range(len(part2dev), desired_length):
to_assign[part].append(replica)
part2dev.append(NONE_DEV)
part2dev.append(self.none_dev_id)
new_parts += 1
elif len(part2dev) > desired_length:
# Too long: truncate this mapping.
@@ -1068,7 +1120,8 @@ class RingBuilder(object):
to_assign[part].append(replica)
new_parts += 1
self._replica2part2dev.append(
array('H', itertools.repeat(NONE_DEV, desired_length)))
array(self.dev_id_type_code,
itertools.repeat(self.none_dev_id, desired_length)))
self.logger.debug(
"%d new parts and %d removed parts from replica-count change",
@@ -1095,7 +1148,7 @@ class RingBuilder(object):
undispersed_dev_replicas = []
for replica in self._replicas_for_part(part):
dev_id = self._replica2part2dev[replica][part]
if dev_id == NONE_DEV:
if dev_id == self.none_dev_id:
continue
dev = self.devs[dev_id]
if all(replicas_at_tier[tier] <=
@@ -1123,7 +1176,7 @@ class RingBuilder(object):
self.logger.debug(
"Gathered %d/%d from dev %s [dispersion]",
part, replica, pretty_dev(dev))
self._replica2part2dev[replica][part] = NONE_DEV
self._replica2part2dev[replica][part] = self.none_dev_id
for tier in dev['tiers']:
replicas_at_tier[tier] -= 1
self._set_part_moved(part)
@@ -1158,7 +1211,7 @@ class RingBuilder(object):
replicas_at_tier = defaultdict(int)
for replica in self._replicas_for_part(part):
dev_id = self._replica2part2dev[replica][part]
if dev_id == NONE_DEV:
if dev_id == self.none_dev_id:
continue
dev = self.devs[dev_id]
for tier in dev['tiers']:
@@ -1195,7 +1248,7 @@ class RingBuilder(object):
self.logger.debug(
"Gathered %d/%d from dev %s [weight disperse]",
part, replica, pretty_dev(dev))
self._replica2part2dev[replica][part] = NONE_DEV
self._replica2part2dev[replica][part] = self.none_dev_id
for tier in dev['tiers']:
replicas_at_tier[tier] -= 1
parts_wanted_in_tier[tier] -= 1
@@ -1249,7 +1302,7 @@ class RingBuilder(object):
overweight_dev_replica = []
for replica in self._replicas_for_part(part):
dev_id = self._replica2part2dev[replica][part]
if dev_id == NONE_DEV:
if dev_id == self.none_dev_id:
continue
dev = self.devs[dev_id]
if dev['parts_wanted'] < 0:
@@ -1271,7 +1324,7 @@ class RingBuilder(object):
self.logger.debug(
"Gathered %d/%d from dev %s [weight forced]",
part, replica, pretty_dev(dev))
self._replica2part2dev[replica][part] = NONE_DEV
self._replica2part2dev[replica][part] = self.none_dev_id
self._set_part_moved(part)
def _reassign_parts(self, reassign_parts, replica_plan):
@@ -1692,7 +1745,7 @@ class RingBuilder(object):
if part >= len(part2dev):
continue
dev_id = part2dev[part]
if dev_id == NONE_DEV:
if dev_id == self.none_dev_id:
continue
devs.append(self.devs[dev_id])
return devs
@@ -1863,7 +1916,7 @@ class RingBuilder(object):
new_replica2part2dev = []
for replica in self._replica2part2dev:
new_replica = array('H')
new_replica = array(self.dev_id_type_code)
for device in replica:
new_replica.append(device)
new_replica.append(device) # append device a second time

View File

@@ -98,6 +98,8 @@ from random import shuffle
from swift.common.exceptions import RingBuilderError
from swift.common.ring import RingBuilder
from swift.common.ring import RingData
from swift.common.ring.utils import calc_dev_id_bytes
from swift.common.ring.utils import resize_array
from collections import defaultdict
from itertools import combinations
@@ -198,6 +200,9 @@ def _make_composite_ring(builders):
:return: a new RingData instance built from the component builders
:raises ValueError: if the builders are invalid with respect to each other
"""
total_devices = sum(len(builder.devs) for builder in builders)
dev_id_bytes = calc_dev_id_bytes(total_devices)
composite_r2p2d = []
composite_devs = []
device_offset = 0
@@ -205,7 +210,9 @@ def _make_composite_ring(builders):
# copy all devs list and replica2part2dev table to be able
# to modify the id for each dev
devs = copy.deepcopy(builder.devs)
r2p2d = copy.deepcopy(builder._replica2part2dev)
# Note that resize_array() always makes a copy
r2p2d = [resize_array(p2d, dev_id_bytes)
for p2d in builder._replica2part2dev]
for part2dev in r2p2d:
for part, dev in enumerate(part2dev):
part2dev[part] += device_offset

657
swift/common/ring/io.py Normal file
View File

@@ -0,0 +1,657 @@
# Copyright (c) 2022 NVIDIA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import contextlib
import dataclasses
import gzip
import hashlib
import json
import logging
import os
import string
import struct
import tempfile
from typing import Optional
import zlib
from swift.common.ring.utils import BYTES_TO_TYPE_CODE, network_order_array, \
read_network_order_array
ZLIB_FLUSH_MARKER = b"\x00\x00\xff\xff"
# we could pull from io.DEFAULT_BUFFER_SIZE, but... 8k seems small
DEFAULT_BUFFER_SIZE = 2 ** 16
# v2 rings have sizes written with each section, as well as offsets at the end
# We *hope* we never need to go past 2**32-1 for those, but just in case...
V2_SIZE_FORMAT = "!Q"
class GzipReader(object):
chunk_size = DEFAULT_BUFFER_SIZE
def __init__(self, fileobj):
self.fp = fileobj
self.reset_decompressor()
@property
def name(self):
return self.fp.name
def close(self):
self.fp.close()
def read_sizes(self):
"""
Read the uncompressed and compressed sizes of the whole file.
Gzip writes the uncompressed length (mod 2**32) right at the end.
Then we just need to ``tell()`` to get the compressed length.
"""
self.fp.seek(-4, os.SEEK_END)
uncompressed_size, = struct.unpack("<L", self.fp.read(4))
# between the seek(-4, SEEK_END) and the read(4), we're at the end
compressed_size = self.fp.tell()
return uncompressed_size, compressed_size
def reset_decompressor(self):
self.pos = self.fp.tell()
if self.pos == 0:
# Expect gzip header
wbits = 16 + zlib.MAX_WBITS
else:
# Bare deflate stream
wbits = -zlib.MAX_WBITS
self.decompressor = zlib.decompressobj(wbits)
self.buffer = self.compressed_buffer = b""
def seek(self, pos, whence=os.SEEK_SET):
"""
Seek to the given point in the compressed stream.
Buffers are dropped and a new decompressor is created (unless using
``os.SEEK_SET`` and the reader is already at the desired position).
As a result, callers should be careful to ``seek()`` to flush
boundaries, to ensure that subsequent ``read()`` calls work properly.
Note that when using ``GzipWriter``, all ``tell()`` results will be
flush boundaries and appropriate to later use as ``seek()`` arguments.
"""
if (pos, whence) == (self.pos, os.SEEK_SET):
# small optimization for linear reads
return
self.fp.seek(pos, whence)
self.reset_decompressor()
def tell(self):
return self.fp.tell()
@classmethod
@contextlib.contextmanager
def open(cls, filename):
"""
Open the ring file ``filename``
:returns: a context manager that provides an instance of this class
"""
with open(filename, 'rb') as fp:
yield cls(fp)
def _decompress_from_buffer(self, offset):
if offset < 0:
raise ValueError('buffer offset must be non-negative')
chunk = self.compressed_buffer[:offset]
self.compressed_buffer = self.compressed_buffer[offset:]
self.pos += len(chunk)
self.buffer += self.decompressor.decompress(chunk)
def _buffer_chunk(self):
"""
Buffer some data.
The underlying file-like may or may not be read, though ``pos`` should
always advance (unless we're already at EOF).
Callers (i.e., ``read`` and ``readline``) should call this in a loop
and monitor the size of ``buffer`` and whether we've hit EOF.
:returns: True if we hit the end of the file, False otherwise
"""
# stop at flushes, so we can save buffers on seek during a linear read
x = self.compressed_buffer.find(ZLIB_FLUSH_MARKER)
if x >= 0:
self._decompress_from_buffer(x + len(ZLIB_FLUSH_MARKER))
return False
chunk = self.fp.read(self.chunk_size)
if not chunk:
self._decompress_from_buffer(len(self.compressed_buffer))
return True
self.compressed_buffer += chunk
# if we found a flush marker in the new chunk, only go that far
x = self.compressed_buffer.find(ZLIB_FLUSH_MARKER)
if x >= 0:
self._decompress_from_buffer(x + len(ZLIB_FLUSH_MARKER))
return False
# we may have *almost* found the flush marker;
# gotta keep some of the tail
keep = len(ZLIB_FLUSH_MARKER) - 1
# note that there's no guarantee that buffer will actually grow --
# but we don't want to have more in compressed_buffer than strictly
# necessary
self._decompress_from_buffer(len(self.compressed_buffer) - keep)
return False
def read(self, amount=-1):
"""
Read ``amount`` uncompressed bytes.
:raises IOError: if you try to read everything
:raises zlib.error: if ``seek()`` was last called with a position
not at a flush boundary
"""
if amount < 0:
raise IOError("don't be greedy")
while amount > len(self.buffer):
if self._buffer_chunk():
break
data, self.buffer = self.buffer[:amount], self.buffer[amount:]
return data
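The tail-keeping step in ``_buffer_chunk`` can be looked at in isolation. The following is a minimal sketch, not the reader's actual code: ``feed`` is a hypothetical helper, and ``MARKER`` assumes the flush marker is the four bytes zlib emits at the end of a sync or full flush.

```python
# Hypothetical stand-in for ZLIB_FLUSH_MARKER (assumed value).
MARKER = b"\x00\x00\xff\xff"


def feed(pending, chunk, marker=MARKER):
    """
    Add ``chunk`` to ``pending`` and decide how much is safe to pass on.

    :returns: a (ready, pending) pair of bytes
    """
    pending += chunk
    idx = pending.find(marker)
    if idx >= 0:
        # Stop right after the marker, i.e. at a flush boundary.
        end = idx + len(marker)
        return pending[:end], pending[end:]
    # No marker yet, but its first bytes may be sitting at the tail;
    # hold back enough that a straddling marker is still found next time.
    keep = len(marker) - 1
    cut = max(len(pending) - keep, 0)
    return pending[:cut], pending[cut:]


# The marker is split across two chunks here.
ready1, held = feed(b"", b"abc\x00\x00")
ready2, rest = feed(held, b"\xff\xffxyz")
```

In this sketch the cut point is clamped at zero, so inputs shorter than the marker are simply held back until more data arrives.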
class SectionReader(object):
"""
A file-like wrapper that limits how many bytes may be read.
Optionally, also verify data integrity.
:param fp: a file-like object opened with mode "rb"
:param length: the maximum number of bytes that should be read
:param digest: optional hex digest of the expected bytes
:param checksum: checksumming instance to be fed bytes and later compared
against ``digest``; e.g. ``hashlib.sha256()``
"""
def __init__(self, fp, length, digest=None, checksum=None):
self._fp = fp
self._remaining = length
self._digest = digest
self._checksum = checksum
def read(self, amt=None):
"""
Read ``amt`` bytes, defaulting to "all remaining available bytes".
"""
if amt is None or amt < 0:
amt = self._remaining
amt = min(amt, self._remaining)
data = self._fp.read(amt)
self._remaining -= len(data)
if self._checksum:
self._checksum.update(data)
return data
def read_ring_table(self, itemsize, partition_count):
max_row_len = itemsize * partition_count
type_code = BYTES_TO_TYPE_CODE[itemsize]
return [
read_network_order_array(type_code, row)
for row in iter(lambda: self.read(max_row_len), b'')
]
def close(self):
"""
Verify that all bytes were read.
If a digest was provided, also verify that the bytes read match
the digest. Does *not* close the underlying file-like.
:raises ValueError: if verification fails
"""
if self._remaining:
raise ValueError('Incomplete read; expected %d more bytes '
'to be read' % self._remaining)
if self._digest and self._checksum.hexdigest() != self._digest:
raise ValueError('Hash mismatch in block: %r found; %r expected' %
(self._checksum.hexdigest(), self._digest))
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
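``read_ring_table`` leans on the two-argument form of ``iter()`` to pull rows until the section is exhausted. A rough, self-contained illustration follows; ``read_rows`` is a hypothetical helper, and byte-order handling is left out on purpose, so the data is written and read in native order.

```python
import array
import io


def read_rows(fp, type_code, partition_count):
    """Read fixed-width rows until the file-like is exhausted."""
    itemsize = array.array(type_code).itemsize
    max_row_len = itemsize * partition_count
    # iter(callable, sentinel) keeps calling fp.read() until it returns b''.
    return [
        array.array(type_code, row)
        for row in iter(lambda: fp.read(max_row_len), b"")
    ]


# 2.5 replicas of a 4-partition ring: two full rows and one half row.
data = array.array("H", range(10)).tobytes()
rows = read_rows(io.BytesIO(data), "H", 4)
```

Because the last read may return fewer bytes than a full row, a fractional replica count falls out naturally as a shorter final array.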
@dataclasses.dataclass(frozen=True)
class IndexEntry:
compressed_start: int
uncompressed_start: int
compressed_end: Optional[int] = None
uncompressed_end: Optional[int] = None
checksum_method: Optional[str] = None
checksum_value: Optional[str] = None
@property
def uncompressed_length(self) -> Optional[int]:
if self.uncompressed_end is None:
return None
return self.uncompressed_end - self.uncompressed_start
@property
def compressed_length(self) -> Optional[int]:
if self.compressed_end is None:
return None
return self.compressed_end - self.compressed_start
@property
def compression_ratio(self) -> Optional[float]:
if self.uncompressed_end is None:
return None
return 1 - self.compressed_length / self.uncompressed_length
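Index entries travel through JSON as plain tuples and are rebuilt positionally on load. A small sketch of that round trip, using a hypothetical trimmed-down ``Entry`` class and made-up section names and offsets:

```python
import dataclasses
import json
from typing import Optional


@dataclasses.dataclass(frozen=True)
class Entry:
    """Hypothetical, trimmed-down stand-in for IndexEntry."""
    compressed_start: int
    uncompressed_start: int
    compressed_end: Optional[int] = None
    uncompressed_end: Optional[int] = None


index = {
    "example/second": Entry(40, 100, 60, 180),
    "example/first": Entry(10, 6, 40, 100),
}
# Dataclasses are not JSON-serializable, so store plain tuples.
encoded = json.dumps({k: dataclasses.astuple(v) for k, v in index.items()})
# JSON turns tuples into lists; unpack each one back into an Entry and
# order the result by where each section starts on disk.
decoded = dict(sorted(
    ((name, Entry(*fields)) for name, fields in json.loads(encoded).items()),
    key=lambda item: item[1].compressed_start))
```

Positional unpacking means the field order of the dataclass is effectively part of the on-disk format; appending new optional fields at the end is the compatible way to grow it.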
class RingReader(GzipReader):
"""
Helper for reading ring files.
Provides format-version detection, and loads the index for v2 rings.
"""
chunk_size = DEFAULT_BUFFER_SIZE
def __init__(self, fileobj):
super(RingReader, self).__init__(fileobj)
self.index = {}
magic = self.read(4)
if magic != b"R1NG":
raise ValueError(f"Bad ring magic: {magic!r}")
self.version, = struct.unpack("!H", self.read(2))
if self.version not in (1, 2):
msg = f"Unsupported ring version: {self.version}"
if hasattr(fileobj, "name"):
msg += f" for {fileobj.name!r}"
raise ValueError(msg)
# NB: In a lot of places, "raw" implies "file on disk", i.e., the
# compressed stream -- but here it's actually the uncompressed stream.
self.raw_size, self.size = self.read_sizes()
self.load_index()
self.seek(0)
def load_index(self):
"""
If this is a v2 ring, load the index stored at the end.
This will be done as part of initialization; users shouldn't need to
do this themselves.
"""
if self.version != 2:
return
# See notes in RingWriter.write_index and GzipWriter.close for
# where this 31 (= 18 + 13) came from.
self.seek(-31, os.SEEK_END)
try:
index_start, = struct.unpack(V2_SIZE_FORMAT, self.read(8))
except zlib.error:
# TODO: we can still fix this if we're willing to read everything
raise IOError("Could not read index offset "
"(was the file recompressed?)")
self.seek(index_start)
# ensure index entries are sorted by position
self.index = collections.OrderedDict(sorted(
((section, IndexEntry(*entry))
for section, entry in json.loads(self.read_blob()).items()),
key=lambda x: x[1].compressed_start))
def __contains__(self, section):
if self.version != 2:
return False
return section in self.index
def read_blob(self, fmt=V2_SIZE_FORMAT):
"""
Read a length-value encoded BLOB
Note that the RingReader needs to already be positioned correctly.
:param fmt: the format code used to write the length of the BLOB.
All v2 BLOBs use ``!Q``, but v1 may require ``!I``
:returns: the BLOB value
"""
prefix = self.read(struct.calcsize(fmt))
blob_length, = struct.unpack(fmt, prefix)
return self.read(blob_length)
def read_section(self, section):
"""
Seek to a section and read all its data
"""
with self.open_section(section) as reader:
return reader.read()
@contextlib.contextmanager
def open_section(self, section):
"""
Open up a section without buffering the whole thing in memory
:raises ValueError: if there is no index
:raises KeyError: if ``section`` is not in the index
:raises IOError: if there is a conflict between the section size in
the index and the length at the start of the blob
:returns: a ``SectionReader`` wrapping the section
"""
if not self.index:
raise ValueError("No index loaded")
entry = self.index[section]
self.seek(entry.compressed_start)
size_len = struct.calcsize(V2_SIZE_FORMAT)
prefix = self.read(size_len)
blob_length, = struct.unpack(V2_SIZE_FORMAT, prefix)
if entry.compressed_end is not None and \
size_len + blob_length != entry.uncompressed_length:
raise IOError("Inconsistent section size")
if entry.checksum_method in ('md5', 'sha1', 'sha256', 'sha512'):
checksum = getattr(hashlib, entry.checksum_method)(prefix)
checksum_value = entry.checksum_value
else:
if entry.checksum_method is not None:
logging.getLogger('swift.ring').warning(
"Ignoring unsupported checksum %s:%s for section %s",
entry.checksum_method, entry.checksum_value, section)
checksum = checksum_value = None
with SectionReader(
self,
blob_length,
digest=checksum_value,
checksum=checksum,
) as reader:
yield reader
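The length-value BLOB encoding used by ``read_blob`` is simple enough to sketch against an in-memory buffer. The helpers below are illustrative free functions rather than the ``RingReader``/``RingWriter`` methods, and ``SIZE_FORMAT`` assumes the ``!Q`` prefix that the docstrings describe for v2.

```python
import io
import json
import struct

SIZE_FORMAT = "!Q"  # assumed v2 prefix: 8-byte unsigned, network order


def write_blob(fp, data, fmt=SIZE_FORMAT):
    """Write the length of ``data`` followed by ``data`` itself."""
    fp.write(struct.pack(fmt, len(data)))
    fp.write(data)


def read_blob(fp, fmt=SIZE_FORMAT):
    """Read a length prefix, then exactly that many bytes."""
    prefix = fp.read(struct.calcsize(fmt))
    length, = struct.unpack(fmt, prefix)
    return fp.read(length)


buf = io.BytesIO()
metadata = {"part_shift": 24, "dev_id_bytes": 2}
write_blob(buf, json.dumps(metadata, sort_keys=True).encode("ascii"))
write_blob(buf, b"xyz", "!I")  # v1-style 4-byte prefix
buf.seek(0)
first = json.loads(read_blob(buf))
second = read_blob(buf, "!I")
```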
class GzipWriter(object):
def __init__(self, fileobj, filename='', mtime=1300507380.0):
self.raw_fp = fileobj
self.gzip_fp = gzip.GzipFile(
filename,
mode='wb',
fileobj=self.raw_fp,
mtime=mtime)
self.flushed = True
self.pos = 0
@classmethod
@contextlib.contextmanager
def open(cls, filename, *a, **kw):
"""
Open a compressed writer for ``filename``
Note that this also guarantees atomic writes using a temporary file
:returns: a context manager that provides a ``GzipWriter`` instance
"""
fp = tempfile.NamedTemporaryFile(
dir=os.path.dirname(filename),
prefix=os.path.basename(filename),
delete=False)
try:
with cls(fp, filename, *a, **kw) as writer:
yield writer
except BaseException:
fp.close()
os.unlink(fp.name)
raise
else:
fp.flush()
os.fsync(fp.fileno())
fp.close()
os.chmod(fp.name, 0o644)
os.rename(fp.name, filename)
def __enter__(self):
return self
def __exit__(self, e, v, t):
if e is None:
# only finalize if there was no error
self.close()
def close(self):
# This does three things:
# * Flush the underlying compressobj (with Z_FINISH) and write
# the result
# * Write the (4-byte) CRC
# * Write the (4-byte) uncompressed length
# NB: if we wrote an index, the flush writes exactly 5 bytes,
# for 13 bytes total
self.gzip_fp.close()
def write(self, data):
if not data:
return 0
self.flushed = False
self.pos += len(data)
return self.gzip_fp.write(data)
def flush(self):
"""
Ensure the gzip stream has been flushed using Z_FULL_FLUSH.
By default, the gzip module uses Z_SYNC_FLUSH; this ensures that all
data is compressed and written to the stream, but retains some state
in the compressor. A full flush, by contrast, ensures no state may
carry over, allowing a reader to seek to the end of the flush and
start reading with a fresh decompressor.
"""
if not self.flushed:
# always use full flushes; this allows us to just start reading
# at the start of any section
self.gzip_fp.flush(zlib.Z_FULL_FLUSH)
self.flushed = True
def tell(self):
"""
Return the position in the underlying (compressed) stream.
Since this is primarily useful to get a position you may seek to later
and start reading, flush the writer first.
If you want the position within the *uncompressed* stream, use the
``pos`` attribute.
"""
self.flush()
return self.raw_fp.tell()
def _set_compression_level(self, lvl):
# two valid deflate streams may be concatenated to produce another
# valid deflate stream, so finish the one stream...
self.flush()
# ... so we can start up another with whatever level we want
self.gzip_fp.compress = zlib.compressobj(
lvl, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
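Why full flushes make seeking possible can be shown with ``zlib`` alone. This is a sketch under simplifying assumptions: it works on a raw deflate stream held in memory, with no gzip header or trailer, and the two helper functions are hypothetical.

```python
import zlib


def compress_with_flush_points(chunks):
    """Return (raw deflate bytes, offset at which each chunk starts)."""
    comp = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    out = b""
    offsets = []
    for chunk in chunks:
        offsets.append(len(out))
        out += comp.compress(chunk)
        # A full flush byte-aligns the output and resets compressor state.
        out += comp.flush(zlib.Z_FULL_FLUSH)
    out += comp.flush(zlib.Z_FINISH)
    return out, offsets


def read_from(stream, offset, amount):
    """Start a fresh decompressor at ``offset`` and read ``amount`` bytes."""
    decomp = zlib.decompressobj(-zlib.MAX_WBITS)
    return decomp.decompress(stream[offset:])[:amount]


stream, offsets = compress_with_flush_points(
    [b"alpha" * 100, b"beta" * 100, b"gamma" * 100])
second = read_from(stream, offsets[1], 8)
```

Starting a fresh decompressor anywhere other than one of the recorded offsets would generally fail or produce garbage, which is why ``tell()`` flushes before reporting a position.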
class RingWriter(GzipWriter):
"""
Helper for writing ring files to later be read by a ``RingReader``
This has a few key features on top of a standard ``GzipFile``:
* Helpers for writing length-value encoded BLOBs
* The ability to define named sections which will be written as
an index at the end of the file
* Flushes always use Z_FULL_FLUSH to support seeking.
Note that the index will only be written if named sections were defined.
"""
checksum_method = 'sha256'
def __init__(self, *a, **kw):
super(RingWriter, self).__init__(*a, **kw)
# index entries look like
# section: [
# compressed start,
# uncompressed start,
# compressed end,
# uncompressed end,
# checksum_method,
# checksum_value
# ]
self.index = {}
self.current_section = None
self.checksum = None
@contextlib.contextmanager
def section(self, name):
"""
Define a named section.
Return a context manager; the section contains whatever data is written
within that context.
The index will be updated to include the section and its starting
positions upon entering the context; upon exiting normally, the index
will be updated again with the ending positions and checksum
information.
"""
if self.current_section:
raise ValueError('Cannot create new section; currently writing %r'
% self.current_section)
allowed = string.ascii_letters + string.digits + '/-'
if any(c not in allowed for c in name):
raise ValueError('Section has invalid name: %s' % name)
if name in self.index:
raise ValueError('Cannot write duplicate section: %s' % name)
self.flush()
self.current_section = name
self.index[name] = IndexEntry(self.tell(), self.pos)
checksum_class = getattr(hashlib, self.checksum_method)
self.checksum = checksum_class()
try:
yield self
self.flush()
self.index[name] = dataclasses.replace(
self.index[name],
compressed_end=self.tell(),
uncompressed_end=self.pos,
checksum_method=self.checksum_method,
checksum_value=self.checksum.hexdigest(),
)
finally:
self.flush()
self.checksum = None
self.current_section = None
def write(self, data):
if self.checksum:
self.checksum.update(data)
return super().write(data)
def close(self):
if self.index:
# only write index if we made use of any sections
self.write_index()
super().close()
def write_magic(self, version):
"""
Write our file magic for identifying Swift rings.
:param version: the ring version; should be 1 or 2
"""
if self.pos != 0:
raise IOError("Magic must be written at the start of the file")
# switch to uncompressed, so libmagic can know what to expect
self._set_compression_level(0)
self.write(struct.pack("!4sH", b"R1NG", version))
self._set_compression_level(9)
def write_size(self, size, fmt=V2_SIZE_FORMAT):
"""
Write a size (often a BLOB-length, but sometimes a file offset).
:param size: the size to write
:param fmt: the struct format to use when writing the length.
All v2 BLOBs should use ``!Q``.
"""
self.write(struct.pack(fmt, size))
def write_blob(self, data, fmt=V2_SIZE_FORMAT):
"""
Write a length-value encoded BLOB.
:param data: the bytes to write
:param fmt: the struct format to use when writing the length.
All v2 BLOBs should use ``!Q``.
"""
self.write_size(len(data), fmt)
self.write(data)
def write_json(self, data, fmt=V2_SIZE_FORMAT):
"""
Write a length-value encoded JSON BLOB.
:param data: the JSON-serializable data to write
:param fmt: the struct format to use when writing the length.
All v2 BLOBs should use ``!Q``.
"""
json_data = json.dumps(data, sort_keys=True, ensure_ascii=True)
self.write_blob(json_data.encode('ascii'), fmt)
def write_ring_table(self, table):
"""
Write a length-value encoded replica2part2dev table, or similar.
Should *not* be used for v1 rings, as there's always a ``!Q`` size
prefix, and values are written in network order.
:param table: list of arrays
"""
dev_id_bytes = table[0].itemsize if table else 0
assignments = sum(len(a) for a in table)
self.write_size(assignments * dev_id_bytes)
for row in table:
with network_order_array(row):
row.tofile(self)
def write_index(self):
"""
Write the index and its starting position at the end of the file.
Callers should not need to use this themselves; it will be done
automatically when using the writer as a context manager.
"""
with self.section('swift/index'):
self.write_json({
k: dataclasses.astuple(v)
for k, v in self.index.items()
})
# switch to uncompressed
self._set_compression_level(0)
# ... which allows us to know that each of these write_size/flush pairs
# will write exactly 18 bytes to disk
self.write_size(self.index['swift/index'].uncompressed_start)
self.flush()
# This is the one we really care about in Swift code, but sometimes
# ops write their own tools and sometimes those just buffer all the
# decoded content
self.write_size(self.index['swift/index'].compressed_start)
self.flush()
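The fixed byte counts that ``load_index`` relies on can be checked with a short experiment. The sketch below assumes zlib emits stored blocks at level 0 in the way the comments describe; the eight zero bytes are a placeholder for gzip's CRC and length fields, and the offsets are made up.

```python
import struct
import zlib


def size_flush_pair(compressor, value):
    """Write one 8-byte size, then force a full flush."""
    return (compressor.compress(struct.pack("!Q", value)) +
            compressor.flush(zlib.Z_FULL_FLUSH))


# Level 0 means "stored" (uncompressed) deflate blocks.
comp = zlib.compressobj(0, zlib.DEFLATED, -zlib.MAX_WBITS)
pair_a = size_flush_pair(comp, 1000)  # stands in for the uncompressed offset
pair_b = size_flush_pair(comp, 250)   # stands in for the compressed offset
finish = comp.flush(zlib.Z_FINISH)
tail = pair_a + pair_b + finish + b"\x00" * 8  # placeholder CRC + length

# Seek 31 bytes back from the end and read 8 uncompressed bytes.
decomp = zlib.decompressobj(-zlib.MAX_WBITS)
offset, = struct.unpack("!Q", decomp.decompress(tail[-31:])[:8])
```

Each pair is a 5-byte stored-block header plus 8 data bytes plus a 5-byte empty block, which is where the 18 comes from; a reader that recompressed the file would lose this layout.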

@@ -14,26 +14,37 @@
# limitations under the License.
import array
import contextlib
import json
from collections import defaultdict
from gzip import GzipFile
from os.path import getmtime
import struct
from time import time
import os
from itertools import chain, count
from tempfile import NamedTemporaryFile
import sys
import zlib
from swift.common.exceptions import RingLoadError
from swift.common.exceptions import RingLoadError, DevIdBytesTooSmall
from swift.common.utils import hash_path, validate_configuration, md5
from swift.common.ring.utils import tiers_for_dev
from swift.common.ring.io import RingReader, RingWriter
from swift.common.ring.utils import tiers_for_dev, BYTES_TO_TYPE_CODE
DEFAULT_RELOAD_TIME = 15
RING_CODECS = {
1: {
"serialize": lambda ring_data, writer: ring_data.serialize_v1(writer),
"deserialize": lambda cls, reader, metadata_only, _include_devices:
cls.deserialize_v1(reader, metadata_only=metadata_only),
},
2: {
"serialize": lambda ring_data, writer: ring_data.serialize_v2(writer),
"deserialize": lambda cls, reader, metadata_only, include_devices:
cls.deserialize_v2(reader, metadata_only=metadata_only,
include_devices=include_devices),
},
}
DEFAULT_RING_FORMAT_VERSION = 1
def calc_replica_count(replica2part2dev_id):
@@ -59,57 +70,6 @@ def normalize_devices(devs):
dev.setdefault('replication_port', dev['port'])
class RingReader(object):
chunk_size = 2 ** 16
def __init__(self, filename):
self.fp = open(filename, 'rb')
self._reset()
def _reset(self):
self._buffer = b''
self.size = 0
self.raw_size = 0
self._md5 = md5(usedforsecurity=False)
self._decomp = zlib.decompressobj(32 + zlib.MAX_WBITS)
@property
def close(self):
return self.fp.close
def seek(self, pos, ref=0):
if (pos, ref) != (0, 0):
raise NotImplementedError
self._reset()
return self.fp.seek(pos, ref)
def _buffer_chunk(self):
chunk = self.fp.read(self.chunk_size)
if not chunk:
return False
self.size += len(chunk)
self._md5.update(chunk)
chunk = self._decomp.decompress(chunk)
self.raw_size += len(chunk)
self._buffer += chunk
return True
def read(self, amount=-1):
if amount < 0:
raise IOError("don't be greedy")
while amount > len(self._buffer):
if not self._buffer_chunk():
break
result, self._buffer = self._buffer[:amount], self._buffer[amount:]
return result
@property
def md5(self):
return self._md5.hexdigest()
class RingData(object):
"""Partitioned consistent hashing ring data (used for serialization)."""
@@ -124,15 +84,37 @@ class RingData(object):
self._part_shift = part_shift
self.next_part_power = next_part_power
self.version = version
self.md5 = self.size = self.raw_size = None
self.format_version = None
self.size = self.raw_size = None
# Next two are used when replica2part2dev is empty
self._dev_id_bytes = 2
self._replica_count = 0
self._num_devs = sum(1 if dev is not None else 0 for dev in self.devs)
@property
def replica_count(self):
"""Number of replicas (full or partial) used in the ring."""
return calc_replica_count(self._replica2part2dev_id)
if self._replica2part2dev_id:
return calc_replica_count(self._replica2part2dev_id)
else:
return self._replica_count
@property
def part_power(self):
return 32 - self._part_shift
@property
def dev_id_bytes(self):
if self._replica2part2dev_id:
# There's an assumption that these will all have the same itemsize,
# but just in case...
return max(part2dev_id.itemsize
for part2dev_id in self._replica2part2dev_id)
else:
return self._dev_id_bytes
@classmethod
def deserialize_v1(cls, gz_file, metadata_only=False):
def deserialize_v1(cls, reader, metadata_only=False):
"""
Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`,
and `replica2part2dev_id` keys.
@@ -141,25 +123,32 @@ class RingData(object):
`replica2part2dev_id` is not loaded and that key in the returned
dictionary just has the value `[]`.
:param file gz_file: An opened file-like object which has already
consumed the 6 bytes of magic and version.
:param RingReader reader: An opened RingReader which has already
loaded the index at the end, gone back to the
front, and consumed the 6 bytes of magic and
version.
:param bool metadata_only: If True, only load `devs` and `part_shift`
:returns: A dict containing `devs`, `part_shift`, and
`replica2part2dev_id`
"""
if reader.tell() == 0:
magic = reader.read(6)
if magic != b'R1NG\x00\x01':
raise ValueError('unexpected magic: %r' % magic)
json_len, = struct.unpack('!I', gz_file.read(4))
ring_dict = json.loads(gz_file.read(json_len))
ring_dict = json.loads(reader.read_blob('!I'))
ring_dict['replica2part2dev_id'] = []
ring_dict['dev_id_bytes'] = 2
if metadata_only:
return ring_dict
byteswap = (ring_dict.get('byteorder', sys.byteorder) != sys.byteorder)
type_code = BYTES_TO_TYPE_CODE[ring_dict['dev_id_bytes']]
partition_count = 1 << (32 - ring_dict['part_shift'])
for x in range(ring_dict['replica_count']):
part2dev = array.array('H', gz_file.read(2 * partition_count))
part2dev = array.array(type_code, reader.read(2 * partition_count))
if byteswap:
part2dev.byteswap()
ring_dict['replica2part2dev_id'].append(part2dev)
@@ -167,7 +156,50 @@ class RingData(object):
return ring_dict
@classmethod
def load(cls, filename, metadata_only=False):
def deserialize_v2(cls, reader, metadata_only=False, include_devices=True):
"""
Deserialize a v2 ring file into a dictionary with ``devs``,
``part_shift``, and ``replica2part2dev_id`` keys.
If the optional kwarg ``metadata_only`` is True, then the
``replica2part2dev_id`` is not loaded and that key in the returned
dictionary just has the value ``[]``.
If the optional kwarg ``include_devices`` is False, then the ``devs``
list is not loaded and that key in the returned dictionary just has
the value ``[]``.
:param RingReader reader: An opened RingReader which has already
loaded the index at the end of the file.
:param bool metadata_only: If True, skip loading
``replica2part2dev_id``
:param bool include_devices: If False and ``metadata_only`` is True,
skip loading ``devs``
:returns: A dict containing ``devs``, ``part_shift``,
``dev_id_bytes``, and ``replica2part2dev_id``
"""
ring_dict = json.loads(reader.read_section('swift/ring/metadata'))
ring_dict['replica2part2dev_id'] = []
ring_dict['devs'] = []
if not metadata_only or include_devices:
ring_dict['devs'] = json.loads(
reader.read_section('swift/ring/devices'))
if metadata_only:
return ring_dict
partition_count = 1 << (32 - ring_dict['part_shift'])
with reader.open_section('swift/ring/assignments') as section:
ring_dict['replica2part2dev_id'] = section.read_ring_table(
ring_dict['dev_id_bytes'], partition_count)
return ring_dict
@classmethod
def load(cls, filename, metadata_only=False, include_devices=True):
"""
Load ring data from a file.
@@ -175,32 +207,37 @@ class RingData(object):
:param bool metadata_only: If True, only load `devs` and `part_shift`.
:returns: A RingData instance containing the loaded data.
"""
with contextlib.closing(RingReader(filename)) as gz_file:
# See if the file is in the new format
magic = gz_file.read(4)
if magic != b'R1NG':
raise Exception('Bad ring magic %r for %r' % (
magic, filename))
with RingReader.open(filename) as reader:
if reader.version not in RING_CODECS:
raise Exception('Unknown ring format version %d for %r' % (
reader.version, filename))
ring_data = RING_CODECS[reader.version]['deserialize'](
cls, reader, metadata_only, include_devices)
format_version, = struct.unpack('!H', gz_file.read(2))
if format_version == 1:
ring_data = cls.deserialize_v1(
gz_file, metadata_only=metadata_only)
else:
raise Exception('Unknown ring format version %d for %r' %
(format_version, filename))
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'],
ring_data.get('next_part_power'),
ring_data.get('version'))
for attr in ('md5', 'size', 'raw_size'):
setattr(ring_data, attr, getattr(gz_file, attr))
ring_data = cls.from_dict(ring_data)
ring_data.format_version = reader.version
for attr in ('size', 'raw_size'):
setattr(ring_data, attr, getattr(reader, attr))
return ring_data
def serialize_v1(self, file_obj):
@classmethod
def from_dict(cls, ring_data):
ring = cls(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'],
ring_data.get('next_part_power'),
ring_data.get('version'))
# For loading with metadata_only=True
if 'replica_count' in ring_data:
ring._replica_count = ring_data['replica_count']
# dev_id_bytes only written down in v2 and above
ring._dev_id_bytes = ring_data.get('dev_id_bytes', 2)
return ring
def serialize_v1(self, writer):
if self.dev_id_bytes != 2:
raise DevIdBytesTooSmall('Ring v1 only supports 2-byte dev IDs')
# Write out new-style serialization magic and version:
file_obj.write(struct.pack('!4sH', b'R1NG', 1))
writer.write_magic(version=1)
ring = self.to_dict()
# Only include next_part_power if it is set in the
@@ -216,40 +253,62 @@ class RingData(object):
if next_part_power is not None:
_text['next_part_power'] = next_part_power
json_text = json.dumps(_text, sort_keys=True,
ensure_ascii=True).encode('ascii')
json_len = len(json_text)
file_obj.write(struct.pack('!I', json_len))
file_obj.write(json_text)
for part2dev_id in ring['replica2part2dev_id']:
part2dev_id.tofile(file_obj)
writer.write_json(_text, '!I')
def save(self, filename, mtime=1300507380.0):
for part2dev_id in ring['replica2part2dev_id']:
part2dev_id.tofile(writer)
def serialize_v2(self, writer):
writer.write_magic(version=2)
ring = self.to_dict()
# Only include next_part_power if it is set in the
# builder, otherwise just ignore it
_text = {
'part_shift': ring['part_shift'],
'dev_id_bytes': ring['dev_id_bytes'],
'replica_count': calc_replica_count(ring['replica2part2dev_id']),
'version': ring['version']}
next_part_power = ring.get('next_part_power')
if next_part_power is not None:
_text['next_part_power'] = next_part_power
with writer.section('swift/ring/metadata'):
writer.write_json(_text)
with writer.section('swift/ring/devices'):
writer.write_json(ring['devs'])
with writer.section('swift/ring/assignments'):
writer.write_ring_table(ring['replica2part2dev_id'])
def save(self, filename, mtime=1300507380.0,
format_version=DEFAULT_RING_FORMAT_VERSION):
"""
Serialize this RingData instance to disk.
:param filename: File into which this instance should be serialized.
:param mtime: time used to override mtime for gzip; pass None if the
caller wants the current time included instead of the default
:param format_version: one of 1 or 2. The older format is retained
for the sake of clusters on older versions
"""
if format_version not in RING_CODECS:
raise ValueError("format_version must be one of %r" % (tuple(
RING_CODECS.keys()),))
# Override the timestamp so that the same ring data creates
# the same bytes on disk. This makes a checksum comparison a
# good way to see if two rings are identical.
tempf = NamedTemporaryFile(dir=".", prefix=filename, delete=False)
gz_file = GzipFile(filename, mode='wb', fileobj=tempf, mtime=mtime)
self.serialize_v1(gz_file)
gz_file.close()
tempf.flush()
os.fsync(tempf.fileno())
tempf.close()
os.chmod(tempf.name, 0o644)
os.rename(tempf.name, filename)
with RingWriter.open(filename, mtime) as writer:
RING_CODECS[format_version]['serialize'](self, writer)
def to_dict(self):
return {'devs': self.devs,
'replica2part2dev_id': self._replica2part2dev_id,
'part_shift': self._part_shift,
'next_part_power': self.next_part_power,
'dev_id_bytes': self.dev_id_bytes,
'version': self.version}
@@ -296,13 +355,13 @@ class Ring(object):
self._mtime = getmtime(self.serialized_path)
self._devs = ring_data.devs
self._dev_id_bytes = ring_data._dev_id_bytes
self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift
self._rebuild_tier_data()
self._update_bookkeeping()
self._next_part_power = ring_data.next_part_power
self._version = ring_data.version
self._md5 = ring_data.md5
self._size = ring_data.size
self._raw_size = ring_data.raw_size
@@ -340,6 +399,16 @@ class Ring(object):
self._num_zones = len(zones)
self._num_ips = len(ips)
@property
def dev_id_bytes(self):
if self._replica2part2dev_id:
# There's an assumption that these will all have the same itemsize,
# but just in case...
return max(part2dev_id.itemsize
for part2dev_id in self._replica2part2dev_id)
else:
return self._dev_id_bytes
@property
def next_part_power(self):
if time() > self._rtime:
@@ -354,10 +423,6 @@ class Ring(object):
def version(self):
return self._version
@property
def md5(self):
return self._md5
@property
def size(self):
return self._size

@@ -12,16 +12,84 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import array
from collections import defaultdict
import contextlib
import optparse
import re
import socket
import sys
from swift.common import exceptions
from swift.common.utils import expand_ipv6, is_valid_ip, is_valid_ipv4, \
is_valid_ipv6
BYTES_TO_TYPE_CODE = {
# We don't support 1-byte arrays, for backwards-compatibility reasons.
2: 'H',
# Note that on some platforms, array.array('I') will be limited to 2-byte
# values. At the same time, however, using 'L' would get us 8-byte values
# on many platforms we care about. Use 'I' for now; hold off on writing
# custom array (de)serialization methods until someone actually complains.
4: 'I',
# This just seems excessive; besides, array.array() only takes it on py33+
# 8: 'Q',
}
def none_dev_id(dev_id_bytes):
'''
We can't store None in the replica2part2dev array, so we hijack
the max value as a magic number representing that the part is not
currently assigned to any device.
'''
return 2 ** (8 * dev_id_bytes) - 1
def calc_dev_id_bytes(max_dev_id):
if max_dev_id < 0:
raise ValueError("Can't have negative device IDs")
for x in sorted(BYTES_TO_TYPE_CODE):
if max_dev_id < none_dev_id(x):
return x
else:
# > 4B devices??
raise exceptions.DevIdBytesTooSmall('Way too many devices!')
def resize_array(old_arr, new_dev_id_bytes):
"""
Copy an array to use a new itemsize, while preserving none_dev_id values
"""
old_none_dev = none_dev_id(old_arr.itemsize)
new_none_dev = none_dev_id(new_dev_id_bytes)
return array.array(
BYTES_TO_TYPE_CODE[new_dev_id_bytes],
(new_none_dev if dev_id == old_none_dev else dev_id
for dev_id in old_arr))
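A worked example of the sentinel arithmetic may help here. The sketch below re-declares small stand-ins for the helpers above so that it runs on its own; ``TYPE_CODES`` and ``widen`` are hypothetical names, and it assumes ``'I'`` arrays are 4 bytes wide, which holds on common platforms but is not guaranteed.

```python
import array

# Assumed type codes; 'I' is 4 bytes wide on common platforms.
TYPE_CODES = {2: "H", 4: "I"}


def none_dev_id(dev_id_bytes):
    """Largest value of the given width, reserved to mean 'unassigned'."""
    return 2 ** (8 * dev_id_bytes) - 1


def calc_dev_id_bytes(max_dev_id):
    """Smallest supported width whose sentinel is above max_dev_id."""
    for width in sorted(TYPE_CODES):
        if max_dev_id < none_dev_id(width):
            return width
    raise ValueError("too many devices")


def widen(old_arr, new_dev_id_bytes):
    """Copy to a new width, mapping the old sentinel to the new one."""
    old_none = none_dev_id(old_arr.itemsize)
    new_none = none_dev_id(new_dev_id_bytes)
    return array.array(
        TYPE_CODES[new_dev_id_bytes],
        (new_none if dev_id == old_none else dev_id for dev_id in old_arr))


narrow = array.array("H", [0, 65535, 7])
wide = widen(narrow, 4)
```

Since 65535 is taken by the sentinel, device ID 65534 is the last one that fits in 2 bytes; a ring containing device 65535 needs the 4-byte width.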
@contextlib.contextmanager
def network_order_array(arr):
if sys.byteorder == 'little':
# Switch to network-order for serialization
arr.byteswap()
try:
yield arr
finally:
if sys.byteorder == 'little':
# Didn't make a copy; switch it back
arr.byteswap()
def read_network_order_array(type_code, data):
arr = array.array(type_code, data)
if sys.byteorder == 'little':
arr.byteswap()
return arr
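The effect of the network-order helpers is easiest to see on a few concrete values. This sketch uses hypothetical copy-based functions instead of the in-place swap above, trading a little memory for simplicity.

```python
import array
import sys


def to_network_bytes(arr):
    """Serialize a copy of ``arr`` in big-endian (network) order."""
    copy = array.array(arr.typecode, arr)
    if sys.byteorder == "little":
        copy.byteswap()
    return copy.tobytes()


def from_network_bytes(type_code, data):
    """Rebuild a native-order array from big-endian bytes."""
    arr = array.array(type_code, data)
    if sys.byteorder == "little":
        arr.byteswap()
    return arr


original = array.array("H", [1, 258, 65535])
wire = to_network_bytes(original)
restored = from_network_bytes("H", wire)
```

The bytes on the wire are the same regardless of host byte order, so a ring written on one architecture can be read on another without a separate byte-order field.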
def tiers_for_dev(dev):
"""
Returns a tuple of tiers for a given device in ascending order by

@@ -1,5 +1,5 @@
__RINGFILE__, build version 4, id (not assigned)
64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion
64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion
The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining)
The overload factor is 0.00% (0.000000)
Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet

@@ -1,5 +1,5 @@
__RINGFILE__, build version 4, id __BUILDER_ID__
64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion
64 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion
The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining)
The overload factor is 0.00% (0.000000)
Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet

@@ -1,5 +1,5 @@
__RINGFILE__, build version 9, id __BUILDER_ID__
64 partitions, 3.000000 replicas, 2 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion
64 partitions, 3.000000 replicas, 2 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion
The minimum number of hours before a partition can be reassigned is 1 (1:00:00 remaining)
The overload factor is 0.00% (0.000000)
Ring file __RINGFILE__.ring.gz is obsolete

@@ -1,5 +1,5 @@
__RINGFILE__, build version 4, id __BUILDER_ID__
256 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 100.00 balance, 0.00 dispersion
256 partitions, 3.000000 replicas, 4 regions, 4 zones, 4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion
The minimum number of hours before a partition can be reassigned is 1 (0:00:00 remaining)
The overload factor is 0.00% (0.000000)
Ring file __RINGFILE__.ring.gz not found, probably it hasn't been written yet

@@ -31,6 +31,7 @@ from swift.cli import ringbuilder
from swift.cli.ringbuilder import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
from swift.common import exceptions
from swift.common.ring import RingBuilder
from swift.common.ring.io import RingReader
from swift.common.ring.composite_builder import CompositeRingBuilder
from test.unit import Timeout, write_stub_builder
@@ -2121,7 +2122,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
expected = "%s, build version 6, id %s\n" \
"64 partitions, 3.000000 replicas, 4 regions, 4 zones, " \
"4 devices, 100.00 balance, 0.00 dispersion\n" \
"4 devices, 2-byte IDs, 100.00 balance, 0.00 dispersion\n" \
"The minimum number of hours before a partition can be " \
"reassigned is 1 (0:00:00 remaining)\n" \
"The overload factor is 0.00%% (0.000000)\n" \
@@ -2395,6 +2396,23 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
self.assertSystemExit(EXIT_ERROR, ringbuilder.main, argv)
def test_rebalance_remove_zero_weighted_device(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.set_dev_weight(2, 0.0)
ring.rebalance()
ring.pretend_min_part_hours_passed()
ring.remove_dev(2)
ring.save(self.tmpfile)
# Test rebalance after remove 0 weighted device
argv = ["", self.tmpfile, "rebalance", "3"]
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
ring = RingBuilder.load(self.tmpfile)
self.assertTrue(ring.validate())
self.assertEqual(len(ring.devs), 4)
self.assertIsNone(ring.devs[2])
def test_rebalance_remove_off_end_trims_dev_list(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.set_dev_weight(3, 0.0)
@@ -2408,7 +2426,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
ring = RingBuilder.load(self.tmpfile)
self.assertTrue(ring.validate())
self.assertIsNone(ring.devs[3])
self.assertEqual(len(ring.devs), 3)
def test_rebalance_resets_time_remaining(self):
self.create_sample_ring()
@@ -2546,12 +2564,32 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
argv = ["", self.tmpfile, "write_ring"]
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
for version in ("1", "2"):
argv = ["", self.tmpfile, "write_ring", "--format-version",
version]
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
with RingReader.open("%s.ring.gz" % self.tmpfile) as reader:
self.assertEqual(int(version), reader.version)
exp_results = {'valid_exit_codes': [EXIT_ERROR]}
out, err = self.run_srb("write_ring", "--format-version", "3",
exp_results=exp_results)
self.assertIn('invalid choice', err)
def test_write_empty_ring(self):
ring = RingBuilder(6, 3, 1)
ring.save(self.tmpfile)
exp_results = {'valid_exit_codes': [2]}
exp_results = {'valid_exit_codes': [EXIT_ERROR]}
out, err = self.run_srb("write_ring", exp_results=exp_results)
self.assertEqual('Unable to write empty ring.\n', out)
exp_out = 'Unable to write empty ring.\n'
self.assertEqual(exp_out, out[-len(exp_out):])
self.assertIn("Defaulting to --format-version=1", out)
for version in (1, 2):
out, err = self.run_srb("write_ring",
"--format-version={}".format(version),
exp_results=exp_results)
self.assertEqual(exp_out, out)
def test_write_builder(self):
# Test builder file already exists
@@ -2637,6 +2675,133 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
argv = ["", self.tmpfile + '.builder', "rebalance"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
def test_version_serialization_default(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
rd = rb.get_ring()
rd.save(self.tmpfile + ".ring.gz")
ring_file = os.path.join(os.path.dirname(self.tmpfile),
os.path.basename(self.tmpfile) + ".ring.gz")
argv = ["", ring_file, "version"]
mock_stdout = io.StringIO()
with mock.patch("sys.stdout", mock_stdout):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
expected = ("%s.ring.gz: Serialization version: 1 (2-byte IDs), "
"build version: 5\n" % self.tmpfile)
self.assertEqual(expected, mock_stdout.getvalue())
def test_version_serialization_1(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
rd = rb.get_ring()
rd.save(self.tmpfile + ".ring.gz", format_version=1)
ring_file = os.path.join(os.path.dirname(self.tmpfile),
os.path.basename(self.tmpfile) + ".ring.gz")
argv = ["", ring_file, "version"]
mock_stdout = io.StringIO()
with mock.patch("sys.stdout", mock_stdout):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
expected = ("%s.ring.gz: Serialization version: 1 (2-byte IDs), "
"build version: 5\n" % self.tmpfile)
self.assertEqual(expected, mock_stdout.getvalue())
def test_version_serialization_2(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
rd = rb.get_ring()
rd.save(self.tmpfile + ".ring.gz", format_version=2)
ring_file = os.path.join(os.path.dirname(self.tmpfile),
os.path.basename(self.tmpfile) + ".ring.gz")
argv = ["", ring_file, "version"]
mock_stdout = io.StringIO()
with mock.patch("sys.stdout", mock_stdout):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), "
"build version: 5\n" % self.tmpfile)
self.assertEqual(expected, mock_stdout.getvalue())
def test_version_from_builder_file(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
rd = rb.get_ring()
rd.save(self.tmpfile + ".ring.gz", format_version=2)
# read version from ring when builder file given as argument
argv = ["", self.tmpfile, "version"]
mock_stdout = io.StringIO()
with mock.patch("sys.stdout", mock_stdout):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
# output still reports ring file
expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), "
"build version: 5\n" % self.tmpfile)
self.assertEqual(expected, mock_stdout.getvalue())
def test_version_with_builder_file_missing(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
rd = rb.get_ring()
rd.save(self.tmpfile + ".ring.gz", format_version=2)
# remove the builder to hit some interesting except blocks in main
os.unlink(self.tmpfile)
test_args = [
# explicit ring file version of course works when builder missing
self.tmpfile + ".ring.gz",
# even when builder file is missing you can still implicitly
# identify the ring file and read the version
self.tmpfile,
]
for path in test_args:
argv = ["", path, "version"]
mock_stdout = io.StringIO()
with mock.patch("sys.stdout", mock_stdout):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
expected = ("%s.ring.gz: Serialization version: 2 (2-byte IDs), "
"build version: 5\n" % self.tmpfile)
self.assertEqual(expected, mock_stdout.getvalue())
# but of course if the path is nonsensical we get an error
argv = ["", self.tmpfile + ".nonsense", "version"]
with self.assertRaises(FileNotFoundError):
ringbuilder.main(argv)
def test_version_from_builder_file_with_ring_missing(self):
self.create_sample_ring()
rb = RingBuilder.load(self.tmpfile)
rb.rebalance()
# Don't even bother to write the ring
test_args = [
self.tmpfile + ".ring.gz",
# If provided with the (existing) builder, we can infer the
# (nonexisting) ring
self.tmpfile,
]
for path in test_args:
argv = ["", path, "version"]
# Gotta have a ring to get the version info
with self.assertRaises(FileNotFoundError):
ringbuilder.main(argv)
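The version tests above depend on the serialization version being recoverable from the ring file alone. Below is a minimal sketch of such a check. It assumes, based on the `b'R1NG\x00\x01'` bytes asserted in the io tests of this change, that the decompressed payload starts with a 4-byte `R1NG` tag followed by a 2-byte big-endian version. `read_ring_version` and `make_fake_ring` are hypothetical helpers for illustration and are not part of swift.

```python
import gzip
import io
import struct


def read_ring_version(fileobj):
    """Return the serialization version stored in the first 6 payload bytes.

    Hypothetical helper; the layout is inferred from the tests, not from
    the RingReader implementation.
    """
    with gzip.GzipFile(fileobj=fileobj, mode='rb') as gz:
        magic = gz.read(6)
    tag, version = struct.unpack('!4sH', magic)
    if tag != b'R1NG':
        raise ValueError('unexpected magic: %r' % magic)
    return version


def make_fake_ring(version, tag=b'R1NG'):
    """Build an in-memory gzip stream that contains only the magic."""
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
        gz.write(tag + struct.pack('!H', version))
    buf.seek(0)
    return buf
```

Calling `read_ring_version(make_fake_ring(2))` yields the version number without touching any device or assignment data, which is the property the `version` subcommand needs.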
def test_warn_at_risk(self):
# check that warning is generated when rebalance does not achieve
# satisfactory balance


@@ -865,7 +865,7 @@ class TestRingBuilder(unittest.TestCase):
rb.add_dev({'id': 2, 'region': 0, 'zone': 2, 'weight': 1,
'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'})
self.assertFalse(rb.ever_rebalanced)
builder_file = os.path.join(self.testdir, 'test.buider')
builder_file = os.path.join(self.testdir, 'test.builder')
rb.save(builder_file)
rb = ring.RingBuilder.load(builder_file)
self.assertFalse(rb.ever_rebalanced)
@@ -2055,12 +2055,18 @@ class TestRingBuilder(unittest.TestCase):
for d in devs:
rb.add_dev(d)
rb.rebalance()
# There are so few devs, they should fit into 1 byte dev_ids but we
# store in a minimum of 2 for backwards compat.
self.assertEqual(rb.dev_id_bytes, 2)
self.assertEqual(rb._replica2part2dev[0].itemsize, 2)
builder_file = os.path.join(self.testdir, 'test_save.builder')
rb.save(builder_file)
loaded_rb = ring.RingBuilder.load(builder_file)
self.maxDiff = None
self.assertEqual(loaded_rb.to_dict(), rb.to_dict())
self.assertEqual(loaded_rb.overload, 3.14159)
self.assertEqual(loaded_rb.dev_id_bytes, 2)
self.assertEqual(loaded_rb._replica2part2dev[0].itemsize, 2)
@mock.patch('builtins.open', autospec=True)
@mock.patch('swift.common.ring.builder.pickle.dump', autospec=True)
@@ -2718,13 +2724,14 @@ class TestRingBuilder(unittest.TestCase):
# try with contiguous holes at beginning
add_dev_count = 6
rb = self._add_dev_delete_first_n(add_dev_count, add_dev_count - 3)
self.assertEqual([None, None, None, 3, 4, 5], [
None if d is None else d['id'] for d in rb.devs])
new_dev_id = rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0,
'device': 'sda'})
self.assertLess(new_dev_id, add_dev_count)
# try with non-contiguous holes
# [0, 1, None, 3, 4, None]
rb2 = ring.RingBuilder(8, 3, 1)
for i in range(6):
rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
@@ -2735,23 +2742,33 @@ class TestRingBuilder(unittest.TestCase):
rb2.remove_dev(5)
rb2.pretend_min_part_hours_passed()
rb2.rebalance()
# List gets trimmed during rebalance
self.assertEqual([0, 1, None, 3, 4], [
None if d is None else d['id'] for d in rb2.devs])
first = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sda'})
self.assertEqual(first, 2)
self.assertEqual([0, 1, 2, 3, 4], [
None if d is None else d['id'] for d in rb2.devs])
second = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sda'})
self.assertEqual(second, 5)
self.assertEqual([0, 1, 2, 3, 4, 5], [
None if d is None else d['id'] for d in rb2.devs])
# add a new one (without reusing a hole)
third = rb2.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sda'})
self.assertEqual(first, 2)
self.assertEqual(second, 5)
self.assertEqual(third, 6)
self.assertEqual([0, 1, 2, 3, 4, 5, 6], [
None if d is None else d['id'] for d in rb2.devs])
def test_reuse_of_dev_holes_with_id(self):
add_dev_count = 6
rb = self._add_dev_delete_first_n(add_dev_count, add_dev_count - 3)
self.assertEqual([None, None, None, 3, 4, 5], [
None if d is None else d['id'] for d in rb.devs])
# add specifying id
exp_new_dev_id = 2
# [dev, dev, None, dev, dev, None]
try:
new_dev_id = rb.add_dev({'id': exp_new_dev_id, 'region': 0,
'zone': 0, 'ip': '127.0.0.1',
@@ -2760,6 +2777,41 @@ class TestRingBuilder(unittest.TestCase):
self.assertEqual(new_dev_id, exp_new_dev_id)
except exceptions.DuplicateDeviceError:
self.fail("device hole not reused")
self.assertEqual([None, None, 2, 3, 4, 5], [
None if d is None else d['id'] for d in rb.devs])
def test_wide_device_limits(self):
rb = ring.RingBuilder(8, 2, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sda'})
new_id = 2 ** 16 - 2
rb.add_dev({'id': new_id, 'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sdb'})
rb.rebalance()
self.assertEqual(rb._replica2part2dev[0].itemsize, 2)
self.assertEqual([0] + [None] * (new_id - 1) + [new_id], [
None if d is None else d['id'] for d in rb.devs])
# Special value used for removed devices in 2-byte-dev-id rings
new_id = 2 ** 16 - 1
rb.add_dev({'id': new_id, 'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6200, 'weight': 1.0, 'device': 'sdc'})
rb.rebalance()
# so we get kicked over to 4
self.assertEqual(rb._replica2part2dev[0].itemsize, 4)
self.assertEqual([0] + [None] * (new_id - 2) + [new_id - 1, new_id], [
None if d is None else d['id'] for d in rb.devs])
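The width rule exercised by `test_wide_device_limits` can be sketched as follows. This is an illustration under stated assumptions, not the builder's code: `required_dev_id_bytes` and `typecode_for` are hypothetical helpers, and the upper bound for 4-byte ids is a guess that mirrors the reserved value the test describes for 2-byte rings.

```python
import array

# In 2-byte rings the value 2 ** 16 - 1 marks a removed device (per the test
# comment above), so it cannot be used as a real device id.
MAX_2_BYTE_DEV_ID = 2 ** 16 - 2


def required_dev_id_bytes(max_dev_id):
    """Hypothetical helper: smallest supported width for the largest id."""
    if max_dev_id < 0:
        raise ValueError('device ids must be non-negative')
    if max_dev_id <= MAX_2_BYTE_DEV_ID:
        return 2
    # Assumption: the top 4-byte value is reserved in the same way.
    if max_dev_id < 2 ** 32 - 1:
        return 4
    raise ValueError('device id %d does not fit in 4 bytes' % max_dev_id)


def typecode_for(width):
    """Find an unsigned array typecode whose items are `width` bytes."""
    for code in 'BHILQ':
        if array.array(code).itemsize == width:
            return code
    raise ValueError('no unsigned typecode with %d-byte items' % width)
```

Looking the typecode up by `itemsize` avoids hard-coding `'I'`, whose size is platform dependent even though it is 4 bytes on common platforms.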
class TestPartPowerIncrease(unittest.TestCase):
FORMAT_VERSION = 1
def setUp(self):
self.testdir = mkdtemp()
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
def test_prepare_increase_partition_power(self):
ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
@@ -2788,7 +2840,7 @@ class TestRingBuilder(unittest.TestCase):
# Save .ring.gz, and load ring from it to ensure prev/next is set
rd = rb.get_ring()
rd.save(ring_file)
rd.save(ring_file, format_version=self.FORMAT_VERSION)
r = ring.Ring(ring_file)
expected_part_shift = 32 - 8
@@ -2809,7 +2861,7 @@ class TestRingBuilder(unittest.TestCase):
# Let's save the ring, and get the nodes for an object
ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
rd = rb.get_ring()
rd.save(ring_file)
rd.save(ring_file, format_version=self.FORMAT_VERSION)
r = ring.Ring(ring_file)
old_part, old_nodes = r.get_nodes("acc", "cont", "obj")
old_version = rb.version
@@ -2828,7 +2880,7 @@ class TestRingBuilder(unittest.TestCase):
old_ring = r
rd = rb.get_ring()
rd.save(ring_file)
rd.save(ring_file, format_version=self.FORMAT_VERSION)
r = ring.Ring(ring_file)
new_part, new_nodes = r.get_nodes("acc", "cont", "obj")
@@ -2900,7 +2952,7 @@ class TestRingBuilder(unittest.TestCase):
# Save .ring.gz, and load ring from it to ensure prev/next is set
rd = rb.get_ring()
rd.save(ring_file)
rd.save(ring_file, format_version=self.FORMAT_VERSION)
r = ring.Ring(ring_file)
expected_part_shift = 32 - 9
@@ -2969,6 +3021,10 @@ class TestRingBuilder(unittest.TestCase):
self.assertEqual(rb.version, old_version + 2)
class TestPartPowerIncreaseV2(TestPartPowerIncrease):
FORMAT_VERSION = 2
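`TestPartPowerIncreaseV2` reruns every inherited test against the v2 format by overriding a single class attribute. A self-contained sketch of the same pattern follows; the toy `save` function is hypothetical and only stands in for `RingData.save`.

```python
import unittest


def save(payload, format_version):
    """Toy stand-in for a serializer that supports two formats."""
    if format_version not in (1, 2):
        raise ValueError('format_version must be one of (1, 2)')
    return {'version': format_version, 'payload': payload}


class TestSaveV1(unittest.TestCase):
    FORMAT_VERSION = 1

    def test_round_trip(self):
        saved = save('data', format_version=self.FORMAT_VERSION)
        self.assertEqual(saved['version'], self.FORMAT_VERSION)
        self.assertEqual(saved['payload'], 'data')


class TestSaveV2(TestSaveV1):
    # Every test method inherited from TestSaveV1 now runs against format 2.
    FORMAT_VERSION = 2
```

The design keeps the two suites in lockstep: a test added to the base class is automatically exercised for both formats.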
class TestGetRequiredOverload(unittest.TestCase):
maxDiff = None


@@ -36,7 +36,8 @@ def make_device_iter():
x = 0
base_port = 6000
while True:
yield {'region': 0, # Note that region may be replaced on the tests
yield {'id': 200 + x,
'region': 0, # Note that region may be replaced on the tests
'zone': 0,
'ip': '10.0.0.%s' % x,
'replication_ip': '10.0.0.%s' % x,
@@ -242,7 +243,7 @@ class TestCompositeBuilder(BaseTestCompositeBuilder):
def test_composite_same_device_in_the_different_rings_error(self):
builders = self.create_sample_ringbuilders(2)
same_device = copy.deepcopy(builders[0].devs[0])
same_device = copy.deepcopy(builders[0].devs[200])
# create one more ring which duplicates a device in the first ring
builder = RingBuilder(6, 3, 1)
@@ -987,7 +988,7 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
c = Counter(builder.devs[dev_id]['id']
for part2dev_id in builder._replica2part2dev
for dev_id in part2dev_id)
return [c[d['id']] for d in builder.devs]
return [c[d['id']] for d in builder.devs if d]
def get_moved_parts(self, after, before):
def uniqueness(dev):


@@ -0,0 +1,284 @@
# Copyright (c) 2022 NVIDIA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import dataclasses
import io
import json
import os.path
import unittest
from unittest import mock
import zlib
from swift.common.ring.io import IndexEntry, RingReader, RingWriter
from test.unit import with_tempdir
class TestRoundTrip(unittest.TestCase):
def assertRepeats(self, data, pattern, n):
l = len(pattern)
self.assertEqual(len(data), n * l)
actual = collections.Counter(
data[x * l:(x + 1) * l]
for x in range(n))
self.assertEqual(actual, {pattern: n})
@with_tempdir
def test_write_failure(self, tempd):
tempf = os.path.join(tempd, 'not-persisted')
try:
with RingWriter.open(tempf):
self.assertEqual(1, len(os.listdir(tempd)))
raise RuntimeError
except RuntimeError:
pass
self.assertEqual(0, len(os.listdir(tempd)))
def test_arbitrary_bytes(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
# Still need to write good magic, or we won't be able to read
writer.write_magic(1)
# but after that, we can kinda do whatever
writer.write(b'\xde\xad\xbe\xef' * 10240)
writer.write(b'\xda\x7a\xda\x7a' * 10240)
good_pos = writer.tell()
self.assertTrue(writer.flushed)
pos = writer.raw_fp.tell()
writer.write(b'')
self.assertTrue(writer.flushed)
self.assertEqual(pos, writer.raw_fp.tell())
writer.write(b'more' * 10240)
self.assertFalse(writer.flushed)
buf.seek(0)
reader = RingReader(buf)
self.assertEqual(reader.version, 1)
self.assertEqual(reader.raw_size, 6 + 12 * 10240)
self.assertEqual(reader.read(6), b'R1NG\x00\x01')
self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240)
self.assertRepeats(reader.read(40960), b'\xda\x7a\xda\x7a', 10240)
self.assertRepeats(reader.read(40960), b'more', 10240)
# Can seek backwards
reader.seek(good_pos)
self.assertRepeats(reader.read(40960), b'more', 10240)
# Even all the way to the beginning
reader.seek(0)
self.assertEqual(reader.read(6), b'R1NG\x00\x01')
self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240)
# but not arbitrarily
reader.seek(good_pos - 100)
with self.assertRaises(zlib.error):
reader.read(1)
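The reason seeking only works at flush points can be shown with plain `zlib`. This sketch does not use `RingReader` and makes no claim about how it is implemented. It assumes a raw deflate stream (`wbits=-15`) and a full flush between the two payloads; the names `head`, `tail` and `read_from` are illustrative only.

```python
import zlib

first = b'\xde\xad\xbe\xef' * 10240
second = b'more' * 10240

# Raw deflate stream with a full flush after the first payload. A full flush
# byte-aligns the output and resets the compression dictionary.
compressor = zlib.compressobj(wbits=-15)
head = compressor.compress(first) + compressor.flush(zlib.Z_FULL_FLUSH)
flush_point = len(head)
tail = compressor.compress(second) + compressor.flush()
stream = head + tail


def read_from(offset):
    """Decompress everything from `offset` with a fresh decompressor."""
    return zlib.decompressobj(wbits=-15).decompress(stream[offset:])


from_start = read_from(0)
from_flush_point = read_from(flush_point)
```

Starting at `flush_point` works because nothing after it refers back to earlier data. An arbitrary offset generally lands mid-block, which is why the test above expects `zlib.error` after seeking to `good_pos - 100`.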
def test_sections(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
writer.write_magic(2)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
with writer.section('bar'):
# Sometimes you might not want to get the whole section into
# memory as a byte-string all at once (eg, when writing ring
# assignments)
writer.write_size(40960)
for _ in range(10):
writer.write(b'\xda\x7a\xda\x7a' * 1024)
with writer.section('baz'):
writer.write_blob(b'more' * 10240)
# Can't nest sections
with self.assertRaises(ValueError):
with writer.section('inner'):
pass
self.assertNotIn('inner', writer.index)
writer.write(b'can add arbitrary bytes')
# ...though accessing them on read may be difficult; see below.
# This *is not* a recommended pattern -- write proper length-value
# blobs instead (even if you don't include them as sections in the
# index).
with writer.section('quux'):
writer.write_blob(b'data' * 10240)
# Gotta do this at the start
with self.assertRaises(IOError):
writer.write_magic(2)
# Can't write duplicate sections
with self.assertRaises(ValueError):
with writer.section('foo'):
pass
# We're reserving globs, so we can later support something like
# reader.load_sections('swift/ring/*')
with self.assertRaises(ValueError):
with writer.section('foo*'):
pass
buf.seek(0)
reader = RingReader(buf)
self.assertEqual(reader.version, 2)
# Order matters!
self.assertEqual(list(reader.index), [
'foo', 'bar', 'baz', 'quux', 'swift/index'])
self.assertEqual({
k: (v.uncompressed_start, v.uncompressed_end, v.checksum_method)
for k, v in reader.index.items()
}, {
'foo': (6, 40974, 'sha256'),
'bar': (40974, 81942, 'sha256'),
'baz': (81942, 122910, 'sha256'),
# note the gap between baz and quux for the raw bytes
'quux': (122933, 163901, 'sha256'),
'swift/index': (163901, None, None),
})
self.assertIn('foo', reader)
self.assertNotIn('inner', reader)
self.assertRepeats(reader.read_section('foo'),
b'\xde\xad\xbe\xef', 10240)
with reader.open_section('bar') as s:
for _ in range(10):
self.assertEqual(s.read(4), b'\xda\x7a\xda\x7a')
self.assertRepeats(s.read(), b'\xda\x7a\xda\x7a', 10230)
# If you know that one section follows another, you don't *have*
# to "open" the next one
self.assertRepeats(reader.read_blob(), b'more', 10240)
self.assertRepeats(reader.read_section('quux'),
b'data', 10240)
index_dict = json.loads(reader.read_section('swift/index'))
self.assertEqual(reader.index, {
section: IndexEntry(*entry)
for section, entry in index_dict.items()})
# Missing section
with self.assertRaises(KeyError) as caught:
with reader.open_section('foobar'):
pass
self.assertEqual("'foobar'", str(caught.exception))
# seek to the end of baz
reader.seek(reader.index['baz'].compressed_end)
# so we can read the raw bytes we stuffed in
gap_length = (reader.index['quux'].uncompressed_start -
reader.index['baz'].uncompressed_end)
self.assertGreater(gap_length, 0)
self.assertEqual(b'can add arbitrary bytes',
reader.read(gap_length))
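The uncompressed offsets asserted in `test_sections` can be reproduced by hand. The sketch below assumes a 6-byte magic and an 8-byte length prefix per blob; the prefix size is inferred from `40974 - 6 - 40960 = 8`, and the big-endian byte order is an assumption. `encode_blob` and `build` are hypothetical helpers, not the `RingWriter` API.

```python
import struct


def encode_blob(data):
    """Length-value encoding: assumed 8-byte big-endian size, then the data."""
    return struct.pack('!Q', len(data)) + data


def build(parts):
    """Lay out (name, data) pairs after the magic; name=None means raw bytes.

    Returns the uncompressed bytes and a {name: (start, end)} index.
    """
    out = bytearray(b'R1NG' + struct.pack('!H', 2))
    index = {}
    for name, data in parts:
        start = len(out)
        if name is None:
            out += data  # unindexed bytes leave a gap between sections
        else:
            out += encode_blob(data)
            index[name] = (start, len(out))
    return bytes(out), index


raw, index = build([
    ('foo', b'\xde\xad\xbe\xef' * 10240),
    ('bar', b'\xda\x7a\xda\x7a' * 10240),
    ('baz', b'more' * 10240),
    (None, b'can add arbitrary bytes'),
    ('quux', b'data' * 10240),
])
```

The 23 raw bytes written between `baz` and `quux` account for the gap the test points out: `quux` starts at 122933 rather than 122910.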
def test_sections_with_corruption(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
writer.write_magic(2)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
buf.seek(0)
reader = RingReader(buf)
# if you open a section, you better read it all!
read_bytes = b''
with self.assertRaises(ValueError) as caught:
with reader.open_section('foo') as s:
read_bytes = s.read(4)
self.assertEqual(
'Incomplete read; expected 40956 more bytes to be read',
str(caught.exception))
self.assertEqual(b'\xde\xad\xbe\xef', read_bytes)
# if there's a digest mismatch, you can read data, but it'll
# throw an error on close
self.assertEqual('sha256', reader.index['foo'].checksum_method)
self.assertEqual(
'c51d6703d54cd7cf57b4d4b7ecfcca60'
'56dbd41ebf1c1e83c0e8e48baeff629a',
reader.index['foo'].checksum_value)
reader.index['foo'] = dataclasses.replace(
writer.index['foo'],
checksum_value='not-the-sha',
)
read_bytes = b''
with self.assertRaises(ValueError) as caught:
with reader.open_section('foo') as s:
read_bytes = s.read()
self.assertIn('Hash mismatch in block: ', str(caught.exception))
self.assertRepeats(read_bytes, b'\xde\xad\xbe\xef', 10240)
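The behaviour tested above, where data can be read but problems surface when the section is closed, can be sketched with a small context manager. `SectionReader` here is a hypothetical stand-in, not the class used by `RingReader.open_section`. It assumes the checksum covers only the payload and reuses the error wording from the assertions above.

```python
import hashlib
import io


class SectionReader:
    """Hypothetical sketch: hash bytes as they are read, verify on exit."""

    def __init__(self, fp, size, expected_sha256):
        self.fp = fp
        self.remaining = size
        self.expected = expected_sha256
        self.hasher = hashlib.sha256()

    def read(self, n=-1):
        # Never read past the end of the section.
        if n < 0 or n > self.remaining:
            n = self.remaining
        data = self.fp.read(n)
        self.remaining -= len(data)
        self.hasher.update(data)
        return data

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            return False  # do not mask an error raised inside the block
        if self.remaining:
            raise ValueError(
                'Incomplete read; expected %d more bytes to be read'
                % self.remaining)
        if self.hasher.hexdigest() != self.expected:
            raise ValueError('Hash mismatch in block')
        return False


payload = b'\xde\xad\xbe\xef' * 10240
digest = hashlib.sha256(payload).hexdigest()
with SectionReader(io.BytesIO(payload), len(payload), digest) as section:
    verified = section.read()
```

Deferring the check to `__exit__` lets callers stream a large section without buffering it, at the cost of only learning about corruption after the data has been consumed.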
@mock.patch('logging.getLogger')
def test_sections_with_unsupported_checksum(self, mock_logging):
buf = io.BytesIO()
with RingWriter(buf) as writer:
writer.write_magic(2)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef')
writer.index['foo'] = dataclasses.replace(
writer.index['foo'],
checksum_method='not_a_digest',
checksum_value='do not care',
)
buf.seek(0)
reader = RingReader(buf)
with reader.open_section('foo') as s:
read_bytes = s.read(4)
self.assertEqual(b'\xde\xad\xbe\xef', read_bytes)
self.assertEqual(mock_logging.mock_calls, [
mock.call('swift.ring'),
mock.call('swift.ring').warning(
'Ignoring unsupported checksum %s:%s for section %s',
'not_a_digest', mock.ANY, 'foo'),
])
def test_recompressed(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
writer.write_magic(2)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
buf.seek(0)
reader = RingReader(buf)
with self.assertRaises(IOError):
reader.read(-1) # don't be greedy
uncompressed_bytes = reader.read(2 ** 20)
buf = io.BytesIO()
with RingWriter(buf) as writer:
writer.write(uncompressed_bytes)
buf.seek(0)
with self.assertRaises(IOError):
# ...but we can't read it
RingReader(buf)
def test_version_too_high(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
# you can write it...
writer.write_magic(3)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
buf.seek(0)
with self.assertRaises(ValueError):
# ...but we can't read it
RingReader(buf)


@@ -15,19 +15,23 @@
import array
import collections
from gzip import GzipFile
import json
import os
import unittest
import stat
import struct
from tempfile import mkdtemp
from shutil import rmtree
from time import sleep, time
import sys
import copy
from unittest import mock
import zlib
from swift.common.exceptions import DevIdBytesTooSmall
from swift.common import ring, utils
from swift.common.ring import utils as ring_utils
from swift.common.utils import md5
from swift.common.ring import io, utils as ring_utils
class TestRingBase(unittest.TestCase):
@@ -52,13 +56,19 @@ class TestRingData(unittest.TestCase):
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
def assert_ring_data_equal(self, rd_expected, rd_got):
self.assertEqual(rd_expected._replica2part2dev_id,
rd_got._replica2part2dev_id)
def assert_ring_data_equal(self, rd_expected, rd_got, metadata_only=False):
self.assertEqual(rd_expected.devs, rd_got.devs)
self.assertEqual(rd_expected._part_shift, rd_got._part_shift)
self.assertEqual(rd_expected.next_part_power, rd_got.next_part_power)
self.assertEqual(rd_expected.version, rd_got.version)
self.assertEqual(rd_expected.dev_id_bytes, rd_got.dev_id_bytes)
self.assertEqual(rd_expected.replica_count, rd_got.replica_count)
if metadata_only:
self.assertEqual([], rd_got._replica2part2dev_id)
else:
self.assertEqual(rd_expected._replica2part2dev_id,
rd_got._replica2part2dev_id)
def test_attrs(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
@@ -82,12 +92,10 @@ class TestRingData(unittest.TestCase):
],
30)
rd.save(ring_fname)
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
self.assertEqual([
{'id': 0, 'zone': 0, 'region': 1},
{'id': 1, 'zone': 1, 'region': 1},
], meta_only.devs)
self.assertEqual([], meta_only._replica2part2dev_id)
self.assert_ring_data_equal(rd, meta_only, metadata_only=True)
rd2 = ring.RingData.load(ring_fname)
self.assert_ring_data_equal(rd, rd2)
@@ -98,19 +106,11 @@ class TestRingData(unittest.TestCase):
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30)
rd.save(ring_fname)
class MockReader(ring.ring.RingReader):
calls = []
def close(self):
self.calls.append(('close', self.fp))
return super(MockReader, self).close()
with mock.patch('swift.common.ring.ring.RingReader',
MockReader) as mock_reader:
with mock.patch('swift.common.ring.io.open',
return_value=open(ring_fname, 'rb')) as mock_open:
self.assertFalse(mock_open.return_value.closed) # sanity
ring.RingData.load(ring_fname)
self.assertEqual([('close', mock.ANY)], mock_reader.calls)
self.assertTrue(mock_reader.calls[0][1].closed)
self.assertTrue(mock_open.return_value.closed)
def test_byteswapped_serialization(self):
# Manually byte swap a ring and write it out, claiming it was written
@@ -129,7 +129,9 @@ class TestRingData(unittest.TestCase):
rds = ring.RingData(swapped_data,
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}],
30)
rds.save(ring_fname)
# note that this can only be an issue for v1 rings;
# v2 rings always write network order
rds.save(ring_fname, format_version=1)
rd1 = ring.RingData(data, [{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}],
30)
@@ -183,8 +185,263 @@ class TestRingData(unittest.TestCase):
30)
self.assertEqual(rd.replica_count, 1.75)
def test_deserialize_v1(self):
# First save it as a ring v2 and then try and load it using
# deserialize_v1
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0',
'port': 7000},
{'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1',
'port': 7000}],
30)
rd.save(ring_fname, format_version=2)
with self.assertRaises(ValueError) as err:
ring.RingData.deserialize_v1(io.RingReader(open(ring_fname, 'rb')))
self.assertIn("unexpected magic:", str(err.exception))
# Now let's save it as v1 then load it up metadata_only
rd.save(ring_fname, format_version=1)
loaded_rd = ring.RingData.deserialize_v1(
io.RingReader(open(ring_fname, 'rb')),
metadata_only=True)
self.assertTrue(loaded_rd['byteorder'])
expected_devs = [
{'id': 0, 'ip': '10.1.1.0', 'port': 7000, 'region': 1, 'zone': 0,
'replication_ip': '10.1.1.0', 'replication_port': 7000},
{'id': 1, 'ip': '10.1.1.1', 'port': 7000, 'region': 1, 'zone': 1,
'replication_ip': '10.1.1.1', 'replication_port': 7000}]
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
# but there is no replica2part2dev table
self.assertFalse(loaded_rd['replica2part2dev_id'])
# But if we load it up with metadata_only = false
loaded_rd = ring.RingData.deserialize_v1(
io.RingReader(open(ring_fname, 'rb')))
self.assertTrue(loaded_rd['byteorder'])
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertTrue(loaded_rd['replica2part2dev_id'])
def test_deserialize_v2(self):
# Save it as a v2 ring, then load only the metadata (no devices) using
# deserialize_v2
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0',
'port': 7000},
{'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1',
'port': 7000}],
30)
rd.save(ring_fname, format_version=2)
loaded_rd = ring.RingData.deserialize_v2(
io.RingReader(open(ring_fname, 'rb')),
metadata_only=True,
include_devices=False)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
# minimum size we use is 2 byte dev ids
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
# but there is no replica2part2dev table or devs
self.assertFalse(loaded_rd['devs'])
self.assertFalse(loaded_rd['replica2part2dev_id'])
# Next we load it up with metadata and devs only
loaded_rd = ring.RingData.deserialize_v2(
io.RingReader(open(ring_fname, 'rb')),
metadata_only=True)
expected_devs = [
{'id': 0, 'ip': '10.1.1.0', 'port': 7000, 'region': 1, 'zone': 0,
'replication_ip': '10.1.1.0', 'replication_port': 7000},
{'id': 1, 'ip': '10.1.1.1', 'port': 7000, 'region': 1, 'zone': 1,
'replication_ip': '10.1.1.1', 'replication_port': 7000}]
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertFalse(loaded_rd['replica2part2dev_id'])
# But if we load it up with metadata_only = false
loaded_rd = ring.RingData.deserialize_v2(
io.RingReader(open(ring_fname, 'rb')))
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertTrue(loaded_rd['replica2part2dev_id'])
def test_load(self):
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0',
'port': 7000},
{'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1',
'port': 7000}],
30)
ring_fname_1 = os.path.join(self.testdir, 'foo-1.ring.gz')
ring_fname_2 = os.path.join(self.testdir, 'foo-2.ring.gz')
ring_fname_bad_version = os.path.join(self.testdir, 'foo-bar.ring.gz')
rd.save(ring_fname_1, format_version=1)
rd.save(ring_fname_2, format_version=2)
with io.RingWriter.open(ring_fname_bad_version) as writer:
writer.write_magic(5)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
# Loading the bad ring will fail because it's an unknown version
with self.assertRaises(Exception) as ex:
ring.RingData.load(ring_fname_bad_version)
self.assertEqual(
f'Unsupported ring version: 5 for {ring_fname_bad_version!r}',
str(ex.exception))
orig_load_index = io.RingReader.load_index
def mock_load_index(cls):
cls.version = 5
orig_load_index(cls)
with mock.patch('swift.common.ring.io.RingReader.load_index',
mock_load_index):
with self.assertRaises(Exception) as ex:
ring.RingData.load(ring_fname_1)
self.assertEqual(
f'Unknown ring format version 5 for {ring_fname_1!r}',
str(ex.exception))
expected_r2p2d = [
array.array('H', [0, 1, 0, 1]),
array.array('H', [0, 1, 0, 1])]
expected_rd_dict = {
'devs': [
{'id': 0, 'region': 1, 'zone': 0,
'ip': '10.1.1.0', 'port': 7000,
'replication_ip': '10.1.1.0', 'replication_port': 7000},
{'id': 1, 'zone': 1, 'region': 1,
'ip': '10.1.1.1', 'port': 7000,
'replication_ip': '10.1.1.1', 'replication_port': 7000}],
'replica2part2dev_id': expected_r2p2d,
'part_shift': 30,
'next_part_power': None,
'dev_id_bytes': 2,
'version': None}
# version 2
loaded_rd = ring.RingData.load(ring_fname_2)
self.assertEqual(loaded_rd.to_dict(), expected_rd_dict)
# version 1
loaded_rd = ring.RingData.load(ring_fname_1)
self.assertEqual(loaded_rd.to_dict(), expected_rd_dict)
def test_load_metadata_only(self):
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'region': 1, 'zone': 0, 'ip': '10.1.1.0',
'port': 7000},
{'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1',
'port': 7000}],
30)
ring_fname_1 = os.path.join(self.testdir, 'foo-1.ring.gz')
ring_fname_2 = os.path.join(self.testdir, 'foo-2.ring.gz')
ring_fname_bad_version = os.path.join(self.testdir, 'foo-bar.ring.gz')
rd.save(ring_fname_1, format_version=1)
rd.save(ring_fname_2, format_version=2)
with io.RingWriter.open(ring_fname_bad_version) as writer:
writer.write_magic(5)
with writer.section('foo'):
writer.write_blob(b'\xde\xad\xbe\xef' * 10240)
# Loading the bad ring will fail because it's an unknown version
with self.assertRaises(Exception) as ex:
ring.RingData.load(ring_fname_bad_version)
self.assertEqual(
f'Unsupported ring version: 5 for {ring_fname_bad_version!r}',
str(ex.exception))
orig_load_index = io.RingReader.load_index
def mock_load_index(cls):
cls.version = 5
orig_load_index(cls)
with mock.patch('swift.common.ring.io.RingReader.load_index',
mock_load_index):
with self.assertRaises(Exception) as ex:
ring.RingData.load(ring_fname_1)
self.assertEqual(
f'Unknown ring format version 5 for {ring_fname_1!r}',
str(ex.exception))
expected_rd_dict = {
'devs': [
{'id': 0, 'region': 1, 'zone': 0,
'ip': '10.1.1.0', 'port': 7000,
'replication_ip': '10.1.1.0', 'replication_port': 7000},
{'id': 1, 'zone': 1, 'region': 1,
'ip': '10.1.1.1', 'port': 7000,
'replication_ip': '10.1.1.1', 'replication_port': 7000}],
'replica2part2dev_id': [],
'part_shift': 30,
'next_part_power': None,
'dev_id_bytes': 2,
'version': None}
# version 2
loaded_rd = ring.RingData.load(ring_fname_2, metadata_only=True)
self.assertEqual(loaded_rd.to_dict(), expected_rd_dict)
# version 1
loaded_rd = ring.RingData.load(ring_fname_1, metadata_only=True)
self.assertEqual(loaded_rd.to_dict(), expected_rd_dict)
def test_save(self):
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'zone': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'ip': '10.1.1.1', 'port': 7000}],
30)
# First test the supported versions
for version in (1, 2):
rd.save(ring_fname, format_version=version)
# Now try unknown versions; each one must be rejected on its own
for version in (3, None, "some version"):
with self.assertRaises(ValueError) as err:
rd.save(ring_fname, format_version=version)
self.assertEqual("format_version must be one of (1, 2)",
str(err.exception))
# re-serialisation is already handled in test_load.
def test_save_bad_dev_id_bytes(self):
ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
rd = ring.RingData(
[array.array('I', [0, 1, 0, 1]), array.array('I', [0, 1, 0, 1])],
[{'id': 0, 'zone': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'ip': '10.1.1.1', 'port': 7000}],
30)
# v2 ring can handle wide devices fine
rd.save(ring_fname, format_version=2)
# but not v1! Only 2-byte dev ids there!
with self.assertRaises(DevIdBytesTooSmall):
rd.save(ring_fname, format_version=1)
class TestRing(TestRingBase):
FORMAT_VERSION = 1
def setUp(self):
super(TestRing, self).setUp()
@@ -213,9 +470,10 @@ class TestRing(TestRingBase):
'replication_port': 6066}]
self.intended_part_shift = 30
self.intended_reload_time = 15
-ring.RingData(
+rd = ring.RingData(
self.intended_replica2part2dev_id,
-self.intended_devs, self.intended_part_shift).save(self.testgz)
+self.intended_devs, self.intended_part_shift)
+rd.save(self.testgz, format_version=self.FORMAT_VERSION)
self.ring = ring.Ring(
self.testdir,
reload_time=self.intended_reload_time, ring_name='whatever')
@@ -234,12 +492,9 @@ class TestRing(TestRingBase):
self.assertIsNone(self.ring.version)
with open(self.testgz, 'rb') as fp:
-expected_md5 = md5(usedforsecurity=False)
expected_size = 0
for chunk in iter(lambda: fp.read(2 ** 16), b''):
-expected_md5.update(chunk)
expected_size += len(chunk)
-self.assertEqual(self.ring.md5, expected_md5.hexdigest())
self.assertEqual(self.ring.size, expected_size)
# test invalid endcap
@@ -269,7 +524,8 @@ class TestRing(TestRingBase):
'ip': '10.1.1.1', 'port': 9876})
ring.RingData(
self.intended_replica2part2dev_id,
-self.intended_devs, self.intended_part_shift).save(self.testgz)
+self.intended_devs, self.intended_part_shift,
+).save(self.testgz, format_version=self.FORMAT_VERSION)
sleep(0.1)
self.ring.get_nodes('a')
self.assertEqual(len(self.ring.devs), 6)
@@ -285,7 +541,8 @@ class TestRing(TestRingBase):
'ip': '10.5.5.5', 'port': 9876})
ring.RingData(
self.intended_replica2part2dev_id,
-self.intended_devs, self.intended_part_shift).save(self.testgz)
+self.intended_devs, self.intended_part_shift,
+).save(self.testgz, format_version=self.FORMAT_VERSION)
sleep(0.1)
self.ring.get_part_nodes(0)
self.assertEqual(len(self.ring.devs), 7)
@@ -302,7 +559,8 @@ class TestRing(TestRingBase):
'ip': '10.6.6.6', 'port': 6200})
ring.RingData(
self.intended_replica2part2dev_id,
-self.intended_devs, self.intended_part_shift).save(self.testgz)
+self.intended_devs, self.intended_part_shift,
+).save(self.testgz, format_version=self.FORMAT_VERSION)
sleep(0.1)
next(self.ring.get_more_nodes(part))
self.assertEqual(len(self.ring.devs), 8)
@@ -318,7 +576,8 @@ class TestRing(TestRingBase):
'ip': '10.5.5.5', 'port': 6200})
ring.RingData(
self.intended_replica2part2dev_id,
-self.intended_devs, self.intended_part_shift).save(self.testgz)
+self.intended_devs, self.intended_part_shift,
+).save(self.testgz, format_version=self.FORMAT_VERSION)
sleep(0.1)
self.assertEqual(len(self.ring.devs), 9)
self.assertNotEqual(self.ring._mtime, orig_mtime)
@@ -357,7 +616,8 @@ class TestRing(TestRingBase):
testgz = os.path.join(self.testdir, 'without_replication.ring.gz')
ring.RingData(
self.intended_replica2part2dev_id,
-replication_less_devs, self.intended_part_shift).save(testgz)
+replication_less_devs, self.intended_part_shift,
+).save(testgz, format_version=self.FORMAT_VERSION)
self.ring = ring.Ring(
self.testdir,
reload_time=self.intended_reload_time,
@@ -508,7 +768,7 @@ class TestRing(TestRingBase):
'device': "d%s" % device})
next_dev_id += 1
rb.rebalance(seed=43)
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
# every part has the same number of handoffs
@@ -555,7 +815,7 @@ class TestRing(TestRingBase):
next_dev_id += 1
rb.pretend_min_part_hours_passed()
num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=43)
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
# so now we expect the device list to be longer by one device
@@ -603,7 +863,7 @@ class TestRing(TestRingBase):
# Remove a device - no need to fluff min_part_hours.
rb.remove_dev(0)
num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=87)
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
# so now we expect the device list to be shorter by one device
@@ -673,7 +933,7 @@ class TestRing(TestRingBase):
# Add a partial replica
rb.set_replicas(3.5)
num_parts_changed, _balance, _removed_dev = rb.rebalance(seed=164)
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
# Change expectations
@@ -791,7 +1051,7 @@ class TestRing(TestRingBase):
rb.rebalance(seed=1)
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=1)
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
# There's 5 regions now, so the primary nodes + first 2 handoffs
@@ -861,7 +1121,7 @@ class TestRing(TestRingBase):
dev['weight'] = 1.0
rb.add_dev(dev)
rb.rebalance()
-rb.get_ring().save(self.testgz)
+rb.get_ring().save(self.testgz, format_version=self.FORMAT_VERSION)
r = ring.Ring(self.testdir, ring_name='whatever')
self.assertEqual(r.version, rb.version)
@@ -921,5 +1181,164 @@ class TestRing(TestRingBase):
histogram)
class TestRingV2(TestRing):
FORMAT_VERSION = 2
def test_4_byte_dev_ids(self):
ring_file = os.path.join(self.testdir, 'test.ring.gz')
index = {}
with GzipFile(ring_file, 'wb') as fp:
fp.write(b'R1NG\x00\x02')
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/metadata'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
meta = json.dumps({
"dev_id_bytes": 4,
"part_shift": 29,
"replica_count": 1.5,
}).encode('ascii')
fp.write(struct.pack('!Q', len(meta)) + meta)
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/devices'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
devs = json.dumps([
{"id": 0, "region": 1, "zone": 1, "ip": "127.0.0.1",
"port": 6200, "device": "sda", "weight": 1},
None,
{"id": 2, "region": 1, "zone": 1, "ip": "127.0.0.1",
"port": 6201, "device": "sdb", "weight": 1},
{"id": 3, "region": 1, "zone": 1, "ip": "127.0.0.1",
"port": 6202, "device": "sdc", "weight": 1},
]).encode('ascii')
fp.write(struct.pack('!Q', len(devs)) + devs)
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/assignments'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
fp.write(struct.pack('!Q', 48) + 4 * (
b'\x00\x00\x00\x03'
b'\x00\x00\x00\x02'
b'\x00\x00\x00\x00'))
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/index'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
blob = json.dumps(index).encode('ascii')
fp.write(struct.pack('!Q', len(blob)) + blob)
fp.flush(zlib.Z_FULL_FLUSH)
fp.compress = zlib.compressobj(
0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
fp.write(struct.pack('!Q', index['swift/index'][0]))
fp.flush(zlib.Z_FULL_FLUSH)
r = ring.Ring(ring_file)
self.assertEqual(
[[d['id'] for d in r.get_part_nodes(p)] for p in range(8)],
[[3, 0], [2, 3], [0, 2], [3, 0], [2], [0], [3], [2]])
class ExtendedRingData(ring.RingData):
extra = b'some super-specific data'
def to_dict(self):
ring_data = super().to_dict()
ring_data.setdefault('extra', self.extra)
return ring_data
def serialize_v2(self, writer):
super().serialize_v2(writer)
with writer.section('my-custom-section') as s:
s.write_blob(self.extra)
@classmethod
def deserialize_v2(cls, reader, *args, **kwargs):
ring_data = super().deserialize_v2(reader, *args, **kwargs)
# If you're adding custom data to your rings, you probably want an
# upgrade story that includes that data not being present
if 'my-custom-section' in reader.index:
with reader.open_section('my-custom-section') as s:
ring_data['extra'] = s.read()
return ring_data
@classmethod
def from_dict(cls, ring_data):
obj = super().from_dict(ring_data)
obj.extra = ring_data.get('extra')
return obj
class TestRingExtensibility(unittest.TestCase):
def test(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}]
s = 30
rd = ExtendedRingData(r2p2d, d, s)
self.assertEqual(rd._replica2part2dev_id, r2p2d)
self.assertEqual(rd.devs, d)
self.assertEqual(rd._part_shift, s)
self.assertEqual(rd.extra, b'some super-specific data')
# Can update it and round-trip to disk and back
rd.extra = b'some other value'
testdir = mkdtemp()
try:
ring_fname = os.path.join(testdir, 'foo.ring.gz')
rd.save(ring_fname, format_version=2)
bytes_written = os.path.getsize(ring_fname)
rd2 = ExtendedRingData.load(ring_fname)
# Vanilla Swift can also read the custom ring
vanilla_ringdata = ring.RingData.load(ring_fname)
finally:
rmtree(testdir, ignore_errors=True)
self.assertEqual(rd2._replica2part2dev_id, r2p2d)
self.assertEqual(rd2.devs, d)
self.assertEqual(rd2._part_shift, s)
self.assertEqual(rd2.extra, b'some other value')
self.assertEqual(rd2.size, bytes_written)
self.assertEqual(vanilla_ringdata._replica2part2dev_id, r2p2d)
self.assertEqual(vanilla_ringdata.devs, d)
self.assertEqual(vanilla_ringdata._part_shift, s)
self.assertFalse(hasattr(vanilla_ringdata, 'extra'))
self.assertEqual(vanilla_ringdata.size, bytes_written)
def test_missing_custom_data(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}]
s = 30
rd = ring.RingData(r2p2d, d, s)
self.assertEqual(rd._replica2part2dev_id, r2p2d)
self.assertEqual(rd.devs, d)
self.assertEqual(rd._part_shift, s)
self.assertFalse(hasattr(rd, 'extra'))
# Can load a vanilla ring and get some default behavior based on the
# overridden from_dict
testdir = mkdtemp()
try:
ring_fname = os.path.join(testdir, 'foo.ring.gz')
rd.save(ring_fname, format_version=2)
bytes_written = os.path.getsize(ring_fname)
rd2 = ExtendedRingData.load(ring_fname)
finally:
rmtree(testdir, ignore_errors=True)
self.assertEqual(rd2._replica2part2dev_id, r2p2d)
self.assertEqual(rd2.devs, d)
self.assertEqual(rd2._part_shift, s)
self.assertIsNone(rd2.extra)
self.assertEqual(rd2.size, bytes_written)
if __name__ == '__main__':
unittest.main()