Add Per Endpoint API Rate Limit

Currently in nsxlib, there is no client-side API rate throttling. In a
scale setup this can easily overwhelm the NSX backend. This patch
introduces a per-endpoint rate limiter that blocks over-limit calls.

Change-Id: Iccd1d2675bed16833d36fa40cc2ef56cf3464652
This commit is contained in:
Shawn Wang 2020-04-14 15:02:05 -07:00
parent 0a76d4e3a1
commit 02c1c2e293
No known key found for this signature in database
GPG Key ID: C98A86CC967E89A7
4 changed files with 155 additions and 13 deletions

View File

@ -340,3 +340,74 @@ class NsxFeaturesTestCase(nsxlib_testcase.NsxLibTestCase):
nsx_constants.FEATURE_EXCLUDE_PORT_BY_TAG))
self.assertTrue(self.nsxlib.feature_supported(
nsx_constants.FEATURE_MAC_LEARNING))
class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
    """Unit tests for utils.APIRateLimiter.

    The limiter keeps a sliding window of recent call timestamps.  These
    tests freeze ``time.time`` (and patch ``time.sleep`` /
    ``_calc_wait_time`` where needed) so that the window arithmetic is
    deterministic and no real sleeping occurs.
    """

    @mock.patch('time.time')
    def test_calc_wait_time_no_wait(self, mock_time):
        # Freeze the clock at t=2.0 so the 1-second window is [1.0, 2.0].
        mock_time.return_value = 2.0
        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
        # no wait when no prev calls
        self.assertEqual(rate_limiter._calc_wait_time(), 0)
        # no wait when prev call in period window is less than max_calls
        rate_limiter._call_time.append(0.9)
        rate_limiter._call_time.append(1.5)
        self.assertEqual(rate_limiter._calc_wait_time(), 0)
        # timestamps out of current window should be removed
        self.assertListEqual(list(rate_limiter._call_time), [1.5])

    @mock.patch('time.time')
    def test_calc_wait_time_need_wait(self, mock_time):
        mock_time.return_value = 2.0
        # At rate limit
        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
        rate_limiter._call_time.append(0.9)
        rate_limiter._call_time.append(1.2)
        rate_limiter._call_time.append(1.5)
        self.assertAlmostEqual(rate_limiter._calc_wait_time(), 0.2)
        # timestamps out of current window should be removed
        self.assertListEqual(list(rate_limiter._call_time), [1.2, 1.5])

        # Over rate limit. Enforce no compensation wait.
        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
        rate_limiter._call_time.append(0.9)
        rate_limiter._call_time.append(1.2)
        rate_limiter._call_time.append(1.5)
        rate_limiter._call_time.append(1.8)
        self.assertAlmostEqual(rate_limiter._calc_wait_time(), 0.5)
        # timestamps out of current window should be removed
        self.assertListEqual(list(rate_limiter._call_time), [1.2, 1.5, 1.8])

    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
    @mock.patch('time.sleep')
    @mock.patch('time.time')
    def test_context_manager_no_wait(self, mock_time, mock_sleep, mock_calc):
        mock_time.return_value = 2.0
        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
        mock_calc.return_value = 0
        with rate_limiter as wait_time:
            self.assertEqual(wait_time, 0)
            mock_sleep.assert_not_called()
        # Entering the context records the call timestamp.
        self.assertListEqual(list(rate_limiter._call_time), [2.0])

    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
    @mock.patch('time.sleep')
    def test_context_manager_disabled(self, mock_sleep, mock_calc):
        # max_calls=None disables the limiter: no wait math at all.
        rate_limiter = utils.APIRateLimiter(max_calls=None)
        with rate_limiter as wait_time:
            self.assertEqual(wait_time, 0)
            mock_sleep.assert_not_called()
            mock_calc.assert_not_called()

    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
    @mock.patch('time.sleep')
    @mock.patch('time.time')
    def test_context_manager_need_wait(self, mock_time, mock_sleep, mock_calc):
        mock_time.return_value = 2.0
        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
        mock_calc.return_value = 0.5
        with rate_limiter as wait_time:
            self.assertEqual(wait_time, 0.5)
            # Over-limit entry sleeps for exactly the computed wait time.
            mock_sleep.assert_called_once_with(wait_time)
        self.assertListEqual(list(rate_limiter._call_time), [2.0])

View File

@ -21,6 +21,7 @@ import inspect
import itertools
import logging
import re
import time
import eventlet
from eventlet import greenpool
@ -383,11 +384,12 @@ class Endpoint(object):
to the underlying connections.
"""
def __init__(self, provider, pool):
def __init__(self, provider, pool, api_rate_limit=None):
self.provider = provider
self.pool = pool
self._state = EndpointState.INITIALIZED
self._last_updated = datetime.datetime.now()
self.rate_limiter = utils.APIRateLimiter(api_rate_limit)
def regenerate_pool(self):
self.pool = pools.Pool(min_size=self.pool.min_size,
@ -427,9 +429,11 @@ class EndpointConnection(object):
Which contains an endpoint and a connection for that endpoint.
"""
def __init__(self, endpoint, connection):
def __init__(self, endpoint, connection, conn_wait, rate_wait):
self.endpoint = endpoint
self.connection = connection
self.conn_wait = conn_wait
self.rate_wait = rate_wait
class ClusteredAPI(object):
@ -445,15 +449,16 @@ class ClusteredAPI(object):
http_provider,
min_conns_per_pool=0,
max_conns_per_pool=20,
keepalive_interval=33):
keepalive_interval=33,
api_rate_limit=None):
self._http_provider = http_provider
self._keepalive_interval = keepalive_interval
self._print_keepalive = 0
def _init_cluster(*args, **kwargs):
self._init_endpoints(providers,
min_conns_per_pool, max_conns_per_pool)
self._init_endpoints(providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit)
_init_cluster()
@ -462,8 +467,8 @@ class ClusteredAPI(object):
# loops + state
self._reinit_cluster = _init_cluster
def _init_endpoints(self, providers,
min_conns_per_pool, max_conns_per_pool):
def _init_endpoints(self, providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit):
LOG.debug("Initializing API endpoints")
def _create_conn(p):
@ -480,7 +485,7 @@ class ClusteredAPI(object):
order_as_stack=True,
create=_create_conn(provider))
endpoint = Endpoint(provider, pool)
endpoint = Endpoint(provider, pool, api_rate_limit)
self._endpoints[provider.id] = endpoint
# service requests using round robin
@ -646,9 +651,21 @@ class ClusteredAPI(object):
{'ep': endpoint,
'max': endpoint.pool.max_size,
'waiting': endpoint.pool.waiting()})
conn_wait_start = time.time()
else:
conn_wait_start = None
# pool.item() will wait if pool has 0 free
with endpoint.pool.item() as conn:
yield EndpointConnection(endpoint, conn)
if conn_wait_start:
conn_wait = time.time() - conn_wait_start
else:
conn_wait = 0
with endpoint.rate_limiter as rate_wait:
# Connection validation calls are not currently rate-limited
# by this context manager.
# This should be fine as validation API calls are sent at a
# slow rate of once per 33 seconds by default.
yield EndpointConnection(endpoint, conn, conn_wait, rate_wait)
def _proxy_stub(self, proxy_for):
def _call_proxy(url, *args, **kwargs):
@ -675,8 +692,10 @@ class ClusteredAPI(object):
if conn.default_headers:
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers'].update(conn.default_headers)
LOG.debug("API cluster proxy %s %s to %s with %s",
proxy_for.upper(), uri, url, kwargs)
LOG.debug("API cluster proxy %s %s to %s with %s. "
"Waited conn: %2.4f, rate: %2.4f",
proxy_for.upper(), uri, url, kwargs,
conn_data.conn_wait, conn_data.rate_wait)
# call the actual connection method to do the
# http request/response over the wire
@ -716,7 +735,8 @@ class NSXClusteredAPI(ClusteredAPI):
self._build_conf_providers(),
self._http_provider,
max_conns_per_pool=self.nsxlib_config.concurrent_connections,
keepalive_interval=self.nsxlib_config.conn_idle_timeout)
keepalive_interval=self.nsxlib_config.conn_idle_timeout,
api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint)
LOG.debug("Created NSX clustered API with '%s' "
"provider", self._http_provider.provider_id)

View File

@ -90,6 +90,13 @@ class NsxLibConfig(object):
configured in the cluster, since there
will be no keepalive probes in this
case.
:param api_rate_limit_per_endpoint: If set to positive integer, API calls
sent to each endpoint will be limited
to a max rate of this value per second.
The rate limit is not enforced on
connection validations. This option
defaults to None, which disables the
rate limit.
-- Additional parameters which are relevant only for the Policy manager:
:param allow_passthrough: If True, use nsx manager api for cases which are
@ -127,7 +134,8 @@ class NsxLibConfig(object):
cluster_unavailable_retry=False,
allow_passthrough=False,
realization_max_attempts=50,
realization_wait_sec=1.0):
realization_wait_sec=1.0,
api_rate_limit_per_endpoint=None):
self.nsx_api_managers = nsx_api_managers
self._username = username
@ -155,6 +163,7 @@ class NsxLibConfig(object):
self.allow_passthrough = allow_passthrough
self.realization_max_attempts = realization_max_attempts
self.realization_wait_sec = realization_wait_sec
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
LOG.warning("When only one endpoint is provided, keepalive probes"

View File

@ -17,6 +17,7 @@ import abc
import collections
import inspect
import re
from threading import Lock
import time
from oslo_log import log
@ -667,3 +668,44 @@ def get_dhcp_opt_code(name):
'reboot-time': 211,
}
return _supported_options.get(name)
class APIRateLimiter(object):
    """Block-on-entry context manager limiting calls to max_calls/period.

    A sliding window of recent call timestamps is kept in a deque.
    Entering the context sleeps until the call is allowed and yields the
    number of seconds actually spent waiting.  Passing ``max_calls=None``
    turns the limiter into a no-op.
    """

    def __init__(self, max_calls, period=1.0):
        # A limiter built with max_calls=None is permanently disabled and
        # skips all state setup.
        self._enabled = max_calls is not None
        if not self._enabled:
            return
        if period <= 0 or int(max_calls) <= 0:
            raise ValueError('period and max_calls should be positive')
        self._period = period
        self._max_calls = int(max_calls)
        # Timestamps of calls inside the current window, oldest first.
        self._call_time = collections.deque()
        # Serializes window bookkeeping (and the sleep) across threads.
        self._lock = Lock()

    def __enter__(self):
        """Wait until a call is permitted; return the seconds waited."""
        if not self._enabled:
            return 0
        with self._lock:
            delay = self._calc_wait_time()
            if delay:
                time.sleep(delay)
            # Assume the API call happens immediately after entering the
            # context, so record "now" as its timestamp.
            self._call_time.append(time.time())
            return delay

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release; bookkeeping is done on entry.
        pass

    def _calc_wait_time(self):
        """Return seconds to sleep before the next call may proceed.

        Also prunes timestamps that have aged out of the window.  Caller
        must hold ``self._lock``.
        """
        now = time.time()
        window_start = now - self._period
        calls = self._call_time
        # Evict timestamps that fell out of the sliding window.
        while calls and calls[0] < window_start:
            calls.popleft()
        if len(calls) < self._max_calls:
            return 0
        # At least max_calls timestamps remain.  The earliest moment the
        # rate drops below the limit is when the max_calls-th most recent
        # call ages out of the window, i.e. at
        # calls[-self._max_calls] + self._period; wait the difference.
        return calls[-self._max_calls] + self._period - now