diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index e5cec2c283..07874cd286 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -520,12 +520,14 @@ log_name object-auditor Label used when logging log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level log_time 3600 Frequency of status logs in seconds. -files_per_second 20 Maximum files audited per second. Should - be tuned according to individual system - specs. 0 is unlimited. -bytes_per_second 10000000 Maximum bytes audited per second. Should - be tuned according to individual system - specs. 0 is unlimited. +files_per_second 20 Maximum files audited per second per + auditor process. Should be tuned according + to individual system specs. 0 is unlimited. +bytes_per_second 10000000 Maximum bytes audited per second per + auditor process. Should be tuned according + to individual system specs. 0 is unlimited. +concurrency 1 The number of parallel processes to use + for checksum auditing. ================== ============== ========================================== ------------------------------ diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index a71bcc1c01..6ffc4f33cb 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -227,6 +227,7 @@ use = egg:swift#recon # log_address = /dev/log # # files_per_second = 20 +# concurrency = 1 # bytes_per_second = 10000000 # log_time = 3600 # zero_byte_files_per_second = 50 diff --git a/swift/cli/recon.py b/swift/cli/recon.py index 1394eafaa4..4cb1761d12 100755 --- a/swift/cli/recon.py +++ b/swift/cli/recon.py @@ -558,12 +558,18 @@ class SwiftRecon(object): """ Generator that yields all values for given key in a recon cache entry. This is for use with object auditor recon cache entries. If the - object auditor has run in 'once' mode with a subset of devices - specified the checksum auditor section will have an entry of the form: - {'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}} - The same is true of the ZBF auditor cache entry section. We use this - generator to find all instances of a particular key in these multi- - level dictionaries. + object auditor has run in parallel, the recon cache will have entries + of the form: {'object_auditor_stats_ALL': { 'disk1': {..}, + 'disk2': {..}, + 'disk3': {..}, + ...}} + If the object auditor hasn't run in parallel, the recon cache will have + entries of the form: {'object_auditor_stats_ALL': {...}}. + The ZBF auditor doesn't run in parallel. However, if a subset of + devices is selected for auditing, the recon cache will have an entry + of the form: {'object_auditor_stats_ZBF': { 'disk1disk2..diskN': {}} + We use this generator to find all instances of a particular key in + these multi-level dictionaries. """ for k, v in recon_entry.items(): if isinstance(v, dict): @@ -597,15 +603,15 @@ class SwiftRecon(object): zbf_scan[url] = response['object_auditor_stats_ZBF'] if len(all_scan) > 0: stats = {} - stats[atime] = [(self.nested_get_value(atime, all_scan[i])) + stats[atime] = [sum(self.nested_get_value(atime, all_scan[i])) for i in all_scan] - stats[bprocessed] = [(self.nested_get_value(bprocessed, + stats[bprocessed] = [sum(self.nested_get_value(bprocessed, all_scan[i])) for i in all_scan] - stats[passes] = [(self.nested_get_value(passes, all_scan[i])) + stats[passes] = [sum(self.nested_get_value(passes, all_scan[i])) for i in all_scan] - stats[errors] = [(self.nested_get_value(errors, all_scan[i])) + stats[errors] = [sum(self.nested_get_value(errors, all_scan[i])) for i in all_scan] - stats[quarantined] = [(self.nested_get_value(quarantined, + stats[quarantined] = [sum(self.nested_get_value(quarantined, all_scan[i])) for i in all_scan] for k in stats: if None in stats[k]: @@ -623,13 +629,13 @@ class SwiftRecon(object): print("[ALL_auditor] - No hosts returned valid data.") if len(zbf_scan) > 0: stats = {} - stats[atime] = [(self.nested_get_value(atime, zbf_scan[i])) + stats[atime] = [sum(self.nested_get_value(atime, zbf_scan[i])) for i in zbf_scan] - stats[bprocessed] = [(self.nested_get_value(bprocessed, + stats[bprocessed] = [sum(self.nested_get_value(bprocessed, zbf_scan[i])) for i in zbf_scan] - stats[errors] = [(self.nested_get_value(errors, zbf_scan[i])) + stats[errors] = [sum(self.nested_get_value(errors, zbf_scan[i])) for i in zbf_scan] - stats[quarantined] = [(self.nested_get_value(quarantined, + stats[quarantined] = [sum(self.nested_get_value(quarantined, zbf_scan[i])) for i in zbf_scan] for k in stats: if None in stats[k]: diff --git a/swift/common/utils.py b/swift/common/utils.py index a5badaab9b..ef98d5f4a8 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2284,7 +2284,8 @@ def put_recon_cache_entry(cache_entry, key, item): """ Function that will check if item is a dict, and if so put it under cache_entry[key]. We use nested recon cache entries when the object - auditor runs in 'once' mode with a specified subset of devices. + auditor runs in parallel or else in 'once' mode with a specified + subset of devices. """ if isinstance(item, dict): if key not in cache_entry or key in cache_entry and not \ diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 13d34c18c4..a50375bed1 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -17,6 +17,7 @@ import os import sys import time import signal +from random import shuffle from swift import gettext_ as _ from contextlib import closing from eventlet import Timeout @@ -72,7 +73,10 @@ class AuditorWorker(object): description = '' if device_dirs: device_dir_str = ','.join(sorted(device_dirs)) - description = _(' - %s') % device_dir_str + if self.auditor_type == 'ALL': + description = _(' - parallel, %s') % device_dir_str + else: + description = _(' - %s') % device_dir_str self.logger.info(_('Begin object audit "%s" mode (%s%s)') % (mode, self.auditor_type, description)) begin = reported = time.time() @@ -223,6 +227,7 @@ class ObjectAuditor(Daemon): self.conf = conf self.logger = get_logger(conf, log_route='object-auditor') self.devices = conf.get('devices', '/srv/node') + self.concurrency = int(conf.get('concurrency', 1)) self.conf_zero_byte_fps = int( conf.get('zero_byte_files_per_second', 50)) self.recon_cache_path = conf.get('recon_cache_path', @@ -260,7 +265,7 @@ class ObjectAuditor(Daemon): sys.exit() def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs): - """Audit loop""" + """Parallel audit loop""" self.clear_recon_cache('ALL') self.clear_recon_cache('ZBF') kwargs['device_dirs'] = override_devices @@ -272,7 +277,34 @@ class ObjectAuditor(Daemon): if self.conf_zero_byte_fps: zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs) pids.append(zbf_pid) - pids.append(self.fork_child(**kwargs)) + if self.concurrency == 1: + # Audit all devices in 1 process + pids.append(self.fork_child(**kwargs)) + else: + # Divide devices amongst parallel processes set by + # self.concurrency. Total number of parallel processes + # is self.concurrency + 1 if zero_byte_fps. + parallel_proc = self.concurrency + 1 if \ + self.conf_zero_byte_fps else self.concurrency + device_list = list(override_devices) if override_devices else \ + listdir(self.devices) + shuffle(device_list) + while device_list: + pid = None + if len(pids) == parallel_proc: + pid = os.wait()[0] + pids.remove(pid) + # ZBF scanner must be restarted as soon as it finishes + if self.conf_zero_byte_fps and pid == zbf_pid: + kwargs['device_dirs'] = override_devices + # sleep between ZBF scanner forks + self._sleep() + zbf_pid = self.fork_child(zero_byte_fps=True, + **kwargs) + pids.append(zbf_pid) + else: + kwargs['device_dirs'] = [device_list.pop()] + pids.append(self.fork_child(**kwargs)) while pids: pid = os.wait()[0] # ZBF scanner must be restarted as soon as it finishes diff --git a/test/unit/common/middleware/test_recon.py b/test/unit/common/middleware/test_recon.py index 799cdd6a29..061fae0f77 100644 --- a/test/unit/common/middleware/test_recon.py +++ b/test/unit/common/middleware/test_recon.py @@ -662,15 +662,23 @@ class TestReconSuccess(TestCase): "files_processed": 2310, "quarantined": 0}}) - def test_get_auditor_info_object_once(self): + def test_get_auditor_info_object_parallel_once(self): from_cache_response = { - "object_auditor_stats_ALL": {'disk1disk2': { - "audit_time": 115.14418768882751, - "bytes_processed": 234660, - "completed": 115.4512460231781, - "errors": 0, - "files_processed": 2310, - "quarantined": 0}}, + "object_auditor_stats_ALL": { + 'disk1': { + "audit_time": 115.14418768882751, + "bytes_processed": 234660, + "completed": 115.4512460231781, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}, + 'disk2': { + "audit_time": 115, + "bytes_processed": 234660, + "completed": 115, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}, "object_auditor_stats_ZBF": {'disk1disk2': { "audit_time": 45.877294063568115, "bytes_processed": 0, @@ -686,13 +694,21 @@ class TestReconSuccess(TestCase): 'object_auditor_stats_ZBF'], '/var/cache/swift/object.recon'), {})]) self.assertEquals(rv, { - "object_auditor_stats_ALL": {'disk1disk2': { - "audit_time": 115.14418768882751, - "bytes_processed": 234660, - "completed": 115.4512460231781, - "errors": 0, - "files_processed": 2310, - "quarantined": 0}}, + "object_auditor_stats_ALL": { + 'disk1': { + "audit_time": 115.14418768882751, + "bytes_processed": 234660, + "completed": 115.4512460231781, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}, + 'disk2': { + "audit_time": 115, + "bytes_processed": 234660, + "completed": 115, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}, "object_auditor_stats_ZBF": {'disk1disk2': { "audit_time": 45.877294063568115, "bytes_processed": 0, diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 48db7643c9..1b48b6d9a1 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -278,6 +278,24 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.stats_buckets[10240], 0) self.assertEquals(auditor_worker.stats_buckets['OVER'], 1) + def test_object_run_logging(self): + logger = FakeLogger() + auditor_worker = auditor.AuditorWorker(self.conf, logger, + self.rcache, self.devices) + auditor_worker.audit_all_objects(device_dirs=['sda']) + log_lines = logger.get_lines_for_level('info') + self.assertTrue(len(log_lines) > 0) + self.assertTrue(log_lines[0].index('ALL - parallel, sda')) + + logger = FakeLogger() + auditor_worker = auditor.AuditorWorker(self.conf, logger, + self.rcache, self.devices, + zero_byte_only_at_fps=50) + auditor_worker.audit_all_objects(device_dirs=['sda']) + log_lines = logger.get_lines_for_level('info') + self.assertTrue(len(log_lines) > 0) + self.assertTrue(log_lines[0].index('ZBF - sda')) + def test_object_run_once_no_sda(self): auditor_worker = auditor.AuditorWorker(self.conf, self.logger, self.rcache, self.devices) @@ -452,7 +470,7 @@ class TestAuditor(unittest.TestCase): self.assert_(delta_t > 0.08) self.assert_(delta_t < 0.12) - def test_run_audit(self): + def test_run_parallel_audit(self): class StopForever(Exception): pass @@ -500,7 +518,9 @@ class TestAuditor(unittest.TestCase): my_auditor = auditor.ObjectAuditor(dict(devices=self.devices, mount_check='false', - zero_byte_files_per_second=89)) + zero_byte_files_per_second=89, + concurrency=1)) + mocker = ObjectAuditorMock() my_auditor.logger.exception = mock.MagicMock() real_audit_loop = my_auditor.audit_loop @@ -555,12 +575,17 @@ class TestAuditor(unittest.TestCase): my_auditor._sleep = mocker.mock_sleep_continue + my_auditor.concurrency = 2 mocker.fork_called = 0 mocker.wait_called = 0 my_auditor.run_once() - # Fork is called 3 times since the zbf process is forked twice - self.assertEquals(mocker.fork_called, 3) - self.assertEquals(mocker.wait_called, 3) + # Fork is called no. of devices + (no. of devices)/2 + 1 times + # since zbf process is forked (no.of devices)/2 + 1 times + no_devices = len(os.listdir(self.devices)) + self.assertEquals(mocker.fork_called, no_devices + no_devices / 2 + + 1) + self.assertEquals(mocker.wait_called, no_devices + no_devices / 2 + + 1) finally: os.fork = was_fork