Refactor auditors to rely on expected gen names
Refactor the various auditors to rely on the audit_location_generator yielding tuples containing paths with the expected suffix. We also fix the exception handling for container_sync to not expect a broker object (since the act of creating a broker object can raise an exception). For the object auditor we removed an unneeded check for disk_file since get_data_file_size() will raise DiskFileNotExist under the same condition (raises code coverage slightly). Change-Id: I11d405e629063177ef21543b75e9076da1a03b61
This commit is contained in:
parent
18a0813d9b
commit
9480ff8a28
|
@ -49,7 +49,7 @@ class AccountAuditor(Daemon):
|
||||||
|
|
||||||
def _one_audit_pass(self, reported):
|
def _one_audit_pass(self, reported):
|
||||||
all_locs = audit_location_generator(self.devices,
|
all_locs = audit_location_generator(self.devices,
|
||||||
account_server.DATADIR,
|
account_server.DATADIR, '.db',
|
||||||
mount_check=self.mount_check,
|
mount_check=self.mount_check,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
for path, device, partition in all_locs:
|
for path, device, partition in all_locs:
|
||||||
|
@ -113,8 +113,6 @@ class AccountAuditor(Daemon):
|
||||||
"""
|
"""
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
try:
|
try:
|
||||||
if not path.endswith('.db'):
|
|
||||||
return
|
|
||||||
broker = AccountBroker(path)
|
broker = AccountBroker(path)
|
||||||
if not broker.is_deleted():
|
if not broker.is_deleted():
|
||||||
broker.get_info()
|
broker.get_info()
|
||||||
|
|
|
@ -1506,7 +1506,8 @@ def remove_file(path):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
def audit_location_generator(devices, datadir, suffix='',
|
||||||
|
mount_check=True, logger=None):
|
||||||
'''
|
'''
|
||||||
Given a devices path and a data directory, yield (path, device,
|
Given a devices path and a data directory, yield (path, device,
|
||||||
partition) for all files in that directory
|
partition) for all files in that directory
|
||||||
|
@ -1515,6 +1516,7 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
||||||
:param datadir: a directory located under self.devices. This should be
|
:param datadir: a directory located under self.devices. This should be
|
||||||
one of the DATADIR constants defined in the account,
|
one of the DATADIR constants defined in the account,
|
||||||
container, and object servers.
|
container, and object servers.
|
||||||
|
:param suffix: path name suffix required for all names returned
|
||||||
:param mount_check: Flag to check if a mount check should be performed
|
:param mount_check: Flag to check if a mount check should be performed
|
||||||
on devices
|
on devices
|
||||||
:param logger: a logger object
|
:param logger: a logger object
|
||||||
|
@ -1538,8 +1540,8 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
||||||
if not os.path.isdir(part_path):
|
if not os.path.isdir(part_path):
|
||||||
continue
|
continue
|
||||||
suffixes = listdir(part_path)
|
suffixes = listdir(part_path)
|
||||||
for suffix in suffixes:
|
for asuffix in suffixes:
|
||||||
suff_path = os.path.join(part_path, suffix)
|
suff_path = os.path.join(part_path, asuffix)
|
||||||
if not os.path.isdir(suff_path):
|
if not os.path.isdir(suff_path):
|
||||||
continue
|
continue
|
||||||
hashes = listdir(suff_path)
|
hashes = listdir(suff_path)
|
||||||
|
@ -1549,6 +1551,8 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
||||||
continue
|
continue
|
||||||
for fname in sorted(listdir(hash_path),
|
for fname in sorted(listdir(hash_path),
|
||||||
reverse=True):
|
reverse=True):
|
||||||
|
if suffix and not fname.endswith(suffix):
|
||||||
|
continue
|
||||||
path = os.path.join(hash_path, fname)
|
path = os.path.join(hash_path, fname)
|
||||||
yield path, device, partition
|
yield path, device, partition
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ class ContainerAuditor(Daemon):
|
||||||
|
|
||||||
def _one_audit_pass(self, reported):
|
def _one_audit_pass(self, reported):
|
||||||
all_locs = audit_location_generator(self.devices,
|
all_locs = audit_location_generator(self.devices,
|
||||||
container_server.DATADIR,
|
container_server.DATADIR, '.db',
|
||||||
mount_check=self.mount_check,
|
mount_check=self.mount_check,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
for path, device, partition in all_locs:
|
for path, device, partition in all_locs:
|
||||||
|
@ -112,8 +112,6 @@ class ContainerAuditor(Daemon):
|
||||||
"""
|
"""
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
try:
|
try:
|
||||||
if not path.endswith('.db'):
|
|
||||||
return
|
|
||||||
broker = ContainerBroker(path)
|
broker = ContainerBroker(path)
|
||||||
if not broker.is_deleted():
|
if not broker.is_deleted():
|
||||||
broker.get_info()
|
broker.get_info()
|
||||||
|
|
|
@ -155,6 +155,7 @@ class ContainerSync(Daemon):
|
||||||
begin = time()
|
begin = time()
|
||||||
all_locs = audit_location_generator(self.devices,
|
all_locs = audit_location_generator(self.devices,
|
||||||
container_server.DATADIR,
|
container_server.DATADIR,
|
||||||
|
'.db',
|
||||||
mount_check=self.mount_check,
|
mount_check=self.mount_check,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
for path, device, partition in all_locs:
|
for path, device, partition in all_locs:
|
||||||
|
@ -172,7 +173,7 @@ class ContainerSync(Daemon):
|
||||||
self.logger.info(_('Begin container sync "once" mode'))
|
self.logger.info(_('Begin container sync "once" mode'))
|
||||||
begin = time()
|
begin = time()
|
||||||
all_locs = audit_location_generator(self.devices,
|
all_locs = audit_location_generator(self.devices,
|
||||||
container_server.DATADIR,
|
container_server.DATADIR, '.db',
|
||||||
mount_check=self.mount_check,
|
mount_check=self.mount_check,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
for path, device, partition in all_locs:
|
for path, device, partition in all_locs:
|
||||||
|
@ -213,9 +214,8 @@ class ContainerSync(Daemon):
|
||||||
|
|
||||||
:param path: the path to a container db
|
:param path: the path to a container db
|
||||||
"""
|
"""
|
||||||
|
broker = None
|
||||||
try:
|
try:
|
||||||
if not path.endswith('.db'):
|
|
||||||
return
|
|
||||||
broker = ContainerBroker(path)
|
broker = ContainerBroker(path)
|
||||||
info = broker.get_info()
|
info = broker.get_info()
|
||||||
x, nodes = self.container_ring.get_nodes(info['account'],
|
x, nodes = self.container_ring.get_nodes(info['account'],
|
||||||
|
@ -294,10 +294,11 @@ class ContainerSync(Daemon):
|
||||||
broker.set_x_container_sync_points(sync_point1, None)
|
broker.set_x_container_sync_points(sync_point1, None)
|
||||||
self.container_syncs += 1
|
self.container_syncs += 1
|
||||||
self.logger.increment('syncs')
|
self.logger.increment('syncs')
|
||||||
except (Exception, Timeout), err:
|
except (Exception, Timeout) as err:
|
||||||
self.container_failures += 1
|
self.container_failures += 1
|
||||||
self.logger.increment('failures')
|
self.logger.increment('failures')
|
||||||
self.logger.exception(_('ERROR Syncing %s'), (broker.db_file))
|
self.logger.exception(_('ERROR Syncing %s'),
|
||||||
|
broker.db_file if broker else path)
|
||||||
|
|
||||||
def container_sync_row(self, row, sync_to, sync_key, broker, info):
|
def container_sync_row(self, row, sync_to, sync_key, broker, info):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -71,7 +71,7 @@ class AuditorWorker(object):
|
||||||
total_errors = 0
|
total_errors = 0
|
||||||
time_auditing = 0
|
time_auditing = 0
|
||||||
all_locs = audit_location_generator(self.devices,
|
all_locs = audit_location_generator(self.devices,
|
||||||
object_server.DATADIR,
|
object_server.DATADIR, '.data',
|
||||||
mount_check=self.mount_check,
|
mount_check=self.mount_check,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
for path, device, partition in all_locs:
|
for path, device, partition in all_locs:
|
||||||
|
@ -158,23 +158,18 @@ class AuditorWorker(object):
|
||||||
:param partition: the partition the path is on
|
:param partition: the partition the path is on
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
if not path.endswith('.data'):
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
name = object_server.read_metadata(path)['name']
|
name = object_server.read_metadata(path)['name']
|
||||||
except (Exception, Timeout), exc:
|
except (Exception, Timeout) as exc:
|
||||||
raise AuditException('Error when reading metadata: %s' % exc)
|
raise AuditException('Error when reading metadata: %s' % exc)
|
||||||
_junk, account, container, obj = name.split('/', 3)
|
_junk, account, container, obj = name.split('/', 3)
|
||||||
df = object_server.DiskFile(self.devices, device, partition,
|
df = object_server.DiskFile(self.devices, device, partition,
|
||||||
account, container, obj, self.logger,
|
account, container, obj, self.logger,
|
||||||
keep_data_fp=True)
|
keep_data_fp=True)
|
||||||
try:
|
try:
|
||||||
if df.data_file is None:
|
|
||||||
# file is deleted, we found the tombstone
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
obj_size = df.get_data_file_size()
|
obj_size = df.get_data_file_size()
|
||||||
except DiskFileError, e:
|
except DiskFileError as e:
|
||||||
raise AuditException(str(e))
|
raise AuditException(str(e))
|
||||||
except DiskFileNotExist:
|
except DiskFileNotExist:
|
||||||
return
|
return
|
||||||
|
@ -198,7 +193,7 @@ class AuditorWorker(object):
|
||||||
{'path': path})
|
{'path': path})
|
||||||
finally:
|
finally:
|
||||||
df.close(verify_file=False)
|
df.close(verify_file=False)
|
||||||
except AuditException, err:
|
except AuditException as err:
|
||||||
self.logger.increment('quarantines')
|
self.logger.increment('quarantines')
|
||||||
self.quarantines += 1
|
self.quarantines += 1
|
||||||
self.logger.error(_('ERROR Object %(obj)s failed audit and will '
|
self.logger.error(_('ERROR Object %(obj)s failed audit and will '
|
||||||
|
|
|
@ -126,14 +126,17 @@ class TestContainerSync(unittest.TestCase):
|
||||||
|
|
||||||
def fake_audit_location_generator(*args, **kwargs):
|
def fake_audit_location_generator(*args, **kwargs):
|
||||||
audit_location_generator_calls[0] += 1
|
audit_location_generator_calls[0] += 1
|
||||||
# Makes .container_sync() short-circuit because 'path' doesn't end
|
# Makes .container_sync() short-circuit
|
||||||
# with .db
|
yield 'container.db', 'device', 'partition'
|
||||||
return [('path', 'device', 'partition')]
|
return
|
||||||
|
|
||||||
orig_time = sync.time
|
orig_time = sync.time
|
||||||
orig_sleep = sync.sleep
|
orig_sleep = sync.sleep
|
||||||
orig_audit_location_generator = sync.audit_location_generator
|
orig_audit_location_generator = sync.audit_location_generator
|
||||||
|
orig_ContainerBroker = sync.ContainerBroker
|
||||||
try:
|
try:
|
||||||
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
|
info={'account': 'a', 'container': 'c'})
|
||||||
sync.time = fake_time
|
sync.time = fake_time
|
||||||
sync.sleep = fake_sleep
|
sync.sleep = fake_sleep
|
||||||
sync.audit_location_generator = fake_audit_location_generator
|
sync.audit_location_generator = fake_audit_location_generator
|
||||||
|
@ -147,6 +150,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
sync.time = orig_time
|
sync.time = orig_time
|
||||||
sync.sleep = orig_sleep
|
sync.sleep = orig_sleep
|
||||||
sync.audit_location_generator = orig_audit_location_generator
|
sync.audit_location_generator = orig_audit_location_generator
|
||||||
|
sync.ContainerBroker = orig_ContainerBroker
|
||||||
|
|
||||||
self.assertEquals(time_calls, [9])
|
self.assertEquals(time_calls, [9])
|
||||||
self.assertEquals(len(sleep_calls), 2)
|
self.assertEquals(len(sleep_calls), 2)
|
||||||
|
@ -180,13 +184,16 @@ class TestContainerSync(unittest.TestCase):
|
||||||
|
|
||||||
def fake_audit_location_generator(*args, **kwargs):
|
def fake_audit_location_generator(*args, **kwargs):
|
||||||
audit_location_generator_calls[0] += 1
|
audit_location_generator_calls[0] += 1
|
||||||
# Makes .container_sync() short-circuit because 'path' doesn't end
|
# Makes .container_sync() short-circuit
|
||||||
# with .db
|
yield 'container.db', 'device', 'partition'
|
||||||
return [('path', 'device', 'partition')]
|
return
|
||||||
|
|
||||||
orig_time = sync.time
|
orig_time = sync.time
|
||||||
orig_audit_location_generator = sync.audit_location_generator
|
orig_audit_location_generator = sync.audit_location_generator
|
||||||
|
orig_ContainerBroker = sync.ContainerBroker
|
||||||
try:
|
try:
|
||||||
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
|
info={'account': 'a', 'container': 'c'})
|
||||||
sync.time = fake_time
|
sync.time = fake_time
|
||||||
sync.audit_location_generator = fake_audit_location_generator
|
sync.audit_location_generator = fake_audit_location_generator
|
||||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||||
|
@ -202,6 +209,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
sync.time = orig_time
|
sync.time = orig_time
|
||||||
sync.audit_location_generator = orig_audit_location_generator
|
sync.audit_location_generator = orig_audit_location_generator
|
||||||
|
sync.ContainerBroker = orig_ContainerBroker
|
||||||
|
|
||||||
self.assertEquals(time_calls, [10])
|
self.assertEquals(time_calls, [10])
|
||||||
self.assertEquals(audit_location_generator_calls, [2])
|
self.assertEquals(audit_location_generator_calls, [2])
|
||||||
|
@ -295,7 +303,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
cs.container_sync('isa.db')
|
cs.container_sync('isa.db')
|
||||||
self.assertEquals(cs.container_failures, 0)
|
self.assertEquals(cs.container_failures, 0)
|
||||||
self.assertEquals(cs.container_skips, 1)
|
self.assertEquals(cs.container_skips, 1)
|
||||||
|
|
||||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
info={'account': 'a', 'container': 'c',
|
info={'account': 'a', 'container': 'c',
|
||||||
'x_container_sync_point1': -1,
|
'x_container_sync_point1': -1,
|
||||||
|
@ -308,7 +316,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
cs.container_sync('isa.db')
|
cs.container_sync('isa.db')
|
||||||
self.assertEquals(cs.container_failures, 0)
|
self.assertEquals(cs.container_failures, 0)
|
||||||
self.assertEquals(cs.container_skips, 2)
|
self.assertEquals(cs.container_skips, 2)
|
||||||
|
|
||||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
info={'account': 'a', 'container': 'c',
|
info={'account': 'a', 'container': 'c',
|
||||||
'x_container_sync_point1': -1,
|
'x_container_sync_point1': -1,
|
||||||
|
@ -321,7 +329,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
cs.container_sync('isa.db')
|
cs.container_sync('isa.db')
|
||||||
self.assertEquals(cs.container_failures, 0)
|
self.assertEquals(cs.container_failures, 0)
|
||||||
self.assertEquals(cs.container_skips, 3)
|
self.assertEquals(cs.container_skips, 3)
|
||||||
|
|
||||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
info={'account': 'a', 'container': 'c',
|
info={'account': 'a', 'container': 'c',
|
||||||
'x_container_sync_point1': -1,
|
'x_container_sync_point1': -1,
|
||||||
|
@ -336,7 +344,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
cs.container_sync('isa.db')
|
cs.container_sync('isa.db')
|
||||||
self.assertEquals(cs.container_failures, 1)
|
self.assertEquals(cs.container_failures, 1)
|
||||||
self.assertEquals(cs.container_skips, 3)
|
self.assertEquals(cs.container_skips, 3)
|
||||||
|
|
||||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||||
info={'account': 'a', 'container': 'c',
|
info={'account': 'a', 'container': 'c',
|
||||||
'x_container_sync_point1': -1,
|
'x_container_sync_point1': -1,
|
||||||
|
@ -385,7 +393,7 @@ class TestContainerSync(unittest.TestCase):
|
||||||
|
|
||||||
def fake_time():
|
def fake_time():
|
||||||
return fake_times.pop(0)
|
return fake_times.pop(0)
|
||||||
|
|
||||||
sync.time = fake_time
|
sync.time = fake_time
|
||||||
# This same sync won't fail since it will look like it took so long
|
# This same sync won't fail since it will look like it took so long
|
||||||
# as to be time to move on (before it ever actually tries to do
|
# as to be time to move on (before it ever actually tries to do
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
from test import unit
|
from test import unit
|
||||||
import unittest
|
import unittest
|
||||||
|
import mock
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
|
@ -141,14 +142,26 @@ class TestAuditor(unittest.TestCase):
|
||||||
'sda', '0')
|
'sda', '0')
|
||||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||||
|
|
||||||
def test_object_audit_bad_args(self):
|
def test_generic_exception_handling(self):
|
||||||
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
||||||
|
timestamp = str(normalize_timestamp(time.time()))
|
||||||
pre_errors = self.auditor.errors
|
pre_errors = self.auditor.errors
|
||||||
self.auditor.object_audit(5, 'sda', '0')
|
data = '0' * 1024
|
||||||
|
etag = md5()
|
||||||
|
with self.disk_file.writer() as writer:
|
||||||
|
writer.write(data)
|
||||||
|
etag.update(data)
|
||||||
|
etag = etag.hexdigest()
|
||||||
|
metadata = {
|
||||||
|
'ETag': etag,
|
||||||
|
'X-Timestamp': timestamp,
|
||||||
|
'Content-Length': str(os.fstat(writer.fd).st_size),
|
||||||
|
}
|
||||||
|
writer.put(metadata)
|
||||||
|
with mock.patch('swift.obj.server.DiskFile',
|
||||||
|
lambda *_: 1 / 0):
|
||||||
|
self.auditor.audit_all_objects()
|
||||||
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
||||||
pre_errors = self.auditor.errors
|
|
||||||
self.auditor.object_audit('badpath', 'sda', '0')
|
|
||||||
self.assertEquals(self.auditor.errors, pre_errors) # just returns
|
|
||||||
|
|
||||||
def test_object_run_once_pass(self):
|
def test_object_run_once_pass(self):
|
||||||
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
||||||
|
|
Loading…
Reference in New Issue