Merge "Refactor db_replicator's roundrobin_datadirs"
@@ -65,6 +65,47 @@ def quarantine_db(object_file, server_type):
     renamer(object_dir, quarantine_dir)
 
 
+def roundrobin_datadirs(datadirs):
+    """
+    Generator to walk the data dirs in a round robin manner, evenly
+    hitting each device on the system, and yielding any .db files
+    found (in their proper places). The partitions within each data
+    dir are walked randomly, however.
+
+    :param datadirs: a list of (path, node_id) to walk
+    :returns: A generator of (partition, path_to_db_file, node_id)
+    """
+
+    def walk_datadir(datadir, node_id):
+        partitions = os.listdir(datadir)
+        random.shuffle(partitions)
+        for partition in partitions:
+            part_dir = os.path.join(datadir, partition)
+            if not os.path.isdir(part_dir):
+                continue
+            suffixes = os.listdir(part_dir)
+            for suffix in suffixes:
+                suff_dir = os.path.join(part_dir, suffix)
+                if not os.path.isdir(suff_dir):
+                    continue
+                hashes = os.listdir(suff_dir)
+                for hsh in hashes:
+                    hash_dir = os.path.join(suff_dir, hsh)
+                    if not os.path.isdir(hash_dir):
+                        continue
+                    object_file = os.path.join(hash_dir, hsh + '.db')
+                    if os.path.exists(object_file):
+                        yield (partition, object_file, node_id)
+
+    its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
+    while its:
+        for it in its:
+            try:
+                yield it.next()
+            except StopIteration:
+                its.remove(it)
+
+
 class ReplConnection(BufferedHTTPConnection):
     """
     Helper to simplify REPLICATEing to a remote server.
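As a quick orientation, here is a minimal sketch of driving the new module-level generator directly. It assumes the function lives in swift.common.db_replicator (the test below imports it as db_replicator), and the device paths and node ids are hypothetical:

    from swift.common import db_replicator

    # Hypothetical (path, node_id) pairs; in the daemon they are built from the ring.
    datadirs = [('/srv/node/sda/containers', 1),
                ('/srv/node/sdb/containers', 2)]
    # Yields (partition, path_to_db_file, node_id) tuples, interleaving the devices.
    for part, db_file, node_id in db_replicator.roundrobin_datadirs(datadirs):
        print('%s %s %s' % (part, db_file, node_id))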
@@ -458,31 +499,6 @@ class Replicator(Daemon):
     def report_up_to_date(self, full_info):
         return True
 
-    def roundrobin_datadirs(self, datadirs):
-        """
-        Generator to walk the data dirs in a round robin manner, evenly
-        hitting each device on the system.
-
-        :param datadirs: a list of paths to walk
-        """
-
-        def walk_datadir(datadir, node_id):
-            partitions = os.listdir(datadir)
-            random.shuffle(partitions)
-            for partition in partitions:
-                part_dir = os.path.join(datadir, partition)
-                for root, dirs, files in os.walk(part_dir, topdown=False):
-                    for fname in (f for f in files if f.endswith('.db')):
-                        object_file = os.path.join(root, fname)
-                        yield (partition, object_file, node_id)
-        its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
-        while its:
-            for it in its:
-                try:
-                    yield it.next()
-                except StopIteration:
-                    its.remove(it)
-
     def run_once(self, *args, **kwargs):
         """Run a replication pass once."""
         self._zero_stats()
@@ -505,7 +521,7 @@ class Replicator(Daemon):
                 if os.path.isdir(datadir):
                     dirs.append((datadir, node['id']))
         self.logger.info(_('Beginning replication run'))
-        for part, object_file, node_id in self.roundrobin_datadirs(dirs):
+        for part, object_file, node_id in roundrobin_datadirs(dirs):
             self.cpool.spawn_n(
                 self._replicate_object, part, object_file, node_id)
         self.cpool.waitall()
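The loop above is fed by the same round-robin-over-generators scheduling shown in the first hunk. A self-contained toy version of that while/for/StopIteration pattern (written with the Python 3 spelling next(it) instead of the diff's Python 2 it.next()), just to illustrate why results from different devices interleave:

    def roundrobin(iterables):
        # Cycle over the live iterators, dropping each one as it is exhausted.
        its = [iter(i) for i in iterables]
        while its:
            for it in its:
                try:
                    yield next(it)
                except StopIteration:
                    its.remove(it)

    # list(roundrobin([[1, 2, 3], ['a', 'b']])) == [1, 'a', 2, 'b', 3]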
@@ -420,6 +420,161 @@ class TestDBReplicator(unittest.TestCase):
         rpc.merge_syncs(fake_broker, args)
         self.assertEquals(fake_broker.args, (args[0],))
 
+    def test_roundrobin_datadirs(self):
+        listdir_calls = []
+        isdir_calls = []
+        exists_calls = []
+        shuffle_calls = []
+
+        def _listdir(path):
+            listdir_calls.append(path)
+            if not path.startswith('/srv/node/sda/containers') and \
+               not path.startswith('/srv/node/sdb/containers'):
+                return []
+            path = path[len('/srv/node/sdx/containers'):]
+            if path == '':
+                return ['123', '456', '789']  # 456 will pretend to be a file
+            elif path == '/123':
+                return ['abc', 'def.db']  # def.db will pretend to be a file
+            elif path == '/123/abc':
+                # 11111111111111111111111111111abc will pretend to be a file
+                return ['00000000000000000000000000000abc',
+                        '11111111111111111111111111111abc']
+            elif path == '/123/abc/00000000000000000000000000000abc':
+                return ['00000000000000000000000000000abc.db',
+                        # This other.db isn't in the right place, so should be
+                        # ignored later.
+                        '000000000000000000000000000other.db',
+                        'weird1']  # weird1 will pretend to be a dir, if asked
+            elif path == '/789':
+                return ['ghi', 'jkl']  # jkl will pretend to be a file
+            elif path == '/789/ghi':
+                # 33333333333333333333333333333ghi will pretend to be a file
+                return ['22222222222222222222222222222ghi',
+                        '33333333333333333333333333333ghi']
+            elif path == '/789/ghi/22222222222222222222222222222ghi':
+                return ['22222222222222222222222222222ghi.db',
+                        'weird2']  # weird2 will pretend to be a dir, if asked
+            return []
+
+        def _isdir(path):
+            isdir_calls.append(path)
+            if not path.startswith('/srv/node/sda/containers') and \
+               not path.startswith('/srv/node/sdb/containers'):
+                return False
+            path = path[len('/srv/node/sdx/containers'):]
+            if path in ('/123', '/123/abc',
+                        '/123/abc/00000000000000000000000000000abc',
+                        '/123/abc/00000000000000000000000000000abc/weird1',
+                        '/789', '/789/ghi',
+                        '/789/ghi/22222222222222222222222222222ghi',
+                        '/789/ghi/22222222222222222222222222222ghi/weird2'):
+                return True
+            return False
+
+        def _exists(arg):
+            exists_calls.append(arg)
+            return True
+
+        def _shuffle(arg):
+            shuffle_calls.append(arg)
+
+        orig_listdir = db_replicator.os.listdir
+        orig_isdir = db_replicator.os.path.isdir
+        orig_exists = db_replicator.os.path.exists
+        orig_shuffle = db_replicator.random.shuffle
+        try:
+            db_replicator.os.listdir = _listdir
+            db_replicator.os.path.isdir = _isdir
+            db_replicator.os.path.exists = _exists
+            db_replicator.random.shuffle = _shuffle
+            datadirs = [('/srv/node/sda/containers', 1),
+                        ('/srv/node/sdb/containers', 2)]
+            results = list(db_replicator.roundrobin_datadirs(datadirs))
+            # The results show that the .db files are returned, the devices
+            # interleaved.
+            self.assertEquals(results, [
+                ('123', '/srv/node/sda/containers/123/abc/'
+                        '00000000000000000000000000000abc/'
+                        '00000000000000000000000000000abc.db', 1),
+                ('123', '/srv/node/sdb/containers/123/abc/'
+                        '00000000000000000000000000000abc/'
+                        '00000000000000000000000000000abc.db', 2),
+                ('789', '/srv/node/sda/containers/789/ghi/'
+                        '22222222222222222222222222222ghi/'
+                        '22222222222222222222222222222ghi.db', 1),
+                ('789', '/srv/node/sdb/containers/789/ghi/'
+                        '22222222222222222222222222222ghi/'
+                        '22222222222222222222222222222ghi.db', 2)])
+            # The listdir calls show that we only listdir the dirs
+            self.assertEquals(listdir_calls, [
+                '/srv/node/sda/containers',
+                '/srv/node/sda/containers/123',
+                '/srv/node/sda/containers/123/abc',
+                '/srv/node/sdb/containers',
+                '/srv/node/sdb/containers/123',
+                '/srv/node/sdb/containers/123/abc',
+                '/srv/node/sda/containers/789',
+                '/srv/node/sda/containers/789/ghi',
+                '/srv/node/sdb/containers/789',
+                '/srv/node/sdb/containers/789/ghi'])
+            # The isdir calls show that we did ask about the things pretending
+            # to be files at various levels.
+            self.assertEquals(isdir_calls, [
+                '/srv/node/sda/containers/123',
+                '/srv/node/sda/containers/123/abc',
+                ('/srv/node/sda/containers/123/abc/'
+                 '00000000000000000000000000000abc'),
+                '/srv/node/sdb/containers/123',
+                '/srv/node/sdb/containers/123/abc',
+                ('/srv/node/sdb/containers/123/abc/'
+                 '00000000000000000000000000000abc'),
+                ('/srv/node/sda/containers/123/abc/'
+                 '11111111111111111111111111111abc'),
+                '/srv/node/sda/containers/123/def.db',
+                '/srv/node/sda/containers/456',
+                '/srv/node/sda/containers/789',
+                '/srv/node/sda/containers/789/ghi',
+                ('/srv/node/sda/containers/789/ghi/'
+                 '22222222222222222222222222222ghi'),
+                ('/srv/node/sdb/containers/123/abc/'
+                 '11111111111111111111111111111abc'),
+                '/srv/node/sdb/containers/123/def.db',
+                '/srv/node/sdb/containers/456',
+                '/srv/node/sdb/containers/789',
+                '/srv/node/sdb/containers/789/ghi',
+                ('/srv/node/sdb/containers/789/ghi/'
+                 '22222222222222222222222222222ghi'),
+                ('/srv/node/sda/containers/789/ghi/'
+                 '33333333333333333333333333333ghi'),
+                '/srv/node/sda/containers/789/jkl',
+                ('/srv/node/sdb/containers/789/ghi/'
+                 '33333333333333333333333333333ghi'),
+                '/srv/node/sdb/containers/789/jkl'])
+            # The exists calls are the .db files we looked for as we walked the
+            # structure.
+            self.assertEquals(exists_calls, [
+                ('/srv/node/sda/containers/123/abc/'
+                 '00000000000000000000000000000abc/'
+                 '00000000000000000000000000000abc.db'),
+                ('/srv/node/sdb/containers/123/abc/'
+                 '00000000000000000000000000000abc/'
+                 '00000000000000000000000000000abc.db'),
+                ('/srv/node/sda/containers/789/ghi/'
+                 '22222222222222222222222222222ghi/'
+                 '22222222222222222222222222222ghi.db'),
+                ('/srv/node/sdb/containers/789/ghi/'
+                 '22222222222222222222222222222ghi/'
+                 '22222222222222222222222222222ghi.db')])
+            # Shows that we called shuffle twice, once for each device.
+            self.assertEquals(
+                shuffle_calls, [['123', '456', '789'], ['123', '456', '789']])
+        finally:
+            db_replicator.os.listdir = orig_listdir
+            db_replicator.os.path.isdir = orig_isdir
+            db_replicator.os.path.exists = orig_exists
+            db_replicator.random.shuffle = orig_shuffle
+
 
 if __name__ == '__main__':
     unittest.main()
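For orientation, the directory layout that the test's fake listdir/isdir/exists stubs simulate is roughly the following (reconstructed from the stub return values; only sda is shown, sdb mirrors it):

    /srv/node/sda/containers/
        123/                                        partition dir
            abc/                                    suffix dir
                00000000000000000000000000000abc/   hash dir; its 00000000000000000000000000000abc.db is yielded
                11111111111111111111111111111abc    pretends to be a file, skipped
            def.db                                  pretends to be a file, skipped
        456                                         pretends to be a file, skipped
        789/
            ghi/
                22222222222222222222222222222ghi/   hash dir; its 22222222222222222222222222222ghi.db is yielded
                33333333333333333333333333333ghi    pretends to be a file, skipped
            jkl                                     pretends to be a file, skipped

The 000000000000000000000000000other.db, weird1, and weird2 entries sit inside the hash dirs but are never listed, since the walker only probes for <hash>.db directly.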