Merge "ring: Track more properties of the ring"
This commit is contained in:
commit
ff91df2302
@ -1167,7 +1167,7 @@ swift-ring-builder <ring_file> write_builder [min_part_hours]
|
||||
'parts': ring.partition_count,
|
||||
'devs': ring.devs,
|
||||
'devs_changed': False,
|
||||
'version': 0,
|
||||
'version': ring.version or 0,
|
||||
'_replica2part2dev': ring._replica2part2dev_id,
|
||||
'_last_part_moves_epoch': None,
|
||||
'_last_part_moves': None,
|
||||
|
@ -364,13 +364,15 @@ class RingBuilder(object):
|
||||
# shift an unsigned int >I right to obtain the partition for the
|
||||
# int).
|
||||
if not self._replica2part2dev:
|
||||
self._ring = RingData([], devs, self.part_shift)
|
||||
self._ring = RingData([], devs, self.part_shift,
|
||||
version=self.version)
|
||||
else:
|
||||
self._ring = \
|
||||
RingData([array('H', p2d) for p2d in
|
||||
self._replica2part2dev],
|
||||
devs, self.part_shift,
|
||||
self.next_part_power)
|
||||
self.next_part_power,
|
||||
self.version)
|
||||
return self._ring
|
||||
|
||||
def add_dev(self, dev):
|
||||
|
@ -22,11 +22,11 @@ from os.path import getmtime
|
||||
import struct
|
||||
from time import time
|
||||
import os
|
||||
from io import BufferedReader
|
||||
from hashlib import md5
|
||||
from itertools import chain, count
|
||||
from tempfile import NamedTemporaryFile
|
||||
import sys
|
||||
import zlib
|
||||
|
||||
from six.moves import range
|
||||
|
||||
@ -41,15 +41,77 @@ def calc_replica_count(replica2part2dev_id):
|
||||
return base + extra
|
||||
|
||||
|
||||
class RingReader(object):
|
||||
chunk_size = 2 ** 16
|
||||
|
||||
def __init__(self, filename):
|
||||
self.fp = open(filename, 'rb')
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
self._buffer = b''
|
||||
self.size = 0
|
||||
self.raw_size = 0
|
||||
self._md5 = md5()
|
||||
self._decomp = zlib.decompressobj(32 + zlib.MAX_WBITS)
|
||||
|
||||
@property
|
||||
def close(self):
|
||||
return self.fp.close
|
||||
|
||||
def seek(self, pos, ref=0):
|
||||
if (pos, ref) != (0, 0):
|
||||
raise NotImplementedError
|
||||
self._reset()
|
||||
return self.fp.seek(pos, ref)
|
||||
|
||||
def _buffer_chunk(self):
|
||||
chunk = self.fp.read(self.chunk_size)
|
||||
if not chunk:
|
||||
return False
|
||||
self.size += len(chunk)
|
||||
self._md5.update(chunk)
|
||||
chunk = self._decomp.decompress(chunk)
|
||||
self.raw_size += len(chunk)
|
||||
self._buffer += chunk
|
||||
return True
|
||||
|
||||
def read(self, amount=-1):
|
||||
if amount < 0:
|
||||
raise IOError("don't be greedy")
|
||||
|
||||
while amount > len(self._buffer):
|
||||
if not self._buffer_chunk():
|
||||
break
|
||||
|
||||
result, self._buffer = self._buffer[:amount], self._buffer[amount:]
|
||||
return result
|
||||
|
||||
def readline(self):
|
||||
# apparently pickle needs this?
|
||||
while b'\n' not in self._buffer:
|
||||
if not self._buffer_chunk():
|
||||
break
|
||||
|
||||
line, sep, self._buffer = self._buffer.partition(b'\n')
|
||||
return line + sep
|
||||
|
||||
@property
|
||||
def md5(self):
|
||||
return self._md5.hexdigest()
|
||||
|
||||
|
||||
class RingData(object):
|
||||
"""Partitioned consistent hashing ring data (used for serialization)."""
|
||||
|
||||
def __init__(self, replica2part2dev_id, devs, part_shift,
|
||||
next_part_power=None):
|
||||
next_part_power=None, version=None):
|
||||
self.devs = devs
|
||||
self._replica2part2dev_id = replica2part2dev_id
|
||||
self._part_shift = part_shift
|
||||
self.next_part_power = next_part_power
|
||||
self.version = version
|
||||
self.md5 = self.size = self.raw_size = None
|
||||
|
||||
for dev in self.devs:
|
||||
if dev is not None:
|
||||
@ -104,7 +166,7 @@ class RingData(object):
|
||||
:param bool metadata_only: If True, only load `devs` and `part_shift`.
|
||||
:returns: A RingData instance containing the loaded data.
|
||||
"""
|
||||
gz_file = BufferedReader(GzipFile(filename, 'rb'))
|
||||
gz_file = RingReader(filename)
|
||||
|
||||
# See if the file is in the new format
|
||||
magic = gz_file.read(4)
|
||||
@ -124,7 +186,10 @@ class RingData(object):
|
||||
if not hasattr(ring_data, 'devs'):
|
||||
ring_data = RingData(ring_data['replica2part2dev_id'],
|
||||
ring_data['devs'], ring_data['part_shift'],
|
||||
ring_data.get('next_part_power'))
|
||||
ring_data.get('next_part_power'),
|
||||
ring_data.get('version'))
|
||||
for attr in ('md5', 'size', 'raw_size'):
|
||||
setattr(ring_data, attr, getattr(gz_file, attr))
|
||||
return ring_data
|
||||
|
||||
def serialize_v1(self, file_obj):
|
||||
@ -138,6 +203,9 @@ class RingData(object):
|
||||
'replica_count': len(ring['replica2part2dev_id']),
|
||||
'byteorder': sys.byteorder}
|
||||
|
||||
if ring['version'] is not None:
|
||||
_text['version'] = ring['version']
|
||||
|
||||
next_part_power = ring.get('next_part_power')
|
||||
if next_part_power is not None:
|
||||
_text['next_part_power'] = next_part_power
|
||||
@ -175,7 +243,8 @@ class RingData(object):
|
||||
return {'devs': self.devs,
|
||||
'replica2part2dev_id': self._replica2part2dev_id,
|
||||
'part_shift': self._part_shift,
|
||||
'next_part_power': self.next_part_power}
|
||||
'next_part_power': self.next_part_power,
|
||||
'version': self.version}
|
||||
|
||||
|
||||
class Ring(object):
|
||||
@ -239,6 +308,10 @@ class Ring(object):
|
||||
self._rebuild_tier_data()
|
||||
self._update_bookkeeping()
|
||||
self._next_part_power = ring_data.next_part_power
|
||||
self._version = ring_data.version
|
||||
self._md5 = ring_data.md5
|
||||
self._size = ring_data.size
|
||||
self._raw_size = ring_data.raw_size
|
||||
|
||||
def _update_bookkeeping(self):
|
||||
# Do this now, when we know the data has changed, rather than
|
||||
@ -257,12 +330,19 @@ class Ring(object):
|
||||
zones = set()
|
||||
ips = set()
|
||||
self._num_devs = 0
|
||||
self._num_assigned_devs = 0
|
||||
self._num_weighted_devs = 0
|
||||
for dev in self._devs:
|
||||
if dev and dev['id'] in dev_ids_with_parts:
|
||||
if dev is None:
|
||||
continue
|
||||
self._num_devs += 1
|
||||
if dev.get('weight', 0) > 0:
|
||||
self._num_weighted_devs += 1
|
||||
if dev['id'] in dev_ids_with_parts:
|
||||
regions.add(dev['region'])
|
||||
zones.add((dev['region'], dev['zone']))
|
||||
ips.add((dev['region'], dev['zone'], dev['ip']))
|
||||
self._num_devs += 1
|
||||
self._num_assigned_devs += 1
|
||||
self._num_regions = len(regions)
|
||||
self._num_zones = len(zones)
|
||||
self._num_ips = len(ips)
|
||||
@ -275,6 +355,22 @@ class Ring(object):
|
||||
def part_power(self):
|
||||
return 32 - self._part_shift
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return self._version
|
||||
|
||||
@property
|
||||
def md5(self):
|
||||
return self._md5
|
||||
|
||||
@property
|
||||
def size(self):
|
||||
return self._size
|
||||
|
||||
@property
|
||||
def raw_size(self):
|
||||
return self._raw_size
|
||||
|
||||
def _rebuild_tier_data(self):
|
||||
self.tier2devs = defaultdict(list)
|
||||
for dev in self._devs:
|
||||
@ -301,6 +397,21 @@ class Ring(object):
|
||||
"""Number of partitions in the ring."""
|
||||
return len(self._replica2part2dev_id[0])
|
||||
|
||||
@property
|
||||
def device_count(self):
|
||||
"""Number of devices in the ring."""
|
||||
return self._num_devs
|
||||
|
||||
@property
|
||||
def weighted_device_count(self):
|
||||
"""Number of devices with weight in the ring."""
|
||||
return self._num_weighted_devs
|
||||
|
||||
@property
|
||||
def assigned_device_count(self):
|
||||
"""Number of devices with assignments in the ring."""
|
||||
return self._num_assigned_devs
|
||||
|
||||
@property
|
||||
def devs(self):
|
||||
"""devices in the ring"""
|
||||
@ -490,7 +601,7 @@ class Ring(object):
|
||||
hit_all_ips = True
|
||||
break
|
||||
|
||||
hit_all_devs = len(used) == self._num_devs
|
||||
hit_all_devs = len(used) == self._num_assigned_devs
|
||||
for handoff_part in chain(range(start, parts, inc),
|
||||
range(inc - ((parts - start) % inc),
|
||||
start, inc)):
|
||||
@ -505,6 +616,6 @@ class Ring(object):
|
||||
dev = self._devs[dev_id]
|
||||
yield dict(dev, handoff_index=next(index))
|
||||
used.add(dev_id)
|
||||
if len(used) == self._num_devs:
|
||||
if len(used) == self._num_assigned_devs:
|
||||
hit_all_devs = True
|
||||
break
|
||||
|
@ -2263,10 +2263,34 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
||||
|
||||
# Note that we've picked up an extension
|
||||
builder = RingBuilder.load(self.tmpfile + '.builder')
|
||||
# Version was recorded in the .ring.gz!
|
||||
self.assertEqual(builder.version, 5)
|
||||
# Note that this is different from the original! But it more-closely
|
||||
# reflects the reality that we have an extra replica for 12 of 64 parts
|
||||
self.assertEqual(builder.replicas, 1.1875)
|
||||
|
||||
def test_write_builder_no_version(self):
|
||||
self.create_sample_ring()
|
||||
rb = RingBuilder.load(self.tmpfile)
|
||||
rb.rebalance()
|
||||
|
||||
# Make sure we write down the ring in the old way, with no version
|
||||
rd = rb.get_ring()
|
||||
rd.version = None
|
||||
rd.save(self.tmpfile + ".ring.gz")
|
||||
|
||||
ring_file = os.path.join(os.path.dirname(self.tmpfile),
|
||||
os.path.basename(self.tmpfile) + ".ring.gz")
|
||||
os.remove(self.tmpfile) # loses file...
|
||||
|
||||
argv = ["", ring_file, "write_builder", "24"]
|
||||
self.assertIsNone(ringbuilder.main(argv))
|
||||
|
||||
# Note that we've picked up an extension
|
||||
builder = RingBuilder.load(self.tmpfile + '.builder')
|
||||
# No version in the .ring.gz; default to 0
|
||||
self.assertEqual(builder.version, 0)
|
||||
|
||||
def test_write_builder_after_device_removal(self):
|
||||
# Test regenerating builder file after having removed a device
|
||||
# and lost the builder file
|
||||
|
@ -16,6 +16,7 @@
|
||||
import array
|
||||
import collections
|
||||
import six.moves.cPickle as pickle
|
||||
import hashlib
|
||||
import os
|
||||
import unittest
|
||||
import stat
|
||||
@ -63,6 +64,8 @@ class TestRingData(unittest.TestCase):
|
||||
rd_got._replica2part2dev_id)
|
||||
self.assertEqual(rd_expected.devs, rd_got.devs)
|
||||
self.assertEqual(rd_expected._part_shift, rd_got._part_shift)
|
||||
self.assertEqual(rd_expected.next_part_power, rd_got.next_part_power)
|
||||
self.assertEqual(rd_expected.version, rd_got.version)
|
||||
|
||||
def test_attrs(self):
|
||||
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
|
||||
@ -230,6 +233,17 @@ class TestRing(TestRingBase):
|
||||
self.assertEqual(self.ring.devs, self.intended_devs)
|
||||
self.assertEqual(self.ring.reload_time, self.intended_reload_time)
|
||||
self.assertEqual(self.ring.serialized_path, self.testgz)
|
||||
self.assertIsNone(self.ring.version)
|
||||
|
||||
with open(self.testgz, 'rb') as fp:
|
||||
expected_md5 = hashlib.md5()
|
||||
expected_size = 0
|
||||
for chunk in iter(lambda: fp.read(2 ** 16), b''):
|
||||
expected_md5.update(chunk)
|
||||
expected_size += len(chunk)
|
||||
self.assertEqual(self.ring.md5, expected_md5.hexdigest())
|
||||
self.assertEqual(self.ring.size, expected_size)
|
||||
|
||||
# test invalid endcap
|
||||
with mock.patch.object(utils, 'HASH_PATH_SUFFIX', b''), \
|
||||
mock.patch.object(utils, 'HASH_PATH_PREFIX', b''), \
|
||||
@ -900,6 +914,7 @@ class TestRing(TestRingBase):
|
||||
rb.rebalance()
|
||||
rb.get_ring().save(self.testgz)
|
||||
r = ring.Ring(self.testdir, ring_name='whatever')
|
||||
self.assertEqual(r.version, rb.version)
|
||||
|
||||
class CountingRingTable(object):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user