Merge "Parallel object auditor"

This commit is contained in:
Jenkins 2014-07-08 10:58:36 +00:00 committed by Gerrit Code Review
commit c94779d2ac
7 changed files with 119 additions and 36 deletions

View File

@ -520,12 +520,14 @@ log_name object-auditor Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
log_time 3600 Frequency of status logs in seconds.
files_per_second 20 Maximum files audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
files_per_second 20 Maximum files audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
concurrency 1 The number of parallel processes to use
for checksum auditing.
================== ============== ==========================================
------------------------------

View File

@ -227,6 +227,7 @@ use = egg:swift#recon
# log_address = /dev/log
#
# files_per_second = 20
# concurrency = 1
# bytes_per_second = 10000000
# log_time = 3600
# zero_byte_files_per_second = 50

View File

@ -558,12 +558,18 @@ class SwiftRecon(object):
"""
Generator that yields all values for given key in a recon cache entry.
This is for use with object auditor recon cache entries. If the
object auditor has run in 'once' mode with a subset of devices
specified, the checksum auditor section will have an entry of the form:
{'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}}}
The same is true of the ZBF auditor cache entry section. We use this
generator to find all instances of a particular key in these multi-
level dictionaries.
object auditor has run in parallel, the recon cache will have entries
of the form: {'object_auditor_stats_ALL': { 'disk1': {..},
'disk2': {..},
'disk3': {..},
...}}
If the object auditor hasn't run in parallel, the recon cache will have
entries of the form: {'object_auditor_stats_ALL': {...}}.
The ZBF auditor doesn't run in parallel. However, if a subset of
devices is selected for auditing, the recon cache will have an entry
of the form: {'object_auditor_stats_ZBF': { 'disk1disk2..diskN': {..}}}
We use this generator to find all instances of a particular key in
these multi-level dictionaries.
"""
for k, v in recon_entry.items():
if isinstance(v, dict):

View File

@ -2307,7 +2307,8 @@ def put_recon_cache_entry(cache_entry, key, item):
"""
Function that will check if item is a dict, and if so put it under
cache_entry[key]. We use nested recon cache entries when the object
auditor runs in 'once' mode with a specified subset of devices.
auditor runs in parallel or else in 'once' mode with a specified
subset of devices.
"""
if isinstance(item, dict):
if key not in cache_entry or key in cache_entry and not \

View File

@ -17,6 +17,7 @@ import os
import sys
import time
import signal
from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
@ -72,7 +73,10 @@ class AuditorWorker(object):
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
description = _(' - %s') % device_dir_str
if self.auditor_type == 'ALL':
description = _(' - parallel, %s') % device_dir_str
else:
description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
(mode, self.auditor_type, description))
begin = reported = time.time()
@ -223,6 +227,7 @@ class ObjectAuditor(Daemon):
self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.concurrency = int(conf.get('concurrency', 1))
self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50))
self.recon_cache_path = conf.get('recon_cache_path',
@ -260,7 +265,7 @@ class ObjectAuditor(Daemon):
sys.exit()
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
"""Audit loop"""
"""Parallel audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
kwargs['device_dirs'] = override_devices
@ -272,7 +277,34 @@ class ObjectAuditor(Daemon):
if self.conf_zero_byte_fps:
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
pids.append(self.fork_child(**kwargs))
if self.concurrency == 1:
# Audit all devices in 1 process
pids.append(self.fork_child(**kwargs))
else:
# Divide devices amongst parallel processes set by
# self.concurrency. Total number of parallel processes
# is self.concurrency + 1 if zero_byte_fps.
parallel_proc = self.concurrency + 1 if \
self.conf_zero_byte_fps else self.concurrency
device_list = list(override_devices) if override_devices else \
listdir(self.devices)
shuffle(device_list)
while device_list:
pid = None
if len(pids) == parallel_proc:
pid = os.wait()[0]
pids.remove(pid)
# ZBF scanner must be restarted as soon as it finishes
if self.conf_zero_byte_fps and pid == zbf_pid:
kwargs['device_dirs'] = override_devices
# sleep between ZBF scanner forks
self._sleep()
zbf_pid = self.fork_child(zero_byte_fps=True,
**kwargs)
pids.append(zbf_pid)
else:
kwargs['device_dirs'] = [device_list.pop()]
pids.append(self.fork_child(**kwargs))
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes

View File

@ -662,15 +662,23 @@ class TestReconSuccess(TestCase):
"files_processed": 2310,
"quarantined": 0}})
def test_get_auditor_info_object_once(self):
def test_get_auditor_info_object_parallel_once(self):
from_cache_response = {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,
@ -686,13 +694,21 @@ class TestReconSuccess(TestCase):
'object_auditor_stats_ZBF'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,

View File

@ -278,6 +278,24 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)
def test_object_run_logging(self):
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ZBF - sda'))
def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
@ -451,7 +469,7 @@ class TestAuditor(unittest.TestCase):
my_auditor._sleep()
mock_sleep.assert_called_with(auditor.SLEEP_BETWEEN_AUDITS)
def test_run_audit(self):
def test_run_parallel_audit(self):
class StopForever(Exception):
pass
@ -499,7 +517,9 @@ class TestAuditor(unittest.TestCase):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89))
zero_byte_files_per_second=89,
concurrency=1))
mocker = ObjectAuditorMock()
my_auditor.logger.exception = mock.MagicMock()
real_audit_loop = my_auditor.audit_loop
@ -554,12 +574,17 @@ class TestAuditor(unittest.TestCase):
my_auditor._sleep = mocker.mock_sleep_continue
my_auditor.concurrency = 2
mocker.fork_called = 0
mocker.wait_called = 0
my_auditor.run_once()
# Fork is called 3 times since the zbf process is forked twice
self.assertEquals(mocker.fork_called, 3)
self.assertEquals(mocker.wait_called, 3)
# Fork is called no. of devices + (no. of devices)/2 + 1 times
# since zbf process is forked (no. of devices)/2 + 1 times
no_devices = len(os.listdir(self.devices))
self.assertEquals(mocker.fork_called, no_devices + no_devices / 2
+ 1)
self.assertEquals(mocker.wait_called, no_devices + no_devices / 2
+ 1)
finally:
os.fork = was_fork