Allow specification of object devices for audit

In object audit "once" mode the user can now specify a subset of
devices to audit using the "--devices" command-line option.  The
subset is specified as a comma-separated list.  This patch is taken
from a larger patch that enables parallel processing in the object
auditor.

Recon has been modified so that it works properly with this change
to "once" mode.  dump_recon_cache() can now store nested
dictionaries, in other words it can store a recon cache entry such
as {'key1': {'key2': {...}}}.  When the object auditor is run in
"once" mode with "--devices" set, the object_auditor_stats_ALL and
ZBF entries look like:
{'object_auditor_stats_ALL': {'disk1disk2..diskn': {...}}}.  When
swift-recon is run, it hunts through the nested dicts to find the
appropriate entries.  The object auditor recon cache entries are set
to {} at the beginning of each audit cycle, and individual disk
entries are cleared from the cache at the end of each disk's audit
cycle.
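
For illustration, a run restricted to two hypothetical devices (sda
and sdb) keys the stats by the concatenation of the sorted device
names; a minimal sketch of the resulting cache shape, using the stats
keys the auditor writes in this patch:

    # Hypothetical recon cache contents after
    # "swift-object-auditor ... once --devices=sda,sdb"
    {'object_auditor_stats_ALL':
        {'sdasdb': {'errors': 0, 'passes': 2310, 'quarantined': 0,
                    'bytes_processed': 234660, 'start_time': 1393241000.0,
                    'audit_time': 115.1}}}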

DocImpact

Change-Id: Icc53dac0a8136f1b2f61d5e08baf7b4fd87c8123
Author: Eamonn O'Toole
Date:   2014-02-24 11:24:56 +00:00
Parent: 86668aa1ac
Commit: 793489b80d
9 changed files with 340 additions and 73 deletions


@@ -23,5 +23,7 @@ if __name__ == '__main__':
     parser = OptionParser("%prog CONFIG [options]")
     parser.add_option('-z', '--zero_byte_fps',
                       help='Audit only zero byte files at specified files/sec')
+    parser.add_option('-d', '--devices',
+                      help='Audit only given devices. Comma-separated list')
     conf_file, options = parse_options(parser=parser, once=True)
     run_daemon(ObjectAuditor, conf_file, **options)


@@ -1067,6 +1067,13 @@ run this command as follows:
 `swift-object-auditor /path/to/object-server/config/file.conf once -z 1000`
 "-z" means to only check for zero-byte files at 1000 files per second.
+
+At times it is useful to be able to run the object auditor on a specific
+device or set of devices. You can run the object auditor as follows:
+`swift-object-auditor /path/to/object-server/config/file.conf once --devices=sda,sdb`
+This will run the object auditor on only the sda and sdb devices. This param
+accepts a comma-separated list of values.
+
 -----------------
 Object Replicator
 -----------------


@@ -488,6 +488,24 @@ class SwiftRecon(object):
               (self._ptime(low), self._ptime(high), self._ptime(average))
         print "=" * 79

+    def nested_get_value(self, key, recon_entry):
+        """
+        Generator that yields all values for a given key in a recon cache
+        entry.  This is for use with object auditor recon cache entries.  If
+        the object auditor has run in 'once' mode with a subset of devices
+        specified, the checksum auditor section will have an entry of the
+        form: {'object_auditor_stats_ALL': {'disk1disk2diskN': {...}}}.
+        The same is true of the ZBF auditor cache entry section.  We use this
+        generator to find all instances of a particular key in these multi-
+        level dictionaries.
+        """
+        for k, v in recon_entry.items():
+            if isinstance(v, dict):
+                for value in self.nested_get_value(key, v):
+                    yield value
+            if k == key:
+                yield v
+
     def object_auditor_check(self, hosts):
         """
         Obtain and print obj auditor statistics
@@ -513,11 +531,16 @@ class SwiftRecon(object):
                 zbf_scan[url] = response['object_auditor_stats_ZBF']
         if len(all_scan) > 0:
             stats = {}
-            stats[atime] = [all_scan[i][atime] for i in all_scan]
-            stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan]
-            stats[passes] = [all_scan[i][passes] for i in all_scan]
-            stats[errors] = [all_scan[i][errors] for i in all_scan]
-            stats[quarantined] = [all_scan[i][quarantined] for i in all_scan]
+            stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
+                            for i in all_scan]
+            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+                                 all_scan[i])) for i in all_scan]
+            stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
+                             for i in all_scan]
+            stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
+                             for i in all_scan]
+            stats[quarantined] = [(self.nested_get_value(quarantined,
+                                  all_scan[i])) for i in all_scan]
             for k in stats:
                 if None in stats[k]:
                     stats[k] = [x for x in stats[k] if x is not None]
@@ -534,10 +557,14 @@ class SwiftRecon(object):
             print "[ALL_auditor] - No hosts returned valid data."
         if len(zbf_scan) > 0:
             stats = {}
-            stats[atime] = [zbf_scan[i][atime] for i in zbf_scan]
-            stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan]
-            stats[errors] = [zbf_scan[i][errors] for i in zbf_scan]
-            stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan]
+            stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
+                            for i in zbf_scan]
+            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+                                 zbf_scan[i])) for i in zbf_scan]
+            stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
+                             for i in zbf_scan]
+            stats[quarantined] = [(self.nested_get_value(quarantined,
+                                  zbf_scan[i])) for i in zbf_scan]
             for k in stats:
                 if None in stats[k]:
                     stats[k] = [x for x in stats[k] if x is not None]

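A standalone sketch of what the new generator yields, using hand-built
sample data (device names hypothetical):

    def nested_get_value(key, recon_entry):
        # Same logic as the method added above, minus the class.
        for k, v in recon_entry.items():
            if isinstance(v, dict):
                for value in nested_get_value(key, v):
                    yield value
            if k == key:
                yield v

    entry = {'object_auditor_stats_ALL': {'sdasdb': {'errors': 1},
                                          'sdcsdd': {'errors': 2}}}
    print(list(nested_get_value('errors', entry)))  # [1, 2] (order may vary)

Because it recurses into every nested dict, one call collects the stats
for every disk subset recorded in a cache entry.
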

@@ -2079,6 +2079,28 @@ def human_readable(value):
     return '%d%si' % (round(value), suffixes[index])


+def put_recon_cache_entry(cache_entry, key, item):
+    """
+    Function that will check if item is a dict, and if so put it under
+    cache_entry[key].  We use nested recon cache entries when the object
+    auditor runs in 'once' mode with a specified subset of devices.
+    """
+    if isinstance(item, dict):
+        if key not in cache_entry or key in cache_entry and not \
+                isinstance(cache_entry[key], dict):
+            cache_entry[key] = {}
+        elif key in cache_entry and item == {}:
+            cache_entry.pop(key, None)
+            return
+        for k, v in item.items():
+            if v == {}:
+                cache_entry[key].pop(k, None)
+            else:
+                cache_entry[key][k] = v
+    else:
+        cache_entry[key] = item
+
+
 def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
     """Update recon cache values
@@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
                 #file doesn't have a valid entry, we'll recreate it
                 pass
             for cache_key, cache_value in cache_dict.items():
-                cache_entry[cache_key] = cache_value
+                put_recon_cache_entry(cache_entry, cache_key, cache_value)
             try:
                 with NamedTemporaryFile(dir=os.path.dirname(cache_file),
                                         delete=False) as tf:

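A quick usage sketch of the merge semantics (keys and values
hypothetical, mirroring how the auditor writes and later clears its
per-disk-set entries):

    entry = {}
    put_recon_cache_entry(entry, 'object_auditor_stats_ALL',
                          {'sdasdb': {'errors': 0}})
    put_recon_cache_entry(entry, 'object_auditor_stats_ALL',
                          {'sdcsdd': {'errors': 2}})
    # entry == {'object_auditor_stats_ALL': {'sdasdb': {'errors': 0},
    #                                        'sdcsdd': {'errors': 2}}}
    put_recon_cache_entry(entry, 'object_auditor_stats_ALL', {'sdasdb': {}})
    # entry == {'object_auditor_stats_ALL': {'sdcsdd': {'errors': 2}}}

Passing a nested dict merges rather than overwrites, so auditors
covering different disk subsets don't clobber each other's stats, and
passing an empty dict for a sub-key deletes just that entry.
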

@@ -14,14 +14,16 @@
 # limitations under the License.

 import os
+import sys
 import time
+import signal
 from swift import gettext_ as _
 from contextlib import closing
 from eventlet import Timeout

 from swift.obj import diskfile
 from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
-    list_from_csv, json
+    list_from_csv, json, listdir
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
 from swift.common.daemon import Daemon
@@ -31,10 +33,10 @@ SLEEP_BETWEEN_AUDITS = 30
 class AuditorWorker(object):
     """Walk through file system to audit objects"""

-    def __init__(self, conf, logger, zero_byte_only_at_fps=0):
+    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
         self.conf = conf
         self.logger = logger
-        self.devices = conf.get('devices', '/srv/node')
+        self.devices = devices
         self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
         self.max_files_per_second = float(conf.get('files_per_second', 20))
         self.max_bytes_per_second = float(conf.get('bytes_per_second',
@@ -53,24 +55,34 @@ class AuditorWorker(object):
         self.passes = 0
         self.quarantines = 0
         self.errors = 0
-        self.recon_cache_path = conf.get('recon_cache_path',
-                                         '/var/cache/swift')
-        self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+        self.rcache = rcache
         self.stats_sizes = sorted(
             [int(s) for s in list_from_csv(conf.get('object_size_stats'))])
         self.stats_buckets = dict(
             [(s, 0) for s in self.stats_sizes + ['OVER']])

-    def audit_all_objects(self, mode='once'):
-        self.logger.info(_('Begin object audit "%s" mode (%s)') %
-                         (mode, self.auditor_type))
+    def create_recon_nested_dict(self, top_level_key, device_list, item):
+        if device_list:
+            device_key = ''.join(sorted(device_list))
+            return {top_level_key: {device_key: item}}
+        else:
+            return {top_level_key: item}
+
+    def audit_all_objects(self, mode='once', device_dirs=None):
+        description = ''
+        if device_dirs:
+            device_dir_str = ','.join(sorted(device_dirs))
+            description = _(' - %s') % device_dir_str
+        self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
+                         (mode, self.auditor_type, description))
         begin = reported = time.time()
         self.total_bytes_processed = 0
         self.total_files_processed = 0
         total_quarantines = 0
         total_errors = 0
         time_auditing = 0
-        all_locs = self.diskfile_mgr.object_audit_location_generator()
+        all_locs = self.diskfile_mgr.object_audit_location_generator(
+            device_dirs=device_dirs)
         for location in all_locs:
             loop_time = time.time()
             self.failsafe_object_audit(location)
@@ -87,7 +99,7 @@ class AuditorWorker(object):
                     'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
                     'Total time: %(total).2f, Auditing time: %(audit).2f, '
                     'Rate: %(audit_rate).2f') % {
-                        'type': self.auditor_type,
+                        'type': '%s%s' % (self.auditor_type, description),
                         'start_time': time.ctime(reported),
                         'passes': self.passes, 'quars': self.quarantines,
                         'errors': self.errors,
@@ -95,15 +107,14 @@ class AuditorWorker(object):
                         'brate': self.bytes_processed / (now - reported),
                         'total': (now - begin), 'audit': time_auditing,
                         'audit_rate': time_auditing / (now - begin)})
-                dump_recon_cache({'object_auditor_stats_%s' %
-                                  self.auditor_type: {
-                                      'errors': self.errors,
-                                      'passes': self.passes,
-                                      'quarantined': self.quarantines,
-                                      'bytes_processed': self.bytes_processed,
-                                      'start_time': reported,
-                                      'audit_time': time_auditing}},
-                                 self.rcache, self.logger)
+                cache_entry = self.create_recon_nested_dict(
+                    'object_auditor_stats_%s' % (self.auditor_type),
+                    device_dirs,
+                    {'errors': self.errors, 'passes': self.passes,
+                     'quarantined': self.quarantines,
+                     'bytes_processed': self.bytes_processed,
+                     'start_time': reported, 'audit_time': time_auditing})
+                dump_recon_cache(cache_entry, self.rcache, self.logger)
                 reported = now
                 total_quarantines += self.quarantines
                 total_errors += self.errors
@@ -120,12 +131,19 @@ class AuditorWorker(object):
             'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
             'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
             'Rate: %(audit_rate).2f') % {
-                'type': self.auditor_type, 'mode': mode, 'elapsed': elapsed,
+                'type': '%s%s' % (self.auditor_type, description),
+                'mode': mode, 'elapsed': elapsed,
                 'quars': total_quarantines + self.quarantines,
                 'errors': total_errors + self.errors,
                 'frate': self.total_files_processed / elapsed,
                 'brate': self.total_bytes_processed / elapsed,
                 'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
+        # Clear recon cache entry if device_dirs is set
+        if device_dirs:
+            cache_entry = self.create_recon_nested_dict(
+                'object_auditor_stats_%s' % (self.auditor_type),
+                device_dirs, {})
+            dump_recon_cache(cache_entry, self.rcache, self.logger)
         if self.stats_sizes:
             self.logger.info(
                 _('Object audit stats: %s') % json.dumps(self.stats_buckets))
@@ -204,35 +222,100 @@ class ObjectAuditor(Daemon):
     def __init__(self, conf, **options):
         self.conf = conf
         self.logger = get_logger(conf, log_route='object-auditor')
+        self.devices = conf.get('devices', '/srv/node')
         self.conf_zero_byte_fps = int(
             conf.get('zero_byte_files_per_second', 50))
+        self.recon_cache_path = conf.get('recon_cache_path',
+                                         '/var/cache/swift')
+        self.rcache = os.path.join(self.recon_cache_path, "object.recon")

     def _sleep(self):
         time.sleep(SLEEP_BETWEEN_AUDITS)

+    def clear_recon_cache(self, auditor_type):
+        """Clear recon cache entries"""
+        dump_recon_cache({'object_auditor_stats_%s' % auditor_type: {}},
+                         self.rcache, self.logger)
+
+    def run_audit(self, **kwargs):
+        """Run the object audit"""
+        mode = kwargs.get('mode')
+        zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
+        device_dirs = kwargs.get('device_dirs')
+        worker = AuditorWorker(self.conf, self.logger, self.rcache,
+                               self.devices,
+                               zero_byte_only_at_fps=zero_byte_only_at_fps)
+        worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
+
+    def fork_child(self, zero_byte_fps=False, **kwargs):
+        """Child execution"""
+        pid = os.fork()
+        if pid:
+            return pid
+        else:
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            if zero_byte_fps:
+                kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
+            self.run_audit(**kwargs)
+            sys.exit()
+
+    def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
+        """Audit loop"""
+        self.clear_recon_cache('ALL')
+        self.clear_recon_cache('ZBF')
+        kwargs['device_dirs'] = override_devices
+        if parent:
+            kwargs['zero_byte_fps'] = zbo_fps
+            self.run_audit(**kwargs)
+        else:
+            pids = []
+            if self.conf_zero_byte_fps:
+                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+                pids.append(zbf_pid)
+            pids.append(self.fork_child(**kwargs))
+            while pids:
+                pid = os.wait()[0]
+                # ZBF scanner must be restarted as soon as it finishes
+                if self.conf_zero_byte_fps and pid == zbf_pid and \
+                        len(pids) > 1:
+                    kwargs['device_dirs'] = override_devices
+                    zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+                    pids.append(zbf_pid)
+                pids.remove(pid)
+
     def run_forever(self, *args, **kwargs):
         """Run the object audit until stopped."""
         # zero byte only command line option
         zbo_fps = kwargs.get('zero_byte_fps', 0)
+        parent = False
         if zbo_fps:
             # only start parent
             parent = True
-        else:
-            parent = os.fork()  # child gets parent = 0
         kwargs = {'mode': 'forever'}
-        if parent:
-            kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
+
         while True:
             try:
-                self.run_once(**kwargs)
+                self.audit_loop(parent, zbo_fps, **kwargs)
             except (Exception, Timeout):
                 self.logger.exception(_('ERROR auditing'))
             self._sleep()

     def run_once(self, *args, **kwargs):
-        """Run the object audit once."""
-        mode = kwargs.get('mode', 'once')
-        zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
-        worker = AuditorWorker(self.conf, self.logger,
-                               zero_byte_only_at_fps=zero_byte_only_at_fps)
-        worker.audit_all_objects(mode=mode)
+        """Run the object audit once"""
+        # zero byte only command line option
+        zbo_fps = kwargs.get('zero_byte_fps', 0)
+        override_devices = list_from_csv(kwargs.get('devices'))
+        # Remove bogus entries and duplicates from override_devices
+        override_devices = list(
+            set(listdir(self.devices)).intersection(set(override_devices)))
+        parent = False
+        if zbo_fps:
+            # only start parent
+            parent = True
+        kwargs = {'mode': 'once'}
+
+        try:
+            self.audit_loop(parent, zbo_fps, override_devices=override_devices,
+                            **kwargs)
+        except (Exception, Timeout):
+            self.logger.exception(_('ERROR auditing'))

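The nesting scheme itself is tiny: the cache key for a restricted run
is just the sorted device names concatenated. A sketch with
hypothetical devices:

    device_list = ['sdb', 'sda']
    device_key = ''.join(sorted(device_list))   # 'sdasdb'
    # With devices specified:  {'object_auditor_stats_ALL': {'sdasdb': {...}}}
    # Without (full audit):    {'object_auditor_stats_ALL': {...}}

Sorting makes the key stable no matter what order the devices were
given on the command line.
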

@@ -351,22 +351,32 @@ class AuditLocation(object):
         return str(self.path)


-def object_audit_location_generator(devices, mount_check=True, logger=None):
+def object_audit_location_generator(devices, mount_check=True, logger=None,
+                                    device_dirs=None):
     """
-    Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
-    objects stored under that directory. The AuditLocation only knows the path
-    to the hash directory, not to the .data file therein (if any). This is to
-    avoid a double listdir(hash_dir); the DiskFile object will always do one,
-    so we don't.
+    Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
+    objects stored under that directory if device_dirs isn't set.  If
+    device_dirs is set, only yield AuditLocation for the objects under the
+    entries in device_dirs.  The AuditLocation only knows the path to the
+    hash directory, not to the .data file therein (if any).  This is to
+    avoid a double listdir(hash_dir); the DiskFile object will always do
+    one, so we don't.

     :param devices: parent directory of the devices to be audited
     :param mount_check: flag to check if a mount check should be performed
                         on devices
     :param logger: a logger object
+    :param device_dirs: a list of directories under devices to traverse
     """
-    device_dirs = listdir(devices)
+    if not device_dirs:
+        device_dirs = listdir(devices)
+    else:
+        # remove bogus devices and duplicates from device_dirs
+        device_dirs = list(
+            set(listdir(devices)).intersection(set(device_dirs)))
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dirs)
     for device in device_dirs:
         if mount_check and not \
                 ismount(os.path.join(devices, device)):
@@ -502,9 +512,9 @@ class DiskFileManager(object):
         return DiskFile(self, dev_path, self.threadpools[device],
                         partition, account, container, obj, **kwargs)

-    def object_audit_location_generator(self):
+    def object_audit_location_generator(self, device_dirs=None):
         return object_audit_location_generator(self.devices, self.mount_check,
-                                               self.logger)
+                                               self.logger, device_dirs)

     def get_diskfile_from_audit_location(self, audit_location):
         dev_path = self.get_dev_path(audit_location.device, mount_check=False)

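Device filtering, both here and in ObjectAuditor.run_once above, is a
plain set intersection against what is actually on disk; a sketch with
hypothetical directory contents:

    # What listdir(devices) might return, and what the user asked for:
    on_disk = ['sda', 'sdb', 'sdc']
    requested = ['sda', 'bogus', 'sdb', 'sda']
    device_dirs = list(set(on_disk).intersection(set(requested)))
    # 'bogus' and the duplicate 'sda' are dropped; order is unspecified,
    # which is fine since the generator shuffles device_dirs anyway.

Anything the user names that doesn't exist under the devices path is
silently ignored rather than raising an error.
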

@@ -562,6 +562,45 @@ class TestReconSuccess(TestCase):
                                   "files_processed": 2310,
                                   "quarantined": 0}})

+    def test_get_auditor_info_object_once(self):
+        from_cache_response = {
+            "object_auditor_stats_ALL": {'disk1disk2': {
+                "audit_time": 115.14418768882751,
+                "bytes_processed": 234660,
+                "completed": 115.4512460231781,
+                "errors": 0,
+                "files_processed": 2310,
+                "quarantined": 0}},
+            "object_auditor_stats_ZBF": {'disk1disk2': {
+                "audit_time": 45.877294063568115,
+                "bytes_processed": 0,
+                "completed": 46.181446075439453,
+                "errors": 0,
+                "files_processed": 2310,
+                "quarantined": 0}}}
+        self.fakecache.fakeout_calls = []
+        self.fakecache.fakeout = from_cache_response
+        rv = self.app.get_auditor_info('object')
+        self.assertEquals(self.fakecache.fakeout_calls,
+                          [((['object_auditor_stats_ALL',
+                              'object_auditor_stats_ZBF'],
+                             '/var/cache/swift/object.recon'), {})])
+        self.assertEquals(rv, {
+            "object_auditor_stats_ALL": {'disk1disk2': {
+                "audit_time": 115.14418768882751,
+                "bytes_processed": 234660,
+                "completed": 115.4512460231781,
+                "errors": 0,
+                "files_processed": 2310,
+                "quarantined": 0}},
+            "object_auditor_stats_ZBF": {'disk1disk2': {
+                "audit_time": 45.877294063568115,
+                "bytes_processed": 0,
+                "completed": 46.181446075439453,
+                "errors": 0,
+                "files_processed": 2310,
+                "quarantined": 0}}})
+
     def test_get_unmounted(self):
         unmounted_resp = [{'device': 'fakeone', 'mounted': False},
                           {'device': 'faketwo', 'mounted': False}]


@@ -28,6 +28,7 @@ import random
 import re
 import socket
 import sys
+import json

 from textwrap import dedent
@@ -486,6 +487,29 @@ class TestUtils(unittest.TestCase):
             utils.sys.stdout = orig_stdout
             utils.sys.stderr = orig_stderr

+    def test_dump_recon_cache(self):
+        testdir_base = mkdtemp()
+        testcache_file = os.path.join(testdir_base, 'cache.recon')
+        logger = utils.get_logger(None, 'server', log_route='server')
+        try:
+            submit_dict = {'key1': {'value1': 1, 'value2': 2}}
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            fd = open(testcache_file)
+            file_dict = json.loads(fd.readline())
+            fd.close()
+            self.assertEquals(submit_dict, file_dict)
+            # Use a nested entry
+            submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}}
+            result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2},
+                                    'value1': 1, 'value2': 2}}
+            utils.dump_recon_cache(submit_dict, testcache_file, logger)
+            fd = open(testcache_file)
+            file_dict = json.loads(fd.readline())
+            fd.close()
+            self.assertEquals(result_dict, file_dict)
+        finally:
+            rmtree(testdir_base)
+
     def test_get_logger(self):
         sio = StringIO()
         logger = logging.getLogger('server')


@@ -18,6 +18,7 @@ import unittest
 import mock
 import os
 import time
+import string
 from shutil import rmtree
 from hashlib import md5
 from tempfile import mkdtemp
@@ -34,6 +35,7 @@ class TestAuditor(unittest.TestCase):
     def setUp(self):
         self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
         self.devices = os.path.join(self.testdir, 'node')
+        self.rcache = os.path.join(self.testdir, 'object.recon')
         self.logger = FakeLogger()
         rmtree(self.testdir, ignore_errors=1)
         mkdirs(os.path.join(self.devices, 'sda'))
@@ -60,7 +62,8 @@ class TestAuditor(unittest.TestCase):
         unit.xattr_data = {}

     def test_object_audit_extra_data(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         data = '0' * 1024
         etag = md5()
         with self.disk_file.create() as writer:
@@ -86,7 +89,8 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)

     def test_object_audit_diff_data(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         data = '0' * 1024
         etag = md5()
         timestamp = str(normalize_timestamp(time.time()))
@@ -129,7 +133,8 @@ class TestAuditor(unittest.TestCase):
             fp.write('0' * 1024)
             fp.close()
         invalidate_hash(os.path.dirname(self.disk_file._datadir))
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         pre_quarantines = auditor_worker.quarantines
         auditor_worker.object_audit(
             AuditLocation(self.disk_file._datadir, 'sda', '0'))
@@ -141,7 +146,8 @@ class TestAuditor(unittest.TestCase):
         mkdirs(self.disk_file._datadir)
         with open(path, 'w') as f:
             write_metadata(f, {'name': '/a/c/o'})
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)

         def blowup(*args):
             raise NameError('tpyo')
@@ -156,7 +162,8 @@ class TestAuditor(unittest.TestCase):
         mkdirs(self.disk_file._datadir)
         with open(path, 'w') as f:
             write_metadata(f, {'name': '/a/c/o'})
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)

         def blowup(*args):
             raise NameError('tpyo')
@@ -166,7 +173,8 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.errors, 1)

     def test_generic_exception_handling(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         timestamp = str(normalize_timestamp(time.time()))
         pre_errors = auditor_worker.errors
         data = '0' * 1024
@@ -186,7 +194,8 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.errors, pre_errors + 1)

     def test_object_run_once_pass(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         auditor_worker.log_time = 0
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = auditor_worker.quarantines
@@ -208,7 +217,8 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.stats_buckets[10240], 0)

     def test_object_run_once_no_sda(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = auditor_worker.quarantines
         data = '0' * 1024
@@ -228,7 +238,8 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)

     def test_object_run_once_multi_devices(self):
-        auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
         timestamp = str(normalize_timestamp(time.time()))
         pre_quarantines = auditor_worker.quarantines
         data = '0' * 10
@@ -284,9 +295,12 @@ class TestAuditor(unittest.TestCase):
         quarantine_path = os.path.join(self.devices,
                                        'sda', 'quarantined', 'objects')
-        self.auditor.run_once(zero_byte_fps=50)
+        kwargs = {'mode': 'once'}
+        kwargs['zero_byte_fps'] = 50
+        self.auditor.run_audit(**kwargs)
         self.assertFalse(os.path.isdir(quarantine_path))
-        self.auditor.run_once()
+        del(kwargs['zero_byte_fps'])
+        self.auditor.run_audit(**kwargs)
         self.assertTrue(os.path.isdir(quarantine_path))

     def setup_bad_zero_byte(self, with_ts=False):
@@ -322,14 +336,17 @@ class TestAuditor(unittest.TestCase):
     def test_object_run_fast_track_all(self):
         self.setup_bad_zero_byte()
-        self.auditor.run_once()
+        kwargs = {'mode': 'once'}
+        self.auditor.run_audit(**kwargs)
         quarantine_path = os.path.join(self.devices,
                                        'sda', 'quarantined', 'objects')
         self.assertTrue(os.path.isdir(quarantine_path))

     def test_object_run_fast_track_zero(self):
         self.setup_bad_zero_byte()
-        self.auditor.run_once(zero_byte_fps=50)
+        kwargs = {'mode': 'once'}
+        kwargs['zero_byte_fps'] = 50
+        self.auditor.run_audit(**kwargs)
         quarantine_path = os.path.join(self.devices,
                                        'sda', 'quarantined', 'objects')
         self.assertTrue(os.path.isdir(quarantine_path))
@@ -347,7 +364,9 @@ class TestAuditor(unittest.TestCase):
         was_df = auditor.diskfile.DiskFile
         try:
             auditor.diskfile.DiskFile = FakeFile
-            self.auditor.run_once(zero_byte_fps=50)
+            kwargs = {'mode': 'once'}
+            kwargs['zero_byte_fps'] = 50
+            self.auditor.run_audit(**kwargs)
             quarantine_path = os.path.join(self.devices,
                                            'sda', 'quarantined', 'objects')
             self.assertTrue(os.path.isdir(quarantine_path))
@@ -358,7 +377,8 @@ class TestAuditor(unittest.TestCase):
     def test_with_tombstone(self):
         ts_file_path = self.setup_bad_zero_byte(with_ts=True)
         self.assertTrue(ts_file_path.endswith('ts'))
-        self.auditor.run_once()
+        kwargs = {'mode': 'once'}
+        self.auditor.run_audit(**kwargs)
         self.assertTrue(os.path.exists(ts_file_path))
def test_sleeper(self):
@@ -370,7 +390,7 @@ class TestAuditor(unittest.TestCase):
         self.assert_(delta_t > 0.08)
         self.assert_(delta_t < 0.12)

-    def test_run_forever(self):
+    def test_run_audit(self):

         class StopForever(Exception):
             pass
@@ -378,45 +398,78 @@ class TestAuditor(unittest.TestCase):
         class ObjectAuditorMock(object):
             check_args = ()
             check_kwargs = {}
+            check_device_dir = None
             fork_called = 0
-            fork_res = 0
+            master = 0
+            wait_called = 0

             def mock_run(self, *args, **kwargs):
                 self.check_args = args
                 self.check_kwargs = kwargs
+                if 'zero_byte_fps' in kwargs:
+                    self.check_device_dir = kwargs.get('device_dirs')

             def mock_sleep(self):
                 raise StopForever('stop')

             def mock_fork(self):
                 self.fork_called += 1
-                return self.fork_res
+                if self.master:
+                    return self.fork_called
+                else:
+                    return 0
+
+            def mock_wait(self):
+                self.wait_called += 1
+                return (self.wait_called, 0)
+
+        for i in string.ascii_letters[2:26]:
+            mkdirs(os.path.join(self.devices, 'sd%s' % i))

         my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
                                                 mount_check='false',
                                                 zero_byte_files_per_second=89))
         mocker = ObjectAuditorMock()
-        my_auditor.run_once = mocker.mock_run
+        my_auditor.run_audit = mocker.mock_run
         my_auditor._sleep = mocker.mock_sleep
         was_fork = os.fork
+        was_wait = os.wait
         try:
             os.fork = mocker.mock_fork
+            os.wait = mocker.mock_wait
             self.assertRaises(StopForever,
                               my_auditor.run_forever, zero_byte_fps=50)
             self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50)
             self.assertEquals(mocker.fork_called, 0)
-            self.assertRaises(StopForever, my_auditor.run_forever)
+            self.assertRaises(SystemExit, my_auditor.run_forever)
             self.assertEquals(mocker.fork_called, 1)
             self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
+            self.assertEquals(mocker.check_device_dir, None)
             self.assertEquals(mocker.check_args, ())
-            mocker.fork_res = 1
-            self.assertRaises(StopForever, my_auditor.run_forever)
-            self.assertEquals(mocker.fork_called, 2)
+
+            device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
+            device_string = ','.join(device_list)
+            device_string_bogus = device_string + ',bogus'
+
+            mocker.fork_called = 0
+            self.assertRaises(SystemExit, my_auditor.run_once,
+                              devices=device_string_bogus)
+            self.assertEquals(mocker.fork_called, 1)
+            self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
+            self.assertEquals(sorted(mocker.check_device_dir), device_list)
+
+            mocker.master = 1
+
+            mocker.fork_called = 0
+            self.assertRaises(StopForever, my_auditor.run_forever)
+            # Fork is called 3 times since the zbf process is forked twice
+            self.assertEquals(mocker.fork_called, 3)
+            self.assertEquals(mocker.wait_called, 3)
+
         finally:
             os.fork = was_fork
+            os.wait = was_wait
if __name__ == '__main__':
unittest.main()