Modify _get_hashes() arguments to be more generic

Some public functions in the diskfile manager expect or return full
file paths. It implies a filesystem diskfile implementation.
To make it easier to plug alternate diskfile implementations, patch
functions to take more generic arguments.

This commit changes DiskFileManager _get_hashes() arguments from:
  - partition_path, recalculate=None, do_listdir=False
to :
  - device, partition, policy, recalculate=None, do_listdir=False

Callers are modified accordingly, in diskfile.py, reconstructor.py,
and replicator.py

Change-Id: I8e2d7075572e466ae2fa5ebef5e31d87eed90fec
This commit is contained in:
Alexandre Lécuyer 2017-03-24 15:41:45 +01:00
parent 5507df14e8
commit 95905b0174
6 changed files with 52 additions and 22 deletions

View File

@ -340,6 +340,14 @@ def invalidate_hash(suffix_dir):
inv_fh.write(suffix + "\n") inv_fh.write(suffix + "\n")
def get_part_path(dev_path, policy, partition):
"""
Given the device path, policy, and partition, returns the full
path to the partition
"""
return os.path.join(dev_path, get_data_dir(policy), str(partition))
class AuditLocation(object): class AuditLocation(object):
""" """
Represents an object location to be audited. Represents an object location to be audited.
@ -1041,13 +1049,16 @@ class BaseDiskFileManager(object):
hashes.pop('valid', None) hashes.pop('valid', None)
return hashed, hashes return hashed, hashes
def __get_hashes(self, partition_path, recalculate=None, do_listdir=False): def __get_hashes(self, device, partition, policy, recalculate=None,
do_listdir=False):
""" """
Get hashes for each suffix dir in a partition. do_listdir causes it to Get hashes for each suffix dir in a partition. do_listdir causes it to
mistrust the hash cache for suffix existence at the (unexpectedly high) mistrust the hash cache for suffix existence at the (unexpectedly high)
cost of a listdir. cost of a listdir.
:param partition_path: absolute path of partition to get hashes for :param device: name of target device
:param partition: partition on the device in which the object lives
:param policy: the StoragePolicy instance
:param recalculate: list of suffixes which should be recalculated when :param recalculate: list of suffixes which should be recalculated when
got got
:param do_listdir: force existence check for all hashes in the :param do_listdir: force existence check for all hashes in the
@ -1056,6 +1067,8 @@ class BaseDiskFileManager(object):
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes) :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
""" """
hashed = 0 hashed = 0
dev_path = self.get_dev_path(device)
partition_path = get_part_path(dev_path, policy, partition)
hashes_file = join(partition_path, HASH_FILE) hashes_file = join(partition_path, HASH_FILE)
modified = False modified = False
orig_hashes = {'valid': False} orig_hashes = {'valid': False}
@ -1115,7 +1128,9 @@ class BaseDiskFileManager(object):
if read_hashes(partition_path) == orig_hashes: if read_hashes(partition_path) == orig_hashes:
write_hashes(partition_path, hashes) write_hashes(partition_path, hashes)
return hashed, hashes return hashed, hashes
return self.__get_hashes(partition_path, recalculate, do_listdir) return self.__get_hashes(device, partition, policy,
recalculate=recalculate,
do_listdir=do_listdir)
else: else:
return hashed, hashes return hashed, hashes
@ -1304,12 +1319,11 @@ class BaseDiskFileManager(object):
dev_path = self.get_dev_path(device) dev_path = self.get_dev_path(device)
if not dev_path: if not dev_path:
raise DiskFileDeviceUnavailable() raise DiskFileDeviceUnavailable()
partition_path = os.path.join(dev_path, get_data_dir(policy), partition_path = get_part_path(dev_path, policy, partition)
partition)
if not os.path.exists(partition_path): if not os.path.exists(partition_path):
mkdirs(partition_path) mkdirs(partition_path)
_junk, hashes = tpool_reraise( _junk, hashes = tpool_reraise(
self._get_hashes, partition_path, recalculate=suffixes) self._get_hashes, device, partition, policy, recalculate=suffixes)
return hashes return hashes
def _listdir(self, path): def _listdir(self, path):
@ -1337,8 +1351,7 @@ class BaseDiskFileManager(object):
dev_path = self.get_dev_path(device) dev_path = self.get_dev_path(device)
if not dev_path: if not dev_path:
raise DiskFileDeviceUnavailable() raise DiskFileDeviceUnavailable()
partition_path = os.path.join(dev_path, get_data_dir(policy), partition_path = get_part_path(dev_path, policy, partition)
partition)
for suffix in self._listdir(partition_path): for suffix in self._listdir(partition_path):
if len(suffix) != 3: if len(suffix) != 3:
continue continue
@ -1379,9 +1392,7 @@ class BaseDiskFileManager(object):
if suffixes is None: if suffixes is None:
suffixes = self.yield_suffixes(device, partition, policy) suffixes = self.yield_suffixes(device, partition, policy)
else: else:
partition_path = os.path.join(dev_path, partition_path = get_part_path(dev_path, policy, partition)
get_data_dir(policy),
str(partition))
suffixes = ( suffixes = (
(os.path.join(partition_path, suffix), suffix) (os.path.join(partition_path, suffix), suffix)
for suffix in suffixes) for suffix in suffixes)

View File

@ -505,11 +505,12 @@ class ObjectReconstructor(Daemon):
self.kill_coros() self.kill_coros()
self.last_reconstruction_count = self.reconstruction_count self.last_reconstruction_count = self.reconstruction_count
def _get_hashes(self, policy, path, recalculate=None, do_listdir=False): def _get_hashes(self, device, partition, policy, recalculate=None,
do_listdir=False):
df_mgr = self._df_router[policy] df_mgr = self._df_router[policy]
hashed, suffix_hashes = tpool_reraise( hashed, suffix_hashes = tpool_reraise(
df_mgr._get_hashes, path, recalculate=recalculate, df_mgr._get_hashes, device, partition, policy,
do_listdir=do_listdir) recalculate=recalculate, do_listdir=do_listdir)
self.logger.update_stats('suffix.hashes', hashed) self.logger.update_stats('suffix.hashes', hashed)
return suffix_hashes return suffix_hashes
@ -602,8 +603,9 @@ class ObjectReconstructor(Daemon):
node['index']) node['index'])
# now recalculate local hashes for suffixes that don't # now recalculate local hashes for suffixes that don't
# match so we're comparing the latest # match so we're comparing the latest
local_suff = self._get_hashes(job['policy'], job['path'], local_suff = self._get_hashes(job['local_dev']['device'],
recalculate=suffixes) job['partition'],
job['policy'], recalculate=suffixes)
suffixes = self.get_suffix_delta(local_suff, suffixes = self.get_suffix_delta(local_suff,
job['frag_index'], job['frag_index'],
@ -769,7 +771,8 @@ class ObjectReconstructor(Daemon):
""" """
# find all the fi's in the part, and which suffixes have them # find all the fi's in the part, and which suffixes have them
try: try:
hashes = self._get_hashes(policy, part_path, do_listdir=True) hashes = self._get_hashes(local_dev['device'], partition, policy,
do_listdir=True)
except OSError as e: except OSError as e:
if e.errno != errno.ENOTDIR: if e.errno != errno.ENOTDIR:
raise raise

View File

@ -408,7 +408,8 @@ class ObjectReplicator(Daemon):
df_mgr = self._df_router[job['policy']] df_mgr = self._df_router[job['policy']]
try: try:
hashed, local_hash = tpool_reraise( hashed, local_hash = tpool_reraise(
df_mgr._get_hashes, job['path'], df_mgr._get_hashes, job['device'],
job['partition'], job['policy'],
do_listdir=_do_listdir( do_listdir=_do_listdir(
int(job['partition']), int(job['partition']),
self.replication_cycle)) self.replication_cycle))
@ -462,7 +463,8 @@ class ObjectReplicator(Daemon):
continue continue
hashed, recalc_hash = tpool_reraise( hashed, recalc_hash = tpool_reraise(
df_mgr._get_hashes, df_mgr._get_hashes,
job['path'], recalculate=suffixes) job['device'], job['partition'], job['policy'],
recalculate=suffixes)
self.logger.update_stats('suffix.hashes', hashed) self.logger.update_stats('suffix.hashes', hashed)
local_hash = recalc_hash local_hash = recalc_hash
suffixes = [suffix for suffix in local_hash if suffixes = [suffix for suffix in local_hash if

View File

@ -275,6 +275,17 @@ class TestDiskFileModuleMethods(unittest.TestCase):
# check tempdir # check tempdir
self.assertTrue(os.path.isdir(tmp_path)) self.assertTrue(os.path.isdir(tmp_path))
def test_get_part_path(self):
# partition passed as 'str'
part_dir = diskfile.get_part_path('/srv/node/sda1', POLICIES[0], '123')
exp_dir = '/srv/node/sda1/objects/123'
self.assertEqual(part_dir, exp_dir)
# partition passed as 'int'
part_dir = diskfile.get_part_path('/srv/node/sdb5', POLICIES[1], 123)
exp_dir = '/srv/node/sdb5/objects-1/123'
self.assertEqual(part_dir, exp_dir)
@patch_policies @patch_policies
class TestObjectAuditLocationGenerator(unittest.TestCase): class TestObjectAuditLocationGenerator(unittest.TestCase):

View File

@ -1790,7 +1790,7 @@ class TestObjectReconstructor(unittest.TestCase):
diskfile.HASH_FILE) diskfile.HASH_FILE)
self.assertTrue(os.path.exists(hashes_file)) self.assertTrue(os.path.exists(hashes_file))
suffixes = self.reconstructor._get_hashes( suffixes = self.reconstructor._get_hashes(
self.policy, part_path, do_listdir=True) self.local_dev['device'], 0, self.policy, do_listdir=True)
self.assertEqual(suffixes, {}) self.assertEqual(suffixes, {})
def test_build_jobs_no_hashes(self): def test_build_jobs_no_hashes(self):

View File

@ -1627,8 +1627,11 @@ class TestObjectReplicator(unittest.TestCase):
# if a timeout occurs while replicating one partition to one node. # if a timeout occurs while replicating one partition to one node.
timeouts = [Timeout()] timeouts = [Timeout()]
def fake_get_hashes(df_mgr, part_path, **kwargs): def fake_get_hashes(df_mgr, device, partition, policy, **kwargs):
self.get_hash_count += 1 self.get_hash_count += 1
dev_path = df_mgr.get_dev_path(device)
part_path = os.path.join(dev_path, diskfile.get_data_dir(policy),
str(partition))
# Simulate a REPLICATE timeout by raising Timeout for second call # Simulate a REPLICATE timeout by raising Timeout for second call
# to get_hashes (with recalculate suffixes) for a specific # to get_hashes (with recalculate suffixes) for a specific
# partition # partition
@ -1750,7 +1753,7 @@ class TestObjectReplicator(unittest.TestCase):
mock_do_listdir.side_effect = do_listdir_results mock_do_listdir.side_effect = do_listdir_results
expected_tpool_calls = [ expected_tpool_calls = [
mock.call(self.replicator._df_router[job['policy']]._get_hashes, mock.call(self.replicator._df_router[job['policy']]._get_hashes,
job['path'], job['device'], job['partition'], job['policy'],
do_listdir=do_listdir) do_listdir=do_listdir)
for job, do_listdir in zip(jobs, do_listdir_results) for job, do_listdir in zip(jobs, do_listdir_results)
] ]