Parallel object auditor

We are soon going to put servers with a high ratio of disk to CPU
into production as object servers.  One of our concerns with this
configuration is that the object auditor would take too long to
complete its audit cycle.  Therefore we decided to parallelise
the auditor.

The auditor already uses fork(), so we decided to use the parallel
model from the replicator.  Concurrency is set by the concurrency
parameter in the auditor stanza, which sets the number of parallel
checksum auditors.  The actual number of parallel auditing processes
is concurrency + 1 if zero_byte_files_per_second is non-zero, since one
additional process runs the zero-byte-file (ZBF) auditor.

Only one ZBF process runs at a time: a new ZBF process is forked as
soon as the current ZBF process finishes.  Thus the last process
running will always be a ZBF process.

Both forever and once modes are parallelised.

Each checksum auditor process submits a nested dictionary with keys
{'object_auditor_stats_ALL': {'diskn': {..}}} to dump_recon_cache
so that the object_auditor_stats_ALL dict in recon cache consists
of individual sub-dicts for each of the object disks on the server.
When the checksum auditor is run in serial mode (concurrency = 1), the
recon cache is unchanged from before.  When swift-recon is run, it sums
the stats for the individual disks.

DocImpact

Change-Id: I0ce3db57a43e482d4be351cc522fc9060af6e2d3
This commit is contained in:
Eamonn O'Toole 2014-03-26 16:32:07 +00:00
parent 92fb1c15da
commit d317888a7e
7 changed files with 128 additions and 45 deletions

View File

@ -520,12 +520,14 @@ log_name object-auditor Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
log_time 3600 Frequency of status logs in seconds.
files_per_second 20 Maximum files audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
files_per_second 20 Maximum files audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
concurrency 1 The number of parallel processes to use
for checksum auditing.
================== ============== ==========================================
------------------------------

View File

@ -227,6 +227,7 @@ use = egg:swift#recon
# log_address = /dev/log
#
# files_per_second = 20
# concurrency = 1
# bytes_per_second = 10000000
# log_time = 3600
# zero_byte_files_per_second = 50

View File

@ -558,12 +558,18 @@ class SwiftRecon(object):
"""
Generator that yields all values for given key in a recon cache entry.
This is for use with object auditor recon cache entries. If the
object auditor has run in 'once' mode with a subset of devices
specified the checksum auditor section will have an entry of the form:
{'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}}
The same is true of the ZBF auditor cache entry section. We use this
generator to find all instances of a particular key in these multi-
level dictionaries.
object auditor has run in parallel, the recon cache will have entries
of the form: {'object_auditor_stats_ALL': { 'disk1': {..},
'disk2': {..},
'disk3': {..},
...}}
If the object auditor hasn't run in parallel, the recon cache will have
entries of the form: {'object_auditor_stats_ALL': {...}}.
The ZBF auditor doesn't run in parallel. However, if a subset of
devices is selected for auditing, the recon cache will have an entry
of the form: {'object_auditor_stats_ZBF': {'disk1disk2..diskN': {..}}}
We use this generator to find all instances of a particular key in
these multi-level dictionaries.
"""
for k, v in recon_entry.items():
if isinstance(v, dict):
@ -597,15 +603,15 @@ class SwiftRecon(object):
zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0:
stats = {}
stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
stats[atime] = [sum(self.nested_get_value(atime, all_scan[i]))
for i in all_scan]
stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
all_scan[i])) for i in all_scan]
stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
stats[passes] = [sum(self.nested_get_value(passes, all_scan[i]))
for i in all_scan]
stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
stats[errors] = [sum(self.nested_get_value(errors, all_scan[i]))
for i in all_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
stats[quarantined] = [sum(self.nested_get_value(quarantined,
all_scan[i])) for i in all_scan]
for k in stats:
if None in stats[k]:
@ -623,13 +629,13 @@ class SwiftRecon(object):
print("[ALL_auditor] - No hosts returned valid data.")
if len(zbf_scan) > 0:
stats = {}
stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
stats[atime] = [sum(self.nested_get_value(atime, zbf_scan[i]))
for i in zbf_scan]
stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
zbf_scan[i])) for i in zbf_scan]
stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
stats[errors] = [sum(self.nested_get_value(errors, zbf_scan[i]))
for i in zbf_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
stats[quarantined] = [sum(self.nested_get_value(quarantined,
zbf_scan[i])) for i in zbf_scan]
for k in stats:
if None in stats[k]:

View File

@ -2284,7 +2284,8 @@ def put_recon_cache_entry(cache_entry, key, item):
"""
Function that will check if item is a dict, and if so put it under
cache_entry[key]. We use nested recon cache entries when the object
auditor runs in 'once' mode with a specified subset of devices.
auditor runs in parallel or else in 'once' mode with a specified
subset of devices.
"""
if isinstance(item, dict):
if key not in cache_entry or key in cache_entry and not \

View File

@ -17,6 +17,7 @@ import os
import sys
import time
import signal
from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
@ -72,7 +73,10 @@ class AuditorWorker(object):
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
description = _(' - %s') % device_dir_str
if self.auditor_type == 'ALL':
description = _(' - parallel, %s') % device_dir_str
else:
description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
(mode, self.auditor_type, description))
begin = reported = time.time()
@ -223,6 +227,7 @@ class ObjectAuditor(Daemon):
self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.concurrency = int(conf.get('concurrency', 1))
self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50))
self.recon_cache_path = conf.get('recon_cache_path',
@ -260,7 +265,7 @@ class ObjectAuditor(Daemon):
sys.exit()
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
"""Audit loop"""
"""Parallel audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
kwargs['device_dirs'] = override_devices
@ -272,7 +277,34 @@ class ObjectAuditor(Daemon):
if self.conf_zero_byte_fps:
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
pids.append(self.fork_child(**kwargs))
if self.concurrency == 1:
# Audit all devices in 1 process
pids.append(self.fork_child(**kwargs))
else:
# Divide devices amongst parallel processes set by
# self.concurrency. Total number of parallel processes
# is self.concurrency + 1 if zero_byte_fps.
parallel_proc = self.concurrency + 1 if \
self.conf_zero_byte_fps else self.concurrency
device_list = list(override_devices) if override_devices else \
listdir(self.devices)
shuffle(device_list)
while device_list:
pid = None
if len(pids) == parallel_proc:
pid = os.wait()[0]
pids.remove(pid)
# ZBF scanner must be restarted as soon as it finishes
if self.conf_zero_byte_fps and pid == zbf_pid:
kwargs['device_dirs'] = override_devices
# sleep between ZBF scanner forks
self._sleep()
zbf_pid = self.fork_child(zero_byte_fps=True,
**kwargs)
pids.append(zbf_pid)
else:
kwargs['device_dirs'] = [device_list.pop()]
pids.append(self.fork_child(**kwargs))
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes

View File

@ -662,15 +662,23 @@ class TestReconSuccess(TestCase):
"files_processed": 2310,
"quarantined": 0}})
def test_get_auditor_info_object_once(self):
def test_get_auditor_info_object_parallel_once(self):
from_cache_response = {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,
@ -686,13 +694,21 @@ class TestReconSuccess(TestCase):
'object_auditor_stats_ZBF'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,

View File

@ -278,6 +278,24 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)
def test_object_run_logging(self):
    """Check the device description in the first info log line of an audit.

    Runs ``audit_all_objects`` restricted to the ``sda`` device twice: once
    with a default worker and once with ``zero_byte_only_at_fps`` set. The
    first info-level line is expected to contain ``ALL - parallel, sda`` in
    the first case and ``ZBF - sda`` in the second.
    """
    # Default worker: the expected text marks the run as "ALL - parallel".
    logger = FakeLogger()
    auditor_worker = auditor.AuditorWorker(self.conf, logger,
                                           self.rcache, self.devices)
    auditor_worker.audit_all_objects(device_dirs=['sda'])
    log_lines = logger.get_lines_for_level('info')
    self.assertTrue(len(log_lines) > 0)
    # assertIn rather than assertTrue(str.index(...)): index() returns 0
    # (falsy) for a match at the start of the line and raises ValueError,
    # not an assertion failure, when the text is missing.
    self.assertIn('ALL - parallel, sda', log_lines[0])

    # Zero-byte-only worker: the expected text has no "parallel" marker.
    logger = FakeLogger()
    auditor_worker = auditor.AuditorWorker(self.conf, logger,
                                           self.rcache, self.devices,
                                           zero_byte_only_at_fps=50)
    auditor_worker.audit_all_objects(device_dirs=['sda'])
    log_lines = logger.get_lines_for_level('info')
    self.assertTrue(len(log_lines) > 0)
    self.assertIn('ZBF - sda', log_lines[0])
def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
@ -452,7 +470,7 @@ class TestAuditor(unittest.TestCase):
self.assert_(delta_t > 0.08)
self.assert_(delta_t < 0.12)
def test_run_audit(self):
def test_run_parallel_audit(self):
class StopForever(Exception):
pass
@ -500,7 +518,9 @@ class TestAuditor(unittest.TestCase):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89))
zero_byte_files_per_second=89,
concurrency=1))
mocker = ObjectAuditorMock()
my_auditor.logger.exception = mock.MagicMock()
real_audit_loop = my_auditor.audit_loop
@ -555,12 +575,17 @@ class TestAuditor(unittest.TestCase):
my_auditor._sleep = mocker.mock_sleep_continue
my_auditor.concurrency = 2
mocker.fork_called = 0
mocker.wait_called = 0
my_auditor.run_once()
# Fork is called 3 times since the zbf process is forked twice
self.assertEquals(mocker.fork_called, 3)
self.assertEquals(mocker.wait_called, 3)
# Fork is called no. of devices + (no. of devices)/2 + 1 times
# since zbf process is forked (no.of devices)/2 + 1 times
no_devices = len(os.listdir(self.devices))
self.assertEquals(mocker.fork_called, no_devices + no_devices / 2
+ 1)
self.assertEquals(mocker.wait_called, no_devices + no_devices / 2
+ 1)
finally:
os.fork = was_fork