Merge "object-updater: add concurrent updates"

This commit is contained in:
Zuul 2018-06-14 20:37:06 +00:00 committed by Gerrit Code Review
commit ea33638d0c
4 changed files with 97 additions and 40 deletions

View File

@ -909,7 +909,9 @@ log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
log_address /dev/log Logging directory
interval 300 Minimum time for a pass to take
concurrency 1 Number of updater workers to spawn
updater_workers 1 Number of worker processes
concurrency 8 Number of updates to run concurrently in
each worker process
node_timeout DEFAULT or 10 Request timeout to external services. This
uses what's set here, or what's set in the
DEFAULT section, or 10 (though other

View File

@ -371,9 +371,16 @@ use = egg:swift#recon
# log_address = /dev/log
#
# interval = 300
# concurrency = 1
# node_timeout = <whatever's in the DEFAULT section or 10>
#
# updater_workers controls how many processes the object updater will
# spawn, while concurrency controls how many async_pending records
# each updater process will operate on at any one time. With
# concurrency=C and updater_workers=W, there will be up to W*C
# async_pending records being processed at once.
# concurrency = 8
# updater_workers = 1
#
# Send at most this many object updates per second
# objects_per_second = 50
#

View File

@ -29,7 +29,7 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data
eventlet_monkey_patch, get_redirect_data, ContextPool
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -94,7 +94,8 @@ class ObjectUpdater(Daemon):
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300))
self.container_ring = None
self.concurrency = int(conf.get('concurrency', 1))
self.concurrency = int(conf.get('concurrency', 8))
self.updater_workers = int(conf.get('updater_workers', 1))
if 'slowdown' in conf:
self.logger.warning(
'The slowdown option is deprecated in favor of '
@ -150,7 +151,7 @@ class ObjectUpdater(Daemon):
self.logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
while len(pids) >= self.concurrency:
while len(pids) >= self.updater_workers:
pids.remove(os.wait()[0])
pid = os.fork()
if pid:
@ -230,6 +231,7 @@ class ObjectUpdater(Daemon):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
for update in sorted(self._listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
@ -244,13 +246,34 @@ class ObjectUpdater(Daemon):
'name %s')
% (update_path))
continue
yield {'device': device, 'policy': policy,
'path': update_path,
'obj_hash': obj_hash, 'timestamp': timestamp}
try:
os.rmdir(prefix_path)
except OSError:
pass
# Async pendings are stored on disk like this:
#
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp>
#
# If there are multiple updates for a given object,
# they'll look like this:
#
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp1>
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp2>
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp3>
#
# Async updates also have the property that newer
# updates contain all the information in older updates.
# Since we sorted the directory listing in reverse
# order, we'll see timestamp3 first, yield it, and then
# unlink timestamp2 and timestamp1 since we know they
# are obsolete.
#
# This way, our caller only gets useful async_pendings.
if obj_hash == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
else:
last_obj_hash = obj_hash
yield {'device': device, 'policy': policy,
'path': update_path,
'obj_hash': obj_hash, 'timestamp': timestamp}
def object_sweep(self, device):
"""
@ -265,31 +288,25 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
last_obj_hash = None
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
for update in ap_iter:
if update['obj_hash'] == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update['path'])
else:
self.process_object_update(update['path'], update['device'],
update['policy'])
last_obj_hash = update['obj_hash']
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
self.logger.info(
('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
with ContextPool(self.concurrency) as pool:
for update in ap_iter:
pool.spawn(self.process_object_update,
update['path'], update['device'], update['policy'])
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
self.logger.info(
('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
pool.waitall()
self.logger.timing_since('timing', start_time)
sweep_totals = self.stats.since(start_stats)
@ -370,6 +387,13 @@ class ObjectUpdater(Daemon):
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
try:
# If this was the last async_pending in the directory,
# then this will succeed. Otherwise, it'll fail, and
# that's okay.
os.rmdir(os.path.dirname(update_path))
except OSError:
pass
elif redirects:
# erase any previous successes
update.pop('successes', None)

View File

@ -43,6 +43,23 @@ from swift.common.utils import (
from swift.common.storage_policy import StoragePolicy, POLICIES
class MockPool(object):
    """Inline stand-in for ContextPool used by the updater tests.

    Runs every spawned callable immediately on the caller's thread, so
    the updater's concurrent code path can be exercised synchronously
    and deterministically, without green threads.
    """

    def __init__(self, *args, **kwargs):
        # Accept (and ignore) the pool-size argument ContextPool takes.
        pass

    def spawn(self, func, *args, **kwargs):
        # Execute right away instead of deferring to a green thread;
        # the return value is discarded, matching pool semantics.
        func(*args, **kwargs)

    def waitall(self):
        # Nothing ever runs asynchronously, so there is nothing to join.
        pass

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        pass
_mocked_policies = [StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True)]
@ -104,7 +121,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.mount_check, True)
self.assertEqual(daemon.swift_dir, '/etc/swift')
self.assertEqual(daemon.interval, 300)
self.assertEqual(daemon.concurrency, 1)
self.assertEqual(daemon.concurrency, 8)
self.assertEqual(daemon.updater_workers, 1)
self.assertEqual(daemon.max_objects_per_second, 50.0)
# non-defaults
@ -114,6 +132,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': '/not/here',
'interval': '600',
'concurrency': '2',
'updater_workers': '3',
'objects_per_second': '10.5',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
@ -122,6 +141,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.swift_dir, '/not/here')
self.assertEqual(daemon.interval, 600)
self.assertEqual(daemon.concurrency, 2)
self.assertEqual(daemon.updater_workers, 3)
self.assertEqual(daemon.max_objects_per_second, 10.5)
# check deprecated option
@ -234,10 +254,8 @@ class TestObjectUpdater(unittest.TestCase):
if should_skip:
# if we were supposed to skip over the dir, we didn't process
# anything at all
self.assertTrue(os.path.exists(prefix_dir))
self.assertEqual(set(), seen)
else:
self.assertTrue(not os.path.exists(prefix_dir))
self.assertEqual(expected, seen)
# test cleanup: the tempdir gets cleaned up between runs, but this
@ -291,7 +309,8 @@ class TestObjectUpdater(unittest.TestCase):
# and 5 async_pendings on disk, we should get at least two progress
# lines.
with mock.patch('swift.obj.updater.time',
mock.MagicMock(time=mock_time_function)):
mock.MagicMock(time=mock_time_function)), \
mock.patch.object(object_updater, 'ContextPool', MockPool):
ou.object_sweep(self.sda1)
info_lines = logger.get_lines_for_level('info')
@ -444,7 +463,6 @@ class TestObjectUpdater(unittest.TestCase):
os.mkdir(odd_dir)
ou.run_once()
self.assertTrue(os.path.exists(async_dir))
self.assertFalse(os.path.exists(odd_dir))
self.assertEqual([
mock.call(self.devices_dir, 'sda1', True),
], mock_check_drive.mock_calls)
@ -548,7 +566,13 @@ class TestObjectUpdater(unittest.TestCase):
err = event.wait()
if err:
raise err
self.assertTrue(not os.path.exists(op_path))
# we remove the async_pending and its containing suffix dir, but not
# anything above that
self.assertFalse(os.path.exists(op_path))
self.assertFalse(os.path.exists(os.path.dirname(op_path)))
self.assertTrue(os.path.exists(os.path.dirname(os.path.dirname(
op_path))))
self.assertEqual(ou.logger.get_increment_counts(),
{'unlinks': 1, 'successes': 1})