Merge "Refactor auditors to rely on expected gen names"
This commit is contained in:
commit
167897bba5
|
@ -49,7 +49,7 @@ class AccountAuditor(Daemon):
|
|||
|
||||
def _one_audit_pass(self, reported):
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
account_server.DATADIR,
|
||||
account_server.DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
|
@ -113,8 +113,6 @@ class AccountAuditor(Daemon):
|
|||
"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
if not path.endswith('.db'):
|
||||
return
|
||||
broker = AccountBroker(path)
|
||||
if not broker.is_deleted():
|
||||
broker.get_info()
|
||||
|
|
|
@ -1506,7 +1506,8 @@ def remove_file(path):
|
|||
pass
|
||||
|
||||
|
||||
def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
||||
def audit_location_generator(devices, datadir, suffix='',
|
||||
mount_check=True, logger=None):
|
||||
'''
|
||||
Given a devices path and a data directory, yield (path, device,
|
||||
partition) for all files in that directory
|
||||
|
@ -1515,6 +1516,7 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
|||
:param datadir: a directory located under self.devices. This should be
|
||||
one of the DATADIR constants defined in the account,
|
||||
container, and object servers.
|
||||
:param suffix: path name suffix required for all names returned
|
||||
:param mount_check: Flag to check if a mount check should be performed
|
||||
on devices
|
||||
:param logger: a logger object
|
||||
|
@ -1538,8 +1540,8 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
|||
if not os.path.isdir(part_path):
|
||||
continue
|
||||
suffixes = listdir(part_path)
|
||||
for suffix in suffixes:
|
||||
suff_path = os.path.join(part_path, suffix)
|
||||
for asuffix in suffixes:
|
||||
suff_path = os.path.join(part_path, asuffix)
|
||||
if not os.path.isdir(suff_path):
|
||||
continue
|
||||
hashes = listdir(suff_path)
|
||||
|
@ -1549,6 +1551,8 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
|||
continue
|
||||
for fname in sorted(listdir(hash_path),
|
||||
reverse=True):
|
||||
if suffix and not fname.endswith(suffix):
|
||||
continue
|
||||
path = os.path.join(hash_path, fname)
|
||||
yield path, device, partition
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ class ContainerAuditor(Daemon):
|
|||
|
||||
def _one_audit_pass(self, reported):
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
container_server.DATADIR,
|
||||
container_server.DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
|
@ -112,8 +112,6 @@ class ContainerAuditor(Daemon):
|
|||
"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
if not path.endswith('.db'):
|
||||
return
|
||||
broker = ContainerBroker(path)
|
||||
if not broker.is_deleted():
|
||||
broker.get_info()
|
||||
|
|
|
@ -155,6 +155,7 @@ class ContainerSync(Daemon):
|
|||
begin = time()
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
container_server.DATADIR,
|
||||
'.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
|
@ -172,7 +173,7 @@ class ContainerSync(Daemon):
|
|||
self.logger.info(_('Begin container sync "once" mode'))
|
||||
begin = time()
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
container_server.DATADIR,
|
||||
container_server.DATADIR, '.db',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
|
@ -213,9 +214,8 @@ class ContainerSync(Daemon):
|
|||
|
||||
:param path: the path to a container db
|
||||
"""
|
||||
broker = None
|
||||
try:
|
||||
if not path.endswith('.db'):
|
||||
return
|
||||
broker = ContainerBroker(path)
|
||||
info = broker.get_info()
|
||||
x, nodes = self.container_ring.get_nodes(info['account'],
|
||||
|
@ -294,10 +294,11 @@ class ContainerSync(Daemon):
|
|||
broker.set_x_container_sync_points(sync_point1, None)
|
||||
self.container_syncs += 1
|
||||
self.logger.increment('syncs')
|
||||
except (Exception, Timeout), err:
|
||||
except (Exception, Timeout) as err:
|
||||
self.container_failures += 1
|
||||
self.logger.increment('failures')
|
||||
self.logger.exception(_('ERROR Syncing %s'), (broker.db_file))
|
||||
self.logger.exception(_('ERROR Syncing %s'),
|
||||
broker.db_file if broker else path)
|
||||
|
||||
def container_sync_row(self, row, sync_to, sync_key, broker, info):
|
||||
"""
|
||||
|
|
|
@ -71,7 +71,7 @@ class AuditorWorker(object):
|
|||
total_errors = 0
|
||||
time_auditing = 0
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
object_server.DATADIR,
|
||||
object_server.DATADIR, '.data',
|
||||
mount_check=self.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
|
@ -158,23 +158,18 @@ class AuditorWorker(object):
|
|||
:param partition: the partition the path is on
|
||||
"""
|
||||
try:
|
||||
if not path.endswith('.data'):
|
||||
return
|
||||
try:
|
||||
name = object_server.read_metadata(path)['name']
|
||||
except (Exception, Timeout), exc:
|
||||
except (Exception, Timeout) as exc:
|
||||
raise AuditException('Error when reading metadata: %s' % exc)
|
||||
_junk, account, container, obj = name.split('/', 3)
|
||||
df = object_server.DiskFile(self.devices, device, partition,
|
||||
account, container, obj, self.logger,
|
||||
keep_data_fp=True)
|
||||
try:
|
||||
if df.data_file is None:
|
||||
# file is deleted, we found the tombstone
|
||||
return
|
||||
try:
|
||||
obj_size = df.get_data_file_size()
|
||||
except DiskFileError, e:
|
||||
except DiskFileError as e:
|
||||
raise AuditException(str(e))
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
|
@ -198,7 +193,7 @@ class AuditorWorker(object):
|
|||
{'path': path})
|
||||
finally:
|
||||
df.close(verify_file=False)
|
||||
except AuditException, err:
|
||||
except AuditException as err:
|
||||
self.logger.increment('quarantines')
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and will '
|
||||
|
|
|
@ -126,14 +126,17 @@ class TestContainerSync(unittest.TestCase):
|
|||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
audit_location_generator_calls[0] += 1
|
||||
# Makes .container_sync() short-circuit because 'path' doesn't end
|
||||
# with .db
|
||||
return [('path', 'device', 'partition')]
|
||||
# Makes .container_sync() short-circuit
|
||||
yield 'container.db', 'device', 'partition'
|
||||
return
|
||||
|
||||
orig_time = sync.time
|
||||
orig_sleep = sync.sleep
|
||||
orig_audit_location_generator = sync.audit_location_generator
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c'})
|
||||
sync.time = fake_time
|
||||
sync.sleep = fake_sleep
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
|
@ -147,6 +150,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
sync.time = orig_time
|
||||
sync.sleep = orig_sleep
|
||||
sync.audit_location_generator = orig_audit_location_generator
|
||||
sync.ContainerBroker = orig_ContainerBroker
|
||||
|
||||
self.assertEquals(time_calls, [9])
|
||||
self.assertEquals(len(sleep_calls), 2)
|
||||
|
@ -180,13 +184,16 @@ class TestContainerSync(unittest.TestCase):
|
|||
|
||||
def fake_audit_location_generator(*args, **kwargs):
|
||||
audit_location_generator_calls[0] += 1
|
||||
# Makes .container_sync() short-circuit because 'path' doesn't end
|
||||
# with .db
|
||||
return [('path', 'device', 'partition')]
|
||||
# Makes .container_sync() short-circuit
|
||||
yield 'container.db', 'device', 'partition'
|
||||
return
|
||||
|
||||
orig_time = sync.time
|
||||
orig_audit_location_generator = sync.audit_location_generator
|
||||
orig_ContainerBroker = sync.ContainerBroker
|
||||
try:
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c'})
|
||||
sync.time = fake_time
|
||||
sync.audit_location_generator = fake_audit_location_generator
|
||||
cs = sync.ContainerSync({}, container_ring=FakeRing(),
|
||||
|
@ -202,6 +209,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
finally:
|
||||
sync.time = orig_time
|
||||
sync.audit_location_generator = orig_audit_location_generator
|
||||
sync.ContainerBroker = orig_ContainerBroker
|
||||
|
||||
self.assertEquals(time_calls, [10])
|
||||
self.assertEquals(audit_location_generator_calls, [2])
|
||||
|
@ -295,7 +303,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
cs.container_sync('isa.db')
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
self.assertEquals(cs.container_skips, 1)
|
||||
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'x_container_sync_point1': -1,
|
||||
|
@ -308,7 +316,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
cs.container_sync('isa.db')
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
self.assertEquals(cs.container_skips, 2)
|
||||
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'x_container_sync_point1': -1,
|
||||
|
@ -321,7 +329,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
cs.container_sync('isa.db')
|
||||
self.assertEquals(cs.container_failures, 0)
|
||||
self.assertEquals(cs.container_skips, 3)
|
||||
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'x_container_sync_point1': -1,
|
||||
|
@ -336,7 +344,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
cs.container_sync('isa.db')
|
||||
self.assertEquals(cs.container_failures, 1)
|
||||
self.assertEquals(cs.container_skips, 3)
|
||||
|
||||
|
||||
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
|
||||
info={'account': 'a', 'container': 'c',
|
||||
'x_container_sync_point1': -1,
|
||||
|
@ -385,7 +393,7 @@ class TestContainerSync(unittest.TestCase):
|
|||
|
||||
def fake_time():
|
||||
return fake_times.pop(0)
|
||||
|
||||
|
||||
sync.time = fake_time
|
||||
# This same sync won't fail since it will look like it took so long
|
||||
# as to be time to move on (before it ever actually tries to do
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
from test import unit
|
||||
import unittest
|
||||
import mock
|
||||
import os
|
||||
import time
|
||||
from shutil import rmtree
|
||||
|
@ -141,14 +142,26 @@ class TestAuditor(unittest.TestCase):
|
|||
'sda', '0')
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_bad_args(self):
|
||||
def test_generic_exception_handling(self):
|
||||
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
pre_errors = self.auditor.errors
|
||||
self.auditor.object_audit(5, 'sda', '0')
|
||||
data = '0' * 1024
|
||||
etag = md5()
|
||||
with self.disk_file.writer() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp,
|
||||
'Content-Length': str(os.fstat(writer.fd).st_size),
|
||||
}
|
||||
writer.put(metadata)
|
||||
with mock.patch('swift.obj.server.DiskFile',
|
||||
lambda *_: 1 / 0):
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
||||
pre_errors = self.auditor.errors
|
||||
self.auditor.object_audit('badpath', 'sda', '0')
|
||||
self.assertEquals(self.auditor.errors, pre_errors) # just returns
|
||||
|
||||
def test_object_run_once_pass(self):
|
||||
self.auditor = auditor.AuditorWorker(self.conf, self.logger)
|
||||
|
|
Loading…
Reference in New Issue