Use check_drive consistently
We added check_drive to the account/container servers to unify how all the storage wsgi servers treat device dirs/mounts. Thus pushes that unification down into the consistency engine. Drive-by: * use FakeLogger less * clean up some repeititon in probe utility for device re-"mounting" Related-Change-Id: I3362a6ebff423016bb367b4b6b322bb41ae08764 Change-Id: I941ffbc568ebfa5964d49964dc20c382a5e2ec2a
This commit is contained in:
parent
449d83fb0c
commit
feee399840
@ -28,13 +28,14 @@ import six
|
||||
|
||||
import swift.common.db
|
||||
from swift.account.backend import AccountBroker, DATADIR
|
||||
from swift.common.constraints import check_drive
|
||||
from swift.common.direct_client import direct_delete_container, \
|
||||
direct_delete_object, direct_get_container
|
||||
from swift.common.exceptions import ClientException
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.utils import get_logger, whataremyips, ismount, \
|
||||
config_true_value, Timestamp
|
||||
from swift.common.utils import get_logger, whataremyips, config_true_value, \
|
||||
Timestamp
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.storage_policy import POLICIES, PolicyError
|
||||
|
||||
@ -133,8 +134,7 @@ class AccountReaper(Daemon):
|
||||
begin = time()
|
||||
try:
|
||||
for device in os.listdir(self.devices):
|
||||
if self.mount_check and not ismount(
|
||||
os.path.join(self.devices, device)):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
self.logger.debug(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
|
@ -28,10 +28,11 @@ from eventlet import GreenPool, sleep, Timeout
|
||||
from eventlet.green import subprocess
|
||||
|
||||
import swift.common.db
|
||||
from swift.common.constraints import check_drive
|
||||
from swift.common.direct_client import quote
|
||||
from swift.common.utils import get_logger, whataremyips, storage_directory, \
|
||||
renamer, mkdirs, lock_parent_directory, config_true_value, \
|
||||
unlink_older_than, dump_recon_cache, rsync_module_interpolation, ismount, \
|
||||
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
|
||||
json, Timestamp
|
||||
from swift.common import ring
|
||||
from swift.common.ring.utils import is_local_device
|
||||
@ -636,8 +637,8 @@ class Replicator(Daemon):
|
||||
node['replication_ip'],
|
||||
node['replication_port']):
|
||||
found_local = True
|
||||
if self.mount_check and not ismount(
|
||||
os.path.join(self.root, node['device'])):
|
||||
if not check_drive(self.root, node['device'],
|
||||
self.mount_check):
|
||||
self._add_failure_stats(
|
||||
[(failure_dev['replication_ip'],
|
||||
failure_dev['device'])
|
||||
@ -696,7 +697,7 @@ class ReplicatorRpc(object):
|
||||
return HTTPBadRequest(body='Invalid object type')
|
||||
op = args.pop(0)
|
||||
drive, partition, hsh = replicate_args
|
||||
if self.mount_check and not ismount(os.path.join(self.root, drive)):
|
||||
if not check_drive(self.root, drive, self.mount_check):
|
||||
return Response(status='507 %s is not mounted' % drive)
|
||||
db_file = os.path.join(self.root, drive,
|
||||
storage_directory(self.datadir, partition, hsh),
|
||||
|
@ -26,11 +26,12 @@ from tempfile import mkstemp
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
import swift.common.db
|
||||
from swift.common.constraints import check_drive
|
||||
from swift.container.backend import ContainerBroker, DATADIR
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, config_true_value, ismount, \
|
||||
from swift.common.utils import get_logger, config_true_value, \
|
||||
dump_recon_cache, majority_size, Timestamp, ratelimit_sleep, \
|
||||
eventlet_monkey_patch
|
||||
from swift.common.daemon import Daemon
|
||||
@ -40,9 +41,9 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
|
||||
class ContainerUpdater(Daemon):
|
||||
"""Update container information in account listings."""
|
||||
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, logger=None):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='container-updater')
|
||||
self.logger = logger or get_logger(conf, log_route='container-updater')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
@ -100,8 +101,8 @@ class ContainerUpdater(Daemon):
|
||||
"""
|
||||
paths = []
|
||||
for device in self._listdir(self.devices):
|
||||
dev_path = os.path.join(self.devices, device)
|
||||
if self.mount_check and not ismount(dev_path):
|
||||
dev_path = check_drive(self.devices, device, self.mount_check)
|
||||
if not dev_path:
|
||||
self.logger.warning(_('%s is not mounted'), device)
|
||||
continue
|
||||
con_path = os.path.join(dev_path, DATADIR)
|
||||
|
@ -289,9 +289,9 @@ class AuditorWorker(object):
|
||||
class ObjectAuditor(Daemon):
|
||||
"""Audit objects."""
|
||||
|
||||
def __init__(self, conf, **options):
|
||||
def __init__(self, conf, logger=None, **options):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='object-auditor')
|
||||
self.logger = logger or get_logger(conf, log_route='object-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.conf_zero_byte_fps = int(
|
||||
|
@ -62,7 +62,7 @@ from swift.common.request_helpers import is_sys_meta
|
||||
from swift.common.utils import mkdirs, Timestamp, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
||||
fsync_dir, drop_buffer_cache, lock_path, write_pickle, \
|
||||
config_true_value, listdir, split_path, ismount, remove_file, \
|
||||
config_true_value, listdir, split_path, remove_file, \
|
||||
get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
|
||||
tpool_reraise, MD5_OF_EMPTY_STRING, link_fd_to_path, o_tmpfile_supported, \
|
||||
O_TMPFILE, makedirs_count, replace_partition_in_path
|
||||
@ -429,11 +429,11 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
|
||||
shuffle(device_dirs)
|
||||
|
||||
for device in device_dirs:
|
||||
if mount_check and not \
|
||||
ismount(os.path.join(devices, device)):
|
||||
if not check_drive(devices, device, mount_check):
|
||||
if logger:
|
||||
logger.debug(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
'Skipping %s as it is not %s', device,
|
||||
'mounted' if mount_check else 'a dir')
|
||||
continue
|
||||
# loop through object dirs for all policies
|
||||
device_dir = os.path.join(devices, device)
|
||||
@ -1209,14 +1209,15 @@ class BaseDiskFileManager(object):
|
||||
:returns: full path to the device, None if the path to the device is
|
||||
not a proper mount point or directory.
|
||||
"""
|
||||
# we'll do some kind of check unless explicitly forbidden
|
||||
if mount_check is not False:
|
||||
if mount_check is False:
|
||||
# explicitly forbidden from syscall, just return path
|
||||
return join(self.devices, device)
|
||||
# we'll do some kind of check if not explicitly forbidden
|
||||
if mount_check or self.mount_check:
|
||||
mount_check = True
|
||||
else:
|
||||
mount_check = False
|
||||
return check_drive(self.devices, device, mount_check)
|
||||
return join(self.devices, device)
|
||||
|
||||
@contextmanager
|
||||
def replication_lock(self, device):
|
||||
|
@ -29,9 +29,10 @@ from eventlet import GreenPool, tpool, Timeout, sleep
|
||||
from eventlet.green import subprocess
|
||||
from eventlet.support.greenlets import GreenletExit
|
||||
|
||||
from swift.common.constraints import check_drive
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.utils import whataremyips, unlink_older_than, \
|
||||
compute_eta, get_logger, dump_recon_cache, ismount, \
|
||||
compute_eta, get_logger, dump_recon_cache, \
|
||||
rsync_module_interpolation, mkdirs, config_true_value, list_from_csv, \
|
||||
tpool_reraise, config_auto_int_value, storage_directory
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
@ -585,10 +586,9 @@ class ObjectReplicator(Daemon):
|
||||
and (override_devices is None
|
||||
or dev['device'] in override_devices))]:
|
||||
found_local = True
|
||||
dev_path = join(self.devices_dir, local_dev['device'])
|
||||
obj_path = join(dev_path, data_dir)
|
||||
tmp_path = join(dev_path, get_tmp_dir(policy))
|
||||
if self.mount_check and not ismount(dev_path):
|
||||
dev_path = check_drive(self.devices_dir, local_dev['device'],
|
||||
self.mount_check)
|
||||
if not dev_path:
|
||||
self._add_failure_stats(
|
||||
[(failure_dev['replication_ip'],
|
||||
failure_dev['device'])
|
||||
@ -597,6 +597,8 @@ class ObjectReplicator(Daemon):
|
||||
self.logger.warning(
|
||||
_('%s is not mounted'), local_dev['device'])
|
||||
continue
|
||||
obj_path = join(dev_path, data_dir)
|
||||
tmp_path = join(dev_path, get_tmp_dir(policy))
|
||||
unlink_older_than(tmp_path, time.time() -
|
||||
df_mgr.reclaim_age)
|
||||
if not os.path.exists(obj_path):
|
||||
@ -728,8 +730,9 @@ class ObjectReplicator(Daemon):
|
||||
if override_partitions and \
|
||||
job['partition'] not in override_partitions:
|
||||
continue
|
||||
dev_path = join(self.devices_dir, job['device'])
|
||||
if self.mount_check and not ismount(dev_path):
|
||||
dev_path = check_drive(self.devices_dir, job['device'],
|
||||
self.mount_check)
|
||||
if not dev_path:
|
||||
self._add_failure_stats([(failure_dev['replication_ip'],
|
||||
failure_dev['device'])
|
||||
for failure_dev in job['nodes']])
|
||||
|
@ -24,11 +24,11 @@ from random import random
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_drive
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import get_logger, renamer, write_pickle, \
|
||||
dump_recon_cache, config_true_value, ismount, ratelimit_sleep, \
|
||||
eventlet_monkey_patch
|
||||
dump_recon_cache, config_true_value, ratelimit_sleep, eventlet_monkey_patch
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.storage_policy import split_policy_string, PolicyError
|
||||
@ -94,8 +94,7 @@ class ObjectUpdater(Daemon):
|
||||
# read from container ring to ensure it's fresh
|
||||
self.get_container_ring().get_nodes('')
|
||||
for device in self._listdir(self.devices):
|
||||
if self.mount_check and \
|
||||
not ismount(os.path.join(self.devices, device)):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
@ -137,8 +136,7 @@ class ObjectUpdater(Daemon):
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
for device in self._listdir(self.devices):
|
||||
if self.mount_check and \
|
||||
not ismount(os.path.join(self.devices, device)):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
|
@ -444,7 +444,7 @@ class ProbeTest(unittest.TestCase):
|
||||
def revive_drive(self, device):
|
||||
disabled_name = device + "X"
|
||||
if os.path.isdir(disabled_name):
|
||||
renamer(device + "X", device)
|
||||
renamer(disabled_name, device)
|
||||
else:
|
||||
os.system('sudo mount %s' % device)
|
||||
|
||||
|
@ -806,15 +806,13 @@ class TestReaper(unittest.TestCase):
|
||||
devices = prepare_data_dir()
|
||||
r = init_reaper(devices)
|
||||
|
||||
with patch('swift.account.reaper.ismount', lambda x: True):
|
||||
with patch(
|
||||
'swift.account.reaper.AccountReaper.reap_device') as foo:
|
||||
with patch('swift.account.reaper.AccountReaper.reap_device') as foo, \
|
||||
unit.mock_check_drive(ismount=True):
|
||||
r.run_once()
|
||||
self.assertEqual(foo.called, 1)
|
||||
|
||||
with patch('swift.account.reaper.ismount', lambda x: False):
|
||||
with patch(
|
||||
'swift.account.reaper.AccountReaper.reap_device') as foo:
|
||||
with patch('swift.account.reaper.AccountReaper.reap_device') as foo, \
|
||||
unit.mock_check_drive(ismount=False):
|
||||
r.run_once()
|
||||
self.assertFalse(foo.called)
|
||||
|
||||
|
@ -38,7 +38,6 @@ from swift.common.swob import HTTPException
|
||||
|
||||
from test import unit
|
||||
from test.unit.common.test_db import ExampleBroker
|
||||
from test.unit import with_tempdir
|
||||
|
||||
|
||||
TEST_ACCOUNT_NAME = 'a c t'
|
||||
@ -517,13 +516,14 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(replicator.mount_check, True)
|
||||
self.assertEqual(replicator.port, 6200)
|
||||
|
||||
def mock_ismount(path):
|
||||
self.assertEqual(path,
|
||||
os.path.join(replicator.root,
|
||||
replicator.ring.devs[0]['device']))
|
||||
return False
|
||||
def mock_check_drive(root, device, mount_check):
|
||||
self.assertEqual(root, replicator.root)
|
||||
self.assertEqual(device, replicator.ring.devs[0]['device'])
|
||||
self.assertEqual(mount_check, True)
|
||||
return None
|
||||
|
||||
self._patch(patch.object, db_replicator, 'ismount', mock_ismount)
|
||||
self._patch(patch.object, db_replicator, 'check_drive',
|
||||
mock_check_drive)
|
||||
replicator.run_once()
|
||||
|
||||
self.assertEqual(
|
||||
@ -552,7 +552,6 @@ class TestDBReplicator(unittest.TestCase):
|
||||
|
||||
self._patch(patch.object, db_replicator, 'whataremyips',
|
||||
lambda *a, **kw: ['1.1.1.1'])
|
||||
self._patch(patch.object, db_replicator, 'ismount', lambda *args: True)
|
||||
self._patch(patch.object, db_replicator, 'unlink_older_than',
|
||||
mock_unlink_older_than)
|
||||
self._patch(patch.object, db_replicator, 'roundrobin_datadirs',
|
||||
@ -560,13 +559,19 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, replicator.cpool, 'spawn_n', mock_spawn_n)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(ismount=True) as mocks:
|
||||
mock_os.path.isdir.return_value = True
|
||||
replicator.run_once()
|
||||
mock_os.path.isdir.assert_called_with(
|
||||
os.path.join(replicator.root,
|
||||
replicator.ring.devs[0]['device'],
|
||||
replicator.datadir))
|
||||
self.assertEqual([
|
||||
mock.call(os.path.join(
|
||||
replicator.root,
|
||||
replicator.ring.devs[0]['device'])),
|
||||
], mocks['ismount'].call_args_list)
|
||||
|
||||
def test_usync(self):
|
||||
fake_http = ReplHttp()
|
||||
@ -871,27 +876,28 @@ class TestDBReplicator(unittest.TestCase):
|
||||
'/some/foo/some_device/deeper/and/deeper'))
|
||||
|
||||
def test_dispatch_no_arg_pop(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
response = rpc.dispatch(('a',), 'arg')
|
||||
self.assertEqual('Invalid object type', response.body)
|
||||
self.assertEqual(400, response.status_int)
|
||||
|
||||
def test_dispatch_drive_not_mounted(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, True)
|
||||
|
||||
def mock_ismount(path):
|
||||
self.assertEqual('/drive', path)
|
||||
return False
|
||||
|
||||
self._patch(patch.object, db_replicator, 'ismount', mock_ismount)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=True)
|
||||
|
||||
with unit.mock_check_drive() as mocks:
|
||||
response = rpc.dispatch(('drive', 'part', 'hash'), ['method'])
|
||||
self.assertEqual([mock.call(os.path.join('/drive'))],
|
||||
mocks['ismount'].call_args_list)
|
||||
|
||||
self.assertEqual('507 drive is not mounted', response.status)
|
||||
self.assertEqual(507, response.status_int)
|
||||
|
||||
def test_dispatch_unexpected_operation_db_does_not_exist(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
def mock_mkdirs(path):
|
||||
self.assertEqual('/drive/tmp', path)
|
||||
@ -899,7 +905,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, db_replicator, 'mkdirs', mock_mkdirs)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = False
|
||||
response = rpc.dispatch(('drive', 'part', 'hash'), ['unexpected'])
|
||||
|
||||
@ -907,7 +914,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(404, response.status_int)
|
||||
|
||||
def test_dispatch_operation_unexpected(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
self._patch(patch.object, db_replicator, 'mkdirs', lambda *args: True)
|
||||
|
||||
@ -919,7 +927,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
rpc.unexpected = unexpected_method
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = True
|
||||
response = rpc.dispatch(('drive', 'part', 'hash'),
|
||||
['unexpected', 'arg1', 'arg2'])
|
||||
@ -928,12 +937,14 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual('unexpected-called', response)
|
||||
|
||||
def test_dispatch_operation_rsync_then_merge(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
self._patch(patch.object, db_replicator, 'renamer', lambda *args: True)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = True
|
||||
response = rpc.dispatch(('drive', 'part', 'hash'),
|
||||
['rsync_then_merge', 'arg1', 'arg2'])
|
||||
@ -945,12 +956,14 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(204, response.status_int)
|
||||
|
||||
def test_dispatch_operation_complete_rsync(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
self._patch(patch.object, db_replicator, 'renamer', lambda *args: True)
|
||||
|
||||
with patch('swift.common.db_replicator.os', new=mock.MagicMock(
|
||||
wraps=os)) as mock_os:
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.side_effect = [False, True]
|
||||
response = rpc.dispatch(('drive', 'part', 'hash'),
|
||||
['complete_rsync', 'arg1', 'arg2'])
|
||||
@ -962,10 +975,12 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(204, response.status_int)
|
||||
|
||||
def test_rsync_then_merge_db_does_not_exist(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = False
|
||||
response = rpc.rsync_then_merge('drive', '/data/db.db',
|
||||
('arg1', 'arg2'))
|
||||
@ -974,10 +989,12 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(404, response.status_int)
|
||||
|
||||
def test_rsync_then_merge_old_does_not_exist(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.side_effect = [True, False]
|
||||
response = rpc.rsync_then_merge('drive', '/data/db.db',
|
||||
('arg1', 'arg2'))
|
||||
@ -988,7 +1005,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(404, response.status_int)
|
||||
|
||||
def test_rsync_then_merge_with_objects(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
def mock_renamer(old, new):
|
||||
self.assertEqual('/drive/tmp/arg1', old)
|
||||
@ -997,7 +1015,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = True
|
||||
response = rpc.rsync_then_merge('drive', '/data/db.db',
|
||||
['arg1', 'arg2'])
|
||||
@ -1005,10 +1024,12 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(204, response.status_int)
|
||||
|
||||
def test_complete_rsync_db_does_not_exist(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = True
|
||||
response = rpc.complete_rsync('drive', '/data/db.db',
|
||||
['arg1', 'arg2'])
|
||||
@ -1017,10 +1038,12 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(404, response.status_int)
|
||||
|
||||
def test_complete_rsync_old_file_does_not_exist(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.return_value = False
|
||||
response = rpc.complete_rsync('drive', '/data/db.db',
|
||||
['arg1', 'arg2'])
|
||||
@ -1031,7 +1054,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(404, response.status_int)
|
||||
|
||||
def test_complete_rsync_rename(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
|
||||
def mock_exists(path):
|
||||
if path == '/data/db.db':
|
||||
@ -1046,7 +1070,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, db_replicator, 'renamer', mock_renamer)
|
||||
|
||||
with patch('swift.common.db_replicator.os',
|
||||
new=mock.MagicMock(wraps=os)) as mock_os:
|
||||
new=mock.MagicMock(wraps=os)) as mock_os, \
|
||||
unit.mock_check_drive(isdir=True):
|
||||
mock_os.path.exists.side_effect = [False, True]
|
||||
response = rpc.complete_rsync('drive', '/data/db.db',
|
||||
['arg1', 'arg2'])
|
||||
@ -1054,7 +1079,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(204, response.status_int)
|
||||
|
||||
def test_replicator_sync_with_broker_replication_missing_table(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
rpc.logger = unit.debug_logger()
|
||||
broker = FakeBroker()
|
||||
broker.get_repl_missing_table = True
|
||||
@ -1069,6 +1095,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self._patch(patch.object, db_replicator, 'quarantine_db',
|
||||
mock_quarantine_db)
|
||||
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
response = rpc.sync(broker, ('remote_sync', 'hash_', 'id_',
|
||||
'created_at', 'put_timestamp',
|
||||
'delete_timestamp', 'metadata'))
|
||||
@ -1082,12 +1109,14 @@ class TestDBReplicator(unittest.TestCase):
|
||||
"Quarantining DB %s" % broker])
|
||||
|
||||
def test_replicator_sync(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
broker = FakeBroker()
|
||||
|
||||
response = rpc.sync(broker, (broker.get_sync() + 1, 12345, 'id_',
|
||||
'created_at', 'put_timestamp',
|
||||
'delete_timestamp',
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
response = rpc.sync(broker, (
|
||||
broker.get_sync() + 1, 12345, 'id_',
|
||||
'created_at', 'put_timestamp', 'delete_timestamp',
|
||||
'{"meta1": "data1", "meta2": "data2"}'))
|
||||
|
||||
self.assertEqual({'meta1': 'data1', 'meta2': 'data2'},
|
||||
@ -1100,20 +1129,26 @@ class TestDBReplicator(unittest.TestCase):
|
||||
self.assertEqual(200, response.status_int)
|
||||
|
||||
def test_rsync_then_merge(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
rpc.rsync_then_merge('sda1', '/srv/swift/blah', ('a', 'b'))
|
||||
|
||||
def test_merge_items(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
fake_broker = FakeBroker()
|
||||
args = ('a', 'b')
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
rpc.merge_items(fake_broker, args)
|
||||
self.assertEqual(fake_broker.args, args)
|
||||
|
||||
def test_merge_syncs(self):
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
fake_broker = FakeBroker()
|
||||
args = ('a', 'b')
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
rpc.merge_syncs(fake_broker, args)
|
||||
self.assertEqual(fake_broker.args, (args[0],))
|
||||
|
||||
@ -1121,10 +1156,13 @@ class TestDBReplicator(unittest.TestCase):
|
||||
drive = '/some/root'
|
||||
db_file = __file__
|
||||
args = ['old_file']
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
resp = rpc.complete_rsync(drive, db_file, args)
|
||||
self.assertTrue(isinstance(resp, HTTPException))
|
||||
self.assertEqual(404, resp.status_int)
|
||||
with unit.mock_check_drive(isdir=True):
|
||||
resp = rpc.complete_rsync(drive, 'new_db_file', args)
|
||||
self.assertTrue(isinstance(resp, HTTPException))
|
||||
self.assertEqual(404, resp.status_int)
|
||||
@ -1132,7 +1170,8 @@ class TestDBReplicator(unittest.TestCase):
|
||||
def test_complete_rsync(self):
|
||||
drive = mkdtemp()
|
||||
args = ['old_file']
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)
|
||||
rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker,
|
||||
mount_check=False)
|
||||
os.mkdir('%s/tmp' % drive)
|
||||
old_file = '%s/tmp/old_file' % drive
|
||||
new_file = '%s/new_db_file' % drive
|
||||
@ -1145,7 +1184,7 @@ class TestDBReplicator(unittest.TestCase):
|
||||
finally:
|
||||
rmtree(drive)
|
||||
|
||||
@with_tempdir
|
||||
@unit.with_tempdir
|
||||
def test_empty_suffix_and_hash_dirs_get_cleanedup(self, tempdir):
|
||||
datadir = os.path.join(tempdir, 'containers')
|
||||
db_path = ('450/afd/7089ab48d955ab0851fc51cc17a34afd/'
|
||||
@ -1538,6 +1577,8 @@ def attach_fake_replication_rpc(rpc, replicate_hook=None):
|
||||
print('REPLICATE: %s, %s, %r' % (self.path, op, sync_args))
|
||||
replicate_args = self.path.lstrip('/').split('/')
|
||||
args = [op] + list(sync_args)
|
||||
with unit.mock_check_drive(isdir=not rpc.mount_check,
|
||||
ismount=rpc.mount_check):
|
||||
swob_response = rpc.dispatch(replicate_args, args)
|
||||
resp = FakeHTTPResponse(swob_response)
|
||||
if replicate_hook:
|
||||
@ -1565,7 +1606,7 @@ class TestReplicatorSync(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.root = mkdtemp()
|
||||
self.rpc = self.replicator_rpc(
|
||||
self.root, self.datadir, self.backend, False,
|
||||
self.root, self.datadir, self.backend, mount_check=False,
|
||||
logger=unit.debug_logger())
|
||||
FakeReplConnection = attach_fake_replication_rpc(self.rpc)
|
||||
self._orig_ReplConnection = db_replicator.ReplConnection
|
||||
@ -1621,7 +1662,9 @@ class TestReplicatorSync(unittest.TestCase):
|
||||
return True
|
||||
daemon._rsync_file = _rsync_file
|
||||
with mock.patch('swift.common.db_replicator.whataremyips',
|
||||
new=lambda *a, **kw: [node['replication_ip']]):
|
||||
new=lambda *a, **kw: [node['replication_ip']]), \
|
||||
unit.mock_check_drive(isdir=not daemon.mount_check,
|
||||
ismount=daemon.mount_check):
|
||||
daemon.run_once()
|
||||
return daemon
|
||||
|
||||
|
@ -30,7 +30,7 @@ from swift.common.utils import Timestamp, encode_timestamps
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit.common import test_db_replicator
|
||||
from test.unit import patch_policies, make_timestamp_iter, FakeLogger
|
||||
from test.unit import patch_policies, make_timestamp_iter, mock_check_drive
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
@ -176,6 +176,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
node = random.choice([n for n in self._ring.devs
|
||||
if n['id'] != local_node['id']])
|
||||
info = broker.get_replication_info()
|
||||
with mock_check_drive(ismount=True):
|
||||
success = daemon._repl_to_node(node, broker, part, info)
|
||||
self.assertFalse(success)
|
||||
|
||||
@ -1024,8 +1025,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
def update_sync_store(self, broker):
|
||||
raise OSError(1, '1')
|
||||
|
||||
logger = FakeLogger()
|
||||
daemon = replicator.ContainerReplicator({}, logger)
|
||||
daemon = replicator.ContainerReplicator({}, logger=self.logger)
|
||||
daemon.sync_store = FakeContainerSyncStore()
|
||||
ts_iter = make_timestamp_iter()
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
@ -1033,7 +1033,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
broker.initialize(timestamp.internal, POLICIES.default.idx)
|
||||
info = broker.get_replication_info()
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(log_lines))
|
||||
self.assertIn('Failed to update sync_store', log_lines[0])
|
||||
|
||||
|
@ -22,7 +22,6 @@ import itertools
|
||||
from contextlib import contextmanager
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
from time import gmtime
|
||||
from xml.dom import minidom
|
||||
import time
|
||||
@ -62,7 +61,7 @@ def save_globals():
|
||||
|
||||
@patch_policies
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"""Test swift.container.server.ContainerController"""
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = os.path.join(
|
||||
mkdtemp(), 'tmp_test_container_server_ContainerController')
|
||||
@ -70,8 +69,10 @@ class TestContainerController(unittest.TestCase):
|
||||
rmtree(self.testdir)
|
||||
mkdirs(os.path.join(self.testdir, 'sda1'))
|
||||
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
|
||||
self.logger = debug_logger()
|
||||
self.controller = container_server.ContainerController(
|
||||
{'devices': self.testdir, 'mount_check': 'false'})
|
||||
{'devices': self.testdir, 'mount_check': 'false'},
|
||||
logger=self.logger)
|
||||
# some of the policy tests want at least two policies
|
||||
self.assertTrue(len(POLICIES) > 1)
|
||||
|
||||
@ -3194,7 +3195,6 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(self.logger.get_lines_for_level('info'), [])
|
||||
|
||||
def test_GET_log_requests_true(self):
|
||||
self.controller.logger = FakeLogger()
|
||||
self.controller.log_requests = True
|
||||
|
||||
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
|
||||
@ -3203,7 +3203,6 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertTrue(self.controller.logger.log_dict['info'])
|
||||
|
||||
def test_GET_log_requests_false(self):
|
||||
self.controller.logger = FakeLogger()
|
||||
self.controller.log_requests = False
|
||||
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = req.get_response(self.controller)
|
||||
@ -3214,19 +3213,18 @@ class TestContainerController(unittest.TestCase):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'HEAD', 'REMOTE_ADDR': '1.2.3.4'})
|
||||
self.controller.logger = FakeLogger()
|
||||
with mock.patch(
|
||||
'time.gmtime', mock.MagicMock(side_effect=[gmtime(10001.0)])):
|
||||
with mock.patch(
|
||||
'time.time',
|
||||
mock.MagicMock(side_effect=[10000.0, 10001.0, 10002.0])):
|
||||
with mock.patch(
|
||||
'os.getpid', mock.MagicMock(return_value=1234)):
|
||||
with mock.patch('time.gmtime',
|
||||
mock.MagicMock(side_effect=[gmtime(10001.0)])), \
|
||||
mock.patch('time.time',
|
||||
mock.MagicMock(side_effect=[
|
||||
10000.0, 10001.0, 10002.0])), \
|
||||
mock.patch('os.getpid', mock.MagicMock(return_value=1234)):
|
||||
req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.controller.logger.log_dict['info'],
|
||||
[(('1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a/c" '
|
||||
'404 - "-" "-" "-" 2.0000 "-" 1234 0',), {})])
|
||||
info_lines = self.controller.logger.get_lines_for_level('info')
|
||||
self.assertEqual(info_lines, [
|
||||
'1.2.3.4 - - [01/Jan/1970:02:46:41 +0000] "HEAD /sda1/p/a/c" '
|
||||
'404 - "-" "-" "-" 2.0000 "-" 1234 0',
|
||||
])
|
||||
|
||||
|
||||
@patch_policies([
|
||||
|
@ -21,7 +21,7 @@ from contextlib import closing
|
||||
from gzip import GzipFile
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
from test.unit import debug_logger, mock_check_drive
|
||||
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
@ -55,6 +55,7 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
os.mkdir(self.devices_dir)
|
||||
self.sda1 = os.path.join(self.devices_dir, 'sda1')
|
||||
os.mkdir(self.sda1)
|
||||
self.logger = debug_logger('test')
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
@ -71,7 +72,7 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
}
|
||||
if conf_updates:
|
||||
conf.update(conf_updates)
|
||||
return container_updater.ContainerUpdater(conf)
|
||||
return container_updater.ContainerUpdater(conf, logger=self.logger)
|
||||
|
||||
def test_creation(self):
|
||||
cu = self._get_container_updater({'concurrency': '2',
|
||||
@ -127,11 +128,8 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
check_bad({'slowdown': 'baz'})
|
||||
check_bad({'containers_per_second': 'quux'})
|
||||
|
||||
@mock.patch.object(container_updater, 'ismount')
|
||||
@mock.patch.object(container_updater.ContainerUpdater, 'container_sweep')
|
||||
def test_run_once_with_device_unmounted(self, mock_sweep, mock_ismount):
|
||||
|
||||
mock_ismount.return_value = False
|
||||
def test_run_once_with_device_unmounted(self, mock_sweep):
|
||||
cu = self._get_container_updater()
|
||||
containers_dir = os.path.join(self.sda1, DATADIR)
|
||||
os.mkdir(containers_dir)
|
||||
@ -146,9 +144,9 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
|
||||
mock_sweep.reset_mock()
|
||||
cu = self._get_container_updater({'mount_check': 'true'})
|
||||
cu.logger = FakeLogger()
|
||||
with mock_check_drive():
|
||||
cu.run_once()
|
||||
log_lines = cu.logger.get_lines_for_level('warning')
|
||||
log_lines = self.logger.get_lines_for_level('warning')
|
||||
self.assertGreater(len(log_lines), 0)
|
||||
msg = 'sda1 is not mounted'
|
||||
self.assertEqual(log_lines[0], msg)
|
||||
@ -237,10 +235,9 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
e = OSError('permission_denied')
|
||||
mock_listdir.side_effect = e
|
||||
cu = self._get_container_updater()
|
||||
cu.logger = FakeLogger()
|
||||
paths = cu.get_paths()
|
||||
self.assertEqual(paths, [])
|
||||
log_lines = cu.logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
msg = ('ERROR: Failed to get paths to drive partitions: '
|
||||
'permission_denied')
|
||||
self.assertEqual(log_lines[0], msg)
|
||||
@ -248,10 +245,9 @@ class TestContainerUpdater(unittest.TestCase):
|
||||
@mock.patch('os.listdir', return_value=['foo', 'bar'])
|
||||
def test_listdir_without_exception(self, mock_listdir):
|
||||
cu = self._get_container_updater()
|
||||
cu.logger = FakeLogger()
|
||||
path = cu._listdir('foo/bar/')
|
||||
self.assertEqual(path, ['foo', 'bar'])
|
||||
log_lines = cu.logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(len(log_lines), 0)
|
||||
|
||||
def test_unicode(self):
|
||||
|
@ -25,7 +25,7 @@ from hashlib import md5
|
||||
from tempfile import mkdtemp
|
||||
import textwrap
|
||||
from os.path import dirname, basename
|
||||
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
|
||||
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
||||
DEFAULT_TEST_EC_TYPE)
|
||||
from swift.obj import auditor, replicator
|
||||
from swift.obj.diskfile import (
|
||||
@ -66,7 +66,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
|
||||
self.devices = os.path.join(self.testdir, 'node')
|
||||
self.rcache = os.path.join(self.testdir, 'object.recon')
|
||||
self.logger = FakeLogger()
|
||||
self.logger = debug_logger()
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
mkdirs(os.path.join(self.devices, 'sda'))
|
||||
os.mkdir(os.path.join(self.devices, 'sdb'))
|
||||
@ -246,7 +246,8 @@ class TestAuditor(unittest.TestCase):
|
||||
writer.put(metadata)
|
||||
writer.commit(Timestamp(timestamp))
|
||||
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, FakeLogger(),
|
||||
self.logger.clear()
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
self.assertEqual(0, auditor_worker.quarantines) # sanity check
|
||||
auditor_worker.object_audit(
|
||||
@ -600,20 +601,19 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEqual(auditor_worker.stats_buckets['OVER'], 2)
|
||||
|
||||
def test_object_run_logging(self):
|
||||
logger = FakeLogger()
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices)
|
||||
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
||||
log_lines = logger.get_lines_for_level('info')
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertGreater(len(log_lines), 0)
|
||||
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
|
||||
|
||||
logger = FakeLogger()
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, logger,
|
||||
self.logger.clear()
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
|
||||
self.rcache, self.devices,
|
||||
zero_byte_only_at_fps=50)
|
||||
auditor_worker.audit_all_objects(device_dirs=['sda'])
|
||||
log_lines = logger.get_lines_for_level('info')
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertGreater(len(log_lines), 0)
|
||||
self.assertTrue(log_lines[0].index('ZBF - sda'))
|
||||
|
||||
@ -984,8 +984,7 @@ class TestAuditor(unittest.TestCase):
|
||||
|
||||
def _test_expired_object_is_ignored(self, zero_byte_fps):
|
||||
# verify that an expired object does not get mistaken for a tombstone
|
||||
audit = auditor.ObjectAuditor(self.conf)
|
||||
audit.logger = FakeLogger()
|
||||
audit = auditor.ObjectAuditor(self.conf, logger=self.logger)
|
||||
audit.log_time = 0
|
||||
now = time.time()
|
||||
write_diskfile(self.disk_file, Timestamp(now - 20),
|
||||
|
@ -41,11 +41,10 @@ import pyeclib.ec_iface
|
||||
|
||||
from eventlet import hubs, timeout, tpool
|
||||
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
|
||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||
from test.unit import (mock as unit_mock, temptree, mock_check_drive,
|
||||
patch_policies, debug_logger, EMPTY_ETAG,
|
||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
|
||||
requires_o_tmpfile_support, encode_frag_archive_bodies,
|
||||
mock_check_drive)
|
||||
requires_o_tmpfile_support, encode_frag_archive_bodies)
|
||||
from nose import SkipTest
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
@ -167,7 +166,8 @@ class TestDiskFileModuleMethods(unittest.TestCase):
|
||||
self.conf = dict(
|
||||
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
||||
timeout='300', stats_interval='1')
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, FakeLogger())
|
||||
self.logger = debug_logger()
|
||||
self.df_mgr = diskfile.DiskFileManager(self.conf, logger=self.logger)
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
@ -456,11 +456,8 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
self.assertEqual(locations, expected)
|
||||
|
||||
def test_skipping_unmounted_devices(self):
|
||||
def mock_ismount(path):
|
||||
return path.endswith('sdp')
|
||||
|
||||
with mock.patch('swift.obj.diskfile.ismount', mock_ismount):
|
||||
with temptree([]) as tmpdir:
|
||||
with temptree([]) as tmpdir, mock_check_drive() as mocks:
|
||||
mocks['ismount'].side_effect = lambda path: path.endswith('sdp')
|
||||
os.makedirs(os.path.join(tmpdir, "sdp", "objects",
|
||||
"2607", "df3",
|
||||
"ec2871fe724411f91787462f97d30df3"))
|
||||
@ -482,14 +479,15 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"sdp", "2607", POLICIES[0])])
|
||||
|
||||
# Do it again, this time with a logger.
|
||||
ml = mock.MagicMock()
|
||||
logger = debug_logger()
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=True, logger=ml)]
|
||||
ml.debug.assert_called_once_with(
|
||||
'Skipping %s as it is not mounted',
|
||||
'sdq')
|
||||
devices=tmpdir, mount_check=True, logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping sdq as it is not mounted',
|
||||
], debug_lines)
|
||||
|
||||
def test_skipping_files(self):
|
||||
with temptree([]) as tmpdir:
|
||||
@ -512,14 +510,38 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
"sdp", "2607", POLICIES[0])])
|
||||
|
||||
# Do it again, this time with a logger.
|
||||
ml = mock.MagicMock()
|
||||
logger = debug_logger('test')
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False, logger=ml)]
|
||||
ml.debug.assert_called_once_with(
|
||||
'Skipping %s: Not a directory' %
|
||||
os.path.join(tmpdir, "garbage"))
|
||||
devices=tmpdir, mount_check=False, logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping garbage as it is not a dir',
|
||||
], debug_lines)
|
||||
logger.clear()
|
||||
with mock_check_drive(isdir=True):
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=False, logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping %s: Not a directory' % os.path.join(
|
||||
tmpdir, "garbage"),
|
||||
], debug_lines)
|
||||
logger.clear()
|
||||
with mock_check_drive() as mocks:
|
||||
mocks['ismount'].side_effect = lambda path: (
|
||||
False if path.endswith('garbage') else True)
|
||||
locations = [
|
||||
(loc.path, loc.device, loc.partition, loc.policy)
|
||||
for loc in diskfile.object_audit_location_generator(
|
||||
devices=tmpdir, mount_check=True, logger=logger)]
|
||||
debug_lines = logger.get_lines_for_level('debug')
|
||||
self.assertEqual([
|
||||
'Skipping garbage as it is not mounted',
|
||||
], debug_lines)
|
||||
|
||||
def test_only_catch_expected_errors(self):
|
||||
# Crazy exceptions should still escape object_audit_location_generator
|
||||
@ -558,7 +580,8 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
|
||||
|
||||
# Pretend that some time passed between each partition
|
||||
with mock.patch('os.stat') as mock_stat:
|
||||
with mock.patch('os.stat') as mock_stat, \
|
||||
mock_check_drive(isdir=True):
|
||||
mock_stat.return_value.st_mtime = time() - 60
|
||||
# Auditor starts, there are two partitions to check
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
@ -569,6 +592,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
# the generator and restarts There is now only one remaining
|
||||
# partition to check
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
with mock_check_drive(isdir=True):
|
||||
gen.next()
|
||||
|
||||
# There are no more remaining partitions
|
||||
@ -577,6 +601,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
# There are no partitions to check if the auditor restarts another
|
||||
# time and the status files have not been cleared
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
with mock_check_drive(isdir=True):
|
||||
self.assertRaises(StopIteration, gen.next)
|
||||
|
||||
# Reset status file
|
||||
@ -586,6 +611,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
|
||||
# check two partitions again, because the remaining
|
||||
# partitions were empty and a new listdir was executed
|
||||
gen = diskfile.object_audit_location_generator(tmpdir, False)
|
||||
with mock_check_drive(isdir=True):
|
||||
gen.next()
|
||||
gen.next()
|
||||
|
||||
@ -864,9 +890,9 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
tomb_file = timestamp.internal + '.ts'
|
||||
for policy in POLICIES:
|
||||
for unexpected in unexpected_files:
|
||||
self.logger.clear()
|
||||
files = [unexpected, tomb_file]
|
||||
df_mgr = self.df_router[policy]
|
||||
df_mgr.logger = FakeLogger()
|
||||
datadir = os.path.join('/srv/node/sdb1/',
|
||||
diskfile.get_data_dir(policy))
|
||||
|
||||
@ -874,7 +900,6 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
expected = {'ts_file': os.path.join(datadir, tomb_file)}
|
||||
self._assertDictContainsSubset(expected, results)
|
||||
|
||||
log_lines = df_mgr.logger.get_lines_for_level('warning')
|
||||
self.assertTrue(
|
||||
log_lines[0].startswith(
|
||||
@ -962,31 +987,31 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
def test_replication_one_per_device_deprecation(self):
|
||||
conf = dict(**self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 1)
|
||||
|
||||
conf = dict(replication_concurrency_per_device='0', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
|
||||
conf = dict(replication_concurrency_per_device='2', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
|
||||
conf = dict(replication_concurrency_per_device=2, **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
|
||||
# Check backward compatibility
|
||||
conf = dict(replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 1)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device is deprecated',
|
||||
log_lines[-1])
|
||||
|
||||
conf = dict(replication_one_per_device='false', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device is deprecated',
|
||||
@ -995,7 +1020,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
# If defined, new parameter has precedence
|
||||
conf = dict(replication_concurrency_per_device='2',
|
||||
replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
@ -1003,7 +1028,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
conf = dict(replication_concurrency_per_device='2',
|
||||
replication_one_per_device='false', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 2)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
@ -1011,7 +1036,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
|
||||
conf = dict(replication_concurrency_per_device='0',
|
||||
replication_one_per_device='true', **self.conf)
|
||||
mgr = diskfile.DiskFileManager(conf, FakeLogger())
|
||||
mgr = diskfile.DiskFileManager(conf, self.logger)
|
||||
self.assertEqual(mgr.replication_concurrency_per_device, 0)
|
||||
log_lines = mgr.logger.get_lines_for_level('warning')
|
||||
self.assertIn('replication_one_per_device ignored',
|
||||
@ -1124,12 +1149,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
||||
self.assertTrue(lock_exc is None)
|
||||
|
||||
def test_missing_splice_warning(self):
|
||||
logger = FakeLogger()
|
||||
with mock.patch('swift.common.splice.splice._c_splice', None):
|
||||
self.conf['splice'] = 'yes'
|
||||
mgr = diskfile.DiskFileManager(self.conf, logger)
|
||||
mgr = diskfile.DiskFileManager(self.conf, logger=self.logger)
|
||||
|
||||
warnings = logger.get_lines_for_level('warning')
|
||||
warnings = self.logger.get_lines_for_level('warning')
|
||||
self.assertGreater(len(warnings), 0)
|
||||
self.assertTrue('splice()' in warnings[-1])
|
||||
self.assertFalse(mgr.use_splice)
|
||||
|
@ -30,7 +30,7 @@ from eventlet.green import subprocess
|
||||
from eventlet import Timeout
|
||||
|
||||
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
||||
mocked_http_conn, FakeLogger)
|
||||
mocked_http_conn, FakeLogger, mock_check_drive)
|
||||
from swift.common import utils
|
||||
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
|
||||
storage_directory)
|
||||
@ -454,6 +454,20 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
self.assertEqual(jobs_by_pol_part[part]['path'],
|
||||
os.path.join(self.objects_1, part[1:]))
|
||||
|
||||
def test_collect_jobs_unmounted(self):
|
||||
with mock_check_drive() as mocks:
|
||||
jobs = self.replicator.collect_jobs()
|
||||
self.assertEqual(jobs, [])
|
||||
self.assertEqual(mocks['ismount'].mock_calls, [])
|
||||
self.assertEqual(len(mocks['isdir'].mock_calls), 2)
|
||||
|
||||
self.replicator.mount_check = True
|
||||
with mock_check_drive() as mocks:
|
||||
jobs = self.replicator.collect_jobs()
|
||||
self.assertEqual(jobs, [])
|
||||
self.assertEqual(mocks['isdir'].mock_calls, [])
|
||||
self.assertEqual(len(mocks['ismount'].mock_calls), 2)
|
||||
|
||||
def test_collect_jobs_failure_report_with_auditor_stats_json(self):
|
||||
devs = [
|
||||
{'id': 0, 'device': 'sda', 'zone': 0,
|
||||
|
@ -24,21 +24,21 @@ from gzip import GzipFile
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from test import listen_zero
|
||||
from test.unit import FakeLogger, make_timestamp_iter
|
||||
from test.unit import debug_logger, patch_policies, mocked_http_conn
|
||||
from test.unit import (
|
||||
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
|
||||
from time import time
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
from eventlet import spawn, Timeout
|
||||
|
||||
from swift.obj import updater as object_updater
|
||||
from swift.obj.diskfile import (ASYNCDIR_BASE, get_async_dir, DiskFileManager,
|
||||
get_tmp_dir)
|
||||
from swift.obj.diskfile import (
|
||||
ASYNCDIR_BASE, get_async_dir, DiskFileManager, get_tmp_dir)
|
||||
from swift.common.ring import RingData
|
||||
from swift.common import utils
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
|
||||
write_pickle
|
||||
from swift.common.utils import (
|
||||
hash_path, normalize_timestamp, mkdirs, write_pickle)
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
|
||||
|
||||
@ -146,11 +146,10 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
}
|
||||
daemon = object_updater.ObjectUpdater(conf)
|
||||
daemon.logger = FakeLogger()
|
||||
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
|
||||
paths = daemon._listdir('foo/bar')
|
||||
self.assertEqual([], paths)
|
||||
log_lines = daemon.logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
msg = ('ERROR: Unable to access foo/bar: permission_denied')
|
||||
self.assertEqual(log_lines[0], msg)
|
||||
|
||||
@ -162,10 +161,9 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
}
|
||||
daemon = object_updater.ObjectUpdater(conf)
|
||||
daemon.logger = FakeLogger()
|
||||
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
|
||||
path = daemon._listdir('foo/bar/')
|
||||
log_lines = daemon.logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(len(log_lines), 0)
|
||||
self.assertEqual(path, ['foo', 'bar'])
|
||||
|
||||
@ -250,9 +248,9 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
# a warning indicating that the '99' policy isn't valid
|
||||
check_with_idx('99', 1, should_skip=True)
|
||||
|
||||
@mock.patch.object(object_updater, 'ismount')
|
||||
def test_run_once_with_disk_unmounted(self, mock_ismount):
|
||||
mock_ismount.return_value = False
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once_with_disk_unmounted(self, mock_check_drive):
|
||||
mock_check_drive.return_value = False
|
||||
ou = object_updater.ObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
@ -265,8 +263,12 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
os.mkdir(async_dir)
|
||||
ou.run_once()
|
||||
self.assertTrue(os.path.exists(async_dir))
|
||||
# mount_check == False means no call to ismount
|
||||
self.assertEqual([], mock_ismount.mock_calls)
|
||||
# each run calls check_device
|
||||
self.assertEqual([
|
||||
mock.call(self.devices_dir, 'sda1', False),
|
||||
mock.call(self.devices_dir, 'sda1', False),
|
||||
], mock_check_drive.mock_calls)
|
||||
mock_check_drive.reset_mock()
|
||||
|
||||
ou = object_updater.ObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
@ -281,15 +283,14 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
ou.run_once()
|
||||
self.assertTrue(os.path.exists(async_dir))
|
||||
self.assertTrue(os.path.exists(odd_dir)) # skipped - not mounted!
|
||||
# mount_check == True means ismount was checked
|
||||
self.assertEqual([
|
||||
mock.call(self.sda1),
|
||||
], mock_ismount.mock_calls)
|
||||
mock.call(self.devices_dir, 'sda1', True),
|
||||
], mock_check_drive.mock_calls)
|
||||
self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
|
||||
|
||||
@mock.patch.object(object_updater, 'ismount')
|
||||
def test_run_once(self, mock_ismount):
|
||||
mock_ismount.return_value = True
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once(self, mock_check_drive):
|
||||
mock_check_drive.return_value = True
|
||||
ou = object_updater.ObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
@ -302,8 +303,12 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
os.mkdir(async_dir)
|
||||
ou.run_once()
|
||||
self.assertTrue(os.path.exists(async_dir))
|
||||
# mount_check == False means no call to ismount
|
||||
self.assertEqual([], mock_ismount.mock_calls)
|
||||
# each run calls check_device
|
||||
self.assertEqual([
|
||||
mock.call(self.devices_dir, 'sda1', False),
|
||||
mock.call(self.devices_dir, 'sda1', False),
|
||||
], mock_check_drive.mock_calls)
|
||||
mock_check_drive.reset_mock()
|
||||
|
||||
ou = object_updater.ObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
@ -317,11 +322,10 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
os.mkdir(odd_dir)
|
||||
ou.run_once()
|
||||
self.assertTrue(os.path.exists(async_dir))
|
||||
self.assertTrue(not os.path.exists(odd_dir))
|
||||
# mount_check == True means ismount was checked
|
||||
self.assertFalse(os.path.exists(odd_dir))
|
||||
self.assertEqual([
|
||||
mock.call(self.sda1),
|
||||
], mock_ismount.mock_calls)
|
||||
mock.call(self.devices_dir, 'sda1', True),
|
||||
], mock_check_drive.mock_calls)
|
||||
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
odir = os.path.join(async_dir, ohash[-3:])
|
||||
|
Loading…
Reference in New Issue
Block a user