Fix rate limiting (middleware and util function) to allow for catch up. Configurable logging in auditor. Docs.
This commit is contained in:
@@ -229,6 +229,7 @@ Option Default Description
|
|||||||
log_name object-auditor Label used when logging
|
log_name object-auditor Label used when logging
|
||||||
log_facility LOG_LOCAL0 Syslog log facility
|
log_facility LOG_LOCAL0 Syslog log facility
|
||||||
log_level INFO Logging level
|
log_level INFO Logging level
|
||||||
|
log_time 3600 Frequency of status logs in seconds.
|
||||||
files_per_second 20 Maximum files audited per second. Should
|
files_per_second 20 Maximum files audited per second. Should
|
||||||
be tuned according to individual system
|
be tuned according to individual system
|
||||||
specs. 0 is unlimited.
|
specs. 0 is unlimited.
|
||||||
|
@@ -30,6 +30,11 @@ max_sleep_time_seconds 60 App will immediately return a 498 response
|
|||||||
log_sleep_time_seconds 0 To allow visibility into rate limiting set
|
log_sleep_time_seconds 0 To allow visibility into rate limiting set
|
||||||
this value > 0 and all sleeps greater than
|
this value > 0 and all sleeps greater than
|
||||||
the number will be logged.
|
the number will be logged.
|
||||||
|
rate_buffer_seconds 5 Number of seconds the rate counter can
|
||||||
|
drop and be allowed to catch up (at a
|
||||||
|
faster than listed rate). A larger number
|
||||||
|
will result in larger spikes in rate but
|
||||||
|
better average accuracy.
|
||||||
account_ratelimit 0 If set, will limit all requests to
|
account_ratelimit 0 If set, will limit all requests to
|
||||||
/account_name and PUTs to
|
/account_name and PUTs to
|
||||||
/account_name/container_name. Number is in
|
/account_name/container_name. Number is in
|
||||||
|
@@ -57,3 +57,4 @@ use = egg:swift#object
|
|||||||
# log_name = object-auditor
|
# log_name = object-auditor
|
||||||
# files_per_second = 20
|
# files_per_second = 20
|
||||||
# bytes_per_second = 10000000
|
# bytes_per_second = 10000000
|
||||||
|
# log_time = 3600
|
@@ -99,6 +99,8 @@ use = egg:swift#ratelimit
|
|||||||
# max_sleep_time_seconds = 60
|
# max_sleep_time_seconds = 60
|
||||||
# log_sleep_time_seconds of 0 means disabled
|
# log_sleep_time_seconds of 0 means disabled
|
||||||
# log_sleep_time_seconds = 0
|
# log_sleep_time_seconds = 0
|
||||||
|
# allows for slow rates (e.g. running up to 5 sec's behind) to catch up.
|
||||||
|
# rate_buffer_seconds = 5
|
||||||
# account_ratelimit of 0 means disabled
|
# account_ratelimit of 0 means disabled
|
||||||
# account_ratelimit = 0
|
# account_ratelimit = 0
|
||||||
|
|
||||||
|
@@ -20,7 +20,7 @@ from swift.common.utils import split_path, cache_from_env, get_logger
|
|||||||
from swift.proxy.server import get_container_memcache_key
|
from swift.proxy.server import get_container_memcache_key
|
||||||
|
|
||||||
|
|
||||||
class MaxSleepTimeHit(Exception):
|
class MaxSleepTimeHitError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -32,6 +32,8 @@ class RateLimitMiddleware(object):
|
|||||||
configurable.
|
configurable.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
BLACK_LIST_SLEEP = 1
|
||||||
|
|
||||||
def __init__(self, app, conf, logger=None):
|
def __init__(self, app, conf, logger=None):
|
||||||
self.app = app
|
self.app = app
|
||||||
if logger:
|
if logger:
|
||||||
@@ -39,17 +41,16 @@ class RateLimitMiddleware(object):
|
|||||||
else:
|
else:
|
||||||
self.logger = get_logger(conf)
|
self.logger = get_logger(conf)
|
||||||
self.account_ratelimit = float(conf.get('account_ratelimit', 0))
|
self.account_ratelimit = float(conf.get('account_ratelimit', 0))
|
||||||
self.max_sleep_time_seconds = float(conf.get('max_sleep_time_seconds',
|
self.max_sleep_time_seconds = \
|
||||||
60))
|
float(conf.get('max_sleep_time_seconds', 60))
|
||||||
self.log_sleep_time_seconds = float(conf.get('log_sleep_time_seconds',
|
self.log_sleep_time_seconds = \
|
||||||
0))
|
float(conf.get('log_sleep_time_seconds', 0))
|
||||||
self.clock_accuracy = int(conf.get('clock_accuracy', 1000))
|
self.clock_accuracy = int(conf.get('clock_accuracy', 1000))
|
||||||
|
self.rate_buffer_seconds = int(conf.get('rate_buffer_seconds', 5))
|
||||||
self.ratelimit_whitelist = [acc.strip() for acc in
|
self.ratelimit_whitelist = [acc.strip() for acc in
|
||||||
conf.get('account_whitelist', '').split(',')
|
conf.get('account_whitelist', '').split(',') if acc.strip()]
|
||||||
if acc.strip()]
|
|
||||||
self.ratelimit_blacklist = [acc.strip() for acc in
|
self.ratelimit_blacklist = [acc.strip() for acc in
|
||||||
conf.get('account_blacklist', '').split(',')
|
conf.get('account_blacklist', '').split(',') if acc.strip()]
|
||||||
if acc.strip()]
|
|
||||||
self.memcache_client = None
|
self.memcache_client = None
|
||||||
conf_limits = []
|
conf_limits = []
|
||||||
for conf_key in conf.keys():
|
for conf_key in conf.keys():
|
||||||
@@ -92,8 +93,7 @@ class RateLimitMiddleware(object):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def get_ratelimitable_key_tuples(self, req_method, account_name,
|
def get_ratelimitable_key_tuples(self, req_method, account_name,
|
||||||
container_name=None,
|
container_name=None, obj_name=None):
|
||||||
obj_name=None):
|
|
||||||
"""
|
"""
|
||||||
Returns a list of key (used in memcache), ratelimit tuples. Keys
|
Returns a list of key (used in memcache), ratelimit tuples. Keys
|
||||||
should be checked in order.
|
should be checked in order.
|
||||||
@@ -105,19 +105,20 @@ class RateLimitMiddleware(object):
|
|||||||
"""
|
"""
|
||||||
keys = []
|
keys = []
|
||||||
if self.account_ratelimit and account_name and (
|
if self.account_ratelimit and account_name and (
|
||||||
not (container_name or obj_name) or
|
not (container_name or obj_name) or
|
||||||
(container_name and not obj_name and req_method == 'PUT')):
|
(container_name and not obj_name and
|
||||||
|
req_method in ('PUT', 'DELETE'))):
|
||||||
keys.append(("ratelimit/%s" % account_name,
|
keys.append(("ratelimit/%s" % account_name,
|
||||||
self.account_ratelimit))
|
self.account_ratelimit))
|
||||||
|
|
||||||
if account_name and container_name and (
|
if account_name and container_name and (
|
||||||
(not obj_name and req_method in ('GET', 'HEAD')) or
|
(not obj_name and req_method in ('GET', 'HEAD')) or
|
||||||
(obj_name and req_method in ('PUT', 'DELETE'))):
|
(obj_name and req_method in ('PUT', 'DELETE'))):
|
||||||
container_size = None
|
container_size = None
|
||||||
memcache_key = get_container_memcache_key(account_name,
|
memcache_key = get_container_memcache_key(account_name,
|
||||||
container_name)
|
container_name)
|
||||||
container_info = self.memcache_client.get(memcache_key)
|
container_info = self.memcache_client.get(memcache_key)
|
||||||
if type(container_info) == dict:
|
if isinstance(container_info, dict):
|
||||||
container_size = container_info.get('container_size', 0)
|
container_size = container_info.get('container_size', 0)
|
||||||
container_rate = self.get_container_maxrate(container_size)
|
container_rate = self.get_container_maxrate(container_size)
|
||||||
if container_rate:
|
if container_rate:
|
||||||
@@ -129,31 +130,32 @@ class RateLimitMiddleware(object):
|
|||||||
def _get_sleep_time(self, key, max_rate):
|
def _get_sleep_time(self, key, max_rate):
|
||||||
'''
|
'''
|
||||||
Returns the amount of time (a float in seconds) that the app
|
Returns the amount of time (a float in seconds) that the app
|
||||||
should sleep. Throws a MaxSleepTimeHit exception if maximum
|
should sleep.
|
||||||
sleep time is exceeded.
|
|
||||||
|
|
||||||
:param key: a memcache key
|
:param key: a memcache key
|
||||||
:param max_rate: maximum rate allowed in requests per second
|
:param max_rate: maximum rate allowed in requests per second
|
||||||
|
:raises: MaxSleepTimeHitError if max sleep time is exceeded.
|
||||||
'''
|
'''
|
||||||
now_m = int(round(time.time() * self.clock_accuracy))
|
now_m = int(round(time.time() * self.clock_accuracy))
|
||||||
time_per_request_m = int(round(self.clock_accuracy / max_rate))
|
time_per_request_m = int(round(self.clock_accuracy / max_rate))
|
||||||
running_time_m = self.memcache_client.incr(key,
|
running_time_m = self.memcache_client.incr(key,
|
||||||
delta=time_per_request_m)
|
delta=time_per_request_m)
|
||||||
need_to_sleep_m = 0
|
need_to_sleep_m = 0
|
||||||
request_time_limit = now_m + (time_per_request_m * max_rate)
|
if (now_m - running_time_m >
|
||||||
if running_time_m < now_m:
|
self.rate_buffer_seconds * self.clock_accuracy):
|
||||||
next_avail_time = int(now_m + time_per_request_m)
|
next_avail_time = int(now_m + time_per_request_m)
|
||||||
self.memcache_client.set(key, str(next_avail_time),
|
self.memcache_client.set(key, str(next_avail_time),
|
||||||
serialize=False)
|
serialize=False)
|
||||||
elif running_time_m - now_m - time_per_request_m > 0:
|
else:
|
||||||
need_to_sleep_m = running_time_m - now_m - time_per_request_m
|
need_to_sleep_m = \
|
||||||
|
max(running_time_m - now_m - time_per_request_m, 0)
|
||||||
|
|
||||||
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
|
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
|
||||||
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
|
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
|
||||||
# treat as no-op decrement time
|
# treat as no-op decrement time
|
||||||
self.memcache_client.decr(key, delta=time_per_request_m)
|
self.memcache_client.decr(key, delta=time_per_request_m)
|
||||||
raise MaxSleepTimeHit("Max Sleep Time Exceeded: %s" %
|
raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
|
||||||
need_to_sleep_m)
|
need_to_sleep_m)
|
||||||
|
|
||||||
return float(need_to_sleep_m) / self.clock_accuracy
|
return float(need_to_sleep_m) / self.clock_accuracy
|
||||||
|
|
||||||
@@ -168,26 +170,25 @@ class RateLimitMiddleware(object):
|
|||||||
'''
|
'''
|
||||||
if account_name in self.ratelimit_blacklist:
|
if account_name in self.ratelimit_blacklist:
|
||||||
self.logger.error(_('Returning 497 because of blacklisting'))
|
self.logger.error(_('Returning 497 because of blacklisting'))
|
||||||
|
eventlet.sleep(self.BLACK_LIST_SLEEP)
|
||||||
return Response(status='497 Blacklisted',
|
return Response(status='497 Blacklisted',
|
||||||
body='Your account has been blacklisted', request=req)
|
body='Your account has been blacklisted', request=req)
|
||||||
if account_name in self.ratelimit_whitelist:
|
if account_name in self.ratelimit_whitelist:
|
||||||
return None
|
return None
|
||||||
for key, max_rate in self.get_ratelimitable_key_tuples(
|
for key, max_rate in self.get_ratelimitable_key_tuples(
|
||||||
req.method,
|
req.method, account_name, container_name=container_name,
|
||||||
account_name,
|
obj_name=obj_name):
|
||||||
container_name=container_name,
|
|
||||||
obj_name=obj_name):
|
|
||||||
try:
|
try:
|
||||||
need_to_sleep = self._get_sleep_time(key, max_rate)
|
need_to_sleep = self._get_sleep_time(key, max_rate)
|
||||||
if self.log_sleep_time_seconds and \
|
if self.log_sleep_time_seconds and \
|
||||||
need_to_sleep > self.log_sleep_time_seconds:
|
need_to_sleep > self.log_sleep_time_seconds:
|
||||||
self.logger.info(_("Ratelimit sleep log: %(sleep)s for "
|
self.logger.warning(_("Ratelimit sleep log: %(sleep)s for "
|
||||||
"%(account)s/%(container)s/%(object)s"),
|
"%(account)s/%(container)s/%(object)s"),
|
||||||
{'sleep': need_to_sleep, 'account': account_name,
|
{'sleep': need_to_sleep, 'account': account_name,
|
||||||
'container': container_name, 'object': obj_name})
|
'container': container_name, 'object': obj_name})
|
||||||
if need_to_sleep > 0:
|
if need_to_sleep > 0:
|
||||||
eventlet.sleep(need_to_sleep)
|
eventlet.sleep(need_to_sleep)
|
||||||
except MaxSleepTimeHit, e:
|
except MaxSleepTimeHitError, e:
|
||||||
self.logger.error(_('Returning 498 because of ops rate '
|
self.logger.error(_('Returning 498 because of ops rate '
|
||||||
'limiting (Max Sleep) %s') % str(e))
|
'limiting (Max Sleep) %s') % str(e))
|
||||||
error_resp = Response(status='498 Rate Limited',
|
error_resp = Response(status='498 Rate Limited',
|
||||||
|
@@ -820,7 +820,7 @@ def audit_location_generator(devices, datadir, mount_check=True, logger=None):
|
|||||||
yield path, device, partition
|
yield path, device, partition
|
||||||
|
|
||||||
|
|
||||||
def ratelimit_sleep(running_time, max_rate, incr_by=1):
|
def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
|
||||||
'''
|
'''
|
||||||
Will eventlet.sleep() for the appropriate time so that the max_rate
|
Will eventlet.sleep() for the appropriate time so that the max_rate
|
||||||
is never exceeded. If max_rate is 0, will not ratelimit. The
|
is never exceeded. If max_rate is 0, will not ratelimit. The
|
||||||
@@ -834,13 +834,17 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1):
|
|||||||
:param incr_by: How much to increment the counter. Useful if you want
|
:param incr_by: How much to increment the counter. Useful if you want
|
||||||
to ratelimit 1024 bytes/sec and have differing sizes
|
to ratelimit 1024 bytes/sec and have differing sizes
|
||||||
of requests. Must be >= 0.
|
of requests. Must be >= 0.
|
||||||
|
:param rate_buffer: Number of seconds the rate counter can drop and be
|
||||||
|
allowed to catch up (at a faster than listed rate).
|
||||||
|
A larger number will result in larger spikes in rate
|
||||||
|
but better average accuracy.
|
||||||
'''
|
'''
|
||||||
if not max_rate or incr_by <= 0:
|
if not max_rate or incr_by <= 0:
|
||||||
return running_time
|
return running_time
|
||||||
clock_accuracy = 1000.0
|
clock_accuracy = 1000.0
|
||||||
now = time.time() * clock_accuracy
|
now = time.time() * clock_accuracy
|
||||||
time_per_request = clock_accuracy * (float(incr_by) / max_rate)
|
time_per_request = clock_accuracy * (float(incr_by) / max_rate)
|
||||||
if running_time < now:
|
if now - running_time > rate_buffer * clock_accuracy:
|
||||||
running_time = now
|
running_time = now
|
||||||
elif running_time - now > time_per_request:
|
elif running_time - now > time_per_request:
|
||||||
eventlet.sleep((running_time - now) / clock_accuracy)
|
eventlet.sleep((running_time - now) / clock_accuracy)
|
||||||
|
@@ -38,6 +38,7 @@ class ObjectAuditor(Daemon):
|
|||||||
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
||||||
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
||||||
10000000))
|
10000000))
|
||||||
|
self.log_time = int(conf.get('log_time', 3600))
|
||||||
self.files_running_time = 0
|
self.files_running_time = 0
|
||||||
self.bytes_running_time = 0
|
self.bytes_running_time = 0
|
||||||
self.bytes_processed = 0
|
self.bytes_processed = 0
|
||||||
@@ -46,7 +47,6 @@ class ObjectAuditor(Daemon):
|
|||||||
self.passes = 0
|
self.passes = 0
|
||||||
self.quarantines = 0
|
self.quarantines = 0
|
||||||
self.errors = 0
|
self.errors = 0
|
||||||
self.log_time = 3600 # once an hour
|
|
||||||
|
|
||||||
def run_forever(self):
|
def run_forever(self):
|
||||||
"""Run the object audit until stopped."""
|
"""Run the object audit until stopped."""
|
||||||
|
@@ -95,13 +95,13 @@ class FakeApp(object):
|
|||||||
class FakeLogger(object):
|
class FakeLogger(object):
|
||||||
# a thread safe logger
|
# a thread safe logger
|
||||||
|
|
||||||
def error(self, msg):
|
def error(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def info(self, msg):
|
def info(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def warning(self, msg):
|
def warning(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -224,6 +224,7 @@ class TestRateLimit(unittest.TestCase):
|
|||||||
'account_whitelist': 'a',
|
'account_whitelist': 'a',
|
||||||
'account_blacklist': 'b'}
|
'account_blacklist': 'b'}
|
||||||
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
|
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
|
||||||
|
self.test_ratelimit.BLACK_LIST_SLEEP = 0
|
||||||
ratelimit.http_connect = mock_http_connect(204)
|
ratelimit.http_connect = mock_http_connect(204)
|
||||||
req = Request.blank('/v/b/c')
|
req = Request.blank('/v/b/c')
|
||||||
req.environ['swift.cache'] = FakeMemcache()
|
req.environ['swift.cache'] = FakeMemcache()
|
||||||
@@ -260,6 +261,7 @@ class TestRateLimit(unittest.TestCase):
|
|||||||
# making clock less accurate for nosetests running slow
|
# making clock less accurate for nosetests running slow
|
||||||
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
|
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
|
||||||
ratelimit.http_connect = mock_http_connect(204)
|
ratelimit.http_connect = mock_http_connect(204)
|
||||||
|
self.test_ratelimit.log_sleep_time_seconds = .00001
|
||||||
req = Request.blank('/v/a')
|
req = Request.blank('/v/a')
|
||||||
req.environ['swift.cache'] = FakeMemcache()
|
req.environ['swift.cache'] = FakeMemcache()
|
||||||
begin = time.time()
|
begin = time.time()
|
||||||
@@ -402,7 +404,5 @@ class TestRateLimit(unittest.TestCase):
|
|||||||
self._run(make_app_call, num_calls, current_rate)
|
self._run(make_app_call, num_calls, current_rate)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@@ -456,15 +456,6 @@ log_name = yarr'''
|
|||||||
# make sure its accurate to 10th of a second
|
# make sure its accurate to 10th of a second
|
||||||
self.assertTrue(abs(25 - (time.time() - start) * 100) < 10)
|
self.assertTrue(abs(25 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
def test_ratelimit_sleep_with_sleep(self):
|
|
||||||
running_time = 0
|
|
||||||
start = time.time()
|
|
||||||
for i in range(25):
|
|
||||||
running_time = utils.ratelimit_sleep(running_time, 50)
|
|
||||||
time.sleep(1.0 / 75)
|
|
||||||
# make sure its accurate to 10th of a second
|
|
||||||
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
|
|
||||||
|
|
||||||
def test_ratelimit_sleep_with_incr(self):
|
def test_ratelimit_sleep_with_incr(self):
|
||||||
running_time = 0
|
running_time = 0
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@@ -477,6 +468,17 @@ log_name = yarr'''
|
|||||||
total += i
|
total += i
|
||||||
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
|
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
|
def test_ratelimit_sleep_with_sleep(self):
|
||||||
|
running_time = 0
|
||||||
|
start = time.time()
|
||||||
|
sleeps = [0] * 7 + [.2] * 3 + [0] * 30
|
||||||
|
for i in sleeps:
|
||||||
|
running_time = utils.ratelimit_sleep(running_time, 40,
|
||||||
|
rate_buffer=1)
|
||||||
|
time.sleep(i)
|
||||||
|
# make sure its accurate to 10th of a second
|
||||||
|
self.assertTrue(abs(100 - (time.time() - start) * 100) < 10)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Reference in New Issue
Block a user