diff --git a/swift/common/db_auditor.py b/swift/common/db_auditor.py index 635fcc6ecd..229cfbdc4a 100644 --- a/swift/common/db_auditor.py +++ b/swift/common/db_auditor.py @@ -22,7 +22,7 @@ from eventlet import Timeout import swift.common.db from swift.common.utils import get_logger, audit_location_generator, \ - config_true_value, dump_recon_cache, ratelimit_sleep + config_true_value, dump_recon_cache, EventletRateLimiter from swift.common.daemon import Daemon from swift.common.exceptions import DatabaseAuditorException from swift.common.recon import DEFAULT_RECON_CACHE_PATH, \ @@ -56,9 +56,9 @@ class DatabaseAuditor(Daemon): self.logging_interval = 3600 # once an hour self.passes = 0 self.failures = 0 - self.running_time = 0 self.max_dbs_per_second = \ float(conf.get('{}s_per_second'.format(self.server_type), 200)) + self.rate_limiter = EventletRateLimiter(self.max_dbs_per_second) swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) self.recon_cache_path = conf.get('recon_cache_path', @@ -88,8 +88,7 @@ class DatabaseAuditor(Daemon): reported = time.time() self.passes = 0 self.failures = 0 - self.running_time = ratelimit_sleep( - self.running_time, self.max_dbs_per_second) + self.rate_limiter.wait() return reported def run_forever(self, *args, **kwargs): diff --git a/swift/common/utils.py b/swift/common/utils.py index 06c9ded308..a3770972da 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1723,7 +1723,7 @@ class RateLimitedIterator(object): self.iterator = iter(iterable) self.elements_per_second = elements_per_second self.limit_after = limit_after - self.running_time = 0 + self.rate_limiter = EventletRateLimiter(elements_per_second) self.ratelimit_if = ratelimit_if def __iter__(self): @@ -1736,8 +1736,7 @@ class RateLimitedIterator(object): if self.limit_after > 0: self.limit_after -= 1 else: - self.running_time = ratelimit_sleep(self.running_time, - self.elements_per_second) + self.rate_limiter.wait() return next_value __next__ = next @@ -3451,6 +3450,91 @@ def audit_location_generator(devices, datadir, suffix='', hook_post_device(os.path.join(devices, device)) +class AbstractRateLimiter(object): + # 1,000 milliseconds = 1 second + clock_accuracy = 1000.0 + + def __init__(self, max_rate, rate_buffer=5, running_time=0): + """ + :param max_rate: The maximum rate per second allowed for the process. + Must be > 0 to engage rate-limiting behavior. + :param rate_buffer: Number of seconds the rate counter can drop and be + allowed to catch up (at a faster than listed rate). A larger number + will result in larger spikes in rate but better average accuracy. + :param running_time: The running time in milliseconds of the next + allowable request. Setting this to any time in the past will cause + the rate limiter to immediately allow requests; setting this to a + future time will cause the rate limiter to deny requests until that + time. + """ + self.max_rate = max_rate + self.rate_buffer_ms = rate_buffer * self.clock_accuracy + self.running_time = running_time + self.time_per_incr = (self.clock_accuracy / self.max_rate + if self.max_rate else 0) + + def _sleep(self, seconds): + # subclasses should override to implement a sleep + raise NotImplementedError + + def is_allowed(self, incr_by=1, now=None, block=False): + """ + Check if the calling process is allowed to proceed according to the + rate limit. + + :param incr_by: How much to increment the counter. Useful if you want + to ratelimit 1024 bytes/sec and have differing sizes + of requests. Must be > 0 to engage rate-limiting + behavior. + :param now: The time in seconds; defaults to time.time() + :param block: if True, the call will sleep until the calling process + is allowed to proceed; otherwise the call returns immediately. + :return: True if the the calling process is allowed to proceed, False + otherwise. + """ + if self.max_rate <= 0 or incr_by <= 0: + return True + + now = now or time.time() + # Convert seconds to milliseconds + now = now * self.clock_accuracy + + # Calculate time per request in milliseconds + time_per_request = self.time_per_incr * float(incr_by) + + # Convert rate_buffer to milliseconds and compare + if now - self.running_time > self.rate_buffer_ms: + self.running_time = now + + if now >= self.running_time: + self.running_time += time_per_request + allowed = True + elif block: + sleep_time = (self.running_time - now) / self.clock_accuracy + # increment running time before sleeping in case the sleep allows + # another thread to inspect the rate limiter state + self.running_time += time_per_request + # Convert diff to a floating point number of seconds and sleep + self._sleep(sleep_time) + allowed = True + else: + allowed = False + + return allowed + + def wait(self, incr_by=1, now=None): + self.is_allowed(incr_by=incr_by, now=now, block=True) + + +class EventletRateLimiter(AbstractRateLimiter): + def __init__(self, max_rate, rate_buffer=5, running_time=0): + super(EventletRateLimiter, self).__init__( + max_rate, rate_buffer, running_time) + + def _sleep(self, seconds): + eventlet.sleep(seconds) + + def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5): """ Will eventlet.sleep() for the appropriate time so that the max_rate @@ -3471,30 +3555,18 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5): A larger number will result in larger spikes in rate but better average accuracy. Must be > 0 to engage rate-limiting behavior. + :return: The absolute time for the next interval in milliseconds; note + that time could have passed well beyond that point, but the next call + will catch that and skip the sleep. """ - if max_rate <= 0 or incr_by <= 0: - return running_time - - # 1,000 milliseconds = 1 second - clock_accuracy = 1000.0 - - # Convert seconds to milliseconds - now = time.time() * clock_accuracy - - # Calculate time per request in milliseconds - time_per_request = clock_accuracy * (float(incr_by) / max_rate) - - # Convert rate_buffer to milliseconds and compare - if now - running_time > rate_buffer * clock_accuracy: - running_time = now - elif running_time - now > time_per_request: - # Convert diff back to a floating point number of seconds and sleep - eventlet.sleep((running_time - now) / clock_accuracy) - - # Return the absolute time for the next interval in milliseconds; note - # that time could have passed well beyond that point, but the next call - # will catch that and skip the sleep. - return running_time + time_per_request + warnings.warn( + 'ratelimit_sleep() is deprecated; use the ``EventletRateLimiter`` ' + 'class instead.', DeprecationWarning + ) + rate_limit = EventletRateLimiter(max_rate, rate_buffer=rate_buffer, + running_time=running_time) + rate_limit.wait(incr_by=incr_by) + return rate_limit.running_time class ContextPool(GreenPool): diff --git a/swift/container/updater.py b/swift/container/updater.py index a22bf0b716..fc7b60eaff 100644 --- a/swift/container/updater.py +++ b/swift/container/updater.py @@ -31,7 +31,7 @@ from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ConnectionTimeout, LockTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, config_true_value, \ - dump_recon_cache, majority_size, Timestamp, ratelimit_sleep, \ + dump_recon_cache, majority_size, Timestamp, EventletRateLimiter, \ eventlet_monkey_patch from swift.common.daemon import Daemon from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR @@ -59,10 +59,10 @@ class ContainerUpdater(Daemon): float(conf.get('slowdown', '0.01')) + 0.01) else: containers_per_second = 50 - self.containers_running_time = 0 self.max_containers_per_second = \ float(conf.get('containers_per_second', containers_per_second)) + self.rate_limiter = EventletRateLimiter(self.max_containers_per_second) self.node_timeout = float(conf.get('node_timeout', 3)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.no_changes = 0 @@ -226,9 +226,7 @@ class ContainerUpdater(Daemon): self.logger.exception( "Error processing container %s: %s", dbfile, e) - self.containers_running_time = ratelimit_sleep( - self.containers_running_time, - self.max_containers_per_second) + self.rate_limiter.wait() def process_container(self, dbfile): """ diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e0d95bb4d9..f9013748a9 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -30,7 +30,7 @@ from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES from swift.common.utils import ( config_auto_int_value, dump_recon_cache, get_logger, list_from_csv, - listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep, + listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter, readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter) from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH @@ -85,8 +85,10 @@ class AuditorWorker(object): self.auditor_type = 'ZBF' self.log_time = int(conf.get('log_time', 3600)) self.last_logged = 0 - self.files_running_time = 0 - self.bytes_running_time = 0 + self.files_rate_limiter = EventletRateLimiter( + self.max_files_per_second) + self.bytes_rate_limiter = EventletRateLimiter( + self.max_bytes_per_second) self.bytes_processed = 0 self.total_bytes_processed = 0 self.total_files_processed = 0 @@ -146,8 +148,7 @@ class AuditorWorker(object): loop_time = time.time() self.failsafe_object_audit(location) self.logger.timing_since('timing', loop_time) - self.files_running_time = ratelimit_sleep( - self.files_running_time, self.max_files_per_second) + self.files_rate_limiter.wait() self.total_files_processed += 1 now = time.time() if now - self.last_logged >= self.log_time: @@ -266,10 +267,7 @@ class AuditorWorker(object): with closing(reader): for chunk in reader: chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) + self.bytes_rate_limiter.wait(incr_by=chunk_len) self.bytes_processed += chunk_len self.total_bytes_processed += chunk_len for watcher in self.watchers: diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 01c609f13c..2ee7c35fad 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -33,7 +33,8 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ - non_negative_float, config_positive_int_value, non_negative_int + non_negative_float, config_positive_int_value, non_negative_int, \ + EventletRateLimiter from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY -class RateLimiterBucket(object): - def __init__(self, update_delta): - self.update_delta = update_delta - self.last_time = 0 +class RateLimiterBucket(EventletRateLimiter): + """ + Extends EventletRateLimiter to also maintain a deque of items that have + been deferred due to rate-limiting, and to provide a comparator for sorting + instanced by readiness. + """ + def __init__(self, max_updates_per_second): + super(RateLimiterBucket, self).__init__(max_updates_per_second, + rate_buffer=0) self.deque = deque() - @property - def wait_until(self): - return self.last_time + self.update_delta - def __len__(self): return len(self.deque) @@ -62,10 +64,10 @@ class RateLimiterBucket(object): __nonzero__ = __bool__ # py2 def __lt__(self, other): - # used to sort buckets by readiness + # used to sort RateLimiterBuckets by readiness if isinstance(other, RateLimiterBucket): - return self.wait_until < other.wait_until - return self.wait_until < other + return self.running_time < other.running_time + return self.running_time < other class BucketizedUpdateSkippingLimiter(object): @@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object): self.stats = stats # if we want a smaller "blast radius" we could make this number bigger self.num_buckets = max(num_buckets, 1) - try: - self.bucket_update_delta = 1.0 / max_elements_per_group_per_second - except ZeroDivisionError: - self.bucket_update_delta = -1 self.max_deferred_elements = max_deferred_elements self.deferred_buckets = deque() self.drain_until = drain_until self.salt = str(uuid.uuid4()) - self.buckets = [RateLimiterBucket(self.bucket_update_delta) + self.buckets = [RateLimiterBucket(max_elements_per_group_per_second) for _ in range(self.num_buckets)] self.buckets_ordered_by_readiness = None @@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object): for update_ctx in self.iterator: bucket = self.buckets[self._bucket_key(update_ctx['update'])] now = self._get_time() - if now >= bucket.wait_until: + if bucket.is_allowed(now=now): # no need to ratelimit, just return next update - bucket.last_time = now return update_ctx self.stats.deferrals += 1 @@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object): bucket = self.buckets_ordered_by_readiness.get_nowait() if now < self.drain_until: # wait for next element to be ready - time.sleep(max(0, bucket.wait_until - now)) + bucket.wait(now=now) # drain the most recently deferred element item = bucket.deque.pop() if bucket: # bucket has more deferred elements, re-insert in queue in # correct chronological position - bucket.last_time = self._get_time() self.buckets_ordered_by_readiness.put(bucket) self.stats.drains += 1 self.logger.increment("drains") diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index 045778a167..b7b1365e84 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -2423,7 +2423,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 11) # give the client the first 4 segments without ratelimiting; we'll # sleep less @@ -2435,7 +2435,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 7) # ratelimit segments under 35 bytes; this affects a-f del sleeps[:] @@ -2446,7 +2446,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 5) # ratelimit segments under 36 bytes; this now affects a-g, netting # us one more sleep than before @@ -2458,7 +2458,7 @@ class TestSloGetManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '200 OK') # sanity check - self.assertEqual(sleeps, [2.0, 2.0, 2.0]) + self.assertEqual(sleeps, [1.0] * 6) def test_get_manifest_with_submanifest(self): req = Request.blank( diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index bb9e31eb35..4f134e88f9 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -5832,6 +5832,125 @@ class TestAffinityLocalityPredicate(unittest.TestCase): utils.affinity_locality_predicate, 'r1z1=1') +class TestEventletRateLimiter(unittest.TestCase): + def test_init(self): + rl = utils.EventletRateLimiter(0.1) + self.assertEqual(0.1, rl.max_rate) + self.assertEqual(0.0, rl.running_time) + self.assertEqual(5000, rl.rate_buffer_ms) + + rl = utils.EventletRateLimiter( + 0.2, rate_buffer=2, running_time=1234567.8) + self.assertEqual(0.2, rl.max_rate) + self.assertEqual(1234567.8, rl.running_time) + self.assertEqual(2000, rl.rate_buffer_ms) + + def test_non_blocking(self): + rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=0) + with patch('time.time',) as mock_time: + with patch('eventlet.sleep') as mock_sleep: + mock_time.return_value = 0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + mock_time.return_value = 9.99 + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + mock_time.return_value = 10.0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=20) + with patch('time.time',) as mock_time: + with patch('eventlet.sleep') as mock_sleep: + mock_time.return_value = 20.0 + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertTrue(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + self.assertFalse(rate_limiter.is_allowed()) + mock_sleep.assert_not_called() + + def _do_test(self, max_rate, running_time, start_time, rate_buffer, + incr_by=1.0): + rate_limiter = utils.EventletRateLimiter( + max_rate, + running_time=1000 * running_time, # msecs + rate_buffer=rate_buffer) + grant_times = [] + current_time = [start_time] + + def mock_time(): + return current_time[0] + + def mock_sleep(duration): + current_time[0] += duration + + with patch('time.time', mock_time): + with patch('eventlet.sleep', mock_sleep): + for i in range(5): + rate_limiter.wait(incr_by=incr_by) + grant_times.append(current_time[0]) + return [round(t, 6) for t in grant_times] + + def test_ratelimit(self): + grant_times = self._do_test(1, 0, 1, 0) + self.assertEqual([1, 2, 3, 4, 5], grant_times) + + grant_times = self._do_test(10, 0, 1, 0) + self.assertEqual([1, 1.1, 1.2, 1.3, 1.4], grant_times) + + grant_times = self._do_test(.1, 0, 1, 0) + self.assertEqual([1, 11, 21, 31, 41], grant_times) + + grant_times = self._do_test(.1, 11, 1, 0) + self.assertEqual([11, 21, 31, 41, 51], grant_times) + + def test_incr_by(self): + grant_times = self._do_test(1, 0, 1, 0, incr_by=2.5) + self.assertEqual([1, 3.5, 6, 8.5, 11], grant_times) + + def test_burst(self): + grant_times = self._do_test(1, 1, 4, 0) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 1) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 2) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + grant_times = self._do_test(1, 1, 4, 3) + self.assertEqual([4, 4, 4, 4, 5], grant_times) + + grant_times = self._do_test(1, 1, 4, 4) + self.assertEqual([4, 4, 4, 4, 5], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 2, 3) + self.assertEqual([2, 2, 2, 3, 4], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 3, 3) + self.assertEqual([3, 3, 3, 3, 4], grant_times) + + grant_times = self._do_test(1, 1, 3, 3) + self.assertEqual([3, 3, 3, 4, 5], grant_times) + + grant_times = self._do_test(1, 0, 4, 3) + self.assertEqual([4, 5, 6, 7, 8], grant_times) + + class TestRateLimitedIterator(unittest.TestCase): def run_under_pseudo_time( diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index fa3fde32f5..941ec3dc36 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -1652,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase): len(self._find_async_pending_files())) # indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral # queue by 4; 4, 3 are then drained - latencies = [0, 0.05, .051, 0, 0, 0, .11, .01] + latencies = [0, 0.05, .051, 0, 0, 0, .11] expected_success = 4 contexts_fed_in = [] @@ -1693,7 +1693,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ - mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: daemon.run_once() self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success @@ -1719,7 +1719,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual([aorder[o] for o in expected_updates_sent], [aorder[o] for o in actual_updates_sent]) - self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats) + self.assertEqual([0, 0, 0, 0, 0, 1, 1], captured_skips_stats) expected_deferrals = [ [], @@ -1729,7 +1729,6 @@ class TestObjectUpdater(unittest.TestCase): [objs_fed_in[1], objs_fed_in[3]], [objs_fed_in[3], objs_fed_in[4]], [objs_fed_in[3]], # note: rightmost element is drained - [objs_fed_in[3]], ] self.assertEqual( expected_deferrals, @@ -1776,7 +1775,7 @@ class TestObjectUpdater(unittest.TestCase): # first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred # last 2 deferred items sent before interval elapses latencies = [0, .05, 0.051, 0, 0, .11, 0, 0, - 0.1, 0, 0.1, 0] # total 0.42 + 0.1, 0.1, 0] # total 0.411 expected_success = 5 contexts_fed_in = [] @@ -1820,7 +1819,7 @@ class TestObjectUpdater(unittest.TestCase): fake_object_update), \ mock.patch('swift.obj.updater.RateLimitedIterator', fake_rate_limited_iterator), \ - mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: daemon.run_once() self.assertEqual(expected_success, daemon.stats.successes) expected_skipped = expected_total - expected_success @@ -1840,7 +1839,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(expected_updates_sent, actual_updates_sent) # skips (un-drained deferrals) not reported until end of cycle - self.assertEqual([0] * 12, captured_skips_stats) + self.assertEqual([0] * 10, captured_skips_stats) objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in] expected_deferrals = [ @@ -1856,8 +1855,6 @@ class TestObjectUpdater(unittest.TestCase): # note: rightmost element is drained [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]], [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], - [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]], - [objs_fed_in[1], objs_fed_in[3]], [objs_fed_in[1], objs_fed_in[3]], ] self.assertEqual( @@ -1911,21 +1908,21 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): it = object_updater.BucketizedUpdateSkippingLimiter( [3, 1], self.logger, self.stats, 1000, 10) self.assertEqual(1000, it.num_buckets) - self.assertEqual(0.1, it.bucket_update_delta) + self.assertEqual([10] * 1000, [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) # rate of 0 implies unlimited it = object_updater.BucketizedUpdateSkippingLimiter( iter([3, 1]), self.logger, self.stats, 9, 0) self.assertEqual(9, it.num_buckets) - self.assertEqual(-1, it.bucket_update_delta) + self.assertEqual([0] * 9, [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) # num_buckets is collared at 1 it = object_updater.BucketizedUpdateSkippingLimiter( iter([3, 1]), self.logger, self.stats, 0, 1) self.assertEqual(1, it.num_buckets) - self.assertEqual(1, it.bucket_update_delta) + self.assertEqual([1], [b.max_rate for b in it.buckets]) self.assertEqual([3, 1], [x for x in it.iterator]) def test_iteration_unlimited(self): @@ -1963,7 +1960,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # enough capacity for all deferrals with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now, now]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -1982,7 +1979,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # only space for one deferral with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=1, @@ -2000,7 +1997,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # only time for one deferral with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now + 20, now + 20]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs[:3]), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -2019,7 +2016,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): with mock.patch('swift.obj.updater.time.time', side_effect=[now, now, now, now, now, now + 20, now + 20]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs), self.logger, self.stats, 1, 10, max_deferred_elements=2, @@ -2048,7 +2045,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # deferrals stick in both buckets with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(12)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2073,7 +2070,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # oldest deferral bumped from one bucket due to max_deferrals == 3 with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(10)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2097,7 +2094,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): # older deferrals bumped from one bucket due to max_deferrals == 2 with mock.patch('swift.obj.updater.time.time', side_effect=[next(time_iter) for _ in range(10)]): - with mock.patch('swift.obj.updater.time.sleep') as mock_sleep: + with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep: it = object_updater.BucketizedUpdateSkippingLimiter( iter(update_ctxs_1 + update_ctxs_2), self.logger, self.stats, 4, 10, @@ -2119,16 +2116,8 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase): class TestRateLimiterBucket(unittest.TestCase): - def test_wait_until(self): - b1 = object_updater.RateLimiterBucket(10) - self.assertEqual(10, b1.wait_until) - b1.last_time = b1.wait_until - self.assertEqual(20, b1.wait_until) - b1.last_time = 12345.678 - self.assertEqual(12355.678, b1.wait_until) - def test_len(self): - b1 = object_updater.RateLimiterBucket(10) + b1 = object_updater.RateLimiterBucket(0.1) b1.deque.append(1) b1.deque.append(2) self.assertEqual(2, len(b1)) @@ -2136,7 +2125,7 @@ class TestRateLimiterBucket(unittest.TestCase): self.assertEqual(1, len(b1)) def test_bool(self): - b1 = object_updater.RateLimiterBucket(10) + b1 = object_updater.RateLimiterBucket(0.1) self.assertFalse(b1) b1.deque.append(1) self.assertTrue(b1) @@ -2148,13 +2137,13 @@ class TestRateLimiterBucket(unittest.TestCase): b1 = object_updater.RateLimiterBucket(10) b2 = object_updater.RateLimiterBucket(10) - b2.last_time = next(time_iter) + b2.running_time = next(time_iter) buckets = PriorityQueue() buckets.put(b1) buckets.put(b2) self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)]) - b1.last_time = next(time_iter) + b1.running_time = next(time_iter) buckets.put(b1) buckets.put(b2) self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])