# swift/test/unit/obj/test_auditor.py
#
# 1940 lines
# 80 KiB
# Python

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import json
import mock
import os
import pkg_resources
import signal
import string
import sys
import time
import xattr
from shutil import rmtree
from tempfile import mkdtemp
import textwrap
from os.path import dirname, basename
from test.debug_logger import debug_logger
from test.unit import (
DEFAULT_TEST_EC_TYPE, make_timestamp_iter, patch_policies,
skip_if_no_xattrs)
from test.unit.obj.common import write_diskfile
from swift.obj import auditor, replicator
from swift.obj.watchers.dark_data import DarkDataWatcher
from swift.obj.diskfile import (
DiskFile, write_metadata, invalidate_hash, get_data_dir,
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
from swift.common.exceptions import ClientException
from swift.common.utils import (
mkdirs, normalize_timestamp, Timestamp, readconf, md5, PrefixLoggerAdapter)
from swift.common.storage_policy import (
ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
# Storage policies patched in for TestAuditor via @patch_policies: two
# replicated policies (index 0 'zero' and index 1 'one'; the third positional
# argument is presumably is_default, making policy 1 the default -- confirm)
# plus an erasure-coded policy (index 2) with 2 data fragments, 1 parity
# fragment and a 4096-byte segment size.
_mocked_policies = [
    StoragePolicy(0, 'zero', False),
    StoragePolicy(1, 'one', True),
    ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
                    ec_ndata=2, ec_nparity=1, ec_segment_size=4096),
]
def works_only_once(callable_thing, exception):
    """Wrap ``callable_thing`` so that only its first invocation goes through.

    The first call is forwarded with all positional and keyword arguments,
    and its result is returned. Every later call raises ``exception``
    instead of calling ``callable_thing``.
    """
    state = {'used': False}

    def only_once(*a, **kw):
        if not state['used']:
            # mark as used before delegating, so even a failing first call
            # consumes the single allowed invocation
            state['used'] = True
            return callable_thing(*a, **kw)
        raise exception

    return only_once
def no_audit_watchers(group, name=None):
    """Stand-in for ``pkg_resources.iter_entry_points`` that hides watchers.

    Lookups for the ``swift.object_audit_watcher`` group yield an empty
    iterator. Every other group is delegated unchanged to
    ``pkg_resources.iter_entry_points``.
    """
    if group != 'swift.object_audit_watcher':
        return pkg_resources.iter_entry_points(group, name)
    return iter([])
class FakeRing1(object):
    """Minimal ring stub whose ``get_nodes`` always returns one node.

    The constructor accepts and ignores the usual ring arguments.
    """

    def __init__(self, swift_dir, ring_name=None):
        pass

    def get_nodes(self, *args, **kwargs):
        # Regardless of the arguments: partition 1 with a single node,
        # id 1, at 10.0.0.1:6201 on device sda.
        idx = 1
        addr = '10.0.0.%s' % idx
        node = dict(ip=addr,
                    replication_ip=addr,
                    port=6200 + idx,
                    replication_port=6200 + idx,
                    device='sda',
                    zone=idx % 3,
                    region=idx % 2,
                    id=idx,
                    handoff_index=1)
        return (1, [node])
class FakeRing2(object):
    """Minimal ring stub whose ``get_nodes`` always returns two nodes.

    The constructor accepts and ignores the usual ring arguments.
    """

    def __init__(self, swift_dir, ring_name=None):
        pass

    def get_nodes(self, *args, **kwargs):
        # Regardless of the arguments: partition 1 with nodes id 1 and id 2,
        # each at 10.0.0.<id>:620<id> on device sda.
        nodes = [
            dict(ip='10.0.0.%s' % n,
                 replication_ip='10.0.0.%s' % n,
                 port=6200 + n,
                 replication_port=6200 + n,
                 device='sda',
                 zone=n % 3,
                 region=n % 2,
                 id=n,
                 handoff_index=1)
            for n in (1, 2)
        ]
        return 1, nodes
class TestAuditorBase(unittest.TestCase):
    """Common fixture for the object auditor tests.

    setUp builds a throw-away device tree under a fresh temp dir with two
    devices (sda, sdb). It computes the object data dir for policies 0, 1
    and 2 on both devices, but creates only the sda ones, each with
    partitions 0-3. It also prepares one diskfile per policy on sda,
    partition 0. tearDown removes the whole temp dir.
    """

    def setUp(self):
        skip_if_no_xattrs()
        self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
        self.devices = os.path.join(self.testdir, 'node')
        self.rcache = os.path.join(self.testdir, 'object.recon')
        self.logger = debug_logger()
        rmtree(self.testdir, ignore_errors=1)

        sda_dir = os.path.join(self.devices, 'sda')
        sdb_dir = os.path.join(self.devices, 'sdb')
        mkdirs(sda_dir)
        os.mkdir(sdb_dir)

        # object data dirs for policies 0, 1, 2; the sdb paths are only
        # computed here, never created
        policy_datadirs = [get_data_dir(POLICIES[idx]) for idx in (0, 1, 2)]
        (self.objects, self.objects_p1, self.objects_p2) = [
            os.path.join(sda_dir, name) for name in policy_datadirs]
        (self.objects_2, self.objects_2_p1, self.objects_2_p2) = [
            os.path.join(sdb_dir, name) for name in policy_datadirs]

        # create each sda object dir plus partitions '0'..'3' inside it,
        # remembering partition paths per policy
        self.parts, self.parts_p1, self.parts_p2 = {}, {}, {}
        for parts, objects_dir in ((self.parts, self.objects),
                                   (self.parts_p1, self.objects_p1),
                                   (self.parts_p2, self.objects_p2)):
            os.mkdir(objects_dir)
            for part in ('0', '1', '2', '3'):
                parts[part] = os.path.join(objects_dir, part)
                os.mkdir(parts[part])

        self.conf = dict(
            devices=self.devices,
            mount_check='false',
            object_size_stats='10,100,1024,10240')
        self.df_mgr = DiskFileManager(self.conf, self.logger)
        self.ec_df_mgr = ECDiskFileManager(self.conf, self.logger)

        # one diskfile per policy: 0 and 1 via the replicated manager,
        # 2 via the EC manager (fragment index 1)
        self.disk_file = self.df_mgr.get_diskfile(
            'sda', '0', 'a', 'c', 'o', policy=POLICIES[0])
        self.disk_file_p1 = self.df_mgr.get_diskfile(
            'sda', '0', 'a', 'c2', 'o', policy=POLICIES[1])
        self.disk_file_ec = self.ec_df_mgr.get_diskfile(
            'sda', '0', 'a', 'c_ec', 'o', policy=POLICIES[2], frag_index=1)

    def tearDown(self):
        # testdir lives inside its own mkdtemp() dir; remove that parent
        rmtree(os.path.dirname(self.testdir), ignore_errors=1)
@patch_policies(_mocked_policies)
class TestAuditor(TestAuditorBase):
def test_worker_conf_parms(self):
    # AuditorWorker picks up defaults and explicit overrides for chunk
    # size and the files-per-second rate.
    def assert_worker_settings(worker, chunk_size, files_per_second,
                               zbf_fps):
        # settings this test never overrides keep their defaults
        self.assertEqual(worker.max_bytes_per_second, 10000000)
        self.assertEqual(worker.log_time, 3600)
        for policy in POLICIES:
            self.assertEqual(
                worker.diskfile_router[policy].disk_chunk_size, chunk_size)
        self.assertEqual(worker.max_files_per_second, files_per_second)
        self.assertEqual(worker.zero_byte_only_at_fps, zbf_fps)

    # default values
    conf = dict(
        devices=self.devices,
        mount_check='false',
        object_size_stats='10,100,1024,10240')
    assert_worker_settings(
        auditor.AuditorWorker(conf, self.logger, self.rcache, self.devices),
        65536, 20, 0)

    # explicit overrides: disk_chunk_size from conf, ZBF rate from kwarg
    conf.update({'disk_chunk_size': 4096})
    assert_worker_settings(
        auditor.AuditorWorker(conf, self.logger, self.rcache, self.devices,
                              zero_byte_only_at_fps=50),
        4096, 50, 50)
def test_object_audit_extra_data(self):
    # An object that grows after it has been committed (extra bytes
    # appended to its .data file) must be quarantined by object_audit.
    # The check is repeated for the policy 0, policy 1 and EC diskfiles.
    def run_tests(disk_file):
        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                               self.rcache, self.devices)
        data = b'0' * 1024
        if disk_file.policy.policy_type == EC_POLICY:
            # for EC policies, write the first fragment produced by the
            # policy's pyeclib driver instead of the raw bytes
            data = disk_file.policy.pyeclib_driver.encode(data)[0]
        etag = md5(usedforsecurity=False)
        with disk_file.create() as writer:
            writer.write(data)
            etag.update(data)
            etag = etag.hexdigest()
            timestamp = str(normalize_timestamp(time.time()))
            metadata = {
                'ETag': etag,
                'X-Timestamp': timestamp,
                'Content-Length': str(os.fstat(writer._fd).st_size),
            }
            writer.put(metadata)
            writer.commit(Timestamp(timestamp))
            pre_quarantines = auditor_worker.quarantines
            # a freshly committed, consistent object passes the audit
            auditor_worker.object_audit(
                AuditLocation(disk_file._datadir, 'sda', '0',
                              policy=disk_file.policy))
            self.assertEqual(auditor_worker.quarantines, pre_quarantines)
            # append bytes through the still-open writer fd, so the file no
            # longer matches its recorded Content-Length / ETag
            os.write(writer._fd, b'extra_data')
            auditor_worker.object_audit(
                AuditLocation(disk_file._datadir, 'sda', '0',
                              policy=disk_file.policy))
            self.assertEqual(auditor_worker.quarantines,
                             pre_quarantines + 1)
    run_tests(self.disk_file)
    run_tests(self.disk_file_p1)
    run_tests(self.disk_file_ec)
def test_object_audit_adds_metadata_checksums(self):
    # Objects written before metadata checksums existed (no
    # "user.swift.metadata_checksum" xattr) get the checksum added back by
    # the auditor, and the object is not quarantined.
    disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o-md',
                                         policy=POLICIES.legacy)
    # simulate a PUT
    now = time.time()
    data = b'boots and cats and ' * 1024
    hasher = md5(usedforsecurity=False)
    with disk_file.create() as writer:
        writer.write(data)
        hasher.update(data)
        etag = hasher.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': str(normalize_timestamp(now)),
            'Content-Length': len(data),
            'Content-Type': 'the old type',
        }
        writer.put(metadata)
        writer.commit(Timestamp(now))
    # simulate a subsequent POST
    post_metadata = metadata.copy()
    post_metadata['Content-Type'] = 'the new type'
    post_metadata['X-Object-Meta-Biff'] = 'buff'
    post_metadata['X-Timestamp'] = str(normalize_timestamp(now + 1))
    disk_file.write_metadata(post_metadata)
    file_paths = [os.path.join(disk_file._datadir, fname)
                  for fname in os.listdir(disk_file._datadir)
                  if fname not in ('.', '..')]
    file_paths.sort()
    # sanity check: make sure we have a .data and a .meta file
    self.assertEqual(len(file_paths), 2)
    self.assertTrue(file_paths[0].endswith(".data"))
    self.assertTrue(file_paths[1].endswith(".meta"))
    # Go remove the xattr "user.swift.metadata_checksum" as if this
    # object were written before Swift supported metadata checksums.
    for file_path in file_paths:
        xattr.removexattr(file_path, "user.swift.metadata_checksum")
    # Run the auditor...
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    auditor_worker.object_audit(
        AuditLocation(disk_file._datadir, 'sda', '0',
                      policy=disk_file.policy))
    self.assertEqual(auditor_worker.quarantines, 0)  # sanity
    # ...and the checksums are back
    for file_path in file_paths:
        # reassemble the serialized metadata: it is read from
        # "user.swift.metadata" followed by "user.swift.metadata1",
        # "...2", ... until the next key is missing
        metadata = xattr.getxattr(file_path, "user.swift.metadata")
        i = 1
        while True:
            try:
                metadata += xattr.getxattr(
                    file_path, "user.swift.metadata%d" % i)
                i += 1
            except (IOError, OSError):
                break
        checksum = xattr.getxattr(
            file_path, "user.swift.metadata_checksum")
        # the stored checksum is the ASCII hex md5 of the reassembled blob
        self.assertEqual(
            checksum,
            (md5(metadata, usedforsecurity=False).hexdigest()
             .encode('ascii')))
def test_object_audit_diff_data(self):
    # An object whose stored ETag does not match its data is quarantined;
    # a consistent one is not.
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    data = b'0' * 1024
    etag = md5(usedforsecurity=False)
    timestamp = str(normalize_timestamp(time.time()))
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
        pre_quarantines = auditor_worker.quarantines
    # remake so it will have metadata
    self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
                                              policy=POLICIES.legacy)
    auditor_worker.object_audit(
        AuditLocation(self.disk_file._datadir, 'sda', '0',
                      policy=POLICIES.legacy))
    self.assertEqual(auditor_worker.quarantines, pre_quarantines)
    # rewrite the same data, but record the md5 of different content
    # ('1' followed by 1023 '0's) as its ETag
    etag = md5(b'1' + b'0' * 1023, usedforsecurity=False).hexdigest()
    metadata['ETag'] = etag
    with self.disk_file.create() as writer:
        writer.write(data)
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
    auditor_worker.object_audit(
        AuditLocation(self.disk_file._datadir, 'sda', '0',
                      policy=POLICIES.legacy))
    self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_checks_EC_fragments(self):
    # For EC diskfiles the auditor validates fragment metadata, not only
    # the whole-file ETag/length. Each scenario below writes data with a
    # matching ETag and Content-Length, so any quarantine comes from the
    # fragment check; the logged message reports the offending offset.
    disk_file = self.disk_file_ec

    def do_test(data):
        # create diskfile and set ETag and content-length to match the data
        etag = md5(data, usedforsecurity=False).hexdigest()
        timestamp = str(normalize_timestamp(time.time()))
        with disk_file.create() as writer:
            writer.write(data)
            metadata = {
                'ETag': etag,
                'X-Timestamp': timestamp,
                'Content-Length': len(data),
            }
            writer.put(metadata)
            writer.commit(Timestamp(timestamp))
        self.logger.clear()
        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                               self.rcache, self.devices)
        self.assertEqual(0, auditor_worker.quarantines)  # sanity check
        auditor_worker.object_audit(
            AuditLocation(disk_file._datadir, 'sda', '0',
                          policy=disk_file.policy))
        return auditor_worker
    # two good frags in an EC archive
    frag_0 = disk_file.policy.pyeclib_driver.encode(
        b'x' * disk_file.policy.ec_segment_size)[0]
    frag_1 = disk_file.policy.pyeclib_driver.encode(
        b'y' * disk_file.policy.ec_segment_size)[0]
    data = frag_0 + frag_1
    auditor_worker = do_test(data)
    self.assertEqual(0, auditor_worker.quarantines)
    self.assertFalse(auditor_worker.logger.get_lines_for_level('error'))
    # corrupt second frag headers (overwrite its first 64 bytes)
    corrupt_frag_1 = b'blah' * 16 + frag_1[64:]
    data = frag_0 + corrupt_frag_1
    auditor_worker = do_test(data)
    self.assertEqual(1, auditor_worker.quarantines)
    log_lines = auditor_worker.logger.get_lines_for_level('error')
    self.assertIn('failed audit and was quarantined: '
                  'Invalid EC metadata at offset 0x%x' %
                  len(frag_0),
                  log_lines[0])
    # dangling extra corrupt frag data
    data = frag_0 + frag_1 + b'wtf' * 100
    auditor_worker = do_test(data)
    self.assertEqual(1, auditor_worker.quarantines)
    log_lines = auditor_worker.logger.get_lines_for_level('error')
    self.assertIn('failed audit and was quarantined: '
                  'Invalid EC metadata at offset 0x%x' %
                  len(frag_0 + frag_1),
                  log_lines[0])
    # simulate bug https://bugs.launchpad.net/bugs/1631144 by writing start
    # of an ssync subrequest into the diskfile
    data = (
        b'PUT /a/c/o\r\n' +
        b'Content-Length: 999\r\n' +
        b'Content-Type: image/jpeg\r\n' +
        b'X-Object-Sysmeta-Ec-Content-Length: 1024\r\n' +
        b'X-Object-Sysmeta-Ec-Etag: 1234bff7eb767cc6d19627c6b6f9edef\r\n' +
        b'X-Object-Sysmeta-Ec-Frag-Index: 1\r\n' +
        b'X-Object-Sysmeta-Ec-Scheme: ' +
        DEFAULT_TEST_EC_TYPE.encode('ascii') + b'\r\n' +
        b'X-Object-Sysmeta-Ec-Segment-Size: 1048576\r\n' +
        b'X-Timestamp: 1471512345.17333\r\n\r\n'
    )
    # pad with fragment bytes up to exactly one fragment_size
    data += frag_0[:disk_file.policy.fragment_size - len(data)]
    auditor_worker = do_test(data)
    self.assertEqual(1, auditor_worker.quarantines)
    log_lines = auditor_worker.logger.get_lines_for_level('error')
    self.assertIn('failed audit and was quarantined: '
                  'Invalid EC metadata at offset 0x0',
                  log_lines[0])
def test_object_audit_no_meta(self):
    # A .data file written directly to disk, bypassing the diskfile writer,
    # carries no swift metadata; auditing it must quarantine it.
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(self.disk_file._datadir, timestamp + '.data')
    mkdirs(self.disk_file._datadir)
    # the context manager guarantees the handle is closed even if the
    # write fails
    with open(path, 'wb') as fp:
        fp.write(b'0' * 1024)
    invalidate_hash(os.path.dirname(self.disk_file._datadir))
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    pre_quarantines = auditor_worker.quarantines
    auditor_worker.object_audit(
        AuditLocation(self.disk_file._datadir, 'sda', '0',
                      policy=POLICIES.legacy))
    self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_will_not_swallow_errors_in_tests(self):
    # object_audit (as opposed to failsafe_object_audit) must let an
    # unexpected exception from the diskfile manager propagate.
    datadir = self.disk_file._datadir
    data_path = os.path.join(
        datadir, str(normalize_timestamp(time.time())) + '.data')
    mkdirs(datadir)
    with open(data_path, 'w') as fp:
        write_metadata(fp, {'name': '/a/c/o'})
    worker = auditor.AuditorWorker(self.conf, self.logger,
                                   self.rcache, self.devices)

    def explode(*args):
        raise NameError('tpyo')

    location = AuditLocation(os.path.dirname(data_path), 'sda', '0',
                             policy=POLICIES.legacy)
    with mock.patch.object(DiskFileManager,
                           'get_diskfile_from_audit_location', explode):
        self.assertRaises(NameError, worker.object_audit, location)
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
    # failsafe_object_audit absorbs an unexpected exception raised while
    # building the diskfile and counts it as one error.
    datadir = self.disk_file._datadir
    data_path = os.path.join(
        datadir, str(normalize_timestamp(time.time())) + '.data')
    mkdirs(datadir)
    with open(data_path, 'w') as fp:
        write_metadata(fp, {'name': '/a/c/o'})
    worker = auditor.AuditorWorker(self.conf, self.logger,
                                   self.rcache, self.devices)

    def explode(*args):
        raise NameError('tpyo')

    location = AuditLocation(os.path.dirname(data_path), 'sda', '0',
                             policy=POLICIES.legacy)
    with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
                    explode):
        worker.failsafe_object_audit(location)
    self.assertEqual(worker.errors, 1)
def test_audit_location_gets_quarantined(self):
    # If the object's datadir path is occupied by a regular file instead of
    # a directory, auditing that location quarantines it.
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    location = AuditLocation(self.disk_file._datadir, 'sda', '0',
                             policy=self.disk_file.policy)
    # instead of a datadir, we'll make a file!
    mkdirs(os.path.dirname(self.disk_file._datadir))
    # only the path needs to exist; close the handle right away instead of
    # leaking it
    with open(self.disk_file._datadir, 'w'):
        pass
    # after we turn the crank ...
    auditor_worker.object_audit(location)
    # ... it should get quarantined
    self.assertFalse(os.path.exists(self.disk_file._datadir))
    self.assertEqual(1, auditor_worker.quarantines)
def test_rsync_tempfile_timeout_auto_option(self):
    # rsync_tempfile_timeout resolution: the default, an explicit value,
    # and "auto" derived from the object-replicator section when one can
    # be parsed.
    config_path = os.path.join(self.testdir, 'objserver.conf')

    def build_worker(conf):
        return auditor.AuditorWorker(conf, self.logger,
                                     self.rcache, self.devices)

    def build_worker_from_stub(stub_config):
        # the Daemon loader hands the object-auditor section to the
        # auditor, which builds its workers from it
        with open(config_path, 'w') as f:
            f.write(textwrap.dedent(stub_config))
        return build_worker(readconf(config_path, 'object-auditor'))

    # without access to the replicator config section we use our default
    self.assertEqual(build_worker(self.conf).rsync_tempfile_timeout, 86400)

    # an explicitly set rsync_tempfile_timeout is used as-is
    self.conf['rsync_tempfile_timeout'] = '1800'
    self.assertEqual(build_worker(self.conf).rsync_tempfile_timeout, 1800)

    # "auto" with no object-replicator section: there is nothing to
    # parse, so we still fall back to the default
    no_replicator_section = """
[object-auditor]
rsync_tempfile_timeout = auto
"""
    self.assertEqual(
        build_worker_from_stub(
            no_replicator_section).rsync_tempfile_timeout,
        86400)

    # "auto" with a replicator section that does not override
    # rsync_timeout: the replicator's default rsync_timeout plus 15mins
    default_replicator_section = """
[object-replicator]
[object-auditor]
rsync_tempfile_timeout = auto
"""
    self.assertEqual(
        build_worker_from_stub(
            default_replicator_section).rsync_tempfile_timeout,
        replicator.DEFAULT_RSYNC_TIMEOUT + 900)

    # "auto" with a configured replicator rsync_timeout: (3600) + 900
    tuned_replicator_section = """
[DEFAULT]
reclaim_age = 1209600
[object-replicator]
rsync_timeout = 3600
[object-auditor]
rsync_tempfile_timeout = auto
"""
    self.assertEqual(
        build_worker_from_stub(
            tuned_replicator_section).rsync_tempfile_timeout,
        3600 + 900)
def test_inprogress_rsync_tempfiles_get_cleaned_up(self):
    # An rsync-style temp file ('.<datafile>.<random>') next to a .data
    # file is kept while younger than rsync_tempfile_timeout and removed
    # once older. Unrelated dotfiles in the datadir are left alone.
    def touch(path):
        # create an empty file and close the handle immediately
        with open(path, 'w'):
            pass

    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    location = AuditLocation(self.disk_file._datadir, 'sda', '0',
                             policy=self.disk_file.policy)
    data = b'VERIFY'
    etag = md5(usedforsecurity=False)
    timestamp = str(normalize_timestamp(time.time()))
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        metadata = {
            'ETag': etag.hexdigest(),
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
    datafilename = None
    datadir_files = os.listdir(self.disk_file._datadir)
    for filename in datadir_files:
        if filename.endswith('.data'):
            datafilename = filename
            break
    else:
        self.fail('Did not find .data file in %r: %r' %
                  (self.disk_file._datadir, datadir_files))
    rsynctempfile_path = os.path.join(self.disk_file._datadir,
                                      '.%s.9ILVBL' % datafilename)
    touch(rsynctempfile_path)
    # sanity check we have an extra file
    rsync_files = os.listdir(self.disk_file._datadir)
    self.assertEqual(len(datadir_files) + 1, len(rsync_files))
    # and after we turn the crank ...
    auditor_worker.object_audit(location)
    # ... we've still got the rsync file
    self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
    # and we'll keep it - depending on the rsync_tempfile_timeout
    self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
    self.conf['rsync_tempfile_timeout'] = '3600'
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600)
    # 1900s from now is still within the 3600s timeout, so the file stays
    now = time.time() + 1900
    with mock.patch('swift.obj.auditor.time.time',
                    return_value=now):
        auditor_worker.object_audit(location)
    self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
    # but *tomorrow* when we run
    tomorrow = time.time() + 86400
    with mock.patch('swift.obj.auditor.time.time',
                    return_value=tomorrow):
        auditor_worker.object_audit(location)
    # ... we'll totally clean that stuff up!
    self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir))
    # but if we have some random crazy file in there
    random_crazy_file_path = os.path.join(self.disk_file._datadir,
                                          '.random.crazy.file')
    touch(random_crazy_file_path)
    tomorrow = time.time() + 86400
    with mock.patch('swift.obj.auditor.time.time',
                    return_value=tomorrow):
        auditor_worker.object_audit(location)
    # that's someone else's problem
    self.assertIn(os.path.basename(random_crazy_file_path),
                  os.listdir(self.disk_file._datadir))
def test_generic_exception_handling(self):
    # An arbitrary exception raised while building a diskfile during
    # audit_all_objects is counted as an error instead of aborting the run.
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    # pretend that we logged (and reset counters) just now
    auditor_worker.last_logged = time.time()
    timestamp = str(normalize_timestamp(time.time()))
    pre_errors = auditor_worker.errors
    data = b'0' * 1024
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
    # any attempt to instantiate the diskfile class raises ZeroDivisionError
    with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
                    lambda *_: 1 / 0):
        auditor_worker.audit_all_objects()
    self.assertEqual(auditor_worker.errors, pre_errors + 1)
def test_object_run_once_pass(self):
    # Consistent objects in every policy pass the audit, and their sizes
    # are recorded in the object_size_stats buckets (10, 100, 1024,
    # 10240, plus 'OVER').
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    auditor_worker.log_time = 0
    timestamp = str(normalize_timestamp(time.time()))
    pre_quarantines = auditor_worker.quarantines
    data = b'0' * 1024

    def write_file(df):
        # write the current value of ``data`` to ``df`` with a matching
        # ETag and Content-Length so the object passes the audit
        with df.create() as writer:
            writer.write(data)
            metadata = {
                'ETag': md5(data, usedforsecurity=False).hexdigest(),
                'X-Timestamp': timestamp,
                'Content-Length': str(os.fstat(writer._fd).st_size),
            }
            writer.put(metadata)
            writer.commit(Timestamp(timestamp))

    # one 1024-byte object each in policy 0, policy 1 and policy 2 (EC)
    for df in (self.disk_file, self.disk_file_p1, self.disk_file_ec):
        write_file(df)
    auditor_worker.audit_all_objects()
    self.assertEqual(auditor_worker.quarantines, pre_quarantines)
    # 1 object per policy falls into 1024 bucket
    self.assertEqual(auditor_worker.stats_buckets[1024], 3)
    self.assertEqual(auditor_worker.stats_buckets[10240], 0)

    # pick up some additional code coverage, large file: rewrite the
    # policy-0 and policy-2 (EC) objects as 1 MiB objects; write_file
    # picks up the rebound ``data``
    data = b'0' * 1024 * 1024
    for df in (self.disk_file, self.disk_file_ec):
        write_file(df)
    auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
    self.assertEqual(auditor_worker.quarantines, pre_quarantines)
    # still have the 1024 byte object left in policy-1 (plus the
    # stats from the original 3)
    self.assertEqual(auditor_worker.stats_buckets[1024], 4)
    self.assertEqual(auditor_worker.stats_buckets[10240], 0)
    # and the policy-0 and policy-2 disk_files were both re-written as
    # larger objects
    self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)

    # pick up even more additional code coverage, misc paths: with no
    # configured sizes the bucket counts stay unchanged
    auditor_worker.log_time = -1
    auditor_worker.stats_sizes = []
    auditor_worker.audit_all_objects(device_dirs=['sda', 'sdb'])
    self.assertEqual(auditor_worker.quarantines, pre_quarantines)
    self.assertEqual(auditor_worker.stats_buckets[1024], 4)
    self.assertEqual(auditor_worker.stats_buckets[10240], 0)
    self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
def test_object_run_logging(self):
    # The first info line of a sweep names the audit type and device.
    def first_info_line(**worker_kwargs):
        worker = auditor.AuditorWorker(self.conf, self.logger,
                                       self.rcache, self.devices,
                                       **worker_kwargs)
        worker.audit_all_objects(device_dirs=['sda'])
        info_lines = self.logger.get_lines_for_level('info')
        self.assertGreater(len(info_lines), 0)
        return info_lines[0]

    self.assertIn('ALL - parallel, sda', first_info_line())
    self.logger.clear()
    self.assertIn('ZBF - sda', first_info_line(zero_byte_only_at_fps=50))
def test_object_run_recon_cache(self):
    # Each audit pass dumps its stats into the recon cache under
    # object_auditor_stats_<ALL|ZBF>. Auditing all devices stores the
    # stats directly; auditing specific devices nests them under the
    # device name. Every assertion reads the cache fresh from disk, so it
    # checks the state left by the pass that just ran.
    ts = Timestamp(time.time())
    data = b'test_data'
    with self.disk_file.create() as writer:
        writer.write(data)
        metadata = {
            'ETag': md5(data, usedforsecurity=False).hexdigest(),
            'X-Timestamp': ts.normal,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(ts)

    def read_rcache():
        with open(self.rcache) as fd:
            return json.load(fd)

    # all devices
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    auditor_worker.audit_all_objects()
    # bytes_processed is len(b'test_data') for the full pass
    expected = {'object_auditor_stats_ALL':
                {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
                 'start_time': mock.ANY, 'quarantined': 0,
                 'bytes_processed': 9}}
    self.assertEqual(expected, read_rcache())
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices,
                                           zero_byte_only_at_fps=50)
    auditor_worker.audit_all_objects()
    # the ZBF pass adds its own entry alongside the ALL stats
    expected.update({
        'object_auditor_stats_ZBF':
        {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
         'start_time': mock.ANY, 'quarantined': 0,
         'bytes_processed': 0}})
    self.assertEqual(expected, read_rcache())

    # specific devices
    os.unlink(self.rcache)
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    auditor_worker.audit_all_objects(device_dirs=['sda'])
    expected = {'object_auditor_stats_ALL':
                {'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
                         'start_time': mock.ANY, 'quarantined': 0,
                         'bytes_processed': 9}}}
    self.assertEqual(expected, read_rcache())
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices,
                                           zero_byte_only_at_fps=50)
    auditor_worker.audit_all_objects(device_dirs=['sda'])
    expected.update({
        'object_auditor_stats_ZBF':
        {'sda': {'passes': 1, 'errors': 0, 'audit_time': mock.ANY,
                 'start_time': mock.ANY, 'quarantined': 0,
                 'bytes_processed': 0}}})
    self.assertEqual(expected, read_rcache())
def test_object_run_once_no_sda(self):
    # A full audit_all_objects pass quarantines an object whose .data file
    # has extra bytes beyond the size recorded at put() time.
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    timestamp = str(normalize_timestamp(time.time()))
    pre_quarantines = auditor_worker.quarantines
    # pretend that we logged (and reset counters) just now
    auditor_worker.last_logged = time.time()
    data = b'0' * 1024
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        # append after the metadata (Content-Length/ETag) has been
        # recorded, so the committed object is inconsistent
        os.write(writer._fd, b'extra_data')
        writer.commit(Timestamp(timestamp))
    auditor_worker.audit_all_objects()
    self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_run_once_multi_devices(self):
    # Two objects audited across two passes: the first is consistent, the
    # second has extra bytes appended after commit; exactly one quarantine
    # is expected in total.
    # NOTE(review): despite the name, both diskfiles here are on 'sda'.
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                           self.rcache, self.devices)
    # pretend that we logged (and reset counters) just now
    auditor_worker.last_logged = time.time()
    timestamp = str(normalize_timestamp(time.time()))
    pre_quarantines = auditor_worker.quarantines
    data = b'0' * 10
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
    auditor_worker.audit_all_objects()
    self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'ob',
                                              policy=POLICIES.legacy)
    data = b'1' * 10
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
        # grow the committed .data file through the still-open fd
        os.write(writer._fd, b'extra_data')
    auditor_worker.audit_all_objects()
    self.assertEqual(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_run_fast_track_non_zero(self):
    # A non-empty object with a wrong ETag is left alone by a zero-byte
    # (ZBF, zero_byte_fps=50) pass but quarantined by a full pass.
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    data = b'0' * 1024
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        writer.write(data)
        etag.update(data)
        etag = etag.hexdigest()
        timestamp = str(normalize_timestamp(time.time()))
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
        # overwrite the stored metadata with the md5 of different content
        etag = md5(usedforsecurity=False)
        etag.update(b'1' + b'0' * 1023)
        etag = etag.hexdigest()
        metadata['ETag'] = etag
        write_metadata(writer._fd, metadata)
    quarantine_path = os.path.join(self.devices,
                                   'sda', 'quarantined', 'objects')
    kwargs = {'mode': 'once'}
    kwargs['zero_byte_fps'] = 50
    self.auditor.run_audit(**kwargs)
    self.assertFalse(os.path.isdir(quarantine_path))
    del(kwargs['zero_byte_fps'])
    # reset the saved auditor progress before the full pass
    clear_auditor_status(self.devices, 'objects')
    self.auditor.run_audit(**kwargs)
    self.assertTrue(os.path.isdir(quarantine_path))
def setup_bad_zero_byte(self, timestamp=None):
    # Helper: create self.auditor and write self.disk_file as an object
    # with no data bytes whose metadata claims Content-Length 10, i.e. a
    # bad zero-byte object that the auditor is expected to quarantine.
    #
    # :param timestamp: Timestamp for the object; defaults to now.
    if timestamp is None:
        timestamp = Timestamp.now()
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    etag = md5(usedforsecurity=False)
    with self.disk_file.create() as writer:
        # nothing is written, so this is the md5 of empty content
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': timestamp.internal,
            'Content-Length': 10,
        }
        writer.put(metadata)
        writer.commit(Timestamp(timestamp))
        # NOTE(review): this recomputes the same empty-content ETag, so
        # the metadata rewrite below appears to leave the ETag unchanged
        etag = md5(usedforsecurity=False)
        etag = etag.hexdigest()
        metadata['ETag'] = etag
        write_metadata(writer._fd, metadata)
def test_object_run_fast_track_all(self):
    # A full (non-ZBF) 'once' pass quarantines the bad zero-byte object.
    self.setup_bad_zero_byte()
    self.auditor.run_audit(mode='once')
    self.assertTrue(os.path.isdir(
        os.path.join(self.devices, 'sda', 'quarantined', 'objects')))
def test_object_run_fast_track_zero(self):
    # A ZBF pass (zero_byte_fps=50) quarantines the bad zero-byte object and
    # reads its progress with audit_type 'ZBF'.
    self.setup_bad_zero_byte()
    recorded = {'audit_type': 0}

    def recording_get_auditor_status(path, logger, audit_type):
        # remember the most recent audit_type, then delegate
        recorded['audit_type'] = audit_type
        return get_auditor_status(path, logger, audit_type)

    with mock.patch('swift.obj.diskfile.get_auditor_status',
                    recording_get_auditor_status):
        self.auditor.run_audit(mode='once', zero_byte_fps=50)
    self.assertTrue(os.path.isdir(
        os.path.join(self.devices, 'sda', 'quarantined', 'objects')))
    self.assertEqual('ZBF', recorded['audit_type'])
def test_object_run_fast_track_zero_check_closed(self):
    # During a ZBF pass, quarantining the bad zero-byte object goes through
    # the diskfile class's _quarantine(); FakeFile records that the call
    # happened and then delegates to the real implementation.
    rat = [False]  # set to True once _quarantine has been called

    class FakeFile(DiskFile):
        def _quarantine(self, data_file, msg):
            rat[0] = True
            DiskFile._quarantine(self, data_file, msg)
    self.setup_bad_zero_byte()
    with mock.patch('swift.obj.diskfile.DiskFileManager.diskfile_cls',
                    FakeFile):
        kwargs = {'mode': 'once'}
        kwargs['zero_byte_fps'] = 50
        self.auditor.run_audit(**kwargs)
        quarantine_path = os.path.join(self.devices,
                                       'sda', 'quarantined', 'objects')
        self.assertTrue(os.path.isdir(quarantine_path))
        self.assertTrue(rat[0])
@mock.patch.object(auditor.ObjectAuditor, 'run_audit')
@mock.patch('os.fork', return_value=0)
def test_with_inaccessible_object_location(self, mock_os_fork,
                                           mock_run_audit):
    # os.fork is mocked to return 0, so fork_child behaves as the child.
    # A failure inside run_audit must not prevent sys.exit() from running
    # in the child; otherwise we would leave zombie processes behind.
    mock_run_audit.side_effect = OSError('permission denied')
    self.auditor = auditor.ObjectAuditor(self.conf)
    with self.assertRaises(SystemExit):
        self.auditor.fork_child(self)
def test_with_only_tombstone(self):
    # sanity check that auditor doesn't touch solitary tombstones: after
    # deleting the bad zero-byte object only a .ts file remains, and a
    # full audit pass must leave the datadir listing unchanged
    ts_iter = make_timestamp_iter()
    self.setup_bad_zero_byte(timestamp=next(ts_iter))
    self.disk_file.delete(next(ts_iter))
    files = os.listdir(self.disk_file._datadir)
    self.assertEqual(1, len(files))
    # match the full '.ts' extension, not just a trailing 'ts'
    self.assertTrue(files[0].endswith('.ts'))
    kwargs = {'mode': 'once'}
    self.auditor.run_audit(**kwargs)
    files_after = os.listdir(self.disk_file._datadir)
    self.assertEqual(files, files_after)
def test_with_tombstone_and_data(self):
    """A tombstone plus a data file in one object dir are both removed.

    rsync replication could leave a tombstone and data file in an object
    dir; verify the whole object dir is gone after the audit.
    """
    ts_iter = make_timestamp_iter()
    ts_tomb = next(ts_iter)
    ts_data = next(ts_iter)
    self.setup_bad_zero_byte(timestamp=ts_data)
    tomb_file_path = os.path.join(self.disk_file._datadir,
                                  '%s.ts' % ts_tomb.internal)
    with open(tomb_file_path, 'wb') as fd:
        write_metadata(fd, {'X-Timestamp': ts_tomb.internal})
    files = os.listdir(self.disk_file._datadir)
    self.assertEqual(2, len(files))
    # assertIn reports both the missing name and the actual listing
    self.assertIn(os.path.basename(tomb_file_path), files)
    self.auditor.run_audit(mode='once')
    self.assertFalse(os.path.exists(self.disk_file._datadir))
def _audit_tombstone(self, conf, ts_tomb, zero_byte_fps=0):
    """Create a lone tombstone at *ts_tomb*, audit once, return locations.

    A fresh ObjectAuditor built from *conf* is stored as self.auditor.
    The tombstone is written, and the hashes refreshed, while time.time()
    is pinned to the tombstone's own time so nothing is reclaimed during
    setup. The audit itself runs after that patch is removed.

    :param conf: auditor configuration dict
    :param ts_tomb: Timestamp of the tombstone to create
    :param zero_byte_fps: passed through to run_audit()
    :returns: a ``(part_dir, suffix)`` tuple locating the tombstone
    """
    self.auditor = auditor.ObjectAuditor(conf)
    self.auditor.log_time = 0
    tomb_name = '%s.ts' % ts_tomb.internal

    with mock.patch('time.time', return_value=float(ts_tomb)):
        # deleting adds an entry to the hashes invalidations file...
        self.disk_file.delete(ts_tomb)
        # ...which this get_hashes call consumes, truncating the file
        self.disk_file.manager.get_hashes(
            'sda', '0', [], self.disk_file.policy)

    suffix_dir = dirname(self.disk_file._datadir)
    suffix = basename(suffix_dir)
    part_dir = dirname(suffix_dir)

    # sanity checks: only the tombstone, a hashes file, nothing pending
    self.assertEqual([tomb_name], os.listdir(self.disk_file._datadir))
    self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        self.assertEqual(b'', fp.read().strip(b'\n'))

    self.auditor.run_audit(mode='once', zero_byte_fps=zero_byte_fps)

    # the auditor itself must leave the tombstone file in place
    self.assertEqual([tomb_name], os.listdir(self.disk_file._datadir))
    return part_dir, suffix
def test_non_reclaimable_tombstone(self):
    # a 55-second-old tombstone is nowhere near the default reclaim age,
    # so auditing it must not queue its suffix for rehashing
    tomb_time = Timestamp(time.time() - 55)
    part_dir, _suffix = self._audit_tombstone(self.conf, tomb_time)
    hashes_file = os.path.join(part_dir, HASH_FILE)
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(hashes_file))
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        pending = fp.read().strip(b'\n')
    self.assertEqual(b'', pending)
def test_reclaimable_tombstone(self):
    # a tombstone 604800s (one week) old is reclaimable under the default
    # reclaim age, so the auditor should invalidate its suffix
    tomb_time = Timestamp(time.time() - 604800)
    part_dir, suffix = self._audit_tombstone(self.conf, tomb_time)
    hashes_file = os.path.join(part_dir, HASH_FILE)
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(hashes_file))
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        pending = fp.read().strip(b'\n')
    self.assertEqual(suffix.encode('ascii'), pending)
def test_non_reclaimable_tombstone_with_custom_reclaim_age(self):
    # a week-old tombstone is still too young once reclaim_age is raised
    # to two weeks, so nothing should be invalidated
    tomb_time = Timestamp(time.time() - 604800)
    two_week_conf = dict(self.conf, reclaim_age=2 * 604800)
    part_dir, _suffix = self._audit_tombstone(two_week_conf, tomb_time)
    hashes_file = os.path.join(part_dir, HASH_FILE)
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(hashes_file))
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        pending = fp.read().strip(b'\n')
    self.assertEqual(b'', pending)
def test_reclaimable_tombstone_with_custom_reclaim_age(self):
    # with reclaim_age lowered to 10s, a 55-second-old tombstone is
    # reclaimable and its suffix should be invalidated
    tomb_time = Timestamp(time.time() - 55)
    short_age_conf = dict(self.conf, reclaim_age=10)
    part_dir, suffix = self._audit_tombstone(short_age_conf, tomb_time)
    hashes_file = os.path.join(part_dir, HASH_FILE)
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(hashes_file))
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        pending = fp.read().strip(b'\n')
    self.assertEqual(suffix.encode('ascii'), pending)
def test_reclaimable_tombstone_with_zero_byte_fps(self):
    # even though this tombstone is old enough to reclaim, a zero_byte_fps
    # audit pass must not invalidate the hash
    tomb_time = Timestamp(time.time() - 604800)
    part_dir, _suffix = self._audit_tombstone(
        self.conf, tomb_time, zero_byte_fps=50)
    hashes_file = os.path.join(part_dir, HASH_FILE)
    invalidations = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
    self.assertTrue(os.path.exists(hashes_file))
    self.assertTrue(os.path.exists(invalidations))
    with open(invalidations, 'rb') as fp:
        pending = fp.read().strip(b'\n')
    self.assertEqual(b'', pending)
def _test_expired_object_is_ignored(self, zero_byte_fps):
    """Audit an expired object twice; nothing should be disturbed.

    An object whose X-Delete-At has passed still has a .data file and must
    not be mistaken for a tombstone. The auditor must not remove files or
    log errors or warnings. It must also leave the hashes invalidations
    file alone, neither truncating it nor appending to it.

    :param zero_byte_fps: passed through to run_audit()
    """
    audit = auditor.ObjectAuditor(self.conf, logger=self.logger)
    audit.log_time = 0

    def audit_once():
        with mock.patch.object(auditor, 'dump_recon_cache'):
            audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)

    now = time.time()
    write_diskfile(self.disk_file, Timestamp(now - 20),
                   extra_metadata={'X-Delete-At': now - 10})
    datadir = self.disk_file._datadir
    files = os.listdir(datadir)
    self.assertTrue([f for f in files if f.endswith('.data')])  # sanity

    suffix = basename(dirname(datadir)).encode('ascii')
    invalidations = os.path.join(dirname(dirname(datadir)),
                                 HASH_INVALIDATIONS_FILE)

    def read_invalidations():
        with open(invalidations, 'rb') as fp:
            return fp.read().strip(b'\n')

    # writing the diskfile appended its suffix to the invalidations file
    self.assertEqual(suffix, read_invalidations())  # sanity check

    audit_once()
    # the auditor neither truncated the file nor added an entry
    self.assertEqual(suffix, read_invalidations())  # sanity check

    # this get_hashes call will truncate the invalid hashes entry
    self.disk_file.manager.get_hashes(
        'sda', '0', [], self.disk_file.policy)
    self.assertEqual(b'', read_invalidations())  # sanity check

    # run the auditor, again...
    audit_once()

    # verify nothing changed
    self.assertTrue(os.path.exists(datadir))
    self.assertEqual(files, os.listdir(datadir))
    self.assertFalse(audit.logger.get_lines_for_level('error'))
    self.assertFalse(audit.logger.get_lines_for_level('warning'))
    # and there was no hash invalidation
    self.assertEqual(b'', read_invalidations())
def test_expired_object_is_ignored(self):
    # full (non zero-byte-only) audit pass
    self._test_expired_object_is_ignored(zero_byte_fps=0)
def test_expired_object_is_ignored_with_zero_byte_fps(self):
    # same scenario, audited by a zero_byte_fps pass
    self._test_expired_object_is_ignored(zero_byte_fps=50)
def test_auditor_reclaim_age(self):
    """Check where AuditorWorker gets each policy's reclaim_age from.

    Cases exercised, in order:
      * plain conf dict without reclaim_age -> diskfile default (1 week)
      * reclaim_age set in the conf dict -> that value
      * config file with only [object-auditor] -> diskfile default
      * reclaim_age in [object-auditor] -> that value
      * reclaim_age only in [object-replicator] -> the replicator value
      * reclaim_age in [DEFAULT] and [object-replicator] -> the DEFAULT one
    """
    config_path = os.path.join(self.testdir, 'objserver.conf')

    def make_worker(conf):
        return auditor.AuditorWorker(conf, self.logger,
                                     self.rcache, self.devices)

    def worker_from_stub(stub_config):
        # write a real config file and load the auditor section through
        # readconf(), as a deployed object server would
        with open(config_path, 'w') as f:
            f.write(textwrap.dedent(stub_config))
        return make_worker(readconf(config_path, 'object-auditor'))

    def assert_reclaim_age(worker, expected):
        router = worker.diskfile_router
        for policy in POLICIES:
            self.assertEqual(router[policy].reclaim_age, expected)

    # if we don't have access to the replicator config section we'll use
    # diskfile's default
    assert_reclaim_age(make_worker(self.conf), 86400 * 7)

    # if the reclaim_age option is set explicitly we use that
    self.conf['reclaim_age'] = '1800'
    assert_reclaim_age(make_worker(self.conf), 1800)

    # with a real config we can be a little smarter; but if there is no
    # object-replicator section we still have to fall back to default
    # because we can't parse the config for that section!
    auditor_worker = worker_from_stub("""
        [object-auditor]
        """)
    assert_reclaim_age(auditor_worker, 86400 * 7)

    # verify reclaim_age is of auditor config value
    auditor_worker = worker_from_stub("""
        [object-replicator]
        [object-auditor]
        reclaim_age = 60
        """)
    assert_reclaim_age(auditor_worker, 60)

    # verify reclaim_age falls back to replicator config value
    # if there is no auditor config value
    auditor_worker = worker_from_stub("""
        [object-replicator]
        reclaim_age = 60
        [object-auditor]
        """)
    assert_reclaim_age(auditor_worker, 60)
    # with no rsync_timeout configured anywhere, the tempfile timeout is
    # the replicator's default rsync timeout plus 900 seconds
    self.assertEqual(auditor_worker.rsync_tempfile_timeout,
                     replicator.DEFAULT_RSYNC_TIMEOUT + 900)

    # we'll prefer our own DEFAULT section to the replicator though
    auditor_worker = worker_from_stub("""
        [DEFAULT]
        reclaim_age = 1209600
        [object-replicator]
        reclaim_age = 1800
        [object-auditor]
        """)
    assert_reclaim_age(auditor_worker, 1209600)
def test_sleeper(self):
    """_sleep() sleeps for the auditor's configured interval."""
    with mock.patch('time.sleep') as mock_sleep:
        # with self.conf as-is the auditor sleeps for 30 seconds
        auditor.ObjectAuditor(self.conf)._sleep()
        mock_sleep.assert_called_with(30)

        # interval supplied through the conf (self.conf values win)
        interval_conf = {'interval': 2}
        interval_conf.update(self.conf)
        auditor.ObjectAuditor(interval_conf)._sleep()
        mock_sleep.assert_called_with(2)

        # interval overridden on the instance after construction
        overridden = auditor.ObjectAuditor(self.conf)
        overridden.interval = 2
        overridden._sleep()
        mock_sleep.assert_called_with(2)
def test_run_parallel_audit(self):
    """Exercise run_once/run_forever/fork_child with process calls faked.

    os.fork, os.wait, signal.signal and sys.exit are replaced by methods of
    ObjectAuditorMock. All four are restored in the ``finally`` clause.
    Previously only os.fork/os.wait were restored there, so a failing
    assertion early in the test could leave sys.exit as a no-op for the
    rest of the test run.
    """
    class StopForever(Exception):
        pass

    class Bogus(Exception):
        pass

    loop_error = Bogus('exception')

    class LetMeOut(BaseException):
        pass

    class ObjectAuditorMock(object):
        check_args = ()
        check_kwargs = {}
        check_device_dir = None
        fork_called = 0
        master = 0
        wait_called = 0

        def mock_run(self, *args, **kwargs):
            self.check_args = args
            self.check_kwargs = kwargs
            if 'zero_byte_fps' in kwargs:
                self.check_device_dir = kwargs.get('device_dirs')

        def mock_sleep_stop(self):
            raise StopForever('stop')

        def mock_sleep_continue(self):
            return

        def mock_audit_loop_error(self, parent, zbo_fps,
                                  override_devices=None, **kwargs):
            raise loop_error

        def mock_fork(self):
            # 0 means "child"; a positive value means "parent" (master)
            self.fork_called += 1
            if self.master:
                return self.fork_called
            else:
                return 0

        def mock_wait(self):
            self.wait_called += 1
            return (self.wait_called, 0)

        def mock_signal(self, sig, action):
            pass

        def mock_exit(self):
            pass

    for i in string.ascii_letters[2:26]:
        mkdirs(os.path.join(self.devices, 'sd%s' % i))

    my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
                                            mount_check='false',
                                            zero_byte_files_per_second=89,
                                            concurrency=1))

    mocker = ObjectAuditorMock()
    my_auditor.logger.exception = mock.MagicMock()
    real_audit_loop = my_auditor.audit_loop
    my_auditor.audit_loop = mocker.mock_audit_loop_error
    my_auditor.run_audit = mocker.mock_run
    was_fork = os.fork
    was_wait = os.wait
    was_signal = signal.signal
    was_exit = sys.exit
    os.fork = mocker.mock_fork
    os.wait = mocker.mock_wait
    signal.signal = mocker.mock_signal
    sys.exit = mocker.mock_exit
    try:
        my_auditor._sleep = mocker.mock_sleep_stop
        my_auditor.run_once(zero_byte_fps=50)
        my_auditor.logger.exception.assert_called_once_with(
            'ERROR auditing: %s', loop_error)
        my_auditor.logger.exception.reset_mock()
        self.assertRaises(StopForever, my_auditor.run_forever)
        my_auditor.logger.exception.assert_called_once_with(
            'ERROR auditing: %s', loop_error)
        my_auditor.audit_loop = real_audit_loop

        # sleep between ZBF scanner forks
        self.assertRaises(StopForever, my_auditor.fork_child, True, True)

        mocker.fork_called = 0
        # from here on the child code paths need the real signal.signal
        # and a sys.exit that actually raises SystemExit
        signal.signal = was_signal
        sys.exit = was_exit
        self.assertRaises(StopForever,
                          my_auditor.run_forever, zero_byte_fps=50)
        self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 50)
        self.assertEqual(mocker.fork_called, 0)

        self.assertRaises(SystemExit, my_auditor.run_once)
        self.assertEqual(mocker.fork_called, 1)
        self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
        self.assertEqual(mocker.check_device_dir, [])
        self.assertEqual(mocker.check_args, ())

        device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
        device_string = ','.join(device_list)
        device_string_bogus = device_string + ',bogus'

        mocker.fork_called = 0
        self.assertRaises(SystemExit, my_auditor.run_once,
                          devices=device_string_bogus)
        self.assertEqual(mocker.fork_called, 1)
        self.assertEqual(mocker.check_kwargs['zero_byte_fps'], 89)
        self.assertEqual(sorted(mocker.check_device_dir), device_list)

        mocker.master = 1

        mocker.fork_called = 0
        self.assertRaises(StopForever, my_auditor.run_forever)
        # Fork or Wait are called greater than or equal to 2 times in the
        # main process. 2 times if zbf run once and 3 times if zbf run
        # again
        self.assertGreaterEqual(mocker.fork_called, 2)
        self.assertGreaterEqual(mocker.wait_called, 2)

        my_auditor._sleep = mocker.mock_sleep_continue
        my_auditor.audit_loop = works_only_once(my_auditor.audit_loop,
                                                LetMeOut())

        my_auditor.concurrency = 2
        mocker.fork_called = 0
        mocker.wait_called = 0
        self.assertRaises(LetMeOut, my_auditor.run_forever)
        # Fork or Wait are called greater than or equal to
        # no. of devices + (no. of devices)/2 + 1 times in main process
        no_devices = len(os.listdir(self.devices))
        self.assertGreaterEqual(mocker.fork_called, no_devices +
                                no_devices / 2 + 1)
        self.assertGreaterEqual(mocker.wait_called, no_devices +
                                no_devices / 2 + 1)

    finally:
        # restore everything we replaced, even if an assertion failed
        # before the mid-test restore of signal.signal/sys.exit above
        os.fork = was_fork
        os.wait = was_wait
        signal.signal = was_signal
        sys.exit = was_exit
def test_run_audit_once(self):
    """run_once() with concurrency=1 forks one ZBF and one normal child."""
    my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
                                            mount_check='false',
                                            zero_byte_files_per_second=89,
                                            concurrency=1))

    forked_pids = []
    next_zbf_pid = [2]
    next_normal_pid = [1001]
    outstanding_pids = [[]]

    def fake_fork_child(**kwargs):
        if len(forked_pids) > 10:
            # something's gone horribly wrong
            raise BaseException("forking too much")

        # ZBF pids are all smaller than the normal-audit pids; this way
        # we can return them first.
        #
        # Also, ZBF pids are even and normal-audit pids are odd; this is
        # so humans seeing this test fail can better tell what's happening.
        if kwargs.get('zero_byte_fps'):
            pid = next_zbf_pid[0]
            next_zbf_pid[0] += 2
        else:
            pid = next_normal_pid[0]
            next_normal_pid[0] += 2
        outstanding_pids[0].append(pid)
        forked_pids.append(pid)
        return pid

    def fake_os_wait():
        if not outstanding_pids[0]:
            # Popping from an empty list would raise IndexError. That is an
            # ordinary Exception, and run_once() logs those from its audit
            # loop instead of raising them, so the test could pass
            # silently. A BaseException escapes and fails the test loudly.
            raise BaseException("nobody waiting")
        # Smallest pid first; that's ZBF if we have one, else normal
        outstanding_pids[0].sort()
        pid = outstanding_pids[0].pop(0)
        return (pid, 0)  # (pid, status)

    with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
            mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
            mock.patch.object(my_auditor, '_sleep', lambda *a: None):
        my_auditor.run_once()

    self.assertEqual(sorted(forked_pids), [2, 1001])
def test_run_audit_once_zbfps(self):
    """ZBF-only run_once() writes recon stats only once objects exist."""
    my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
                                            mount_check='false',
                                            zero_byte_files_per_second=89,
                                            concurrency=1,
                                            recon_cache_path=self.testdir))

    def run_zbf_once_and_load_recon():
        # one ZBF pass with sleeping disabled, then the recon cache content
        with mock.patch.object(my_auditor, '_sleep', lambda *a: None):
            my_auditor.run_once(zero_byte_fps=50)
        with open(self.rcache) as fd:
            return json.load(fd)

    # there's no objects to audit so expect no stats; this assertion
    # may change if https://bugs.launchpad.net/swift/+bug/1704858 is
    # fixed
    self.assertEqual({}, run_zbf_once_and_load_recon())

    # check recon cache stays clean after a second run
    self.assertEqual({}, run_zbf_once_and_load_recon())

    ts = Timestamp(time.time())
    with self.disk_file.create() as writer:
        metadata = {
            'ETag': md5(b'', usedforsecurity=False).hexdigest(),
            'X-Timestamp': ts.normal,
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        writer.commit(ts)

    # now that an object exists, a third run records ZBF stats for it
    self.assertEqual({
        'object_auditor_stats_ZBF': {
            'audit_time': 0,
            'bytes_processed': 0,
            'errors': 0,
            'passes': 1,
            'quarantined': 0,
            'start_time': mock.ANY}},
        run_zbf_once_and_load_recon())
def test_run_parallel_audit_once(self):
    """With concurrency=2, run_once forks one ZBF child and four normal
    audit children, letting the ZBF child finish before the last normal
    children are started."""
    my_auditor = auditor.ObjectAuditor(
        dict(devices=self.devices, mount_check='false',
             zero_byte_files_per_second=89, concurrency=2))

    # ZBF pids are even and start at 2; normal-audit pids are odd and
    # start at 1001. Every ZBF pid is below every normal pid, so the fake
    # os.wait() reports the ZBF child first, and the parity split helps
    # humans read a failing run.
    next_pid = {'zbf': 2, 'normal': 1001}
    forked_pids = []
    running = []

    def fake_fork_child(**kwargs):
        if len(forked_pids) > 10:
            # something's gone horribly wrong; try not to hang the test
            # run because of it
            raise BaseException("forking too much")
        kind = 'zbf' if kwargs.get('zero_byte_fps') else 'normal'
        pid = next_pid[kind]
        next_pid[kind] = pid + 2
        running.append(pid)
        forked_pids.append(pid)
        return pid

    def fake_os_wait():
        if not running:
            raise BaseException("nobody waiting")
        # ZBF auditor finishes first: always reap the smallest pid
        pid = min(running)
        running.remove(pid)
        return pid, 0  # (pid, status)

    # make sure we've got enough devs that the ZBF auditor can finish
    # before all the normal auditors have been started
    for device in ('sdc', 'sdd'):
        mkdirs(os.path.join(self.devices, device))

    with mock.patch("swift.obj.auditor.os.wait", fake_os_wait), \
            mock.patch.object(my_auditor, 'fork_child', fake_fork_child), \
            mock.patch.object(my_auditor, '_sleep', lambda *a: None):
        my_auditor.run_once()

    self.assertEqual(sorted(forked_pids), [2, 1001, 1003, 1005, 1007])
def test_run_parallel_audit_once_failed_fork(self):
    """A failed fork must not leave forked children un-waited-for."""
    my_auditor = auditor.ObjectAuditor(
        dict(devices=self.devices, mount_check='false',
             concurrency=2))

    state = {'last_pid': 1001, 'failed': False}
    outstanding_pids = []

    def failing_fork(**kwargs):
        # Fail exactly once, on the first call made while another child is
        # still outstanding (in practice the 2nd call). That is enough to
        # cause the growth of orphaned child processes.
        if outstanding_pids and not state['failed']:
            state['failed'] = True
            raise OSError
        state['last_pid'] += 2
        outstanding_pids.append(state['last_pid'])
        return state['last_pid']

    def fake_wait():
        return outstanding_pids.pop(0), 0

    with mock.patch("swift.obj.auditor.os.wait", fake_wait), \
            mock.patch.object(my_auditor, 'fork_child', failing_fork), \
            mock.patch.object(my_auditor, '_sleep', lambda *a: None):
        for _ in range(3):
            my_auditor.run_once()

    self.assertEqual(len(outstanding_pids), 0,
                     "orphaned children left {0}, expected 0."
                     .format(outstanding_pids))
@mock.patch('pkg_resources.iter_entry_points', no_audit_watchers)
@patch_policies(_mocked_policies)
class TestAuditWatchers(TestAuditorBase):
    """Tests for the object auditor's audit-watcher plumbing.

    Every test starts with three objects on device ``sda``: ``/a/c/o0`` and
    ``/a/c/o1`` in the legacy policy, plus one EC fragment archive written
    through ``self.disk_file_ec`` (``/a/c_ec/o``).
    """

    def _write_object(self, disk_file, data, timestamp, flavor):
        """Write *data* as a complete object through *disk_file*.

        :param disk_file: the diskfile to create and write
        :param data: the object body (bytes)
        :param timestamp: Timestamp used for X-Timestamp and the commit
        :param flavor: value stored as X-Object-Meta-Flavor
        """
        etag = md5(usedforsecurity=False)
        with disk_file.create() as writer:
            writer.write(data)
            etag.update(data)
            metadata = {
                'ETag': etag.hexdigest(),
                'X-Timestamp': timestamp.internal,
                'Content-Length': str(len(data)),
                'X-Object-Meta-Flavor': flavor,
            }
            writer.put(metadata)
            # The commit does nothing for replicated policies; it is kept
            # so this same helper also serves EC diskfiles.
            writer.commit(timestamp)

    def _assert_dict_contains_subset(self, subset, actual):
        """Assert every key/value pair of *subset* is present in *actual*.

        Replaces ``assertDictContainsSubset``, which is deprecated and was
        removed from unittest in Python 3.12.
        """
        self.assertEqual(
            subset, {k: actual[k] for k in subset if k in actual}, actual)

    def setUp(self):
        super(TestAuditWatchers, self).setUp()
        timestamp = Timestamp(time.time())

        for obj, data, flavor in (('o0', b'0' * 1024, 'banana'),
                                  ('o1', b'1' * 2048, 'orange')):
            disk_file = self.df_mgr.get_diskfile(
                'sda', '0', 'a', 'c', obj, policy=POLICIES.legacy)
            self._write_object(disk_file, data, timestamp, flavor)

        frag_0 = self.disk_file_ec.policy.pyeclib_driver.encode(
            b'x' * self.disk_file_ec.policy.ec_segment_size)[0]
        self._write_object(self.disk_file_ec, frag_0, timestamp, 'peach')

    def test_watchers(self):
        calls = []

        class TestWatcher(object):
            def __init__(self, conf, logger):
                self._started = False
                self._ended = False
                calls.append(["__init__", conf, logger])

                # Make sure the logger is capable of quacking like a logger
                logger.debug("getting started")

            def start(self, audit_type, **other_kwargs):
                if self._started:
                    raise Exception("don't call it twice")
                self._started = True
                calls.append(['start', audit_type])

            def see_object(self, object_metadata,
                           data_file_path, **other_kwargs):
                calls.append(['see_object', object_metadata,
                              data_file_path, other_kwargs])

            def end(self, **other_kwargs):
                if self._ended:
                    raise Exception("don't call it twice")
                self._ended = True
                calls.append(['end'])

        conf = self.conf.copy()
        conf['watchers'] = 'test_watcher1'
        conf['__file__'] = '/etc/swift/swift.conf'
        ret_config = {'swift#dark_data': {'action': 'log'}}
        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
                        return_value=ret_config), \
                mock.patch('swift.obj.auditor.load_pkg_resource',
                           side_effect=[TestWatcher]) as mock_load, \
                mock.patch('swift.obj.auditor.get_logger',
                           lambda *a, **kw: self.logger):
            my_auditor = auditor.ObjectAuditor(conf)

        self.assertEqual(mock_load.mock_calls, [
            mock.call('swift.object_audit_watcher', 'test_watcher1'),
        ])

        my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))

        self.assertEqual(len(calls), 6)

        self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
        self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
        self.assertIs(calls[0][2].logger, self.logger)

        self.assertEqual(calls[1], ["start", "ZBF"])

        # all three objects from setUp must be reported to see_object
        self.assertEqual(['see_object'] * 3,
                         [entry[0] for entry in calls[2:5]])

        # The order in which the auditor finds things on the filesystem is
        # irrelevant; what matters is that it finds all the things.
        calls[2:5] = sorted(calls[2:5], key=lambda item: item[1]['name'])

        expected = [
            ('/a/c/o0', 'banana', 'node/sda/objects/0/'),
            ('/a/c/o1', 'orange', 'node/sda/objects/0/'),
            ('/a/c_ec/o', 'peach', 'node/sda/objects-2/0/'),
        ]
        for entry, (name, flavor, path_part) in zip(calls[2:5], expected):
            _, object_metadata, data_file_path, other_kwargs = entry
            self._assert_dict_contains_subset(
                {'name': name, 'X-Object-Meta-Flavor': flavor},
                object_metadata)
            self.assertIn(path_part, data_file_path)
            self.assertTrue(data_file_path.endswith('.data'))
            self.assertEqual({}, other_kwargs)

        self.assertEqual(calls[5], ["end"])

        log_lines = self.logger.get_lines_for_level('debug')
        self.assertIn(
            "[audit-watcher test_watcher1] getting started",
            log_lines)

    def test_builtin_watchers(self):
        # Yep, back-channel signaling in tests.
        sentinel = 'DARK'

        timestamp = Timestamp(time.time())
        disk_file = self.df_mgr.get_diskfile(
            'sda', '0', 'a', sentinel, 'o2', policy=POLICIES.legacy)
        self._write_object(disk_file, b'2' * 1024, timestamp, 'mango')

        def fake_direct_get_container(node, part, account, container,
                                      prefix=None, limit=None):
            self.assertEqual(part, 1)
            self.assertEqual(limit, 1)
            if container == sentinel:
                return {}, []
            # The returned entry is not abbreviated, but is full of nonsense.
            entry = {'bytes': 30968411,
                     'hash': '60303f4122966fe5925f045eb52d1129',
                     'name': '%s' % prefix,
                     'content_type': 'video/mp4',
                     'last_modified': '2017-08-15T03:30:57.693210'}
            return {}, [entry]

        conf = self.conf.copy()
        conf['watchers'] = 'test_watcher1'
        conf['__file__'] = '/etc/swift/swift.conf'

        def audit_with_watcher_conf(watcher_conf):
            # build an auditor whose only watcher is the dark data watcher
            # configured with *watcher_conf*, run one pass, return info logs
            ret_config = {'test_watcher1': watcher_conf}
            with mock.patch('swift.obj.auditor.parse_prefixed_conf',
                            return_value=ret_config), \
                    mock.patch('swift.obj.auditor.load_pkg_resource',
                               side_effect=[DarkDataWatcher]):
                my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)

            with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
                    mock.patch(
                        "swift.obj.watchers.dark_data.direct_get_container",
                        fake_direct_get_container):
                my_auditor.run_audit(mode='once')
            return self.logger.get_lines_for_level('info')

        # with default watcher config the DARK object will not be older than
        # grace_age so will not be logged
        log_lines = audit_with_watcher_conf({'action': 'log'})
        self.assertIn(
            '[audit-watcher test_watcher1] total unknown 0 ok 4 dark 0',
            log_lines)

        self.logger.clear()

        # with grace_age=0 the DARK object will be older than
        # grace_age so will be logged
        log_lines = audit_with_watcher_conf(
            {'action': 'log', 'grace_age': '0'})
        self.assertIn(
            '[audit-watcher test_watcher1] total unknown 0 ok 3 dark 1',
            log_lines)

    def test_dark_data_watcher_init(self):
        cases = (
            # (conf, expected grace_age, expected dark_data_policy)
            ({}, 604800, 'log'),
            ({'grace_age': 360, 'action': 'delete'}, 360, 'delete'),
            # an unrecognised action falls back to 'log'
            ({'grace_age': 0, 'action': 'invalid'}, 0, 'log'),
        )
        for conf, grace_age, dark_data_policy in cases:
            with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1):
                watcher = DarkDataWatcher(conf, self.logger)
            self.assertEqual(self.logger, watcher.logger)
            self.assertEqual(grace_age, watcher.grace_age)
            self.assertEqual(dark_data_policy, watcher.dark_data_policy)

    def test_dark_data_agreement(self):

        # The dark data watcher only sees an object as dark if all container
        # servers in the ring reply without an error and return an empty
        # listing. So, we have the following permutations for an object:
        #
        # Container Servers        Result
        # CS1       CS2
        # Listed    Listed         Good - the baseline result
        # Listed    Error          Good
        # Listed    Not listed     Good
        # Error     Error          Unknown - the baseline failure
        # Not listed Error         Unknown
        # Not listed Not listed    Dark - the only such result!
        #
        scenario = [
            {'cr': ['L', 'L'], 'res': 'G'},
            {'cr': ['L', 'E'], 'res': 'G'},
            {'cr': ['L', 'N'], 'res': 'G'},
            {'cr': ['E', 'E'], 'res': 'U'},
            {'cr': ['N', 'E'], 'res': 'U'},
            {'cr': ['N', 'N'], 'res': 'D'}]

        conf = self.conf.copy()
        conf['watchers'] = 'test_watcher1'
        conf['__file__'] = '/etc/swift/swift.conf'
        ret_config = {'test_watcher1': {'action': 'log', 'grace_age': '0'}}
        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
                        return_value=ret_config), \
                mock.patch('swift.obj.auditor.load_pkg_resource',
                           side_effect=[DarkDataWatcher]):
            my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)

        totals_prefix = '[audit-watcher test_watcher1] total'
        for cur in scenario:

            def fake_direct_get_container(node, part, account, container,
                                          prefix=None, limit=None):
                self.assertEqual(part, 1)
                self.assertEqual(limit, 1)

                reply_type = cur['cr'][int(node['id']) - 1]

                if reply_type == 'E':
                    raise ClientException("Emulated container server error")

                if reply_type == 'N':
                    return {}, []

                entry = {'bytes': 30968411,
                         'hash': '60303f4122966fe5925f045eb52d1129',
                         'name': '%s' % prefix,
                         'content_type': 'video/mp4',
                         'last_modified': '2017-08-15T03:30:57.693210'}
                return {}, [entry]

            self.logger.clear()

            namespace = 'swift.obj.watchers.dark_data.'
            with mock.patch(namespace + 'Ring', FakeRing2), \
                    mock.patch(namespace + 'direct_get_container',
                               fake_direct_get_container):
                my_auditor.run_audit(mode='once')

            # We inherit a common setUp with 3 objects, so 3 everywhere.
            if cur['res'] == 'U':
                unk_exp, ok_exp, dark_exp = 3, 0, 0
            elif cur['res'] == 'G':
                unk_exp, ok_exp, dark_exp = 0, 3, 0
            else:
                unk_exp, ok_exp, dark_exp = 0, 0, 3

            totals_lines = [
                line for line in self.logger.get_lines_for_level('info')
                if line.startswith(totals_prefix)]
            # without this check a missing summary would pass vacuously
            self.assertTrue(
                totals_lines, 'No totals logged for nodes %r' % (cur['cr'],))
            for line in totals_lines:
                words = line.split()
                if not (words[3] == 'unknown' and
                        words[5] == 'ok' and
                        words[7] == 'dark'):
                    # self.fail, not the unbound unittest.TestCase.fail,
                    # which would crash with AttributeError instead
                    self.fail('Syntax error in %r' % (line,))

                try:
                    unk_cnt = int(words[4])
                    ok_cnt = int(words[6])
                    dark_cnt = int(words[8])
                except ValueError:
                    self.fail('Bad value in %r' % (line,))

                if unk_cnt != unk_exp or ok_cnt != ok_exp or \
                        dark_cnt != dark_exp:
                    fmt = 'Expected unknown %d ok %d dark %d, got %r, ' \
                          'for nodes %r'
                    msg = fmt % (unk_exp, ok_exp, dark_exp,
                                 ' '.join(words[3:]), cur['cr'])
                    self.fail(msg=msg)
if __name__ == "__main__":
    # allow running this test module directly as a script
    unittest.main()