object-updater: add concurrent updates

The object updater now supports two configuration settings:
"concurrency" and "updater_workers". The latter controls how many
worker processes are spawned, while the former controls how many
concurrent container updates are performed by each worker
process. This should speed the processing of async_pendings.

There is a change to the semantics of the configuration
options. Previously, "concurrency" controlled the number of worker
processes spawned, and "updater_workers" did not exist. I switched the
meanings for consistency with other configuration options. In the
object reconstructor, object replicator, object server, object
expirer, container replicator, container server, account replicator,
account server, and account reaper, "concurrency" refers to the number
of concurrent tasks performed within one process (for reference, the
container updater and object auditor use "concurrency" to mean number
of processes).

On upgrade, a node configured with concurrency=N will still handle
async updates N-at-a-time, but will do so using only one process
instead of N.

UpgradeImpact:

If you have a config file like this:

    [object-updater]
    concurrency = <N>

and you want to take advantage of faster updates, then do this:

    [object-updater]
    concurrency = 8  # the default; you can omit this line
    updater_workers = <N>

If you want updates to be processed exactly as before, do this:

    [object-updater]
    concurrency = 1
    updater_workers = <N>

Change-Id: I17e18088e61f664e1b9942d66423666d0cae1689
This commit is contained in:
Samuel Merritt 2018-06-04 16:26:50 -07:00
parent 3e9fdfceb4
commit d5c532a94e
4 changed files with 97 additions and 40 deletions

View File

@ -909,7 +909,9 @@ log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level log_level INFO Logging level
log_address /dev/log Logging directory log_address /dev/log Logging directory
interval 300 Minimum time for a pass to take interval 300 Minimum time for a pass to take
concurrency 1 Number of updater workers to spawn updater_workers 1 Number of worker processes
concurrency 8 Number of updates to run concurrently in
each worker process
node_timeout DEFAULT or 10 Request timeout to external services. This node_timeout DEFAULT or 10 Request timeout to external services. This
uses what's set here, or what's set in the uses what's set here, or what's set in the
DEFAULT section, or 10 (though other DEFAULT section, or 10 (though other

View File

@ -371,9 +371,16 @@ use = egg:swift#recon
# log_address = /dev/log # log_address = /dev/log
# #
# interval = 300 # interval = 300
# concurrency = 1
# node_timeout = <whatever's in the DEFAULT section or 10> # node_timeout = <whatever's in the DEFAULT section or 10>
# #
# updater_workers controls how many processes the object updater will
# spawn, while concurrency controls how many async_pending records
# each updater process will operate on at any one time. With
# concurrency=C and updater_workers=W, there will be up to W*C
# async_pending records being processed at once.
# concurrency = 8
# updater_workers = 1
#
# Send at most this many object updates per second # Send at most this many object updates per second
# objects_per_second = 50 # objects_per_second = 50
# #

View File

@ -29,7 +29,7 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \ from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data eventlet_monkey_patch, get_redirect_data, ContextPool
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError from swift.common.storage_policy import split_policy_string, PolicyError
@ -94,7 +94,8 @@ class ObjectUpdater(Daemon):
self.swift_dir = conf.get('swift_dir', '/etc/swift') self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300)) self.interval = int(conf.get('interval', 300))
self.container_ring = None self.container_ring = None
self.concurrency = int(conf.get('concurrency', 1)) self.concurrency = int(conf.get('concurrency', 8))
self.updater_workers = int(conf.get('updater_workers', 1))
if 'slowdown' in conf: if 'slowdown' in conf:
self.logger.warning( self.logger.warning(
'The slowdown option is deprecated in favor of ' 'The slowdown option is deprecated in favor of '
@ -150,7 +151,7 @@ class ObjectUpdater(Daemon):
self.logger.warning( self.logger.warning(
_('Skipping %s as it is not mounted'), device) _('Skipping %s as it is not mounted'), device)
continue continue
while len(pids) >= self.concurrency: while len(pids) >= self.updater_workers:
pids.remove(os.wait()[0]) pids.remove(os.wait()[0])
pid = os.fork() pid = os.fork()
if pid: if pid:
@ -230,6 +231,7 @@ class ObjectUpdater(Daemon):
prefix_path = os.path.join(async_pending, prefix) prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path): if not os.path.isdir(prefix_path):
continue continue
last_obj_hash = None
for update in sorted(self._listdir(prefix_path), reverse=True): for update in sorted(self._listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update) update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path): if not os.path.isfile(update_path):
@ -244,13 +246,34 @@ class ObjectUpdater(Daemon):
'name %s') 'name %s')
% (update_path)) % (update_path))
continue continue
yield {'device': device, 'policy': policy, # Async pendings are stored on disk like this:
'path': update_path, #
'obj_hash': obj_hash, 'timestamp': timestamp} # <device>/async_pending/<suffix>/<obj_hash>-<timestamp>
try: #
os.rmdir(prefix_path) # If there are multiple updates for a given object,
except OSError: # they'll look like this:
pass #
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp1>
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp2>
# <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp3>
#
# Async updates also have the property that newer
# updates contain all the information in older updates.
# Since we sorted the directory listing in reverse
# order, we'll see timestamp3 first, yield it, and then
# unlink timestamp2 and timestamp1 since we know they
# are obsolete.
#
# This way, our caller only gets useful async_pendings.
if obj_hash == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
else:
last_obj_hash = obj_hash
yield {'device': device, 'policy': policy,
'path': update_path,
'obj_hash': obj_hash, 'timestamp': timestamp}
def object_sweep(self, device): def object_sweep(self, device):
""" """
@ -265,31 +288,25 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)", self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid) device, my_pid)
last_obj_hash = None
ap_iter = RateLimitedIterator( ap_iter = RateLimitedIterator(
self._iter_async_pendings(device), self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second) elements_per_second=self.max_objects_per_second)
for update in ap_iter: with ContextPool(self.concurrency) as pool:
if update['obj_hash'] == last_obj_hash: for update in ap_iter:
self.stats.unlinks += 1 pool.spawn(self.process_object_update,
self.logger.increment('unlinks') update['path'], update['device'], update['policy'])
os.unlink(update['path']) now = time.time()
else: if now - last_status_update >= self.report_interval:
self.process_object_update(update['path'], update['device'], this_sweep = self.stats.since(start_stats)
update['policy']) self.logger.info(
last_obj_hash = update['obj_hash'] ('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
now = time.time() {'device': device,
if now - last_status_update >= self.report_interval: 'elapsed': now - start_time,
this_sweep = self.stats.since(start_stats) 'pid': my_pid,
self.logger.info( 'stats': this_sweep})
('Object update sweep progress on %(device)s: ' last_status_update = now
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), pool.waitall()
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
self.logger.timing_since('timing', start_time) self.logger.timing_since('timing', start_time)
sweep_totals = self.stats.since(start_stats) sweep_totals = self.stats.since(start_stats)
@ -370,6 +387,13 @@ class ObjectUpdater(Daemon):
self.stats.unlinks += 1 self.stats.unlinks += 1
self.logger.increment('unlinks') self.logger.increment('unlinks')
os.unlink(update_path) os.unlink(update_path)
try:
# If this was the last async_pending in the directory,
# then this will succeed. Otherwise, it'll fail, and
# that's okay.
os.rmdir(os.path.dirname(update_path))
except OSError:
pass
elif redirects: elif redirects:
# erase any previous successes # erase any previous successes
update.pop('successes', None) update.pop('successes', None)

View File

@ -43,6 +43,23 @@ from swift.common.utils import (
from swift.common.storage_policy import StoragePolicy, POLICIES from swift.common.storage_policy import StoragePolicy, POLICIES
class MockPool(object):
def __init__(self, *a, **kw):
pass
def spawn(self, func, *args, **kwargs):
func(*args, **kwargs)
def waitall(self):
pass
def __enter__(self):
return self
def __exit__(self, *a, **kw):
pass
_mocked_policies = [StoragePolicy(0, 'zero', False), _mocked_policies = [StoragePolicy(0, 'zero', False),
StoragePolicy(1, 'one', True)] StoragePolicy(1, 'one', True)]
@ -104,7 +121,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.mount_check, True) self.assertEqual(daemon.mount_check, True)
self.assertEqual(daemon.swift_dir, '/etc/swift') self.assertEqual(daemon.swift_dir, '/etc/swift')
self.assertEqual(daemon.interval, 300) self.assertEqual(daemon.interval, 300)
self.assertEqual(daemon.concurrency, 1) self.assertEqual(daemon.concurrency, 8)
self.assertEqual(daemon.updater_workers, 1)
self.assertEqual(daemon.max_objects_per_second, 50.0) self.assertEqual(daemon.max_objects_per_second, 50.0)
# non-defaults # non-defaults
@ -114,6 +132,7 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': '/not/here', 'swift_dir': '/not/here',
'interval': '600', 'interval': '600',
'concurrency': '2', 'concurrency': '2',
'updater_workers': '3',
'objects_per_second': '10.5', 'objects_per_second': '10.5',
} }
daemon = object_updater.ObjectUpdater(conf, logger=self.logger) daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
@ -122,6 +141,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.swift_dir, '/not/here') self.assertEqual(daemon.swift_dir, '/not/here')
self.assertEqual(daemon.interval, 600) self.assertEqual(daemon.interval, 600)
self.assertEqual(daemon.concurrency, 2) self.assertEqual(daemon.concurrency, 2)
self.assertEqual(daemon.updater_workers, 3)
self.assertEqual(daemon.max_objects_per_second, 10.5) self.assertEqual(daemon.max_objects_per_second, 10.5)
# check deprecated option # check deprecated option
@ -234,10 +254,8 @@ class TestObjectUpdater(unittest.TestCase):
if should_skip: if should_skip:
# if we were supposed to skip over the dir, we didn't process # if we were supposed to skip over the dir, we didn't process
# anything at all # anything at all
self.assertTrue(os.path.exists(prefix_dir))
self.assertEqual(set(), seen) self.assertEqual(set(), seen)
else: else:
self.assertTrue(not os.path.exists(prefix_dir))
self.assertEqual(expected, seen) self.assertEqual(expected, seen)
# test cleanup: the tempdir gets cleaned up between runs, but this # test cleanup: the tempdir gets cleaned up between runs, but this
@ -291,7 +309,8 @@ class TestObjectUpdater(unittest.TestCase):
# and 5 async_pendings on disk, we should get at least two progress # and 5 async_pendings on disk, we should get at least two progress
# lines. # lines.
with mock.patch('swift.obj.updater.time', with mock.patch('swift.obj.updater.time',
mock.MagicMock(time=mock_time_function)): mock.MagicMock(time=mock_time_function)), \
mock.patch.object(object_updater, 'ContextPool', MockPool):
ou.object_sweep(self.sda1) ou.object_sweep(self.sda1)
info_lines = logger.get_lines_for_level('info') info_lines = logger.get_lines_for_level('info')
@ -444,7 +463,6 @@ class TestObjectUpdater(unittest.TestCase):
os.mkdir(odd_dir) os.mkdir(odd_dir)
ou.run_once() ou.run_once()
self.assertTrue(os.path.exists(async_dir)) self.assertTrue(os.path.exists(async_dir))
self.assertFalse(os.path.exists(odd_dir))
self.assertEqual([ self.assertEqual([
mock.call(self.devices_dir, 'sda1', True), mock.call(self.devices_dir, 'sda1', True),
], mock_check_drive.mock_calls) ], mock_check_drive.mock_calls)
@ -548,7 +566,13 @@ class TestObjectUpdater(unittest.TestCase):
err = event.wait() err = event.wait()
if err: if err:
raise err raise err
self.assertTrue(not os.path.exists(op_path))
# we remove the async_pending and its containing suffix dir, but not
# anything above that
self.assertFalse(os.path.exists(op_path))
self.assertFalse(os.path.exists(os.path.dirname(op_path)))
self.assertTrue(os.path.exists(os.path.dirname(os.path.dirname(
op_path))))
self.assertEqual(ou.logger.get_increment_counts(), self.assertEqual(ou.logger.get_increment_counts(),
{'unlinks': 1, 'successes': 1}) {'unlinks': 1, 'successes': 1})