Track unlinks of async_pendings.
It's not sufficient to look at swift.object-updater.successes alone to see the async_pending unlink rate: unlinks happen in two different spots. One is when an async_pending has been successfully processed; the other is when the updater notices multiple async_pendings for the same object. Both events are now tracked under the same name: swift.object-updater.unlinks.

FakeLogger has sprouted a couple of convenience methods for testing logged metrics.

Also fixed pep8 1.3.3's complaints in the files this diff touches, plus bonus spelling and grammar fixes in the admin guide.

Change-Id: I8c1493784adbe24ba2b5512615e87669b3d94505
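To make the change concrete, here is a minimal, self-contained sketch (a stand-in stub for illustration only, not Swift's real FakeLogger or ObjectUpdater; it simply mirrors the helper logic added in the diff below) of how both unlink paths land under a single counter and how the new test helpers tally it:

# Minimal sketch, not Swift's real code: a stand-in logger that records
# increment() calls the same way the new FakeLogger helpers expect them,
# i.e. as (args, kwargs) tuples under log_dict['increment'].
from collections import defaultdict


class StubLogger(object):
    def __init__(self):
        self.log_dict = defaultdict(list)

    def increment(self, metric):
        self.log_dict['increment'].append(((metric,), {}))

    def get_increments(self):
        # Same expression the diff adds to FakeLogger.
        return [call[0][0] for call in self.log_dict['increment']]

    def get_increment_counts(self):
        counts = {}
        for metric in self.get_increments():
            if metric not in counts:
                counts[metric] = 0
            counts[metric] += 1
        return counts


logger = StubLogger()
# Spot 1: the sweep finds an older async_pending for an object it has
# already handled in this pass and drops it.
logger.increment('unlinks')
# Spot 2: an async_pending is unlinked after its container update succeeds.
logger.increment('successes')
logger.increment('unlinks')

# Both unlink events land under the single swift.object-updater.unlinks name.
assert logger.get_increment_counts() == {'unlinks': 2, 'successes': 1}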
@@ -518,7 +518,7 @@ Metric Name Description
 `account-replicator.no_changes` Count of accounts found to be in sync.
 `account-replicator.hashmatches` Count of accounts found to be in sync via hash
 comparison (`broker.merge_syncs` was called).
-`account-replicator.rsyncs` Count of completely missing accounts where were sent
+`account-replicator.rsyncs` Count of completely missing accounts which were sent
 via rsync.
 `account-replicator.remote_merges` Count of syncs handled by sending entire database
 via rsync.
@@ -628,7 +628,7 @@ Metric Name Description
 sychronization via deletion.
 `container-sync.puts` Count of container database rows sync'ed by PUTing.
 `container-sync.puts.timing` Timing data for each container database row
-sychronization via PUTing.
+synchronization via PUTing.
 =============================== ====================================================

 Metrics for `container-updater`:
@@ -693,7 +693,7 @@ Metric Name Description
 `object-replicator.partition.update.timing` Timing data for partitions replicated which also
 belong on this node. This metric is not tracked
 per-device.
-`object-replicator.suffix.hashes` Count of suffix directories whose has (of filenames)
+`object-replicator.suffix.hashes` Count of suffix directories whose hash (of filenames)
 was recalculated.
 `object-replicator.suffix.syncs` Count of suffix directories replicated with rsync.
 =================================================== ====================================================
@@ -756,7 +756,12 @@ Metric Name Description
 `object-updater.quarantines` Count of async_pending container updates which were
 corrupted and moved to quarantine.
 `object-updater.successes` Count of successful container updates.
-`object-updater.failures` Count of failed continer updates.
+`object-updater.failures` Count of failed container updates.
+`object-updater.unlinks` Count of async_pending files unlinked. An
+async_pending file is unlinked either when it is
+successfully processed or when the replicator sees
+that there is a newer async_pending file for the
+same object.
 ============================ ====================================================

 Metrics for `proxy-server` (in the table, `<type>` is the proxy-server
@@ -869,7 +874,7 @@ Object Auditor
 --------------

 On system failures, the XFS file system can sometimes truncate files it's
-trying to write and produce zero byte files. The object-auditor will catch
+trying to write and produce zero-byte files. The object-auditor will catch
 these problems but in the case of a system crash it would be advisable to run
 an extra, less rate limited sweep to check for these specific files. You can
 run this command as follows:
@@ -927,7 +932,7 @@ Swift Oldies are processes that have just been around for a long
 time. There's nothing necessarily wrong with this, but it might
 indicate a hung process if you regularly upgrade and reload/restart
 services. You might have so many servers that you don't notice when a
-reload/restart fails, `swift-oldies` can help with this.
+reload/restart fails; `swift-oldies` can help with this.

 For example, if you upgraded and reloaded/restarted everything 2 days
 ago, and you've already cleaned up any orphans with `swift-orphans`,
@@ -90,7 +90,8 @@ class ObjectUpdater(Daemon):
                     forkbegin = time.time()
                     self.object_sweep(os.path.join(self.devices, device))
                     elapsed = time.time() - forkbegin
-                    self.logger.info(_('Object update sweep of %(device)s'
+                    self.logger.info(
+                        _('Object update sweep of %(device)s'
                         ' completed: %(elapsed).02fs, %(success)s successes'
                         ', %(fail)s failures'),
                         {'device': device, 'elapsed': elapsed,
@@ -121,7 +122,8 @@ class ObjectUpdater(Daemon):
                 continue
             self.object_sweep(os.path.join(self.devices, device))
         elapsed = time.time() - begin
-        self.logger.info(_('Object update single threaded sweep completed: '
+        self.logger.info(
+            _('Object update single threaded sweep completed: '
             '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
             {'elapsed': elapsed, 'success': self.successes,
              'fail': self.failures})
@@ -156,6 +158,7 @@ class ObjectUpdater(Daemon):
                         % (update_path))
                     continue
                 if obj_hash == last_obj_hash:
+                    self.logger.increment("unlinks")
                     os.unlink(update_path)
                 else:
                     self.process_object_update(update_path, device)
@@ -180,8 +183,9 @@ class ObjectUpdater(Daemon):
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
             self.logger.increment('quarantines')
-            renamer(update_path, os.path.join(device,
-                'quarantined', 'objects', os.path.basename(update_path)))
+            renamer(update_path, os.path.join(
+                device, 'quarantined', 'objects',
+                os.path.basename(update_path)))
             return
         successes = update.get('successes', [])
         part, nodes = self.get_container_ring().get_nodes(
@@ -204,6 +208,7 @@ class ObjectUpdater(Daemon):
             self.logger.increment('successes')
             self.logger.debug(_('Update sent for %(obj)s %(path)s'),
                               {'obj': obj, 'path': update_path})
+            self.logger.increment("unlinks")
             os.unlink(update_path)
         else:
             self.failures += 1
@@ -141,6 +141,17 @@ class FakeLogger(object):
     update_stats = _store_in('update_stats')
     set_statsd_prefix = _store_in('set_statsd_prefix')

+    def get_increments(self):
+        return [call[0][0] for call in self.log_dict['increment']]
+
+    def get_increment_counts(self):
+        counts = {}
+        for metric in self.get_increments():
+            if metric not in counts:
+                counts[metric] = 0
+            counts[metric] += 1
+        return counts
+
     def setFormatter(self, obj):
         self.formatter = obj

@@ -30,6 +30,7 @@ from swift.common.ring import RingData
 from swift.common import utils
 from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
     write_pickle
+from test.unit import FakeLogger


 class TestObjectUpdater(unittest.TestCase):
@@ -40,7 +41,8 @@ class TestObjectUpdater(unittest.TestCase):
                                     'object_updater')
         rmtree(self.testdir, ignore_errors=1)
         os.mkdir(self.testdir)
-        pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
+        pickle.dump(
+            RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
             [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
               'zone': 0},
              {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
@@ -62,8 +64,7 @@ class TestObjectUpdater(unittest.TestCase):
             'swift_dir': self.testdir,
             'interval': '1',
             'concurrency': '2',
-            'node_timeout': '5',
-            })
+            'node_timeout': '5'})
         self.assert_(hasattr(cu, 'logger'))
         self.assert_(cu.logger is not None)
         self.assertEquals(cu.devices, self.devices_dir)
@@ -105,8 +106,7 @@ class TestObjectUpdater(unittest.TestCase):
             'swift_dir': self.testdir,
             'interval': '1',
             'concurrency': '1',
-            'node_timeout': '5',
-            })
+            'node_timeout': '5'})
         cu.object_sweep(self.sda1)
         self.assert_(not os.path.exists(prefix_dir))
         self.assertEqual(expected, seen)
@@ -118,8 +118,7 @@ class TestObjectUpdater(unittest.TestCase):
             'swift_dir': self.testdir,
             'interval': '1',
             'concurrency': '1',
-            'node_timeout': '15',
-            })
+            'node_timeout': '15'})
         cu.run_once()
         async_dir = os.path.join(self.sda1, object_server.ASYNCDIR)
         os.mkdir(async_dir)
@@ -135,13 +134,24 @@ class TestObjectUpdater(unittest.TestCase):
         ohash = hash_path('a', 'c', 'o')
         odir = os.path.join(async_dir, ohash[-3:])
         mkdirs(odir)
-        op_path = os.path.join(odir,
+        older_op_path = os.path.join(
+            odir,
+            '%s-%s' % (ohash, normalize_timestamp(time() - 1)))
+        op_path = os.path.join(
+            odir,
             '%s-%s' % (ohash, normalize_timestamp(time())))
-        pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
-            'headers': {'X-Container-Timestamp': normalize_timestamp(0)}},
-            open(op_path, 'wb'))
+        for path in (op_path, older_op_path):
+            with open(path, 'wb') as async_pending:
+                pickle.dump({'op': 'PUT', 'account': 'a', 'container': 'c',
+                             'obj': 'o', 'headers': {
+                             'X-Container-Timestamp': normalize_timestamp(0)}},
+                            async_pending)
+        cu.logger = FakeLogger()
         cu.run_once()
+        self.assert_(not os.path.exists(older_op_path))
         self.assert_(os.path.exists(op_path))
+        self.assertEqual(cu.logger.get_increment_counts(),
+                         {'failures': 1, 'unlinks': 1})

         bindsock = listen(('127.0.0.1', 0))

@@ -182,21 +192,31 @@ class TestObjectUpdater(unittest.TestCase):
             except BaseException, err:
                 return err
             return None

         event = spawn(accept, [201, 500])
         for dev in cu.get_container_ring().devs:
             if dev is not None:
                 dev['port'] = bindsock.getsockname()[1]

+        cu.logger = FakeLogger()
         cu.run_once()
         err = event.wait()
         if err:
             raise err
         self.assert_(os.path.exists(op_path))
+        self.assertEqual(cu.logger.get_increment_counts(),
+                         {'failures': 1})

         event = spawn(accept, [201])
+        cu.logger = FakeLogger()
         cu.run_once()
         err = event.wait()
         if err:
             raise err
         self.assert_(not os.path.exists(op_path))
+        self.assertEqual(cu.logger.get_increment_counts(),
+                         {'unlinks': 1, 'successes': 1})


 if __name__ == '__main__':
     unittest.main()