Ring v2 follow-up

Signed-off-by: Tim Burke <tim.burke@gmail.com>
Change-Id: I75bd005a4a3bc79c1bd8f8fa1153a64059970865
This commit is contained in:
Tim Burke
2025-08-04 23:10:19 -07:00
parent ae062f8b09
commit 683218c523
4 changed files with 129 additions and 148 deletions

View File

@@ -19,12 +19,10 @@ import dataclasses
import gzip
import hashlib
import json
import logging
import os
import string
import struct
import tempfile
from typing import Optional
import zlib
from swift.common.ring.utils import BYTES_TO_TYPE_CODE, network_order_array, \
@@ -38,7 +36,7 @@ DEFAULT_BUFFER_SIZE = 2 ** 16
V2_SIZE_FORMAT = "!Q"
class GzipReader(object):
class _RingGzReader(object):
chunk_size = DEFAULT_BUFFER_SIZE
def __init__(self, fileobj):
@@ -76,7 +74,7 @@ class GzipReader(object):
self.decompressor = zlib.decompressobj(wbits)
self.buffer = self.compressed_buffer = b""
def seek(self, pos, whence=os.SEEK_SET):
def compressed_seek(self, pos, whence=os.SEEK_SET):
"""
Seek to the given point in the compressed stream.
@@ -85,7 +83,7 @@ class GzipReader(object):
As a result, callers should be careful to ``seek()`` to flush
boundaries, to ensure that subsequent ``read()`` calls work properly.
Note that when using ``GzipWriter``, all ``tell()`` results will be
Note that when using ``_RingGzWriter``, all ``tell()`` results will be
flush boundaries and appropriate to later use as ``seek()`` arguments.
"""
if (pos, whence) == (self.pos, os.SEEK_SET):
@@ -94,7 +92,7 @@ class GzipReader(object):
self.fp.seek(pos, whence)
self.reset_decompressor()
def tell(self):
def compressed_tell(self):
return self.fp.tell()
@classmethod
@@ -178,11 +176,11 @@ class SectionReader(object):
"""
A file-like wrapper that limits how many bytes may be read.
Optionally, also verify data integrity.
Also verify data integrity.
:param fp: a file-like object opened with mode "rb"
:param length: the maximum number of bytes that should be read
:param digest: optional hex digest of the expected bytes
:param digest: hex digest of the expected bytes
:param checksum: checksumming instance to be fed bytes and later compared
against ``digest``; e.g. ``hashlib.sha256()``
"""
@@ -201,8 +199,7 @@ class SectionReader(object):
amt = min(amt, self._remaining)
data = self._fp.read(amt)
self._remaining -= len(data)
if self._checksum:
self._checksum.update(data)
self._checksum.update(data)
return data
def read_ring_table(self, itemsize, partition_count):
@@ -240,31 +237,25 @@ class SectionReader(object):
class IndexEntry:
compressed_start: int
uncompressed_start: int
compressed_end: Optional[int] = None
uncompressed_end: Optional[int] = None
checksum_method: Optional[str] = None
checksum_value: Optional[str] = None
compressed_end: int
uncompressed_end: int
checksum_method: str
checksum_value: str
@property
def uncompressed_length(self) -> Optional[int]:
if self.uncompressed_end is None:
return None
def uncompressed_length(self) -> int:
return self.uncompressed_end - self.uncompressed_start
@property
def compressed_length(self) -> Optional[int]:
if self.compressed_end is None:
return None
def compressed_length(self) -> int:
return self.compressed_end - self.compressed_start
@property
def compression_ratio(self) -> Optional[float]:
if self.uncompressed_end is None:
return None
def compression_ratio(self) -> float:
return 1 - self.compressed_length / self.uncompressed_length
class RingReader(GzipReader):
class RingReader(_RingGzReader):
"""
Helper for reading ring files.
@@ -293,7 +284,7 @@ class RingReader(GzipReader):
self.load_index()
self.seek(0)
self.compressed_seek(0)
def load_index(self):
"""
@@ -307,14 +298,14 @@ class RingReader(GzipReader):
# See notes in RingWriter.write_index and RingWriter.__exit__ for
# where this 31 (= 18 + 13) came from.
self.seek(-31, os.SEEK_END)
self.compressed_seek(-31, os.SEEK_END)
try:
index_start, = struct.unpack(V2_SIZE_FORMAT, self.read(8))
except zlib.error:
# TODO: we can still fix this if we're willing to read everything
raise IOError("Could not read index offset "
"(was the file recompressed?)")
self.seek(index_start)
self.compressed_seek(index_start)
# ensure index entries are sorted by position
self.index = collections.OrderedDict(sorted(
((section, IndexEntry(*entry))
@@ -362,7 +353,7 @@ class RingReader(GzipReader):
if not self.index:
raise ValueError("No index loaded")
entry = self.index[section]
self.seek(entry.compressed_start)
self.compressed_seek(entry.compressed_start)
size_len = struct.calcsize(V2_SIZE_FORMAT)
prefix = self.read(size_len)
blob_length, = struct.unpack(V2_SIZE_FORMAT, prefix)
@@ -374,11 +365,8 @@ class RingReader(GzipReader):
checksum = getattr(hashlib, entry.checksum_method)(prefix)
checksum_value = entry.checksum_value
else:
if entry.checksum_method is not None:
logging.getLogger('swift.ring').warning(
"Ignoring unsupported checksum %s:%s for section %s",
entry.checksum_method, entry.checksum_value, section)
checksum = checksum_value = None
raise ValueError(f"Unsupported checksum {entry.checksum_method}:"
f"{entry.checksum_value} for section {section}")
with SectionReader(
self,
@@ -389,7 +377,7 @@ class RingReader(GzipReader):
yield reader
class GzipWriter(object):
class _RingGzWriter(object):
def __init__(self, fileobj, filename='', mtime=1300507380.0):
self.raw_fp = fileobj
self.gzip_fp = gzip.GzipFile(
@@ -398,7 +386,6 @@ class GzipWriter(object):
fileobj=self.raw_fp,
mtime=mtime)
self.flushed = True
self.pos = 0
@classmethod
@contextlib.contextmanager
@@ -408,7 +395,7 @@ class GzipWriter(object):
Note that this also guarantees atomic writes using a temporary file
:returns: a context manager that provides a ``GzipWriter`` instance
:returns: a context manager that provides a ``_RingGzWriter`` instance
"""
fp = tempfile.NamedTemporaryFile(
dir=os.path.dirname(filename),
@@ -450,7 +437,6 @@ class GzipWriter(object):
if not data:
return 0
self.flushed = False
self.pos += len(data)
return self.gzip_fp.write(data)
def flush(self):
@@ -469,19 +455,23 @@ class GzipWriter(object):
self.gzip_fp.flush(zlib.Z_FULL_FLUSH)
self.flushed = True
def tell(self):
def compressed_tell(self):
"""
Return the position in the underlying (compressed) stream.
Since this is primarily useful to get a position you may seek to later
and start reading, flush the writer first.
If you want the position within the *uncompressed* stream, use the
``pos`` attribute.
"""
self.flush()
return self.raw_fp.tell()
def tell(self):
"""
Return the position in the decompressed stream.
"""
self.flush()
return self.gzip_fp.tell()
def _set_compression_level(self, lvl):
# two valid deflate streams may be concatenated to produce another
# valid deflate stream, so finish the one stream...
@@ -491,7 +481,7 @@ class GzipWriter(object):
lvl, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
class RingWriter(GzipWriter):
class RingWriter(_RingGzWriter):
"""
Helper for writing ring files to later be read by a ``RingReader``
@@ -544,16 +534,18 @@ class RingWriter(GzipWriter):
raise ValueError('Cannot write duplicate section: %s' % name)
self.flush()
self.current_section = name
self.index[name] = IndexEntry(self.tell(), self.pos)
compressed_start = self.compressed_tell()
uncompressed_start = self.tell()
checksum_class = getattr(hashlib, self.checksum_method)
self.checksum = checksum_class()
try:
yield self
self.flush()
self.index[name] = dataclasses.replace(
self.index[name],
compressed_end=self.tell(),
uncompressed_end=self.pos,
self.index[name] = IndexEntry(
compressed_start,
uncompressed_start,
compressed_end=self.compressed_tell(),
uncompressed_end=self.tell(),
checksum_method=self.checksum_method,
checksum_value=self.checksum.hexdigest(),
)
@@ -579,7 +571,7 @@ class RingWriter(GzipWriter):
:param version: the ring version; should be 1 or 2
"""
if self.pos != 0:
if self.tell() != 0:
raise IOError("Magic must be written at the start of the file")
# switch to uncompressed, so libmagic can know what to expect
self._set_compression_level(0)
@@ -639,19 +631,20 @@ class RingWriter(GzipWriter):
Callers should not need to use this themselves; it will be done
automatically when using the writer as a context manager.
"""
with self.section('swift/index'):
self.write_json({
k: dataclasses.astuple(v)
for k, v in self.index.items()
})
uncompressed_start = self.tell()
compressed_start = self.compressed_tell()
self.write_json({
k: dataclasses.astuple(v)
for k, v in self.index.items()
})
# switch to uncompressed
self._set_compression_level(0)
# ... which allows us to know that each of these write_size/flush pairs
# will write exactly 18 bytes to disk
self.write_size(self.index['swift/index'].uncompressed_start)
self.write_size(uncompressed_start)
self.flush()
# This is the one we really care about in Swift code, but sometimes
# ops write their own tools and sometimes those just buffer all the
# decoded content
self.write_size(self.index['swift/index'].compressed_start)
self.write_size(compressed_start)
self.flush()

View File

@@ -27,7 +27,7 @@ import sys
from swift.common.exceptions import RingLoadError, DevIdBytesTooSmall
from swift.common.utils import hash_path, validate_configuration, md5
from swift.common.ring.io import RingReader, RingWriter
from swift.common.ring.utils import tiers_for_dev, BYTES_TO_TYPE_CODE
from swift.common.ring.utils import tiers_for_dev
DEFAULT_RELOAD_TIME = 15
@@ -123,32 +123,26 @@ class RingData(object):
`replica2part2dev_id` is not loaded and that key in the returned
dictionary just has the value `[]`.
:param RingReader reader: An opened RingReader which has already
loaded the index at the end, gone back to the
front, and consumed the 6 bytes of magic and
version.
:param RingReader reader: An opened RingReader at the start of the file
:param bool metadata_only: If True, only load `devs` and `part_shift`
:returns: A dict containing `devs`, `part_shift`, and
`replica2part2dev_id`
"""
if reader.tell() == 0:
magic = reader.read(6)
if magic != b'R1NG\x00\x01':
raise ValueError('unexpected magic: %r' % magic)
magic = reader.read(6)
if magic != b'R1NG\x00\x01':
raise ValueError('unexpected magic: %r' % magic)
ring_dict = json.loads(reader.read_blob('!I'))
ring_dict['replica2part2dev_id'] = []
ring_dict['dev_id_bytes'] = 2
if metadata_only:
return ring_dict
byteswap = (ring_dict.get('byteorder', sys.byteorder) != sys.byteorder)
type_code = BYTES_TO_TYPE_CODE[ring_dict['dev_id_bytes']]
partition_count = 1 << (32 - ring_dict['part_shift'])
for x in range(ring_dict['replica_count']):
part2dev = array.array(type_code, reader.read(2 * partition_count))
part2dev = array.array('H', reader.read(2 * partition_count))
if byteswap:
part2dev.byteswap()
ring_dict['replica2part2dev_id'].append(part2dev)
@@ -169,12 +163,11 @@ class RingData(object):
list is not loaded and that key in the returned dictionary just has
the value ``[]``.
:param file reader: An opened file-like object which has already
consumed the 6 bytes of magic and version.
:param RingReader reader: An opened RingReader which has already
loaded up the index at the end of the file.
:param bool metadata_only: If True, skip loading
``replica2part2dev_id``
:param bool include_devices: If False and ``metadata_only`` is True,
skip loading ``devs``
:param bool include_devices: If False, skip loading ``devs``
:returns: A dict containing ``devs``, ``part_shift``,
``dev_id_bytes``, and ``replica2part2dev_id``
"""
@@ -183,7 +176,7 @@ class RingData(object):
ring_dict['replica2part2dev_id'] = []
ring_dict['devs'] = []
if not metadata_only or include_devices:
if include_devices:
ring_dict['devs'] = json.loads(
reader.read_section('swift/ring/devices'))
@@ -253,8 +246,11 @@ class RingData(object):
if next_part_power is not None:
_text['next_part_power'] = next_part_power
writer.write_json(_text, '!I')
json_text = json.dumps(_text, sort_keys=True,
ensure_ascii=True).encode('ascii')
json_len = len(json_text)
writer.write(struct.pack('!I', json_len))
writer.write(json_text)
for part2dev_id in ring['replica2part2dev_id']:
part2dev_id.tofile(writer)

View File

@@ -20,7 +20,6 @@ import json
import os.path
import unittest
from unittest import mock
import zlib
from swift.common.ring.io import IndexEntry, RingReader, RingWriter
@@ -47,45 +46,6 @@ class TestRoundTrip(unittest.TestCase):
pass
self.assertEqual(0, len(os.listdir(tempd)))
def test_arbitrary_bytes(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
# Still need to write good magic, or we won't be able to read
writer.write_magic(1)
# but after that, we can kinda do whatever
writer.write(b'\xde\xad\xbe\xef' * 10240)
writer.write(b'\xda\x7a\xda\x7a' * 10240)
good_pos = writer.tell()
self.assertTrue(writer.flushed)
pos = writer.raw_fp.tell()
writer.write(b'')
self.assertTrue(writer.flushed)
self.assertEqual(pos, writer.raw_fp.tell())
writer.write(b'more' * 10240)
self.assertFalse(writer.flushed)
buf.seek(0)
reader = RingReader(buf)
self.assertEqual(reader.version, 1)
self.assertEqual(reader.raw_size, 6 + 12 * 10240)
self.assertEqual(reader.read(6), b'R1NG\x00\x01')
self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240)
self.assertRepeats(reader.read(40960), b'\xda\x7a\xda\x7a', 10240)
self.assertRepeats(reader.read(40960), b'more', 10240)
# Can seek backwards
reader.seek(good_pos)
self.assertRepeats(reader.read(40960), b'more', 10240)
# Even all the way to the beginning
reader.seek(0)
self.assertEqual(reader.read(6), b'R1NG\x00\x01')
self.assertRepeats(reader.read(40960), b'\xde\xad\xbe\xef', 10240)
# but not arbitrarily
reader.seek(good_pos - 100)
with self.assertRaises(zlib.error):
reader.read(1)
def test_sections(self):
buf = io.BytesIO()
with RingWriter(buf) as writer:
@@ -139,7 +99,7 @@ class TestRoundTrip(unittest.TestCase):
self.assertEqual(reader.version, 2)
# Order matters!
self.assertEqual(list(reader.index), [
'foo', 'bar', 'baz', 'quux', 'swift/index'])
'foo', 'bar', 'baz', 'quux'])
self.assertEqual({
k: (v.uncompressed_start, v.uncompressed_end, v.checksum_method)
for k, v in reader.index.items()
@@ -149,7 +109,6 @@ class TestRoundTrip(unittest.TestCase):
'baz': (81942, 122910, 'sha256'),
# note the gap between baz and quux for the raw bytes
'quux': (122933, 163901, 'sha256'),
'swift/index': (163901, None, None),
})
self.assertIn('foo', reader)
@@ -166,7 +125,8 @@ class TestRoundTrip(unittest.TestCase):
self.assertRepeats(reader.read_blob(), b'more', 10240)
self.assertRepeats(reader.read_section('quux'),
b'data', 10240)
index_dict = json.loads(reader.read_section('swift/index'))
# Index is just a final (length-prefixed) JSON blob
index_dict = json.loads(reader.read_blob())
self.assertEqual(reader.index, {
section: IndexEntry(*entry)
for section, entry in index_dict.items()})
@@ -178,7 +138,7 @@ class TestRoundTrip(unittest.TestCase):
self.assertEqual("'foobar'", str(caught.exception))
# seek to the end of baz
reader.seek(reader.index['baz'].compressed_end)
reader.compressed_seek(reader.index['baz'].compressed_end)
# so we can read the raw bytes we stuffed in
gap_length = (reader.index['quux'].uncompressed_start -
reader.index['baz'].uncompressed_end)
@@ -238,15 +198,9 @@ class TestRoundTrip(unittest.TestCase):
buf.seek(0)
reader = RingReader(buf)
with reader.open_section('foo') as s:
read_bytes = s.read(4)
self.assertEqual(b'\xde\xad\xbe\xef', read_bytes)
self.assertEqual(mock_logging.mock_calls, [
mock.call('swift.ring'),
mock.call('swift.ring').warning(
'Ignoring unsupported checksum %s:%s for section %s',
'not_a_digest', mock.ANY, 'foo'),
])
with self.assertRaises(ValueError):
with reader.open_section('foo'):
pass
def test_recompressed(self):
buf = io.BytesIO()

View File

@@ -16,6 +16,7 @@
import array
import collections
from gzip import GzipFile
import hashlib
import json
import os
import unittest
@@ -216,9 +217,8 @@ class TestRingData(unittest.TestCase):
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
# but there is no replica2part2dev table
# but the replica2part2dev table is empty
self.assertFalse(loaded_rd['replica2part2dev_id'])
# But if we load it up with metadata_only = false
@@ -228,7 +228,6 @@ class TestRingData(unittest.TestCase):
self.assertEqual(loaded_rd['devs'], expected_devs)
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertTrue(loaded_rd['replica2part2dev_id'])
def test_deserialize_v2(self):
@@ -242,6 +241,12 @@ class TestRingData(unittest.TestCase):
{'id': 1, 'region': 1, 'zone': 1, 'ip': '10.1.1.1',
'port': 7000}],
30)
rd.save(ring_fname, format_version=1)
with self.assertRaises(ValueError) as err:
ring.RingData.deserialize_v2(io.RingReader(open(ring_fname, 'rb')))
self.assertEqual("No index loaded", str(err.exception))
# Now let's save it as v2 then load it up metadata_only
rd.save(ring_fname, format_version=2)
loaded_rd = ring.RingData.deserialize_v2(
io.RingReader(open(ring_fname, 'rb')),
@@ -252,7 +257,7 @@ class TestRingData(unittest.TestCase):
# minimum size we use is 2 byte dev ids
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
# but there is no replica2part2dev table or devs
# but the replica2part2dev table and devs are both empty
self.assertFalse(loaded_rd['devs'])
self.assertFalse(loaded_rd['replica2part2dev_id'])
@@ -280,6 +285,17 @@ class TestRingData(unittest.TestCase):
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertTrue(loaded_rd['replica2part2dev_id'])
# Can also load up assignments but not devs; idk why you'd want that
loaded_rd = ring.RingData.deserialize_v2(
io.RingReader(open(ring_fname, 'rb')),
metadata_only=False,
include_devices=False)
self.assertFalse(loaded_rd['devs'])
self.assertEqual(loaded_rd['part_shift'], 30)
self.assertEqual(loaded_rd['replica_count'], 2)
self.assertEqual(loaded_rd['dev_id_bytes'], 2)
self.assertTrue(loaded_rd['replica2part2dev_id'])
def test_load(self):
rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]],
@@ -1191,20 +1207,26 @@ class TestRingV2(TestRing):
fp.write(b'R1NG\x00\x02')
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/metadata'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
comp_start = os.fstat(fp.fileno()).st_size
uncomp_start = fp.tell()
meta = json.dumps({
"dev_id_bytes": 4,
"part_shift": 29,
"replica_count": 1.5,
}).encode('ascii')
fp.write(struct.pack('!Q', len(meta)) + meta)
to_write = struct.pack('!Q', len(meta)) + meta
fp.write(to_write)
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/metadata'] = [
comp_start,
uncomp_start,
os.fstat(fp.fileno()).st_size,
fp.tell(),
'sha256',
hashlib.sha256(to_write).hexdigest()]
index['swift/ring/devices'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
comp_start = os.fstat(fp.fileno()).st_size
uncomp_start = fp.tell()
devs = json.dumps([
{"id": 0, "region": 1, "zone": 1, "ip": "127.0.0.1",
"port": 6200, "device": "sda", "weight": 1},
@@ -1214,28 +1236,44 @@ class TestRingV2(TestRing):
{"id": 3, "region": 1, "zone": 1, "ip": "127.0.0.1",
"port": 6202, "device": "sdc", "weight": 1},
]).encode('ascii')
fp.write(struct.pack('!Q', len(devs)) + devs)
to_write = struct.pack('!Q', len(devs)) + devs
fp.write(to_write)
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/devices'] = [
comp_start,
uncomp_start,
os.fstat(fp.fileno()).st_size,
fp.tell(),
'sha256',
hashlib.sha256(to_write).hexdigest()]
index['swift/ring/assignments'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
fp.write(struct.pack('!Q', 48) + 4 * (
comp_start = os.fstat(fp.fileno()).st_size
uncomp_start = fp.tell()
to_write = struct.pack('!Q', 48) + 4 * (
b'\x00\x00\x00\x03'
b'\x00\x00\x00\x02'
b'\x00\x00\x00\x00'))
b'\x00\x00\x00\x00')
fp.write(to_write)
fp.flush(zlib.Z_FULL_FLUSH)
index['swift/ring/assignments'] = [
comp_start,
uncomp_start,
os.fstat(fp.fileno()).st_size,
fp.tell(),
'sha256',
hashlib.sha256(to_write).hexdigest()]
index['swift/index'] = [
os.fstat(fp.fileno()).st_size, fp.tell(),
None, None, None, None]
comp_start = os.fstat(fp.fileno()).st_size
uncomp_start = fp.tell()
blob = json.dumps(index).encode('ascii')
fp.write(struct.pack('!Q', len(blob)) + blob)
fp.flush(zlib.Z_FULL_FLUSH)
fp.compress = zlib.compressobj(
0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
fp.write(struct.pack('!Q', index['swift/index'][0]))
fp.write(struct.pack('!Q', uncomp_start))
fp.flush(zlib.Z_FULL_FLUSH)
fp.write(struct.pack('!Q', comp_start))
fp.flush(zlib.Z_FULL_FLUSH)
r = ring.Ring(ring_file)