From af57922cd8b30cdb333d536f0506c673b591c069 Mon Sep 17 00:00:00 2001 From: Chinemerem Date: Fri, 6 Dec 2024 09:01:34 -0800 Subject: [PATCH] Aggregate per-disk recon stats Address an issue where `OldestAsyncManager` instances created before forking resulted in each child process maintaining its own isolated copy-on-write stats, leaving the parent process with an empty/unused instance. This caused the final `dump_recon` call at the end of `run_forever` to report no meaningful telemetry. The fix aggregates per-disk recon stats collected by each child process. This is done by loading recon cache data from all devices, consolidating key metrics, and writing the aggregated stats back to the recon cache. Change-Id: I70a60ae280e4fccc04ff5e7df9e62b18d916421e --- swift/obj/updater.py | 212 ++++--- test/probe/test_object_async_update.py | 160 ++++- test/unit/obj/test_server.py | 7 +- test/unit/obj/test_updater.py | 806 ++++++++++++++++++++++--- 4 files changed, 998 insertions(+), 187 deletions(-) diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 8b77001ba4..6ff5f9e894 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -35,7 +35,7 @@ from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ non_negative_float, config_positive_int_value, non_negative_int, \ - EventletRateLimiter, node_to_string, parse_options + EventletRateLimiter, node_to_string, parse_options, load_recon_cache from swift.common.daemon import Daemon, run_daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -480,102 +480,164 @@ class ObjectUpdater(Daemon): self.container_ring = Ring(self.swift_dir, ring_name='container') return self.container_ring + def _process_device_in_child(self, dev_path, device): + """Process a single device in a forked child process.""" + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.environ.pop('NOTIFY_SOCKET', None) + eventlet_monkey_patch() + self.stats.reset() + self.oldest_async_pendings.reset() + forkbegin = time.time() + self.object_sweep(dev_path) + elapsed = time.time() - forkbegin + self.logger.info( + ('Object update sweep of %(device)s ' + 'completed: %(elapsed).02fs, %(stats)s'), + {'device': device, 'elapsed': elapsed, + 'stats': self.stats}) + self.dump_device_recon(device) + + def _process_devices(self, devices): + """Process devices, handling both single and multi-threaded modes.""" + pids = [] + # read from container ring to ensure it's fresh + self.get_container_ring().get_nodes('') + for device in devices: + try: + dev_path = check_drive(self.devices, device, + self.mount_check) + except ValueError as err: + # We don't count this as an error. The occasional + # unmounted drive is part of normal cluster operations, + # so a simple warning is sufficient. + self.logger.warning('Skipping: %s', err) + continue + while len(pids) >= self.updater_workers: + pids.remove(os.wait()[0]) + pid = os.fork() + if pid: + pids.append(pid) + else: + self._process_device_in_child(dev_path, device) + sys.exit() + + while pids: + pids.remove(os.wait()[0]) + def run_forever(self, *args, **kwargs): """Run the updater continuously.""" time.sleep(random() * self.interval) while True: - self.logger.info('Begin object update sweep') - self.begin = time.time() - pids = [] - # read from container ring to ensure it's fresh - self.get_container_ring().get_nodes('') - for device in self._listdir(self.devices): - try: - dev_path = check_drive(self.devices, device, - self.mount_check) - except ValueError as err: - # We don't count this as an error. The occasional - # unmounted drive is part of normal cluster operations, - # so a simple warning is sufficient. - self.logger.warning('Skipping: %s', err) - continue - while len(pids) >= self.updater_workers: - pids.remove(os.wait()[0]) - pid = os.fork() - if pid: - pids.append(pid) - else: - signal.signal(signal.SIGTERM, signal.SIG_DFL) - os.environ.pop('NOTIFY_SOCKET', None) - eventlet_monkey_patch() - self.stats.reset() - self.oldest_async_pendings.reset() - forkbegin = time.time() - self.object_sweep(dev_path) - elapsed = time.time() - forkbegin - self.logger.info( - ('Object update sweep of %(device)s ' - 'completed: %(elapsed).02fs, %(stats)s'), - {'device': device, 'elapsed': elapsed, - 'stats': self.stats}) - sys.exit() - while pids: - pids.remove(os.wait()[0]) - elapsed = time.time() - self.begin - self.logger.info('Object update sweep completed: %.02fs', - elapsed) - self.dump_recon(elapsed) + elapsed = self.run_once(*args, **kwargs) if elapsed < self.interval: time.sleep(self.interval - elapsed) def run_once(self, *args, **kwargs): """Run the updater once.""" - self.logger.info('Begin object update single threaded sweep') + self.logger.info('Begin object update sweep of all devices') self.begin = time.time() - self.stats.reset() - self.oldest_async_pendings.reset() - for device in self._listdir(self.devices): - try: - dev_path = check_drive(self.devices, device, self.mount_check) - except ValueError as err: - # We don't count this as an error. The occasional unmounted - # drive is part of normal cluster operations, so a simple - # warning is sufficient. - self.logger.warning('Skipping: %s', err) - continue - self.object_sweep(dev_path) + devices = self._listdir(self.devices) + self._process_devices(devices) elapsed = time.time() - self.begin self.logger.info( - ('Object update single-threaded sweep completed: ' - '%(elapsed).02fs, %(stats)s'), - {'elapsed': elapsed, 'stats': self.stats}) - self.dump_recon(elapsed) + ('Object update sweep of all devices completed: ' + '%(elapsed).02fs'), + {'elapsed': elapsed}) + self.aggregate_and_dump_recon(devices, elapsed) + return elapsed - def dump_recon(self, elapsed): - """Gathers stats and dumps recon cache.""" - object_updater_stats = { - 'failures_oldest_timestamp': ( - self.oldest_async_pendings.get_oldest_timestamp() - ), - 'failures_oldest_timestamp_age': ( - self.oldest_async_pendings.get_oldest_timestamp_age() - ), - 'failures_account_container_count': ( - len(self.oldest_async_pendings.ac_to_timestamp) - ), - 'failures_oldest_timestamp_account_containers': ( + def _gather_recon_stats(self): + """Gather stats for device recon dumps.""" + stats = { + 'failures_oldest_timestamp': + self.oldest_async_pendings.get_oldest_timestamp(), + 'failures_oldest_timestamp_age': + self.oldest_async_pendings.get_oldest_timestamp_age(), + 'failures_account_container_count': + len(self.oldest_async_pendings.ac_to_timestamp), + 'failures_oldest_timestamp_account_containers': self.oldest_async_pendings.get_n_oldest_timestamp_acs( - self.dump_count + self.dump_count), + 'tracker_memory_usage': + self.oldest_async_pendings.get_memory_usage(), + } + return stats + + def dump_device_recon(self, device): + """Dump recon stats for a single device.""" + disk_recon_stats = self._gather_recon_stats() + dump_recon_cache( + {'object_updater_per_device': {device: disk_recon_stats}}, + self.rcache, + self.logger, + ) + + def aggregate_and_dump_recon(self, devices, elapsed): + """ + Aggregate recon stats across devices and dump the result to the + recon cache. + """ + recon_cache = load_recon_cache(self.rcache) + device_stats = recon_cache.get('object_updater_per_device', {}) + if not isinstance(device_stats, dict): + raise TypeError('object_updater_per_device must be a dict') + device_stats = {k: (v if v is not None else {}) + for k, v in device_stats.items()} + + devices_to_remove = set(device_stats) - set(devices) + update_device_stats = {d: {} for d in devices_to_remove} + + aggregated_oldest_entries = [] + + for stats in device_stats.values(): + container_data = stats.get( + 'failures_oldest_timestamp_account_containers', {}) + aggregated_oldest_entries.extend(container_data.get( + 'oldest_entries', [])) + aggregated_oldest_entries.sort(key=lambda x: x['timestamp']) + aggregated_oldest_entries = aggregated_oldest_entries[:self.dump_count] + aggregated_oldest_count = len(aggregated_oldest_entries) + + aggregated_stats = { + 'failures_account_container_count': max( + list( + stats.get('failures_account_container_count', 0) + for stats in device_stats.values() ) + or [0], ), 'tracker_memory_usage': ( - self.oldest_async_pendings.get_memory_usage() + float(sum( + stats.get('tracker_memory_usage', 0) + for stats in device_stats.values() + )) + / float(len(device_stats)) + ) + * max(self.updater_workers, 1) + if device_stats + else 0, + 'failures_oldest_timestamp': min( + list(filter(lambda x: x is not None, + [stats.get('failures_oldest_timestamp') + for stats in device_stats.values()] + )) or [None], ), + 'failures_oldest_timestamp_age': max( + list(filter(lambda x: x is not None, + [stats.get('failures_oldest_timestamp_age') + for stats in device_stats.values()] + )) or [None], + ), + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': aggregated_oldest_count, + 'oldest_entries': aggregated_oldest_entries, + }, } dump_recon_cache( { 'object_updater_sweep': elapsed, - 'object_updater_stats': object_updater_stats, + 'object_updater_stats': aggregated_stats, + 'object_updater_per_device': update_device_stats, }, self.rcache, self.logger, diff --git a/test/probe/test_object_async_update.py b/test/probe/test_object_async_update.py index f7276cc25d..3b5f1e831c 100644 --- a/test/probe/test_object_async_update.py +++ b/test/probe/test_object_async_update.py @@ -14,17 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import shutil +import time +import uuid + +from datetime import datetime from io import BytesIO from unittest import main, SkipTest from uuid import uuid4 import random +import mock + from swiftclient import client from swiftclient.exceptions import ClientException -from swift.common import direct_client -from swift.common.manager import Manager +from six.moves.configparser import ConfigParser +from swift.common import direct_client, manager +from swift.common.manager import Manager, Server from swift.common.swob import normalize_etag +from swift.common.utils import readconf from test.probe.common import kill_nonprimary_server, \ kill_server, ReplProbeTest, start_server, ECProbeTest @@ -334,23 +344,28 @@ class TestUpdateOverridesEC(ECProbeTest): self.assertEqual('test/ctype', listing[0]['content_type']) -class TestObjectUpdaterStats(ReplProbeTest): +class UpdaterStatsMixIn(object): def setUp(self): - super(TestObjectUpdaterStats, self).setUp() + super(UpdaterStatsMixIn, self).setUp() self.int_client = self.make_internal_client() self.container_servers = Manager(['container-server']) + self.object_updater = Manager(['object-updater']) + self._post_setup_config() - def test_lots_of_asyncs(self): + def _post_setup_config(self): + pass + + def _create_lots_of_asyncs(self): # Create some (acct, cont) pairs num_accounts = 3 num_conts_per_a = 4 - ac_pairs = [] + self.ac_pairs = ac_pairs = [] for a in range(num_accounts): - acct = 'AUTH_user%03d' % a + acct = 'AUTH_user-%s-%03d' % (uuid.uuid4(), a) self.int_client.create_account(acct) for c in range(num_conts_per_a): - cont = 'cont%03d' % c + cont = 'cont-%s-%03d' % (uuid.uuid4(), c) self.int_client.create_container(acct, cont) ac_pairs.append((acct, cont)) @@ -359,10 +374,10 @@ class TestObjectUpdaterStats(ReplProbeTest): self.container_servers.stop(number=n) # Create a bunch of objects - num_objs_per_ac = 5 + num_objs_per_ac = 10 for acct, cont in ac_pairs: for o in range(num_objs_per_ac): - obj = 'obj%03d' % o + obj = 'obj-%s-%03d' % (uuid.uuid4(), o) self.int_client.upload_object(BytesIO(b''), acct, cont, obj) all_asyncs = self.gather_async_pendings() @@ -371,22 +386,57 @@ class TestObjectUpdaterStats(ReplProbeTest): self.assertGreater(len(all_asyncs), total_objs) self.assertLess(len(all_asyncs), total_objs * 2) - # Run the updater and check stats - Manager(['object-updater']).once() - recons = [] + def _gather_recon(self): + # We'll collect recon only once from each node + dev_to_node_dict = {} for onode in self.object_ring.devs: - recon = direct_client.direct_get_recon(onode, 'updater/object') - recons.append(recon) + # We can skip any devices that are already covered by one of the + # other nodes we found + if any(self.is_local_to(node, onode) + for node in dev_to_node_dict.values()): + continue + dev_to_node_dict[onode["device"]] = onode + self.assertEqual(4, len(dev_to_node_dict)) # sanity - self.assertEqual(4, len(recons)) - found_counts = [] + timeout = 20 + polling_interval = 2 + recon_data = [] + start = time.time() + while True: + for onode in list(dev_to_node_dict.values()): + recon = direct_client.direct_get_recon( + onode, 'updater/object') + if (recon.get('object_updater_stats') is not None and + recon.get('object_updater_sweep') is not None): + del dev_to_node_dict[onode["device"]] + recon_data.append(recon) + if not dev_to_node_dict: + break + elapsed = time.time() - start + if elapsed > timeout: + self.fail( + "Updates did not process within {timeout} seconds".format( + timeout=timeout) + ) + time.sleep(polling_interval) + self.assertEqual(4, len(recon_data)) # sanity + return recon_data + + def run_updater(self): + raise NotImplementedError() + + def _check_recon_data(self, recon_data): + ac_pairs = self.ac_pairs ac_set = set() - for recon in recons: + for recon in recon_data: updater_stats = recon['object_updater_stats'] - found_counts.append( - updater_stats['failures_account_container_count'] - ) + found_count = updater_stats['failures_account_container_count'] + # No node should find MORE unique ac than we created + self.assertLessEqual(found_count, len(ac_pairs)) + # and generally we'd expect them to have "at least" one from + # significanly MORE than the "majority" of ac_pairs + self.assertGreaterEqual(found_count, len(ac_pairs) / 2) oldest_count = updater_stats[ 'failures_oldest_timestamp_account_containers' @@ -403,14 +453,76 @@ class TestObjectUpdaterStats(ReplProbeTest): container = entry['container'] timestamp = entry['timestamp'] self.assertIsNotNone(timestamp) - self.assertIsInstance(timestamp, float) ac_set.add((account, container)) + # All the collected ac_set are from the ac_pairs we created for ac in ac_set: self.assertIn(ac, set(ac_pairs)) + # Specifically, the ac_pairs we created failures for *first* + # are represented by the oldest ac_set across nodes + for ac in ac_pairs[:5]: + self.assertIn(ac, ac_set) + # Where as the more recent failures are NOT! + for ac in ac_pairs[-3:]: + self.assertNotIn(ac, ac_set) - for found_count in found_counts: - self.assertLessEqual(found_count, len(ac_pairs)) + def test_stats(self): + self._create_lots_of_asyncs() + recon_data = self.run_updater() + self._check_recon_data(recon_data) + + +class TestObjectUpdaterStatsRunOnce(UpdaterStatsMixIn, ReplProbeTest): + + def run_updater(self): + # Run the updater and check stats + Manager(['object-updater']).once() + return self._gather_recon() + + +class TestObjectUpdaterStatsRunForever(UpdaterStatsMixIn, ECProbeTest): + + def _post_setup_config(self): + CONF_SECTION = 'object-updater' + self.conf_dest = os.path.join( + '/tmp/', + datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f') + ) + os.mkdir(self.conf_dest) + object_server_dir = os.path.join(self.conf_dest, 'object-server') + os.mkdir(object_server_dir) + for conf_file in Server('object-updater').conf_files(): + config = readconf(conf_file) + if CONF_SECTION not in config: + continue # Ensure the object-updater is set up to run + config[CONF_SECTION].update({'interval': '1'}) + + parser = ConfigParser() + parser.add_section(CONF_SECTION) + for option, value in config[CONF_SECTION].items(): + parser.set(CONF_SECTION, option, value) + + file_name = os.path.basename(conf_file) + if file_name.endswith('.d'): + # Work around conf.d setups (like you might see with VSAIO) + file_name = file_name[:-2] + with open(os.path.join(object_server_dir, file_name), 'w') as fp: + parser.write(fp) + + def tearDown(self): + shutil.rmtree(self.conf_dest) + + def run_updater(self): + # Start the updater + with mock.patch.object(manager, 'SWIFT_DIR', self.conf_dest): + updater_status = self.object_updater.start() + self.assertEqual( + updater_status, 0, "Object updater failed to start") + recon_data = self._gather_recon() + # Stop the updater + stop_status = self.object_updater.stop() + self.assertEqual(stop_status, 0, "Object updater failed to stop") + return recon_data if __name__ == '__main__': diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 8262012518..00f41df917 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -154,7 +154,8 @@ class TestObjectController(BaseTestCase): self.tmpdir = mkdtemp() self.testdir = os.path.join(self.tmpdir, 'tmp_test_object_server_ObjectController') - mkdirs(os.path.join(self.testdir, 'sda1')) + self.sda1 = os.path.join(self.testdir, 'sda1') + mkdirs(self.sda1) self.conf = {'devices': self.testdir, 'mount_check': 'false', 'container_update_timeout': 0.0} self.logger = debug_logger('test-object-controller') @@ -1091,7 +1092,7 @@ class TestObjectController(BaseTestCase): mock_ring.get_nodes.return_value = (99, [node]) object_updater.container_ring = mock_ring mock_update.return_value = ((True, 1, None)) - object_updater.run_once() + object_updater._process_device_in_child(self.sda1, 'sda1') self.assertEqual(1, mock_update.call_count) self.assertEqual((node, 99, 'PUT', '/a/c/o'), mock_update.call_args_list[0][0][0:4]) @@ -1206,7 +1207,7 @@ class TestObjectController(BaseTestCase): mock_ring = mock.MagicMock() mock_ring.get_nodes.return_value = (99, [node]) object_updater.container_ring = mock_ring - object_updater.run_once() + object_updater._process_device_in_child(self.sda1, 'sda1') self.assertEqual(1, len(conn.requests)) self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o', diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 0cac9beecf..076849ba73 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -25,6 +25,7 @@ from contextlib import closing from gzip import GzipFile from tempfile import mkdtemp from shutil import rmtree +import json from swift.common.exceptions import ConnectionTimeout from test import listen_zero @@ -431,27 +432,34 @@ class TestObjectUpdater(unittest.TestCase): completion_lines[0]) @mock.patch.object(object_updater, 'check_drive') - def test_run_once_with_disk_unmounted(self, mock_check_drive): - mock_check_drive.side_effect = ValueError + @mock.patch('swift.obj.updater.os') + def test_run_once_with_disk_unmounted(self, mock_os, mock_check_drive): + def fake_mount_check(root, device, mount_check=True): + raise ValueError('%s is unmounted' % device) + mock_check_drive.side_effect = fake_mount_check + mock_os.path = os.path + mock_os.listdir = os.listdir + + # mount_check False ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', - 'node_timeout': '15'}) + 'node_timeout': '15'}, logger=self.logger) + ou.run_once() - async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) - os.mkdir(async_dir) - ou.run_once() - self.assertTrue(os.path.exists(async_dir)) - # each run calls check_device self.assertEqual([ mock.call(self.devices_dir, 'sda1', False), - mock.call(self.devices_dir, 'sda1', False), ], mock_check_drive.mock_calls) mock_check_drive.reset_mock() + self.assertEqual([], mock_os.mock_calls) + self.assertEqual(['Skipping: sda1 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() + # mount_check True ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'TrUe', @@ -459,21 +467,66 @@ class TestObjectUpdater(unittest.TestCase): 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) - odd_dir = os.path.join(async_dir, 'not really supposed ' - 'to be here') - os.mkdir(odd_dir) ou.run_once() - self.assertTrue(os.path.exists(async_dir)) - self.assertTrue(os.path.exists(odd_dir)) # skipped - not mounted! self.assertEqual([ mock.call(self.devices_dir, 'sda1', True), ], mock_check_drive.mock_calls) + mock_check_drive.reset_mock() + self.assertEqual([], mock_os.mock_calls) + self.assertEqual(['Skipping: sda1 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {}) - @mock.patch('swift.obj.updater.dump_recon_cache') - @mock.patch.object(object_updater, 'check_drive') - def test_run_once(self, mock_check_drive, mock_dump_recon): - mock_check_drive.side_effect = lambda r, d, mc: os.path.join(r, d) + # multiple devices, one unmounted + NUM_DEVICES = 4 + + def fake_list_dir(path): + if path == self.devices_dir: + return ['sda' + str(i) + for i in range(NUM_DEVICES)] + else: + return os.listdir(path) + mock_os.listdir = fake_list_dir + + def fake_mount_check(root, device, mount_check=True): + if device == 'sda2': + raise ValueError('%s is unmounted' % device) + else: + return os.path.join(root, device) + mock_check_drive.side_effect = fake_mount_check + + def fake_list_dir(path): + if path == self.devices_dir: + return ['sda1', 'sda0', 'sda2', 'sda3'] + else: + return os.listdir(path) + mock_os.listdir.side_effect = fake_list_dir + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = list(pids) + mock_os.wait.side_effect = [(p, 0) for p in pids] + ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda0', True), + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda2', True), + mock.call(self.devices_dir, 'sda3', True), + ], mock_check_drive.mock_calls) + # we fork/wait for each mounted device + self.assertEqual([ + mock.call.fork(), + mock.call.wait(), + ] * (NUM_DEVICES - 1), mock_os.mock_calls) + self.assertEqual(['Skipping: sda2 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() + self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {}) + + @mock.patch('swift.obj.updater.os') + def test_run_once_child(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', @@ -481,23 +534,179 @@ class TestObjectUpdater(unittest.TestCase): 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_os.fork.side_effect = [0] + mock_process = ou._process_device_in_child = mock.MagicMock() + with self.assertRaises(SystemExit): + ou.run_once() + self.assertEqual([ + mock.call(self.sda1, 'sda1'), + ], mock_process.mock_calls) + + @mock.patch('swift.obj.updater.os') + def test_run_once_subsequent_children(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'updater_workers': '4', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_process = ou._process_device_in_child = mock.MagicMock() + pids = [i + 1 for i in range(NUM_DEVICES)] + for i in range(4): + mock_os.fork.side_effect = pids[:i] + [0] + with self.assertRaises(SystemExit): + ou.run_once() + + self.assertEqual([ + mock.call(os.path.join(self.devices_dir, 'sda1'), 'sda1'), + mock.call(os.path.join(self.devices_dir, 'sda0'), 'sda0'), + mock.call(os.path.join(self.devices_dir, 'sda2'), 'sda2'), + mock.call(os.path.join(self.devices_dir, 'sda3'), 'sda3'), + ], mock_process.mock_calls) + + @mock.patch('swift.obj.updater.os') + def test_run_once_child_with_more_workers(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'updater_workers': '3', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 3 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda{0}'.format(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_os.fork.side_effect = [1, 2, 0] + mock_process = ou._process_device_in_child = mock.MagicMock() + with self.assertRaises(SystemExit): + ou.run_once() + self.assertEqual([ + mock.call(os.path.join(self.devices_dir, 'sda2'), 'sda2'), + ], mock_process.mock_calls) + + @mock.patch.object(object_updater, 'check_drive') + @mock.patch('swift.obj.updater.os') + def test_run_once_parent_default(self, mock_os, mock_check_drive): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'on', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + + NUM_DEVICES = 2 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = pids + mock_os.wait.side_effect = [(i, 0) for i in pids] ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda0', True), + ], mock_check_drive.mock_calls) + self.assertEqual([ + mock.call.fork(), + mock.call.wait(), + mock.call.fork(), + mock.call.wait(), + ], mock_os.mock_calls) + + @mock.patch.object(object_updater, 'check_drive') + @mock.patch('swift.obj.updater.os') + def test_run_once_parent_more_updater_workers(self, mock_os, + mock_check_drive): + # unpatch listdir and path + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': '1', + 'swift_dir': self.testdir, + 'updater_workers': '4', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = pids + mock_os.wait.side_effect = [(i, 0) for i in pids] + ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda0', True), + mock.call(self.devices_dir, 'sda2', True), + mock.call(self.devices_dir, 'sda3', True), + ], mock_check_drive.mock_calls) + self.assertEqual([ + mock.call.fork(), + mock.call.fork(), + mock.call.fork(), + mock.call.fork(), + mock.call.wait(), + mock.call.wait(), + mock.call.wait(), + mock.call.wait(), + ], mock_os.mock_calls) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_process_devices_in_child(self, mock_dump_recon): + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + ou._process_device_in_child(self.sda1, 'sda1') self.assertEqual([], ou.logger.get_lines_for_level('error')) async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) os.mkdir(async_dir) - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(async_dir)) - # each run calls check_device - self.assertEqual([ - mock.call(self.devices_dir, 'sda1', False), - mock.call(self.devices_dir, 'sda1', False), - ], mock_check_drive.mock_calls) - mock_check_drive.reset_mock() self.assertEqual([], ou.logger.get_lines_for_level('error')) ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, - 'mount_check': 'TrUe', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', @@ -505,11 +714,9 @@ class TestObjectUpdater(unittest.TestCase): odd_dir = os.path.join(async_dir, 'not really supposed ' 'to be here') os.mkdir(odd_dir) - ou.run_once() + + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(async_dir)) - self.assertEqual([ - mock.call(self.devices_dir, 'sda1', True), - ], mock_check_drive.mock_calls) self.assertEqual([], ou.logger.get_lines_for_level('error')) ohash = hash_path('a', 'c', 'o') @@ -529,7 +736,7 @@ class TestObjectUpdater(unittest.TestCase): 'X-Container-Timestamp': normalize_timestamp(0)}}, async_pending) - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(not os.path.exists(older_op_path)) self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), @@ -592,7 +799,7 @@ class TestObjectUpdater(unittest.TestCase): dev['replication_port'] = bindsock.getsockname()[1] ou.logger._clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -613,7 +820,7 @@ class TestObjectUpdater(unittest.TestCase): # only 1/2 updates succeeds event = spawn(accept, [404, 201]) ou.logger.clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -635,7 +842,7 @@ class TestObjectUpdater(unittest.TestCase): with Timeout(99) as exc, \ mock.patch('swift.obj.updater.http_connect') as mock_connect: mock_connect.return_value.getresponse.side_effect = exc - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {'failures': 1}) @@ -655,7 +862,7 @@ class TestObjectUpdater(unittest.TestCase): with ConnectionTimeout(9) as exc, \ mock.patch('swift.obj.updater.http_connect') as mock_connect: mock_connect.return_value.getresponse.side_effect = exc - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {'failures': 1}) @@ -674,7 +881,7 @@ class TestObjectUpdater(unittest.TestCase): # final update succeeds event = spawn(accept, [201]) ou.logger.clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -695,42 +902,43 @@ class TestObjectUpdater(unittest.TestCase): ])) @mock.patch('swift.obj.updater.dump_recon_cache') - @mock.patch.object(object_updater, 'check_drive') - def test_run_once_recon_dump(self, mock_check_drive, mock_dump_recon): + def test_run_once_recon_dump(self, mock_dump_recon): self.maxDiff = None - def assert_and_reset_recon_dump(exp): + def assert_and_reset_recon_dump_per_device(exp): recon_dumps = [call[0][0] for call in mock_dump_recon.call_args_list] self.assertEqual([exp], recon_dumps) mock_dump_recon.reset_mock() - mock_check_drive.side_effect = lambda r, d, mc: os.path.join(r, d) async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) os.mkdir(async_dir) ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, - 'mount_check': 'TrUe', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) + # There are no asyncs so there are no failures with mock.patch.object(ou, 'object_update', return_value=(False, 'node-id', None)): - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') exp_recon_dump = { - 'object_updater_stats': { - 'failures_account_container_count': 0, - 'failures_oldest_timestamp': None, - 'failures_oldest_timestamp_account_containers': { - 'oldest_count': 0, - 'oldest_entries': []}, - 'failures_oldest_timestamp_age': None, - 'tracker_memory_usage': mock.ANY}, - 'object_updater_sweep': mock.ANY, + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + } + } } - assert_and_reset_recon_dump(exp_recon_dump) + assert_and_reset_recon_dump_per_device(exp_recon_dump) ts = next(self.ts_iter) ohash = hash_path('a', 'c', 'o') @@ -748,22 +956,449 @@ class TestObjectUpdater(unittest.TestCase): with mock.patch('swift.obj.updater.time.time', return_value=now): with mock.patch.object(ou, 'object_update', return_value=(False, 'node-id', None)): - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') exp_recon_dump = { - 'object_updater_stats': { - 'failures_account_container_count': 1, - 'failures_oldest_timestamp': float(ts), - 'failures_oldest_timestamp_account_containers': { - 'oldest_count': 1, - 'oldest_entries': [{'account': 'a', - 'container': 'c', - 'timestamp': float(ts)}] - }, - 'failures_oldest_timestamp_age': now - float(ts), - 'tracker_memory_usage': mock.ANY}, - 'object_updater_sweep': mock.ANY, + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 1, + 'failures_oldest_timestamp': float(ts), + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 1, + 'oldest_entries': [{'account': 'a', + 'container': 'c', + 'timestamp': float(ts)}] + }, + 'failures_oldest_timestamp_age': now - float(ts), + 'tracker_memory_usage': mock.ANY, + } + } } - assert_and_reset_recon_dump(exp_recon_dump) + assert_and_reset_recon_dump_per_device(exp_recon_dump) + + def test_gather_recon_stats(self): + ou = object_updater.ObjectUpdater({}) + with mock.patch.object( + ou.oldest_async_pendings, + 'get_oldest_timestamp', return_value=123.456 + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_oldest_timestamp_age', + return_value=789.012, + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'ac_to_timestamp', + return_value={('account1', 'container1'): 123.456}, + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_n_oldest_timestamp_acs', + return_value=[ + {'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': 123.456}, + ], + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_memory_usage', + return_value=1024, + ): + ou.oldest_async_pendings.ac_to_timestamp = { + ('AUTH_1', 'cont_1'): 123.456} + stats = ou._gather_recon_stats() + + expected_stats = { + 'failures_oldest_timestamp': 123.456, + 'failures_oldest_timestamp_age': 789.012, + 'failures_account_container_count': 1, + 'failures_oldest_timestamp_account_containers': [ + {'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': 123.456}, + ], + 'tracker_memory_usage': 1024, + } + self.assertEqual(stats, expected_stats) + + def test_aggregate_and_dump_recon_with_missing_keys(self): + """ + Test aggregation logic when device stats are missing some keys. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 1, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 3, + }, logger=self.logger) + + incomplete_recon = { + 'object_updater_per_device': { + 'sda1': { + 'tracker_memory_usage': 256, + 'failures_account_container_count': 1, + }, + 'sda2': { + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + }, + 'sda3': None, + } + } + utils.dump_recon_cache(incomplete_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1', 'sda2', 'sda3'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 256.0 / 3.0, + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [] + }, + } + + expected_recon = { + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_aggregate_and_dump_recon_all_empty_devices(self): + """ + Test aggregation logic when all devices are empty or missing. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 5, + }, logger=self.logger) + + empty_recon = { + 'object_updater_per_device': { + 'sda1': {}, + 'sda2': {}, + } + } + utils.dump_recon_cache(empty_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1', 'sda2'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 0, + 'tracker_memory_usage': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_age': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [] + }, + } + + expected_recon = { + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_aggregate_and_dump_recon_wrong_type_per_device(self): + """ + Test aggregation when object_updater_per_device is the wrong type. + """ + recon_path = os.path.join(self.testdir, 'recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 5, + }, logger=self.logger) + + # object_updater_per_device as a list instead of dict + malformed_recon = { + 'object_updater_per_device': ['invalid_data_type'] + } + utils.dump_recon_cache(malformed_recon, ou.rcache, ou.logger) + + with self.assertRaises(TypeError) as cm: + ou.aggregate_and_dump_recon(['sda1', 'sda2'], 30) + + self.assertIn( + 'object_updater_per_device must be a dict', str(cm.exception)) + + def test_aggregate_and_dump_recon_partial_device_updates(self): + """ + Test when some devices are removed and partial updates exist. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 2, + }, logger=self.logger) + + existing_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512, + }, + 'sda2': { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 256, + }, + 'sda3': None, + } + } + utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512.0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_age': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + } + + expected_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512, + }, + }, + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_dump_device_recon(self): + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': '6', + }) + ou.dump_device_recon('sda1') + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual({ + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + } + } + }, found_data) + + # now add some data + timestamps = [] + for a in range(3): + account = 'AUTH_%s' % a + for c in range(4): + container = 'cont_%s' % c + for ts in range(5): + ts = next(self.ts_iter) + timestamps.append(float(ts)) + ou.oldest_async_pendings.add_update( + account, container, ts) + + now = float(next(self.ts_iter)) + with mock.patch('swift.obj.updater.time.time', return_value=now): + ou.dump_device_recon('sda1') + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual({ + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 12, + 'failures_oldest_timestamp': timestamps[0], + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 6, + 'oldest_entries': [{ + 'account': 'AUTH_0', + 'container': 'cont_0', + 'timestamp': timestamps[0], + }, { + 'account': 'AUTH_0', + 'container': 'cont_1', + 'timestamp': timestamps[5], + }, { + 'account': 'AUTH_0', + 'container': 'cont_2', + 'timestamp': timestamps[10], + }, { + 'account': 'AUTH_0', + 'container': 'cont_3', + 'timestamp': timestamps[15], + }, { + 'account': 'AUTH_1', + 'container': 'cont_0', + 'timestamp': timestamps[20], + }, { + 'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': timestamps[25], + }], + }, + 'failures_oldest_timestamp_age': now - timestamps[0], + 'tracker_memory_usage': mock.ANY, + } + } + }, found_data) + + def test_aggregate_and_dump_recon(self): + self.maxDiff = None + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 2, + }, logger=self.logger) + existing_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 512, + 'failures_oldest_timestamp': 123.45678, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 1, + 'oldest_entries': [ + {'account': 'a', 'container': 'c', + 'timestamp': 123.45678} + ], + }, + }, + 'sda2': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 256, + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 2, + 'oldest_entries': [ + {'account': 'x', 'container': 'y', + 'timestamp': 124.56789}, + {'account': 'm', 'container': 'n', + 'timestamp': 125.67890}, + ], + }, + }, + }, + } + utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger) + # add an "empty" device + with mock.patch.object(ou.oldest_async_pendings, 'get_memory_usage', + return_value=128): + ou.dump_device_recon('sda3') + # and also an unmounted one + ou.dump_device_recon('sdx') + existing_recon['object_updater_per_device'].update({ + 'sda3': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': 128, + }, + 'sdx': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + }, + }) + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual(existing_recon, found_data) # sanity + + # we're setting this up like sdx is stale/unmounted + ou.aggregate_and_dump_recon(['sda1', 'sda2', 'sda3'], 30) + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 2, + 'tracker_memory_usage': mock.ANY, + 'failures_oldest_timestamp': 123.45678, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 2, + 'oldest_entries': [ + {'account': 'a', 'container': 'c', 'timestamp': 123.45678}, + {'account': 'x', 'container': 'y', 'timestamp': 124.56789}, + ], + }, + } + expected_recon = dict(existing_recon, **{ + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + }) + # and sda4 is removed + del expected_recon['object_updater_per_device']['sdx'] + self.assertEqual(expected_recon, found_data) + + self.assertAlmostEqual( + 755.5, + found_data['object_updater_stats']['tracker_memory_usage'], + delta=200 + ) def test_obj_put_legacy_updates(self): ts = (normalize_timestamp(t) for t in @@ -804,7 +1439,8 @@ class TestObjectUpdater(unittest.TestCase): # run once fake_status_codes = [200, 200, 200] with mocked_http_conn(*fake_status_codes, give_connect=capture): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') + self.assertEqual(len(fake_status_codes), len(request_log)) for request_args, request_kwargs in request_log: ip, part, method, path, headers, qs, ssl = request_args @@ -867,7 +1503,7 @@ class TestObjectUpdater(unittest.TestCase): 200, # object update conflict ] with mocked_http_conn(*fake_status_codes, give_connect=capture): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(len(fake_status_codes), len(request_log)) for request_args, request_kwargs in request_log: ip, part, method, path, headers, qs, ssl = request_args @@ -981,7 +1617,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) @@ -1016,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( 507, 200, 507) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/0/a/c/o'] * 3, @@ -1038,7 +1674,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:2], ts_obj, policies[0]) self._check_update_requests(conn.requests[2:], ts_obj, policies[0]) @@ -1092,7 +1728,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/0/a/c/o'] * 3, @@ -1165,7 +1801,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) # only *one* set of redirected requests is attempted per cycle @@ -1195,7 +1831,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual( @@ -1239,7 +1875,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) # only *one* set of redirected requests is attempted per cycle @@ -1273,7 +1909,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) # first try the previously persisted container path, response to that @@ -1306,7 +1942,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual( ['/sda1/%s/.shards_a/c_shard_3/o' % shard_3_part] * 3 + @@ -1331,7 +1967,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/%s/a/c/o' % root_part] * 3, [req['path'] for req in conn.requests]) @@ -1366,7 +2002,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn(): with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual( {'quarantines': 1}, @@ -1464,7 +2100,7 @@ class TestObjectUpdater(unittest.TestCase): expected_success = 2 fake_status_codes = [200] * 3 * expected_success with mocked_http_conn(*fake_status_codes) as fake_conn: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1512,7 +2148,7 @@ class TestObjectUpdater(unittest.TestCase): len(self._find_async_pending_files())) fake_status_codes = [200] * 3 * expected_total with mocked_http_conn(*fake_status_codes): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_total, daemon.stats.successes) self.assertEqual(0, daemon.stats.skips) self.assertEqual([], self._find_async_pending_files()) @@ -1573,7 +2209,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn(*fake_status_codes) as fake_conn, \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1663,7 +2299,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1783,7 +2419,7 @@ class TestObjectUpdater(unittest.TestCase): mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1911,7 +2547,7 @@ class TestObjectUpdater(unittest.TestCase): mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips)