Merge "relinker: Improve performance by limiting I/O"

Zuul 2020-06-05 06:09:24 +00:00 committed by Gerrit Code Review
commit 56278100a1
5 changed files with 576 additions and 12 deletions


@@ -28,6 +28,8 @@ if __name__ == '__main__':
dest='swift_dir', help='Path to swift directory')
parser.add_argument('--devices', default='/srv/node',
dest='devices', help='Path to swift device directory')
parser.add_argument('--device', default=None, dest='device',
help='Device name to relink (default: all)')
parser.add_argument('--skip-mount-check', default=False,
help='Don\'t test if disk is mounted',
action="store_true", dest='skip_mount_check')


@@ -14,8 +14,12 @@
# limitations under the License.
import errno
import fcntl
import json
import logging
import os
from functools import partial
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
from swift.obj import diskfile
LOCK_FILE = '.relink.{datadir}.lock'
STATE_FILE = 'relink.{datadir}.json'
STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
STEP_RELINK = 'relink'
STEP_CLEANUP = 'cleanup'
def devices_filter(device, _, devices):
if device:
devices = [d for d in devices if d == device]
return set(devices)
def hook_pre_device(locks, states, datadir, device_path):
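# Serialize workers per device: take an exclusive flock on a hidden
# per-datadir lock file and stash the fd in locks[0] so that
# hook_post_device() can release it once the device is done. The state
# file, if any, is then loaded into the shared states dict.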
lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
fcntl.flock(fd, fcntl.LOCK_EX)
locks[0] = fd
state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
states.clear()
try:
with open(state_file, 'rt') as f:
tmp = json.load(f)
states.update(tmp)
except ValueError:
# Invalid JSON: remove the file to restart from scratch
os.unlink(state_file)
except IOError as err:
# Ignore file not found error
if err.errno != errno.ENOENT:
raise
def hook_post_device(locks, _):
os.close(locks[0])
locks[0] = None
def partitions_filter(states, step, part_power, next_part_power,
datadir_path, partitions):
# Remove all non-partitions first (e.g. auditor_status_ALL.json)
partitions = [p for p in partitions if p.isdigit()]
if not (step == STEP_CLEANUP and part_power == next_part_power):
# This is not a cleanup after a cancel: partitions in the upper half
# are new partitions, so there is nothing to relink or clean up there
partitions = [p for p in partitions
if int(p) < 2 ** next_part_power / 2]
# Format: { 'part': [relinked, cleaned] }
if states:
missing = list(set(partitions) - set(states.keys()))
if missing:
# All missing partitions were created after the first relink run,
# i.e. after the new ring was distributed, so they are already
# hardlinked in both partitions; they will only need to be cleaned
# up. Just update the state file.
for part in missing:
states[part] = [True, False]
if step == STEP_RELINK:
partitions = [str(p) for p, (r, c) in states.items() if not r]
elif step == STEP_CLEANUP:
partitions = [str(p) for p, (r, c) in states.items() if not c]
else:
states.update({str(p): [False, False] for p in partitions})
# Always scan the partitions in reverse order to minimize the amount of IO
# (it actually only matters for relink, not for cleanup).
#
# Initial situation:
# objects/0/000/10000000000000000000000000000000/12345.data
# -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
# (same hash, new partition)
#
# If the relinker then scans partition 1, it will needlessly listdir that
# object. Working through the partitions in reverse order avoids this.
partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
return partitions
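# A minimal usage sketch (device path and partition numbers are
# illustrative): resuming a relink where partition 96 is already done.
#
#     states = {'96': [True, False], '227': [False, False]}
#     partitions_filter(states, STEP_RELINK, 8, 9,
#                       '/srv/node/sdb1/objects',
#                       ['96', '227', '312', 'auditor_status_ALL.json'])
#     -> ['227']: partition 96 is already relinked, 312 is in the upper
#        half so it was created by the relinker itself, and the auditor
#        status file is not a partition at all.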
# Save states when a partition is done
def hook_post_partition(states, step,
partition_path):
part = os.path.basename(os.path.abspath(partition_path))
datadir_path = os.path.dirname(os.path.abspath(partition_path))
device_path = os.path.dirname(os.path.abspath(datadir_path))
datadir_name = os.path.basename(os.path.abspath(datadir_path))
state_tmp_file = os.path.join(device_path,
STATE_TMP_FILE.format(datadir=datadir_name))
state_file = os.path.join(device_path,
STATE_FILE.format(datadir=datadir_name))
if step == STEP_RELINK:
states[part][0] = True
elif step == STEP_CLEANUP:
states[part][1] = True
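# Save atomically: write a temporary file, fsync it, then rename it over
# the state file so a crash cannot leave a truncated state behind.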
with open(state_tmp_file, 'wt') as f:
json.dump(states, f)
os.fsync(f.fileno())
os.rename(state_tmp_file, state_file)
def hashes_filter(next_part_power, suff_path, hashes):
# Keep only the hashes that move to another partition with the next part
# power; for the others there is nothing to relink or clean up.
hashes = list(hashes)
# Iterate over a snapshot: removing from the list being iterated would
# skip the element that follows each removal.
for hsh in list(hashes):
fname = os.path.join(suff_path, hsh, 'fake-file-name')
if replace_partition_in_path(fname, next_part_power) == fname:
hashes.remove(hsh)
return hashes
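# Illustration of the partition math behind this filter (hypothetical
# hash value; Swift takes the partition from the top part_power bits of
# the hash, cf. struct.unpack_from('>I', digest)[0] >> (32 - part_power)):
#
#     hash 00800000... -> partition 0 at part power 8, partition 1 at 9
#
# Found under objects/0, its path changes with the next power, so the
# hash is kept; found under objects/1 it is already in its final
# location, replace_partition_in_path() is a no-op and it is skipped.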
def relink(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
-logger=logging.getLogger()):
+logger=logging.getLogger(),
+device=None):
mount_check = not skip_mount_check
run = False
relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
logging.info('Relinking files for policy %s under %s',
policy.name, devices)
run = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
states = {}
relink_devices_filter = partial(devices_filter, device)
relink_hook_pre_device = partial(hook_pre_device, locks, states,
datadir)
relink_hook_post_device = partial(hook_post_device, locks)
relink_partition_filter = partial(partitions_filter,
states, STEP_RELINK,
part_power, next_part_power)
relink_hook_post_partition = partial(hook_post_partition,
states, STEP_RELINK)
relink_hashes_filter = partial(hashes_filter, next_part_power)
locations = audit_location_generator(
devices,
-diskfile.get_data_dir(policy),
-mount_check=mount_check)
+datadir,
+mount_check=mount_check,
+devices_filter=relink_devices_filter,
+hook_pre_device=relink_hook_pre_device,
+hook_post_device=relink_hook_post_device,
+partitions_filter=relink_partition_filter,
+hook_post_partition=relink_hook_post_partition,
+hashes_filter=relink_hashes_filter)
for fname, _, _ in locations:
newfname = replace_partition_in_path(fname, next_part_power)
try:
@@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift',
def cleanup(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
-logger=logging.getLogger()):
+logger=logging.getLogger(),
+device=None):
mount_check = not skip_mount_check
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift',
logging.info('Cleaning up files for policy %s under %s',
policy.name, devices)
run = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
states = {}
cleanup_devices_filter = partial(devices_filter, device)
cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
datadir)
cleanup_hook_post_device = partial(hook_post_device, locks)
cleanup_partition_filter = partial(partitions_filter,
states, STEP_CLEANUP,
part_power, next_part_power)
cleanup_hook_post_partition = partial(hook_post_partition,
states, STEP_CLEANUP)
cleanup_hashes_filter = partial(hashes_filter, next_part_power)
locations = audit_location_generator(
devices,
-diskfile.get_data_dir(policy),
-mount_check=mount_check)
+datadir,
+mount_check=mount_check,
+devices_filter=cleanup_devices_filter,
+hook_pre_device=cleanup_hook_pre_device,
+hook_post_device=cleanup_hook_post_device,
+partitions_filter=cleanup_partition_filter,
+hook_post_partition=cleanup_hook_post_partition,
+hashes_filter=cleanup_hashes_filter)
for fname, device, partition in locations:
expected_fname = replace_partition_in_path(fname, part_power)
if fname == expected_fname:
@@ -152,8 +315,10 @@ def main(args):
if args.action == 'relink':
return relink(
-args.swift_dir, args.devices, args.skip_mount_check, logger)
+args.swift_dir, args.devices, args.skip_mount_check, logger,
+device=args.device)
if args.action == 'cleanup':
return cleanup(
-args.swift_dir, args.devices, args.skip_mount_check, logger)
+args.swift_dir, args.devices, args.skip_mount_check, logger,
+device=args.device)


@@ -3152,11 +3152,26 @@ def remove_directory(path):
def audit_location_generator(devices, datadir, suffix='',
-mount_check=True, logger=None):
+mount_check=True, logger=None,
+devices_filter=None, partitions_filter=None,
+suffixes_filter=None, hashes_filter=None,
+hook_pre_device=None, hook_post_device=None,
+hook_pre_partition=None, hook_post_partition=None,
+hook_pre_suffix=None, hook_post_suffix=None,
+hook_pre_hash=None, hook_post_hash=None):
"""
Given a devices path and a data directory, yield (path, device,
partition) for all files in that directory
(devices|partitions|suffixes|hashes)_filter are meant to modify the list
of elements that will be iterated, e.g. to exclude some elements based on
a custom condition defined by the caller.
hook_pre_(device|partition|suffix|hash) are called before yielding the
element; hook_post_(device|partition|suffix|hash) are called after the
element was yielded. They are meant for pre/post processing, e.g. saving
a progress status.
:param devices: parent directory of the devices to be audited
:param datadir: a directory located under self.devices. This should be
one of the DATADIR constants defined in the account,
@@ -3165,11 +3180,31 @@ def audit_location_generator(devices, datadir, suffix='',
:param mount_check: Flag to check if a mount check should be performed
on devices
:param logger: a logger object
:param devices_filter: a callable taking (devices, [list of devices]) as
parameters and returning a [list of devices]
:param partitions_filter: a callable taking (datadir_path, [list of parts])
as parameters and returning a [list of parts]
:param suffixes_filter: a callable taking (part_path, [list of suffixes])
as parameters and returning a [list of suffixes]
:param hashes_filter: a callable taking (suff_path, [list of hashes]) as
parameters and returning a [list of hashes]
:param hook_pre_device: a callable taking device_path as parameter
:param hook_post_device: a callable taking device_path as parameter
:param hook_pre_partition: a callable taking part_path as parameter
:param hook_post_partition: a callable taking part_path as parameter
:param hook_pre_suffix: a callable taking suff_path as parameter
:param hook_post_suffix: a callable taking suff_path as parameter
:param hook_pre_hash: a callable taking hash_path as parameter
:param hook_post_hash: a callable taking hash_path as parameter
"""
device_dir = listdir(devices)
# randomize devices in case of process restart before sweep completed
shuffle(device_dir)
if devices_filter:
device_dir = devices_filter(devices, device_dir)
for device in device_dir:
if hook_pre_device:
hook_pre_device(os.path.join(devices, device))
if mount_check and not ismount(os.path.join(devices, device)):
if logger:
logger.warning(
@@ -3183,24 +3218,36 @@ def audit_location_generator(devices, datadir, suffix='',
logger.warning(_('Skipping %(datadir)s because %(err)s'),
{'datadir': datadir_path, 'err': e})
continue
if partitions_filter:
partitions = partitions_filter(datadir_path, partitions)
for partition in partitions:
part_path = os.path.join(datadir_path, partition)
if hook_pre_partition:
hook_pre_partition(part_path)
try:
suffixes = listdir(part_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
if suffixes_filter:
suffixes = suffixes_filter(part_path, suffixes)
for asuffix in suffixes:
suff_path = os.path.join(part_path, asuffix)
if hook_pre_suffix:
hook_pre_suffix(suff_path)
try:
hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
if hashes_filter:
hashes = hashes_filter(suff_path, hashes)
for hsh in hashes:
hash_path = os.path.join(suff_path, hsh)
if hook_pre_hash:
hook_pre_hash(hash_path)
try:
files = sorted(listdir(hash_path), reverse=True)
except OSError as e:
@@ -3212,6 +3259,14 @@ def audit_location_generator(devices, datadir, suffix='',
continue
path = os.path.join(hash_path, fname)
yield path, device, partition
if hook_post_hash:
hook_post_hash(hash_path)
if hook_post_suffix:
hook_post_suffix(suff_path)
if hook_post_partition:
hook_post_partition(part_path)
if hook_post_device:
hook_post_device(os.path.join(devices, device))
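# A minimal caller sketch (illustrative device and suffix values) showing
# how the new filters and hooks cooperate:
#
#     def one_device(devices_root, devices):
#         return [d for d in devices if d == 'sdb1']
#
#     locations = audit_location_generator(
#         '/srv/node', 'objects', '.data', mount_check=False,
#         devices_filter=one_device,
#         hook_post_partition=lambda part_path: save_progress(part_path))
#     for path, device, partition in locations:
#         ...
#
# save_progress() stands in for whatever checkpointing the caller wants.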
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):


@@ -12,6 +12,9 @@
# limitations under the License.
import binascii
import errno
import fcntl
import json
import os
import shutil
import struct
@@ -30,6 +33,9 @@ from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
patch_policies
PART_POWER = 8
class TestRelinker(unittest.TestCase):
def setUp(self):
skip_if_no_xattrs()
@@ -40,7 +46,7 @@ class TestRelinker(unittest.TestCase):
os.mkdir(self.testdir)
os.mkdir(self.devices)
-self.rb = ring.RingBuilder(8, 6.0, 1)
+self.rb = ring.RingBuilder(PART_POWER, 6.0, 1)
for i in range(6):
ip = "127.0.0.%s" % i
@@ -55,10 +61,10 @@ class TestRelinker(unittest.TestCase):
os.mkdir(self.objects)
self._hash = utils.hash_path('a/c/o')
digest = binascii.unhexlify(self._hash)
-part = struct.unpack_from('>I', digest)[0] >> 24
+self.part = struct.unpack_from('>I', digest)[0] >> 24
+self.next_part = struct.unpack_from('>I', digest)[0] >> 23
self.objdir = os.path.join(
-self.objects, str(part), self._hash[-3:], self._hash)
+self.objects, str(self.part), self._hash[-3:], self._hash)
os.makedirs(self.objdir)
self.object_fname = "1278553064.00000.data"
self.objname = os.path.join(self.objdir, self.object_fname)
@@ -97,6 +103,27 @@ class TestRelinker(unittest.TestCase):
stat_new = os.stat(self.expected_file)
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
def test_relink_device_filter(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
relinker.relink(self.testdir, self.devices, True,
device=self.existing_device)
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
stat_old = os.stat(os.path.join(self.objdir, self.object_fname))
stat_new = os.stat(self.expected_file)
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
def test_relink_device_filter_invalid(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
relinker.relink(self.testdir, self.devices, True, device='none')
self.assertFalse(os.path.isdir(self.expected_dir))
self.assertFalse(os.path.isfile(self.expected_file))
def _common_test_cleanup(self, relink=True):
# Create a ring that has prev_part_power set
self.rb.prepare_increase_partition_power()
@@ -121,6 +148,187 @@ class TestRelinker(unittest.TestCase):
self.assertFalse(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_cleanup_device_filter(self):
self._common_test_cleanup()
self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
device=self.existing_device))
# The old object name should be removed; the new one should still exist
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
self.assertFalse(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_cleanup_device_filter_invalid(self):
self._common_test_cleanup()
self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
device='none'))
# Both the old and the new object names should still exist
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
self.assertTrue(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_relink_cleanup(self):
state_file = os.path.join(self.devices, self.existing_device,
'relink.objects.json')
self.rb.prepare_increase_partition_power()
self._save_ring()
relinker.relink(self.testdir, self.devices, True)
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f), {str(self.part): [True, False]})
self.rb.increase_partition_power()
self.rb._ring = None # Force builder to reload ring
self._save_ring()
relinker.cleanup(self.testdir, self.devices, True)
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f),
{str(self.part): [True, True],
str(self.next_part): [True, True]})
def test_devices_filter_filtering(self):
# With no filtering, returns all devices
devices = relinker.devices_filter(None, "", [self.existing_device])
self.assertEqual(set([self.existing_device]), devices)
# With a matching filter, returns only the matching devices
devices = relinker.devices_filter(self.existing_device, "",
[self.existing_device, 'sda2'])
self.assertEqual(set([self.existing_device]), devices)
# With a non-matching filter, returns nothing
devices = relinker.devices_filter('none', "", [self.existing_device])
self.assertEqual(set(), devices)
def test_hook_pre_post_device_locking(self):
locks = [None]
device_path = os.path.join(self.devices, self.existing_device)
datadir = 'object'
lock_file = os.path.join(device_path, '.relink.%s.lock' % datadir)
# The first run gets the lock
relinker.hook_pre_device(locks, {}, datadir, device_path)
self.assertNotEqual([None], locks)
# A following run would block
with self.assertRaises(IOError) as raised:
with open(lock_file, 'a') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
self.assertEqual(errno.EAGAIN, raised.exception.errno)
# The post-device hook must release the lock and reset locks to [None]
relinker.hook_post_device(locks, "")
self.assertEqual([None], locks)
with open(lock_file, 'a') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
def test_state_file(self):
device_path = os.path.join(self.devices, self.existing_device)
datadir = 'objects'
datadir_path = os.path.join(device_path, datadir)
state_file = os.path.join(device_path, 'relink.%s.json' % datadir)
def call_partition_filter(step, parts):
# Partition 312 will be ignored because it must have been created
# by the relinker
return relinker.partitions_filter(states, step,
PART_POWER, PART_POWER + 1,
datadir_path, parts)
# Start relinking
states = {}
# Load the states: at the very start, they must be empty
locks = [None]
relinker.hook_pre_device(locks, states, datadir, device_path)
self.assertEqual({}, states)
os.close(locks[0]) # Release the lock
# Partition 312 is ignored because it must have been created with the
# next_part_power, so it does not need to be relinked
# 96 and 227 are returned in reverse order
# auditor_status_ALL.json is ignored because it's not a partition
self.assertEqual(['227', '96'],
call_partition_filter(relinker.STEP_RELINK,
['96', '227', '312',
'auditor_status.json']))
self.assertEqual(states, {'96': [False, False], '227': [False, False]})
# Ack partition 96
relinker.hook_post_partition(states, relinker.STEP_RELINK,
os.path.join(datadir_path, '96'))
self.assertEqual(states, {'96': [True, False], '227': [False, False]})
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f), {'96': [True, False],
'227': [False, False]})
# Restart relinking after only part 96 was done
self.assertEqual(['227'],
call_partition_filter(relinker.STEP_RELINK,
['96', '227', '312']))
self.assertEqual(states, {'96': [True, False], '227': [False, False]})
# Ack partition 227
relinker.hook_post_partition(states, relinker.STEP_RELINK,
os.path.join(datadir_path, '227'))
self.assertEqual(states, {'96': [True, False], '227': [True, False]})
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f), {'96': [True, False],
'227': [True, False]})
# If the process restarts, it reloads the state
locks = [None]
states = {}
relinker.hook_pre_device(locks, states, datadir, device_path)
self.assertEqual(states, {'96': [True, False], '227': [True, False]})
os.close(locks[0]) # Release the lock
# Start cleanup
self.assertEqual(['227', '96'],
call_partition_filter(relinker.STEP_CLEANUP,
['96', '227', '312']))
# Ack partition 227
relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
os.path.join(datadir_path, '227'))
self.assertEqual(states, {'96': [True, False], '227': [True, True]})
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f), {'96': [True, False],
'227': [True, True]})
# Restart cleanup after only part 227 was done
self.assertEqual(['96'],
call_partition_filter(relinker.STEP_CLEANUP,
['96', '227', '312']))
self.assertEqual(states, {'96': [True, False], '227': [True, True]})
# Ack partition 96
relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
os.path.join(datadir_path, '96'))
self.assertEqual(states, {'96': [True, True], '227': [True, True]})
with open(state_file, 'rt') as f:
self.assertEqual(json.load(f), {'96': [True, True],
'227': [True, True]})
# At the end, the state is still accurate
locks = [None]
states = {}
relinker.hook_pre_device(locks, states, datadir, device_path)
self.assertEqual(states, {'96': [True, True], '227': [True, True]})
os.close(locks[0]) # Release the lock
# If the file gets corrupted, restart from scratch
with open(state_file, 'wt') as f:
f.write('NOT JSON')
locks = [None]
states = {}
relinker.hook_pre_device(locks, states, datadir, device_path)
self.assertEqual(states, {})
os.close(locks[0]) # Release the lock
def test_cleanup_not_yet_relinked(self):
self._common_test_cleanup(relink=False)
self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True))
@@ -176,3 +384,7 @@ class TestRelinker(unittest.TestCase):
self.assertIn('failed audit and was quarantined',
self.logger.get_lines_for_level('warning')[0])
if __name__ == '__main__':
unittest.main()


@@ -6080,6 +6080,136 @@ class TestAuditLocationGenerator(unittest.TestCase):
self.assertEqual(list(locations),
[(obj_path, "drive", "partition2")])
def test_hooks(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
partition = os.path.join(data, "partition1")
os.makedirs(partition)
suffix = os.path.join(partition, "suffix1")
os.makedirs(suffix)
hash_path = os.path.join(suffix, "hash1")
os.makedirs(hash_path)
obj_path = os.path.join(hash_path, "obj1.dat")
with open(obj_path, "w"):
pass
meta_path = os.path.join(hash_path, "obj1.meta")
with open(meta_path, "w"):
pass
hook_pre_device = MagicMock()
hook_post_device = MagicMock()
hook_pre_partition = MagicMock()
hook_post_partition = MagicMock()
hook_pre_suffix = MagicMock()
hook_post_suffix = MagicMock()
hook_pre_hash = MagicMock()
hook_post_hash = MagicMock()
locations = utils.audit_location_generator(
tmpdir, "data", ".dat", mount_check=False, logger=logger,
hook_pre_device=hook_pre_device,
hook_post_device=hook_post_device,
hook_pre_partition=hook_pre_partition,
hook_post_partition=hook_post_partition,
hook_pre_suffix=hook_pre_suffix,
hook_post_suffix=hook_post_suffix,
hook_pre_hash=hook_pre_hash,
hook_post_hash=hook_post_hash
)
list(locations)
hook_pre_device.assert_called_once_with(os.path.join(tmpdir,
"drive"))
hook_post_device.assert_called_once_with(os.path.join(tmpdir,
"drive"))
hook_pre_partition.assert_called_once_with(partition)
hook_post_partition.assert_called_once_with(partition)
hook_pre_suffix.assert_called_once_with(suffix)
hook_post_suffix.assert_called_once_with(suffix)
hook_pre_hash.assert_called_once_with(hash_path)
hook_post_hash.assert_called_once_with(hash_path)
def test_filters(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
partition = os.path.join(data, "partition1")
os.makedirs(partition)
suffix = os.path.join(partition, "suffix1")
os.makedirs(suffix)
hash_path = os.path.join(suffix, "hash1")
os.makedirs(hash_path)
obj_path = os.path.join(hash_path, "obj1.dat")
with open(obj_path, "w"):
pass
meta_path = os.path.join(hash_path, "obj1.meta")
with open(meta_path, "w"):
pass
def audit_location_generator(**kwargs):
return utils.audit_location_generator(
tmpdir, "data", ".dat", mount_check=False, logger=logger,
**kwargs)
# Patch os.listdir so the calls can be asserted while it still
# returns the real directory listings
with patch('os.listdir', side_effect=os.listdir) as m_listdir:
# devices_filter
m_listdir.reset_mock()
devices_filter = MagicMock(return_value=["drive"])
list(audit_location_generator(devices_filter=devices_filter))
devices_filter.assert_called_once_with(tmpdir, ["drive"])
self.assertIn(((data,),), m_listdir.call_args_list)
m_listdir.reset_mock()
devices_filter = MagicMock(return_value=[])
list(audit_location_generator(devices_filter=devices_filter))
devices_filter.assert_called_once_with(tmpdir, ["drive"])
self.assertNotIn(((data,),), m_listdir.call_args_list)
# partitions_filter
m_listdir.reset_mock()
partitions_filter = MagicMock(return_value=["partition1"])
list(audit_location_generator(
partitions_filter=partitions_filter))
partitions_filter.assert_called_once_with(data,
["partition1"])
self.assertIn(((partition,),), m_listdir.call_args_list)
m_listdir.reset_mock()
partitions_filter = MagicMock(return_value=[])
list(audit_location_generator(
partitions_filter=partitions_filter))
partitions_filter.assert_called_once_with(data,
["partition1"])
self.assertNotIn(((partition,),), m_listdir.call_args_list)
# suffixes_filter
m_listdir.reset_mock()
suffixes_filter = MagicMock(return_value=["suffix1"])
list(audit_location_generator(suffixes_filter=suffixes_filter))
suffixes_filter.assert_called_once_with(partition, ["suffix1"])
self.assertIn(((suffix,),), m_listdir.call_args_list)
m_listdir.reset_mock()
suffixes_filter = MagicMock(return_value=[])
list(audit_location_generator(suffixes_filter=suffixes_filter))
suffixes_filter.assert_called_once_with(partition, ["suffix1"])
self.assertNotIn(((suffix,),), m_listdir.call_args_list)
# hashes_filter
m_listdir.reset_mock()
hashes_filter = MagicMock(return_value=["hash1"])
list(audit_location_generator(hashes_filter=hashes_filter))
hashes_filter.assert_called_once_with(suffix, ["hash1"])
self.assertIn(((hash_path,),), m_listdir.call_args_list)
m_listdir.reset_mock()
hashes_filter = MagicMock(return_value=[])
list(audit_location_generator(hashes_filter=hashes_filter))
hashes_filter.assert_called_once_with(suffix, ["hash1"])
self.assertNotIn(((hash_path,),), m_listdir.call_args_list)
class TestGreenAsyncPile(unittest.TestCase):