442cc1d16d
The assumption that we don't need to write an entry in the invalidations file when the hashes.pkl does not exist turned out to be a premature optimization and also wrong. Primarily we should recognize the creation of hashes.pkl is the first thing that happens in a part when it lands on a new primary. The code should be optimized toward the assumption of the most common disk state. Also, in this case the extra stat calls to check if the hashes.pkl exists were not only un-optimized - but introducing a race. Consider the common case: proc 1 | proc 2 -------------------------------|--------------------------- a) read then truncate journal | b) do work | c) append to journal d) apply "a" to index | The index written at "d" may not (yet) reflect the entry writen by proc 2 at "c"; however, it's clearly in the journal so it's easy to see we're safe. Adding in the extra stat call for the index existence check increases the state which can effect correctness. proc 1 | proc 2 ------------------------------|--------------------------- a) no index, truncate journal | b) do work | b) iff index exists | c) append to journal d) apply (or create) index | If step "c" doesn't happen because the index does not yet exist - the update is clearly lost. In our case we'd skip marking a suffix as invalid when the hashes.pkl does not exist because we know "the next time we rehash" we'll have to os.listdir suffixes anyway. But if another process is *currently* rehashing (and has already done it's os.listdir) instead we've just dropped an invalidation on the floor. Don't do that. Instead - write down the invalidation. The running rehash is welcome to proceed on outdated information - as long as the next pass will grab and hash the new suffix. Known-Issue(s): If the suffix already exists there's an even chance the running rehash will hash in the very update for which we want to invalidate the suffix, but that's ok it's idempotent. Co-Author: Pavel Kvasnička <pavel.kvasnicka@firma.seznam.cz> Co-Author: Alistair Coles <alistair.coles@hpe.com> Co-Author: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp> Related-Change-Id: I64cadb1a3feb4d819d545137eecfc295389794f0 Change-Id: I2b48238d9d684e831d9777a7b18f91a3cef57cd1 Closes-Bug: #1651530
1328 lines
56 KiB
Python
1328 lines
56 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from test import unit
|
|
import unittest
|
|
import mock
|
|
import os
|
|
import time
|
|
import string
|
|
from shutil import rmtree
|
|
from hashlib import md5
|
|
from tempfile import mkdtemp
|
|
import textwrap
|
|
from os.path import dirname, basename
|
|
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
|
|
DEFAULT_TEST_EC_TYPE)
|
|
from swift.obj import auditor, replicator
|
|
from swift.obj.diskfile import (
|
|
DiskFile, write_metadata, invalidate_hash, get_data_dir,
|
|
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
|
|
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
|
|
from swift.common.utils import (
|
|
mkdirs, normalize_timestamp, Timestamp, readconf)
|
|
from swift.common.storage_policy import (
|
|
ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
|
|
from test.unit.obj.common import write_diskfile
|
|
|
|
_mocked_policies = [
|
|
StoragePolicy(0, 'zero', False),
|
|
StoragePolicy(1, 'one', True),
|
|
ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
|
|
ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
|
|
]
|
|
|
|
|
|
def works_only_once(callable_thing, exception):
|
|
called = [False]
|
|
|
|
def only_once(*a, **kw):
|
|
if called[0]:
|
|
raise exception
|
|
else:
|
|
called[0] = True
|
|
return callable_thing(*a, **kw)
|
|
|
|
return only_once
|
|
|
|
|
|
@patch_policies(_mocked_policies)
|
|
class TestAuditor(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
|
|
self.devices = os.path.join(self.testdir, 'node')
|
|
self.rcache = os.path.join(self.testdir, 'object.recon')
|
|
self.logger = FakeLogger()
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
mkdirs(os.path.join(self.devices, 'sda'))
|
|
os.mkdir(os.path.join(self.devices, 'sdb'))
|
|
|
|
# policy 0
|
|
self.objects = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[0]))
|
|
self.objects_2 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[0]))
|
|
os.mkdir(self.objects)
|
|
# policy 1
|
|
self.objects_p1 = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[1]))
|
|
self.objects_2_p1 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[1]))
|
|
os.mkdir(self.objects_p1)
|
|
# policy 2
|
|
self.objects_p2 = os.path.join(self.devices, 'sda',
|
|
get_data_dir(POLICIES[2]))
|
|
self.objects_2_p2 = os.path.join(self.devices, 'sdb',
|
|
get_data_dir(POLICIES[2]))
|
|
os.mkdir(self.objects_p2)
|
|
|
|
self.parts = {}
|
|
self.parts_p1 = {}
|
|
self.parts_p2 = {}
|
|
for part in ['0', '1', '2', '3']:
|
|
self.parts[part] = os.path.join(self.objects, part)
|
|
self.parts_p1[part] = os.path.join(self.objects_p1, part)
|
|
self.parts_p2[part] = os.path.join(self.objects_p2, part)
|
|
os.mkdir(os.path.join(self.objects, part))
|
|
os.mkdir(os.path.join(self.objects_p1, part))
|
|
os.mkdir(os.path.join(self.objects_p2, part))
|
|
|
|
self.conf = dict(
|
|
devices=self.devices,
|
|
mount_check='false',
|
|
object_size_stats='10,100,1024,10240')
|
|
self.df_mgr = DiskFileManager(self.conf, self.logger)
|
|
self.ec_df_mgr = ECDiskFileManager(self.conf, self.logger)
|
|
|
|
# diskfiles for policy 0, 1, 2
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
|
policy=POLICIES[0])
|
|
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
|
|
'o', policy=POLICIES[1])
|
|
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
|
|
'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
|
|
|
|
def tearDown(self):
|
|
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
|
unit.xattr_data = {}
|
|
|
|
def test_worker_conf_parms(self):
|
|
def check_common_defaults():
|
|
self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
|
|
self.assertEqual(auditor_worker.log_time, 3600)
|
|
|
|
# test default values
|
|
conf = dict(
|
|
devices=self.devices,
|
|
mount_check='false',
|
|
object_size_stats='10,100,1024,10240')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
check_common_defaults()
|
|
for policy in POLICIES:
|
|
mgr = auditor_worker.diskfile_router[policy]
|
|
self.assertEqual(mgr.disk_chunk_size, 65536)
|
|
self.assertEqual(auditor_worker.max_files_per_second, 20)
|
|
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 0)
|
|
|
|
# test specified audit value overrides
|
|
conf.update({'disk_chunk_size': 4096})
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices,
|
|
zero_byte_only_at_fps=50)
|
|
check_common_defaults()
|
|
for policy in POLICIES:
|
|
mgr = auditor_worker.diskfile_router[policy]
|
|
self.assertEqual(mgr.disk_chunk_size, 4096)
|
|
self.assertEqual(auditor_worker.max_files_per_second, 50)
|
|
self.assertEqual(auditor_worker.zero_byte_only_at_fps, 50)
|
|
|
|
def test_object_audit_extra_data(self):
|
|
def run_tests(disk_file):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
data = b'0' * 1024
|
|
if disk_file.policy.policy_type == EC_POLICY:
|
|
data = disk_file.policy.pyeclib_driver.encode(data)[0]
|
|
etag = md5()
|
|
with disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(disk_file._datadir, 'sda', '0',
|
|
policy=disk_file.policy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
|
|
os.write(writer._fd, b'extra_data')
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(disk_file._datadir, 'sda', '0',
|
|
policy=disk_file.policy))
|
|
self.assertEqual(auditor_worker.quarantines,
|
|
pre_quarantines + 1)
|
|
run_tests(self.disk_file)
|
|
run_tests(self.disk_file_p1)
|
|
run_tests(self.disk_file_ec)
|
|
|
|
def test_object_audit_diff_data(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
data = b'0' * 1024
|
|
etag = md5()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
|
|
# remake so it will have metadata
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
|
policy=POLICIES.legacy)
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
etag = md5(b'1' + b'0' * 1023).hexdigest()
|
|
metadata['ETag'] = etag
|
|
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_audit_checks_EC_fragments(self):
|
|
disk_file = self.disk_file_ec
|
|
|
|
def do_test(data):
|
|
# create diskfile and set ETag and content-length to match the data
|
|
etag = md5(data).hexdigest()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
with disk_file.create() as writer:
|
|
writer.write(data)
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': len(data),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
auditor_worker = auditor.AuditorWorker(self.conf, FakeLogger(),
|
|
self.rcache, self.devices)
|
|
self.assertEqual(0, auditor_worker.quarantines) # sanity check
|
|
auditor_worker.object_audit(
|
|
AuditLocation(disk_file._datadir, 'sda', '0',
|
|
policy=disk_file.policy))
|
|
return auditor_worker
|
|
|
|
# two good frags in an EC archive
|
|
frag_0 = disk_file.policy.pyeclib_driver.encode(
|
|
'x' * disk_file.policy.ec_segment_size)[0]
|
|
frag_1 = disk_file.policy.pyeclib_driver.encode(
|
|
'y' * disk_file.policy.ec_segment_size)[0]
|
|
data = frag_0 + frag_1
|
|
auditor_worker = do_test(data)
|
|
self.assertEqual(0, auditor_worker.quarantines)
|
|
self.assertFalse(auditor_worker.logger.get_lines_for_level('error'))
|
|
|
|
# corrupt second frag headers
|
|
corrupt_frag_1 = 'blah' * 16 + frag_1[64:]
|
|
data = frag_0 + corrupt_frag_1
|
|
auditor_worker = do_test(data)
|
|
self.assertEqual(1, auditor_worker.quarantines)
|
|
log_lines = auditor_worker.logger.get_lines_for_level('error')
|
|
self.assertIn('failed audit and was quarantined: '
|
|
'Invalid EC metadata at offset 0x%x' %
|
|
len(frag_0),
|
|
log_lines[0])
|
|
|
|
# dangling extra corrupt frag data
|
|
data = frag_0 + frag_1 + 'wtf' * 100
|
|
auditor_worker = do_test(data)
|
|
self.assertEqual(1, auditor_worker.quarantines)
|
|
log_lines = auditor_worker.logger.get_lines_for_level('error')
|
|
self.assertIn('failed audit and was quarantined: '
|
|
'Invalid EC metadata at offset 0x%x' %
|
|
len(frag_0 + frag_1),
|
|
log_lines[0])
|
|
|
|
# simulate bug https://bugs.launchpad.net/bugs/1631144 by writing start
|
|
# of an ssync subrequest into the diskfile
|
|
data = (
|
|
b'PUT /a/c/o\r\n' +
|
|
b'Content-Length: 999\r\n' +
|
|
b'Content-Type: image/jpeg\r\n' +
|
|
b'X-Object-Sysmeta-Ec-Content-Length: 1024\r\n' +
|
|
b'X-Object-Sysmeta-Ec-Etag: 1234bff7eb767cc6d19627c6b6f9edef\r\n' +
|
|
b'X-Object-Sysmeta-Ec-Frag-Index: 1\r\n' +
|
|
b'X-Object-Sysmeta-Ec-Scheme: ' + DEFAULT_TEST_EC_TYPE + '\r\n' +
|
|
b'X-Object-Sysmeta-Ec-Segment-Size: 1048576\r\n' +
|
|
b'X-Timestamp: 1471512345.17333\r\n\r\n'
|
|
)
|
|
data += frag_0[:disk_file.policy.fragment_size - len(data)]
|
|
auditor_worker = do_test(data)
|
|
self.assertEqual(1, auditor_worker.quarantines)
|
|
log_lines = auditor_worker.logger.get_lines_for_level('error')
|
|
self.assertIn('failed audit and was quarantined: '
|
|
'Invalid EC metadata at offset 0x0',
|
|
log_lines[0])
|
|
|
|
def test_object_audit_no_meta(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
fp = open(path, 'wb')
|
|
fp.write(b'0' * 1024)
|
|
fp.close()
|
|
invalidate_hash(os.path.dirname(self.disk_file._datadir))
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
pre_quarantines = auditor_worker.quarantines
|
|
auditor_worker.object_audit(
|
|
AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_audit_will_not_swallow_errors_in_tests(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
with open(path, 'w') as f:
|
|
write_metadata(f, {'name': '/a/c/o'})
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
def blowup(*args):
|
|
raise NameError('tpyo')
|
|
with mock.patch.object(DiskFileManager,
|
|
'get_diskfile_from_audit_location', blowup):
|
|
self.assertRaises(NameError, auditor_worker.object_audit,
|
|
AuditLocation(os.path.dirname(path), 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
|
|
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
path = os.path.join(self.disk_file._datadir, timestamp + '.data')
|
|
mkdirs(self.disk_file._datadir)
|
|
with open(path, 'w') as f:
|
|
write_metadata(f, {'name': '/a/c/o'})
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
def blowup(*args):
|
|
raise NameError('tpyo')
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
blowup):
|
|
auditor_worker.failsafe_object_audit(
|
|
AuditLocation(os.path.dirname(path), 'sda', '0',
|
|
policy=POLICIES.legacy))
|
|
self.assertEqual(auditor_worker.errors, 1)
|
|
|
|
def test_audit_location_gets_quarantined(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=self.disk_file.policy)
|
|
|
|
# instead of a datadir, we'll make a file!
|
|
mkdirs(os.path.dirname(self.disk_file._datadir))
|
|
open(self.disk_file._datadir, 'w')
|
|
|
|
# after we turn the crank ...
|
|
auditor_worker.object_audit(location)
|
|
|
|
# ... it should get quarantined
|
|
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
|
self.assertEqual(1, auditor_worker.quarantines)
|
|
|
|
def test_rsync_tempfile_timeout_auto_option(self):
|
|
# if we don't have access to the replicator config section we'll use
|
|
# our default
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
|
|
# if the rsync_tempfile_timeout option is set explicitly we use that
|
|
self.conf['rsync_tempfile_timeout'] = '1800'
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800)
|
|
# if we have a real config we can be a little smarter
|
|
config_path = os.path.join(self.testdir, 'objserver.conf')
|
|
stub_config = """
|
|
[object-auditor]
|
|
rsync_tempfile_timeout = auto
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
# the Daemon loader will hand the object-auditor config to the
|
|
# auditor who will build the workers from it
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# if there is no object-replicator section we still have to fall back
|
|
# to default because we can't parse the config for that section!
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
|
|
stub_config = """
|
|
[object-replicator]
|
|
[object-auditor]
|
|
rsync_tempfile_timeout = auto
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# if the object-replicator section will parse but does not override
|
|
# the default rsync_timeout we assume the default rsync_timeout value
|
|
# and add 15mins
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
|
|
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
|
|
stub_config = """
|
|
[DEFAULT]
|
|
reclaim_age = 1209600
|
|
[object-replicator]
|
|
rsync_timeout = 3600
|
|
[object-auditor]
|
|
rsync_tempfile_timeout = auto
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# if there is an object-replicator section with a rsync_timeout
|
|
# configured we'll use that value (3600) + 900
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900)
|
|
|
|
def test_inprogress_rsync_tempfiles_get_cleaned_up(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
|
|
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
|
|
policy=self.disk_file.policy)
|
|
|
|
data = b'VERIFY'
|
|
etag = md5()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
metadata = {
|
|
'ETag': etag.hexdigest(),
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
datafilename = None
|
|
datadir_files = os.listdir(self.disk_file._datadir)
|
|
for filename in datadir_files:
|
|
if filename.endswith('.data'):
|
|
datafilename = filename
|
|
break
|
|
else:
|
|
self.fail('Did not find .data file in %r: %r' %
|
|
(self.disk_file._datadir, datadir_files))
|
|
rsynctempfile_path = os.path.join(self.disk_file._datadir,
|
|
'.%s.9ILVBL' % datafilename)
|
|
open(rsynctempfile_path, 'w')
|
|
# sanity check we have an extra file
|
|
rsync_files = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(len(datadir_files) + 1, len(rsync_files))
|
|
|
|
# and after we turn the crank ...
|
|
auditor_worker.object_audit(location)
|
|
|
|
# ... we've still got the rsync file
|
|
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
|
|
|
|
# and we'll keep it - depending on the rsync_tempfile_timeout
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
|
|
self.conf['rsync_tempfile_timeout'] = '3600'
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600)
|
|
now = time.time() + 1900
|
|
with mock.patch('swift.obj.auditor.time.time',
|
|
return_value=now):
|
|
auditor_worker.object_audit(location)
|
|
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
|
|
|
|
# but *tomorrow* when we run
|
|
tomorrow = time.time() + 86400
|
|
with mock.patch('swift.obj.auditor.time.time',
|
|
return_value=tomorrow):
|
|
auditor_worker.object_audit(location)
|
|
|
|
# ... we'll totally clean that stuff up!
|
|
self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir))
|
|
|
|
# but if we have some random crazy file in there
|
|
random_crazy_file_path = os.path.join(self.disk_file._datadir,
|
|
'.random.crazy.file')
|
|
open(random_crazy_file_path, 'w')
|
|
|
|
tomorrow = time.time() + 86400
|
|
with mock.patch('swift.obj.auditor.time.time',
|
|
return_value=tomorrow):
|
|
auditor_worker.object_audit(location)
|
|
|
|
# that's someone elses problem
|
|
self.assertIn(os.path.basename(random_crazy_file_path),
|
|
os.listdir(self.disk_file._datadir))
|
|
|
|
def test_generic_exception_handling(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_errors = auditor_worker.errors
|
|
data = b'0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
lambda *_: 1 / 0):
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.errors, pre_errors + 1)
|
|
|
|
def test_object_run_once_pass(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
auditor_worker.log_time = 0
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
data = b'0' * 1024
|
|
|
|
def write_file(df):
|
|
with df.create() as writer:
|
|
writer.write(data)
|
|
metadata = {
|
|
'ETag': md5(data).hexdigest(),
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
|
|
# policy 0
|
|
write_file(self.disk_file)
|
|
# policy 1
|
|
write_file(self.disk_file_p1)
|
|
# policy 2
|
|
write_file(self.disk_file_ec)
|
|
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
# 1 object per policy falls into 1024 bucket
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 3)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
|
|
# pick up some additional code coverage, large file
|
|
data = b'0' * 1024 * 1024
|
|
for df in (self.disk_file, self.disk_file_ec):
|
|
with df.create() as writer:
|
|
writer.write(data)
|
|
metadata = {
|
|
'ETag': md5(data).hexdigest(),
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
# still have the 1024 byte object left in policy-1 (plus the
|
|
# stats from the original 3)
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
# and then policy-0 disk_file was re-written as a larger object
|
|
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
|
|
|
|
# pick up even more additional code coverage, misc paths
|
|
auditor_worker.log_time = -1
|
|
auditor_worker.stats_sizes = []
|
|
auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines)
|
|
self.assertEqual(auditor_worker.stats_buckets[1024], 4)
|
|
self.assertEqual(auditor_worker.stats_buckets[10240], 0)
|
|
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
|
|
|
|
def test_object_run_logging(self):
|
|
logger = FakeLogger()
|
|
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
|
self.rcache, self.devices)
|
|
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
|
log_lines = logger.get_lines_for_level('info')
|
|
self.assertGreater(len(log_lines), 0)
|
|
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
|
|
|
|
logger = FakeLogger()
|
|
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
|
self.rcache, self.devices,
|
|
zero_byte_only_at_fps=50)
|
|
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
|
log_lines = logger.get_lines_for_level('info')
|
|
self.assertGreater(len(log_lines), 0)
|
|
self.assertTrue(log_lines[0].index('ZBF - sda'))
|
|
|
|
def test_object_run_once_no_sda(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
data = b'0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
os.write(writer._fd, b'extra_data')
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_run_once_multi_devices(self):
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
# pretend that we logged (and reset counters) just now
|
|
auditor_worker.last_logged = time.time()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
pre_quarantines = auditor_worker.quarantines
|
|
data = b'0' * 10
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
auditor_worker.audit_all_objects()
|
|
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob',
|
|
policy=POLICIES.legacy)
|
|
data = b'1' * 10
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
os.write(writer._fd, b'extra_data')
|
|
auditor_worker.audit_all_objects()
|
|
self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
|
|
|
|
def test_object_run_fast_track_non_zero(self):
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.auditor.log_time = 0
|
|
data = b'0' * 1024
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
writer.write(data)
|
|
etag.update(data)
|
|
etag = etag.hexdigest()
|
|
timestamp = str(normalize_timestamp(time.time()))
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp,
|
|
'Content-Length': str(os.fstat(writer._fd).st_size),
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
etag = md5()
|
|
etag.update(b'1' + b'0' * 1023)
|
|
etag = etag.hexdigest()
|
|
metadata['ETag'] = etag
|
|
write_metadata(writer._fd, metadata)
|
|
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertFalse(os.path.isdir(quarantine_path))
|
|
del(kwargs['zero_byte_fps'])
|
|
clear_auditor_status(self.devices)
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
|
|
def setup_bad_zero_byte(self, timestamp=None):
|
|
if timestamp is None:
|
|
timestamp = Timestamp(time.time())
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.auditor.log_time = 0
|
|
etag = md5()
|
|
with self.disk_file.create() as writer:
|
|
etag = etag.hexdigest()
|
|
metadata = {
|
|
'ETag': etag,
|
|
'X-Timestamp': timestamp.internal,
|
|
'Content-Length': 10,
|
|
}
|
|
writer.put(metadata)
|
|
writer.commit(Timestamp(timestamp))
|
|
etag = md5()
|
|
etag = etag.hexdigest()
|
|
metadata['ETag'] = etag
|
|
write_metadata(writer._fd, metadata)
|
|
|
|
def test_object_run_fast_track_all(self):
|
|
self.setup_bad_zero_byte()
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
|
|
def test_object_run_fast_track_zero(self):
|
|
self.setup_bad_zero_byte()
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
|
|
called_args = [0]
|
|
|
|
def mock_get_auditor_status(path, logger, audit_type):
|
|
called_args[0] = audit_type
|
|
return get_auditor_status(path, logger, audit_type)
|
|
|
|
with mock.patch('swift.obj.diskfile.get_auditor_status',
|
|
mock_get_auditor_status):
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
self.assertEqual('ZBF', called_args[0])
|
|
|
|
def test_object_run_fast_track_zero_check_closed(self):
|
|
rat = [False]
|
|
|
|
class FakeFile(DiskFile):
|
|
|
|
def _quarantine(self, data_file, msg):
|
|
rat[0] = True
|
|
DiskFile._quarantine(self, data_file, msg)
|
|
|
|
self.setup_bad_zero_byte()
|
|
with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
|
|
FakeFile):
|
|
kwargs = {'mode': 'once'}
|
|
kwargs['zero_byte_fps'] = 50
|
|
self.auditor.run_audit(**kwargs)
|
|
quarantine_path = os.path.join(self.devices,
|
|
'sda', 'quarantined', 'objects')
|
|
self.assertTrue(os.path.isdir(quarantine_path))
|
|
self.assertTrue(rat[0])
|
|
|
|
@mock.patch.object(auditor.ObjectAuditor, 'run_audit')
|
|
@mock.patch('os.fork', return_value=0)
|
|
def test_with_inaccessible_object_location(self, mock_os_fork,
|
|
mock_run_audit):
|
|
# Need to ensure that any failures in run_audit do
|
|
# not prevent sys.exit() from running. Otherwise we get
|
|
# zombie processes.
|
|
e = OSError('permission denied')
|
|
mock_run_audit.side_effect = e
|
|
self.auditor = auditor.ObjectAuditor(self.conf)
|
|
self.assertRaises(SystemExit, self.auditor.fork_child, self)
|
|
|
|
def test_with_only_tombstone(self):
|
|
# sanity check that auditor doesn't touch solitary tombstones
|
|
ts_iter = make_timestamp_iter()
|
|
self.setup_bad_zero_byte(timestamp=next(ts_iter))
|
|
self.disk_file.delete(next(ts_iter))
|
|
files = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(1, len(files))
|
|
self.assertTrue(files[0].endswith('ts'))
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
files_after = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(files, files_after)
|
|
|
|
def test_with_tombstone_and_data(self):
|
|
# rsync replication could leave a tombstone and data file in object
|
|
# dir - verify they are both removed during audit
|
|
ts_iter = make_timestamp_iter()
|
|
ts_tomb = next(ts_iter)
|
|
ts_data = next(ts_iter)
|
|
self.setup_bad_zero_byte(timestamp=ts_data)
|
|
tomb_file_path = os.path.join(self.disk_file._datadir,
|
|
'%s.ts' % ts_tomb.internal)
|
|
with open(tomb_file_path, 'wb') as fd:
|
|
write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
|
|
files = os.listdir(self.disk_file._datadir)
|
|
self.assertEqual(2, len(files))
|
|
self.assertTrue(os.path.basename(tomb_file_path) in files, files)
|
|
kwargs = {'mode': 'once'}
|
|
self.auditor.run_audit(**kwargs)
|
|
self.assertFalse(os.path.exists(self.disk_file._datadir))
|
|
|
|
def _audit_tombstone(self, conf, ts_tomb, zero_byte_fps=0):
|
|
self.auditor = auditor.ObjectAuditor(conf)
|
|
self.auditor.log_time = 0
|
|
# create tombstone and hashes.pkl file, ensuring the tombstone is not
|
|
# reclaimed by mocking time to be the tombstone time
|
|
with mock.patch('time.time', return_value=float(ts_tomb)):
|
|
# this delete will create a invalid hashes entry
|
|
self.disk_file.delete(ts_tomb)
|
|
# this get_hashes call will truncate the invalid hashes entry
|
|
self.disk_file.manager.get_hashes(
|
|
self.devices + '/sda', '0', [], self.disk_file.policy)
|
|
suffix = basename(dirname(self.disk_file._datadir))
|
|
part_dir = dirname(dirname(self.disk_file._datadir))
|
|
# sanity checks...
|
|
self.assertEqual(['%s.ts' % ts_tomb.internal],
|
|
os.listdir(self.disk_file._datadir))
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n'))
|
|
# Run auditor
|
|
self.auditor.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
|
# sanity check - auditor should not remove tombstone file
|
|
self.assertEqual(['%s.ts' % ts_tomb.internal],
|
|
os.listdir(self.disk_file._datadir))
|
|
return part_dir, suffix
|
|
|
|
def test_non_reclaimable_tombstone(self):
|
|
# audit with a recent tombstone
|
|
ts_tomb = Timestamp(time.time() - 55)
|
|
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n'))
|
|
|
|
def test_reclaimable_tombstone(self):
|
|
# audit with a reclaimable tombstone
|
|
ts_tomb = Timestamp(time.time() - 604800)
|
|
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
hash_val = fp.read()
|
|
self.assertEqual(suffix, hash_val.strip('\n'))
|
|
|
|
def test_non_reclaimable_tombstone_with_custom_reclaim_age(self):
|
|
# audit with a tombstone newer than custom reclaim age
|
|
ts_tomb = Timestamp(time.time() - 604800)
|
|
conf = dict(self.conf)
|
|
conf['reclaim_age'] = 2 * 604800
|
|
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n'))
|
|
|
|
def test_reclaimable_tombstone_with_custom_reclaim_age(self):
|
|
# audit with a tombstone older than custom reclaim age
|
|
ts_tomb = Timestamp(time.time() - 55)
|
|
conf = dict(self.conf)
|
|
conf['reclaim_age'] = 10
|
|
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
hash_val = fp.read()
|
|
self.assertEqual(suffix, hash_val.strip('\n'))
|
|
|
|
def test_reclaimable_tombstone_with_zero_byte_fps(self):
|
|
# audit with a tombstone older than reclaim age by a zero_byte_fps
|
|
# worker does not invalidate the hash
|
|
ts_tomb = Timestamp(time.time() - 604800)
|
|
part_dir, suffix = self._audit_tombstone(
|
|
self.conf, ts_tomb, zero_byte_fps=50)
|
|
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
self.assertTrue(os.path.exists(hash_invalid))
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n'))
|
|
|
|
def _test_expired_object_is_ignored(self, zero_byte_fps):
|
|
# verify that an expired object does not get mistaken for a tombstone
|
|
audit = auditor.ObjectAuditor(self.conf)
|
|
audit.logger = FakeLogger()
|
|
audit.log_time = 0
|
|
now = time.time()
|
|
write_diskfile(self.disk_file, Timestamp(now - 20),
|
|
extra_metadata={'X-Delete-At': now - 10})
|
|
files = os.listdir(self.disk_file._datadir)
|
|
self.assertTrue([f for f in files if f.endswith('.data')]) # sanity
|
|
# diskfile write appends to invalid hashes file
|
|
part_dir = dirname(dirname(self.disk_file._datadir))
|
|
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual(basename(dirname(self.disk_file._datadir)),
|
|
fp.read().strip('\n')) # sanity check
|
|
|
|
# run the auditor...
|
|
with mock.patch.object(auditor, 'dump_recon_cache'):
|
|
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
|
|
|
# the auditor doesn't touch anything on the invalidation file
|
|
# (i.e. not truncate and add no entry)
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual(basename(dirname(self.disk_file._datadir)),
|
|
fp.read().strip('\n')) # sanity check
|
|
|
|
# this get_hashes call will truncate the invalid hashes entry
|
|
self.disk_file.manager.get_hashes(
|
|
self.devices + '/sda', '0', [], self.disk_file.policy)
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n')) # sanity check
|
|
|
|
# run the auditor, again...
|
|
with mock.patch.object(auditor, 'dump_recon_cache'):
|
|
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
|
|
|
# verify nothing changed
|
|
self.assertTrue(os.path.exists(self.disk_file._datadir))
|
|
self.assertEqual(files, os.listdir(self.disk_file._datadir))
|
|
self.assertFalse(audit.logger.get_lines_for_level('error'))
|
|
self.assertFalse(audit.logger.get_lines_for_level('warning'))
|
|
# and there was no hash invalidation
|
|
with open(hash_invalid, 'rb') as fp:
|
|
self.assertEqual('', fp.read().strip('\n'))
|
|
|
|
def test_expired_object_is_ignored(self):
|
|
self._test_expired_object_is_ignored(0)
|
|
|
|
def test_expired_object_is_ignored_with_zero_byte_fps(self):
|
|
self._test_expired_object_is_ignored(50)
|
|
|
|
def test_auditor_reclaim_age(self):
|
|
# if we don't have access to the replicator config section we'll use
|
|
# diskfile's default
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
|
|
|
# if the reclaim_age option is set explicitly we use that
|
|
self.conf['reclaim_age'] = '1800'
|
|
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 1800)
|
|
|
|
# if we have a real config we can be a little smarter
|
|
config_path = os.path.join(self.testdir, 'objserver.conf')
|
|
|
|
# if there is no object-replicator section we still have to fall back
|
|
# to default because we can't parse the config for that section!
|
|
stub_config = """
|
|
[object-auditor]
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 86400 * 7)
|
|
|
|
# verify reclaim_age is of auditor config value
|
|
stub_config = """
|
|
[object-replicator]
|
|
[object-auditor]
|
|
reclaim_age = 60
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 60)
|
|
|
|
# verify reclaim_age falls back to replicator config value
|
|
# if there is no auditor config value
|
|
config_path = os.path.join(self.testdir, 'objserver.conf')
|
|
stub_config = """
|
|
[object-replicator]
|
|
reclaim_age = 60
|
|
[object-auditor]
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 60)
|
|
|
|
# we'll prefer our own DEFAULT section to the replicator though
|
|
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
|
|
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
|
|
stub_config = """
|
|
[DEFAULT]
|
|
reclaim_age = 1209600
|
|
[object-replicator]
|
|
reclaim_age = 1800
|
|
[object-auditor]
|
|
"""
|
|
with open(config_path, 'w') as f:
|
|
f.write(textwrap.dedent(stub_config))
|
|
conf = readconf(config_path, 'object-auditor')
|
|
auditor_worker = auditor.AuditorWorker(conf, self.logger,
|
|
self.rcache, self.devices)
|
|
router = auditor_worker.diskfile_router
|
|
for policy in POLICIES:
|
|
self.assertEqual(router[policy].reclaim_age, 1209600)
|
|
|
|
def test_sleeper(self):
|
|
with mock.patch(
|
|
'time.sleep', mock.MagicMock()) as mock_sleep:
|
|
my_auditor = auditor.ObjectAuditor(self.conf)
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(30)
|
|
|
|
my_conf = dict(interval=2)
|
|
my_conf.update(self.conf)
|
|
my_auditor = auditor.ObjectAuditor(my_conf)
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(2)
|
|
|
|
my_auditor = auditor.ObjectAuditor(self.conf)
|
|
my_auditor.interval = 2
|
|
my_auditor._sleep()
|
|
mock_sleep.assert_called_with(2)
|
|
|
|
def test_run_parallel_audit(self):
|
|
|
|
class StopForever(Exception):
|
|
pass
|
|
|
|
class Bogus(Exception):
|
|
pass
|
|
|
|
loop_error = Bogus('exception')
|
|
|
|
class LetMeOut(BaseException):
|
|
pass
|
|
|
|
class ObjectAuditorMock(object):
|
|
check_args = ()
|
|
check_kwargs = {}
|
|
check_device_dir = None
|
|
fork_called = 0
|
|
master = 0
|
|
wait_called = 0
|
|
|
|
def mock_run(self, *args, **kwargs):
|
|
self.check_args = args
|
|
self.check_kwargs = kwargs
|
|
if 'zero_byte_fps' in kwargs:
|
|
self.check_device_dir = kwargs.get('device_dirs')
|
|
|
|
def mock_sleep_stop(self):
|
|
raise StopForever('stop')
|
|
|
|
def mock_sleep_continue(self):
|
|
return
|
|
|
|
def mock_audit_loop_error(self, parent, zbo_fps,
|
|
override_devices=None, **kwargs):
|
|
raise loop_error
|
|
|
|
def mock_fork(self):
|
|
self.fork_called += 1
|
|
if self.master:
|
|
return self.fork_called
|
|
else:
|
|
return 0
|
|
|
|
def mock_wait(self):
|
|
self.wait_called += 1
|
|
return (self.wait_called, 0)
|
|
|
|
for i in string.ascii_letters[2:26]:
|
|
mkdirs(os.path.join(self.devices, 'sd%s' % i))
|
|
|
|
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
|
|
mount_check='false',
|
|
zero_byte_files_per_second=89,
|
|
concurrency=1))
|
|
|
|
mocker = ObjectAuditorMock()
|
|
my_auditor.logger.exception = mock.MagicMock()
|
|
real_audit_loop = my_auditor.audit_loop
|
|
my_auditor.audit_loop = mocker.mock_audit_loop_error
|
|
my_auditor.run_audit = mocker.mock_run
|
|
was_fork = os.fork
|
|
was_wait = os.wait
|
|
os.fork = mocker.mock_fork
|
|
os.wait = mocker.mock_wait
|
|
try:
|
|
my_auditor._sleep = mocker.mock_sleep_stop
|
|
my_auditor.run_once(zero_byte_fps=50)
|
|
my_auditor.logger.exception.assert_called_once_with(
|
|
'ERROR auditing: %s', loop_error)
|
|
my_auditor.logger.exception.reset_mock()
|
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
|
my_auditor.logger.exception.assert_called_once_with(
|
|
'ERROR auditing: %s', loop_error)
|
|
my_auditor.audit_loop = real_audit_loop
|
|
|
|
self.assertRaises(StopForever,
|
|
my_auditor.run_forever, zero_byte_fps=50)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 50)
|
|
self.assertEqual(mocker.fork_called, 0)
|
|
|
|
self.assertRaises(SystemExit, my_auditor.run_once)
|
|
self.assertEqual(mocker.fork_called, 1)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
|
|
self.assertEqual(mocker.check_device_dir, [])
|
|
self.assertEqual(mocker.check_args, ())
|
|
|
|
device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
|
|
device_string = ','.join(device_list)
|
|
device_string_bogus = device_string + ',bogus'
|
|
|
|
mocker.fork_called = 0
|
|
self.assertRaises(SystemExit, my_auditor.run_once,
|
|
devices=device_string_bogus)
|
|
self.assertEqual(mocker.fork_called, 1)
|
|
self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
|
|
self.assertEqual(sorted(mocker.check_device_dir), device_list)
|
|
|
|
mocker.master = 1
|
|
|
|
mocker.fork_called = 0
|
|
self.assertRaises(StopForever, my_auditor.run_forever)
|
|
# Fork is called 2 times since the zbf process is forked just
|
|
# once before self._sleep() is called and StopForever is raised
|
|
# Also wait is called just once before StopForever is raised
|
|
self.assertEqual(mocker.fork_called, 2)
|
|
self.assertEqual(mocker.wait_called, 1)
|
|
|
|
my_auditor._sleep = mocker.mock_sleep_continue
|
|
my_auditor.audit_loop = works_only_once(my_auditor.audit_loop,
|
|
LetMeOut())
|
|
|
|
my_auditor.concurrency = 2
|
|
mocker.fork_called = 0
|
|
mocker.wait_called = 0
|
|
self.assertRaises(LetMeOut, my_auditor.run_forever)
|
|
# Fork is called no. of devices + (no. of devices)/2 + 1 times
|
|
# since zbf process is forked (no.of devices)/2 + 1 times
|
|
no_devices = len(os.listdir(self.devices))
|
|
self.assertEqual(mocker.fork_called, no_devices + no_devices / 2
|
|
+ 1)
|
|
self.assertEqual(mocker.wait_called, no_devices + no_devices / 2
|
|
+ 1)
|
|
|
|
finally:
|
|
os.fork = was_fork
|
|
os.wait = was_wait
|
|
|
|
def test_run_audit_once(self):
|
|
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
|
|
mount_check='false',
|
|
zero_byte_files_per_second=89,
|
|
concurrency=1))
|
|
|
|
forked_pids = []
|
|
next_zbf_pid = [2]
|
|
next_normal_pid = [1001]
|
|
outstanding_pids = [[]]
|
|
|
|
def fake_fork_child(**kwargs):
|
|
if len(forked_pids) > 10:
|
|
# something's gone horribly wrong
|
|
raise BaseException("forking too much")
|
|
|
|
# ZBF pids are all smaller than the normal-audit pids; this way
|
|
# we can return them first.
|
|
#
|
|
# Also, ZBF pids are even and normal-audit pids are odd; this is
|
|
# so humans seeing this test fail can better tell what's happening.
|
|
if kwargs.get('zero_byte_fps'):
|
|
pid = next_zbf_pid[0]
|
|
next_zbf_pid[0] += 2
|
|
else:
|
|
pid = next_normal_pid[0]
|
|
next_normal_pid[0] += 2
|
|
outstanding_pids[0].append(pid)
|
|
forked_pids.append(pid)
|
|
return pid
|
|
|
|
def fake_os_wait():
|
|
# Smallest pid first; that's ZBF if we have one, else normal
|
|
outstanding_pids[0].sort()
|
|
pid = outstanding_pids[0].pop(0)
|
|
return (pid, 0) # (pid, status)
|
|
|
|
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
|
|
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
|
|
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
|
|
my_auditor.run_once()
|
|
|
|
self.assertEqual(sorted(forked_pids), [2, 1001])
|
|
|
|
def test_run_parallel_audit_once(self):
|
|
my_auditor = auditor.ObjectAuditor(
|
|
dict(devices=self.devices, mount_check='false',
|
|
zero_byte_files_per_second=89, concurrency=2))
|
|
|
|
# ZBF pids are smaller than the normal-audit pids; this way we can
|
|
# return them first from our mocked os.wait().
|
|
#
|
|
# Also, ZBF pids are even and normal-audit pids are odd; this is so
|
|
# humans seeing this test fail can better tell what's happening.
|
|
forked_pids = []
|
|
next_zbf_pid = [2]
|
|
next_normal_pid = [1001]
|
|
outstanding_pids = [[]]
|
|
|
|
def fake_fork_child(**kwargs):
|
|
if len(forked_pids) > 10:
|
|
# something's gone horribly wrong; try not to hang the test
|
|
# run because of it
|
|
raise BaseException("forking too much")
|
|
|
|
if kwargs.get('zero_byte_fps'):
|
|
pid = next_zbf_pid[0]
|
|
next_zbf_pid[0] += 2
|
|
else:
|
|
pid = next_normal_pid[0]
|
|
next_normal_pid[0] += 2
|
|
outstanding_pids[0].append(pid)
|
|
forked_pids.append(pid)
|
|
return pid
|
|
|
|
def fake_os_wait():
|
|
if not outstanding_pids[0]:
|
|
raise BaseException("nobody waiting")
|
|
|
|
# ZBF auditor finishes first
|
|
outstanding_pids[0].sort()
|
|
pid = outstanding_pids[0].pop(0)
|
|
return (pid, 0) # (pid, status)
|
|
|
|
# make sure we've got enough devs that the ZBF auditor can finish
|
|
# before all the normal auditors have been started
|
|
mkdirs(os.path.join(self.devices, 'sdc'))
|
|
mkdirs(os.path.join(self.devices, 'sdd'))
|
|
|
|
with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
|
|
mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
|
|
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
|
|
my_auditor.run_once()
|
|
|
|
self.assertEqual(sorted(forked_pids), [2, 1001, 1003, 1005, 1007])
|
|
|
|
def test_run_parallel_audit_once_failed_fork(self):
|
|
my_auditor = auditor.ObjectAuditor(
|
|
dict(devices=self.devices, mount_check='false',
|
|
concurrency=2))
|
|
|
|
start_pid = [1001]
|
|
outstanding_pids = []
|
|
failed_once = [False]
|
|
|
|
def failing_fork(**kwargs):
|
|
# this fork fails only on the 2nd call
|
|
# it's enough to cause the growth of orphaned child processes
|
|
if len(outstanding_pids) > 0 and not failed_once[0]:
|
|
failed_once[0] = True
|
|
raise OSError
|
|
start_pid[0] += 2
|
|
pid = start_pid[0]
|
|
outstanding_pids.append(pid)
|
|
return pid
|
|
|
|
def fake_wait():
|
|
return outstanding_pids.pop(0), 0
|
|
|
|
with mock.patch("swift.obj.auditor.os.wait", fake_wait), \
|
|
mock.patch.object(my_auditor, 'fork_child', failing_fork), \
|
|
mock.patch.object(my_auditor, '_sleep', lambda *a: None):
|
|
for i in range(3):
|
|
my_auditor.run_once()
|
|
|
|
self.assertEqual(len(outstanding_pids), 0,
|
|
"orphaned children left {0}, expected 0."
|
|
.format(outstanding_pids))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|