Merge "Refactor rate-limiting helper into a class"

This commit is contained in:
Zuul 2022-05-16 20:51:27 +00:00 committed by Gerrit Code Review
commit 8e92a9aaf1
8 changed files with 271 additions and 100 deletions

View File

@ -22,7 +22,7 @@ from eventlet import Timeout
import swift.common.db
from swift.common.utils import get_logger, audit_location_generator, \
config_true_value, dump_recon_cache, ratelimit_sleep
config_true_value, dump_recon_cache, EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.exceptions import DatabaseAuditorException
from swift.common.recon import DEFAULT_RECON_CACHE_PATH, \
@ -56,9 +56,9 @@ class DatabaseAuditor(Daemon):
self.logging_interval = 3600 # once an hour
self.passes = 0
self.failures = 0
self.running_time = 0
self.max_dbs_per_second = \
float(conf.get('{}s_per_second'.format(self.server_type), 200))
self.rate_limiter = EventletRateLimiter(self.max_dbs_per_second)
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
self.recon_cache_path = conf.get('recon_cache_path',
@ -88,8 +88,7 @@ class DatabaseAuditor(Daemon):
reported = time.time()
self.passes = 0
self.failures = 0
self.running_time = ratelimit_sleep(
self.running_time, self.max_dbs_per_second)
self.rate_limiter.wait()
return reported
def run_forever(self, *args, **kwargs):

View File

@ -1753,7 +1753,7 @@ class RateLimitedIterator(object):
self.iterator = iter(iterable)
self.elements_per_second = elements_per_second
self.limit_after = limit_after
self.running_time = 0
self.rate_limiter = EventletRateLimiter(elements_per_second)
self.ratelimit_if = ratelimit_if
def __iter__(self):
@ -1766,8 +1766,7 @@ class RateLimitedIterator(object):
if self.limit_after > 0:
self.limit_after -= 1
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
self.rate_limiter.wait()
return next_value
__next__ = next
@ -3481,6 +3480,91 @@ def audit_location_generator(devices, datadir, suffix='',
hook_post_device(os.path.join(devices, device))
class AbstractRateLimiter(object):
# 1,000 milliseconds = 1 second
clock_accuracy = 1000.0
def __init__(self, max_rate, rate_buffer=5, running_time=0):
"""
:param max_rate: The maximum rate per second allowed for the process.
Must be > 0 to engage rate-limiting behavior.
:param rate_buffer: Number of seconds the rate counter can drop and be
allowed to catch up (at a faster than listed rate). A larger number
will result in larger spikes in rate but better average accuracy.
:param running_time: The running time in milliseconds of the next
allowable request. Setting this to any time in the past will cause
the rate limiter to immediately allow requests; setting this to a
future time will cause the rate limiter to deny requests until that
time.
"""
self.max_rate = max_rate
self.rate_buffer_ms = rate_buffer * self.clock_accuracy
self.running_time = running_time
self.time_per_incr = (self.clock_accuracy / self.max_rate
if self.max_rate else 0)
def _sleep(self, seconds):
# subclasses should override to implement a sleep
raise NotImplementedError
def is_allowed(self, incr_by=1, now=None, block=False):
"""
Check if the calling process is allowed to proceed according to the
rate limit.
:param incr_by: How much to increment the counter. Useful if you want
to ratelimit 1024 bytes/sec and have differing sizes
of requests. Must be > 0 to engage rate-limiting
behavior.
:param now: The time in seconds; defaults to time.time()
:param block: if True, the call will sleep until the calling process
is allowed to proceed; otherwise the call returns immediately.
:return: True if the the calling process is allowed to proceed, False
otherwise.
"""
if self.max_rate <= 0 or incr_by <= 0:
return True
now = now or time.time()
# Convert seconds to milliseconds
now = now * self.clock_accuracy
# Calculate time per request in milliseconds
time_per_request = self.time_per_incr * float(incr_by)
# Convert rate_buffer to milliseconds and compare
if now - self.running_time > self.rate_buffer_ms:
self.running_time = now
if now >= self.running_time:
self.running_time += time_per_request
allowed = True
elif block:
sleep_time = (self.running_time - now) / self.clock_accuracy
# increment running time before sleeping in case the sleep allows
# another thread to inspect the rate limiter state
self.running_time += time_per_request
# Convert diff to a floating point number of seconds and sleep
self._sleep(sleep_time)
allowed = True
else:
allowed = False
return allowed
def wait(self, incr_by=1, now=None):
self.is_allowed(incr_by=incr_by, now=now, block=True)
class EventletRateLimiter(AbstractRateLimiter):
def __init__(self, max_rate, rate_buffer=5, running_time=0):
super(EventletRateLimiter, self).__init__(
max_rate, rate_buffer, running_time)
def _sleep(self, seconds):
eventlet.sleep(seconds)
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
"""
Will eventlet.sleep() for the appropriate time so that the max_rate
@ -3501,30 +3585,18 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
A larger number will result in larger spikes in rate
but better average accuracy. Must be > 0 to engage
rate-limiting behavior.
:return: The absolute time for the next interval in milliseconds; note
that time could have passed well beyond that point, but the next call
will catch that and skip the sleep.
"""
if max_rate <= 0 or incr_by <= 0:
return running_time
# 1,000 milliseconds = 1 second
clock_accuracy = 1000.0
# Convert seconds to milliseconds
now = time.time() * clock_accuracy
# Calculate time per request in milliseconds
time_per_request = clock_accuracy * (float(incr_by) / max_rate)
# Convert rate_buffer to milliseconds and compare
if now - running_time > rate_buffer * clock_accuracy:
running_time = now
elif running_time - now > time_per_request:
# Convert diff back to a floating point number of seconds and sleep
eventlet.sleep((running_time - now) / clock_accuracy)
# Return the absolute time for the next interval in milliseconds; note
# that time could have passed well beyond that point, but the next call
# will catch that and skip the sleep.
return running_time + time_per_request
warnings.warn(
'ratelimit_sleep() is deprecated; use the ``EventletRateLimiter`` '
'class instead.', DeprecationWarning
)
rate_limit = EventletRateLimiter(max_rate, rate_buffer=rate_buffer,
running_time=running_time)
rate_limit.wait(incr_by=incr_by)
return rate_limit.running_time
class ContextPool(GreenPool):

View File

@ -31,7 +31,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout, LockTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, config_true_value, \
dump_recon_cache, majority_size, Timestamp, ratelimit_sleep, \
dump_recon_cache, majority_size, Timestamp, EventletRateLimiter, \
eventlet_monkey_patch
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
@ -59,10 +59,10 @@ class ContainerUpdater(Daemon):
float(conf.get('slowdown', '0.01')) + 0.01)
else:
containers_per_second = 50
self.containers_running_time = 0
self.max_containers_per_second = \
float(conf.get('containers_per_second',
containers_per_second))
self.rate_limiter = EventletRateLimiter(self.max_containers_per_second)
self.node_timeout = float(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.no_changes = 0
@ -226,9 +226,7 @@ class ContainerUpdater(Daemon):
self.logger.exception(
"Error processing container %s: %s", dbfile, e)
self.containers_running_time = ratelimit_sleep(
self.containers_running_time,
self.max_containers_per_second)
self.rate_limiter.wait()
def process_container(self, dbfile):
"""

View File

@ -30,7 +30,7 @@ from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter,
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
@ -85,8 +85,10 @@ class AuditorWorker(object):
self.auditor_type = 'ZBF'
self.log_time = int(conf.get('log_time', 3600))
self.last_logged = 0
self.files_running_time = 0
self.bytes_running_time = 0
self.files_rate_limiter = EventletRateLimiter(
self.max_files_per_second)
self.bytes_rate_limiter = EventletRateLimiter(
self.max_bytes_per_second)
self.bytes_processed = 0
self.total_bytes_processed = 0
self.total_files_processed = 0
@ -146,8 +148,7 @@ class AuditorWorker(object):
loop_time = time.time()
self.failsafe_object_audit(location)
self.logger.timing_since('timing', loop_time)
self.files_running_time = ratelimit_sleep(
self.files_running_time, self.max_files_per_second)
self.files_rate_limiter.wait()
self.total_files_processed += 1
now = time.time()
if now - self.last_logged >= self.log_time:
@ -266,10 +267,7 @@ class AuditorWorker(object):
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_rate_limiter.wait(incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
for watcher in self.watchers:

View File

@ -33,7 +33,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
non_negative_float, config_positive_int_value, non_negative_int
non_negative_float, config_positive_int_value, non_negative_int, \
EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class RateLimiterBucket(object):
def __init__(self, update_delta):
self.update_delta = update_delta
self.last_time = 0
class RateLimiterBucket(EventletRateLimiter):
"""
Extends EventletRateLimiter to also maintain a deque of items that have
been deferred due to rate-limiting, and to provide a comparator for sorting
instanced by readiness.
"""
def __init__(self, max_updates_per_second):
super(RateLimiterBucket, self).__init__(max_updates_per_second,
rate_buffer=0)
self.deque = deque()
@property
def wait_until(self):
return self.last_time + self.update_delta
def __len__(self):
return len(self.deque)
@ -62,10 +64,10 @@ class RateLimiterBucket(object):
__nonzero__ = __bool__ # py2
def __lt__(self, other):
# used to sort buckets by readiness
# used to sort RateLimiterBuckets by readiness
if isinstance(other, RateLimiterBucket):
return self.wait_until < other.wait_until
return self.wait_until < other
return self.running_time < other.running_time
return self.running_time < other
class BucketizedUpdateSkippingLimiter(object):
@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object):
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
try:
self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
except ZeroDivisionError:
self.bucket_update_delta = -1
self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4())
self.buckets = [RateLimiterBucket(self.bucket_update_delta)
self.buckets = [RateLimiterBucket(max_elements_per_group_per_second)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object):
for update_ctx in self.iterator:
bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = self._get_time()
if now >= bucket.wait_until:
if bucket.is_allowed(now=now):
# no need to ratelimit, just return next update
bucket.last_time = now
return update_ctx
self.stats.deferrals += 1
@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object):
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
time.sleep(max(0, bucket.wait_until - now))
bucket.wait(now=now)
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")

View File

@ -2423,7 +2423,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 11)
# give the client the first 4 segments without ratelimiting; we'll
# sleep less
@ -2435,7 +2435,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 7)
# ratelimit segments under 35 bytes; this affects a-f
del sleeps[:]
@ -2446,7 +2446,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 5)
# ratelimit segments under 36 bytes; this now affects a-g, netting
# us one more sleep than before
@ -2458,7 +2458,7 @@ class TestSloGetManifest(SloTestCase):
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK') # sanity check
self.assertEqual(sleeps, [2.0, 2.0, 2.0])
self.assertEqual(sleeps, [1.0] * 6)
def test_get_manifest_with_submanifest(self):
req = Request.blank(

View File

@ -5854,6 +5854,125 @@ class TestAffinityLocalityPredicate(unittest.TestCase):
utils.affinity_locality_predicate, 'r1z1=1')
class TestEventletRateLimiter(unittest.TestCase):
def test_init(self):
rl = utils.EventletRateLimiter(0.1)
self.assertEqual(0.1, rl.max_rate)
self.assertEqual(0.0, rl.running_time)
self.assertEqual(5000, rl.rate_buffer_ms)
rl = utils.EventletRateLimiter(
0.2, rate_buffer=2, running_time=1234567.8)
self.assertEqual(0.2, rl.max_rate)
self.assertEqual(1234567.8, rl.running_time)
self.assertEqual(2000, rl.rate_buffer_ms)
def test_non_blocking(self):
rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=0)
with patch('time.time',) as mock_time:
with patch('eventlet.sleep') as mock_sleep:
mock_time.return_value = 0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
mock_time.return_value = 9.99
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
mock_time.return_value = 10.0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
rate_limiter = utils.EventletRateLimiter(0.1, rate_buffer=20)
with patch('time.time',) as mock_time:
with patch('eventlet.sleep') as mock_sleep:
mock_time.return_value = 20.0
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertTrue(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
self.assertFalse(rate_limiter.is_allowed())
mock_sleep.assert_not_called()
def _do_test(self, max_rate, running_time, start_time, rate_buffer,
incr_by=1.0):
rate_limiter = utils.EventletRateLimiter(
max_rate,
running_time=1000 * running_time, # msecs
rate_buffer=rate_buffer)
grant_times = []
current_time = [start_time]
def mock_time():
return current_time[0]
def mock_sleep(duration):
current_time[0] += duration
with patch('time.time', mock_time):
with patch('eventlet.sleep', mock_sleep):
for i in range(5):
rate_limiter.wait(incr_by=incr_by)
grant_times.append(current_time[0])
return [round(t, 6) for t in grant_times]
def test_ratelimit(self):
grant_times = self._do_test(1, 0, 1, 0)
self.assertEqual([1, 2, 3, 4, 5], grant_times)
grant_times = self._do_test(10, 0, 1, 0)
self.assertEqual([1, 1.1, 1.2, 1.3, 1.4], grant_times)
grant_times = self._do_test(.1, 0, 1, 0)
self.assertEqual([1, 11, 21, 31, 41], grant_times)
grant_times = self._do_test(.1, 11, 1, 0)
self.assertEqual([11, 21, 31, 41, 51], grant_times)
def test_incr_by(self):
grant_times = self._do_test(1, 0, 1, 0, incr_by=2.5)
self.assertEqual([1, 3.5, 6, 8.5, 11], grant_times)
def test_burst(self):
grant_times = self._do_test(1, 1, 4, 0)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 1)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 2)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
grant_times = self._do_test(1, 1, 4, 3)
self.assertEqual([4, 4, 4, 4, 5], grant_times)
grant_times = self._do_test(1, 1, 4, 4)
self.assertEqual([4, 4, 4, 4, 5], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 2, 3)
self.assertEqual([2, 2, 2, 3, 4], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 3, 3)
self.assertEqual([3, 3, 3, 3, 4], grant_times)
grant_times = self._do_test(1, 1, 3, 3)
self.assertEqual([3, 3, 3, 4, 5], grant_times)
grant_times = self._do_test(1, 0, 4, 3)
self.assertEqual([4, 5, 6, 7, 8], grant_times)
class TestRateLimitedIterator(unittest.TestCase):
def run_under_pseudo_time(

View File

@ -1652,7 +1652,7 @@ class TestObjectUpdater(unittest.TestCase):
len(self._find_async_pending_files()))
# indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral
# queue by 4; 4, 3 are then drained
latencies = [0, 0.05, .051, 0, 0, 0, .11, .01]
latencies = [0, 0.05, .051, 0, 0, 0, .11]
expected_success = 4
contexts_fed_in = []
@ -1693,7 +1693,7 @@ class TestObjectUpdater(unittest.TestCase):
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
@ -1719,7 +1719,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual([aorder[o] for o in expected_updates_sent],
[aorder[o] for o in actual_updates_sent])
self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats)
self.assertEqual([0, 0, 0, 0, 0, 1, 1], captured_skips_stats)
expected_deferrals = [
[],
@ -1729,7 +1729,6 @@ class TestObjectUpdater(unittest.TestCase):
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[3]], # note: rightmost element is drained
[objs_fed_in[3]],
]
self.assertEqual(
expected_deferrals,
@ -1776,7 +1775,7 @@ class TestObjectUpdater(unittest.TestCase):
# first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred
# last 2 deferred items sent before interval elapses
latencies = [0, .05, 0.051, 0, 0, .11, 0, 0,
0.1, 0, 0.1, 0] # total 0.42
0.1, 0.1, 0] # total 0.411
expected_success = 5
contexts_fed_in = []
@ -1820,7 +1819,7 @@ class TestObjectUpdater(unittest.TestCase):
fake_object_update), \
mock.patch('swift.obj.updater.RateLimitedIterator',
fake_rate_limited_iterator), \
mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
@ -1840,7 +1839,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(expected_updates_sent, actual_updates_sent)
# skips (un-drained deferrals) not reported until end of cycle
self.assertEqual([0] * 12, captured_skips_stats)
self.assertEqual([0] * 10, captured_skips_stats)
objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in]
expected_deferrals = [
@ -1856,8 +1855,6 @@ class TestObjectUpdater(unittest.TestCase):
# note: rightmost element is drained
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
[objs_fed_in[1], objs_fed_in[3]],
[objs_fed_in[1], objs_fed_in[3]],
]
self.assertEqual(
@ -1911,21 +1908,21 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
it = object_updater.BucketizedUpdateSkippingLimiter(
[3, 1], self.logger, self.stats, 1000, 10)
self.assertEqual(1000, it.num_buckets)
self.assertEqual(0.1, it.bucket_update_delta)
self.assertEqual([10] * 1000, [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
# rate of 0 implies unlimited
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 9, 0)
self.assertEqual(9, it.num_buckets)
self.assertEqual(-1, it.bucket_update_delta)
self.assertEqual([0] * 9, [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
# num_buckets is collared at 1
it = object_updater.BucketizedUpdateSkippingLimiter(
iter([3, 1]), self.logger, self.stats, 0, 1)
self.assertEqual(1, it.num_buckets)
self.assertEqual(1, it.bucket_update_delta)
self.assertEqual([1], [b.max_rate for b in it.buckets])
self.assertEqual([3, 1], [x for x in it.iterator])
def test_iteration_unlimited(self):
@ -1963,7 +1960,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# enough capacity for all deferrals
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -1982,7 +1979,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# only space for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=1,
@ -2000,7 +1997,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# only time for one deferral
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -2019,7 +2016,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
with mock.patch('swift.obj.updater.time.time',
side_effect=[now, now, now, now, now,
now + 20, now + 20]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs), self.logger, self.stats, 1, 10,
max_deferred_elements=2,
@ -2048,7 +2045,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# deferrals stick in both buckets
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(12)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2073,7 +2070,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# oldest deferral bumped from one bucket due to max_deferrals == 3
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2097,7 +2094,7 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
# older deferrals bumped from one bucket due to max_deferrals == 2
with mock.patch('swift.obj.updater.time.time',
side_effect=[next(time_iter) for _ in range(10)]):
with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
with mock.patch('swift.common.utils.eventlet.sleep') as mock_sleep:
it = object_updater.BucketizedUpdateSkippingLimiter(
iter(update_ctxs_1 + update_ctxs_2),
self.logger, self.stats, 4, 10,
@ -2119,16 +2116,8 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
class TestRateLimiterBucket(unittest.TestCase):
def test_wait_until(self):
b1 = object_updater.RateLimiterBucket(10)
self.assertEqual(10, b1.wait_until)
b1.last_time = b1.wait_until
self.assertEqual(20, b1.wait_until)
b1.last_time = 12345.678
self.assertEqual(12355.678, b1.wait_until)
def test_len(self):
b1 = object_updater.RateLimiterBucket(10)
b1 = object_updater.RateLimiterBucket(0.1)
b1.deque.append(1)
b1.deque.append(2)
self.assertEqual(2, len(b1))
@ -2136,7 +2125,7 @@ class TestRateLimiterBucket(unittest.TestCase):
self.assertEqual(1, len(b1))
def test_bool(self):
b1 = object_updater.RateLimiterBucket(10)
b1 = object_updater.RateLimiterBucket(0.1)
self.assertFalse(b1)
b1.deque.append(1)
self.assertTrue(b1)
@ -2148,13 +2137,13 @@ class TestRateLimiterBucket(unittest.TestCase):
b1 = object_updater.RateLimiterBucket(10)
b2 = object_updater.RateLimiterBucket(10)
b2.last_time = next(time_iter)
b2.running_time = next(time_iter)
buckets = PriorityQueue()
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)])
b1.last_time = next(time_iter)
b1.running_time = next(time_iter)
buckets.put(b1)
buckets.put(b2)
self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])