Merge "object-updater: defer ratelimited updates"

This commit is contained in:
Zuul 2022-02-22 07:38:37 +00:00 committed by Gerrit Code Review
commit c7774d960c
4 changed files with 881 additions and 61 deletions

View File

@ -483,6 +483,12 @@ use = egg:swift#recon
# be an integer greater than 0.
# per_container_ratelimit_buckets = 1000
#
# Updates that cannot be sent due to per-container rate-limiting may be
# deferred and re-tried at the end of the updater cycle. This option constrains
# the size of the in-memory data structure used to store deferred updates.
# Must be an integer value greater than or equal to 0.
# max_deferred_updates = 10000
#
# slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead.
# slowdown = 0.01

View File

@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves import queue
import six.moves.cPickle as pickle
import errno
@ -21,6 +22,7 @@ import sys
import time
import uuid
from random import random, shuffle
from collections import deque
from eventlet import spawn, Timeout
@ -31,7 +33,7 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value
non_negative_float, config_positive_int_value, non_negative_int
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -41,33 +43,98 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class RateLimiterBucket(object):
def __init__(self, update_delta):
self.update_delta = update_delta
self.last_time = 0
self.deque = deque()
@property
def wait_until(self):
return self.last_time + self.update_delta
def __len__(self):
return len(self.deque)
def __bool__(self):
return bool(self.deque)
__nonzero__ = __bool__ # py2
def __lt__(self, other):
# used to sort buckets by readiness
if isinstance(other, RateLimiterBucket):
return self.wait_until < other.wait_until
return self.wait_until < other
class BucketizedUpdateSkippingLimiter(object):
"""
Wrap an iterator to filter elements that show up too often.
Wrap an iterator to rate-limit updates on a per-bucket basis, where updates
are mapped to buckets by hashing their destination path. If an update is
rate-limited then it is placed on a deferral queue and may be sent later if
the wrapped iterator is exhausted before the ``drain_until`` time is
reached.
The deferral queue has constrained size and once the queue is full updates
are evicted using a first-in-first-out policy. This policy is used because
updates on the queue may have been made obsolete by newer updates written
to disk, and this is more likely for updates that have been on the queue
longest.
The iterator increments stats as follows:
* The `deferrals` stat is incremented for each update that is
rate-limited. Note that a individual update is rate-limited at most
once.
* The `skips` stat is incremented for each rate-limited update that is
not eventually yielded. This includes updates that are evicted from the
deferral queue and all updates that remain in the deferral queue when
``drain_until`` time is reached and the iterator terminates.
* The `drains` stat is incremented for each rate-limited update that is
eventually yielded.
Consequently, when this iterator terminates, the sum of `skips` and
`drains` is equal to the number of `deferrals`.
:param update_iterable: an async_pending update iterable
:param logger: a logger instance
:param stats: a SweepStats instance
:param num_buckets: number of buckets to divide container hashes into, the
more buckets total the less containers to a bucket
(once a busy container slows down a bucket the whole
bucket starts skipping)
:param max_elements_per_group_per_second: tunable, when skipping kicks in
:param skip_f: function to call with update_ctx when skipping it
bucket starts deferring)
:param max_elements_per_group_per_second: tunable, when deferring kicks in
:param max_deferred_elements: maximum number of deferred elements before
skipping starts. Each bucket may defer updates, but once the total
number of deferred updates summed across all buckets reaches this
value then all buckets will skip subsequent updates.
:param drain_until: time at which any remaining deferred elements must be
skipped and the iterator stops. Once the wrapped iterator has been
exhausted, this iterator will drain deferred elements from its buckets
until either all buckets have drained or this time is reached.
"""
def __init__(self, update_iterable, num_buckets,
max_elements_per_group_per_second,
skip_f=lambda update_ctx: None):
def __init__(self, update_iterable, logger, stats, num_buckets=1000,
max_elements_per_group_per_second=50,
max_deferred_elements=0,
drain_until=0):
self.iterator = iter(update_iterable)
self.logger = logger
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
# an array might be more efficient; but this is pretty cheap
self.next_update = [0.0 for _ in range(self.num_buckets)]
try:
self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
except ZeroDivisionError:
self.bucket_update_delta = -1
self.skip_f = skip_f
self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4())
self.buckets = [RateLimiterBucket(self.bucket_update_delta)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
def __iter__(self):
return self
@ -76,15 +143,77 @@ class BucketizedUpdateSkippingLimiter(object):
acct, cont = split_update_path(update)
return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets
def _get_time(self):
return time.time()
def next(self):
# first iterate over the wrapped iterator...
for update_ctx in self.iterator:
bucket_key = self._bucket_key(update_ctx['update'])
now = time.time()
if self.next_update[bucket_key] > now:
self.skip_f(update_ctx)
continue
self.next_update[bucket_key] = now + self.bucket_update_delta
return update_ctx
bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = self._get_time()
if now >= bucket.wait_until:
# no need to ratelimit, just return next update
bucket.last_time = now
return update_ctx
self.stats.deferrals += 1
self.logger.increment("deferrals")
if self.max_deferred_elements > 0:
if len(self.deferred_buckets) >= self.max_deferred_elements:
# create space to defer this update by popping the least
# recent deferral from the least recently deferred bucket;
# updates read from disk recently are preferred over those
# read from disk less recently.
oldest_deferred_bucket = self.deferred_buckets.popleft()
oldest_deferred_bucket.deque.popleft()
self.stats.skips += 1
self.logger.increment("skips")
# append the update to the bucket's queue and append the bucket
# to the queue of deferred buckets
# note: buckets may have multiple entries in deferred_buckets,
# one for each deferred update in that particular bucket
bucket.deque.append(update_ctx)
self.deferred_buckets.append(bucket)
else:
self.stats.skips += 1
self.logger.increment("skips")
if self.buckets_ordered_by_readiness is None:
# initialise a queue of those buckets with deferred elements;
# buckets are queued in the chronological order in which they are
# ready to serve an element
self.buckets_ordered_by_readiness = queue.PriorityQueue()
for bucket in self.buckets:
if bucket:
self.buckets_ordered_by_readiness.put(bucket)
# now drain the buckets...
undrained_elements = []
while not self.buckets_ordered_by_readiness.empty():
now = self._get_time()
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
time.sleep(max(0, bucket.wait_until - now))
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")
return item
else:
# time to stop iterating: gather all un-drained elements
undrained_elements.extend(bucket.deque)
if undrained_elements:
# report final batch of skipped elements
self.stats.skips += len(undrained_elements)
self.logger.update_stats("skips", len(undrained_elements))
raise StopIteration()
__next__ = next
@ -93,9 +222,18 @@ class BucketizedUpdateSkippingLimiter(object):
class SweepStats(object):
"""
Stats bucket for an update sweep
A measure of the rate at which updates are being rate-limited is:
deferrals / (deferrals + successes + failures - drains)
A measure of the rate at which updates are not being sent during a sweep
is:
skips / (skips + successes + failures)
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
unlinks=0, redirects=0, skips=0):
unlinks=0, redirects=0, skips=0, deferrals=0, drains=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
@ -103,10 +241,13 @@ class SweepStats(object):
self.unlinks = unlinks
self.redirects = redirects
self.skips = skips
self.deferrals = deferrals
self.drains = drains
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
self.successes, self.unlinks)
self.successes, self.unlinks, self.redirects,
self.skips, self.deferrals, self.drains)
def since(self, other):
return type(self)(self.errors - other.errors,
@ -115,7 +256,9 @@ class SweepStats(object):
self.successes - other.successes,
self.unlinks - other.unlinks,
self.redirects - other.redirects,
self.skips - other.skips)
self.skips - other.skips,
self.deferrals - other.deferrals,
self.drains - other.drains)
def reset(self):
self.errors = 0
@ -125,6 +268,8 @@ class SweepStats(object):
self.unlinks = 0
self.redirects = 0
self.skips = 0
self.deferrals = 0
self.drains = 0
def __str__(self):
keys = (
@ -135,6 +280,8 @@ class SweepStats(object):
(self.errors, 'errors'),
(self.redirects, 'redirects'),
(self.skips, 'skips'),
(self.deferrals, 'deferrals'),
(self.drains, 'drains'),
)
return ', '.join('%d %s' % pair for pair in keys)
@ -191,6 +338,9 @@ class ObjectUpdater(Daemon):
DEFAULT_RECON_CACHE_PATH)
self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE)
self.stats = SweepStats()
self.max_deferred_updates = non_negative_int(
conf.get('max_deferred_updates', 10000))
self.begin = time.time()
def _listdir(self, path):
try:
@ -214,7 +364,7 @@ class ObjectUpdater(Daemon):
time.sleep(random() * self.interval)
while True:
self.logger.info('Begin object update sweep')
begin = time.time()
self.begin = time.time()
pids = []
# read from container ring to ensure it's fresh
self.get_container_ring().get_nodes('')
@ -248,7 +398,7 @@ class ObjectUpdater(Daemon):
sys.exit()
while pids:
pids.remove(os.wait()[0])
elapsed = time.time() - begin
elapsed = time.time() - self.begin
self.logger.info('Object update sweep completed: %.02fs',
elapsed)
dump_recon_cache({'object_updater_sweep': elapsed},
@ -259,7 +409,7 @@ class ObjectUpdater(Daemon):
def run_once(self, *args, **kwargs):
"""Run the updater once."""
self.logger.info('Begin object update single threaded sweep')
begin = time.time()
self.begin = time.time()
self.stats.reset()
for device in self._listdir(self.devices):
try:
@ -271,7 +421,7 @@ class ObjectUpdater(Daemon):
self.logger.warning('Skipping: %s', err)
continue
self.object_sweep(dev_path)
elapsed = time.time() - begin
elapsed = time.time() - self.begin
self.logger.info(
('Object update single-threaded sweep completed: '
'%(elapsed).02fs, %(stats)s'),
@ -404,18 +554,15 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
def skip_counting_f(update_ctx):
# in the future we could defer update_ctx
self.stats.skips += 1
self.logger.increment("skips")
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
ap_iter = BucketizedUpdateSkippingLimiter(
ap_iter, self.per_container_ratelimit_buckets,
ap_iter, self.logger, self.stats,
self.per_container_ratelimit_buckets,
self.max_objects_per_container_per_second,
skip_f=skip_counting_f)
max_deferred_elements=self.max_deferred_updates,
drain_until=self.begin + self.interval)
with ContextPool(self.concurrency) as pool:
for update_ctx in ap_iter:
pool.spawn(self.process_object_update, **update_ctx)
@ -440,8 +587,10 @@ class ObjectUpdater(Daemon):
'%(successes)d successes, %(failures)d failures, '
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, '
'%(redirects)d redirects '
'%(skips)d skips '
'%(redirects)d redirects, '
'%(skips)d skips, '
'%(deferrals)d deferrals, '
'%(drains)d drains '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
@ -452,7 +601,10 @@ class ObjectUpdater(Daemon):
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors,
'redirects': sweep_totals.redirects,
'skips': sweep_totals.skips})
'skips': sweep_totals.skips,
'deferrals': sweep_totals.deferrals,
'drains': sweep_totals.drains
})
def process_object_update(self, update_path, device, policy, update,
**kwargs):

View File

@ -141,6 +141,9 @@ class FakeLogger(logging.Logger, CaptureLog):
counts[metric] += 1
return counts
def get_update_stats(self):
return [call[0] for call in self.log_dict['update_stats']]
def setFormatter(self, obj):
self.formatter = obj

View File

@ -12,6 +12,8 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from queue import PriorityQueue
import eventlet
import six.moves.cPickle as pickle
import mock
@ -139,6 +141,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 50.0)
self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
self.assertEqual(daemon.max_deferred_updates, 10000)
# non-defaults
conf = {
@ -151,6 +154,7 @@ class TestObjectUpdater(unittest.TestCase):
'objects_per_second': '10.5',
'max_objects_per_container_per_second': '1.2',
'per_container_ratelimit_buckets': '100',
'max_deferred_updates': '0',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else')
@ -162,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 10.5)
self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
self.assertEqual(daemon.max_deferred_updates, 0)
# check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@ -183,6 +188,9 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'per_container_ratelimit_buckets': '0'})
check_bad({'per_container_ratelimit_buckets': '-1'})
check_bad({'per_container_ratelimit_buckets': 'auto'})
check_bad({'max_deferred_updates': '-1'})
check_bad({'max_deferred_updates': '1.1'})
check_bad({'max_deferred_updates': 'auto'})
@mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir):
@ -1351,6 +1359,8 @@ class TestObjectUpdater(unittest.TestCase):
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 1,
'max_deferred_updates': 0, # do not re-iterate
'concurrency': 1
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@ -1383,6 +1393,14 @@ class TestObjectUpdater(unittest.TestCase):
for req in fake_conn.requests),
{'/sda1/%s/a/c1' % c1_part: 3,
'/sda1/%s/.shards_a/c2_shard' % c2_part: 3})
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
'0 errors, 0 redirects, 9 skips, 9 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'skips': 9, 'successes': 2, 'unlinks': 2,
'deferrals': 9},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unlimited(self, mock_recon):
@ -1414,14 +1432,24 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(expected_total, daemon.stats.successes)
self.assertEqual(0, daemon.stats.skips)
self.assertEqual([], self._find_async_pending_files())
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('11 successes, 0 failures, 0 quarantines, 11 unlinks, '
'0 errors, 0 redirects, 0 skips, 0 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'successes': 11, 'unlinks': 11},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_slow_responses(self, mock_recon):
def test_per_container_rate_limit_some_limited(self, mock_recon):
# simulate delays between buckets being fed so that only some updates
# are skipped
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
'max_deferred_updates': 0, # do not re-iterate
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@ -1431,26 +1459,423 @@ class TestObjectUpdater(unittest.TestCase):
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
latencies = [.11, 0, .11, 0]
# first one always succeeds, second is skipped because it is only 0.05s
# behind the first, second succeeds because it is 0.11 behind the
# first, fourth is skipped
latencies = [0, 0.05, .051, 0]
expected_success = 2
fake_status_codes = [200] * 3 * expected_success
def fake_spawn(pool, *args, **kwargs):
# make each update delay the iter being called again
eventlet.sleep(latencies.pop(0))
return args[0](*args[1:], **kwargs)
contexts_fed_in = []
with mocked_http_conn(*fake_status_codes):
with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn):
daemon.run_once()
def ratelimit_if(value):
contexts_fed_in.append(value)
# make each update delay before the iter being called again
eventlet.sleep(latencies.pop(0))
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
with mocked_http_conn(*fake_status_codes) as fake_conn, \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator):
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
paths_fed_in = ['/sda1/%(part)s/%(account)s/%(container)s/%(obj)s'
% dict(ctx['update'], part=c1_part)
for ctx in contexts_fed_in]
expected_update_paths = paths_fed_in[:1] * 3 + paths_fed_in[2:3] * 3
actual_update_paths = [req['path'] for req in fake_conn.requests]
self.assertEqual(expected_update_paths, actual_update_paths)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
'0 errors, 0 redirects, 2 skips, 2 deferrals, 0 drains',
info_lines[-1])
self.assertEqual({'skips': 2, 'successes': 2, 'unlinks': 2,
'deferrals': 2},
self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_defer_2_skip_1(self, mock_recon):
# limit length of deferral queue so that some defer and some skip
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 1,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 4
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# first succeeds, second is deferred, third succeeds, fourth is
# deferred and bumps second out of deferral queue, fourth is re-tried
latencies = [0, 0.05, .051, 0, 0, .11]
expected_success = 3
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
now = [time()]
def fake_get_time(bucket_iter):
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# make each update delay before the iter being called again
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator):
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
orig_iteration = contexts_fed_in[:num_c1_files]
# we first capture every async fed in one by one
objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
self.assertEqual(num_c1_files, len(set(objs_fed_in)))
# keep track of this order for context
aorder = {ctx['update']['obj']: 'a%02d' % i
for i, ctx in enumerate(orig_iteration)}
expected_drops = (1,)
expected_updates_sent = []
for i, obj in enumerate(objs_fed_in):
if i in expected_drops:
continue
# triple replica, request to 3 nodes each obj!
expected_updates_sent.extend([obj] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 1], captured_skips_stats)
expected_deferrals = [
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('3 successes, 0 failures, 0 quarantines, 3 unlinks, '
'0 errors, 0 redirects, 1 skips, 2 deferrals, 1 drains',
info_lines[-1])
self.assertEqual(
{'skips': 1, 'successes': 3, 'unlinks': 3, 'deferrals': 2,
'drains': 1}, self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_defer_3_skip_1(self, mock_recon):
# limit length of deferral queue so that some defer and some skip
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 2,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 5
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral
# queue by 4; 4, 3 are then drained
latencies = [0, 0.05, .051, 0, 0, 0, .11, .01]
expected_success = 4
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
now = [time()]
def fake_get_time(bucket_iter):
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# make each update delay before the iter being called again
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
orig_iteration = contexts_fed_in[:num_c1_files]
# we first capture every async fed in one by one
objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
self.assertEqual(num_c1_files, len(set(objs_fed_in)))
# keep track of this order for context
aorder = {ctx['update']['obj']: 'a%02d' % i
for i, ctx in enumerate(orig_iteration)}
expected_updates_sent = []
for index_sent in (0, 2, 4, 3):
expected_updates_sent.extend(
[contexts_fed_in[index_sent]['update']['obj']] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats)
expected_deferrals = [
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[3]], # note: rightmost element is drained
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
self.assertEqual(2, len(actual_sleeps))
self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
self.assertAlmostEqual(0.09, actual_sleeps[1], 3)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('4 successes, 0 failures, 0 quarantines, 4 unlinks, '
'0 errors, 0 redirects, 1 skips, 3 deferrals, 2 drains',
info_lines[-1])
self.assertEqual(
{'skips': 1, 'successes': 4, 'unlinks': 4, 'deferrals': 3,
'drains': 2}, self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unsent_deferrals(self, mock_recon):
# make some updates defer until interval is reached and cycle
# terminates
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
# only one bucket needed for test
'per_container_ratelimit_buckets': 1,
'max_deferred_updates': 5,
'interval': 0.4,
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
os.mkdir(self.async_dir)
# all updates for same container
num_c1_files = 7
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
# first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred
# last 2 deferred items sent before interval elapses
latencies = [0, .05, 0.051, 0, 0, .11, 0, 0,
0.1, 0, 0.1, 0] # total 0.42
expected_success = 5
contexts_fed_in = []
captured_queues = []
captured_skips_stats = []
def ratelimit_if(value):
contexts_fed_in.append(value)
return False # returning False overrides normal ratelimiting
orig_rate_limited_iterator = utils.RateLimitedIterator
def fake_rate_limited_iterator(*args, **kwargs):
# insert our own rate limiting function
kwargs['ratelimit_if'] = ratelimit_if
return orig_rate_limited_iterator(*args, **kwargs)
start = time()
now = [start]
def fake_get_time(bucket_iter):
if not captured_skips_stats:
daemon.begin = now[0]
captured_skips_stats.append(
daemon.logger.get_increment_counts().get('skips', 0))
captured_queues.append(list(bucket_iter.buckets[0].deque))
# insert delay each time iter is called
now[0] += latencies.pop(0)
return now[0]
captured_updates = []
def fake_object_update(node, part, op, obj, *args, **kwargs):
captured_updates.append((node, part, op, obj))
return True, node['id'], False
with mock.patch(
'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
fake_get_time), \
mock.patch.object(daemon, 'object_update',
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
expected_updates_sent = []
for index_sent in (0, 2, 5, 6, 4):
expected_updates_sent.extend(
[contexts_fed_in[index_sent]['update']['obj']] * 3)
actual_updates_sent = [
utils.split_path(update[3], minsegs=3)[-1]
for update in captured_updates
]
self.assertEqual(expected_updates_sent, actual_updates_sent)
# skips (un-drained deferrals) not reported until end of cycle
self.assertEqual([0] * 12, captured_skips_stats)
objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in]
expected_deferrals = [
# queue content before app_iter feeds next update_ctx
[],
[],
[objs_fed_in[1]],
[objs_fed_in[1]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
# queue content before each update_ctx is drained from queue...
# note: rightmost element is drained
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
[[ctx['update']['obj'] for ctx in q] for q in captured_queues])
actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
self.assertEqual(2, len(actual_sleeps))
self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
self.assertAlmostEqual(0.1, actual_sleeps[1], 3)
info_lines = self.logger.get_lines_for_level('info')
self.assertTrue(info_lines)
self.assertIn('5 successes, 0 failures, 0 quarantines, 5 unlinks, '
'0 errors, 0 redirects, 2 skips, 4 deferrals, 2 drains',
info_lines[-1])
self.assertEqual(
{'successes': 5, 'unlinks': 5, 'deferrals': 4, 'drains': 2},
self.logger.get_increment_counts())
self.assertEqual([('skips', 2)], self.logger.get_update_stats())
class TestObjectUpdaterFunctions(unittest.TestCase):
@ -1477,20 +1902,28 @@ class TestObjectUpdaterFunctions(unittest.TestCase):
class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
def setUp(self):
self.logger = debug_logger()
self.stats = object_updater.SweepStats()
def test_init(self):
it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10)
it = object_updater.BucketizedUpdateSkippingLimiter(
[3, 1], self.logger, self.stats, 1000, 10)
self.assertEqual(1000, it.num_buckets)
self.assertEqual(0.1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
# rate of 0 implies unlimited
it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0)
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 9, 0)
self.assertEqual(9, it.num_buckets)
self.assertEqual(-1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
# num_buckets is collared at 1
it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1)
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 0, 1)
self.assertEqual(1, it.num_buckets)
self.assertEqual(1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
@ -1501,8 +1934,11 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(20)]
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 9, 0)
iter(update_ctxs), self.logger, self.stats, 9, 0)
self.assertEqual(update_ctxs, [x for x in it])
self.assertEqual(0, self.stats.skips)
self.assertEqual(0, self.stats.drains)
self.assertEqual(0, self.stats.deferrals)
def test_iteration_ratelimited(self):
# verify iteration at limited rate - single bucket
@ -1510,23 +1946,246 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(2)]
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 1, 0.1)
iter(update_ctxs), self.logger, self.stats, 1, 0.1)
# second update is skipped
self.assertEqual(update_ctxs[:1], [x for x in it])
self.assertEqual(1, self.stats.skips)
self.assertEqual(0, self.stats.drains)
self.assertEqual(1, self.stats.deferrals)
def test_iteration_ratelimited_with_callback(self):
# verify iteration at limited rate - single bucket
skipped = []
def on_skip(update_ctx):
skipped.append(update_ctx)
def test_deferral_single_bucket(self):
# verify deferral - single bucket
now = time()
update_ctxs = [
{'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(2)]
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), 1, 0.1, skip_f=on_skip)
self.assertEqual(update_ctxs[:1], [x for x in it])
self.assertEqual(update_ctxs[1:], skipped)
for i in range(4)]
# enough capacity for all deferrals
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2], # deferrals...
update_ctxs[1]],
actual)
self.assertEqual(2, mock_sleep.call_count)
self.assertEqual(0, self.stats.skips)
self.assertEqual(2, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only space for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=1,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[2]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(2, self.stats.deferrals)
self.stats.reset()
# only space for two deferrals, only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now,
now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
drain_until=now + 10)
actual = [x for x in it]
self.assertEqual([update_ctxs[0],
update_ctxs[3]], # deferrals...
actual)
self.assertEqual(1, mock_sleep.call_count)
self.assertEqual(2, self.stats.skips)
self.assertEqual(1, self.stats.drains)
self.assertEqual(3, self.stats.deferrals)
self.stats.reset()
def test_deferral_multiple_buckets(self):
# verify deferral - multiple buckets
update_ctxs_1 = [
{'update': {'account': 'a', 'container': 'c1', 'obj': '%3d' % i}}
for i in range(3)]
update_ctxs_2 = [
{'update': {'account': 'a', 'container': 'c2', 'obj': '%3d' % i}}
for i in range(3)]
time_iter = itertools.count(time(), 0.001)
# deferrals stick in both buckets
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(12)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=4,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_1[2], # deferrals...
update_ctxs_2[2],
update_ctxs_1[1],
update_ctxs_2[1],
],
actual)
self.assertEqual(4, mock_sleep.call_count)
self.assertEqual(0, self.stats.skips)
self.assertEqual(4, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
# oldest deferral bumped from one bucket due to max_deferrals == 3
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=3,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_1[2], # deferrals...
update_ctxs_2[2],
update_ctxs_2[1],
],
actual)
self.assertEqual(3, mock_sleep.call_count)
self.assertEqual(1, self.stats.skips)
self.assertEqual(3, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
# older deferrals bumped from one bucket due to max_deferrals == 2
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
max_deferred_elements=2,
drain_until=next(time_iter))
it.salt = '' # make container->bucket hashing predictable
actual = [x for x in it]
self.assertEqual([update_ctxs_1[0],
update_ctxs_2[0],
update_ctxs_2[2], # deferrals...
update_ctxs_2[1],
],
actual)
self.assertEqual(2, mock_sleep.call_count)
self.assertEqual(2, self.stats.skips)
self.assertEqual(2, self.stats.drains)
self.assertEqual(4, self.stats.deferrals)
self.stats.reset()
class TestRateLimiterBucket(unittest.TestCase):
def test_wait_until(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertEqual(10, b1.wait_until)
b1.last_time = b1.wait_until
self.assertEqual(20, b1.wait_until)
b1.last_time = 12345.678
self.assertEqual(12355.678, b1.wait_until)
def test_len(self):
b1 = object_updater.RateLimiterBucket(10)
b1.deque.append(1)
b1.deque.append(2)
self.assertEqual(2, len(b1))
b1.deque.pop()
self.assertEqual(1, len(b1))
def test_bool(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertFalse(b1)
b1.deque.append(1)
self.assertTrue(b1)
b1.deque.pop()
self.assertFalse(b1)
def test_bucket_ordering(self):
time_iter = itertools.count(time(), step=0.001)
b1 = object_updater.RateLimiterBucket(10)
b2 = object_updater.RateLimiterBucket(10)
b2.last_time = next(time_iter)
buckets = PriorityQueue()
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)])
b1.last_time = next(time_iter)
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])
class TestSweepStats(unittest.TestCase):
def test_copy(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
stats2 = stats.copy()
self.assertEqual(vars(stats), vars(stats2))
def test_since(self):
stats = object_updater.SweepStats(1, 2, 3, 4, 5, 6, 7, 8, 9)
stats2 = object_updater.SweepStats(4, 6, 8, 10, 12, 14, 16, 18, 20)
expected = object_updater.SweepStats(3, 4, 5, 6, 7, 8, 9, 10, 11)
self.assertEqual(vars(expected), vars(stats2.since(stats)))
def test_reset(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
stats.reset()
expected = object_updater.SweepStats()
self.assertEqual(vars(expected), vars(stats))
def test_str(self):
num_props = len(vars(object_updater.SweepStats()))
stats = object_updater.SweepStats(*range(1, num_props + 1))
self.assertEqual(
'4 successes, 2 failures, 3 quarantines, 5 unlinks, 1 errors, '
'6 redirects, 7 skips, 8 deferrals, 9 drains', str(stats))
if __name__ == '__main__':