Improve object-updater's stats logging
The object updater has five different stats, but its logging only told you two of them (successes and failures), and it only told you after finishing all the async_pendings for a device. If you have a cluster that's been sick and has millions upon millions of async_pendings laying around, then your object-updaters are frustratingly silent. I've seen one cluster with around 8 million async_pendings per disk where the object-updaters only emitted stats every 12 hours. Yes, if you have StatsD logging set up properly, you can go look at your graphs and get real-time feedback on what it's doing. If you don't have that, all you get is a frustrating silence. Now, the object updater tells you all of its stats (successes, failures, quarantines due to bad pickles, unlinks, and errors), and it tells you incremental progress every five minutes. The logging at the end of a pass remains and has been expanded to also include all stats. Also included is a small change to what counts as an error: unmounted drives no longer do. The goal is that only abnormal things count as errors, like permission problems, malformed filenames, and so on. These are things that should never happen, but if they do, may require operator intervention. Drives fail, so logging an error upon encountering an unmounted drive is not useful. Change-Id: Idbddd507f0b633d14dffb7a9834fce93a10359ab
This commit is contained in:
parent
9a323e1989
commit
f64c00b00a
@ -909,6 +909,8 @@ objects_per_second 50 Maximum objects updated per second.
|
||||
system specs. 0 is unlimited.
|
||||
slowdown 0.01 Time in seconds to wait between objects.
|
||||
Deprecated in favor of objects_per_second.
|
||||
report_interval 300 Interval in seconds between logging
|
||||
statistics about the current update pass.
|
||||
recon_cache_path /var/cache/swift Path to recon cache
|
||||
nice_priority None Scheduling priority of server processes.
|
||||
Niceness values range from -20 (most
|
||||
|
@ -372,6 +372,12 @@ use = egg:swift#recon
|
||||
# objects_per_second instead.
|
||||
# slowdown = 0.01
|
||||
#
|
||||
# Log stats (at INFO level) every report_interval seconds. This
|
||||
# logging is per-process, so with concurrency > 1, the logs will
|
||||
# contain one stats log per worker process every report_interval
|
||||
# seconds.
|
||||
# report_interval = 300
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
#
|
||||
# You can set scheduling priority of processes. Niceness values range from -20
|
||||
|
@ -36,6 +36,37 @@ from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
|
||||
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
|
||||
|
||||
|
||||
class SweepStats(object):
|
||||
"""
|
||||
Stats bucket for an update sweep
|
||||
"""
|
||||
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
|
||||
unlinks=0):
|
||||
self.errors = errors
|
||||
self.failures = failures
|
||||
self.quarantines = quarantines
|
||||
self.successes = successes
|
||||
self.unlinks = unlinks
|
||||
|
||||
def copy(self):
|
||||
return type(self)(self.errors, self.failures, self.quarantines,
|
||||
self.successes, self.unlinks)
|
||||
|
||||
def since(self, other):
|
||||
return type(self)(self.errors - other.errors,
|
||||
self.failures - other.failures,
|
||||
self.quarantines - other.quarantines,
|
||||
self.successes - other.successes,
|
||||
self.unlinks - other.unlinks)
|
||||
|
||||
def reset(self):
|
||||
self.errors = 0
|
||||
self.failures = 0
|
||||
self.quarantines = 0
|
||||
self.successes = 0
|
||||
self.unlinks = 0
|
||||
|
||||
|
||||
class ObjectUpdater(Daemon):
|
||||
"""Update object information in container listings."""
|
||||
|
||||
@ -63,16 +94,18 @@ class ObjectUpdater(Daemon):
|
||||
objects_per_second))
|
||||
self.node_timeout = float(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.report_interval = float(conf.get('report_interval', 300))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
|
||||
self.stats = SweepStats()
|
||||
|
||||
def _listdir(self, path):
|
||||
try:
|
||||
return os.listdir(path)
|
||||
except OSError as e:
|
||||
self.stats.errors += 1
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(_('ERROR: Unable to access %(path)s: '
|
||||
'%(error)s') %
|
||||
{'path': path, 'error': e})
|
||||
@ -95,7 +128,9 @@ class ObjectUpdater(Daemon):
|
||||
self.get_container_ring().get_nodes('')
|
||||
for device in self._listdir(self.devices):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
# We don't count this as an error. The occasional
|
||||
# unmounted drive is part of normal cluster operations,
|
||||
# so a simple warning is sufficient.
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
continue
|
||||
@ -107,17 +142,22 @@ class ObjectUpdater(Daemon):
|
||||
else:
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
eventlet_monkey_patch()
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.stats.reset()
|
||||
forkbegin = time.time()
|
||||
self.object_sweep(os.path.join(self.devices, device))
|
||||
elapsed = time.time() - forkbegin
|
||||
self.logger.info(
|
||||
_('Object update sweep of %(device)s'
|
||||
' completed: %(elapsed).02fs, %(success)s successes'
|
||||
', %(fail)s failures'),
|
||||
('Object update sweep of %(device)s '
|
||||
'completed: %(elapsed).02fs, '
|
||||
'%(successes)d successes, %(failures)d failures, '
|
||||
'%(quarantines)d quarantines, '
|
||||
'%(unlinks)d unlinks, %(errors)d errors'),
|
||||
{'device': device, 'elapsed': elapsed,
|
||||
'success': self.successes, 'fail': self.failures})
|
||||
'success': self.stats.successes,
|
||||
'failures': self.stats.failures,
|
||||
'quarantines': self.stats.quarantines,
|
||||
'unlinks': self.stats.unlinks,
|
||||
'errors': self.stats.errors})
|
||||
sys.exit()
|
||||
while pids:
|
||||
pids.remove(os.wait()[0])
|
||||
@ -133,21 +173,29 @@ class ObjectUpdater(Daemon):
|
||||
"""Run the updater once."""
|
||||
self.logger.info(_('Begin object update single threaded sweep'))
|
||||
begin = time.time()
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.stats.reset()
|
||||
for device in self._listdir(self.devices):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
# We don't count this as an error. The occasional unmounted
|
||||
# drive is part of normal cluster operations, so a simple
|
||||
# warning is sufficient.
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
continue
|
||||
self.object_sweep(os.path.join(self.devices, device))
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
_('Object update single threaded sweep completed: '
|
||||
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
|
||||
{'elapsed': elapsed, 'success': self.successes,
|
||||
'fail': self.failures})
|
||||
('Object update single-threaded sweep completed: '
|
||||
'%(elapsed).02fs, %(successes)d successes, '
|
||||
'%(failures)d failures, '
|
||||
'%(quarantines)d quarantines, %(unlinks)d unlinks, '
|
||||
'%(errors)d errors'),
|
||||
{'elapsed': elapsed,
|
||||
'successes': self.stats.successes,
|
||||
'failures': self.stats.failures,
|
||||
'quarantines': self.stats.quarantines,
|
||||
'unlinks': self.stats.unlinks,
|
||||
'errors': self.stats.errors})
|
||||
dump_recon_cache({'object_updater_sweep': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
@ -158,6 +206,12 @@ class ObjectUpdater(Daemon):
|
||||
:param device: path to device
|
||||
"""
|
||||
start_time = time.time()
|
||||
last_status_update = start_time
|
||||
start_stats = self.stats.copy()
|
||||
my_pid = os.getpid()
|
||||
self.logger.info("Object update sweep starting on %s (pid: %d)",
|
||||
device, my_pid)
|
||||
|
||||
# loop through async pending dirs for all policies
|
||||
for asyncdir in self._listdir(device):
|
||||
# we only care about directories
|
||||
@ -170,6 +224,8 @@ class ObjectUpdater(Daemon):
|
||||
try:
|
||||
base, policy = split_policy_string(asyncdir)
|
||||
except PolicyError as e:
|
||||
# This isn't an error, but a misconfiguration. Logging a
|
||||
# warning should be sufficient.
|
||||
self.logger.warning(_('Directory %(directory)r does not map '
|
||||
'to a valid policy (%(error)s)') % {
|
||||
'directory': asyncdir, 'error': e})
|
||||
@ -186,6 +242,7 @@ class ObjectUpdater(Daemon):
|
||||
try:
|
||||
obj_hash, timestamp = update.split('-')
|
||||
except ValueError:
|
||||
self.stats.errors += 1
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(
|
||||
_('ERROR async pending file with unexpected '
|
||||
@ -193,7 +250,8 @@ class ObjectUpdater(Daemon):
|
||||
% (update_path))
|
||||
continue
|
||||
if obj_hash == last_obj_hash:
|
||||
self.logger.increment("unlinks")
|
||||
self.stats.unlinks += 1
|
||||
self.logger.increment('unlinks')
|
||||
os.unlink(update_path)
|
||||
else:
|
||||
self.process_object_update(update_path, device,
|
||||
@ -203,11 +261,47 @@ class ObjectUpdater(Daemon):
|
||||
self.objects_running_time = ratelimit_sleep(
|
||||
self.objects_running_time,
|
||||
self.max_objects_per_second)
|
||||
|
||||
now = time.time()
|
||||
if now - last_status_update >= self.report_interval:
|
||||
this_sweep = self.stats.since(start_stats)
|
||||
self.logger.info(
|
||||
('Object update sweep progress on %(device)s: '
|
||||
'%(elapsed).02fs, '
|
||||
'%(successes)d successes, %(failures)d failures, '
|
||||
'%(quarantines)d quarantines, '
|
||||
'%(unlinks)d unlinks, %(errors)d errors '
|
||||
'(pid: %(pid)d)'),
|
||||
{'device': device,
|
||||
'elapsed': now - start_time,
|
||||
'pid': my_pid,
|
||||
'successes': this_sweep.successes,
|
||||
'failures': this_sweep.failures,
|
||||
'quarantines': this_sweep.quarantines,
|
||||
'unlinks': this_sweep.unlinks,
|
||||
'errors': this_sweep.errors})
|
||||
last_status_update = now
|
||||
try:
|
||||
os.rmdir(prefix_path)
|
||||
except OSError:
|
||||
pass
|
||||
self.logger.timing_since('timing', start_time)
|
||||
sweep_totals = self.stats.since(start_stats)
|
||||
self.logger.info(
|
||||
('Object update sweep completed on %(device)s '
|
||||
'in %(elapsed).02fs seconds:, '
|
||||
'%(successes)d successes, %(failures)d failures, '
|
||||
'%(quarantines)d quarantines, '
|
||||
'%(unlinks)d unlinks, %(errors)d errors '
|
||||
'(pid: %(pid)d)'),
|
||||
{'device': device,
|
||||
'elapsed': time.time() - start_time,
|
||||
'pid': my_pid,
|
||||
'successes': sweep_totals.successes,
|
||||
'failures': sweep_totals.failures,
|
||||
'quarantines': sweep_totals.quarantines,
|
||||
'unlinks': sweep_totals.unlinks,
|
||||
'errors': sweep_totals.errors})
|
||||
|
||||
def process_object_update(self, update_path, device, policy):
|
||||
"""
|
||||
@ -222,6 +316,7 @@ class ObjectUpdater(Daemon):
|
||||
except Exception:
|
||||
self.logger.exception(
|
||||
_('ERROR Pickle problem, quarantining %s'), update_path)
|
||||
self.stats.quarantines += 1
|
||||
self.logger.increment('quarantines')
|
||||
target_path = os.path.join(device, 'quarantined', 'objects',
|
||||
os.path.basename(update_path))
|
||||
@ -249,14 +344,15 @@ class ObjectUpdater(Daemon):
|
||||
else:
|
||||
success = False
|
||||
if success:
|
||||
self.successes += 1
|
||||
self.stats.successes += 1
|
||||
self.logger.increment('successes')
|
||||
self.logger.debug('Update sent for %(obj)s %(path)s',
|
||||
{'obj': obj, 'path': update_path})
|
||||
self.logger.increment("unlinks")
|
||||
self.stats.unlinks += 1
|
||||
self.logger.increment('unlinks')
|
||||
os.unlink(update_path)
|
||||
else:
|
||||
self.failures += 1
|
||||
self.stats.failures += 1
|
||||
self.logger.increment('failures')
|
||||
self.logger.debug('Update failed for %(obj)s %(path)s',
|
||||
{'obj': obj, 'path': update_path})
|
||||
|
@ -25,7 +25,8 @@ from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
from test import listen_zero
|
||||
from test.unit import (
|
||||
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
|
||||
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
|
||||
FakeLogger)
|
||||
from time import time
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
@ -248,6 +249,66 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
# a warning indicating that the '99' policy isn't valid
|
||||
check_with_idx('99', 1, should_skip=True)
|
||||
|
||||
def test_sweep_logs(self):
|
||||
asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
|
||||
prefix_dir = os.path.join(asyncdir, 'abc')
|
||||
mkpath(prefix_dir)
|
||||
|
||||
for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
|
||||
('jkl', 456), ('mno', 567)]:
|
||||
ohash = hash_path('account', 'container', o)
|
||||
o_path = os.path.join(prefix_dir, ohash + '-' +
|
||||
normalize_timestamp(t))
|
||||
write_pickle({}, o_path)
|
||||
|
||||
class MockObjectUpdater(object_updater.ObjectUpdater):
|
||||
def process_object_update(self, update_path, device, policy):
|
||||
os.unlink(update_path)
|
||||
self.stats.successes += 1
|
||||
self.stats.unlinks += 1
|
||||
|
||||
logger = FakeLogger()
|
||||
ou = MockObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
'interval': '1',
|
||||
'concurrency': '1',
|
||||
'report_interval': '10.0',
|
||||
'node_timeout': '5'}, logger=logger)
|
||||
|
||||
now = [time()]
|
||||
|
||||
def mock_time_function():
|
||||
rv = now[0]
|
||||
now[0] += 5
|
||||
return rv
|
||||
|
||||
# With 10s between updates, time() advancing 5s every time we look,
|
||||
# and 5 async_pendings on disk, we should get at least two progress
|
||||
# lines.
|
||||
with mock.patch('swift.obj.updater.time',
|
||||
mock.MagicMock(time=mock_time_function)):
|
||||
ou.object_sweep(self.sda1)
|
||||
|
||||
info_lines = logger.get_lines_for_level('info')
|
||||
self.assertEqual(4, len(info_lines))
|
||||
self.assertIn("sweep starting", info_lines[0])
|
||||
self.assertIn(self.sda1, info_lines[0])
|
||||
|
||||
self.assertIn("sweep progress", info_lines[1])
|
||||
# the space ensures it's a positive number
|
||||
self.assertIn(" 2 successes", info_lines[1])
|
||||
self.assertIn(self.sda1, info_lines[1])
|
||||
|
||||
self.assertIn("sweep progress", info_lines[2])
|
||||
self.assertIn(" 4 successes", info_lines[2])
|
||||
self.assertIn(self.sda1, info_lines[2])
|
||||
|
||||
self.assertIn("sweep complete", info_lines[3])
|
||||
self.assertIn(" 5 successes", info_lines[3])
|
||||
self.assertIn(self.sda1, info_lines[3])
|
||||
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once_with_disk_unmounted(self, mock_check_drive):
|
||||
mock_check_drive.return_value = False
|
||||
@ -286,7 +347,7 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
self.assertEqual([
|
||||
mock.call(self.devices_dir, 'sda1', True),
|
||||
], mock_check_drive.mock_calls)
|
||||
self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
|
||||
self.assertEqual(ou.logger.get_increment_counts(), {})
|
||||
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once(self, mock_check_drive):
|
||||
|
Loading…
x
Reference in New Issue
Block a user