Add AIMD for Adaptive API Rate Limit

In a multi-cluster setup, an adaptive API rate limit is more useful, as
utilization can be dynamically balanced across all active clusters.

AIMD (additive increase/multiplicative decrease) from TCP congestion
control is a simple but effective algorithm that fits our needs here:
- The API rate is analogous to the TCP window size; each API call sent
  concurrently is analogous to a packet in flight.
- Each successful API call that was blocked before being sent increases
  the rate limit by 1, similar to receiving an ACK.
- Each API call that fails with Server Busy (429/503) halves the rate
  limit, similar to packet loss.

When the adaptive rate mode is set to AIMD, a custom hard limit can
still be set, capped at 100 calls/s. TCP slow start is not implemented,
as the upper bound of the rate is relatively small. The API rate is
adjusted once per period and under no circumstances will exceed the
hard limit.
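
A minimal sketch of the adjustment rule described above (a simplified
per-call restatement; the actual APIRateLimiterAIMD.adjust_rate code in
the diff below aggregates these signals once per period):

    def aimd_update(max_calls, top_rate, got_429_or_503, was_blocked):
        # Multiplicative decrease on Server Busy, never below 1 call per period.
        if got_429_or_503:
            return max(max_calls // 2, 1)
        # Additive increase when a successful call had to wait for the limiter.
        if was_blocked:
            return min(max_calls + 1, top_rate)
        # Fast successful calls leave the rate unchanged.
        return max_calls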

Change-Id: I7360f422c704d63adf59895b893dcdbef05cfd23
(cherry picked from commit 56cb08691d)
Shawn Wang 2020-05-27 16:02:42 -07:00
parent b1b45342b5
commit 034f2d201d
5 changed files with 149 additions and 18 deletions

View File

@@ -392,11 +392,13 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
def setUp(self, *args, **kwargs):
super(APIRateLimiterTestCase, self).setUp(with_mocks=False)
self.rate_limiter = utils.APIRateLimiter
@mock.patch('time.time')
def test_calc_wait_time_no_wait(self, mock_time):
mock_time.return_value = 2.0
rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
rate_limiter = self.rate_limiter(max_calls=2, period=1.0)
rate_limiter._max_calls = 2
# no wait when no prev calls
self.assertEqual(rate_limiter._calc_wait_time(), 0)
# no wait when prev call in period window is less than max_calls
@@ -411,7 +413,8 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
mock_time.return_value = 2.0
# At rate limit
rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
rate_limiter = self.rate_limiter(max_calls=2, period=1.0)
rate_limiter._max_calls = 2
rate_limiter._call_time.append(0.9)
rate_limiter._call_time.append(1.2)
rate_limiter._call_time.append(1.5)
@@ -420,7 +423,8 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
self.assertListEqual(list(rate_limiter._call_time), [1.2, 1.5])
# Over rate limit. Enforce no compensation wait.
rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
rate_limiter = self.rate_limiter(max_calls=2, period=1.0)
rate_limiter._max_calls = 2
rate_limiter._call_time.append(0.9)
rate_limiter._call_time.append(1.2)
rate_limiter._call_time.append(1.5)
@@ -434,7 +438,7 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
@mock.patch('time.time')
def test_context_manager_no_wait(self, mock_time, mock_sleep, mock_calc):
mock_time.return_value = 2.0
rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
rate_limiter = self.rate_limiter(max_calls=2, period=1.0)
mock_calc.return_value = 0
with rate_limiter as wait_time:
self.assertEqual(wait_time, 0)
@@ -444,7 +448,7 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
@mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
@mock.patch('time.sleep')
def test_context_manager_disabled(self, mock_sleep, mock_calc):
rate_limiter = utils.APIRateLimiter(max_calls=None)
rate_limiter = self.rate_limiter(max_calls=None)
with rate_limiter as wait_time:
self.assertEqual(wait_time, 0)
mock_sleep.assert_not_called()
@@ -454,10 +458,65 @@ class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
@mock.patch('time.sleep')
@mock.patch('time.time')
def test_context_manager_need_wait(self, mock_time, mock_sleep, mock_calc):
mock_time.return_value = 2.0
rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
mock_time.return_value = 0.0
rate_limiter = self.rate_limiter(max_calls=2, period=1.0)
mock_time.side_effect = [2.0, 2.5]
mock_calc.return_value = 0.5
with rate_limiter as wait_time:
self.assertEqual(wait_time, 0.5)
mock_sleep.assert_called_once_with(wait_time)
self.assertListEqual(list(rate_limiter._call_time), [2.0])
self.assertListEqual(list(rate_limiter._call_time), [2.5])
class APIRateLimiterAIMDTestCase(APIRateLimiterTestCase):
def setUp(self, *args, **kwargs):
super(APIRateLimiterAIMDTestCase, self).setUp(with_mocks=False)
self.rate_limiter = utils.APIRateLimiterAIMD
@mock.patch('time.time')
def test_adjust_rate_increase(self, mock_time):
mock_time.side_effect = [0.0, 2.0, 4.0, 6.0]
rate_limiter = self.rate_limiter(max_calls=10)
rate_limiter._max_calls = 8
# normal period increases rate by 1, even for non-200 normal codes
rate_limiter.adjust_rate(wait_time=1.0, status_code=404)
self.assertEqual(rate_limiter._max_calls, 9)
# max calls limited by top limit
rate_limiter.adjust_rate(wait_time=1.0, status_code=200)
rate_limiter.adjust_rate(wait_time=1.0, status_code=200)
self.assertEqual(rate_limiter._max_calls, 10)
@mock.patch('time.time')
def test_adjust_rate_decrease(self, mock_time):
mock_time.side_effect = [0.0, 2.0, 4.0, 6.0]
rate_limiter = self.rate_limiter(max_calls=10)
rate_limiter._max_calls = 4
# 429 or 503 should decrease rate by half
rate_limiter.adjust_rate(wait_time=1.0, status_code=429)
self.assertEqual(rate_limiter._max_calls, 2)
rate_limiter.adjust_rate(wait_time=0.0, status_code=503)
self.assertEqual(rate_limiter._max_calls, 1)
# lower bound should be 1
rate_limiter.adjust_rate(wait_time=1.0, status_code=503)
self.assertEqual(rate_limiter._max_calls, 1)
@mock.patch('time.time')
def test_adjust_rate_no_change(self, mock_time):
mock_time.side_effect = [0.0, 2.0, 2.5, 2.6]
rate_limiter = self.rate_limiter(max_calls=10)
rate_limiter._max_calls = 4
# non blocked successful calls should not change rate
rate_limiter.adjust_rate(wait_time=0.001, status_code=200)
self.assertEqual(rate_limiter._max_calls, 4)
# too fast calls should not change rate
rate_limiter.adjust_rate(wait_time=1.0, status_code=200)
self.assertEqual(rate_limiter._max_calls, 4)
rate_limiter.adjust_rate(wait_time=1.0, status_code=429)
self.assertEqual(rate_limiter._max_calls, 4)
def test_adjust_rate_disabled(self):
rate_limiter = self.rate_limiter(max_calls=None)
rate_limiter.adjust_rate(wait_time=0.001, status_code=200)
self.assertFalse(hasattr(rate_limiter, '_max_calls'))

View File

@@ -38,6 +38,7 @@ import urllib3
from vmware_nsxlib._i18n import _
from vmware_nsxlib.v3 import client as nsx_client
from vmware_nsxlib.v3 import constants
from vmware_nsxlib.v3 import exceptions
from vmware_nsxlib.v3 import utils
@@ -360,12 +361,17 @@ class Endpoint(object):
to the underlying connections.
"""
def __init__(self, provider, pool, api_rate_limit=None):
def __init__(self, provider, pool, api_rate_limit=None,
api_rate_mode=None):
self.provider = provider
self.pool = pool
self._state = EndpointState.INITIALIZED
self._last_updated = datetime.datetime.now()
self.rate_limiter = utils.APIRateLimiter(api_rate_limit)
if api_rate_mode == constants.API_RATE_MODE_AIMD:
self.rate_limiter = utils.APIRateLimiterAIMD(
max_calls=api_rate_limit)
else:
self.rate_limiter = utils.APIRateLimiter(max_calls=api_rate_limit)
def regenerate_pool(self):
self.pool = pools.Pool(min_size=self.pool.min_size,
@@ -426,7 +432,8 @@ class ClusteredAPI(object):
min_conns_per_pool=0,
max_conns_per_pool=20,
keepalive_interval=33,
api_rate_limit=None):
api_rate_limit=None,
api_rate_mode=None):
self._http_provider = http_provider
self._keepalive_interval = keepalive_interval
@@ -434,7 +441,8 @@ class ClusteredAPI(object):
def _init_cluster(*args, **kwargs):
self._init_endpoints(providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit)
max_conns_per_pool, api_rate_limit,
api_rate_mode)
_init_cluster()
@@ -444,7 +452,7 @@ class ClusteredAPI(object):
self._reinit_cluster = _init_cluster
def _init_endpoints(self, providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit):
max_conns_per_pool, api_rate_limit, api_rate_mode):
LOG.debug("Initializing API endpoints")
def _create_conn(p):
@@ -461,7 +469,7 @@ class ClusteredAPI(object):
order_as_stack=True,
create=_create_conn(provider))
endpoint = Endpoint(provider, pool, api_rate_limit)
endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode)
self._endpoints[provider.id] = endpoint
# service requests using round robin
@@ -719,6 +727,11 @@ class ClusteredAPI(object):
response = do_request(url, *args, **kwargs)
endpoint.set_state(EndpointState.UP)
# Adjust API Rate Limit before raising HTTP exception
endpoint.rate_limiter.adjust_rate(
wait_time=conn_data.rate_wait,
status_code=response.status_code)
# for some status codes, we need to bring the cluster
# down or retry API call
self._raise_http_exception_if_needed(response, endpoint)
@@ -760,7 +773,8 @@ class NSXClusteredAPI(ClusteredAPI):
self._http_provider,
max_conns_per_pool=self.nsxlib_config.concurrent_connections,
keepalive_interval=self.nsxlib_config.conn_idle_timeout,
api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint)
api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint,
api_rate_mode=self.nsxlib_config.api_rate_mode)
LOG.debug("Created NSX clustered API with '%s' "
"provider", self._http_provider.provider_id)

View File

@@ -144,6 +144,14 @@ class NsxLibConfig(object):
connection validations. This option
defaults to None, which disables rate
limit.
:param api_rate_mode: Algorithm used to adaptively adjust max API rate
limit. If not set, the max rate will not be
automatically changed. If set to 'AIMD', max API
rate will be increase by 1 after successful calls
that was blocked before sent, and will be decreased
by half after 429/503 error for each period.
The rate has hard max limit of min(100/s, param
api_rate_limit_per_endpoint).
-- Additional parameters which are relevant only for the Policy manager:
:param allow_passthrough: If True, use nsx manager api for cases which are
@@ -182,6 +190,7 @@ class NsxLibConfig(object):
realization_max_attempts=50,
realization_wait_sec=1.0,
api_rate_limit_per_endpoint=None,
api_rate_mode=None,
exception_config=None):
self.nsx_api_managers = nsx_api_managers
@@ -211,6 +220,7 @@ class NsxLibConfig(object):
self.realization_max_attempts = realization_max_attempts
self.realization_wait_sec = realization_wait_sec
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
self.api_rate_mode = api_rate_mode
self.exception_config = exception_config or ExceptionConfig()
if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
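
With the new option in place, enabling AIMD from an application is a
matter of passing both rate parameters to NsxLibConfig. A minimal usage
sketch; the manager address and credentials below are placeholders, not
part of this change:

    from vmware_nsxlib import v3
    from vmware_nsxlib.v3 import config

    nsxlib_config = config.NsxLibConfig(
        nsx_api_managers=['192.0.2.10'],   # placeholder NSX manager address
        username='admin',                  # placeholder credentials
        password='secret',
        api_rate_limit_per_endpoint=50,    # hard cap: 50 calls/s per endpoint
        api_rate_mode='AIMD')              # adapt the effective rate up to the cap
    nsxlib = v3.NsxLib(nsxlib_config)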

View File

@@ -115,3 +115,11 @@ IPV4_ICMP_TYPES = {0: [0], # Echo reply
IPV4_ICMP_STRICT_TYPES = IPV4_ICMP_TYPES.copy()
# Note: replace item 9 as we did a shallow copy
IPV4_ICMP_STRICT_TYPES[9] = [0]
# API Rate Limiter Related const
API_RATE_MODE_AIMD = 'AIMD'
# HTTP status code to trigger API rate decrement
API_REDUCE_RATE_CODES = [429, 503]
# Minimum time in seconds to consider a call as blocked due to rate limit
API_WAIT_MIN_THRESHOLD = 0.01
API_DEFAULT_MAX_RATE = 100

View File

@@ -702,24 +702,30 @@ class APIRateLimiter(object):
if period <= 0 or int(max_calls) <= 0:
raise ValueError('period and max_calls should be positive')
self._period = period
self._max_calls = int(max_calls)
self._max_calls = min(int(max_calls),
constants.API_DEFAULT_MAX_RATE)
self._call_time = collections.deque()
self._lock = Lock()
def __enter__(self):
if not self._enabled:
return 0
pre_wait_ts = time.time()
with self._lock:
wait_time = self._calc_wait_time()
if wait_time:
time.sleep(wait_time)
# assume api call happens immediately after entering context
self._call_time.append(time.time())
return wait_time
post_wait_ts = time.time()
self._call_time.append(post_wait_ts)
return post_wait_ts - pre_wait_ts
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def adjust_rate(self, **kwargs):
pass
def _calc_wait_time(self):
now = time.time()
# remove timestamps out of current window
@@ -733,3 +739,37 @@ class APIRateLimiter(object):
# T = self.call_time[-self.max_calls] + self.period
# Thus need to wait T - now
return self._call_time[-self._max_calls] + self._period - now
class APIRateLimiterAIMD(APIRateLimiter):
def __init__(self, max_calls, period=1.0):
super(APIRateLimiterAIMD, self).__init__(max_calls, period=period)
if self._enabled:
self._top_rate = self._max_calls
self._max_calls = 1
self._pos_sig = 0
self._neg_sig = 0
self._last_adjust_rate = time.time()
def adjust_rate(self, wait_time=0.0, status_code=200, **kwargs):
if not self._enabled:
return
with self._lock:
if status_code in constants.API_REDUCE_RATE_CODES:
self._neg_sig += 1
elif wait_time >= constants.API_WAIT_MIN_THRESHOLD:
self._pos_sig += 1
now = time.time()
if now - self._last_adjust_rate >= self._period:
if self._neg_sig > 0:
self._max_calls = max(self._max_calls // 2, 1)
LOG.debug("Decreasing API rate limit to %d due to HTTP "
"status code %d", self._max_calls, status_code)
elif self._pos_sig > 0:
self._max_calls = min(self._max_calls + 1, self._top_rate)
LOG.debug("Increasing API rate limit to %d with HTTP "
"status code %d", self._max_calls, status_code)
self._pos_sig = 0
self._neg_sig = 0
self._last_adjust_rate = now
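
The limiter is driven the same way the cluster code above drives it:
enter the context around the HTTP call, then feed the observed wait and
status code back in. A minimal standalone sketch (the 10 calls/s cap and
the 200 status are arbitrary example values):

    from vmware_nsxlib.v3 import utils

    limiter = utils.APIRateLimiterAIMD(max_calls=10)  # hard cap of 10 calls/s
    with limiter as wait_time:                        # may sleep to honor the current rate
        status_code = 200                             # stand-in for the real API call
    # AIMD bookkeeping: blocked-but-successful calls raise the rate by 1 per
    # period, 429/503 responses halve it, everything else leaves it unchanged.
    limiter.adjust_rate(wait_time=wait_time, status_code=status_code)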