Aggregate per-disk recon stats
Address an issue where `OldestAsyncManager` instances created before forking left each child process with its own isolated copy-on-write stats, while the parent process kept an empty, unused instance. As a result, the final `dump_recon` call at the end of `run_forever` reported no meaningful telemetry.

Fix this by aggregating the per-disk recon stats collected by each child process: load the recon cache data recorded for every device, consolidate the key metrics, and write the aggregated stats back to the recon cache.

Change-Id: I70a60ae280e4fccc04ff5e7df9e62b18d916421e
parent fe7928ea8a
commit af57922cd8
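The aggregation approach is easiest to see in miniature. The sketch below is illustrative only, not the patched updater itself: it assumes a JSON recon cache file (the hypothetical `RECON_FILE` path and helper names are made up for this example) and plain `json` reads and writes, whereas the real change goes through Swift's `dump_recon_cache`/`load_recon_cache` helpers, which handle locking and merging. Each forked child records its stats under a per-device key; after all children exit, the parent loads the cache, consolidates the per-device entries, and writes back a single aggregated view.

    import json
    import os

    # Hypothetical cache path, for illustration only; the real updater uses
    # self.rcache with swift.common.utils.dump_recon_cache/load_recon_cache.
    RECON_FILE = '/var/cache/swift/object.recon'

    def dump_device_stats(device, stats):
        """Child process: record stats for one device under its own key."""
        cache = {}
        if os.path.exists(RECON_FILE):
            with open(RECON_FILE) as fp:
                cache = json.load(fp)
        cache.setdefault('object_updater_per_device', {})[device] = stats
        with open(RECON_FILE, 'w') as fp:
            json.dump(cache, fp)

    def aggregate_device_stats():
        """Parent process: consolidate per-device stats into one view."""
        with open(RECON_FILE) as fp:
            cache = json.load(fp)
        per_device = cache.get('object_updater_per_device', {})
        timestamps = [s['failures_oldest_timestamp']
                      for s in per_device.values()
                      if s.get('failures_oldest_timestamp') is not None]
        aggregated = {
            # The oldest failure on the node is the minimum timestamp
            # reported by any device.
            'failures_oldest_timestamp': (
                min(timestamps) if timestamps else None),
            # Each child only sees its own device, so a per-device count is
            # a lower bound; take the maximum rather than summing duplicates.
            'failures_account_container_count': max(
                [s.get('failures_account_container_count', 0)
                 for s in per_device.values()] or [0]),
        }
        cache['object_updater_stats'] = aggregated
        with open(RECON_FILE, 'w') as fp:
            json.dump(cache, fp)

In the actual change, dump_device_recon() plays the child-side role and aggregate_and_dump_recon() the parent-side role, additionally clearing stale device keys and re-sorting and truncating the oldest account/container failure entries to dump_count.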
@@ -35,7 +35,7 @@ from swift.common.utils import get_logger, renamer, write_pickle, \
    dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
    eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
    non_negative_float, config_positive_int_value, non_negative_int, \
    EventletRateLimiter, node_to_string, parse_options
    EventletRateLimiter, node_to_string, parse_options, load_recon_cache
from swift.common.daemon import Daemon, run_daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError

@@ -480,102 +480,164 @@ class ObjectUpdater(Daemon):
            self.container_ring = Ring(self.swift_dir, ring_name='container')
        return self.container_ring

    def _process_device_in_child(self, dev_path, device):
        """Process a single device in a forked child process."""
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.environ.pop('NOTIFY_SOCKET', None)
        eventlet_monkey_patch()
        self.stats.reset()
        self.oldest_async_pendings.reset()
        forkbegin = time.time()
        self.object_sweep(dev_path)
        elapsed = time.time() - forkbegin
        self.logger.info(
            ('Object update sweep of %(device)s '
             'completed: %(elapsed).02fs, %(stats)s'),
            {'device': device, 'elapsed': elapsed,
             'stats': self.stats})
        self.dump_device_recon(device)

    def _process_devices(self, devices):
        """Process devices, handling both single and multi-threaded modes."""
        pids = []
        # read from container ring to ensure it's fresh
        self.get_container_ring().get_nodes('')
        for device in devices:
            try:
                dev_path = check_drive(self.devices, device,
                                       self.mount_check)
            except ValueError as err:
                # We don't count this as an error. The occasional
                # unmounted drive is part of normal cluster operations,
                # so a simple warning is sufficient.
                self.logger.warning('Skipping: %s', err)
                continue
            while len(pids) >= self.updater_workers:
                pids.remove(os.wait()[0])
            pid = os.fork()
            if pid:
                pids.append(pid)
            else:
                self._process_device_in_child(dev_path, device)
                sys.exit()

        while pids:
            pids.remove(os.wait()[0])

    def run_forever(self, *args, **kwargs):
        """Run the updater continuously."""
        time.sleep(random() * self.interval)
        while True:
            self.logger.info('Begin object update sweep')
            self.begin = time.time()
            pids = []
            # read from container ring to ensure it's fresh
            self.get_container_ring().get_nodes('')
            for device in self._listdir(self.devices):
                try:
                    dev_path = check_drive(self.devices, device,
                                           self.mount_check)
                except ValueError as err:
                    # We don't count this as an error. The occasional
                    # unmounted drive is part of normal cluster operations,
                    # so a simple warning is sufficient.
                    self.logger.warning('Skipping: %s', err)
                    continue
                while len(pids) >= self.updater_workers:
                    pids.remove(os.wait()[0])
                pid = os.fork()
                if pid:
                    pids.append(pid)
                else:
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    os.environ.pop('NOTIFY_SOCKET', None)
                    eventlet_monkey_patch()
                    self.stats.reset()
                    self.oldest_async_pendings.reset()
                    forkbegin = time.time()
                    self.object_sweep(dev_path)
                    elapsed = time.time() - forkbegin
                    self.logger.info(
                        ('Object update sweep of %(device)s '
                         'completed: %(elapsed).02fs, %(stats)s'),
                        {'device': device, 'elapsed': elapsed,
                         'stats': self.stats})
                    sys.exit()
            while pids:
                pids.remove(os.wait()[0])
            elapsed = time.time() - self.begin
            self.logger.info('Object update sweep completed: %.02fs',
                             elapsed)
            self.dump_recon(elapsed)
            elapsed = self.run_once(*args, **kwargs)
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)

    def run_once(self, *args, **kwargs):
        """Run the updater once."""
        self.logger.info('Begin object update single threaded sweep')
        self.logger.info('Begin object update sweep of all devices')
        self.begin = time.time()
        self.stats.reset()
        self.oldest_async_pendings.reset()
        for device in self._listdir(self.devices):
            try:
                dev_path = check_drive(self.devices, device, self.mount_check)
            except ValueError as err:
                # We don't count this as an error. The occasional unmounted
                # drive is part of normal cluster operations, so a simple
                # warning is sufficient.
                self.logger.warning('Skipping: %s', err)
                continue
            self.object_sweep(dev_path)
        devices = self._listdir(self.devices)
        self._process_devices(devices)
        elapsed = time.time() - self.begin
        self.logger.info(
            ('Object update single-threaded sweep completed: '
             '%(elapsed).02fs, %(stats)s'),
            {'elapsed': elapsed, 'stats': self.stats})
        self.dump_recon(elapsed)
            ('Object update sweep of all devices completed: '
             '%(elapsed).02fs'),
            {'elapsed': elapsed})
        self.aggregate_and_dump_recon(devices, elapsed)
        return elapsed

    def dump_recon(self, elapsed):
        """Gathers stats and dumps recon cache."""
        object_updater_stats = {
            'failures_oldest_timestamp': (
                self.oldest_async_pendings.get_oldest_timestamp()
            ),
            'failures_oldest_timestamp_age': (
                self.oldest_async_pendings.get_oldest_timestamp_age()
            ),
            'failures_account_container_count': (
                len(self.oldest_async_pendings.ac_to_timestamp)
            ),
            'failures_oldest_timestamp_account_containers': (
    def _gather_recon_stats(self):
        """Gather stats for device recon dumps."""
        stats = {
            'failures_oldest_timestamp':
                self.oldest_async_pendings.get_oldest_timestamp(),
            'failures_oldest_timestamp_age':
                self.oldest_async_pendings.get_oldest_timestamp_age(),
            'failures_account_container_count':
                len(self.oldest_async_pendings.ac_to_timestamp),
            'failures_oldest_timestamp_account_containers':
                self.oldest_async_pendings.get_n_oldest_timestamp_acs(
                    self.dump_count
                    self.dump_count),
            'tracker_memory_usage':
                self.oldest_async_pendings.get_memory_usage(),
        }
        return stats

    def dump_device_recon(self, device):
        """Dump recon stats for a single device."""
        disk_recon_stats = self._gather_recon_stats()
        dump_recon_cache(
            {'object_updater_per_device': {device: disk_recon_stats}},
            self.rcache,
            self.logger,
        )

    def aggregate_and_dump_recon(self, devices, elapsed):
        """
        Aggregate recon stats across devices and dump the result to the
        recon cache.
        """
        recon_cache = load_recon_cache(self.rcache)
        device_stats = recon_cache.get('object_updater_per_device', {})
        if not isinstance(device_stats, dict):
            raise TypeError('object_updater_per_device must be a dict')
        device_stats = {k: (v if v is not None else {})
                        for k, v in device_stats.items()}

        devices_to_remove = set(device_stats) - set(devices)
        update_device_stats = {d: {} for d in devices_to_remove}

        aggregated_oldest_entries = []

        for stats in device_stats.values():
            container_data = stats.get(
                'failures_oldest_timestamp_account_containers', {})
            aggregated_oldest_entries.extend(container_data.get(
                'oldest_entries', []))
        aggregated_oldest_entries.sort(key=lambda x: x['timestamp'])
        aggregated_oldest_entries = aggregated_oldest_entries[:self.dump_count]
        aggregated_oldest_count = len(aggregated_oldest_entries)

        aggregated_stats = {
            'failures_account_container_count': max(
                list(
                    stats.get('failures_account_container_count', 0)
                    for stats in device_stats.values()
                )
                or [0],
            ),
            'tracker_memory_usage': (
                self.oldest_async_pendings.get_memory_usage()
                float(sum(
                    stats.get('tracker_memory_usage', 0)
                    for stats in device_stats.values()
                ))
                / float(len(device_stats))
            )
            * max(self.updater_workers, 1)
            if device_stats
            else 0,
            'failures_oldest_timestamp': min(
                list(filter(lambda x: x is not None,
                            [stats.get('failures_oldest_timestamp')
                             for stats in device_stats.values()]
                            )) or [None],
            ),
            'failures_oldest_timestamp_age': max(
                list(filter(lambda x: x is not None,
                            [stats.get('failures_oldest_timestamp_age')
                             for stats in device_stats.values()]
                            )) or [None],
            ),
            'failures_oldest_timestamp_account_containers': {
                'oldest_count': aggregated_oldest_count,
                'oldest_entries': aggregated_oldest_entries,
            },
        }
        dump_recon_cache(
            {
                'object_updater_sweep': elapsed,
                'object_updater_stats': object_updater_stats,
                'object_updater_stats': aggregated_stats,
                'object_updater_per_device': update_device_stats,
            },
            self.rcache,
            self.logger,

@@ -14,17 +14,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import time
import uuid

from datetime import datetime
from io import BytesIO
from unittest import main, SkipTest
from uuid import uuid4
import random

import mock

from swiftclient import client
from swiftclient.exceptions import ClientException

from swift.common import direct_client
from swift.common.manager import Manager
from six.moves.configparser import ConfigParser
from swift.common import direct_client, manager
from swift.common.manager import Manager, Server
from swift.common.swob import normalize_etag
from swift.common.utils import readconf
from test.probe.common import kill_nonprimary_server, \
    kill_server, ReplProbeTest, start_server, ECProbeTest

@@ -334,23 +344,28 @@ class TestUpdateOverridesEC(ECProbeTest):
        self.assertEqual('test/ctype', listing[0]['content_type'])


class TestObjectUpdaterStats(ReplProbeTest):
class UpdaterStatsMixIn(object):

    def setUp(self):
        super(TestObjectUpdaterStats, self).setUp()
        super(UpdaterStatsMixIn, self).setUp()
        self.int_client = self.make_internal_client()
        self.container_servers = Manager(['container-server'])
        self.object_updater = Manager(['object-updater'])
        self._post_setup_config()

    def test_lots_of_asyncs(self):
    def _post_setup_config(self):
        pass

    def _create_lots_of_asyncs(self):
        # Create some (acct, cont) pairs
        num_accounts = 3
        num_conts_per_a = 4
        ac_pairs = []
        self.ac_pairs = ac_pairs = []
        for a in range(num_accounts):
            acct = 'AUTH_user%03d' % a
            acct = 'AUTH_user-%s-%03d' % (uuid.uuid4(), a)
            self.int_client.create_account(acct)
            for c in range(num_conts_per_a):
                cont = 'cont%03d' % c
                cont = 'cont-%s-%03d' % (uuid.uuid4(), c)
                self.int_client.create_container(acct, cont)
                ac_pairs.append((acct, cont))

@@ -359,10 +374,10 @@ class TestObjectUpdaterStats(ReplProbeTest):
            self.container_servers.stop(number=n)

        # Create a bunch of objects
        num_objs_per_ac = 5
        num_objs_per_ac = 10
        for acct, cont in ac_pairs:
            for o in range(num_objs_per_ac):
                obj = 'obj%03d' % o
                obj = 'obj-%s-%03d' % (uuid.uuid4(), o)
                self.int_client.upload_object(BytesIO(b''), acct, cont, obj)

        all_asyncs = self.gather_async_pendings()

@@ -371,22 +386,57 @@ class TestObjectUpdaterStats(ReplProbeTest):
        self.assertGreater(len(all_asyncs), total_objs)
        self.assertLess(len(all_asyncs), total_objs * 2)

        # Run the updater and check stats
        Manager(['object-updater']).once()
        recons = []
    def _gather_recon(self):
        # We'll collect recon only once from each node
        dev_to_node_dict = {}
        for onode in self.object_ring.devs:
            recon = direct_client.direct_get_recon(onode, 'updater/object')
            recons.append(recon)
            # We can skip any devices that are already covered by one of the
            # other nodes we found
            if any(self.is_local_to(node, onode)
                   for node in dev_to_node_dict.values()):
                continue
            dev_to_node_dict[onode["device"]] = onode
        self.assertEqual(4, len(dev_to_node_dict))  # sanity

        self.assertEqual(4, len(recons))
        found_counts = []
        timeout = 20
        polling_interval = 2
        recon_data = []
        start = time.time()
        while True:
            for onode in list(dev_to_node_dict.values()):
                recon = direct_client.direct_get_recon(
                    onode, 'updater/object')
                if (recon.get('object_updater_stats') is not None and
                        recon.get('object_updater_sweep') is not None):
                    del dev_to_node_dict[onode["device"]]
                    recon_data.append(recon)
            if not dev_to_node_dict:
                break
            elapsed = time.time() - start
            if elapsed > timeout:
                self.fail(
                    "Updates did not process within {timeout} seconds".format(
                        timeout=timeout)
                )
            time.sleep(polling_interval)
        self.assertEqual(4, len(recon_data))  # sanity
        return recon_data

    def run_updater(self):
        raise NotImplementedError()

    def _check_recon_data(self, recon_data):
        ac_pairs = self.ac_pairs
        ac_set = set()
        for recon in recons:
        for recon in recon_data:
            updater_stats = recon['object_updater_stats']

            found_counts.append(
                updater_stats['failures_account_container_count']
            )
            found_count = updater_stats['failures_account_container_count']
            # No node should find MORE unique ac than we created
            self.assertLessEqual(found_count, len(ac_pairs))
            # and generally we'd expect them to have "at least" one from
            # significanly MORE than the "majority" of ac_pairs
            self.assertGreaterEqual(found_count, len(ac_pairs) / 2)

            oldest_count = updater_stats[
                'failures_oldest_timestamp_account_containers'

@@ -403,14 +453,76 @@ class TestObjectUpdaterStats(ReplProbeTest):
                container = entry['container']
                timestamp = entry['timestamp']
                self.assertIsNotNone(timestamp)
                self.assertIsInstance(timestamp, float)
                ac_set.add((account, container))

        # All the collected ac_set are from the ac_pairs we created
        for ac in ac_set:
            self.assertIn(ac, set(ac_pairs))
        # Specifically, the ac_pairs we created failures for *first*
        # are represented by the oldest ac_set across nodes
        for ac in ac_pairs[:5]:
            self.assertIn(ac, ac_set)
        # Where as the more recent failures are NOT!
        for ac in ac_pairs[-3:]:
            self.assertNotIn(ac, ac_set)

        for found_count in found_counts:
            self.assertLessEqual(found_count, len(ac_pairs))
    def test_stats(self):
        self._create_lots_of_asyncs()
        recon_data = self.run_updater()
        self._check_recon_data(recon_data)


class TestObjectUpdaterStatsRunOnce(UpdaterStatsMixIn, ReplProbeTest):

    def run_updater(self):
        # Run the updater and check stats
        Manager(['object-updater']).once()
        return self._gather_recon()


class TestObjectUpdaterStatsRunForever(UpdaterStatsMixIn, ECProbeTest):

    def _post_setup_config(self):
        CONF_SECTION = 'object-updater'
        self.conf_dest = os.path.join(
            '/tmp/',
            datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f')
        )
        os.mkdir(self.conf_dest)
        object_server_dir = os.path.join(self.conf_dest, 'object-server')
        os.mkdir(object_server_dir)
        for conf_file in Server('object-updater').conf_files():
            config = readconf(conf_file)
            if CONF_SECTION not in config:
                continue  # Ensure the object-updater is set up to run
            config[CONF_SECTION].update({'interval': '1'})

            parser = ConfigParser()
            parser.add_section(CONF_SECTION)
            for option, value in config[CONF_SECTION].items():
                parser.set(CONF_SECTION, option, value)

            file_name = os.path.basename(conf_file)
            if file_name.endswith('.d'):
                # Work around conf.d setups (like you might see with VSAIO)
                file_name = file_name[:-2]
            with open(os.path.join(object_server_dir, file_name), 'w') as fp:
                parser.write(fp)

    def tearDown(self):
        shutil.rmtree(self.conf_dest)

    def run_updater(self):
        # Start the updater
        with mock.patch.object(manager, 'SWIFT_DIR', self.conf_dest):
            updater_status = self.object_updater.start()
        self.assertEqual(
            updater_status, 0, "Object updater failed to start")
        recon_data = self._gather_recon()
        # Stop the updater
        stop_status = self.object_updater.stop()
        self.assertEqual(stop_status, 0, "Object updater failed to stop")
        return recon_data


if __name__ == '__main__':

@@ -154,7 +154,8 @@ class TestObjectController(BaseTestCase):
        self.tmpdir = mkdtemp()
        self.testdir = os.path.join(self.tmpdir,
                                    'tmp_test_object_server_ObjectController')
        mkdirs(os.path.join(self.testdir, 'sda1'))
        self.sda1 = os.path.join(self.testdir, 'sda1')
        mkdirs(self.sda1)
        self.conf = {'devices': self.testdir, 'mount_check': 'false',
                     'container_update_timeout': 0.0}
        self.logger = debug_logger('test-object-controller')

@@ -1091,7 +1092,7 @@ class TestObjectController(BaseTestCase):
            mock_ring.get_nodes.return_value = (99, [node])
            object_updater.container_ring = mock_ring
            mock_update.return_value = ((True, 1, None))
            object_updater.run_once()
            object_updater._process_device_in_child(self.sda1, 'sda1')
            self.assertEqual(1, mock_update.call_count)
            self.assertEqual((node, 99, 'PUT', '/a/c/o'),
                             mock_update.call_args_list[0][0][0:4])

@@ -1206,7 +1207,7 @@ class TestObjectController(BaseTestCase):
        mock_ring = mock.MagicMock()
        mock_ring.get_nodes.return_value = (99, [node])
        object_updater.container_ring = mock_ring
        object_updater.run_once()
        object_updater._process_device_in_child(self.sda1, 'sda1')

        self.assertEqual(1, len(conn.requests))
        self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o',

File diff suppressed because it is too large