diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index e9557b6c27..5329bca8fa 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -473,6 +473,16 @@ use = egg:swift#recon # Send at most this many object updates per second # objects_per_second = 50 # +# Send at most this many object updates per bucket per second. The value must +# be a float greater than or equal to 0. Set to 0 for unlimited. +# max_objects_per_container_per_second = 0 +# +# The per_container ratelimit implementation uses a hashring to constrain +# memory requirements. Orders of magnitude more buckets will use (nominally) +# more memory, but will ratelimit smaller groups of containers. The value must +# be an integer greater than 0. +# per_container_ratelimit_buckets = 1000 +# # slowdown will sleep that amount between objects. Deprecated; use # objects_per_second instead. # slowdown = 0.01 diff --git a/swift/obj/updater.py b/swift/obj/updater.py index d49f12f76f..a0db3ffe75 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -19,6 +19,7 @@ import os import signal import sys import time +import uuid from random import random, shuffle from eventlet import spawn, Timeout @@ -29,7 +30,8 @@ from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ - eventlet_monkey_patch, get_redirect_data, ContextPool + eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ + non_negative_float, config_positive_int_value from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -39,18 +41,68 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY +class BucketizedUpdateSkippingLimiter(object): + """ + Wrap an iterator to filter elements that show up too often. + + :param update_iterable: an async_pending update iterable + :param num_buckets: number of buckets to divide container hashes into, the + more buckets total the less containers to a bucket + (once a busy container slows down a bucket the whole + bucket starts skipping) + :param max_elements_per_group_per_second: tunable, when skipping kicks in + :param skip_f: function to call with update_ctx when skipping it + """ + + def __init__(self, update_iterable, num_buckets, + max_elements_per_group_per_second, + skip_f=lambda update_ctx: None): + self.iterator = iter(update_iterable) + # if we want a smaller "blast radius" we could make this number bigger + self.num_buckets = max(num_buckets, 1) + # an array might be more efficient; but this is pretty cheap + self.next_update = [0.0 for _ in range(self.num_buckets)] + try: + self.bucket_update_delta = 1.0 / max_elements_per_group_per_second + except ZeroDivisionError: + self.bucket_update_delta = -1 + self.skip_f = skip_f + self.salt = str(uuid.uuid4()) + + def __iter__(self): + return self + + def _bucket_key(self, update): + acct, cont = split_update_path(update) + return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets + + def next(self): + for update_ctx in self.iterator: + bucket_key = self._bucket_key(update_ctx['update']) + now = time.time() + if self.next_update[bucket_key] > now: + self.skip_f(update_ctx) + continue + self.next_update[bucket_key] = now + self.bucket_update_delta + return update_ctx + raise StopIteration() + + __next__ = next + + class SweepStats(object): """ Stats bucket for an update sweep """ def __init__(self, errors=0, failures=0, quarantines=0, successes=0, - unlinks=0, redirects=0): + unlinks=0, redirects=0, skips=0): self.errors = errors self.failures = failures self.quarantines = quarantines self.successes = successes self.unlinks = unlinks self.redirects = redirects + self.skips = skips def copy(self): return type(self)(self.errors, self.failures, self.quarantines, @@ -62,7 +114,8 @@ class SweepStats(object): self.quarantines - other.quarantines, self.successes - other.successes, self.unlinks - other.unlinks, - self.redirects - other.redirects) + self.redirects - other.redirects, + self.skips - other.skips) def reset(self): self.errors = 0 @@ -71,6 +124,7 @@ class SweepStats(object): self.successes = 0 self.unlinks = 0 self.redirects = 0 + self.skips = 0 def __str__(self): keys = ( @@ -80,10 +134,26 @@ class SweepStats(object): (self.unlinks, 'unlinks'), (self.errors, 'errors'), (self.redirects, 'redirects'), + (self.skips, 'skips'), ) return ', '.join('%d %s' % pair for pair in keys) +def split_update_path(update): + """ + Split the account and container parts out of the async update data. + + N.B. updates to shards set the container_path key while the account and + container keys are always the root. + """ + container_path = update.get('container_path') + if container_path: + acct, cont = split_path('/' + container_path, minsegs=2) + else: + acct, cont = update['account'], update['container'] + return acct, cont + + class ObjectUpdater(Daemon): """Update object information in container listings.""" @@ -110,6 +180,10 @@ class ObjectUpdater(Daemon): self.max_objects_per_second = \ float(conf.get('objects_per_second', objects_per_second)) + self.max_objects_per_container_per_second = non_negative_float( + conf.get('max_objects_per_container_per_second', 0)) + self.per_container_ratelimit_buckets = config_positive_int_value( + conf.get('per_container_ratelimit_buckets', 1000)) self.node_timeout = float(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.report_interval = float(conf.get('report_interval', 300)) @@ -205,13 +279,40 @@ class ObjectUpdater(Daemon): dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger) + def _load_update(self, device, update_path): + try: + return pickle.load(open(update_path, 'rb')) + except Exception as e: + if getattr(e, 'errno', None) == errno.ENOENT: + return + self.logger.exception( + 'ERROR Pickle problem, quarantining %s', update_path) + self.stats.quarantines += 1 + self.logger.increment('quarantines') + target_path = os.path.join(device, 'quarantined', 'objects', + os.path.basename(update_path)) + renamer(update_path, target_path, fsync=False) + try: + # If this was the last async_pending in the directory, + # then this will succeed. Otherwise, it'll fail, and + # that's okay. + os.rmdir(os.path.dirname(update_path)) + except OSError: + pass + return + def _iter_async_pendings(self, device): """ - Locate and yield all the async pendings on the device. Multiple updates - for the same object will come out in reverse-chronological order - (i.e. newest first) so that callers can skip stale async_pendings. + Locate and yield an update context for all the async pending files on + the device. Each update context contains details of the async pending + file location, its timestamp and the un-pickled update data. - Tries to clean up empty directories as it goes. + Async pending files that fail to load will be quarantined. + + Only the most recent update for the same object is yielded; older + (stale) async pending files are unlinked as they are located. + + The iterator tries to clean up empty directories as it goes. """ # loop through async pending dirs for all policies for asyncdir in self._listdir(device): @@ -238,12 +339,13 @@ class ObjectUpdater(Daemon): if not os.path.isdir(prefix_path): continue last_obj_hash = None - for update in sorted(self._listdir(prefix_path), reverse=True): - update_path = os.path.join(prefix_path, update) + for update_file in sorted(self._listdir(prefix_path), + reverse=True): + update_path = os.path.join(prefix_path, update_file) if not os.path.isfile(update_path): continue try: - obj_hash, timestamp = update.split('-') + obj_hash, timestamp = update_file.split('-') except ValueError: self.stats.errors += 1 self.logger.increment('errors') @@ -280,9 +382,14 @@ class ObjectUpdater(Daemon): raise else: last_obj_hash = obj_hash - yield {'device': device, 'policy': policy, - 'path': update_path, - 'obj_hash': obj_hash, 'timestamp': timestamp} + update = self._load_update(device, update_path) + if update is not None: + yield {'device': device, + 'policy': policy, + 'update_path': update_path, + 'obj_hash': obj_hash, + 'timestamp': timestamp, + 'update': update} def object_sweep(self, device): """ @@ -297,13 +404,21 @@ class ObjectUpdater(Daemon): self.logger.info("Object update sweep starting on %s (pid: %d)", device, my_pid) + def skip_counting_f(update_ctx): + # in the future we could defer update_ctx + self.stats.skips += 1 + self.logger.increment("skips") + ap_iter = RateLimitedIterator( self._iter_async_pendings(device), elements_per_second=self.max_objects_per_second) + ap_iter = BucketizedUpdateSkippingLimiter( + ap_iter, self.per_container_ratelimit_buckets, + self.max_objects_per_container_per_second, + skip_f=skip_counting_f) with ContextPool(self.concurrency) as pool: - for update in ap_iter: - pool.spawn(self.process_object_update, - update['path'], update['device'], update['policy']) + for update_ctx in ap_iter: + pool.spawn(self.process_object_update, **update_ctx) now = time.time() if now - last_status_update >= self.report_interval: this_sweep = self.stats.since(start_stats) @@ -326,6 +441,7 @@ class ObjectUpdater(Daemon): '%(quarantines)d quarantines, ' '%(unlinks)d unlinks, %(errors)d errors, ' '%(redirects)d redirects ' + '%(skips)d skips ' '(pid: %(pid)d)'), {'device': device, 'elapsed': time.time() - start_time, @@ -335,36 +451,20 @@ class ObjectUpdater(Daemon): 'quarantines': sweep_totals.quarantines, 'unlinks': sweep_totals.unlinks, 'errors': sweep_totals.errors, - 'redirects': sweep_totals.redirects}) + 'redirects': sweep_totals.redirects, + 'skips': sweep_totals.skips}) - def process_object_update(self, update_path, device, policy): + def process_object_update(self, update_path, device, policy, update, + **kwargs): """ Process the object information to be updated and update. :param update_path: path to pickled object update file :param device: path to device :param policy: storage policy of object update + :param update: the un-pickled update data + :param kwargs: un-used keys from update_ctx """ - try: - update = pickle.load(open(update_path, 'rb')) - except Exception as e: - if getattr(e, 'errno', None) == errno.ENOENT: - return - self.logger.exception( - 'ERROR Pickle problem, quarantining %s', update_path) - self.stats.quarantines += 1 - self.logger.increment('quarantines') - target_path = os.path.join(device, 'quarantined', 'objects', - os.path.basename(update_path)) - renamer(update_path, target_path, fsync=False) - try: - # If this was the last async_pending in the directory, - # then this will succeed. Otherwise, it'll fail, and - # that's okay. - os.rmdir(os.path.dirname(update_path)) - except OSError: - pass - return def do_update(): successes = update.get('successes', []) @@ -374,11 +474,7 @@ class ObjectUpdater(Daemon): str(int(policy))) headers_out.setdefault('X-Backend-Accept-Redirect', 'true') headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true') - container_path = update.get('container_path') - if container_path: - acct, cont = split_path('/' + container_path, minsegs=2) - else: - acct, cont = update['account'], update['container'] + acct, cont = split_update_path(update) part, nodes = self.get_container_ring().get_nodes(acct, cont) obj = '/%s/%s/%s' % (acct, cont, update['obj']) events = [spawn(self.object_update, diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 1ba4c8d6ed..b380286f9e 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -12,13 +12,14 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - +import eventlet import six.moves.cPickle as pickle import mock import os import unittest import random import itertools +from collections import Counter from contextlib import closing from gzip import GzipFile from tempfile import mkdtemp @@ -39,8 +40,7 @@ from swift.common.ring import RingData from swift.common import utils from swift.common.header_key_dict import HeaderKeyDict from swift.common.swob import bytes_to_wsgi -from swift.common.utils import ( - hash_path, normalize_timestamp, mkdirs, write_pickle) +from swift.common.utils import hash_path, normalize_timestamp, mkdirs from swift.common.storage_policy import StoragePolicy, POLICIES @@ -135,6 +135,8 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.concurrency, 8) self.assertEqual(daemon.updater_workers, 1) self.assertEqual(daemon.max_objects_per_second, 50.0) + self.assertEqual(daemon.max_objects_per_container_per_second, 0.0) + self.assertEqual(daemon.per_container_ratelimit_buckets, 1000) # non-defaults conf = { @@ -145,6 +147,8 @@ class TestObjectUpdater(unittest.TestCase): 'concurrency': '2', 'updater_workers': '3', 'objects_per_second': '10.5', + 'max_objects_per_container_per_second': '1.2', + 'per_container_ratelimit_buckets': '100', } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) self.assertEqual(daemon.devices, '/some/where/else') @@ -154,6 +158,8 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.concurrency, 2) self.assertEqual(daemon.updater_workers, 3) self.assertEqual(daemon.max_objects_per_second, 10.5) + self.assertEqual(daemon.max_objects_per_container_per_second, 1.2) + self.assertEqual(daemon.per_container_ratelimit_buckets, 100) # check deprecated option daemon = object_updater.ObjectUpdater({'slowdown': '0.04'}, @@ -169,6 +175,12 @@ class TestObjectUpdater(unittest.TestCase): check_bad({'concurrency': '1.0'}) check_bad({'slowdown': 'baz'}) check_bad({'objects_per_second': 'quux'}) + check_bad({'max_objects_per_container_per_second': '-0.1'}) + check_bad({'max_objects_per_container_per_second': 'auto'}) + check_bad({'per_container_ratelimit_buckets': '1.2'}) + check_bad({'per_container_ratelimit_buckets': '0'}) + check_bad({'per_container_ratelimit_buckets': '-1'}) + check_bad({'per_container_ratelimit_buckets': 'auto'}) @mock.patch('os.listdir') def test_listdir_with_exception(self, mock_listdir): @@ -201,11 +213,12 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(len(log_lines), 0) self.assertEqual(path, ['foo', 'bar']) - def test_object_sweep(self): - def check_with_idx(index, warn, should_skip): - if int(index) > 0: + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_object_sweep(self, mock_recon): + def check_with_idx(policy_index, warn, should_skip): + if int(policy_index) > 0: asyncdir = os.path.join(self.sda1, - ASYNCDIR_BASE + "-" + index) + ASYNCDIR_BASE + "-" + policy_index) else: asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE) @@ -220,7 +233,8 @@ class TestObjectUpdater(unittest.TestCase): os.path.join(self.sda1, ASYNCDIR_BASE + '-' + 'twentington'), os.path.join(self.sda1, - ASYNCDIR_BASE + '-' + str(int(index) + 100))) + ASYNCDIR_BASE + '-' + str( + int(policy_index) + 100))) for not_dir in not_dirs: with open(not_dir, 'w'): @@ -239,13 +253,13 @@ class TestObjectUpdater(unittest.TestCase): o_path = os.path.join(prefix_dir, ohash + '-' + normalize_timestamp(t)) if t == timestamps[0]: - expected.add((o_path, int(index))) - write_pickle({}, o_path) + expected.add((o_path, int(policy_index))) + self._write_dummy_pickle(o_path, 'account', 'container', o) seen = set() class MockObjectUpdater(object_updater.ObjectUpdater): - def process_object_update(self, update_path, device, policy): + def process_object_update(self, update_path, policy, **kwargs): seen.add((update_path, int(policy))) os.unlink(update_path) @@ -290,10 +304,10 @@ class TestObjectUpdater(unittest.TestCase): ohash = hash_path('account', 'container', o) o_path = os.path.join(prefix_dir, ohash + '-' + normalize_timestamp(t)) - write_pickle({}, o_path) + self._write_dummy_pickle(o_path, 'account', 'container', o) class MockObjectUpdater(object_updater.ObjectUpdater): - def process_object_update(self, update_path, device, policy): + def process_object_update(self, update_path, **kwargs): os.unlink(update_path) self.stats.successes += 1 self.stats.unlinks += 1 @@ -312,12 +326,13 @@ class TestObjectUpdater(unittest.TestCase): def mock_time_function(): rv = now[0] - now[0] += 5 + now[0] += 4 return rv - # With 10s between updates, time() advancing 5s every time we look, + # With 10s between updates, time() advancing 4s every time we look, # and 5 async_pendings on disk, we should get at least two progress - # lines. + # lines. (time is incremented by 4 each time the update app iter yields + # and each time the elapsed time is sampled) with mock.patch('swift.obj.updater.time', mock.MagicMock(time=mock_time_function)), \ mock.patch.object(object_updater, 'ContextPool', MockPool): @@ -360,10 +375,10 @@ class TestObjectUpdater(unittest.TestCase): ohash = hash_path('account', 'container%d' % policy.idx, o) o_path = os.path.join(prefix_dir, ohash + '-' + normalize_timestamp(t)) - write_pickle({}, o_path) + self._write_dummy_pickle(o_path, 'account', 'container', o) class MockObjectUpdater(object_updater.ObjectUpdater): - def process_object_update(self, update_path, device, policy): + def process_object_update(self, update_path, **kwargs): os.unlink(update_path) self.stats.successes += 1 self.stats.unlinks += 1 @@ -1196,7 +1211,7 @@ class TestObjectUpdater(unittest.TestCase): def test_obj_update_gone_missing(self): # if you've got multiple updaters running (say, both a background - # and foreground process), process_object_update may get a file + # and foreground process), _load_update may get a file # that doesn't exist policies = list(POLICIES) random.shuffle(policies) @@ -1218,13 +1233,227 @@ class TestObjectUpdater(unittest.TestCase): odir, '%s-%s' % (ohash, next(self.ts_iter).internal)) + self.assertEqual(os.listdir(async_dir), [ohash[-3:]]) + self.assertFalse(os.listdir(odir)) with mocked_http_conn(): with mock.patch('swift.obj.updater.dump_recon_cache'): - daemon.process_object_update(op_path, self.sda1, policies[0]) + daemon._load_update(self.sda1, op_path) self.assertEqual({}, daemon.logger.get_increment_counts()) self.assertEqual(os.listdir(async_dir), [ohash[-3:]]) self.assertFalse(os.listdir(odir)) + def _write_dummy_pickle(self, path, a, c, o, cp=None): + update = { + 'op': 'PUT', + 'account': a, + 'container': c, + 'obj': o, + 'headers': {'X-Container-Timestamp': normalize_timestamp(0)} + } + if cp: + update['container_path'] = cp + with open(path, 'wb') as async_pending: + pickle.dump(update, async_pending) + + def _make_async_pending_pickle(self, a, c, o, cp=None): + ohash = hash_path(a, c, o) + odir = os.path.join(self.async_dir, ohash[-3:]) + mkdirs(odir) + path = os.path.join( + odir, + '%s-%s' % (ohash, normalize_timestamp(time()))) + self._write_dummy_pickle(path, a, c, o, cp) + + def _find_async_pending_files(self): + found_files = [] + for root, dirs, files in os.walk(self.async_dir): + found_files.extend(files) + return found_files + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit(self, mock_recon): + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 1, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + num_c1_files = 10 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1') + # make one more in a different container, with a container_path + self._make_async_pending_pickle('a', 'c2', obj_name, + cp='.shards_a/c2_shard') + c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a', + 'c2_shard') + expected_total = num_c1_files + 1 + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + expected_success = 2 + fake_status_codes = [200] * 3 * expected_success + with mocked_http_conn(*fake_status_codes) as fake_conn: + daemon.run_once() + self.assertEqual(expected_success, daemon.stats.successes) + expected_skipped = expected_total - expected_success + self.assertEqual(expected_skipped, daemon.stats.skips) + self.assertEqual(expected_skipped, + len(self._find_async_pending_files())) + self.assertEqual( + Counter( + '/'.join(req['path'].split('/')[:5]) + for req in fake_conn.requests), + {'/sda1/%s/a/c1' % c1_part: 3, + '/sda1/%s/.shards_a/c2_shard' % c2_part: 3}) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit_unlimited(self, mock_recon): + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 0, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + num_c1_files = 10 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1') + # make one more in a different container, with a container_path + self._make_async_pending_pickle('a', 'c2', obj_name, + cp='.shards_a/c2_shard') + c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a', + 'c2_shard') + expected_total = num_c1_files + 1 + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + fake_status_codes = [200] * 3 * expected_total + with mocked_http_conn(*fake_status_codes): + daemon.run_once() + self.assertEqual(expected_total, daemon.stats.successes) + self.assertEqual(0, daemon.stats.skips) + self.assertEqual([], self._find_async_pending_files()) + + @mock.patch('swift.obj.updater.dump_recon_cache') + def test_per_container_rate_limit_slow_responses(self, mock_recon): + conf = { + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'max_objects_per_container_per_second': 10, + } + daemon = object_updater.ObjectUpdater(conf, logger=self.logger) + self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0])) + os.mkdir(self.async_dir) + # all updates for same container + num_c1_files = 4 + for i in range(num_c1_files): + obj_name = 'o%02d' % i + self._make_async_pending_pickle('a', 'c1', obj_name) + expected_total = num_c1_files + self.assertEqual(expected_total, + len(self._find_async_pending_files())) + latencies = [.11, 0, .11, 0] + expected_success = 2 + fake_status_codes = [200] * 3 * expected_success + + def fake_spawn(pool, *args, **kwargs): + # make each update delay the iter being called again + eventlet.sleep(latencies.pop(0)) + return args[0](*args[1:], **kwargs) + + with mocked_http_conn(*fake_status_codes): + with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn): + daemon.run_once() + self.assertEqual(expected_success, daemon.stats.successes) + expected_skipped = expected_total - expected_success + self.assertEqual(expected_skipped, daemon.stats.skips) + self.assertEqual(expected_skipped, + len(self._find_async_pending_files())) + + +class TestObjectUpdaterFunctions(unittest.TestCase): + def test_split_update_path(self): + update = { + 'op': 'PUT', + 'account': 'a', + 'container': 'c', + 'obj': 'o', + 'headers': { + 'X-Container-Timestamp': normalize_timestamp(0), + } + } + actual = object_updater.split_update_path(update) + self.assertEqual(('a', 'c'), actual) + + update['container_path'] = None + actual = object_updater.split_update_path(update) + self.assertEqual(('a', 'c'), actual) + + update['container_path'] = '.shards_a/c_shard_n' + actual = object_updater.split_update_path(update) + self.assertEqual(('.shards_a', 'c_shard_n'), actual) + + +class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): + def test_init(self): + it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10) + self.assertEqual(1000, it.num_buckets) + self.assertEqual(0.1, it.bucket_update_delta) + self.assertEqual([3, 1], [x for x in it.iterator]) + + # rate of 0 implies unlimited + it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0) + self.assertEqual(9, it.num_buckets) + self.assertEqual(-1, it.bucket_update_delta) + self.assertEqual([3, 1], [x for x in it.iterator]) + + # num_buckets is collared at 1 + it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1) + self.assertEqual(1, it.num_buckets) + self.assertEqual(1, it.bucket_update_delta) + self.assertEqual([3, 1], [x for x in it.iterator]) + + def test_iteration_unlimited(self): + # verify iteration at unlimited rate + update_ctxs = [ + {'update': {'account': '%d' % i, 'container': '%s' % i}} + for i in range(20)] + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs), 9, 0) + self.assertEqual(update_ctxs, [x for x in it]) + + def test_iteration_ratelimited(self): + # verify iteration at limited rate - single bucket + update_ctxs = [ + {'update': {'account': '%d' % i, 'container': '%s' % i}} + for i in range(2)] + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs), 1, 0.1) + self.assertEqual(update_ctxs[:1], [x for x in it]) + + def test_iteration_ratelimited_with_callback(self): + # verify iteration at limited rate - single bucket + skipped = [] + + def on_skip(update_ctx): + skipped.append(update_ctx) + + update_ctxs = [ + {'update': {'account': '%d' % i, 'container': '%s' % i}} + for i in range(2)] + it = object_updater.BucketizedUpdateSkippingLimiter( + iter(update_ctxs), 1, 0.1, skip_f=on_skip) + self.assertEqual(update_ctxs[:1], [x for x in it]) + self.assertEqual(update_ctxs[1:], skipped) + if __name__ == '__main__': unittest.main()