diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 8b77001ba4..6ff5f9e894 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -35,7 +35,7 @@ from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ non_negative_float, config_positive_int_value, non_negative_int, \ - EventletRateLimiter, node_to_string, parse_options + EventletRateLimiter, node_to_string, parse_options, load_recon_cache from swift.common.daemon import Daemon, run_daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -480,102 +480,164 @@ class ObjectUpdater(Daemon): self.container_ring = Ring(self.swift_dir, ring_name='container') return self.container_ring + def _process_device_in_child(self, dev_path, device): + """Process a single device in a forked child process.""" + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.environ.pop('NOTIFY_SOCKET', None) + eventlet_monkey_patch() + self.stats.reset() + self.oldest_async_pendings.reset() + forkbegin = time.time() + self.object_sweep(dev_path) + elapsed = time.time() - forkbegin + self.logger.info( + ('Object update sweep of %(device)s ' + 'completed: %(elapsed).02fs, %(stats)s'), + {'device': device, 'elapsed': elapsed, + 'stats': self.stats}) + self.dump_device_recon(device) + + def _process_devices(self, devices): + """Process devices, handling both single and multi-threaded modes.""" + pids = [] + # read from container ring to ensure it's fresh + self.get_container_ring().get_nodes('') + for device in devices: + try: + dev_path = check_drive(self.devices, device, + self.mount_check) + except ValueError as err: + # We don't count this as an error. The occasional + # unmounted drive is part of normal cluster operations, + # so a simple warning is sufficient. + self.logger.warning('Skipping: %s', err) + continue + while len(pids) >= self.updater_workers: + pids.remove(os.wait()[0]) + pid = os.fork() + if pid: + pids.append(pid) + else: + self._process_device_in_child(dev_path, device) + sys.exit() + + while pids: + pids.remove(os.wait()[0]) + def run_forever(self, *args, **kwargs): """Run the updater continuously.""" time.sleep(random() * self.interval) while True: - self.logger.info('Begin object update sweep') - self.begin = time.time() - pids = [] - # read from container ring to ensure it's fresh - self.get_container_ring().get_nodes('') - for device in self._listdir(self.devices): - try: - dev_path = check_drive(self.devices, device, - self.mount_check) - except ValueError as err: - # We don't count this as an error. The occasional - # unmounted drive is part of normal cluster operations, - # so a simple warning is sufficient. - self.logger.warning('Skipping: %s', err) - continue - while len(pids) >= self.updater_workers: - pids.remove(os.wait()[0]) - pid = os.fork() - if pid: - pids.append(pid) - else: - signal.signal(signal.SIGTERM, signal.SIG_DFL) - os.environ.pop('NOTIFY_SOCKET', None) - eventlet_monkey_patch() - self.stats.reset() - self.oldest_async_pendings.reset() - forkbegin = time.time() - self.object_sweep(dev_path) - elapsed = time.time() - forkbegin - self.logger.info( - ('Object update sweep of %(device)s ' - 'completed: %(elapsed).02fs, %(stats)s'), - {'device': device, 'elapsed': elapsed, - 'stats': self.stats}) - sys.exit() - while pids: - pids.remove(os.wait()[0]) - elapsed = time.time() - self.begin - self.logger.info('Object update sweep completed: %.02fs', - elapsed) - self.dump_recon(elapsed) + elapsed = self.run_once(*args, **kwargs) if elapsed < self.interval: time.sleep(self.interval - elapsed) def run_once(self, *args, **kwargs): """Run the updater once.""" - self.logger.info('Begin object update single threaded sweep') + self.logger.info('Begin object update sweep of all devices') self.begin = time.time() - self.stats.reset() - self.oldest_async_pendings.reset() - for device in self._listdir(self.devices): - try: - dev_path = check_drive(self.devices, device, self.mount_check) - except ValueError as err: - # We don't count this as an error. The occasional unmounted - # drive is part of normal cluster operations, so a simple - # warning is sufficient. - self.logger.warning('Skipping: %s', err) - continue - self.object_sweep(dev_path) + devices = self._listdir(self.devices) + self._process_devices(devices) elapsed = time.time() - self.begin self.logger.info( - ('Object update single-threaded sweep completed: ' - '%(elapsed).02fs, %(stats)s'), - {'elapsed': elapsed, 'stats': self.stats}) - self.dump_recon(elapsed) + ('Object update sweep of all devices completed: ' + '%(elapsed).02fs'), + {'elapsed': elapsed}) + self.aggregate_and_dump_recon(devices, elapsed) + return elapsed - def dump_recon(self, elapsed): - """Gathers stats and dumps recon cache.""" - object_updater_stats = { - 'failures_oldest_timestamp': ( - self.oldest_async_pendings.get_oldest_timestamp() - ), - 'failures_oldest_timestamp_age': ( - self.oldest_async_pendings.get_oldest_timestamp_age() - ), - 'failures_account_container_count': ( - len(self.oldest_async_pendings.ac_to_timestamp) - ), - 'failures_oldest_timestamp_account_containers': ( + def _gather_recon_stats(self): + """Gather stats for device recon dumps.""" + stats = { + 'failures_oldest_timestamp': + self.oldest_async_pendings.get_oldest_timestamp(), + 'failures_oldest_timestamp_age': + self.oldest_async_pendings.get_oldest_timestamp_age(), + 'failures_account_container_count': + len(self.oldest_async_pendings.ac_to_timestamp), + 'failures_oldest_timestamp_account_containers': self.oldest_async_pendings.get_n_oldest_timestamp_acs( - self.dump_count + self.dump_count), + 'tracker_memory_usage': + self.oldest_async_pendings.get_memory_usage(), + } + return stats + + def dump_device_recon(self, device): + """Dump recon stats for a single device.""" + disk_recon_stats = self._gather_recon_stats() + dump_recon_cache( + {'object_updater_per_device': {device: disk_recon_stats}}, + self.rcache, + self.logger, + ) + + def aggregate_and_dump_recon(self, devices, elapsed): + """ + Aggregate recon stats across devices and dump the result to the + recon cache. + """ + recon_cache = load_recon_cache(self.rcache) + device_stats = recon_cache.get('object_updater_per_device', {}) + if not isinstance(device_stats, dict): + raise TypeError('object_updater_per_device must be a dict') + device_stats = {k: (v if v is not None else {}) + for k, v in device_stats.items()} + + devices_to_remove = set(device_stats) - set(devices) + update_device_stats = {d: {} for d in devices_to_remove} + + aggregated_oldest_entries = [] + + for stats in device_stats.values(): + container_data = stats.get( + 'failures_oldest_timestamp_account_containers', {}) + aggregated_oldest_entries.extend(container_data.get( + 'oldest_entries', [])) + aggregated_oldest_entries.sort(key=lambda x: x['timestamp']) + aggregated_oldest_entries = aggregated_oldest_entries[:self.dump_count] + aggregated_oldest_count = len(aggregated_oldest_entries) + + aggregated_stats = { + 'failures_account_container_count': max( + list( + stats.get('failures_account_container_count', 0) + for stats in device_stats.values() ) + or [0], ), 'tracker_memory_usage': ( - self.oldest_async_pendings.get_memory_usage() + float(sum( + stats.get('tracker_memory_usage', 0) + for stats in device_stats.values() + )) + / float(len(device_stats)) + ) + * max(self.updater_workers, 1) + if device_stats + else 0, + 'failures_oldest_timestamp': min( + list(filter(lambda x: x is not None, + [stats.get('failures_oldest_timestamp') + for stats in device_stats.values()] + )) or [None], ), + 'failures_oldest_timestamp_age': max( + list(filter(lambda x: x is not None, + [stats.get('failures_oldest_timestamp_age') + for stats in device_stats.values()] + )) or [None], + ), + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': aggregated_oldest_count, + 'oldest_entries': aggregated_oldest_entries, + }, } dump_recon_cache( { 'object_updater_sweep': elapsed, - 'object_updater_stats': object_updater_stats, + 'object_updater_stats': aggregated_stats, + 'object_updater_per_device': update_device_stats, }, self.rcache, self.logger, diff --git a/test/probe/test_object_async_update.py b/test/probe/test_object_async_update.py index f7276cc25d..3b5f1e831c 100644 --- a/test/probe/test_object_async_update.py +++ b/test/probe/test_object_async_update.py @@ -14,17 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import shutil +import time +import uuid + +from datetime import datetime from io import BytesIO from unittest import main, SkipTest from uuid import uuid4 import random +import mock + from swiftclient import client from swiftclient.exceptions import ClientException -from swift.common import direct_client -from swift.common.manager import Manager +from six.moves.configparser import ConfigParser +from swift.common import direct_client, manager +from swift.common.manager import Manager, Server from swift.common.swob import normalize_etag +from swift.common.utils import readconf from test.probe.common import kill_nonprimary_server, \ kill_server, ReplProbeTest, start_server, ECProbeTest @@ -334,23 +344,28 @@ class TestUpdateOverridesEC(ECProbeTest): self.assertEqual('test/ctype', listing[0]['content_type']) -class TestObjectUpdaterStats(ReplProbeTest): +class UpdaterStatsMixIn(object): def setUp(self): - super(TestObjectUpdaterStats, self).setUp() + super(UpdaterStatsMixIn, self).setUp() self.int_client = self.make_internal_client() self.container_servers = Manager(['container-server']) + self.object_updater = Manager(['object-updater']) + self._post_setup_config() - def test_lots_of_asyncs(self): + def _post_setup_config(self): + pass + + def _create_lots_of_asyncs(self): # Create some (acct, cont) pairs num_accounts = 3 num_conts_per_a = 4 - ac_pairs = [] + self.ac_pairs = ac_pairs = [] for a in range(num_accounts): - acct = 'AUTH_user%03d' % a + acct = 'AUTH_user-%s-%03d' % (uuid.uuid4(), a) self.int_client.create_account(acct) for c in range(num_conts_per_a): - cont = 'cont%03d' % c + cont = 'cont-%s-%03d' % (uuid.uuid4(), c) self.int_client.create_container(acct, cont) ac_pairs.append((acct, cont)) @@ -359,10 +374,10 @@ class TestObjectUpdaterStats(ReplProbeTest): self.container_servers.stop(number=n) # Create a bunch of objects - num_objs_per_ac = 5 + num_objs_per_ac = 10 for acct, cont in ac_pairs: for o in range(num_objs_per_ac): - obj = 'obj%03d' % o + obj = 'obj-%s-%03d' % (uuid.uuid4(), o) self.int_client.upload_object(BytesIO(b''), acct, cont, obj) all_asyncs = self.gather_async_pendings() @@ -371,22 +386,57 @@ class TestObjectUpdaterStats(ReplProbeTest): self.assertGreater(len(all_asyncs), total_objs) self.assertLess(len(all_asyncs), total_objs * 2) - # Run the updater and check stats - Manager(['object-updater']).once() - recons = [] + def _gather_recon(self): + # We'll collect recon only once from each node + dev_to_node_dict = {} for onode in self.object_ring.devs: - recon = direct_client.direct_get_recon(onode, 'updater/object') - recons.append(recon) + # We can skip any devices that are already covered by one of the + # other nodes we found + if any(self.is_local_to(node, onode) + for node in dev_to_node_dict.values()): + continue + dev_to_node_dict[onode["device"]] = onode + self.assertEqual(4, len(dev_to_node_dict)) # sanity - self.assertEqual(4, len(recons)) - found_counts = [] + timeout = 20 + polling_interval = 2 + recon_data = [] + start = time.time() + while True: + for onode in list(dev_to_node_dict.values()): + recon = direct_client.direct_get_recon( + onode, 'updater/object') + if (recon.get('object_updater_stats') is not None and + recon.get('object_updater_sweep') is not None): + del dev_to_node_dict[onode["device"]] + recon_data.append(recon) + if not dev_to_node_dict: + break + elapsed = time.time() - start + if elapsed > timeout: + self.fail( + "Updates did not process within {timeout} seconds".format( + timeout=timeout) + ) + time.sleep(polling_interval) + self.assertEqual(4, len(recon_data)) # sanity + return recon_data + + def run_updater(self): + raise NotImplementedError() + + def _check_recon_data(self, recon_data): + ac_pairs = self.ac_pairs ac_set = set() - for recon in recons: + for recon in recon_data: updater_stats = recon['object_updater_stats'] - found_counts.append( - updater_stats['failures_account_container_count'] - ) + found_count = updater_stats['failures_account_container_count'] + # No node should find MORE unique ac than we created + self.assertLessEqual(found_count, len(ac_pairs)) + # and generally we'd expect them to have "at least" one from + # significanly MORE than the "majority" of ac_pairs + self.assertGreaterEqual(found_count, len(ac_pairs) / 2) oldest_count = updater_stats[ 'failures_oldest_timestamp_account_containers' @@ -403,14 +453,76 @@ class TestObjectUpdaterStats(ReplProbeTest): container = entry['container'] timestamp = entry['timestamp'] self.assertIsNotNone(timestamp) - self.assertIsInstance(timestamp, float) ac_set.add((account, container)) + # All the collected ac_set are from the ac_pairs we created for ac in ac_set: self.assertIn(ac, set(ac_pairs)) + # Specifically, the ac_pairs we created failures for *first* + # are represented by the oldest ac_set across nodes + for ac in ac_pairs[:5]: + self.assertIn(ac, ac_set) + # Where as the more recent failures are NOT! + for ac in ac_pairs[-3:]: + self.assertNotIn(ac, ac_set) - for found_count in found_counts: - self.assertLessEqual(found_count, len(ac_pairs)) + def test_stats(self): + self._create_lots_of_asyncs() + recon_data = self.run_updater() + self._check_recon_data(recon_data) + + +class TestObjectUpdaterStatsRunOnce(UpdaterStatsMixIn, ReplProbeTest): + + def run_updater(self): + # Run the updater and check stats + Manager(['object-updater']).once() + return self._gather_recon() + + +class TestObjectUpdaterStatsRunForever(UpdaterStatsMixIn, ECProbeTest): + + def _post_setup_config(self): + CONF_SECTION = 'object-updater' + self.conf_dest = os.path.join( + '/tmp/', + datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f') + ) + os.mkdir(self.conf_dest) + object_server_dir = os.path.join(self.conf_dest, 'object-server') + os.mkdir(object_server_dir) + for conf_file in Server('object-updater').conf_files(): + config = readconf(conf_file) + if CONF_SECTION not in config: + continue # Ensure the object-updater is set up to run + config[CONF_SECTION].update({'interval': '1'}) + + parser = ConfigParser() + parser.add_section(CONF_SECTION) + for option, value in config[CONF_SECTION].items(): + parser.set(CONF_SECTION, option, value) + + file_name = os.path.basename(conf_file) + if file_name.endswith('.d'): + # Work around conf.d setups (like you might see with VSAIO) + file_name = file_name[:-2] + with open(os.path.join(object_server_dir, file_name), 'w') as fp: + parser.write(fp) + + def tearDown(self): + shutil.rmtree(self.conf_dest) + + def run_updater(self): + # Start the updater + with mock.patch.object(manager, 'SWIFT_DIR', self.conf_dest): + updater_status = self.object_updater.start() + self.assertEqual( + updater_status, 0, "Object updater failed to start") + recon_data = self._gather_recon() + # Stop the updater + stop_status = self.object_updater.stop() + self.assertEqual(stop_status, 0, "Object updater failed to stop") + return recon_data if __name__ == '__main__': diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 8262012518..00f41df917 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -154,7 +154,8 @@ class TestObjectController(BaseTestCase): self.tmpdir = mkdtemp() self.testdir = os.path.join(self.tmpdir, 'tmp_test_object_server_ObjectController') - mkdirs(os.path.join(self.testdir, 'sda1')) + self.sda1 = os.path.join(self.testdir, 'sda1') + mkdirs(self.sda1) self.conf = {'devices': self.testdir, 'mount_check': 'false', 'container_update_timeout': 0.0} self.logger = debug_logger('test-object-controller') @@ -1091,7 +1092,7 @@ class TestObjectController(BaseTestCase): mock_ring.get_nodes.return_value = (99, [node]) object_updater.container_ring = mock_ring mock_update.return_value = ((True, 1, None)) - object_updater.run_once() + object_updater._process_device_in_child(self.sda1, 'sda1') self.assertEqual(1, mock_update.call_count) self.assertEqual((node, 99, 'PUT', '/a/c/o'), mock_update.call_args_list[0][0][0:4]) @@ -1206,7 +1207,7 @@ class TestObjectController(BaseTestCase): mock_ring = mock.MagicMock() mock_ring.get_nodes.return_value = (99, [node]) object_updater.container_ring = mock_ring - object_updater.run_once() + object_updater._process_device_in_child(self.sda1, 'sda1') self.assertEqual(1, len(conn.requests)) self.assertEqual('/cdevice/99/.sharded_a/c_shard_1/o', diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 0cac9beecf..076849ba73 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -25,6 +25,7 @@ from contextlib import closing from gzip import GzipFile from tempfile import mkdtemp from shutil import rmtree +import json from swift.common.exceptions import ConnectionTimeout from test import listen_zero @@ -431,27 +432,34 @@ class TestObjectUpdater(unittest.TestCase): completion_lines[0]) @mock.patch.object(object_updater, 'check_drive') - def test_run_once_with_disk_unmounted(self, mock_check_drive): - mock_check_drive.side_effect = ValueError + @mock.patch('swift.obj.updater.os') + def test_run_once_with_disk_unmounted(self, mock_os, mock_check_drive): + def fake_mount_check(root, device, mount_check=True): + raise ValueError('%s is unmounted' % device) + mock_check_drive.side_effect = fake_mount_check + mock_os.path = os.path + mock_os.listdir = os.listdir + + # mount_check False ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', - 'node_timeout': '15'}) + 'node_timeout': '15'}, logger=self.logger) + ou.run_once() - async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) - os.mkdir(async_dir) - ou.run_once() - self.assertTrue(os.path.exists(async_dir)) - # each run calls check_device self.assertEqual([ mock.call(self.devices_dir, 'sda1', False), - mock.call(self.devices_dir, 'sda1', False), ], mock_check_drive.mock_calls) mock_check_drive.reset_mock() + self.assertEqual([], mock_os.mock_calls) + self.assertEqual(['Skipping: sda1 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() + # mount_check True ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'TrUe', @@ -459,21 +467,66 @@ class TestObjectUpdater(unittest.TestCase): 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) - odd_dir = os.path.join(async_dir, 'not really supposed ' - 'to be here') - os.mkdir(odd_dir) ou.run_once() - self.assertTrue(os.path.exists(async_dir)) - self.assertTrue(os.path.exists(odd_dir)) # skipped - not mounted! self.assertEqual([ mock.call(self.devices_dir, 'sda1', True), ], mock_check_drive.mock_calls) + mock_check_drive.reset_mock() + self.assertEqual([], mock_os.mock_calls) + self.assertEqual(['Skipping: sda1 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {}) - @mock.patch('swift.obj.updater.dump_recon_cache') - @mock.patch.object(object_updater, 'check_drive') - def test_run_once(self, mock_check_drive, mock_dump_recon): - mock_check_drive.side_effect = lambda r, d, mc: os.path.join(r, d) + # multiple devices, one unmounted + NUM_DEVICES = 4 + + def fake_list_dir(path): + if path == self.devices_dir: + return ['sda' + str(i) + for i in range(NUM_DEVICES)] + else: + return os.listdir(path) + mock_os.listdir = fake_list_dir + + def fake_mount_check(root, device, mount_check=True): + if device == 'sda2': + raise ValueError('%s is unmounted' % device) + else: + return os.path.join(root, device) + mock_check_drive.side_effect = fake_mount_check + + def fake_list_dir(path): + if path == self.devices_dir: + return ['sda1', 'sda0', 'sda2', 'sda3'] + else: + return os.listdir(path) + mock_os.listdir.side_effect = fake_list_dir + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = list(pids) + mock_os.wait.side_effect = [(p, 0) for p in pids] + ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda0', True), + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda2', True), + mock.call(self.devices_dir, 'sda3', True), + ], mock_check_drive.mock_calls) + # we fork/wait for each mounted device + self.assertEqual([ + mock.call.fork(), + mock.call.wait(), + ] * (NUM_DEVICES - 1), mock_os.mock_calls) + self.assertEqual(['Skipping: sda2 is unmounted'], + self.logger.get_lines_for_level('warning')) + self.logger.clear() + self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {}) + + @mock.patch('swift.obj.updater.os') + def test_run_once_child(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, 'mount_check': 'false', @@ -481,23 +534,179 @@ class TestObjectUpdater(unittest.TestCase): 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_os.fork.side_effect = [0] + mock_process = ou._process_device_in_child = mock.MagicMock() + with self.assertRaises(SystemExit): + ou.run_once() + self.assertEqual([ + mock.call(self.sda1, 'sda1'), + ], mock_process.mock_calls) + + @mock.patch('swift.obj.updater.os') + def test_run_once_subsequent_children(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'updater_workers': '4', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_process = ou._process_device_in_child = mock.MagicMock() + pids = [i + 1 for i in range(NUM_DEVICES)] + for i in range(4): + mock_os.fork.side_effect = pids[:i] + [0] + with self.assertRaises(SystemExit): + ou.run_once() + + self.assertEqual([ + mock.call(os.path.join(self.devices_dir, 'sda1'), 'sda1'), + mock.call(os.path.join(self.devices_dir, 'sda0'), 'sda0'), + mock.call(os.path.join(self.devices_dir, 'sda2'), 'sda2'), + mock.call(os.path.join(self.devices_dir, 'sda3'), 'sda3'), + ], mock_process.mock_calls) + + @mock.patch('swift.obj.updater.os') + def test_run_once_child_with_more_workers(self, mock_os): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'updater_workers': '3', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + devices = [] + NUM_DEVICES = 3 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda{0}'.format(i)) + devices.append(device) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + mock_os.fork.side_effect = [1, 2, 0] + mock_process = ou._process_device_in_child = mock.MagicMock() + with self.assertRaises(SystemExit): + ou.run_once() + self.assertEqual([ + mock.call(os.path.join(self.devices_dir, 'sda2'), 'sda2'), + ], mock_process.mock_calls) + + @mock.patch.object(object_updater, 'check_drive') + @mock.patch('swift.obj.updater.os') + def test_run_once_parent_default(self, mock_os, mock_check_drive): + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'on', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + + NUM_DEVICES = 2 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = pids + mock_os.wait.side_effect = [(i, 0) for i in pids] ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda0', True), + ], mock_check_drive.mock_calls) + self.assertEqual([ + mock.call.fork(), + mock.call.wait(), + mock.call.fork(), + mock.call.wait(), + ], mock_os.mock_calls) + + @mock.patch.object(object_updater, 'check_drive') + @mock.patch('swift.obj.updater.os') + def test_run_once_parent_more_updater_workers(self, mock_os, + mock_check_drive): + # unpatch listdir and path + mock_os.path = os.path + mock_os.listdir = os.listdir + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': '1', + 'swift_dir': self.testdir, + 'updater_workers': '4', + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + NUM_DEVICES = 4 + for i in range(NUM_DEVICES): + device = os.path.join(self.devices_dir, 'sda' + str(i)) + async_dir = os.path.join(device, get_async_dir(POLICIES[0])) + mkdirs(async_dir) + + pids = [i + 1 for i in range(NUM_DEVICES)] + mock_os.fork.side_effect = pids + mock_os.wait.side_effect = [(i, 0) for i in pids] + ou.run_once() + self.assertEqual([ + mock.call(self.devices_dir, 'sda1', True), + mock.call(self.devices_dir, 'sda0', True), + mock.call(self.devices_dir, 'sda2', True), + mock.call(self.devices_dir, 'sda3', True), + ], mock_check_drive.mock_calls) + self.assertEqual([ + mock.call.fork(), + mock.call.fork(), + mock.call.fork(), + mock.call.fork(), + mock.call.wait(), + mock.call.wait(), + mock.call.wait(), + mock.call.wait(), + ], mock_os.mock_calls) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_process_devices_in_child(self, mock_dump_recon): + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '15'}, logger=self.logger) + ou._process_device_in_child(self.sda1, 'sda1') self.assertEqual([], ou.logger.get_lines_for_level('error')) async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) os.mkdir(async_dir) - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(async_dir)) - # each run calls check_device - self.assertEqual([ - mock.call(self.devices_dir, 'sda1', False), - mock.call(self.devices_dir, 'sda1', False), - ], mock_check_drive.mock_calls) - mock_check_drive.reset_mock() self.assertEqual([], ou.logger.get_lines_for_level('error')) ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, - 'mount_check': 'TrUe', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', @@ -505,11 +714,9 @@ class TestObjectUpdater(unittest.TestCase): odd_dir = os.path.join(async_dir, 'not really supposed ' 'to be here') os.mkdir(odd_dir) - ou.run_once() + + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(async_dir)) - self.assertEqual([ - mock.call(self.devices_dir, 'sda1', True), - ], mock_check_drive.mock_calls) self.assertEqual([], ou.logger.get_lines_for_level('error')) ohash = hash_path('a', 'c', 'o') @@ -529,7 +736,7 @@ class TestObjectUpdater(unittest.TestCase): 'X-Container-Timestamp': normalize_timestamp(0)}}, async_pending) - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(not os.path.exists(older_op_path)) self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), @@ -592,7 +799,7 @@ class TestObjectUpdater(unittest.TestCase): dev['replication_port'] = bindsock.getsockname()[1] ou.logger._clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -613,7 +820,7 @@ class TestObjectUpdater(unittest.TestCase): # only 1/2 updates succeeds event = spawn(accept, [404, 201]) ou.logger.clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -635,7 +842,7 @@ class TestObjectUpdater(unittest.TestCase): with Timeout(99) as exc, \ mock.patch('swift.obj.updater.http_connect') as mock_connect: mock_connect.return_value.getresponse.side_effect = exc - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {'failures': 1}) @@ -655,7 +862,7 @@ class TestObjectUpdater(unittest.TestCase): with ConnectionTimeout(9) as exc, \ mock.patch('swift.obj.updater.http_connect') as mock_connect: mock_connect.return_value.getresponse.side_effect = exc - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') self.assertTrue(os.path.exists(op_path)) self.assertEqual(ou.logger.statsd_client.get_increment_counts(), {'failures': 1}) @@ -674,7 +881,7 @@ class TestObjectUpdater(unittest.TestCase): # final update succeeds event = spawn(accept, [201]) ou.logger.clear() - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') err = event.wait() if err: raise err @@ -695,42 +902,43 @@ class TestObjectUpdater(unittest.TestCase): ])) @mock.patch('swift.obj.updater.dump_recon_cache') - @mock.patch.object(object_updater, 'check_drive') - def test_run_once_recon_dump(self, mock_check_drive, mock_dump_recon): + def test_run_once_recon_dump(self, mock_dump_recon): self.maxDiff = None - def assert_and_reset_recon_dump(exp): + def assert_and_reset_recon_dump_per_device(exp): recon_dumps = [call[0][0] for call in mock_dump_recon.call_args_list] self.assertEqual([exp], recon_dumps) mock_dump_recon.reset_mock() - mock_check_drive.side_effect = lambda r, d, mc: os.path.join(r, d) async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) os.mkdir(async_dir) ou = object_updater.ObjectUpdater({ 'devices': self.devices_dir, - 'mount_check': 'TrUe', 'swift_dir': self.testdir, 'interval': '1', 'concurrency': '1', 'node_timeout': '15'}, logger=self.logger) + # There are no asyncs so there are no failures with mock.patch.object(ou, 'object_update', return_value=(False, 'node-id', None)): - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') exp_recon_dump = { - 'object_updater_stats': { - 'failures_account_container_count': 0, - 'failures_oldest_timestamp': None, - 'failures_oldest_timestamp_account_containers': { - 'oldest_count': 0, - 'oldest_entries': []}, - 'failures_oldest_timestamp_age': None, - 'tracker_memory_usage': mock.ANY}, - 'object_updater_sweep': mock.ANY, + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + } + } } - assert_and_reset_recon_dump(exp_recon_dump) + assert_and_reset_recon_dump_per_device(exp_recon_dump) ts = next(self.ts_iter) ohash = hash_path('a', 'c', 'o') @@ -748,22 +956,449 @@ class TestObjectUpdater(unittest.TestCase): with mock.patch('swift.obj.updater.time.time', return_value=now): with mock.patch.object(ou, 'object_update', return_value=(False, 'node-id', None)): - ou.run_once() + ou._process_device_in_child(self.sda1, 'sda1') exp_recon_dump = { - 'object_updater_stats': { - 'failures_account_container_count': 1, - 'failures_oldest_timestamp': float(ts), - 'failures_oldest_timestamp_account_containers': { - 'oldest_count': 1, - 'oldest_entries': [{'account': 'a', - 'container': 'c', - 'timestamp': float(ts)}] - }, - 'failures_oldest_timestamp_age': now - float(ts), - 'tracker_memory_usage': mock.ANY}, - 'object_updater_sweep': mock.ANY, + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 1, + 'failures_oldest_timestamp': float(ts), + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 1, + 'oldest_entries': [{'account': 'a', + 'container': 'c', + 'timestamp': float(ts)}] + }, + 'failures_oldest_timestamp_age': now - float(ts), + 'tracker_memory_usage': mock.ANY, + } + } } - assert_and_reset_recon_dump(exp_recon_dump) + assert_and_reset_recon_dump_per_device(exp_recon_dump) + + def test_gather_recon_stats(self): + ou = object_updater.ObjectUpdater({}) + with mock.patch.object( + ou.oldest_async_pendings, + 'get_oldest_timestamp', return_value=123.456 + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_oldest_timestamp_age', + return_value=789.012, + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'ac_to_timestamp', + return_value={('account1', 'container1'): 123.456}, + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_n_oldest_timestamp_acs', + return_value=[ + {'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': 123.456}, + ], + ): + with mock.patch.object( + ou.oldest_async_pendings, + 'get_memory_usage', + return_value=1024, + ): + ou.oldest_async_pendings.ac_to_timestamp = { + ('AUTH_1', 'cont_1'): 123.456} + stats = ou._gather_recon_stats() + + expected_stats = { + 'failures_oldest_timestamp': 123.456, + 'failures_oldest_timestamp_age': 789.012, + 'failures_account_container_count': 1, + 'failures_oldest_timestamp_account_containers': [ + {'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': 123.456}, + ], + 'tracker_memory_usage': 1024, + } + self.assertEqual(stats, expected_stats) + + def test_aggregate_and_dump_recon_with_missing_keys(self): + """ + Test aggregation logic when device stats are missing some keys. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 1, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 3, + }, logger=self.logger) + + incomplete_recon = { + 'object_updater_per_device': { + 'sda1': { + 'tracker_memory_usage': 256, + 'failures_account_container_count': 1, + }, + 'sda2': { + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + }, + 'sda3': None, + } + } + utils.dump_recon_cache(incomplete_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1', 'sda2', 'sda3'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 256.0 / 3.0, + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [] + }, + } + + expected_recon = { + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_aggregate_and_dump_recon_all_empty_devices(self): + """ + Test aggregation logic when all devices are empty or missing. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 5, + }, logger=self.logger) + + empty_recon = { + 'object_updater_per_device': { + 'sda1': {}, + 'sda2': {}, + } + } + utils.dump_recon_cache(empty_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1', 'sda2'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 0, + 'tracker_memory_usage': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_age': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [] + }, + } + + expected_recon = { + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_aggregate_and_dump_recon_wrong_type_per_device(self): + """ + Test aggregation when object_updater_per_device is the wrong type. + """ + recon_path = os.path.join(self.testdir, 'recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 5, + }, logger=self.logger) + + # object_updater_per_device as a list instead of dict + malformed_recon = { + 'object_updater_per_device': ['invalid_data_type'] + } + utils.dump_recon_cache(malformed_recon, ou.rcache, ou.logger) + + with self.assertRaises(TypeError) as cm: + ou.aggregate_and_dump_recon(['sda1', 'sda2'], 30) + + self.assertIn( + 'object_updater_per_device must be a dict', str(cm.exception)) + + def test_aggregate_and_dump_recon_partial_device_updates(self): + """ + Test when some devices are removed and partial updates exist. + """ + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 2, + }, logger=self.logger) + + existing_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512, + }, + 'sda2': { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 256, + }, + 'sda3': None, + } + } + utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger) + + ou.aggregate_and_dump_recon(['sda1'], 30) + + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512.0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_age': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + } + + expected_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 512, + }, + }, + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + } + self.assertEqual(expected_recon, found_data) + + def test_dump_device_recon(self): + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': '6', + }) + ou.dump_device_recon('sda1') + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual({ + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + } + } + }, found_data) + + # now add some data + timestamps = [] + for a in range(3): + account = 'AUTH_%s' % a + for c in range(4): + container = 'cont_%s' % c + for ts in range(5): + ts = next(self.ts_iter) + timestamps.append(float(ts)) + ou.oldest_async_pendings.add_update( + account, container, ts) + + now = float(next(self.ts_iter)) + with mock.patch('swift.obj.updater.time.time', return_value=now): + ou.dump_device_recon('sda1') + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual({ + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 12, + 'failures_oldest_timestamp': timestamps[0], + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 6, + 'oldest_entries': [{ + 'account': 'AUTH_0', + 'container': 'cont_0', + 'timestamp': timestamps[0], + }, { + 'account': 'AUTH_0', + 'container': 'cont_1', + 'timestamp': timestamps[5], + }, { + 'account': 'AUTH_0', + 'container': 'cont_2', + 'timestamp': timestamps[10], + }, { + 'account': 'AUTH_0', + 'container': 'cont_3', + 'timestamp': timestamps[15], + }, { + 'account': 'AUTH_1', + 'container': 'cont_0', + 'timestamp': timestamps[20], + }, { + 'account': 'AUTH_1', + 'container': 'cont_1', + 'timestamp': timestamps[25], + }], + }, + 'failures_oldest_timestamp_age': now - timestamps[0], + 'tracker_memory_usage': mock.ANY, + } + } + }, found_data) + + def test_aggregate_and_dump_recon(self): + self.maxDiff = None + recon_path = os.path.join(self.testdir, 'recon') + recon_file = os.path.join(recon_path, 'object.recon') + os.mkdir(recon_path) + ou = object_updater.ObjectUpdater({ + 'devices': self.devices_dir, + 'swift_dir': self.testdir, + 'updater_workers': 2, + 'recon_cache_path': recon_path, + 'async_tracker_dump_count': 2, + }, logger=self.logger) + existing_recon = { + 'object_updater_per_device': { + 'sda1': { + 'failures_account_container_count': 1, + 'tracker_memory_usage': 512, + 'failures_oldest_timestamp': 123.45678, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 1, + 'oldest_entries': [ + {'account': 'a', 'container': 'c', + 'timestamp': 123.45678} + ], + }, + }, + 'sda2': { + 'failures_account_container_count': 2, + 'tracker_memory_usage': 256, + 'failures_oldest_timestamp': 124.56789, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 2, + 'oldest_entries': [ + {'account': 'x', 'container': 'y', + 'timestamp': 124.56789}, + {'account': 'm', 'container': 'n', + 'timestamp': 125.67890}, + ], + }, + }, + }, + } + utils.dump_recon_cache(existing_recon, ou.rcache, ou.logger) + # add an "empty" device + with mock.patch.object(ou.oldest_async_pendings, 'get_memory_usage', + return_value=128): + ou.dump_device_recon('sda3') + # and also an unmounted one + ou.dump_device_recon('sdx') + existing_recon['object_updater_per_device'].update({ + 'sda3': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': 128, + }, + 'sdx': { + 'failures_account_container_count': 0, + 'failures_oldest_timestamp': None, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 0, + 'oldest_entries': [], + }, + 'failures_oldest_timestamp_age': None, + 'tracker_memory_usage': mock.ANY, + }, + }) + with open(recon_file) as f: + found_data = json.load(f) + self.assertEqual(existing_recon, found_data) # sanity + + # we're setting this up like sdx is stale/unmounted + ou.aggregate_and_dump_recon(['sda1', 'sda2', 'sda3'], 30) + with open(recon_file) as f: + found_data = json.load(f) + + expected_aggregated_stats = { + 'failures_account_container_count': 2, + 'tracker_memory_usage': mock.ANY, + 'failures_oldest_timestamp': 123.45678, + 'failures_oldest_timestamp_age': 789.012, + 'failures_oldest_timestamp_account_containers': { + 'oldest_count': 2, + 'oldest_entries': [ + {'account': 'a', 'container': 'c', 'timestamp': 123.45678}, + {'account': 'x', 'container': 'y', 'timestamp': 124.56789}, + ], + }, + } + expected_recon = dict(existing_recon, **{ + 'object_updater_sweep': 30, + 'object_updater_stats': expected_aggregated_stats, + }) + # and sda4 is removed + del expected_recon['object_updater_per_device']['sdx'] + self.assertEqual(expected_recon, found_data) + + self.assertAlmostEqual( + 755.5, + found_data['object_updater_stats']['tracker_memory_usage'], + delta=200 + ) def test_obj_put_legacy_updates(self): ts = (normalize_timestamp(t) for t in @@ -804,7 +1439,8 @@ class TestObjectUpdater(unittest.TestCase): # run once fake_status_codes = [200, 200, 200] with mocked_http_conn(*fake_status_codes, give_connect=capture): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') + self.assertEqual(len(fake_status_codes), len(request_log)) for request_args, request_kwargs in request_log: ip, part, method, path, headers, qs, ssl = request_args @@ -867,7 +1503,7 @@ class TestObjectUpdater(unittest.TestCase): 200, # object update conflict ] with mocked_http_conn(*fake_status_codes, give_connect=capture): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(len(fake_status_codes), len(request_log)) for request_args, request_kwargs in request_log: ip, part, method, path, headers, qs, ssl = request_args @@ -981,7 +1617,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) @@ -1016,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( 507, 200, 507) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/0/a/c/o'] * 3, @@ -1038,7 +1674,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:2], ts_obj, policies[0]) self._check_update_requests(conn.requests[2:], ts_obj, policies[0]) @@ -1092,7 +1728,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/0/a/c/o'] * 3, @@ -1165,7 +1801,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) # only *one* set of redirected requests is attempted per cycle @@ -1195,7 +1831,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual( @@ -1239,7 +1875,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) # only *one* set of redirected requests is attempted per cycle @@ -1273,7 +1909,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests[:3], ts_obj, policies[0]) self._check_update_requests(conn.requests[3:], ts_obj, policies[0]) # first try the previously persisted container path, response to that @@ -1306,7 +1942,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual( ['/sda1/%s/.shards_a/c_shard_3/o' % shard_3_part] * 3 + @@ -1331,7 +1967,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn( *fake_status_codes, headers=fake_headers) as conn: with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self._check_update_requests(conn.requests, ts_obj, policies[0]) self.assertEqual(['/sda1/%s/a/c/o' % root_part] * 3, [req['path'] for req in conn.requests]) @@ -1366,7 +2002,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn(): with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual( {'quarantines': 1}, @@ -1464,7 +2100,7 @@ class TestObjectUpdater(unittest.TestCase): expected_success = 2 fake_status_codes = [200] * 3 * expected_success with mocked_http_conn(*fake_status_codes) as fake_conn: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1512,7 +2148,7 @@ class TestObjectUpdater(unittest.TestCase): len(self._find_async_pending_files())) fake_status_codes = [200] * 3 * expected_total with mocked_http_conn(*fake_status_codes): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_total, daemon.stats.successes) self.assertEqual(0, daemon.stats.skips) self.assertEqual([], self._find_async_pending_files()) @@ -1573,7 +2209,7 @@ class TestObjectUpdater(unittest.TestCase): with mocked_http_conn(*fake_status_codes) as fake_conn, \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1663,7 +2299,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator): - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1783,7 +2419,7 @@ class TestObjectUpdater(unittest.TestCase): mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) @@ -1911,7 +2547,7 @@ class TestObjectUpdater(unittest.TestCase): mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: - daemon.run_once() + daemon._process_device_in_child(self.sda1, 'sda1') self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips)