Merge "Leave updater per-device stats in recon for debugging"
This commit is contained in:
@@ -583,11 +583,9 @@ class ObjectUpdater(Daemon):
|
|||||||
device_stats = recon_cache.get('object_updater_per_device', {})
|
device_stats = recon_cache.get('object_updater_per_device', {})
|
||||||
if not isinstance(device_stats, dict):
|
if not isinstance(device_stats, dict):
|
||||||
raise TypeError('object_updater_per_device must be a dict')
|
raise TypeError('object_updater_per_device must be a dict')
|
||||||
device_stats = {k: (v if v is not None else {})
|
|
||||||
for k, v in device_stats.items()}
|
|
||||||
|
|
||||||
devices_to_remove = set(device_stats) - set(devices)
|
devices_to_remove = set(device_stats) - set(devices)
|
||||||
update_device_stats = {d: {} for d in devices_to_remove}
|
device_stats = {dev: device_stats.get(dev) or {}
|
||||||
|
for dev in devices}
|
||||||
|
|
||||||
aggregated_oldest_entries = []
|
aggregated_oldest_entries = []
|
||||||
|
|
||||||
@@ -615,7 +613,7 @@ class ObjectUpdater(Daemon):
|
|||||||
))
|
))
|
||||||
/ float(len(device_stats))
|
/ float(len(device_stats))
|
||||||
)
|
)
|
||||||
* max(self.updater_workers, 1)
|
* max(1, min(self.updater_workers, len(device_stats)))
|
||||||
if device_stats
|
if device_stats
|
||||||
else 0,
|
else 0,
|
||||||
'failures_oldest_timestamp': min(
|
'failures_oldest_timestamp': min(
|
||||||
@@ -635,13 +633,17 @@ class ObjectUpdater(Daemon):
|
|||||||
'oldest_entries': aggregated_oldest_entries,
|
'oldest_entries': aggregated_oldest_entries,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
recon_dump = {
|
||||||
|
'object_updater_sweep': elapsed,
|
||||||
|
'object_updater_stats': aggregated_stats,
|
||||||
|
'object_updater_last': now
|
||||||
|
}
|
||||||
|
if devices_to_remove:
|
||||||
|
recon_dump['object_updater_per_device'] = {
|
||||||
|
d: {} for d in devices_to_remove}
|
||||||
dump_recon_cache(
|
dump_recon_cache(
|
||||||
{
|
recon_dump,
|
||||||
'object_updater_sweep': elapsed,
|
|
||||||
'object_updater_stats': aggregated_stats,
|
|
||||||
'object_updater_per_device': update_device_stats,
|
|
||||||
'object_updater_last': now
|
|
||||||
},
|
|
||||||
self.rcache,
|
self.rcache,
|
||||||
self.logger,
|
self.logger,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ def _sorted_listdir(path):
|
|||||||
|
|
||||||
@patch_policies(_mocked_policies)
|
@patch_policies(_mocked_policies)
|
||||||
class TestObjectUpdater(unittest.TestCase):
|
class TestObjectUpdater(unittest.TestCase):
|
||||||
|
maxDiff = None
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
utils.HASH_PATH_SUFFIX = b'endcap'
|
utils.HASH_PATH_SUFFIX = b'endcap'
|
||||||
@@ -1051,6 +1052,12 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
'tracker_memory_usage': 256,
|
'tracker_memory_usage': 256,
|
||||||
'failures_account_container_count': 1,
|
'failures_account_container_count': 1,
|
||||||
},
|
},
|
||||||
|
# N.B. this is an unrealistic test stub to cover failure modes
|
||||||
|
# where workers die weird, for the expected aggregated stats
|
||||||
|
# calculation sda2 & sda3 are contributing to the count of
|
||||||
|
# devices when averaging per-worker device memory but no actual
|
||||||
|
# values; so the reported "tracker_memory_usage" appears
|
||||||
|
# artificially deflated under failure
|
||||||
'sda2': {
|
'sda2': {
|
||||||
'failures_oldest_timestamp': 124.56789,
|
'failures_oldest_timestamp': 124.56789,
|
||||||
'failures_oldest_timestamp_age': 789.012,
|
'failures_oldest_timestamp_age': 789.012,
|
||||||
@@ -1080,6 +1087,17 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
'object_updater_sweep': 30,
|
'object_updater_sweep': 30,
|
||||||
'object_updater_stats': expected_aggregated_stats,
|
'object_updater_stats': expected_aggregated_stats,
|
||||||
'object_updater_last': now,
|
'object_updater_last': now,
|
||||||
|
'object_updater_per_device': {
|
||||||
|
'sda1': {
|
||||||
|
'tracker_memory_usage': 256,
|
||||||
|
'failures_account_container_count': 1,
|
||||||
|
},
|
||||||
|
'sda2': {
|
||||||
|
'failures_oldest_timestamp': 124.56789,
|
||||||
|
'failures_oldest_timestamp_age': 789.012,
|
||||||
|
},
|
||||||
|
'sda3': None,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
self.assertEqual(expected_recon, found_data)
|
self.assertEqual(expected_recon, found_data)
|
||||||
|
|
||||||
@@ -1106,8 +1124,13 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
utils.dump_recon_cache(empty_recon, ou.rcache, ou.logger)
|
utils.dump_recon_cache(empty_recon, ou.rcache, ou.logger)
|
||||||
|
with open(recon_file) as f:
|
||||||
|
found_data = json.load(f)
|
||||||
|
# N.B. recon doesn't let you write second level keys with empty values;
|
||||||
|
# because it's hijacked that spelling to mean "remove this sub-key"
|
||||||
|
self.assertEqual({'object_updater_per_device': {}}, found_data)
|
||||||
now = float(next(self.ts_iter))
|
now = float(next(self.ts_iter))
|
||||||
ou.aggregate_and_dump_recon(['sda1', 'sda2'], 30, now)
|
ou.aggregate_and_dump_recon(['sda1', 'sda2', 'sda3'], 30, now)
|
||||||
|
|
||||||
with open(recon_file) as f:
|
with open(recon_file) as f:
|
||||||
found_data = json.load(f)
|
found_data = json.load(f)
|
||||||
@@ -1126,7 +1149,8 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
expected_recon = {
|
expected_recon = {
|
||||||
'object_updater_sweep': 30,
|
'object_updater_sweep': 30,
|
||||||
'object_updater_stats': expected_aggregated_stats,
|
'object_updater_stats': expected_aggregated_stats,
|
||||||
'object_updater_last': now
|
'object_updater_last': now,
|
||||||
|
'object_updater_per_device': {},
|
||||||
}
|
}
|
||||||
self.assertEqual(expected_recon, found_data)
|
self.assertEqual(expected_recon, found_data)
|
||||||
|
|
||||||
@@ -1169,6 +1193,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
ou = object_updater.ObjectUpdater({
|
ou = object_updater.ObjectUpdater({
|
||||||
'devices': self.devices_dir,
|
'devices': self.devices_dir,
|
||||||
'swift_dir': self.testdir,
|
'swift_dir': self.testdir,
|
||||||
|
# N.B. we have fewer devices than workers!
|
||||||
'updater_workers': 2,
|
'updater_workers': 2,
|
||||||
'recon_cache_path': recon_path,
|
'recon_cache_path': recon_path,
|
||||||
'async_tracker_dump_count': 2,
|
'async_tracker_dump_count': 2,
|
||||||
@@ -1178,17 +1203,24 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
'object_updater_per_device': {
|
'object_updater_per_device': {
|
||||||
'sda1': {
|
'sda1': {
|
||||||
'failures_account_container_count': 2,
|
'failures_account_container_count': 2,
|
||||||
'tracker_memory_usage': 512,
|
'tracker_memory_usage': 384,
|
||||||
},
|
},
|
||||||
|
# this is a relatively realistic example of existing recon data
|
||||||
|
# for a pre-existing per-device key for a removed device
|
||||||
'sda2': {
|
'sda2': {
|
||||||
'failures_account_container_count': 1,
|
'failures_account_container_count': 1,
|
||||||
'tracker_memory_usage': 256,
|
'tracker_memory_usage': 256,
|
||||||
},
|
},
|
||||||
|
# N.B. this sda3 is an unrealistic test stub to cover failure
|
||||||
|
# modes where workers die weird
|
||||||
'sda3': None,
|
'sda3': None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger)
|
utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger)
|
||||||
now = float(next(self.ts_iter))
|
now = float(next(self.ts_iter))
|
||||||
|
# N.B. because neither sda2 nor sda3 are passed in as expected devices
|
||||||
|
# to aggregate neither contribute to device count when calculating
|
||||||
|
# per-worker averages during aggregation
|
||||||
ou.aggregate_and_dump_recon(['sda1'], 30, now)
|
ou.aggregate_and_dump_recon(['sda1'], 30, now)
|
||||||
|
|
||||||
with open(recon_file) as f:
|
with open(recon_file) as f:
|
||||||
@@ -1196,7 +1228,9 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
|
|
||||||
expected_aggregated_stats = {
|
expected_aggregated_stats = {
|
||||||
'failures_account_container_count': 2,
|
'failures_account_container_count': 2,
|
||||||
'tracker_memory_usage': 512.0,
|
# only sda1 should be considered because it's the only device value
|
||||||
|
# that wouldn't be stale
|
||||||
|
'tracker_memory_usage': 384,
|
||||||
'failures_oldest_timestamp': None,
|
'failures_oldest_timestamp': None,
|
||||||
'failures_oldest_timestamp_age': None,
|
'failures_oldest_timestamp_age': None,
|
||||||
'failures_oldest_timestamp_account_containers': {
|
'failures_oldest_timestamp_account_containers': {
|
||||||
@@ -1209,7 +1243,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
'object_updater_per_device': {
|
'object_updater_per_device': {
|
||||||
'sda1': {
|
'sda1': {
|
||||||
'failures_account_container_count': 2,
|
'failures_account_container_count': 2,
|
||||||
'tracker_memory_usage': 512,
|
'tracker_memory_usage': 384,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
'object_updater_sweep': 30,
|
'object_updater_sweep': 30,
|
||||||
@@ -1407,10 +1441,9 @@ class TestObjectUpdater(unittest.TestCase):
|
|||||||
del expected_recon['object_updater_per_device']['sdx']
|
del expected_recon['object_updater_per_device']['sdx']
|
||||||
self.assertEqual(expected_recon, found_data)
|
self.assertEqual(expected_recon, found_data)
|
||||||
|
|
||||||
self.assertAlmostEqual(
|
self.assertEqual(
|
||||||
755.5,
|
(512 + 256 + 128) / 3 * 2,
|
||||||
found_data['object_updater_stats']['tracker_memory_usage'],
|
found_data['object_updater_stats']['tracker_memory_usage'],
|
||||||
delta=200
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_obj_put_legacy_updates(self):
|
def test_obj_put_legacy_updates(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user