diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 5329bca8fa..9f9f64515a 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -483,6 +483,12 @@ use = egg:swift#recon # be an integer greater than 0. # per_container_ratelimit_buckets = 1000 # +# Updates that cannot be sent due to per-container rate-limiting may be +# deferred and re-tried at the end of the updater cycle. This option constrains +# the size of the in-memory data structure used to store deferred updates. +# Must be an integer value greater than or equal to 0. +# max_deferred_updates = 10000 +# # slowdown will sleep that amount between objects. Deprecated; use # objects_per_second instead. # slowdown = 0.01 diff --git a/swift/obj/updater.py b/swift/obj/updater.py index b90b63f8a7..01c609f13c 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -12,6 +12,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +from six.moves import queue import six.moves.cPickle as pickle import errno @@ -21,6 +22,7 @@ import sys import time import uuid from random import random, shuffle +from collections import deque from eventlet import spawn, Timeout @@ -31,7 +33,7 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ - non_negative_float, config_positive_int_value + non_negative_float, config_positive_int_value, non_negative_int from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -41,33 +43,98 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY +class RateLimiterBucket(object): + def __init__(self, update_delta): + self.update_delta = update_delta + self.last_time = 0 + self.deque = deque() + + @property + def wait_until(self): + return self.last_time + self.update_delta + + def __len__(self): + return len(self.deque) + + def __bool__(self): + return bool(self.deque) + + __nonzero__ = __bool__ # py2 + + def __lt__(self, other): + # used to sort buckets by readiness + if isinstance(other, RateLimiterBucket): + return self.wait_until < other.wait_until + return self.wait_until < other + + class BucketizedUpdateSkippingLimiter(object): """ - Wrap an iterator to filter elements that show up too often. + Wrap an iterator to rate-limit updates on a per-bucket basis, where updates + are mapped to buckets by hashing their destination path. If an update is + rate-limited then it is placed on a deferral queue and may be sent later if + the wrapped iterator is exhausted before the ``drain_until`` time is + reached. + + The deferral queue has constrained size and once the queue is full updates + are evicted using a first-in-first-out policy. This policy is used because + updates on the queue may have been made obsolete by newer updates written + to disk, and this is more likely for updates that have been on the queue + longest. + + The iterator increments stats as follows: + + * The `deferrals` stat is incremented for each update that is + rate-limited. Note that an individual update is rate-limited at most + once. + * The `skips` stat is incremented for each rate-limited update that is + not eventually yielded.
This includes updates that are evicted from the + deferral queue and all updates that remain in the deferral queue when + ``drain_until`` time is reached and the iterator terminates. + * The `drains` stat is incremented for each rate-limited update that is + eventually yielded. + + Consequently, when this iterator terminates, the sum of `skips` and + `drains` is equal to the number of `deferrals`. :param update_iterable: an async_pending update iterable + :param logger: a logger instance + :param stats: a SweepStats instance :param num_buckets: number of buckets to divide container hashes into, the more buckets total the less containers to a bucket (once a busy container slows down a bucket the whole - bucket starts skipping) - :param max_elements_per_group_per_second: tunable, when skipping kicks in - :param skip_f: function to call with update_ctx when skipping it + bucket starts deferring) + :param max_elements_per_group_per_second: tunable, when deferring kicks in + :param max_deferred_elements: maximum number of deferred elements before + skipping starts. Each bucket may defer updates, but once the total + number of deferred updates summed across all buckets reaches this + value then all buckets will skip subsequent updates. + :param drain_until: time at which any remaining deferred elements must be + skipped and the iterator stops. Once the wrapped iterator has been + exhausted, this iterator will drain deferred elements from its buckets + until either all buckets have drained or this time is reached. """ - def __init__(self, update_iterable, num_buckets, - max_elements_per_group_per_second, - skip_f=lambda update_ctx: None): + def __init__(self, update_iterable, logger, stats, num_buckets=1000, + max_elements_per_group_per_second=50, + max_deferred_elements=0, + drain_until=0): self.iterator = iter(update_iterable) + self.logger = logger + self.stats = stats # if we want a smaller "blast radius" we could make this number bigger self.num_buckets = max(num_buckets, 1) - # an array might be more efficient; but this is pretty cheap - self.next_update = [0.0 for _ in range(self.num_buckets)] try: self.bucket_update_delta = 1.0 / max_elements_per_group_per_second except ZeroDivisionError: self.bucket_update_delta = -1 - self.skip_f = skip_f + self.max_deferred_elements = max_deferred_elements + self.deferred_buckets = deque() + self.drain_until = drain_until self.salt = str(uuid.uuid4()) + self.buckets = [RateLimiterBucket(self.bucket_update_delta) + for _ in range(self.num_buckets)] + self.buckets_ordered_by_readiness = None def __iter__(self): return self @@ -76,15 +143,77 @@ class BucketizedUpdateSkippingLimiter(object): acct, cont = split_update_path(update) return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets + def _get_time(self): + return time.time() + def next(self): + # first iterate over the wrapped iterator... 
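+ # (each update hashes to one of num_buckets RateLimiterBucket instances; an + # update is yielded immediately if its bucket has been idle for at least + # bucket_update_delta seconds, otherwise it is deferred; if the deferral + # budget (max_deferred_elements) is already full, the least recent deferral + # is evicted from the least recently deferred bucket and counted as a skip, + # and if deferrals are disabled the update itself is counted as a skip)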
for update_ctx in self.iterator: - bucket_key = self._bucket_key(update_ctx['update']) - now = time.time() - if self.next_update[bucket_key] > now: - self.skip_f(update_ctx) - continue - self.next_update[bucket_key] = now + self.bucket_update_delta - return update_ctx + bucket = self.buckets[self._bucket_key(update_ctx['update'])] + now = self._get_time() + if now >= bucket.wait_until: + # no need to ratelimit, just return next update + bucket.last_time = now + return update_ctx + + self.stats.deferrals += 1 + self.logger.increment("deferrals") + if self.max_deferred_elements > 0: + if len(self.deferred_buckets) >= self.max_deferred_elements: + # create space to defer this update by popping the least + # recent deferral from the least recently deferred bucket; + # updates read from disk recently are preferred over those + # read from disk less recently. + oldest_deferred_bucket = self.deferred_buckets.popleft() + oldest_deferred_bucket.deque.popleft() + self.stats.skips += 1 + self.logger.increment("skips") + # append the update to the bucket's queue and append the bucket + # to the queue of deferred buckets + # note: buckets may have multiple entries in deferred_buckets, + # one for each deferred update in that particular bucket + bucket.deque.append(update_ctx) + self.deferred_buckets.append(bucket) + else: + self.stats.skips += 1 + self.logger.increment("skips") + + if self.buckets_ordered_by_readiness is None: + # initialise a queue of those buckets with deferred elements; + # buckets are queued in the chronological order in which they are + # ready to serve an element + self.buckets_ordered_by_readiness = queue.PriorityQueue() + for bucket in self.buckets: + if bucket: + self.buckets_ordered_by_readiness.put(bucket) + + # now drain the buckets... 
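+ # (buckets holding deferred updates are served from a PriorityQueue ordered + # by each bucket's wait_until time; for each bucket the iterator sleeps + # until the bucket is ready, yields the bucket's most recently deferred + # update, and re-queues the bucket if it still holds deferrals; once + # drain_until is reached anything left in the buckets is reported as skips)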
+ undrained_elements = [] + while not self.buckets_ordered_by_readiness.empty(): + now = self._get_time() + bucket = self.buckets_ordered_by_readiness.get_nowait() + if now < self.drain_until: + # wait for next element to be ready + time.sleep(max(0, bucket.wait_until - now)) + # drain the most recently deferred element + item = bucket.deque.pop() + if bucket: + # bucket has more deferred elements, re-insert in queue in + # correct chronological position + bucket.last_time = self._get_time() + self.buckets_ordered_by_readiness.put(bucket) + self.stats.drains += 1 + self.logger.increment("drains") + return item + else: + # time to stop iterating: gather all un-drained elements + undrained_elements.extend(bucket.deque) + + if undrained_elements: + # report final batch of skipped elements + self.stats.skips += len(undrained_elements) + self.logger.update_stats("skips", len(undrained_elements)) + raise StopIteration() __next__ = next @@ -93,9 +222,18 @@ class BucketizedUpdateSkippingLimiter(object): class SweepStats(object): """ Stats bucket for an update sweep + + A measure of the rate at which updates are being rate-limited is: + + deferrals / (deferrals + successes + failures - drains) + + A measure of the rate at which updates are not being sent during a sweep + is: + + skips / (skips + successes + failures) """ def __init__(self, errors=0, failures=0, quarantines=0, successes=0, - unlinks=0, redirects=0, skips=0): + unlinks=0, redirects=0, skips=0, deferrals=0, drains=0): self.errors = errors self.failures = failures self.quarantines = quarantines @@ -103,10 +241,13 @@ class SweepStats(object): self.unlinks = unlinks self.redirects = redirects self.skips = skips + self.deferrals = deferrals + self.drains = drains def copy(self): return type(self)(self.errors, self.failures, self.quarantines, - self.successes, self.unlinks) + self.successes, self.unlinks, self.redirects, + self.skips, self.deferrals, self.drains) def since(self, other): return type(self)(self.errors - other.errors, @@ -115,7 +256,9 @@ class SweepStats(object): self.successes - other.successes, self.unlinks - other.unlinks, self.redirects - other.redirects, - self.skips - other.skips) + self.skips - other.skips, + self.deferrals - other.deferrals, + self.drains - other.drains) def reset(self): self.errors = 0 @@ -125,6 +268,8 @@ class SweepStats(object): self.unlinks = 0 self.redirects = 0 self.skips = 0 + self.deferrals = 0 + self.drains = 0 def __str__(self): keys = ( @@ -135,6 +280,8 @@ class SweepStats(object): (self.errors, 'errors'), (self.redirects, 'redirects'), (self.skips, 'skips'), + (self.deferrals, 'deferrals'), + (self.drains, 'drains'), ) return ', '.join('%d %s' % pair for pair in keys) @@ -191,6 +338,9 @@ class ObjectUpdater(Daemon): DEFAULT_RECON_CACHE_PATH) self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE) self.stats = SweepStats() + self.max_deferred_updates = non_negative_int( + conf.get('max_deferred_updates', 10000)) + self.begin = time.time() def _listdir(self, path): try: @@ -214,7 +364,7 @@ class ObjectUpdater(Daemon): time.sleep(random() * self.interval) while True: self.logger.info('Begin object update sweep') - begin = time.time() + self.begin = time.time() pids = [] # read from container ring to ensure it's fresh self.get_container_ring().get_nodes('') @@ -248,7 +398,7 @@ class ObjectUpdater(Daemon): sys.exit() while pids: pids.remove(os.wait()[0]) - elapsed = time.time() - begin + elapsed = time.time() - self.begin self.logger.info('Object update sweep completed: %.02fs', 
elapsed) dump_recon_cache({'object_updater_sweep': elapsed}, @@ -259,7 +409,7 @@ class ObjectUpdater(Daemon): def run_once(self, *args, **kwargs): """Run the updater once.""" self.logger.info('Begin object update single threaded sweep') - begin = time.time() + self.begin = time.time() self.stats.reset() for device in self._listdir(self.devices): try: @@ -271,7 +421,7 @@ class ObjectUpdater(Daemon): self.logger.warning('Skipping: %s', err) continue self.object_sweep(dev_path) - elapsed = time.time() - begin + elapsed = time.time() - self.begin self.logger.info( ('Object update single-threaded sweep completed: ' '%(elapsed).02fs, %(stats)s'), @@ -404,18 +554,15 @@ class ObjectUpdater(Daemon): self.logger.info("Object update sweep starting on %s (pid: %d)", device, my_pid) - def skip_counting_f(update_ctx): - # in the future we could defer update_ctx - self.stats.skips += 1 - self.logger.increment("skips") - ap_iter = RateLimitedIterator( self._iter_async_pendings(device), elements_per_second=self.max_objects_per_second) ap_iter = BucketizedUpdateSkippingLimiter( - ap_iter, self.per_container_ratelimit_buckets, + ap_iter, self.logger, self.stats, + self.per_container_ratelimit_buckets, self.max_objects_per_container_per_second, - skip_f=skip_counting_f) + max_deferred_elements=self.max_deferred_updates, + drain_until=self.begin + self.interval) with ContextPool(self.concurrency) as pool: for update_ctx in ap_iter: pool.spawn(self.process_object_update, **update_ctx) @@ -440,8 +587,10 @@ class ObjectUpdater(Daemon): '%(successes)d successes, %(failures)d failures, ' '%(quarantines)d quarantines, ' '%(unlinks)d unlinks, %(errors)d errors, ' - '%(redirects)d redirects ' - '%(skips)d skips ' + '%(redirects)d redirects, ' + '%(skips)d skips, ' + '%(deferrals)d deferrals, ' + '%(drains)d drains ' '(pid: %(pid)d)'), {'device': device, 'elapsed': time.time() - start_time, @@ -452,7 +601,10 @@ class ObjectUpdater(Daemon): 'unlinks': sweep_totals.unlinks, 'errors': sweep_totals.errors, 'redirects': sweep_totals.redirects, - 'skips': sweep_totals.skips}) + 'skips': sweep_totals.skips, + 'deferrals': sweep_totals.deferrals, + 'drains': sweep_totals.drains + }) def process_object_update(self, update_path, device, policy, update, **kwargs): diff --git a/test/debug_logger.py b/test/debug_logger.py index 99f7ec3a53..e1fc84e69c 100644 --- a/test/debug_logger.py +++ b/test/debug_logger.py @@ -141,6 +141,9 @@ class FakeLogger(logging.Logger, CaptureLog): counts[metric] += 1 return counts + def get_update_stats(self): + return [call[0] for call in self.log_dict['update_stats']] + def setFormatter(self, obj): self.formatter = obj diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index a45851f39d..fa3fde32f5 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -12,6 +12,8 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
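A worked illustration of the SweepStats ratios documented above (not part of the patch; the numbers match the final stats of the unsent-deferrals test added below):

    # illustrative arithmetic only, assuming a sweep that ends with
    # 5 successes, 0 failures, 4 deferrals, 2 drains and 2 skips
    successes, failures = 5, 0
    deferrals, drains, skips = 4, 2, 2
    ratelimited = deferrals / (deferrals + successes + failures - drains)  # 4 / 7 ~= 0.57
    unsent = skips / (skips + successes + failures)  # 2 / 7 ~= 0.29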
+from queue import PriorityQueue + import eventlet import six.moves.cPickle as pickle import mock @@ -139,6 +141,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.max_objects_per_second, 50.0) self.assertEqual(daemon.max_objects_per_container_per_second, 0.0) self.assertEqual(daemon.per_container_ratelimit_buckets, 1000) + self.assertEqual(daemon.max_deferred_updates, 10000) # non-defaults conf = { @@ -151,6 +154,7 @@ class TestObjectUpdater(unittest.TestCase): 'objects_per_second': '10.5', 'max_objects_per_container_per_second': '1.2', 'per_container_ratelimit_buckets': '100', + 'max_deferred_updates': '0', } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) self.assertEqual(daemon.devices, '/some/where/else') @@ -162,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.max_objects_per_second, 10.5) self.assertEqual(daemon.max_objects_per_container_per_second, 1.2) self.assertEqual(daemon.per_container_ratelimit_buckets, 100) + self.assertEqual(daemon.max_deferred_updates, 0) # check deprecated option daemon = object_updater.ObjectUpdater({'slowdown': '0.04'}, @@ -183,6 +188,9 @@ class TestObjectUpdater(unittest.TestCase): check_bad({'per_container_ratelimit_buckets': '0'}) check_bad({'per_container_ratelimit_buckets': '-1'}) check_bad({'per_container_ratelimit_buckets': 'auto'}) + check_bad({'max_deferred_updates': '-1'}) + check_bad({'max_deferred_updates': '1.1'}) + check_bad({'max_deferred_updates': 'auto'}) @mock.patch('os.listdir') def test_listdir_with_exception(self, mock_listdir): @@ -1351,6 +1359,8 @@ class TestObjectUpdater(unittest.TestCase): 'mount_check': 'false', 'swift_dir': self.testdir, 'max_objects_per_container_per_second': 1, + 'max_deferred_updates': 0, # do not re-iterate + 'concurrency': 1 } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) @@ -1383,6 +1393,14 @@ class TestObjectUpdater(unittest.TestCase): for req in fake_conn.requests), {'/sda1/%s/a/c1' % c1_part: 3, '/sda1/%s/.shards_a/c2_shard' % c2_part: 3}) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, ' + '0 errors, 0 redirects, 9 skips, 9 deferrals, 0 drains', + info_lines[-1]) + self.assertEqual({'skips': 9, 'successes': 2, 'unlinks': 2, + 'deferrals': 9}, + self.logger.get_increment_counts()) @mock.patch('swift.obj.updater.dump_recon_cache') def test_per_container_rate_limit_unlimited(self, mock_recon): @@ -1414,14 +1432,24 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(expected_total, daemon.stats.successes) self.assertEqual(0, daemon.stats.skips) self.assertEqual([], self._find_async_pending_files()) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('11 successes, 0 failures, 0 quarantines, 11 unlinks, ' + '0 errors, 0 redirects, 0 skips, 0 deferrals, 0 drains', + info_lines[-1]) + self.assertEqual({'successes': 11, 'unlinks': 11}, + self.logger.get_increment_counts()) @mock.patch('swift.obj.updater.dump_recon_cache') - def test_per_container_rate_limit_slow_responses(self, mock_recon): + def test_per_container_rate_limit_some_limited(self, mock_recon): + # simulate delays between buckets being fed so that only some updates + # are skipped conf = { 'devices': self.devices_dir, 'mount_check': 'false', 'swift_dir': self.testdir, 'max_objects_per_container_per_second': 10, + 
'max_deferred_updates': 0, # do not re-iterate } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) @@ -1431,26 +1459,423 @@ class TestObjectUpdater(unittest.TestCase): for i in range(num_c1_files): obj_name = 'o%02d' % i self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1') expected_total = num_c1_files self.assertEqual(expected_total, len(self._find_async_pending_files())) - latencies = [.11, 0, .11, 0] + # first one always succeeds, second is skipped because it is only 0.05s + # behind the first, third succeeds because it is 0.101s behind the + # first, fourth is skipped + latencies = [0, 0.05, .051, 0] expected_success = 2 fake_status_codes = [200] * 3 * expected_success - def fake_spawn(pool, *args, **kwargs): - # make each update delay the iter being called again - eventlet.sleep(latencies.pop(0)) - return args[0](*args[1:], **kwargs) + contexts_fed_in = [] - with mocked_http_conn(*fake_status_codes): - with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn): - daemon.run_once() + def ratelimit_if(value): + contexts_fed_in.append(value) + # make each update delay before the iter being called again + eventlet.sleep(latencies.pop(0)) + return False # returning False overrides normal ratelimiting + + orig_rate_limited_iterator = utils.RateLimitedIterator + + def fake_rate_limited_iterator(*args, **kwargs): + # insert our own rate limiting function + kwargs['ratelimit_if'] = ratelimit_if + return orig_rate_limited_iterator(*args, **kwargs) + + with mocked_http_conn(*fake_status_codes) as fake_conn, \ + mock.patch('swift.obj.updater.RateLimitedIterator', + fake_rate_limited_iterator): + daemon.run_once() self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success self.assertEqual(expected_skipped, daemon.stats.skips) self.assertEqual(expected_skipped, len(self._find_async_pending_files())) + paths_fed_in = ['/sda1/%(part)s/%(account)s/%(container)s/%(obj)s' + % dict(ctx['update'], part=c1_part) + for ctx in contexts_fed_in] + expected_update_paths = paths_fed_in[:1] * 3 + paths_fed_in[2:3] * 3 + actual_update_paths = [req['path'] for req in fake_conn.requests] + self.assertEqual(expected_update_paths, actual_update_paths) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, ' + '0 errors, 0 redirects, 2 skips, 2 deferrals, 0 drains', + info_lines[-1]) + self.assertEqual({'skips': 2, 'successes': 2, 'unlinks': 2, + 'deferrals': 2}, + self.logger.get_increment_counts()) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit_defer_2_skip_1(self, mock_recon): + # limit length of deferral queue so that some defer and some skip + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 10, + # only one bucket needed for test + 'per_container_ratelimit_buckets': 1, + 'max_deferred_updates': 1, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + # all updates for same container + num_c1_files = 4 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a',
'c1') + expected_total = num_c1_files + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + # first succeeds, second is deferred, third succeeds, fourth is + # deferred and bumps second out of deferral queue, fourth is re-tried + latencies = [0, 0.05, .051, 0, 0, .11] + expected_success = 3 + + contexts_fed_in = [] + captured_queues = [] + captured_skips_stats = [] + + def ratelimit_if(value): + contexts_fed_in.append(value) + return False # returning False overrides normal ratelimiting + + orig_rate_limited_iterator = utils.RateLimitedIterator + + def fake_rate_limited_iterator(*args, **kwargs): + # insert our own rate limiting function + kwargs['ratelimit_if'] = ratelimit_if + return orig_rate_limited_iterator(*args, **kwargs) + + now = [time()] + + def fake_get_time(bucket_iter): + captured_skips_stats.append( + daemon.logger.get_increment_counts().get('skips', 0)) + captured_queues.append(list(bucket_iter.buckets[0].deque)) + # make each update delay before the iter being called again + now[0] += latencies.pop(0) + return now[0] + + captured_updates = [] + + def fake_object_update(node, part, op, obj, *args, **kwargs): + captured_updates.append((node, part, op, obj)) + return True, node['id'], False + + with mock.patch( + 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time', + fake_get_time), \ + mock.patch.object(daemon, 'object_update', + fake_object_update), \ + mock.patch('swift.obj.updater.RateLimitedIterator', + fake_rate_limited_iterator): + daemon.run_once() + self.assertEqual(expected_success, daemon.stats.successes) + expected_skipped = expected_total - expected_success + self.assertEqual(expected_skipped, daemon.stats.skips) + self.assertEqual(expected_skipped, + len(self._find_async_pending_files())) + + orig_iteration = contexts_fed_in[:num_c1_files] + # we first capture every async fed in one by one + objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration] + self.assertEqual(num_c1_files, len(set(objs_fed_in))) + # keep track of this order for context + aorder = {ctx['update']['obj']: 'a%02d' % i + for i, ctx in enumerate(orig_iteration)} + expected_drops = (1,) + expected_updates_sent = [] + for i, obj in enumerate(objs_fed_in): + if i in expected_drops: + continue + # triple replica, request to 3 nodes each obj! 
+ expected_updates_sent.extend([obj] * 3) + + actual_updates_sent = [ + utils.split_path(update[3], minsegs=3)[-1] + for update in captured_updates + ] + self.assertEqual([aorder[o] for o in expected_updates_sent], + [aorder[o] for o in actual_updates_sent]) + + self.assertEqual([0, 0, 0, 0, 1], captured_skips_stats) + + expected_deferrals = [ + [], + [], + [objs_fed_in[1]], + [objs_fed_in[1]], + [objs_fed_in[3]], + ] + self.assertEqual( + expected_deferrals, + [[ctx['update']['obj'] for ctx in q] for q in captured_queues]) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('3 successes, 0 failures, 0 quarantines, 3 unlinks, ' + '0 errors, 0 redirects, 1 skips, 2 deferrals, 1 drains', + info_lines[-1]) + self.assertEqual( + {'skips': 1, 'successes': 3, 'unlinks': 3, 'deferrals': 2, + 'drains': 1}, self.logger.get_increment_counts()) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit_defer_3_skip_1(self, mock_recon): + # limit length of deferral queue so that some defer and some skip + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 10, + # only one bucket needed for test + 'per_container_ratelimit_buckets': 1, + 'max_deferred_updates': 2, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + # all updates for same container + num_c1_files = 5 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1') + expected_total = num_c1_files + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + # indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral + # queue by 4; 4, 3 are then drained + latencies = [0, 0.05, .051, 0, 0, 0, .11, .01] + expected_success = 4 + + contexts_fed_in = [] + captured_queues = [] + captured_skips_stats = [] + + def ratelimit_if(value): + contexts_fed_in.append(value) + return False # returning False overrides normal ratelimiting + + orig_rate_limited_iterator = utils.RateLimitedIterator + + def fake_rate_limited_iterator(*args, **kwargs): + # insert our own rate limiting function + kwargs['ratelimit_if'] = ratelimit_if + return orig_rate_limited_iterator(*args, **kwargs) + + now = [time()] + + def fake_get_time(bucket_iter): + captured_skips_stats.append( + daemon.logger.get_increment_counts().get('skips', 0)) + captured_queues.append(list(bucket_iter.buckets[0].deque)) + # make each update delay before the iter being called again + now[0] += latencies.pop(0) + return now[0] + + captured_updates = [] + + def fake_object_update(node, part, op, obj, *args, **kwargs): + captured_updates.append((node, part, op, obj)) + return True, node['id'], False + + with mock.patch( + 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time', + fake_get_time), \ + mock.patch.object(daemon, 'object_update', + fake_object_update), \ + mock.patch('swift.obj.updater.RateLimitedIterator', + fake_rate_limited_iterator), \ + mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + daemon.run_once() + self.assertEqual(expected_success, daemon.stats.successes) + expected_skipped = expected_total - expected_success + self.assertEqual(expected_skipped, daemon.stats.skips) + self.assertEqual(expected_skipped, + len(self._find_async_pending_files())) + + 
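+ # timeline implied by the latencies above (bucket delta is 0.1s): o00 is + # sent at t=0; o01 is deferred at t=0.05; o02 is sent at t=0.101; o03 and + # o04 are deferred at t=0.101, and deferring o04 evicts o01 because the + # deferral queue only holds 2; the drain phase then sleeps 0.1s and 0.09s + # to send o04 and o03, so only o01 is skipped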
orig_iteration = contexts_fed_in[:num_c1_files] + # we first capture every async fed in one by one + objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration] + self.assertEqual(num_c1_files, len(set(objs_fed_in))) + # keep track of this order for context + aorder = {ctx['update']['obj']: 'a%02d' % i + for i, ctx in enumerate(orig_iteration)} + expected_updates_sent = [] + for index_sent in (0, 2, 4, 3): + expected_updates_sent.extend( + [contexts_fed_in[index_sent]['update']['obj']] * 3) + actual_updates_sent = [ + utils.split_path(update[3], minsegs=3)[-1] + for update in captured_updates + ] + self.assertEqual([aorder[o] for o in expected_updates_sent], + [aorder[o] for o in actual_updates_sent]) + + self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats) + + expected_deferrals = [ + [], + [], + [objs_fed_in[1]], + [objs_fed_in[1]], + [objs_fed_in[1], objs_fed_in[3]], + [objs_fed_in[3], objs_fed_in[4]], + [objs_fed_in[3]], # note: rightmost element is drained + [objs_fed_in[3]], + ] + self.assertEqual( + expected_deferrals, + [[ctx['update']['obj'] for ctx in q] for q in captured_queues]) + actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list] + self.assertEqual(2, len(actual_sleeps)) + self.assertAlmostEqual(0.1, actual_sleeps[0], 3) + self.assertAlmostEqual(0.09, actual_sleeps[1], 3) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('4 successes, 0 failures, 0 quarantines, 4 unlinks, ' + '0 errors, 0 redirects, 1 skips, 3 deferrals, 2 drains', + info_lines[-1]) + self.assertEqual( + {'skips': 1, 'successes': 4, 'unlinks': 4, 'deferrals': 3, + 'drains': 2}, self.logger.get_increment_counts()) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit_unsent_deferrals(self, mock_recon): + # make some updates defer until interval is reached and cycle + # terminates + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 10, + # only one bucket needed for test + 'per_container_ratelimit_buckets': 1, + 'max_deferred_updates': 5, + 'interval': 0.4, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + # all updates for same container + num_c1_files = 7 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1') + expected_total = num_c1_files + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + # first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred + # last 2 deferred items sent before interval elapses + latencies = [0, .05, 0.051, 0, 0, .11, 0, 0, + 0.1, 0, 0.1, 0] # total 0.42 + expected_success = 5 + + contexts_fed_in = [] + captured_queues = [] + captured_skips_stats = [] + + def ratelimit_if(value): + contexts_fed_in.append(value) + return False # returning False overrides normal ratelimiting + + orig_rate_limited_iterator = utils.RateLimitedIterator + + def fake_rate_limited_iterator(*args, **kwargs): + # insert our own rate limiting function + kwargs['ratelimit_if'] = ratelimit_if + return orig_rate_limited_iterator(*args, **kwargs) + + start = time() + now = [start] + + def fake_get_time(bucket_iter): + if not captured_skips_stats: + daemon.begin = now[0] + captured_skips_stats.append( + daemon.logger.get_increment_counts().get('skips', 0)) + 
captured_queues.append(list(bucket_iter.buckets[0].deque)) + # insert delay each time iter is called + now[0] += latencies.pop(0) + return now[0] + + captured_updates = [] + + def fake_object_update(node, part, op, obj, *args, **kwargs): + captured_updates.append((node, part, op, obj)) + return True, node['id'], False + + with mock.patch( + 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time', + fake_get_time), \ + mock.patch.object(daemon, 'object_update', + fake_object_update), \ + mock.patch('swift.obj.updater.RateLimitedIterator', + fake_rate_limited_iterator), \ + mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + daemon.run_once() + self.assertEqual(expected_success, daemon.stats.successes) + expected_skipped = expected_total - expected_success + self.assertEqual(expected_skipped, daemon.stats.skips) + self.assertEqual(expected_skipped, + len(self._find_async_pending_files())) + + expected_updates_sent = [] + for index_sent in (0, 2, 5, 6, 4): + expected_updates_sent.extend( + [contexts_fed_in[index_sent]['update']['obj']] * 3) + + actual_updates_sent = [ + utils.split_path(update[3], minsegs=3)[-1] + for update in captured_updates + ] + self.assertEqual(expected_updates_sent, actual_updates_sent) + + # skips (un-drained deferrals) not reported until end of cycle + self.assertEqual([0] * 12, captured_skips_stats) + + objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in] + expected_deferrals = [ + # queue content before app_iter feeds next update_ctx + [], + [], + [objs_fed_in[1]], + [objs_fed_in[1]], + [objs_fed_in[1], objs_fed_in[3]], + [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], + [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], + # queue content before each update_ctx is drained from queue... + # note: rightmost element is drained + [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]], + [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], + [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], + [objs_fed_in[1], objs_fed_in[3]], + [objs_fed_in[1], objs_fed_in[3]], + ] + self.assertEqual( + expected_deferrals, + [[ctx['update']['obj'] for ctx in q] for q in captured_queues]) + actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list] + self.assertEqual(2, len(actual_sleeps)) + self.assertAlmostEqual(0.1, actual_sleeps[0], 3) + self.assertAlmostEqual(0.1, actual_sleeps[1], 3) + info_lines = self.logger.get_lines_for_level('info') + self.assertTrue(info_lines) + self.assertIn('5 successes, 0 failures, 0 quarantines, 5 unlinks, ' + '0 errors, 0 redirects, 2 skips, 4 deferrals, 2 drains', + info_lines[-1]) + self.assertEqual( + {'successes': 5, 'unlinks': 5, 'deferrals': 4, 'drains': 2}, + self.logger.get_increment_counts()) + self.assertEqual([('skips', 2)], self.logger.get_update_stats()) class TestObjectUpdaterFunctions(unittest.TestCase): @@ -1477,20 +1902,28 @@ class TestObjectUpdaterFunctions(unittest.TestCase): class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): + + def setUp(self): + self.logger = debug_logger() + self.stats = object_updater.SweepStats() + def test_init(self): - it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10) + it = object_updater.BucketizedUpdateSkippingLimiter( + [3, 1], self.logger, self.stats, 1000, 10) self.assertEqual(1000, it.num_buckets) self.assertEqual(0.1, it.bucket_update_delta) self.assertEqual([3, 1], [x for x in it.iterator]) # rate of 0 implies unlimited - it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0) + it = 
object_updater.BucketizedUpdateSkippingLimiter( + iter([3, 1]), self.logger, self.stats, 9, 0) self.assertEqual(9, it.num_buckets) self.assertEqual(-1, it.bucket_update_delta) self.assertEqual([3, 1], [x for x in it.iterator]) # num_buckets is collared at 1 - it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1) + it = object_updater.BucketizedUpdateSkippingLimiter( + iter([3, 1]), self.logger, self.stats, 0, 1) self.assertEqual(1, it.num_buckets) self.assertEqual(1, it.bucket_update_delta) self.assertEqual([3, 1], [x for x in it.iterator]) @@ -1501,8 +1934,11 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): {'update': {'account': '%d' % i, 'container': '%s' % i}} for i in range(20)] it = object_updater.BucketizedUpdateSkippingLimiter( - iter(update_ctxs), 9, 0) + iter(update_ctxs), self.logger, self.stats, 9, 0) self.assertEqual(update_ctxs, [x for x in it]) + self.assertEqual(0, self.stats.skips) + self.assertEqual(0, self.stats.drains) + self.assertEqual(0, self.stats.deferrals) def test_iteration_ratelimited(self): # verify iteration at limited rate - single bucket @@ -1510,23 +1946,246 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): {'update': {'account': '%d' % i, 'container': '%s' % i}} for i in range(2)] it = object_updater.BucketizedUpdateSkippingLimiter( - iter(update_ctxs), 1, 0.1) + iter(update_ctxs), self.logger, self.stats, 1, 0.1) + # second update is skipped self.assertEqual(update_ctxs[:1], [x for x in it]) + self.assertEqual(1, self.stats.skips) + self.assertEqual(0, self.stats.drains) + self.assertEqual(1, self.stats.deferrals) - def test_iteration_ratelimited_with_callback(self): - # verify iteration at limited rate - single bucket - skipped = [] - - def on_skip(update_ctx): - skipped.append(update_ctx) - + def test_deferral_single_bucket(self): + # verify deferral - single bucket + now = time() update_ctxs = [ {'update': {'account': '%d' % i, 'container': '%s' % i}} - for i in range(2)] - it = object_updater.BucketizedUpdateSkippingLimiter( - iter(update_ctxs), 1, 0.1, skip_f=on_skip) - self.assertEqual(update_ctxs[:1], [x for x in it]) - self.assertEqual(update_ctxs[1:], skipped) + for i in range(4)] + + # enough capacity for all deferrals + with mock.patch('swift.obj.updater.time.time', + side_effect=[now, now, now, now, now, now]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, + max_deferred_elements=2, + drain_until=now + 10) + actual = [x for x in it] + self.assertEqual([update_ctxs[0], + update_ctxs[2], # deferrals... + update_ctxs[1]], + actual) + self.assertEqual(2, mock_sleep.call_count) + self.assertEqual(0, self.stats.skips) + self.assertEqual(2, self.stats.drains) + self.assertEqual(2, self.stats.deferrals) + self.stats.reset() + + # only space for one deferral + with mock.patch('swift.obj.updater.time.time', + side_effect=[now, now, now, now, now]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, + max_deferred_elements=1, + drain_until=now + 10) + actual = [x for x in it] + self.assertEqual([update_ctxs[0], + update_ctxs[2]], # deferrals... 
+ actual) + self.assertEqual(1, mock_sleep.call_count) + self.assertEqual(1, self.stats.skips) + self.assertEqual(1, self.stats.drains) + self.assertEqual(2, self.stats.deferrals) + self.stats.reset() + + # only time for one deferral + with mock.patch('swift.obj.updater.time.time', + side_effect=[now, now, now, now, now + 20, now + 20]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, + max_deferred_elements=2, + drain_until=now + 10) + actual = [x for x in it] + self.assertEqual([update_ctxs[0], + update_ctxs[2]], # deferrals... + actual) + self.assertEqual(1, mock_sleep.call_count) + self.assertEqual(1, self.stats.skips) + self.assertEqual(1, self.stats.drains) + self.assertEqual(2, self.stats.deferrals) + self.stats.reset() + + # only space for two deferrals, only time for one deferral + with mock.patch('swift.obj.updater.time.time', + side_effect=[now, now, now, now, now, + now + 20, now + 20]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs), self.logger, self.stats, 1, 10, + max_deferred_elements=2, + drain_until=now + 10) + actual = [x for x in it] + self.assertEqual([update_ctxs[0], + update_ctxs[3]], # deferrals... + actual) + self.assertEqual(1, mock_sleep.call_count) + self.assertEqual(2, self.stats.skips) + self.assertEqual(1, self.stats.drains) + self.assertEqual(3, self.stats.deferrals) + self.stats.reset() + + def test_deferral_multiple_buckets(self): + # verify deferral - multiple buckets + update_ctxs_1 = [ + {'update': {'account': 'a', 'container': 'c1', 'obj': '%3d' % i}} + for i in range(3)] + update_ctxs_2 = [ + {'update': {'account': 'a', 'container': 'c2', 'obj': '%3d' % i}} + for i in range(3)] + + time_iter = itertools.count(time(), 0.001) + + # deferrals stick in both buckets + with mock.patch('swift.obj.updater.time.time', + side_effect=[next(time_iter) for _ in range(12)]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs_1 + update_ctxs_2), + self.logger, self.stats, 4, 10, + max_deferred_elements=4, + drain_until=next(time_iter)) + it.salt = '' # make container->bucket hashing predictable + actual = [x for x in it] + self.assertEqual([update_ctxs_1[0], + update_ctxs_2[0], + update_ctxs_1[2], # deferrals... + update_ctxs_2[2], + update_ctxs_1[1], + update_ctxs_2[1], + ], + actual) + self.assertEqual(4, mock_sleep.call_count) + self.assertEqual(0, self.stats.skips) + self.assertEqual(4, self.stats.drains) + self.assertEqual(4, self.stats.deferrals) + self.stats.reset() + + # oldest deferral bumped from one bucket due to max_deferrals == 3 + with mock.patch('swift.obj.updater.time.time', + side_effect=[next(time_iter) for _ in range(10)]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs_1 + update_ctxs_2), + self.logger, self.stats, 4, 10, + max_deferred_elements=3, + drain_until=next(time_iter)) + it.salt = '' # make container->bucket hashing predictable + actual = [x for x in it] + self.assertEqual([update_ctxs_1[0], + update_ctxs_2[0], + update_ctxs_1[2], # deferrals... 
+ update_ctxs_2[2], + update_ctxs_2[1], + ], + actual) + self.assertEqual(3, mock_sleep.call_count) + self.assertEqual(1, self.stats.skips) + self.assertEqual(3, self.stats.drains) + self.assertEqual(4, self.stats.deferrals) + self.stats.reset() + + # older deferrals bumped from one bucket due to max_deferrals == 2 + with mock.patch('swift.obj.updater.time.time', + side_effect=[next(time_iter) for _ in range(10)]): + with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs_1 + update_ctxs_2), + self.logger, self.stats, 4, 10, + max_deferred_elements=2, + drain_until=next(time_iter)) + it.salt = '' # make container->bucket hashing predictable + actual = [x for x in it] + self.assertEqual([update_ctxs_1[0], + update_ctxs_2[0], + update_ctxs_2[2], # deferrals... + update_ctxs_2[1], + ], + actual) + self.assertEqual(2, mock_sleep.call_count) + self.assertEqual(2, self.stats.skips) + self.assertEqual(2, self.stats.drains) + self.assertEqual(4, self.stats.deferrals) + self.stats.reset() + + +class TestRateLimiterBucket(unittest.TestCase): + def test_wait_until(self): + b1 = object_updater.RateLimiterBucket(10) + self.assertEqual(10, b1.wait_until) + b1.last_time = b1.wait_until + self.assertEqual(20, b1.wait_until) + b1.last_time = 12345.678 + self.assertEqual(12355.678, b1.wait_until) + + def test_len(self): + b1 = object_updater.RateLimiterBucket(10) + b1.deque.append(1) + b1.deque.append(2) + self.assertEqual(2, len(b1)) + b1.deque.pop() + self.assertEqual(1, len(b1)) + + def test_bool(self): + b1 = object_updater.RateLimiterBucket(10) + self.assertFalse(b1) + b1.deque.append(1) + self.assertTrue(b1) + b1.deque.pop() + self.assertFalse(b1) + + def test_bucket_ordering(self): + time_iter = itertools.count(time(), step=0.001) + b1 = object_updater.RateLimiterBucket(10) + b2 = object_updater.RateLimiterBucket(10) + + b2.last_time = next(time_iter) + buckets = PriorityQueue() + buckets.put(b1) + buckets.put(b2) + self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)]) + + b1.last_time = next(time_iter) + buckets.put(b1) + buckets.put(b2) + self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)]) + + +class TestSweepStats(unittest.TestCase): + def test_copy(self): + num_props = len(vars(object_updater.SweepStats())) + stats = object_updater.SweepStats(*range(1, num_props + 1)) + stats2 = stats.copy() + self.assertEqual(vars(stats), vars(stats2)) + + def test_since(self): + stats = object_updater.SweepStats(1, 2, 3, 4, 5, 6, 7, 8, 9) + stats2 = object_updater.SweepStats(4, 6, 8, 10, 12, 14, 16, 18, 20) + expected = object_updater.SweepStats(3, 4, 5, 6, 7, 8, 9, 10, 11) + self.assertEqual(vars(expected), vars(stats2.since(stats))) + + def test_reset(self): + num_props = len(vars(object_updater.SweepStats())) + stats = object_updater.SweepStats(*range(1, num_props + 1)) + stats.reset() + expected = object_updater.SweepStats() + self.assertEqual(vars(expected), vars(stats)) + + def test_str(self): + num_props = len(vars(object_updater.SweepStats())) + stats = object_updater.SweepStats(*range(1, num_props + 1)) + self.assertEqual( + '4 successes, 2 failures, 3 quarantines, 5 unlinks, 1 errors, ' + '6 redirects, 7 skips, 8 deferrals, 9 drains', str(stats)) if __name__ == '__main__':