Merge "Limit partition movement when adding a new tier"

Jenkins 2014-10-03 02:46:25 +00:00 committed by Gerrit Code Review
commit fbc58b082c
2 changed files with 61 additions and 4 deletions

swift/common/ring/builder.py

@@ -644,6 +644,26 @@ class RingBuilder(object):
                 self._last_part_moves[part] = 0xff
         self._last_part_moves_epoch = int(time())
 
+    def _get_available_parts(self):
+        """
+        Returns a tuple (wanted_parts_total, wanted_parts_for_tier); the
+        dict maps each tier in the ring to the parts wanted by its devices.
+
+        Devices that have too many partitions (negative parts_wanted) are
+        ignored; otherwise the sum of all parts_wanted is 0, plus or minus
+        rounding errors.
+
+        """
+        wanted_parts_total = 0
+        wanted_parts_for_tier = {}
+        for dev in self._iter_devs():
+            wanted_parts_total += max(0, dev['parts_wanted'])
+            for tier in tiers_for_dev(dev):
+                if tier not in wanted_parts_for_tier:
+                    wanted_parts_for_tier[tier] = 0
+                wanted_parts_for_tier[tier] += max(0, dev['parts_wanted'])
+        return (wanted_parts_total, wanted_parts_for_tier)
+
     def _gather_reassign_parts(self):
         """
         Returns a list of (partition, replicas) pairs to be reassigned by
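To make the new helper concrete, here is a minimal standalone sketch of the
same bookkeeping. The device dicts and the simplified tiers_for_dev() below
are illustrative stand-ins (Swift's real tiers also include ip- and
device-level entries), not the builder's actual structures:

def tiers_for_dev(dev):
    # Simplified: only region and zone tiers.
    return ((dev['region'],), (dev['region'], dev['zone']))

def get_available_parts(devs):
    wanted_parts_total = 0
    wanted_parts_for_tier = {}
    for dev in devs:
        # Overloaded devices (negative parts_wanted) contribute nothing.
        wanted_parts_total += max(0, dev['parts_wanted'])
        for tier in tiers_for_dev(dev):
            wanted_parts_for_tier.setdefault(tier, 0)
            wanted_parts_for_tier[tier] += max(0, dev['parts_wanted'])
    return wanted_parts_total, wanted_parts_for_tier

devs = [{'region': 0, 'zone': 0, 'parts_wanted': -2},  # surplus: ignored
        {'region': 0, 'zone': 1, 'parts_wanted': 1},
        {'region': 1, 'zone': 0, 'parts_wanted': 5}]   # new, empty tier
total, per_tier = get_available_parts(devs)
print(total)           # 6
print(per_tier[(1,)])  # 5 -> tiers other than region 1 want 6 - 5 = 1 part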
@@ -672,6 +692,9 @@ class RingBuilder(object):
         # currently sufficient spread out across the cluster.
         spread_out_parts = defaultdict(list)
         max_allowed_replicas = self._build_max_replicas_by_tier()
+        wanted_parts_total, wanted_parts_for_tier = \
+            self._get_available_parts()
+        moved_parts = 0
         for part in xrange(self.parts):
             # Only move one replica at a time if possible.
             if part in removed_dev_parts:
@@ -702,14 +725,20 @@ class RingBuilder(object):
                     rep_at_tier = 0
                     if tier in replicas_at_tier:
                         rep_at_tier = replicas_at_tier[tier]
+                    # Only allow parts to be gathered if there are
+                    # wanted parts on other tiers
+                    available_parts_for_tier = wanted_parts_total - \
+                        wanted_parts_for_tier[tier] - moved_parts
                     if (rep_at_tier > max_allowed_replicas[tier] and
                             self._last_part_moves[part] >=
-                            self.min_part_hours):
+                            self.min_part_hours and
+                            available_parts_for_tier > 0):
                         self._last_part_moves[part] = 0
                         spread_out_parts[part].append(replica)
                         dev['parts_wanted'] += 1
                         dev['parts'] -= 1
                         removed_replica = True
+                        moved_parts += 1
                         break
                 if removed_replica:
                     if dev['id'] not in tfd:
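The hunk above is the heart of the change: a replica may only be gathered
for spreading out while some other tier still wants partitions, so a new
tier with little weight no longer triggers wholesale reshuffling. A short
walk-through of the gating arithmetic, using made-up numbers rather than
anything from the commit:

# One tier wants 5 partitions, another wants 1: wanted_parts_total is 6.
wanted_parts_total = 6
wanted_parts_for_tier = {(0,): 1, (1,): 5}
moved_parts = 0

def may_gather_from(tier):
    # Partitions still wanted outside this tier, minus those already
    # queued for movement during this rebalance.
    available = (wanted_parts_total
                 - wanted_parts_for_tier[tier] - moved_parts)
    return available > 0

print(may_gather_from((0,)))  # True: region 1 can still absorb 5 parts
moved_parts = 5               # pretend 5 partitions were queued to move
print(may_gather_from((0,)))  # False: nothing is left to absorb more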

test/unit/common/ring/test_builder.py

@@ -19,6 +19,7 @@ import os
 import unittest
 import cPickle as pickle
 from collections import defaultdict
+from math import ceil
 from tempfile import mkdtemp
 from shutil import rmtree
@@ -718,9 +719,7 @@ class TestRingBuilder(unittest.TestCase):
         population_by_region = self._get_population_by_region(rb)
         self.assertEquals(population_by_region, {0: 682, 1: 86})
-        # Rebalancing will reassign 143 of the partitions, which is ~1/5
-        # of the total amount of partitions (3*256)
-        self.assertEqual(143, changed_parts)
+        self.assertEqual(87, changed_parts)
 
         # and since there's not enough room, subsequent rebalances will not
         # cause additional assignments to r1
@@ -744,6 +743,35 @@ class TestRingBuilder(unittest.TestCase):
         population_by_region = self._get_population_by_region(rb)
         self.assertEquals(population_by_region, {0: 512, 1: 256})
 
+    def test_avoid_tier_change_new_region(self):
+        rb = ring.RingBuilder(8, 3, 1)
+        for i in range(5):
+            rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 100,
+                        'ip': '127.0.0.1', 'port': i, 'device': 'sda1'})
+        rb.rebalance(seed=2)
+
+        # Add a new device in a new region to a balanced ring
+        rb.add_dev({'id': 5, 'region': 1, 'zone': 0, 'weight': 0,
+                    'ip': '127.0.0.5', 'port': 10000, 'device': 'sda1'})
+
+        # Increase the weight of region 1 slowly
+        moved_partitions = []
+        for weight in range(0, 101, 10):
+            rb.set_dev_weight(5, weight)
+            rb.pretend_min_part_hours_passed()
+            changed_parts, _balance = rb.rebalance(seed=2)
+            moved_partitions.append(changed_parts)
+            # Ensure that the second region has enough partitions;
+            # otherwise there would be replicas at risk
+            min_parts_for_r1 = ceil(weight / (500.0 + weight) * 768)
+            parts_for_r1 = self._get_population_by_region(rb).get(1, 0)
+            self.assertEqual(min_parts_for_r1, parts_for_r1)
+
+        # Number of partitions moved on each rebalance:
+        # 10/510 * 768 ~ 15.06 -> move at least 15 partitions in first step
+        ref = [0, 17, 16, 16, 14, 15, 13, 13, 12, 12, 14]
+        self.assertEqual(ref, moved_partitions)
+
     def test_set_replicas_increase(self):
         rb = ring.RingBuilder(8, 2, 0)
         rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
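The test's expected floor can be checked by hand: a part power of 8 with 3
replicas gives 256 * 3 = 768 replica assignments, and at the first non-zero
step region 1 carries weight 10 out of 510 total, so it must hold at least
ceil(10/510 * 768) = ceil(15.06) = 16 partitions; the 17 parts moved on that
rebalance cover this. A standalone sketch of the same arithmetic (outside
the test; names are illustrative):

from math import ceil

TOTAL_REPLICAS = 256 * 3  # part power 8, 3 replicas
for weight in range(0, 101, 10):
    # Region 0 holds five devices of weight 100 each.
    min_parts_for_r1 = int(ceil(weight / (500.0 + weight) * TOTAL_REPLICAS))
    print(weight, min_parts_for_r1)
# Prints 0 -> 0, 10 -> 16, 20 -> 30, ..., 100 -> 128.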