relinker: Add /recon/relinker endpoint and drop progress stats
To further benefit the stats capturing for the relinker, drop partition progress to a new relinker.recon recon cache and add a new recon endpoint: GET /recon/relinker To get live relinking progress data: $ curl http://127.0.0.3:6030/recon/relinker |python -mjson.tool { "devices": { "sdb3": { "parts_done": 523, "policies": { "1": { "next_part_power": 11, "start_time": 1618998724.845616, "stats": { "errors": 0, "files": 1630, "hash_dirs": 1630, "linked": 1630, "policies": 1, "removed": 0 }, "timestamp": 1618998730.24672, "total_parts": 1029, "total_time": 5.400741815567017 }}, "start_time": 1618998724.845946, "stats": { "errors": 0, "files": 836, "hash_dirs": 836, "linked": 836, "removed": 0 }, "timestamp": 1618998730.24672, "total_parts": 523, "total_time": 5.400741815567017 }, "sdb7": { "parts_done": 506, "policies": { "1": { "next_part_power": 11, "part_power": 10, "parts_done": 506, "start_time": 1618998724.845616, "stats": { "errors": 0, "files": 794, "hash_dirs": 794, "linked": 794, "removed": 0 }, "step": "relink", "timestamp": 1618998730.166175, "total_parts": 506, "total_time": 5.320528984069824 } }, "start_time": 1618998724.845616, "stats": { "errors": 0, "files": 794, "hash_dirs": 794, "linked": 794, "removed": 0 }, "timestamp": 1618998730.166175, "total_parts": 506, "total_time": 5.320528984069824 } }, "workers": { "100": { "drives": ["sda1"], "return_code": 0, "timestamp": 1618998730.166175} }} Also, add a constant DEFAULT_RECON_CACHE_PATH to help fix failing tests by mocking recon_cache_path, so that errors are not logged due to dump_recon_cache exceptions. Mock recon_cache_path more widely and assert no error logs more widely. Change-Id: I625147dadd44f008a7c48eb5d6ac1c54c4c0ef05
This commit is contained in:
parent
876cf34862
commit
4ce907a4ae
@ -638,3 +638,6 @@ use = egg:swift#xprofile
|
||||
# tombstones from their old locations, causing duplicate tombstones with
|
||||
# different inodes to be relinked to the next partition power location.
|
||||
# link_check_limit = 2
|
||||
#
|
||||
# stats_interval = 300.0
|
||||
# recon_cache_path = /var/cache/swift
|
||||
|
@ -22,12 +22,14 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import replace_partition_in_path, config_true_value, \
|
||||
audit_location_generator, get_logger, readconf, drop_privileges, \
|
||||
RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \
|
||||
non_negative_float, non_negative_int, config_auto_int_value
|
||||
non_negative_float, non_negative_int, config_auto_int_value, \
|
||||
dump_recon_cache
|
||||
from swift.obj import diskfile
|
||||
|
||||
|
||||
@ -39,6 +41,13 @@ STEP_CLEANUP = 'cleanup'
|
||||
EXIT_SUCCESS = 0
|
||||
EXIT_NO_APPLICABLE_POLICY = 2
|
||||
EXIT_ERROR = 1
|
||||
RECON_FILE = 'relinker.recon'
|
||||
DEFAULT_RECON_CACHE_PATH = '/var/cache/swift'
|
||||
DEFAULT_STATS_INTERVAL = 300.0
|
||||
|
||||
|
||||
def recursive_defaultdict():
    """Return an autovivifying mapping: missing keys yield nested defaultdicts."""
    return defaultdict(recursive_defaultdict)
|
||||
|
||||
|
||||
def policy(policy_name_or_index):
|
||||
@ -48,9 +57,50 @@ def policy(policy_name_or_index):
|
||||
return value
|
||||
|
||||
|
||||
def _aggregate_stats(base_stats, update_stats):
|
||||
for key, value in update_stats.items():
|
||||
base_stats.setdefault(key, 0)
|
||||
base_stats[key] += value
|
||||
|
||||
return base_stats
|
||||
|
||||
|
||||
def _aggregate_recon_stats(base_stats, updated_stats):
    """Merge one policy-level recon dict into a device-level aggregate.

    Only recognised keys are merged; anything else in *updated_stats* is
    ignored. *base_stats* is mutated in place and returned.
    """
    for key, value in updated_stats.items():
        if key == 'stats':
            # nested counters are summed key by key
            base_stats['stats'] = _aggregate_stats(base_stats['stats'], value)
        elif key == 'start_time':
            # the earliest start time wins
            base_stats[key] = min(base_stats.get(key, value), value)
        elif key in ('timestamp', 'total_time'):
            # the most recent timestamp / longest elapsed time wins
            base_stats[key] = max(base_stats.get(key, 0), value)
        elif key in ('parts_done', 'total_parts'):
            base_stats[key] += value
    return base_stats
|
||||
|
||||
|
||||
def _zero_stats():
|
||||
return {
|
||||
'hash_dirs': 0,
|
||||
'files': 0,
|
||||
'linked': 0,
|
||||
'removed': 0,
|
||||
'errors': 0}
|
||||
|
||||
|
||||
def _zero_collated_stats():
    """Return fresh device-level collated stats, including a zeroed 'stats'."""
    collated = {'parts_done': 0, 'total_parts': 0, 'total_time': 0}
    collated['stats'] = _zero_stats()
    return collated
|
||||
|
||||
|
||||
class Relinker(object):
|
||||
def __init__(self, conf, logger, device_list=None, do_cleanup=False):
|
||||
self.conf = conf
|
||||
self.recon_cache = os.path.join(self.conf['recon_cache_path'],
|
||||
RECON_FILE)
|
||||
self.logger = logger
|
||||
self.device_list = device_list or []
|
||||
self.do_cleanup = do_cleanup
|
||||
@ -60,26 +110,65 @@ class Relinker(object):
|
||||
self.part_power = self.next_part_power = None
|
||||
self.diskfile_mgr = None
|
||||
self.dev_lock = None
|
||||
self._last_recon_update = time.time()
|
||||
self.stats_interval = float(conf.get(
|
||||
'stats_interval', DEFAULT_STATS_INTERVAL))
|
||||
self.diskfile_router = diskfile.DiskFileRouter(self.conf, self.logger)
|
||||
self._zero_stats()
|
||||
self.stats = _zero_stats()
|
||||
self.devices_data = recursive_defaultdict()
|
||||
self.policy_count = 0
|
||||
self.pid = os.getpid()
|
||||
|
||||
def _zero_stats(self):
|
||||
self.stats = {
|
||||
'hash_dirs': 0,
|
||||
'files': 0,
|
||||
'linked': 0,
|
||||
'removed': 0,
|
||||
'errors': 0,
|
||||
'policies': 0,
|
||||
}
|
||||
def _aggregate_dev_policy_stats(self):
|
||||
for dev_data in self.devices_data.values():
|
||||
dev_data.update(_zero_collated_stats())
|
||||
for policy_data in dev_data.get('policies', {}).values():
|
||||
_aggregate_recon_stats(dev_data, policy_data)
|
||||
|
||||
def _update_recon(self, device=None, force_dump=False):
|
||||
if not force_dump and self._last_recon_update + self.stats_interval \
|
||||
> time.time():
|
||||
# not time yet!
|
||||
return
|
||||
if device:
|
||||
# dump recon stats for the device
|
||||
num_parts_done = sum(
|
||||
1 for part_done in self.states["state"].values()
|
||||
if part_done)
|
||||
num_total_parts = len(self.states["state"])
|
||||
step = STEP_CLEANUP if self.do_cleanup else STEP_RELINK
|
||||
policy_dev_progress = {'step': step,
|
||||
'parts_done': num_parts_done,
|
||||
'total_parts': num_total_parts,
|
||||
'timestamp': time.time()}
|
||||
self.devices_data[device]['policies'][self.policy.idx].update(
|
||||
policy_dev_progress)
|
||||
|
||||
# aggregate device policy level values into device level
|
||||
self._aggregate_dev_policy_stats()
|
||||
|
||||
# We want to periodically update the worker recon timestamp so we know
|
||||
# it's still running
|
||||
recon_data = self._update_worker_stats(recon_dump=False)
|
||||
|
||||
recon_data.update({'devices': self.devices_data})
|
||||
if device:
|
||||
self.logger.debug("Updating recon for %s", device)
|
||||
else:
|
||||
self.logger.debug("Updating recon")
|
||||
self._last_recon_update = time.time()
|
||||
dump_recon_cache(recon_data, self.recon_cache, self.logger)
|
||||
|
||||
@property
|
||||
def total_errors(self):
|
||||
return sum([
|
||||
self.stats['errors'],
|
||||
self.stats.get('unmounted', 0),
|
||||
self.stats.get('unlistable_partitions', 0),
|
||||
])
|
||||
# first make sure the policy data is aggregated down to the device
|
||||
# level
|
||||
self._aggregate_dev_policy_stats()
|
||||
return sum([sum([
|
||||
dev.get('stats', {}).get('errors', 0),
|
||||
dev.get('stats', {}).get('unmounted', 0),
|
||||
dev.get('stats', {}).get('unlistable_partitions', 0)])
|
||||
for dev in self.devices_data.values()])
|
||||
|
||||
def devices_filter(self, _, devices):
|
||||
if self.device_list:
|
||||
@ -117,9 +206,24 @@ class Relinker(object):
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def hook_post_device(self, _):
|
||||
# initialise the device in recon.
|
||||
device = os.path.basename(device_path)
|
||||
self.devices_data[device]['policies'][self.policy.idx] = {
|
||||
'start_time': time.time(), 'stats': _zero_stats(),
|
||||
'part_power': self.states["part_power"],
|
||||
'next_part_power': self.states["next_part_power"]}
|
||||
self.stats = \
|
||||
self.devices_data[device]['policies'][self.policy.idx]['stats']
|
||||
self._update_recon(device)
|
||||
|
||||
def hook_post_device(self, device_path):
|
||||
os.close(self.dev_lock)
|
||||
self.dev_lock = None
|
||||
device = os.path.basename(device_path)
|
||||
pol_stats = self.devices_data[device]['policies'][self.policy.idx]
|
||||
total_time = time.time() - pol_stats['start_time']
|
||||
pol_stats.update({'total_time': total_time, 'stats': self.stats})
|
||||
self._update_recon(device, force_dump=True)
|
||||
|
||||
def partitions_filter(self, datadir_path, partitions):
|
||||
# Remove all non partitions first (eg: auditor_status_ALL.json)
|
||||
@ -265,8 +369,10 @@ class Relinker(object):
|
||||
if part)
|
||||
step = STEP_CLEANUP if self.do_cleanup else STEP_RELINK
|
||||
num_total_parts = len(self.states["state"])
|
||||
self.logger.info("Step: %s Device: %s Policy: %s Partitions: %d/%d" % (
|
||||
step, device, self.policy.name, num_parts_done, num_total_parts))
|
||||
self.logger.info(
|
||||
"Step: %s Device: %s Policy: %s Partitions: %d/%d",
|
||||
step, device, self.policy.name, num_parts_done, num_total_parts)
|
||||
self._update_recon(device)
|
||||
|
||||
def hashes_filter(self, suff_path, hashes):
|
||||
hashes = list(hashes)
|
||||
@ -431,6 +537,11 @@ class Relinker(object):
|
||||
'Error invalidating suffix for %s: %r',
|
||||
hash_path, exc)
|
||||
|
||||
def place_policy_stat(self, dev, policy, stat, value):
|
||||
stats = self.devices_data[dev]['policies'][policy.idx].setdefault(
|
||||
"stats", _zero_stats())
|
||||
stats[stat] = stats.get(stat, 0) + value
|
||||
|
||||
def process_policy(self, policy):
|
||||
self.logger.info(
|
||||
'Processing files for policy %s under %s (cleanup=%s)',
|
||||
@ -444,6 +555,7 @@ class Relinker(object):
|
||||
"next_part_power": self.next_part_power,
|
||||
"state": {},
|
||||
}
|
||||
audit_stats = {}
|
||||
|
||||
locations = audit_location_generator(
|
||||
self.conf['devices'],
|
||||
@ -457,7 +569,7 @@ class Relinker(object):
|
||||
hook_post_partition=self.hook_post_partition,
|
||||
hashes_filter=self.hashes_filter,
|
||||
logger=self.logger,
|
||||
error_counter=self.stats,
|
||||
error_counter=audit_stats,
|
||||
yield_hash_dirs=True
|
||||
)
|
||||
if self.conf['files_per_second'] > 0:
|
||||
@ -471,8 +583,30 @@ class Relinker(object):
|
||||
continue
|
||||
self.process_location(hash_path, new_hash_path)
|
||||
|
||||
# any unmounted devices don't trigger the pre_device trigger.
|
||||
# so we'll deal with them here.
|
||||
for dev in audit_stats.get('unmounted', []):
|
||||
self.place_policy_stat(dev, policy, 'unmounted', 1)
|
||||
|
||||
# Further unlistable_partitions doesn't trigger the post_device, so
|
||||
# we also need to deal with them here.
|
||||
for datadir in audit_stats.get('unlistable_partitions', []):
|
||||
device_path, _ = os.path.split(datadir)
|
||||
device = os.path.basename(device_path)
|
||||
self.place_policy_stat(device, policy, 'unlistable_partitions', 1)
|
||||
|
||||
def _update_worker_stats(self, recon_dump=True, return_code=None):
|
||||
worker_stats = {'devices': self.device_list,
|
||||
'timestamp': time.time(),
|
||||
'return_code': return_code}
|
||||
worker_data = {"workers": {str(self.pid): worker_stats}}
|
||||
if recon_dump:
|
||||
dump_recon_cache(worker_data, self.recon_cache, self.logger)
|
||||
return worker_data
|
||||
|
||||
def run(self):
|
||||
self._zero_stats()
|
||||
num_policies = 0
|
||||
self._update_worker_stats()
|
||||
for policy in self.conf['policies']:
|
||||
self.policy = policy
|
||||
policy.object_ring = None # Ensure it will be reloaded
|
||||
@ -484,13 +618,16 @@ class Relinker(object):
|
||||
if self.do_cleanup != part_power_increased:
|
||||
continue
|
||||
|
||||
self.stats['policies'] += 1
|
||||
num_policies += 1
|
||||
self.process_policy(policy)
|
||||
|
||||
policies = self.stats.pop('policies')
|
||||
if not policies:
|
||||
# Some stat collation happens during _update_recon and we want to force
|
||||
# this to happen at the end of the run
|
||||
self._update_recon(force_dump=True)
|
||||
if not num_policies:
|
||||
self.logger.warning(
|
||||
"No policy found to increase the partition power.")
|
||||
self._update_worker_stats(return_code=EXIT_NO_APPLICABLE_POLICY)
|
||||
return EXIT_NO_APPLICABLE_POLICY
|
||||
|
||||
if self.total_errors > 0:
|
||||
@ -502,34 +639,49 @@ class Relinker(object):
|
||||
log_method = self.logger.info
|
||||
status = EXIT_SUCCESS
|
||||
|
||||
hash_dirs = self.stats.pop('hash_dirs')
|
||||
files = self.stats.pop('files')
|
||||
linked = self.stats.pop('linked')
|
||||
removed = self.stats.pop('removed')
|
||||
action_errors = self.stats.pop('errors')
|
||||
unmounted = self.stats.pop('unmounted', 0)
|
||||
stats = _zero_stats()
|
||||
for dev_stats in self.devices_data.values():
|
||||
stats = _aggregate_stats(stats, dev_stats.get('stats', {}))
|
||||
hash_dirs = stats.pop('hash_dirs')
|
||||
files = stats.pop('files')
|
||||
linked = stats.pop('linked')
|
||||
removed = stats.pop('removed')
|
||||
action_errors = stats.pop('errors')
|
||||
unmounted = stats.pop('unmounted', 0)
|
||||
if unmounted:
|
||||
self.logger.warning('%d disks were unmounted', unmounted)
|
||||
listdir_errors = self.stats.pop('unlistable_partitions', 0)
|
||||
listdir_errors = stats.pop('unlistable_partitions', 0)
|
||||
if listdir_errors:
|
||||
self.logger.warning(
|
||||
'There were %d errors listing partition directories',
|
||||
listdir_errors)
|
||||
if self.stats:
|
||||
if stats:
|
||||
self.logger.warning(
|
||||
'There were unexpected errors while enumerating disk '
|
||||
'files: %r', self.stats)
|
||||
'files: %r', stats)
|
||||
|
||||
log_method(
|
||||
'%d hash dirs processed (cleanup=%s) (%d files, %d linked, '
|
||||
'%d removed, %d errors)', hash_dirs, self.do_cleanup, files,
|
||||
linked, removed, action_errors + listdir_errors)
|
||||
|
||||
self._update_worker_stats(return_code=status)
|
||||
return status
|
||||
|
||||
|
||||
def _reset_recon(recon_cache, logger):
    """Overwrite the relinker recon cache with empty progress data."""
    dump_recon_cache({'devices': {}, 'workers': {}}, recon_cache, logger)
|
||||
|
||||
|
||||
def parallel_process(do_cleanup, conf, logger=None, device_list=None):
|
||||
logger = logger or logging.getLogger()
|
||||
|
||||
# initialise recon dump for collection
|
||||
# Lets start by always deleting last run's stats
|
||||
recon_cache = os.path.join(conf['recon_cache_path'], RECON_FILE)
|
||||
_reset_recon(recon_cache, logger)
|
||||
|
||||
device_list = sorted(set(device_list or os.listdir(conf['devices'])))
|
||||
workers = conf['workers']
|
||||
if workers == 'auto':
|
||||
@ -636,6 +788,10 @@ def main(args):
|
||||
type=non_negative_float, dest='files_per_second',
|
||||
help='Used to limit I/O. Zero implies no limit '
|
||||
'(default: no limit).')
|
||||
parser.add_argument('--stats-interval', default=None,
|
||||
type=non_negative_float, dest='stats_interval',
|
||||
help='Emit stats to recon roughly every N seconds. '
|
||||
'(default: %d).' % DEFAULT_STATS_INTERVAL)
|
||||
parser.add_argument(
|
||||
'--workers', default=None, type=auto_or_int, help=(
|
||||
'Process devices across N workers '
|
||||
@ -689,6 +845,11 @@ def main(args):
|
||||
'link_check_limit': (
|
||||
args.link_check_limit if args.link_check_limit is not None
|
||||
else non_negative_int(conf.get('link_check_limit', 2))),
|
||||
'recon_cache_path': conf.get('recon_cache_path',
|
||||
DEFAULT_RECON_CACHE_PATH),
|
||||
'stats_interval': non_negative_float(
|
||||
args.stats_interval or conf.get('stats_interval',
|
||||
DEFAULT_STATS_INTERVAL)),
|
||||
})
|
||||
return parallel_process(
|
||||
args.action == 'cleanup', conf, logger, args.device_list)
|
||||
|
@ -56,6 +56,8 @@ class ReconMiddleware(object):
|
||||
'account.recon')
|
||||
self.drive_recon_cache = os.path.join(self.recon_cache_path,
|
||||
'drive.recon')
|
||||
self.relink_recon_cache = os.path.join(self.recon_cache_path,
|
||||
"relinker.recon")
|
||||
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
|
||||
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
|
||||
|
||||
@ -65,20 +67,26 @@ class ReconMiddleware(object):
|
||||
self.rings.append(os.path.join(swift_dir,
|
||||
policy.ring_name + '.ring.gz'))
|
||||
|
||||
def _from_recon_cache(self, cache_keys, cache_file, openr=open):
|
||||
def _from_recon_cache(self, cache_keys, cache_file, openr=open,
|
||||
ignore_missing=False):
|
||||
"""retrieve values from a recon cache file
|
||||
|
||||
:params cache_keys: list of cache items to retrieve
|
||||
:params cache_file: cache file to retrieve items from.
|
||||
:params openr: open to use [for unittests]
|
||||
:params ignore_missing: Some recon stats are very temporary, in this
|
||||
case it would be better to not log if things are missing.
|
||||
:return: dict of cache items and their values or none if not found
|
||||
"""
|
||||
try:
|
||||
with openr(cache_file, 'r') as f:
|
||||
recondata = json.load(f)
|
||||
return dict((key, recondata.get(key)) for key in cache_keys)
|
||||
except IOError:
|
||||
self.logger.exception(_('Error reading recon cache file'))
|
||||
except IOError as err:
|
||||
if err.errno == errno.ENOENT and ignore_missing:
|
||||
pass
|
||||
else:
|
||||
self.logger.exception(_('Error reading recon cache file'))
|
||||
except ValueError:
|
||||
self.logger.exception(_('Error parsing recon cache file'))
|
||||
except Exception:
|
||||
@ -334,6 +342,14 @@ class ReconMiddleware(object):
|
||||
|
||||
return time.time()
|
||||
|
||||
def get_relinker_info(self):
|
||||
"""get relinker info, if any"""
|
||||
|
||||
stat_keys = ['devices', 'workers']
|
||||
return self._from_recon_cache(stat_keys,
|
||||
self.relink_recon_cache,
|
||||
ignore_missing=True)
|
||||
|
||||
def GET(self, req):
|
||||
root, rcheck, rtype = req.split_path(1, 3, True)
|
||||
all_rtypes = ['account', 'container', 'object']
|
||||
@ -378,6 +394,8 @@ class ReconMiddleware(object):
|
||||
content = self.get_time()
|
||||
elif rcheck == "sharding":
|
||||
content = self.get_sharding_info()
|
||||
elif rcheck == "relinker":
|
||||
content = self.get_relinker_info()
|
||||
else:
|
||||
content = "Invalid path: %s" % req.path
|
||||
return Response(request=req, status="404 Not Found",
|
||||
|
@ -3311,8 +3311,8 @@ def audit_location_generator(devices, datadir, suffix='',
|
||||
for device in device_dir:
|
||||
if mount_check and not ismount(os.path.join(devices, device)):
|
||||
if error_counter is not None:
|
||||
error_counter.setdefault('unmounted', 0)
|
||||
error_counter['unmounted'] += 1
|
||||
error_counter.setdefault('unmounted', [])
|
||||
error_counter['unmounted'].append(device)
|
||||
if logger:
|
||||
logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
@ -3325,8 +3325,8 @@ def audit_location_generator(devices, datadir, suffix='',
|
||||
except OSError as e:
|
||||
# NB: listdir ignores non-existent datadir_path
|
||||
if error_counter is not None:
|
||||
error_counter.setdefault('unlistable_partitions', 0)
|
||||
error_counter['unlistable_partitions'] += 1
|
||||
error_counter.setdefault('unlistable_partitions', [])
|
||||
error_counter['unlistable_partitions'].append(datadir_path)
|
||||
if logger:
|
||||
logger.warning(_('Skipping %(datadir)s because %(err)s'),
|
||||
{'datadir': datadir_path, 'err': e})
|
||||
|
@ -225,6 +225,7 @@ class FakeRing(Ring):
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.set_replicas(replicas)
|
||||
self._next_part_power = None
|
||||
self._reload()
|
||||
|
||||
def has_changed(self):
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -15,6 +15,7 @@
|
||||
|
||||
import array
|
||||
from contextlib import contextmanager
|
||||
import errno
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
@ -29,6 +30,7 @@ from swift.common import ring, utils
|
||||
from swift.common.swob import Request
|
||||
from swift.common.middleware import recon
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
@ -160,6 +162,9 @@ class FakeRecon(object):
|
||||
def fake_sharding(self):
|
||||
return {"sharding_stats": "1"}
|
||||
|
||||
def fake_relinker(self):
|
||||
return {"relinktest": "1"}
|
||||
|
||||
def fake_updater(self, recon_type):
|
||||
self.fake_updater_rtype = recon_type
|
||||
return {'updatertest': "1"}
|
||||
@ -205,8 +210,10 @@ class FakeRecon(object):
|
||||
def nocontent(self):
|
||||
return None
|
||||
|
||||
def raise_IOError(self, *args, **kwargs):
|
||||
raise IOError
|
||||
def raise_IOError(self, errno=None):
|
||||
mock_obj = mock.MagicMock()
|
||||
mock_obj.side_effect = IOError(errno, str(errno))
|
||||
return mock_obj
|
||||
|
||||
def raise_ValueError(self, *args, **kwargs):
|
||||
raise ValueError
|
||||
@ -235,6 +242,7 @@ class TestReconSuccess(TestCase):
|
||||
self.real_from_cache = self.app._from_recon_cache
|
||||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
self.frecon = FakeRecon()
|
||||
self.app.logger = debug_logger()
|
||||
|
||||
# replace hash md5 implementation of the md5_hash_for_file function
|
||||
mock_hash_for_file = mock.patch(
|
||||
@ -476,11 +484,29 @@ class TestReconSuccess(TestCase):
|
||||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
|
||||
def test_from_recon_cache_ioerror(self):
|
||||
oart = self.frecon.raise_IOError
|
||||
oart = self.frecon.raise_IOError()
|
||||
self.app._from_recon_cache = self.real_from_cache
|
||||
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
|
||||
'test.cache', openr=oart)
|
||||
self.assertEqual(rv, {'notpresentkey': None, 'testkey1': None})
|
||||
self.assertIn('Error reading recon cache file: ',
|
||||
self.app.logger.get_lines_for_level('error'))
|
||||
# Now try with ignore_missing but not ENOENT
|
||||
self.app.logger.clear()
|
||||
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
|
||||
'test.cache', openr=oart,
|
||||
ignore_missing=True)
|
||||
self.assertEqual(rv, {'notpresentkey': None, 'testkey1': None})
|
||||
self.assertIn('Error reading recon cache file: ',
|
||||
self.app.logger.get_lines_for_level('error'))
|
||||
# Now try again with ignore_missing with ENOENT
|
||||
self.app.logger.clear()
|
||||
oart = self.frecon.raise_IOError(errno.ENOENT)
|
||||
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
|
||||
'test.cache', openr=oart,
|
||||
ignore_missing=True)
|
||||
self.assertEqual(rv, {'notpresentkey': None, 'testkey1': None})
|
||||
self.assertEqual(self.app.logger.get_lines_for_level('error'), [])
|
||||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
|
||||
def test_from_recon_cache_valueerror(self):
|
||||
@ -489,6 +515,8 @@ class TestReconSuccess(TestCase):
|
||||
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
|
||||
'test.cache', openr=oart)
|
||||
self.assertEqual(rv, {'notpresentkey': None, 'testkey1': None})
|
||||
self.assertIn('Error parsing recon cache file: ',
|
||||
self.app.logger.get_lines_for_level('error'))
|
||||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
|
||||
def test_from_recon_cache_exception(self):
|
||||
@ -497,6 +525,8 @@ class TestReconSuccess(TestCase):
|
||||
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
|
||||
'test.cache', openr=oart)
|
||||
self.assertEqual(rv, {'notpresentkey': None, 'testkey1': None})
|
||||
self.assertIn('Error retrieving recon data: ',
|
||||
self.app.logger.get_lines_for_level('error'))
|
||||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
|
||||
def test_get_mounted(self):
|
||||
@ -1189,6 +1219,88 @@ class TestReconSuccess(TestCase):
|
||||
'/var/cache/swift/container.recon'), {})])
|
||||
self.assertEqual(rv, from_cache_response)
|
||||
|
||||
def test_get_relinker_info(self):
|
||||
from_cache_response = {
|
||||
"devices": {
|
||||
"sdb3": {
|
||||
"parts_done": 523,
|
||||
"policies": {
|
||||
"1": {
|
||||
"next_part_power": 11,
|
||||
"start_time": 1618998724.845616,
|
||||
"stats": {
|
||||
"errors": 0,
|
||||
"files": 1630,
|
||||
"hash_dirs": 1630,
|
||||
"linked": 1630,
|
||||
"policies": 1,
|
||||
"removed": 0
|
||||
},
|
||||
"timestamp": 1618998730.24672,
|
||||
"total_parts": 1029,
|
||||
"total_time": 5.400741815567017
|
||||
}},
|
||||
"start_time": 1618998724.845946,
|
||||
"stats": {
|
||||
"errors": 0,
|
||||
"files": 836,
|
||||
"hash_dirs": 836,
|
||||
"linked": 836,
|
||||
"removed": 0
|
||||
},
|
||||
"timestamp": 1618998730.24672,
|
||||
"total_parts": 523,
|
||||
"total_time": 5.400741815567017
|
||||
},
|
||||
"sdb7": {
|
||||
"parts_done": 506,
|
||||
"policies": {
|
||||
"1": {
|
||||
"next_part_power": 11,
|
||||
"part_power": 10,
|
||||
"parts_done": 506,
|
||||
"start_time": 1618998724.845616,
|
||||
"stats": {
|
||||
"errors": 0,
|
||||
"files": 794,
|
||||
"hash_dirs": 794,
|
||||
"linked": 794,
|
||||
"removed": 0
|
||||
},
|
||||
"step": "relink",
|
||||
"timestamp": 1618998730.166175,
|
||||
"total_parts": 506,
|
||||
"total_time": 5.320528984069824
|
||||
}
|
||||
},
|
||||
"start_time": 1618998724.845616,
|
||||
"stats": {
|
||||
"errors": 0,
|
||||
"files": 794,
|
||||
"hash_dirs": 794,
|
||||
"linked": 794,
|
||||
"removed": 0
|
||||
},
|
||||
"timestamp": 1618998730.166175,
|
||||
"total_parts": 506,
|
||||
"total_time": 5.320528984069824
|
||||
}
|
||||
},
|
||||
"workers": {
|
||||
"100": {
|
||||
"drives": ["sda1"],
|
||||
"return_code": 0,
|
||||
"timestamp": 1618998730.166175}
|
||||
}}
|
||||
self.fakecache.fakeout_calls = []
|
||||
self.fakecache.fakeout = from_cache_response
|
||||
rv = self.app.get_relinker_info()
|
||||
self.assertEqual(self.fakecache.fakeout_calls,
|
||||
[((['devices', 'workers'],
|
||||
'/var/cache/swift/relinker.recon'),
|
||||
{'ignore_missing': True})])
|
||||
self.assertEqual(rv, from_cache_response)
|
||||
|
||||
|
||||
class TestReconMiddleware(unittest.TestCase):
|
||||
|
||||
@ -1222,6 +1334,7 @@ class TestReconMiddleware(unittest.TestCase):
|
||||
self.app.get_driveaudit_error = self.frecon.fake_driveaudit
|
||||
self.app.get_time = self.frecon.fake_time
|
||||
self.app.get_sharding_info = self.frecon.fake_sharding
|
||||
self.app.get_relinker_info = self.frecon.fake_relinker
|
||||
|
||||
def test_recon_get_mem(self):
|
||||
get_mem_resp = [b'{"memtest": "1"}']
|
||||
@ -1497,6 +1610,14 @@ class TestReconMiddleware(unittest.TestCase):
|
||||
resp = self.app(req.environ, start_response)
|
||||
self.assertEqual(resp, get_sharding_resp)
|
||||
|
||||
def test_recon_get_relink(self):
|
||||
get_recon_resp = [
|
||||
b'{"relinktest": "1"}']
|
||||
req = Request.blank('/recon/relinker',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
resp = self.app(req.environ, start_response)
|
||||
self.assertEqual(resp, get_recon_resp)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -6611,7 +6611,7 @@ class TestAuditLocationGenerator(unittest.TestCase):
|
||||
dev_dir = os.path.join(devices, 'device_is_empty_dir')
|
||||
os.makedirs(dev_dir)
|
||||
|
||||
def assert_listdir_error(devices):
|
||||
def assert_listdir_error(devices, expected):
|
||||
logger = debug_logger()
|
||||
error_counter = {}
|
||||
locations = utils.audit_location_generator(
|
||||
@ -6620,19 +6620,23 @@ class TestAuditLocationGenerator(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual([], list(locations))
|
||||
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
|
||||
self.assertEqual({'unlistable_partitions': 1}, error_counter)
|
||||
self.assertEqual({'unlistable_partitions': expected},
|
||||
error_counter)
|
||||
|
||||
# file under devices/
|
||||
devices = os.path.join(tmpdir, 'devices3')
|
||||
os.makedirs(devices)
|
||||
with open(os.path.join(devices, 'device_is_file'), 'w'):
|
||||
pass
|
||||
assert_listdir_error(devices)
|
||||
listdir_error_data_dir = os.path.join(devices, 'device_is_file',
|
||||
'data')
|
||||
assert_listdir_error(devices, [listdir_error_data_dir])
|
||||
|
||||
# dir under devices/
|
||||
devices = os.path.join(tmpdir, 'devices4')
|
||||
device = os.path.join(devices, 'device')
|
||||
os.makedirs(device)
|
||||
expected_datadir = os.path.join(devices, 'device', 'data')
|
||||
assert_no_errors(devices)
|
||||
|
||||
# error for dir under devices/
|
||||
@ -6644,7 +6648,7 @@ class TestAuditLocationGenerator(unittest.TestCase):
|
||||
return orig_listdir(path)
|
||||
|
||||
with mock.patch('swift.common.utils.listdir', mocked):
|
||||
assert_listdir_error(devices)
|
||||
assert_listdir_error(devices, [expected_datadir])
|
||||
|
||||
# mount check error
|
||||
devices = os.path.join(tmpdir, 'devices5')
|
||||
@ -6669,7 +6673,7 @@ class TestAuditLocationGenerator(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual([], list(locations))
|
||||
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
|
||||
self.assertEqual({'unmounted': 1}, error_counter)
|
||||
self.assertEqual({'unmounted': ['device']}, error_counter)
|
||||
|
||||
|
||||
class TestGreenAsyncPile(unittest.TestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user