@ -22,6 +22,7 @@ import tempfile
import unittest
import shutil
import copy
import time
from collections import defaultdict , Counter
@ -965,8 +966,9 @@ class TestComposeLoadComponents(TestLoadComponents):
class TestCooperativeRingBuilder ( BaseTestCompositeBuilder ) :
def _make_coop_builder ( self , region , composite_builder , rebalance = False ) :
rb = CooperativeRingBuilder ( 8 , 3 , 1 , composite_builder )
def _make_coop_builder ( self , region , composite_builder , rebalance = False ,
min_part_hours = 1 ) :
rb = CooperativeRingBuilder ( 8 , 3 , min_part_hours , composite_builder )
if composite_builder . _builders is None :
composite_builder . _builders = [ rb ]
for i in range ( 3 ) :
@ -986,93 +988,231 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
for part2dev_id in builder . _replica2part2dev
for dev_id in part2dev_id )
def get_moved_parts(self, after, before):
    """Return the set of partition numbers whose devices differ.

    For every partition in ``range(before.parts)`` the devices reported by
    ``_devs_for_part`` on each builder are reduced to a set of identity
    tuples; the partition is reported when the two sets are not equal.
    Device order within a partition and any other device keys are ignored.

    :param after: builder-like object holding the newer assignment
    :param before: builder-like object holding the older assignment
    :returns: set of partition numbers whose device identities changed
    """
    def identities(builder, part):
        # a device is identified by these three keys only
        return {(dev[' ip '], dev[' port '], dev[' device '])
                for dev in builder._devs_for_part(part)}

    return {part for part in range(before.parts)
            if identities(before, part) != identities(after, part)}
def num_parts_can_move(self, builder):
    """Count the partitions of ``builder`` that may currently move.

    The check deliberately goes through
    ``super(CooperativeRingBuilder, builder)`` so that the
    ``_can_part_move`` implementation following CooperativeRingBuilder in
    the method resolution order is the one consulted.

    :param builder: the builder whose partitions are examined
    :returns: number of partitions for which ``_can_part_move`` is truthy
    """
    # note that can_part_move() gives consideration to the
    # _part_moved_bitmap which is only reset when a rebalance starts
    movable = 0
    for part in range(builder.parts):
        if super(CooperativeRingBuilder, builder)._can_part_move(part):
            movable += 1
    return movable
@mock.patch ( ' swift.common.ring.builder.time ' )
def test_rebalance_respects_cobuilder_part_moves ( self , mock_time ) :
def do_rebalance(builder):
    """Rebalance ``builder`` and work out which partitions changed.

    The devices of every partition are recorded before calling
    ``builder.rebalance()`` and compared with the devices reported
    afterwards.

    :param builder: the builder to rebalance
    :returns: tuple of (number of parts moved as reported by rebalance,
              set of partition numbers whose devices changed)
    """
    devs_before = []
    for part in range(builder.parts):
        devs_before.append(builder._devs_for_part(part))

    num_moved, _, _ = builder.rebalance()

    moved_parts = set()
    for part in range(builder.parts):
        if devs_before[part] != builder._devs_for_part(part):
            moved_parts.add(part)

    # sanity check: the observed changes match what rebalance reported
    self.assertEqual(len(moved_parts), num_moved)
    return num_moved, moved_parts
def num_parts_can_move(builder):
    """Count the partitions of ``builder`` that may currently move.

    ``_can_part_move`` is looked up through
    ``super(CooperativeRingBuilder, builder)`` rather than on the builder
    itself.

    :param builder: the builder whose partitions are examined
    :returns: number of partitions for which ``_can_part_move`` is truthy
    """
    # note that can_part_move() gives consideration to the
    # _part_moved_bitmap which is only reset when a rebalance starts
    movable = 0
    for part in range(builder.parts):
        if super(CooperativeRingBuilder, builder)._can_part_move(part):
            movable += 1
    return movable
mock_time . return_value = 0
def _check_rebalance_respects_cobuilder_part_moves (
self , min_part_hours , mock_time ) :
mock_time . return_value = now = int ( time . time ( ) )
builder_files = [ ]
cb = CompositeRingBuilder ( )
rb1 = self . _make_coop_builder ( 1 , cb )
rb2 = self . _make_coop_builder ( 2 , cb )
rb3 = self . _make_coop_builder ( 3 , cb )
cb . _builders = [ rb1 , rb2 , rb3 ]
for i in ( 1 , 2 , 3 ) :
b = self . _make_coop_builder ( i , cb , min_part_hours = min_part_hours )
fname = os . path . join ( self . tmpdir , ' builder_ %s .builder ' % i )
b . save ( fname )
builder_files . append ( fname )
builder_files , builders = cb . load_components ( builder_files )
# all cobuilders can perform initial rebalance
for rb in ( rb1 , rb2 , rb3 ) :
rb . rebalance ( )
actual = self . _partition_counts ( rb )
exp = { 0 : 256 , 1 : 256 , 2 : 256 }
self . assertEqual ( exp , actual ,
' Expected %s but got %s for region %s ' %
( exp , actual , next ( rb . _iter_devs ( ) ) [ ' region ' ] ) )
# jump forwards min_part_hours, both builders can move all parts
mock_time . return_value = 3600
self . add_dev ( rb1 )
# sanity checks: rb1 and rb2 are both ready for rebalance
self . assertEqual ( 0 , rb2 . min_part_seconds_left )
self . assertEqual ( 0 , rb1 . min_part_seconds_left )
cb . rebalance ( )
exp = { 0 : 256 , 1 : 256 , 2 : 256 }
self . assertEqual ( exp , self . _partition_counts ( builders [ 0 ] ) )
self . assertEqual ( exp , self . _partition_counts ( builders [ 1 ] ) )
self . assertEqual ( exp , self . _partition_counts ( builders [ 2 ] ) )
exp = min_part_hours * 3600
self . assertEqual ( exp , builders [ 0 ] . min_part_seconds_left )
self . assertEqual ( exp , builders [ 1 ] . min_part_seconds_left )
self . assertEqual ( exp , builders [ 2 ] . min_part_seconds_left )
# jump forwards min_part_hours
now + = min_part_hours * 3600
mock_time . return_value = now
old_builders = [ ]
for builder in builders :
old_builder = CooperativeRingBuilder ( 8 , 3 , min_part_hours , None )
old_builder . copy_from ( copy . deepcopy ( builder . to_dict ( ) ) )
old_builders . append ( old_builder )
for builder in builders :
self . add_dev ( builder )
# sanity checks: all builders are ready for rebalance
self . assertEqual ( 0 , builders [ 0 ] . min_part_seconds_left )
self . assertEqual ( 0 , builders [ 1 ] . min_part_seconds_left )
self . assertEqual ( 0 , builders [ 2 ] . min_part_seconds_left )
# ... but last_part_moves not yet updated to current epoch
self . assertEqual ( 0 , num_parts_can_move ( rb1 ) )
self . assertEqual ( 0 , num_parts_can_move ( rb2 ) )
# rebalancing rb1 will update epoch for both builders' last_part_moves
num_moved , rb1_parts_moved = do_rebalance ( rb1 )
self . assertEqual ( 192 , num_moved )
self . assertEqual ( self . _partition_counts ( rb1 ) ,
if min_part_hours > 0 :
self . assertEqual ( 0 , self . num_parts_can_move ( builders [ 0 ] ) )
self . assertEqual ( 0 , self . num_parts_can_move ( builders [ 1 ] ) )
self . assertEqual ( 0 , self . num_parts_can_move ( builders [ 2 ] ) )
with mock . patch ( ' swift.common.ring.composite_builder.shuffle ' ,
lambda x : x ) :
cb . rebalance ( )
rb1_parts_moved = self . get_moved_parts ( builders [ 0 ] , old_builders [ 0 ] )
self . assertEqual ( 192 , len ( rb1_parts_moved ) )
self . assertEqual ( self . _partition_counts ( builders [ 0 ] ) ,
{ 0 : 192 , 1 : 192 , 2 : 192 , 3 : 192 } )
rb2_parts_moved = self . get_moved_parts ( builders [ 1 ] , old_builders [ 1 ] )
self . assertEqual ( 64 , len ( rb2_parts_moved ) )
counts = self . _partition_counts ( builders [ 1 ] )
self . assertEqual ( counts [ 3 ] , 64 )
self . assertEqual ( [ 234 , 235 , 235 ] , sorted ( counts . values ( ) [ : 3 ] ) )
self . assertFalse ( rb2_parts_moved . intersection ( rb1_parts_moved ) )
# rb3 can't rebalance - all parts moved while rebalancing rb1 and rb2
self . assertEqual (
0 , len ( self . get_moved_parts ( builders [ 2 ] , old_builders [ 2 ] ) ) )
# jump forwards min_part_hours, all builders can move all parts again,
# so now rb2 should be able to further rebalance
now + = min_part_hours * 3600
mock_time . return_value = now
old_builders = [ ]
for builder in builders :
old_builder = CooperativeRingBuilder ( 8 , 3 , min_part_hours , None )
old_builder . copy_from ( copy . deepcopy ( builder . to_dict ( ) ) )
old_builders . append ( old_builder )
with mock . patch ( ' swift.common.ring.composite_builder.shuffle ' ,
lambda x : x ) :
cb . rebalance ( )
rb2_parts_moved = self . get_moved_parts ( builders [ 1 ] , old_builders [ 1 ] )
self . assertGreater ( len ( rb2_parts_moved ) , 64 )
self . assertGreater ( self . _partition_counts ( builders [ 1 ] ) [ 3 ] , 64 )
self . assertLess ( self . num_parts_can_move ( builders [ 2 ] ) , 256 )
self . assertEqual ( 256 , self . num_parts_can_move ( builders [ 0 ] ) )
# and rb3 should also have been able to move some parts
rb3_parts_moved = self . get_moved_parts ( builders [ 2 ] , old_builders [ 2 ] )
self . assertGreater ( len ( rb3_parts_moved ) , 0 )
self . assertFalse ( rb3_parts_moved . intersection ( rb2_parts_moved ) )
# but cobuilders will not prevent a new rb rebalancing for first time
rb4 = self . _make_coop_builder ( 4 , cb , rebalance = False ,
min_part_hours = min_part_hours )
builders . append ( rb4 )
builder_files = [ ]
for i , builder in enumerate ( builders ) :
fname = os . path . join ( self . tmpdir , ' builder_ %s .builder ' % i )
builder . save ( fname )
builder_files . append ( fname )
cb = CompositeRingBuilder ( )
builder_files , builders = cb . load_components ( builder_files )
cb . rebalance ( )
self . assertEqual ( 256 , len ( self . get_moved_parts ( builders [ 3 ] , rb4 ) ) )
def test_rebalance_respects_cobuilder_part_moves ( self ) :
self . _check_rebalance_respects_cobuilder_part_moves ( 1 )
self . _check_rebalance_respects_cobuilder_part_moves ( 0 )
@mock.patch ( ' swift.common.ring.builder.time ' )
def _check_rebalance_cobuilder_states (
self , min_part_hours , mock_time ) :
@contextmanager
def mock_rebalance():
    """Patch rebalance so builder states are captured around every call.

    While the context is active, each call to the patched ``rebalance``
    snapshots every builder in ``cb._builders`` immediately before and
    immediately after delegating to the original ``RingBuilder.rebalance``.

    Yields a dict mapping builder -> (list of captured builder states).
    """
    real_rebalance = RingBuilder.rebalance
    # a dict mapping builder -> (list of captured builder states)
    snapshots = defaultdict(list)

    def snapshot_all():
        for component in cb._builders:
            clone = CooperativeRingBuilder(8, 3, min_part_hours, None)
            clone.copy_from(copy.deepcopy(component.to_dict()))
            # the moved-parts bitmap is copied explicitly, as its own
            # bytearray, so later changes to the component do not leak in
            clone._part_moved_bitmap = bytearray(
                component._part_moved_bitmap)
            snapshots[component].append(clone)

    def rebalance_with_snapshots(builder_instance):
        snapshot_all()
        outcome = real_rebalance(builder_instance)
        snapshot_all()
        return outcome

    with mock.patch(' swift.common.ring.RingBuilder.rebalance ',
                    rebalance_with_snapshots):
        yield snapshots
mock_time . return_value = now = int ( time . time ( ) )
builder_files = [ ]
cb = CompositeRingBuilder ( )
for i in ( 1 , 2 , 3 ) :
b = self . _make_coop_builder ( i , cb , min_part_hours = min_part_hours )
fname = os . path . join ( self . tmpdir , ' builder_ %s .builder ' % i )
b . save ( fname )
builder_files . append ( fname )
builder_files , builders = cb . load_components ( builder_files )
# all cobuilders can perform initial rebalance
cb . rebalance ( )
# jump forwards min_part_hours
now + = min_part_hours * 3600
mock_time . return_value = now
for builder in builders :
self . add_dev ( builder )
with mock . patch ( ' swift.common.ring.composite_builder.shuffle ' ,
lambda x : x ) :
with mock_rebalance ( ) as captured_states :
cb . rebalance ( )
# sanity - state captured before and after each component rebalance
self . assertEqual ( len ( builders ) , len ( captured_states ) )
for states in captured_states . values ( ) :
self . assertEqual ( 2 * len ( builders ) , len ( states ) )
# for each component we have a list of its builder states
rb1s = captured_states [ builders [ 0 ] ]
rb2s = captured_states [ builders [ 1 ] ]
rb3s = captured_states [ builders [ 2 ] ]
# rebalancing will update epoch for all builders' last_part_moves
self . assertEqual ( now , rb1s [ 0 ] . _last_part_moves_epoch )
self . assertEqual ( now , rb2s [ 0 ] . _last_part_moves_epoch )
self . assertEqual ( now , rb3s [ 0 ] . _last_part_moves_epoch )
# so, in state before any component rebalance, all can now move parts
# N.B. num_parts_can_move gathers super class's (i.e. RingBuilder)
# _can_part_move so that it doesn't refer cobuilders state.
self . assertEqual ( 256 , num_parts_can_move ( rb2 ) )
self . assertEqual ( 64 , num_parts_can_move ( rb1 ) )
# _can_part_move so that it doesn't refer to cobuilders state.
self . assertEqual ( 256 , self . num_parts_can_move ( rb1s [ 0 ] ) )
self . assertEqual ( 256 , self . num_parts_can_move ( rb2s [ 0 ] ) )
self . assertEqual ( 256 , self . num_parts_can_move ( rb3s [ 0 ] ) )
# after first component has been rebalanced it has moved parts
self . assertEqual ( 64 , self . num_parts_can_move ( rb1s [ 1 ] ) )
self . assertEqual ( 256 , self . num_parts_can_move ( rb2s [ 2 ] ) )
self . assertEqual ( 256 , self . num_parts_can_move ( rb3s [ 2 ] ) )
rb1_parts_moved = self . get_moved_parts ( rb1s [ 1 ] , rb1s [ 0 ] )
self . assertEqual ( 192 , len ( rb1_parts_moved ) )
self . assertEqual ( self . _partition_counts ( rb1s [ 1 ] ) ,
{ 0 : 192 , 1 : 192 , 2 : 192 , 3 : 192 } )
# rebalancing rb2 - rb2 in isolation could potentially move all parts
# so would move 192 parts to new device, but it is constrained by rb1
# only having 64 parts that can move
self . add_dev ( rb2 )
num_moved , rb2_parts_moved = do_rebalance ( rb2 )
self . assertEqual ( 64 , num_moved )
counts = self . _partition_counts ( rb2 )
rb2_parts_moved = self . get_moved_parts ( rb2s [ 3 ] , rb2s [ 2 ] )
self . assertEqual ( 64 , len ( rb2_parts_moved ) )
counts = self . _partition_counts ( rb2s [ 3 ] )
self . assertEqual ( counts [ 3 ] , 64 )
self . assertEqual ( [ 234 , 235 , 235 ] , sorted ( counts . values ( ) [ : 3 ] ) )
self . assertFalse ( rb2_parts_moved . intersection ( rb1_parts_moved ) )
self . assertEqual ( 192 , num_parts_can_move ( rb2 ) )
self . assertEqual ( 64 , num_parts_can_move ( rb1 ) )
self . assertEqual ( 192 , self . num_parts_can_move ( rb2s [ 3 ] ) )
self . assertEqual ( 64 , self . num_parts_can_move ( rb1s [ 3 ] ) )
# rb3 can't rebalance - all parts moved while rebalancing rb1 and rb2
self . add_dev ( rb3 )
num_moved , rb3_parts_moved = do_rebalance ( rb3 )
self . assertEqual ( 0 , num_moved )
self . assertEqual ( 0 , len ( self . get_moved_parts ( rb3s [ 5 ] , rb3s [ 0 ] ) ) )
# jump forwards min_part_hours, both builders can move all parts again,
# so now rb2 should be able to further rebalance
mock_time . return_value = 7200
do_rebalance ( rb2 )
self . assertGreater ( self . _partition_counts ( rb2 ) [ 3 ] , 64 )
self . assertLess ( num_parts_can_move ( rb2 ) , 256 )
self . assertEqual ( 256 , num_parts_can_move ( rb1 ) ) # sanity check
# but cobuilders will not prevent a rb rebalancing for first time
rb4 = self . _make_coop_builder ( 4 , cb , rebalance = False )
cb . _builders . append ( rb4 )
num_moved , _ , _ = rb4 . rebalance ( )
self . assertEqual ( 3 * 256 , num_moved )
def test_rebalance_cobuilders ( self ) :
def test_rebalance_cobuilder_states ( self ) :
self . _check_rebalance_cobuilder_states ( 1 )
self . _check_rebalance_cobuilder_states ( 0 )
def _check_rebalance_cobuilders_calls ( self , min_part_hours ) :
# verify that co-builder methods are called during one builder's
# rebalance
@contextmanager
@ -1107,26 +1247,20 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
fake_can_part_move ) :
yield calls
# single component builder in parent builder
cb = CompositeRingBuilder ( )
rb1 = self . _make_coop_builder ( 1 , cb )
rb1 = self . _make_coop_builder ( 1 , cb , min_part_hours = min_part_hours )
rb2 = self . _make_coop_builder ( 2 , cb , min_part_hours = min_part_hours )
cb . _builders = [ rb1 , rb2 ]
# composite rebalance updates last_part_moves before any component
# rebalance - after that expect no more updates
with mock_update_last_part_moves ( ) as update_calls :
with mock_can_part_move ( ) as can_part_move_calls :
rb1 . rebalance ( )
self . assertEqual ( [ rb1 ] , update_calls )
self . assertEqual ( [ rb1 ] , can_part_move_calls . keys ( ) )
self . assertEqual ( 768 , len ( can_part_move_calls [ rb1 ] ) )
cb . update_last_part_moves ( )
self . assertEqual ( sorted ( [ rb1 , rb2 ] ) , sorted ( update_calls ) )
# two component builders with same parent builder
cb = CompositeRingBuilder ( )
rb1 = self . _make_coop_builder ( 1 , cb )
rb2 = self . _make_coop_builder ( 2 , cb )
cb . _builders = [ rb1 , rb2 ]
with mock_update_last_part_moves ( ) as update_calls :
with mock_can_part_move ( ) as can_part_move_calls :
rb2 . rebalance ( )
# both builders get updated
self . assertEqual ( sorted ( [ rb1 , rb2 ] ) , sorted ( update_calls ) )
self . assertFalse ( update_calls )
# rb1 has never been rebalanced so no calls propagate from its
# can_part_move method to its superclass _can_part_move method
self . assertEqual ( [ rb2 ] , can_part_move_calls . keys ( ) )
@ -1134,14 +1268,16 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
with mock_update_last_part_moves ( ) as update_calls :
with mock_can_part_move ( ) as can_part_move_calls :
rb1 . rebalance ( )
# both builders get updated
self . assertEqual ( sorted ( [ rb1 , rb2 ] ) , sorted ( update_calls ) )
self . assertFalse ( update_calls )
# rb1 is being rebalanced so gets checked, and rb2 also gets checked
self . assertEqual ( sorted ( [ rb1 , rb2 ] ) , sorted ( can_part_move_calls ) )
self . assertEqual ( 768 , len ( can_part_move_calls [ rb1 ] ) )
self . assertEqual ( 768 , len ( can_part_move_calls [ rb2 ] ) )
def test_rebalance_cobuilders_calls ( self ) :
self . _check_rebalance_cobuilders_calls ( 1 )
self . _check_rebalance_cobuilders_calls ( 0 )
def test_save_then_load ( self ) :
cb = CompositeRingBuilder ( )
coop_rb = self . _make_coop_builder ( 1 , cb , rebalance = True )