Merge "Aggregate per-disk recon stats"
This commit is contained in:
commit
ea06ed4494
@@ -35,7 +35,7 @@ from swift.common.utils import get_logger, renamer, write_pickle, \
     dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
     eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
     non_negative_float, config_positive_int_value, non_negative_int, \
-    EventletRateLimiter, node_to_string, parse_options
+    EventletRateLimiter, node_to_string, parse_options, load_recon_cache
 from swift.common.daemon import Daemon, run_daemon
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.storage_policy import split_policy_string, PolicyError
@@ -480,102 +480,164 @@ class ObjectUpdater(Daemon):
         self.container_ring = Ring(self.swift_dir, ring_name='container')
         return self.container_ring
 
+    def _process_device_in_child(self, dev_path, device):
+        """Process a single device in a forked child process."""
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        os.environ.pop('NOTIFY_SOCKET', None)
+        eventlet_monkey_patch()
+        self.stats.reset()
+        self.oldest_async_pendings.reset()
+        forkbegin = time.time()
+        self.object_sweep(dev_path)
+        elapsed = time.time() - forkbegin
+        self.logger.info(
+            ('Object update sweep of %(device)s '
+             'completed: %(elapsed).02fs, %(stats)s'),
+            {'device': device, 'elapsed': elapsed,
+             'stats': self.stats})
+        self.dump_device_recon(device)
+
+    def _process_devices(self, devices):
+        """Process devices, handling both single and multi-threaded modes."""
+        pids = []
+        # read from container ring to ensure it's fresh
+        self.get_container_ring().get_nodes('')
+        for device in devices:
+            try:
+                dev_path = check_drive(self.devices, device,
+                                       self.mount_check)
+            except ValueError as err:
+                # We don't count this as an error. The occasional
+                # unmounted drive is part of normal cluster operations,
+                # so a simple warning is sufficient.
+                self.logger.warning('Skipping: %s', err)
+                continue
+            while len(pids) >= self.updater_workers:
+                pids.remove(os.wait()[0])
+            pid = os.fork()
+            if pid:
+                pids.append(pid)
+            else:
+                self._process_device_in_child(dev_path, device)
+                sys.exit()
+
+        while pids:
+            pids.remove(os.wait()[0])
+
     def run_forever(self, *args, **kwargs):
         """Run the updater continuously."""
         time.sleep(random() * self.interval)
         while True:
             self.logger.info('Begin object update sweep')
-            self.begin = time.time()
-            pids = []
-            # read from container ring to ensure it's fresh
-            self.get_container_ring().get_nodes('')
-            for device in self._listdir(self.devices):
-                try:
-                    dev_path = check_drive(self.devices, device,
-                                           self.mount_check)
-                except ValueError as err:
-                    # We don't count this as an error. The occasional
-                    # unmounted drive is part of normal cluster operations,
-                    # so a simple warning is sufficient.
-                    self.logger.warning('Skipping: %s', err)
-                    continue
-                while len(pids) >= self.updater_workers:
-                    pids.remove(os.wait()[0])
-                pid = os.fork()
-                if pid:
-                    pids.append(pid)
-                else:
-                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
-                    os.environ.pop('NOTIFY_SOCKET', None)
-                    eventlet_monkey_patch()
-                    self.stats.reset()
-                    self.oldest_async_pendings.reset()
-                    forkbegin = time.time()
-                    self.object_sweep(dev_path)
-                    elapsed = time.time() - forkbegin
-                    self.logger.info(
-                        ('Object update sweep of %(device)s '
-                         'completed: %(elapsed).02fs, %(stats)s'),
-                        {'device': device, 'elapsed': elapsed,
-                         'stats': self.stats})
-                    sys.exit()
-            while pids:
-                pids.remove(os.wait()[0])
-            elapsed = time.time() - self.begin
-            self.logger.info('Object update sweep completed: %.02fs',
-                             elapsed)
-            self.dump_recon(elapsed)
+            elapsed = self.run_once(*args, **kwargs)
             if elapsed < self.interval:
                 time.sleep(self.interval - elapsed)
 
     def run_once(self, *args, **kwargs):
         """Run the updater once."""
-        self.logger.info('Begin object update single threaded sweep')
+        self.logger.info('Begin object update sweep of all devices')
         self.begin = time.time()
         self.stats.reset()
         self.oldest_async_pendings.reset()
-        for device in self._listdir(self.devices):
-            try:
-                dev_path = check_drive(self.devices, device, self.mount_check)
-            except ValueError as err:
-                # We don't count this as an error. The occasional unmounted
-                # drive is part of normal cluster operations, so a simple
-                # warning is sufficient.
-                self.logger.warning('Skipping: %s', err)
-                continue
-            self.object_sweep(dev_path)
+        devices = self._listdir(self.devices)
+        self._process_devices(devices)
         elapsed = time.time() - self.begin
         self.logger.info(
-            ('Object update single-threaded sweep completed: '
-             '%(elapsed).02fs, %(stats)s'),
-            {'elapsed': elapsed, 'stats': self.stats})
-        self.dump_recon(elapsed)
+            ('Object update sweep of all devices completed: '
+             '%(elapsed).02fs'),
+            {'elapsed': elapsed})
+        self.aggregate_and_dump_recon(devices, elapsed)
+        return elapsed
 
-    def dump_recon(self, elapsed):
-        """Gathers stats and dumps recon cache."""
-        object_updater_stats = {
-            'failures_oldest_timestamp': (
-                self.oldest_async_pendings.get_oldest_timestamp()
-            ),
-            'failures_oldest_timestamp_age': (
-                self.oldest_async_pendings.get_oldest_timestamp_age()
-            ),
-            'failures_account_container_count': (
-                len(self.oldest_async_pendings.ac_to_timestamp)
-            ),
-            'failures_oldest_timestamp_account_containers': (
-                self.oldest_async_pendings.get_n_oldest_timestamp_acs(
-                    self.dump_count
-                )
-            ),
-            'tracker_memory_usage': (
-                self.oldest_async_pendings.get_memory_usage()
-            ),
-        }
-        dump_recon_cache(
-            {
-                'object_updater_sweep': elapsed,
-                'object_updater_stats': object_updater_stats,
-            },
-            self.rcache,
-            self.logger,
-        )
+    def _gather_recon_stats(self):
+        """Gather stats for device recon dumps."""
+        stats = {
+            'failures_oldest_timestamp':
+                self.oldest_async_pendings.get_oldest_timestamp(),
+            'failures_oldest_timestamp_age':
+                self.oldest_async_pendings.get_oldest_timestamp_age(),
+            'failures_account_container_count':
+                len(self.oldest_async_pendings.ac_to_timestamp),
+            'failures_oldest_timestamp_account_containers':
+                self.oldest_async_pendings.get_n_oldest_timestamp_acs(
+                    self.dump_count),
+            'tracker_memory_usage':
+                self.oldest_async_pendings.get_memory_usage(),
+        }
+        return stats
+
+    def dump_device_recon(self, device):
+        """Dump recon stats for a single device."""
+        disk_recon_stats = self._gather_recon_stats()
+        dump_recon_cache(
+            {'object_updater_per_device': {device: disk_recon_stats}},
+            self.rcache,
+            self.logger,
+        )
+
+    def aggregate_and_dump_recon(self, devices, elapsed):
+        """
+        Aggregate recon stats across devices and dump the result to the
+        recon cache.
+        """
+        recon_cache = load_recon_cache(self.rcache)
+        device_stats = recon_cache.get('object_updater_per_device', {})
+        if not isinstance(device_stats, dict):
+            raise TypeError('object_updater_per_device must be a dict')
+        device_stats = {k: (v if v is not None else {})
+                        for k, v in device_stats.items()}
+
+        devices_to_remove = set(device_stats) - set(devices)
+        update_device_stats = {d: {} for d in devices_to_remove}
+
+        aggregated_oldest_entries = []
+
+        for stats in device_stats.values():
+            container_data = stats.get(
+                'failures_oldest_timestamp_account_containers', {})
+            aggregated_oldest_entries.extend(container_data.get(
+                'oldest_entries', []))
+        aggregated_oldest_entries.sort(key=lambda x: x['timestamp'])
+        aggregated_oldest_entries = aggregated_oldest_entries[:self.dump_count]
+        aggregated_oldest_count = len(aggregated_oldest_entries)
+
+        aggregated_stats = {
+            'failures_account_container_count': max(
+                list(
+                    stats.get('failures_account_container_count', 0)
+                    for stats in device_stats.values()
+                )
+                or [0],
+            ),
+            'tracker_memory_usage': (
+                float(sum(
+                    stats.get('tracker_memory_usage', 0)
+                    for stats in device_stats.values()
+                ))
+                / float(len(device_stats))
+            )
+            * max(self.updater_workers, 1)
+            if device_stats
+            else 0,
+            'failures_oldest_timestamp': min(
+                list(filter(lambda x: x is not None,
+                            [stats.get('failures_oldest_timestamp')
+                             for stats in device_stats.values()]
+                            )) or [None],
+            ),
+            'failures_oldest_timestamp_age': max(
+                list(filter(lambda x: x is not None,
+                            [stats.get('failures_oldest_timestamp_age')
+                             for stats in device_stats.values()]
+                            )) or [None],
+            ),
+            'failures_oldest_timestamp_account_containers': {
+                'oldest_count': aggregated_oldest_count,
+                'oldest_entries': aggregated_oldest_entries,
+            },
+        }
+        dump_recon_cache(
+            {
+                'object_updater_sweep': elapsed,
+                'object_updater_stats': aggregated_stats,
+                'object_updater_per_device': update_device_stats,
+            },
+            self.rcache,
+            self.logger,
+        )
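
A note on the hunk above: `_process_devices` is a classic POSIX fork-and-reap worker pool. As a hedged aside, it reduces to this minimal standalone sketch (the device list, worker cap, and `sweep()` body are hypothetical stand-ins for the updater's real state):

    import os
    import sys

    def sweep(device):
        # stand-in for object_sweep() plus the per-device recon dump
        print('sweeping %s in pid %d' % (device, os.getpid()))

    devices = ['sda1', 'sdb1', 'sdc1', 'sdd1']
    updater_workers = 2

    pids = []
    for device in devices:
        # block until a worker slot frees up; os.wait() returns (pid, status)
        while len(pids) >= updater_workers:
            pids.remove(os.wait()[0])
        pid = os.fork()
        if pid:
            pids.append(pid)  # parent: remember the child
        else:
            sweep(device)     # child: do the work, then exit immediately
            sys.exit()
    # reap the stragglers so no zombies outlive the sweep
    while pids:
        pids.remove(os.wait()[0])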
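For orientation, `dump_recon_cache()` merges its argument into the JSON recon cache file, so after `dump_device_recon('sda1')` the cache holds a per-device entry shaped roughly like the sketch below. The keys follow `_gather_recon_stats()` above; the values are illustrative, not from a real run:

    {
        "object_updater_per_device": {
            "sda1": {
                "failures_oldest_timestamp": 1712345678.901,
                "failures_oldest_timestamp_age": 42.5,
                "failures_account_container_count": 3,
                "failures_oldest_timestamp_account_containers": {
                    "oldest_count": 1,
                    "oldest_entries": [
                        {"account": "AUTH_test", "container": "c1",
                         "timestamp": 1712345678.901}
                    ]
                },
                "tracker_memory_usage": 1024
            }
        }
    }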
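The aggregation rules in `aggregate_and_dump_recon` are easiest to see on concrete numbers. A hedged standalone sketch with two hypothetical devices, mirroring the expressions above: the account/container count takes the max across devices, the oldest timestamp the min, its age the max, and tracker memory the per-device mean scaled by `updater_workers`, since that many per-device trackers are live at once:

    device_stats = {
        'sda1': {'failures_account_container_count': 3,
                 'failures_oldest_timestamp': 100.0,
                 'failures_oldest_timestamp_age': 50.0,
                 'tracker_memory_usage': 1000},
        'sdb1': {'failures_account_container_count': 5,
                 'failures_oldest_timestamp': 90.0,
                 'failures_oldest_timestamp_age': 60.0,
                 'tracker_memory_usage': 3000},
    }
    updater_workers = 2

    count = max(s['failures_account_container_count']
                for s in device_stats.values())    # 5: the worst single device
    oldest = min(s['failures_oldest_timestamp']
                 for s in device_stats.values())   # 90.0: oldest failure anywhere
    age = max(s['failures_oldest_timestamp_age']
              for s in device_stats.values())      # 60.0: greatest age anywhere
    memory = (float(sum(s['tracker_memory_usage']
                        for s in device_stats.values()))
              / len(device_stats)) * max(updater_workers, 1)
    # (1000 + 3000) / 2 * 2 = 4000.0: mean per device times concurrent workers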
@@ -14,17 +14,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 import os
+import shutil
+import time
+import uuid
+
+from datetime import datetime
 from io import BytesIO
 from unittest import main, SkipTest
 from uuid import uuid4
+import random
 
 import mock
 
 from swiftclient import client
 from swiftclient.exceptions import ClientException
 
-from swift.common import direct_client
-from swift.common.manager import Manager
+from six.moves.configparser import ConfigParser
+from swift.common import direct_client, manager
+from swift.common.manager import Manager, Server
 from swift.common.swob import normalize_etag
+from swift.common.utils import readconf
 from test.probe.common import kill_nonprimary_server, \
     kill_server, ReplProbeTest, start_server, ECProbeTest
@@ -334,23 +344,28 @@ class TestUpdateOverridesEC(ECProbeTest):
         self.assertEqual('test/ctype', listing[0]['content_type'])
 
 
-class TestObjectUpdaterStats(ReplProbeTest):
+class UpdaterStatsMixIn(object):
 
     def setUp(self):
-        super(TestObjectUpdaterStats, self).setUp()
+        super(UpdaterStatsMixIn, self).setUp()
         self.int_client = self.make_internal_client()
         self.container_servers = Manager(['container-server'])
+        self.object_updater = Manager(['object-updater'])
+        self._post_setup_config()
+
+    def _post_setup_config(self):
+        pass
 
-    def test_lots_of_asyncs(self):
+    def _create_lots_of_asyncs(self):
         # Create some (acct, cont) pairs
         num_accounts = 3
         num_conts_per_a = 4
-        ac_pairs = []
+        self.ac_pairs = ac_pairs = []
         for a in range(num_accounts):
-            acct = 'AUTH_user%03d' % a
+            acct = 'AUTH_user-%s-%03d' % (uuid.uuid4(), a)
             self.int_client.create_account(acct)
             for c in range(num_conts_per_a):
-                cont = 'cont%03d' % c
+                cont = 'cont-%s-%03d' % (uuid.uuid4(), c)
                 self.int_client.create_container(acct, cont)
                 ac_pairs.append((acct, cont))
@@ -359,10 +374,10 @@ class TestObjectUpdaterStats(ReplProbeTest):
             self.container_servers.stop(number=n)
 
         # Create a bunch of objects
-        num_objs_per_ac = 5
+        num_objs_per_ac = 10
         for acct, cont in ac_pairs:
             for o in range(num_objs_per_ac):
-                obj = 'obj%03d' % o
+                obj = 'obj-%s-%03d' % (uuid.uuid4(), o)
                 self.int_client.upload_object(BytesIO(b''), acct, cont, obj)
 
         all_asyncs = self.gather_async_pendings()
@@ -371,22 +386,57 @@
         self.assertGreater(len(all_asyncs), total_objs)
         self.assertLess(len(all_asyncs), total_objs * 2)
 
-        # Run the updater and check stats
-        Manager(['object-updater']).once()
-        recons = []
+    def _gather_recon(self):
+        # We'll collect recon only once from each node
+        dev_to_node_dict = {}
         for onode in self.object_ring.devs:
-            recon = direct_client.direct_get_recon(onode, 'updater/object')
-            recons.append(recon)
+            # We can skip any devices that are already covered by one of the
+            # other nodes we found
+            if any(self.is_local_to(node, onode)
+                   for node in dev_to_node_dict.values()):
+                continue
+            dev_to_node_dict[onode["device"]] = onode
+        self.assertEqual(4, len(dev_to_node_dict))  # sanity
 
-        self.assertEqual(4, len(recons))
-        found_counts = []
+        timeout = 20
+        polling_interval = 2
+        recon_data = []
+        start = time.time()
+        while True:
+            for onode in list(dev_to_node_dict.values()):
+                recon = direct_client.direct_get_recon(
+                    onode, 'updater/object')
+                if (recon.get('object_updater_stats') is not None and
+                        recon.get('object_updater_sweep') is not None):
+                    del dev_to_node_dict[onode["device"]]
+                    recon_data.append(recon)
+            if not dev_to_node_dict:
+                break
+            elapsed = time.time() - start
+            if elapsed > timeout:
+                self.fail(
+                    "Updates did not process within {timeout} seconds".format(
+                        timeout=timeout)
+                )
+            time.sleep(polling_interval)
+        self.assertEqual(4, len(recon_data))  # sanity
+        return recon_data
+
+    def run_updater(self):
+        raise NotImplementedError()
+
+    def _check_recon_data(self, recon_data):
+        ac_pairs = self.ac_pairs
         ac_set = set()
-        for recon in recons:
+        for recon in recon_data:
             updater_stats = recon['object_updater_stats']
 
-            found_counts.append(
-                updater_stats['failures_account_container_count']
-            )
+            found_count = updater_stats['failures_account_container_count']
+            # No node should find MORE unique ac than we created
+            self.assertLessEqual(found_count, len(ac_pairs))
+            # and generally we'd expect them to have "at least" one from
+            # significantly MORE than the "majority" of ac_pairs
+            self.assertGreaterEqual(found_count, len(ac_pairs) / 2)
 
             oldest_count = updater_stats[
                 'failures_oldest_timestamp_account_containers'
@@ -403,14 +453,76 @@
             container = entry['container']
             timestamp = entry['timestamp']
             self.assertIsNotNone(timestamp)
+            self.assertIsInstance(timestamp, float)
             ac_set.add((account, container))
 
+        # All the collected ac_set are from the ac_pairs we created
         for ac in ac_set:
             self.assertIn(ac, set(ac_pairs))
+        # Specifically, the ac_pairs we created failures for *first*
+        # are represented by the oldest ac_set across nodes
+        for ac in ac_pairs[:5]:
+            self.assertIn(ac, ac_set)
+        # Whereas the more recent failures are NOT!
+        for ac in ac_pairs[-3:]:
+            self.assertNotIn(ac, ac_set)
 
-        for found_count in found_counts:
-            self.assertLessEqual(found_count, len(ac_pairs))
+    def test_stats(self):
+        self._create_lots_of_asyncs()
+        recon_data = self.run_updater()
+        self._check_recon_data(recon_data)
+
+
+class TestObjectUpdaterStatsRunOnce(UpdaterStatsMixIn, ReplProbeTest):
+
+    def run_updater(self):
+        # Run the updater and check stats
+        Manager(['object-updater']).once()
+        return self._gather_recon()
+
+
+class TestObjectUpdaterStatsRunForever(UpdaterStatsMixIn, ECProbeTest):
+
+    def _post_setup_config(self):
+        CONF_SECTION = 'object-updater'
+        self.conf_dest = os.path.join(
+            '/tmp/',
+            datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f')
+        )
+        os.mkdir(self.conf_dest)
+        object_server_dir = os.path.join(self.conf_dest, 'object-server')
+        os.mkdir(object_server_dir)
+        for conf_file in Server('object-updater').conf_files():
+            config = readconf(conf_file)
+            if CONF_SECTION not in config:
+                continue  # Ensure the object-updater is set up to run
+            config[CONF_SECTION].update({'interval': '1'})
+
+            parser = ConfigParser()
+            parser.add_section(CONF_SECTION)
+            for option, value in config[CONF_SECTION].items():
+                parser.set(CONF_SECTION, option, value)
+
+            file_name = os.path.basename(conf_file)
+            if file_name.endswith('.d'):
+                # Work around conf.d setups (like you might see with VSAIO)
+                file_name = file_name[:-2]
+            with open(os.path.join(object_server_dir, file_name), 'w') as fp:
+                parser.write(fp)
+
+    def tearDown(self):
+        shutil.rmtree(self.conf_dest)
+
+    def run_updater(self):
+        # Start the updater
+        with mock.patch.object(manager, 'SWIFT_DIR', self.conf_dest):
+            updater_status = self.object_updater.start()
+        self.assertEqual(
+            updater_status, 0, "Object updater failed to start")
+        recon_data = self._gather_recon()
+        # Stop the updater
+        stop_status = self.object_updater.stop()
+        self.assertEqual(stop_status, 0, "Object updater failed to stop")
+        return recon_data
+
 
 if __name__ == '__main__':
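
The run-forever variant's config shuffle above is worth isolating: it clones the live object-updater config into a scratch SWIFT_DIR with a one-second interval, so the daemon dumps recon quickly enough to poll. A hedged standalone sketch of the same readconf/ConfigParser round-trip (the source path is an assumption typical of an SAIO setup; adjust for your layout):

    import os
    import tempfile

    from six.moves.configparser import ConfigParser
    from swift.common.utils import readconf

    src = '/etc/swift/object-server/1.conf'  # assumed SAIO-style conf path
    scratch = tempfile.mkdtemp(prefix='swift-conf-')
    os.mkdir(os.path.join(scratch, 'object-server'))

    config = readconf(src)  # parse the whole file into a dict of sections
    config['object-updater'].update({'interval': '1'})  # sweep every second

    parser = ConfigParser()
    parser.add_section('object-updater')
    for option, value in config['object-updater'].items():
        parser.set('object-updater', option, value)

    dest = os.path.join(scratch, 'object-server', os.path.basename(src))
    with open(dest, 'w') as fp:
        parser.write(fp)  # a daemon started with SWIFT_DIR=scratch picks this up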
@@ -154,7 +154,8 @@ class TestObjectController(BaseTestCase):
         self.tmpdir = mkdtemp()
         self.testdir = os.path.join(self.tmpdir,
                                     'tmp_test_object_server_ObjectController')
-        mkdirs(os.path.join(self.testdir, 'sda1'))
+        self.sda1 = os.path.join(self.testdir, 'sda1')
+        mkdirs(self.sda1)
         self.conf = {'devices': self.testdir, 'mount_check': 'false',
                      'container_update_timeout': 0.0}
         self.logger = debug_logger('test-object-controller')
@@ -1091,7 +1092,7 @@ class TestObjectController(BaseTestCase):
         mock_ring.get_nodes.return_value = (99, [node])
         object_updater.container_ring = mock_ring
         mock_update.return_value = ((True, 1, None))
-        object_updater.run_once()
+        object_updater._process_device_in_child(self.sda1, 'sda1')
         self.assertEqual(1, mock_update.call_count)
         self.assertEqual((node, 99, 'PUT', '/a/c/o'),
                          mock_update.call_args_list[0][0][0:4])
@@ -1206,7 +1207,7 @@ class TestObjectController(BaseTestCase):
         mock_ring = mock.MagicMock()
         mock_ring.get_nodes.return_value = (99, [node])
         object_updater.container_ring = mock_ring
-        object_updater.run_once()
+        object_updater._process_device_in_child(self.sda1, 'sda1')
 
         self.assertEqual(1, len(conn.requests))
         self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o',
File diff suppressed because it is too large