Merge "Unset random seed after rebalancing ring"

Authored by Jenkins on 2017-08-11 14:56:31 +00:00; committed by Gerrit Code Review
commit bf09a06708
4 changed files with 95 additions and 55 deletions


@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import contextlib
 import copy
 import errno
 import itertools
@@ -58,6 +59,24 @@ except ImportError:
     pass


+@contextlib.contextmanager
+def _set_random_seed(seed):
+    # If random seed is set when entering this context then reset original
+    # random state when exiting the context. This avoids a test calling this
+    # method with a fixed seed value causing all subsequent tests to use a
+    # repeatable random sequence.
+    random_state = None
+    if seed is not None:
+        random_state = random.getstate()
+        random.seed(seed)
+    try:
+        yield
+    finally:
+        if random_state:
+            # resetting state rather than calling seed() eases unit testing
+            random.setstate(random_state)
+
+
 class RingBuilder(object):
     """
     Used to build swift.common.ring.RingData instances to be written to disk
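
The new helper wraps random.seed() in a save/restore of the interpreter-wide PRNG state: random.getstate() captures the state before seeding, and random.setstate() puts it back in a finally clause, so even an exception inside the block cannot leak the seeded sequence. A minimal standalone sketch of the same pattern (the name preserved_random_state and the demo values are illustrative, not part of this commit):

    import contextlib
    import random


    @contextlib.contextmanager
    def preserved_random_state(seed=None):
        # Illustrative re-implementation of the _set_random_seed() pattern:
        # seed the module-level PRNG only for the duration of the block,
        # then restore the captured state.
        state = random.getstate() if seed is not None else None
        if seed is not None:
            random.seed(seed)
        try:
            yield
        finally:
            if state is not None:
                random.setstate(state)


    with preserved_random_state(seed=10):
        reproducible = random.random()  # identical on every run
    unaffected = random.random()  # continues as if the block never ran
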
@@ -482,9 +501,6 @@ class RingBuilder(object):
                     'num_devices': num_devices,
                 })

-        if seed is not None:
-            random.seed(seed)
-
         self._ring = None

         old_replica2part2dev = copy.deepcopy(self._replica2part2dev)
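
Dropping the bare random.seed(seed) matters because module-level seeding is process-global: every later consumer of the random module inherits the same reproducible sequence. A two-line demonstration of the leak the old code caused (values are whatever your platform's PRNG produces; the point is that the runs match):

    import random

    random.seed(10)  # what rebalance(seed=10) used to do, and leave behind
    leaked = [random.random() for _ in range(3)]
    random.seed(10)
    # the seeded sequence is fully deterministic, so it repeats exactly
    assert [random.random() for _ in range(3)] == leaked
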
@@ -494,45 +510,47 @@
         self._last_part_moves = array('B', itertools.repeat(0, self.parts))
         self._update_last_part_moves()

-        replica_plan = self._build_replica_plan()
-        self._set_parts_wanted(replica_plan)
+        with _set_random_seed(seed):
+            replica_plan = self._build_replica_plan()
+            self._set_parts_wanted(replica_plan)

-        assign_parts = defaultdict(list)
-        # gather parts from replica count adjustment
-        self._adjust_replica2part2dev_size(assign_parts)
-        # gather parts from failed devices
-        removed_devs = self._gather_parts_from_failed_devices(assign_parts)
-        # gather parts for dispersion (N.B. this only picks up parts that
-        # *must* disperse according to the replica plan)
-        self._gather_parts_for_dispersion(assign_parts, replica_plan)
+            assign_parts = defaultdict(list)
+            # gather parts from replica count adjustment
+            self._adjust_replica2part2dev_size(assign_parts)
+            # gather parts from failed devices
+            removed_devs = self._gather_parts_from_failed_devices(assign_parts)
+            # gather parts for dispersion (N.B. this only picks up parts that
+            # *must* disperse according to the replica plan)
+            self._gather_parts_for_dispersion(assign_parts, replica_plan)

-        # we'll gather a few times, or until we archive the plan
-        for gather_count in range(MAX_BALANCE_GATHER_COUNT):
-            self._gather_parts_for_balance(assign_parts, replica_plan)
-            if not assign_parts:
-                # most likely min part hours
-                finish_status = 'Unable to finish'
-                break
-            assign_parts_list = list(assign_parts.items())
-            # shuffle the parts to be reassigned, we have no preference on the
-            # order in which the replica plan is fulfilled.
-            random.shuffle(assign_parts_list)
-            # reset assign_parts map for next iteration
-            assign_parts = defaultdict(list)
+            # we'll gather a few times, or until we archive the plan
+            for gather_count in range(MAX_BALANCE_GATHER_COUNT):
+                self._gather_parts_for_balance(assign_parts, replica_plan)
+                if not assign_parts:
+                    # most likely min part hours
+                    finish_status = 'Unable to finish'
+                    break
+                assign_parts_list = list(assign_parts.items())
+                # shuffle the parts to be reassigned, we have no preference on
+                # the order in which the replica plan is fulfilled.
+                random.shuffle(assign_parts_list)
+                # reset assign_parts map for next iteration
+                assign_parts = defaultdict(list)

-            num_part_replicas = sum(len(r) for p, r in assign_parts_list)
-            self.logger.debug("Gathered %d parts", num_part_replicas)
-            self._reassign_parts(assign_parts_list, replica_plan)
-            self.logger.debug("Assigned %d parts", num_part_replicas)
+                num_part_replicas = sum(len(r) for p, r in assign_parts_list)
+                self.logger.debug("Gathered %d parts", num_part_replicas)
+                self._reassign_parts(assign_parts_list, replica_plan)
+                self.logger.debug("Assigned %d parts", num_part_replicas)

-            if not sum(d['parts_wanted'] < 0 for d in
-                       self._iter_devs()):
-                finish_status = 'Finished'
-                break
-        else:
-            finish_status = 'Unable to finish'
-        self.logger.debug('%(status)s rebalance plan after %(count)s attempts',
-                          {'status': finish_status, 'count': gather_count + 1})
+                if not sum(d['parts_wanted'] < 0 for d in
+                           self._iter_devs()):
+                    finish_status = 'Finished'
+                    break
+            else:
+                finish_status = 'Unable to finish'
+            self.logger.debug(
+                '%(status)s rebalance plan after %(count)s attempts',
+                {'status': finish_status, 'count': gather_count + 1})

         self.devs_changed = False
         self.version += 1
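
With the whole planning and assignment body under the context manager, a caller can pass a fixed seed to get a repeatable rebalance without poisoning the global PRNG for everything that runs afterwards. A rough usage sketch against the public RingBuilder API (the device values are invented for illustration):

    import random
    from swift.common.ring import RingBuilder


    def build_ring(seed):
        rb = RingBuilder(8, 3, 1)  # part_power, replicas, min_part_hours
        for i in range(4):
            rb.add_dev({'id': i, 'region': 0, 'zone': i, 'weight': 100,
                        'ip': '127.0.0.1', 'port': 6200 + i,
                        'device': 'sd%d' % i})
        rb.rebalance(seed=seed)
        return rb

    state = random.getstate()
    r1 = build_ring(seed=42)
    r2 = build_ring(seed=42)
    # same seed, same partition assignment ...
    assert r1.get_ring().to_dict() == r2.get_ring().to_dict()
    # ... and the global random state is exactly as it was before
    assert random.getstate() == state
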


@@ -174,6 +174,19 @@ class TestRingBuilder(unittest.TestCase):
         self.assertNotEqual(r0.to_dict(), r1.to_dict())
         self.assertEqual(r1.to_dict(), r2.to_dict())

+        # check that random state is reset
+        pre_state = random.getstate()
+        rb2.rebalance(seed=10)
+        self.assertEqual(pre_state, random.getstate(),
+                         "Random state was not reset")
+
+        pre_state = random.getstate()
+        with mock.patch.object(rb2, "_build_replica_plan",
+                               side_effect=Exception()):
+            self.assertRaises(Exception, rb2.rebalance, seed=10)
+        self.assertEqual(pre_state, random.getstate(),
+                         "Random state was not reset")
+
     def test_rebalance_part_on_deleted_other_part_on_drained(self):
         rb = ring.RingBuilder(8, 3, 1)
         rb.add_dev({'id': 0, 'region': 1, 'zone': 1, 'weight': 1,
@@ -1057,18 +1070,18 @@ class TestRingBuilder(unittest.TestCase):
         rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 100,
                     'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
         rb.add_dev({'id': 2, 'region': 0, 'zone': 0, 'weight': 900,
-                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
+                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
         rb.add_dev({'id': 4, 'region': 0, 'zone': 0, 'weight': 900,
-                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
+                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
         rb.add_dev({'id': 6, 'region': 0, 'zone': 0, 'weight': 900,
-                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
+                    'ip': '127.0.0.1', 'port': 10000, 'device': 'sdd'})

         # 127.0.0.2 (odd devices)
         rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 500,
-                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
+                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
         rb.add_dev({'id': 3, 'region': 0, 'zone': 0, 'weight': 500,
-                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sdc'})
+                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
         rb.add_dev({'id': 5, 'region': 0, 'zone': 0, 'weight': 500,
-                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sdd'})
+                    'ip': '127.0.0.2', 'port': 10000, 'device': 'sdc'})
         rb.add_dev({'id': 7, 'region': 0, 'zone': 0, 'weight': 500,
                     'ip': '127.0.0.2', 'port': 10000, 'device': 'sdd'})
@@ -1175,9 +1188,13 @@ class TestRingBuilder(unittest.TestCase):
         }
         self.assertEqual(expected, {d['id']: d['parts_wanted']
                                     for d in rb._iter_devs()})
+        self.assertEqual(rb.get_balance(), 100)

         rb.pretend_min_part_hours_passed()
-        rb.rebalance()
+        # There's something like a 11% chance that we won't be able to get to
+        # a balance of 0 (and a 6% chance that we won't change anything at all)
+        # Pick a seed to make this pass.
+        rb.rebalance(seed=123)
         self.assertEqual(rb.get_balance(), 0)

     def test_multiple_duplicate_device_assignment(self):
@@ -1601,7 +1618,7 @@ class TestRingBuilder(unittest.TestCase):
         # overload is 10% (0.1).
         rb.set_overload(0.1)
         rb.pretend_min_part_hours_passed()
-        rb.rebalance()
+        rb.rebalance(seed=12345)
         part_counts = self._partition_counts(rb, key='zone')
         self.assertEqual(part_counts[0], 212)


@@ -904,7 +904,7 @@ class TestRing(TestRingBase):
         part = random.randint(0, r.partition_count)
         node_iter = r.get_more_nodes(part)
         next(node_iter)
-        self.assertEqual(5, counting_table.count)
+        self.assertLess(counting_table.count, 14)
         # sanity
         self.assertEqual(1, r._num_regions)
         self.assertEqual(2, r._num_zones)


@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import collections
 import email.parser
 import itertools
 import math
@@ -4310,13 +4311,17 @@ class TestECDuplicationObjController(
         # all nodes have a frag but there is no one set that reaches quorum,
         # which means there is no backend 404 response, but proxy should still
         # return 404 rather than 503
-        obj1 = self._make_ec_object_stub()
-        obj2 = self._make_ec_object_stub()
-        obj3 = self._make_ec_object_stub()
-        obj4 = self._make_ec_object_stub()
-        obj5 = self._make_ec_object_stub()
-        obj6 = self._make_ec_object_stub()
-        obj7 = self._make_ec_object_stub()
+        stub_objects = [
+            self._make_ec_object_stub('obj1' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj2' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj3' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj4' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj5' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj6' * self.policy.ec_segment_size),
+            self._make_ec_object_stub('obj7' * self.policy.ec_segment_size),
+        ]
+        etags = collections.Counter(stub['etag'] for stub in stub_objects)
+        self.assertEqual(len(etags), 7, etags)  # sanity

         # primaries and handoffs for required nodes
         # this is 10-4 * 2 case so that 56 requests (2 * replicas) required
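
Giving each stub a distinct body (and therefore a distinct etag) is what the Counter-based sanity check enforces: previously all seven stubs shared a default body, and an unseeded run could no longer rely on how they collided. A tiny self-contained illustration of that uniqueness check (the bodies and MD5 hashing here are invented stand-ins; Swift's stubs compute their etags internally):

    import collections
    import hashlib

    bodies = [('obj%d' % i).encode() * 16 for i in range(1, 8)]
    etags = collections.Counter(hashlib.md5(b).hexdigest() for b in bodies)
    # seven distinct bodies must yield seven distinct etags
    assert len(etags) == 7, etags
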
@@ -4326,7 +4331,7 @@
         # fill them out to the primary and handoff nodes
         node_frags = []
         for frag in range(8):
-            for stub_obj in (obj1, obj2, obj3, obj4, obj5, obj6, obj7):
+            for stub_obj in stub_objects:
                 if len(node_frags) >= required_nodes:
                     # we already have enough responses
                     break