ring: Track more properties of the ring
Plumb the version from the ringbuilder through to the metadata at the start of the ring. Recover this (if available) when running swift-ring-builder <ring> write_builder When we load the ring, track the count and MD5 of the bytes off disk, as well as the number of uncompressed bytes. Expose all this new information as properties on the Ring, along with - device_count (number of non-None entries in self._devs), - weighted_device_count (number of devices that have weight), and - assigned_device_count (number of devices that actually have partition assignments). Co-Authored-By: Matthew Oliver <matt@oliver.net.au> Change-Id: I73deaf6f1d9c1d37630c37c02c597b8812592351
This commit is contained in:
@@ -1167,7 +1167,7 @@ swift-ring-builder <ring_file> write_builder [min_part_hours]
|
||||
'parts': ring.partition_count,
|
||||
'devs': ring.devs,
|
||||
'devs_changed': False,
|
||||
'version': 0,
|
||||
'version': ring.version or 0,
|
||||
'_replica2part2dev': ring._replica2part2dev_id,
|
||||
'_last_part_moves_epoch': None,
|
||||
'_last_part_moves': None,
|
||||
|
@@ -364,13 +364,15 @@ class RingBuilder(object):
|
||||
# shift an unsigned int >I right to obtain the partition for the
|
||||
# int).
|
||||
if not self._replica2part2dev:
|
||||
self._ring = RingData([], devs, self.part_shift)
|
||||
self._ring = RingData([], devs, self.part_shift,
|
||||
version=self.version)
|
||||
else:
|
||||
self._ring = \
|
||||
RingData([array('H', p2d) for p2d in
|
||||
self._replica2part2dev],
|
||||
devs, self.part_shift,
|
||||
self.next_part_power)
|
||||
self.next_part_power,
|
||||
self.version)
|
||||
return self._ring
|
||||
|
||||
def add_dev(self, dev):
|
||||
|
@@ -22,11 +22,11 @@ from os.path import getmtime
|
||||
import struct
|
||||
from time import time
|
||||
import os
|
||||
from io import BufferedReader
|
||||
from hashlib import md5
|
||||
from itertools import chain, count
|
||||
from tempfile import NamedTemporaryFile
|
||||
import sys
|
||||
import zlib
|
||||
|
||||
from six.moves import range
|
||||
|
||||
@@ -41,15 +41,77 @@ def calc_replica_count(replica2part2dev_id):
|
||||
return base + extra
|
||||
|
||||
|
||||
class RingReader(object):
|
||||
chunk_size = 2 ** 16
|
||||
|
||||
def __init__(self, filename):
|
||||
self.fp = open(filename, 'rb')
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
self._buffer = b''
|
||||
self.size = 0
|
||||
self.raw_size = 0
|
||||
self._md5 = md5()
|
||||
self._decomp = zlib.decompressobj(32 + zlib.MAX_WBITS)
|
||||
|
||||
@property
|
||||
def close(self):
|
||||
return self.fp.close
|
||||
|
||||
def seek(self, pos, ref=0):
|
||||
if (pos, ref) != (0, 0):
|
||||
raise NotImplementedError
|
||||
self._reset()
|
||||
return self.fp.seek(pos, ref)
|
||||
|
||||
def _buffer_chunk(self):
|
||||
chunk = self.fp.read(self.chunk_size)
|
||||
if not chunk:
|
||||
return False
|
||||
self.size += len(chunk)
|
||||
self._md5.update(chunk)
|
||||
chunk = self._decomp.decompress(chunk)
|
||||
self.raw_size += len(chunk)
|
||||
self._buffer += chunk
|
||||
return True
|
||||
|
||||
def read(self, amount=-1):
|
||||
if amount < 0:
|
||||
raise IOError("don't be greedy")
|
||||
|
||||
while amount > len(self._buffer):
|
||||
if not self._buffer_chunk():
|
||||
break
|
||||
|
||||
result, self._buffer = self._buffer[:amount], self._buffer[amount:]
|
||||
return result
|
||||
|
||||
def readline(self):
|
||||
# apparently pickle needs this?
|
||||
while b'\n' not in self._buffer:
|
||||
if not self._buffer_chunk():
|
||||
break
|
||||
|
||||
line, sep, self._buffer = self._buffer.partition(b'\n')
|
||||
return line + sep
|
||||
|
||||
@property
|
||||
def md5(self):
|
||||
return self._md5.hexdigest()
|
||||
|
||||
|
||||
class RingData(object):
|
||||
"""Partitioned consistent hashing ring data (used for serialization)."""
|
||||
|
||||
def __init__(self, replica2part2dev_id, devs, part_shift,
|
||||
next_part_power=None):
|
||||
next_part_power=None, version=None):
|
||||
self.devs = devs
|
||||
self._replica2part2dev_id = replica2part2dev_id
|
||||
self._part_shift = part_shift
|
||||
self.next_part_power = next_part_power
|
||||
self.version = version
|
||||
self.md5 = self.size = self.raw_size = None
|
||||
|
||||
for dev in self.devs:
|
||||
if dev is not None:
|
||||
@@ -104,7 +166,7 @@ class RingData(object):
|
||||
:param bool metadata_only: If True, only load `devs` and `part_shift`.
|
||||
:returns: A RingData instance containing the loaded data.
|
||||
"""
|
||||
gz_file = BufferedReader(GzipFile(filename, 'rb'))
|
||||
gz_file = RingReader(filename)
|
||||
|
||||
# See if the file is in the new format
|
||||
magic = gz_file.read(4)
|
||||
@@ -124,7 +186,10 @@ class RingData(object):
|
||||
if not hasattr(ring_data, 'devs'):
|
||||
ring_data = RingData(ring_data['replica2part2dev_id'],
|
||||
ring_data['devs'], ring_data['part_shift'],
|
||||
ring_data.get('next_part_power'))
|
||||
ring_data.get('next_part_power'),
|
||||
ring_data.get('version'))
|
||||
for attr in ('md5', 'size', 'raw_size'):
|
||||
setattr(ring_data, attr, getattr(gz_file, attr))
|
||||
return ring_data
|
||||
|
||||
def serialize_v1(self, file_obj):
|
||||
@@ -138,6 +203,9 @@ class RingData(object):
|
||||
'replica_count': len(ring['replica2part2dev_id']),
|
||||
'byteorder': sys.byteorder}
|
||||
|
||||
if ring['version'] is not None:
|
||||
_text['version'] = ring['version']
|
||||
|
||||
next_part_power = ring.get('next_part_power')
|
||||
if next_part_power is not None:
|
||||
_text['next_part_power'] = next_part_power
|
||||
@@ -175,7 +243,8 @@ class RingData(object):
|
||||
return {'devs': self.devs,
|
||||
'replica2part2dev_id': self._replica2part2dev_id,
|
||||
'part_shift': self._part_shift,
|
||||
'next_part_power': self.next_part_power}
|
||||
'next_part_power': self.next_part_power,
|
||||
'version': self.version}
|
||||
|
||||
|
||||
class Ring(object):
|
||||
@@ -239,6 +308,10 @@ class Ring(object):
|
||||
self._rebuild_tier_data()
|
||||
self._update_bookkeeping()
|
||||
self._next_part_power = ring_data.next_part_power
|
||||
self._version = ring_data.version
|
||||
self._md5 = ring_data.md5
|
||||
self._size = ring_data.size
|
||||
self._raw_size = ring_data.raw_size
|
||||
|
||||
def _update_bookkeeping(self):
|
||||
# Do this now, when we know the data has changed, rather than
|
||||
@@ -257,12 +330,19 @@ class Ring(object):
|
||||
zones = set()
|
||||
ips = set()
|
||||
self._num_devs = 0
|
||||
self._num_assigned_devs = 0
|
||||
self._num_weighted_devs = 0
|
||||
for dev in self._devs:
|
||||
if dev and dev['id'] in dev_ids_with_parts:
|
||||
if dev is None:
|
||||
continue
|
||||
self._num_devs += 1
|
||||
if dev.get('weight', 0) > 0:
|
||||
self._num_weighted_devs += 1
|
||||
if dev['id'] in dev_ids_with_parts:
|
||||
regions.add(dev['region'])
|
||||
zones.add((dev['region'], dev['zone']))
|
||||
ips.add((dev['region'], dev['zone'], dev['ip']))
|
||||
self._num_devs += 1
|
||||
self._num_assigned_devs += 1
|
||||
self._num_regions = len(regions)
|
||||
self._num_zones = len(zones)
|
||||
self._num_ips = len(ips)
|
||||
@@ -275,6 +355,22 @@ class Ring(object):
|
||||
def part_power(self):
|
||||
return 32 - self._part_shift
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self._version
|
||||
|
||||
@property
|
||||
def md5(self):
|
||||
return self._md5
|
||||
|
||||
@property
|
||||
def size(self):
|
||||
return self._size
|
||||
|
||||
@property
|
||||
def raw_size(self):
|
||||
return self._raw_size
|
||||
|
||||
def _rebuild_tier_data(self):
|
||||
self.tier2devs = defaultdict(list)
|
||||
for dev in self._devs:
|
||||
@@ -301,6 +397,21 @@ class Ring(object):
|
||||
"""Number of partitions in the ring."""
|
||||
return len(self._replica2part2dev_id[0])
|
||||
|
||||
@property
|
||||
def device_count(self):
|
||||
"""Number of devices in the ring."""
|
||||
return self._num_devs
|
||||
|
||||
@property
|
||||
def weighted_device_count(self):
|
||||
"""Number of devices with weight in the ring."""
|
||||
return self._num_weighted_devs
|
||||
|
||||
@property
|
||||
def assigned_device_count(self):
|
||||
"""Number of devices with assignments in the ring."""
|
||||
return self._num_assigned_devs
|
||||
|
||||
@property
|
||||
def devs(self):
|
||||
"""devices in the ring"""
|
||||
@@ -490,7 +601,7 @@ class Ring(object):
|
||||
hit_all_ips = True
|
||||
break
|
||||
|
||||
hit_all_devs = len(used) == self._num_devs
|
||||
hit_all_devs = len(used) == self._num_assigned_devs
|
||||
for handoff_part in chain(range(start, parts, inc),
|
||||
range(inc - ((parts - start) % inc),
|
||||
start, inc)):
|
||||
@@ -505,6 +616,6 @@ class Ring(object):
|
||||
dev = self._devs[dev_id]
|
||||
yield dict(dev, handoff_index=next(index))
|
||||
used.add(dev_id)
|
||||
if len(used) == self._num_devs:
|
||||
if len(used) == self._num_assigned_devs:
|
||||
hit_all_devs = True
|
||||
break
|
||||
|
@@ -2263,10 +2263,34 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
||||
|
||||
# Note that we've picked up an extension
|
||||
builder = RingBuilder.load(self.tmpfile + '.builder')
|
||||
# Version was recorded in the .ring.gz!
|
||||
self.assertEqual(builder.version, 5)
|
||||
# Note that this is different from the original! But it more-closely
|
||||
# reflects the reality that we have an extra replica for 12 of 64 parts
|
||||
self.assertEqual(builder.replicas, 1.1875)
|
||||
|
||||
def test_write_builder_no_version(self):
|
||||
self.create_sample_ring()
|
||||
rb = RingBuilder.load(self.tmpfile)
|
||||
rb.rebalance()
|
||||
|
||||
# Make sure we write down the ring in the old way, with no version
|
||||
rd = rb.get_ring()
|
||||
rd.version = None
|
||||
rd.save(self.tmpfile + ".ring.gz")
|
||||
|
||||
ring_file = os.path.join(os.path.dirname(self.tmpfile),
|
||||
os.path.basename(self.tmpfile) + ".ring.gz")
|
||||
os.remove(self.tmpfile) # loses file...
|
||||
|
||||
argv = ["", ring_file, "write_builder", "24"]
|
||||
self.assertIsNone(ringbuilder.main(argv))
|
||||
|
||||
# Note that we've picked up an extension
|
||||
builder = RingBuilder.load(self.tmpfile + '.builder')
|
||||
# No version in the .ring.gz; default to 0
|
||||
self.assertEqual(builder.version, 0)
|
||||
|
||||
def test_write_builder_after_device_removal(self):
|
||||
# Test regenerating builder file after having removed a device
|
||||
# and lost the builder file
|
||||
|
@@ -16,6 +16,7 @@
|
||||
import array
|
||||
import collections
|
||||
import six.moves.cPickle as pickle
|
||||
import hashlib
|
||||
import os
|
||||
import unittest
|
||||
import stat
|
||||
@@ -63,6 +64,8 @@ class TestRingData(unittest.TestCase):
|
||||
rd_got._replica2part2dev_id)
|
||||
self.assertEqual(rd_expected.devs, rd_got.devs)
|
||||
self.assertEqual(rd_expected._part_shift, rd_got._part_shift)
|
||||
self.assertEqual(rd_expected.next_part_power, rd_got.next_part_power)
|
||||
self.assertEqual(rd_expected.version, rd_got.version)
|
||||
|
||||
def test_attrs(self):
|
||||
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
|
||||
@@ -230,6 +233,17 @@ class TestRing(TestRingBase):
|
||||
self.assertEqual(self.ring.devs, self.intended_devs)
|
||||
self.assertEqual(self.ring.reload_time, self.intended_reload_time)
|
||||
self.assertEqual(self.ring.serialized_path, self.testgz)
|
||||
self.assertIsNone(self.ring.version)
|
||||
|
||||
with open(self.testgz, 'rb') as fp:
|
||||
expected_md5 = hashlib.md5()
|
||||
expected_size = 0
|
||||
for chunk in iter(lambda: fp.read(2 ** 16), b''):
|
||||
expected_md5.update(chunk)
|
||||
expected_size += len(chunk)
|
||||
self.assertEqual(self.ring.md5, expected_md5.hexdigest())
|
||||
self.assertEqual(self.ring.size, expected_size)
|
||||
|
||||
# test invalid endcap
|
||||
with mock.patch.object(utils, 'HASH_PATH_SUFFIX', b''), \
|
||||
mock.patch.object(utils, 'HASH_PATH_PREFIX', b''), \
|
||||
@@ -900,6 +914,7 @@ class TestRing(TestRingBase):
|
||||
rb.rebalance()
|
||||
rb.get_ring().save(self.testgz)
|
||||
r = ring.Ring(self.testdir, ring_name='whatever')
|
||||
self.assertEqual(r.version, rb.version)
|
||||
|
||||
class CountingRingTable(object):
|
||||
|
||||
|
Reference in New Issue
Block a user