From 02c1c2e2930743deffdd1708822c586822e4f39b Mon Sep 17 00:00:00 2001
From: Shawn Wang
Date: Tue, 14 Apr 2020 15:02:05 -0700
Subject: [PATCH] Add Per Endpoint API Rate Limit

Currently in nsxlib there is no client-side API rate throttling, so a
scale setup can easily overwhelm the NSX backend. This patch introduces
a per-endpoint rate limiter that blocks over-limit calls until they can
proceed.

Change-Id: Iccd1d2675bed16833d36fa40cc2ef56cf3464652
---
 vmware_nsxlib/tests/unit/v3/test_utils.py | 71 +++++++++++++++++++++++
 vmware_nsxlib/v3/cluster.py               | 44 ++++++++++----
 vmware_nsxlib/v3/config.py                | 11 +++-
 vmware_nsxlib/v3/utils.py                 | 42 ++++++++++++++
 4 files changed, 155 insertions(+), 13 deletions(-)

diff --git a/vmware_nsxlib/tests/unit/v3/test_utils.py b/vmware_nsxlib/tests/unit/v3/test_utils.py
index d2c7b088..be5eabfb 100644
--- a/vmware_nsxlib/tests/unit/v3/test_utils.py
+++ b/vmware_nsxlib/tests/unit/v3/test_utils.py
@@ -340,3 +340,74 @@ class NsxFeaturesTestCase(nsxlib_testcase.NsxLibTestCase):
             nsx_constants.FEATURE_EXCLUDE_PORT_BY_TAG))
         self.assertTrue(self.nsxlib.feature_supported(
             nsx_constants.FEATURE_MAC_LEARNING))
+
+
+class APIRateLimiterTestCase(nsxlib_testcase.NsxLibTestCase):
+    @mock.patch('time.time')
+    def test_calc_wait_time_no_wait(self, mock_time):
+        mock_time.return_value = 2.0
+        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
+        # No wait when there are no previous calls
+        self.assertEqual(rate_limiter._calc_wait_time(), 0)
+        # No wait when calls in the period window are fewer than max_calls
+        rate_limiter._call_time.append(0.9)
+        rate_limiter._call_time.append(1.5)
+        self.assertEqual(rate_limiter._calc_wait_time(), 0)
+        # Timestamps outside the current window should be removed
+        self.assertListEqual(list(rate_limiter._call_time), [1.5])
+
+    @mock.patch('time.time')
+    def test_calc_wait_time_need_wait(self, mock_time):
+        mock_time.return_value = 2.0
+
+        # At the rate limit
+        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
+        rate_limiter._call_time.append(0.9)
+        rate_limiter._call_time.append(1.2)
+        rate_limiter._call_time.append(1.5)
+        self.assertAlmostEqual(rate_limiter._calc_wait_time(), 0.2)
+        # Timestamps outside the current window should be removed
+        self.assertListEqual(list(rate_limiter._call_time), [1.2, 1.5])
+
+        # Over the rate limit. Ensure no extra compensation wait is added.
+        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
+        rate_limiter._call_time.append(0.9)
+        rate_limiter._call_time.append(1.2)
+        rate_limiter._call_time.append(1.5)
+        rate_limiter._call_time.append(1.8)
+        self.assertAlmostEqual(rate_limiter._calc_wait_time(), 0.5)
+        # Timestamps outside the current window should be removed
+        self.assertListEqual(list(rate_limiter._call_time), [1.2, 1.5, 1.8])
+
+    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
+    @mock.patch('time.sleep')
+    @mock.patch('time.time')
+    def test_context_manager_no_wait(self, mock_time, mock_sleep, mock_calc):
+        mock_time.return_value = 2.0
+        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
+        mock_calc.return_value = 0
+        with rate_limiter as wait_time:
+            self.assertEqual(wait_time, 0)
+        mock_sleep.assert_not_called()
+        self.assertListEqual(list(rate_limiter._call_time), [2.0])
+
+    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
+    @mock.patch('time.sleep')
+    def test_context_manager_disabled(self, mock_sleep, mock_calc):
+        rate_limiter = utils.APIRateLimiter(max_calls=None)
+        with rate_limiter as wait_time:
+            self.assertEqual(wait_time, 0)
+        mock_sleep.assert_not_called()
+        mock_calc.assert_not_called()
+
+    @mock.patch('vmware_nsxlib.v3.utils.APIRateLimiter._calc_wait_time')
+    @mock.patch('time.sleep')
+    @mock.patch('time.time')
+    def test_context_manager_need_wait(self, mock_time, mock_sleep, mock_calc):
+        mock_time.return_value = 2.0
+        rate_limiter = utils.APIRateLimiter(max_calls=2, period=1.0)
+        mock_calc.return_value = 0.5
+        with rate_limiter as wait_time:
+            self.assertEqual(wait_time, 0.5)
+        mock_sleep.assert_called_once_with(wait_time)
+        self.assertListEqual(list(rate_limiter._call_time), [2.0])
diff --git a/vmware_nsxlib/v3/cluster.py b/vmware_nsxlib/v3/cluster.py
index 39d09d24..dbacb9aa 100644
--- a/vmware_nsxlib/v3/cluster.py
+++ b/vmware_nsxlib/v3/cluster.py
@@ -21,6 +21,7 @@ import inspect
 import itertools
 import logging
 import re
+import time
 
 import eventlet
 from eventlet import greenpool
@@ -383,11 +384,12 @@ class Endpoint(object):
     to the underlying connections.
     """
 
-    def __init__(self, provider, pool):
+    def __init__(self, provider, pool, api_rate_limit=None):
         self.provider = provider
         self.pool = pool
         self._state = EndpointState.INITIALIZED
         self._last_updated = datetime.datetime.now()
+        self.rate_limiter = utils.APIRateLimiter(api_rate_limit)
 
     def regenerate_pool(self):
         self.pool = pools.Pool(min_size=self.pool.min_size,
@@ -427,9 +429,11 @@ class EndpointConnection(object):
     Which contains an endpoint and a connection for that endpoint.
""" - def __init__(self, endpoint, connection): + def __init__(self, endpoint, connection, conn_wait, rate_wait): self.endpoint = endpoint self.connection = connection + self.conn_wait = conn_wait + self.rate_wait = rate_wait class ClusteredAPI(object): @@ -445,15 +449,16 @@ class ClusteredAPI(object): http_provider, min_conns_per_pool=0, max_conns_per_pool=20, - keepalive_interval=33): + keepalive_interval=33, + api_rate_limit=None): self._http_provider = http_provider self._keepalive_interval = keepalive_interval self._print_keepalive = 0 def _init_cluster(*args, **kwargs): - self._init_endpoints(providers, - min_conns_per_pool, max_conns_per_pool) + self._init_endpoints(providers, min_conns_per_pool, + max_conns_per_pool, api_rate_limit) _init_cluster() @@ -462,8 +467,8 @@ class ClusteredAPI(object): # loops + state self._reinit_cluster = _init_cluster - def _init_endpoints(self, providers, - min_conns_per_pool, max_conns_per_pool): + def _init_endpoints(self, providers, min_conns_per_pool, + max_conns_per_pool, api_rate_limit): LOG.debug("Initializing API endpoints") def _create_conn(p): @@ -480,7 +485,7 @@ class ClusteredAPI(object): order_as_stack=True, create=_create_conn(provider)) - endpoint = Endpoint(provider, pool) + endpoint = Endpoint(provider, pool, api_rate_limit) self._endpoints[provider.id] = endpoint # service requests using round robin @@ -646,9 +651,21 @@ class ClusteredAPI(object): {'ep': endpoint, 'max': endpoint.pool.max_size, 'waiting': endpoint.pool.waiting()}) + conn_wait_start = time.time() + else: + conn_wait_start = None # pool.item() will wait if pool has 0 free with endpoint.pool.item() as conn: - yield EndpointConnection(endpoint, conn) + if conn_wait_start: + conn_wait = time.time() - conn_wait_start + else: + conn_wait = 0 + with endpoint.rate_limiter as rate_wait: + # Connection validation calls are not currently rate-limited + # by this context manager. + # This should be fine as validation api calls are sent in a + # slow rate at once per 33 seconds by default. + yield EndpointConnection(endpoint, conn, conn_wait, rate_wait) def _proxy_stub(self, proxy_for): def _call_proxy(url, *args, **kwargs): @@ -675,8 +692,10 @@ class ClusteredAPI(object): if conn.default_headers: kwargs['headers'] = kwargs.get('headers', {}) kwargs['headers'].update(conn.default_headers) - LOG.debug("API cluster proxy %s %s to %s with %s", - proxy_for.upper(), uri, url, kwargs) + LOG.debug("API cluster proxy %s %s to %s with %s. " + "Waited conn: %2.4f, rate: %2.4f", + proxy_for.upper(), uri, url, kwargs, + conn_data.conn_wait, conn_data.rate_wait) # call the actual connection method to do the # http request/response over the wire @@ -716,7 +735,8 @@ class NSXClusteredAPI(ClusteredAPI): self._build_conf_providers(), self._http_provider, max_conns_per_pool=self.nsxlib_config.concurrent_connections, - keepalive_interval=self.nsxlib_config.conn_idle_timeout) + keepalive_interval=self.nsxlib_config.conn_idle_timeout, + api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint) LOG.debug("Created NSX clustered API with '%s' " "provider", self._http_provider.provider_id) diff --git a/vmware_nsxlib/v3/config.py b/vmware_nsxlib/v3/config.py index 7fe9969d..d030f06a 100644 --- a/vmware_nsxlib/v3/config.py +++ b/vmware_nsxlib/v3/config.py @@ -90,6 +90,13 @@ class NsxLibConfig(object): configured in the cluster, since there will be no keepalive probes in this case. 
+    :param api_rate_limit_per_endpoint: If set to a positive integer, API
+                                        calls sent to each endpoint will be
+                                        limited to a max rate of this value
+                                        per second. The rate limit is not
+                                        enforced on connection validations.
+                                        This option defaults to None, which
+                                        disables the rate limit.
     --  Additional parameters which are relevant only for the Policy manager:
     :param allow_passthrough: If True, use nsx manager api for cases which are
@@ -127,7 +134,8 @@
                  cluster_unavailable_retry=False,
                  allow_passthrough=False,
                  realization_max_attempts=50,
-                 realization_wait_sec=1.0):
+                 realization_wait_sec=1.0,
+                 api_rate_limit_per_endpoint=None):
 
         self.nsx_api_managers = nsx_api_managers
         self._username = username
@@ -155,6 +163,7 @@
         self.allow_passthrough = allow_passthrough
         self.realization_max_attempts = realization_max_attempts
         self.realization_wait_sec = realization_wait_sec
+        self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
 
         if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
             LOG.warning("When only one endpoint is provided, keepalive probes"
diff --git a/vmware_nsxlib/v3/utils.py b/vmware_nsxlib/v3/utils.py
index 8c741f4f..19ed8498 100644
--- a/vmware_nsxlib/v3/utils.py
+++ b/vmware_nsxlib/v3/utils.py
@@ -17,6 +17,7 @@ import abc
 import collections
 import inspect
 import re
+from threading import Lock
 import time
 
 from oslo_log import log
@@ -667,3 +668,44 @@ def get_dhcp_opt_code(name):
         'reboot-time': 211,
     }
     return _supported_options.get(name)
+
+
+class APIRateLimiter(object):
+    def __init__(self, max_calls, period=1.0):
+        self._enabled = max_calls is not None
+        if not self._enabled:
+            return
+        if period <= 0 or int(max_calls) <= 0:
+            raise ValueError('period and max_calls should be positive')
+        self._period = period
+        self._max_calls = int(max_calls)
+        self._call_time = collections.deque()
+        self._lock = Lock()
+
+    def __enter__(self):
+        if not self._enabled:
+            return 0
+        with self._lock:
+            wait_time = self._calc_wait_time()
+            if wait_time:
+                time.sleep(wait_time)
+            # Assume the API call happens right after entering the context
+            self._call_time.append(time.time())
+            return wait_time
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def _calc_wait_time(self):
+        now = time.time()
+        # Remove timestamps that fall outside the current window
+        while self._call_time and now - self._period > self._call_time[0]:
+            self._call_time.popleft()
+        current_rate = len(self._call_time)
+        if current_rate < self._max_calls:
+            return 0
+        # _call_time now contains at least _max_calls timestamps. The
+        # earliest time the rate can drop below the limit is
+        # T = self._call_time[-self._max_calls] + self._period,
+        # so we need to wait T - now.
+        return self._call_time[-self._max_calls] + self._period - now
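
Usage example (illustrative sketch, not part of the patch): the snippet
below shows how the new APIRateLimiter in vmware_nsxlib/v3/utils.py is
expected to behave as a context manager. Only utils.APIRateLimiter and
its max_calls/period parameters come from this patch; the demo harness
(the make_call helper and the call loop) is hypothetical.

    import time

    from vmware_nsxlib.v3 import utils

    # Allow at most 5 calls within any sliding 1-second window.
    limiter = utils.APIRateLimiter(max_calls=5, period=1.0)

    def make_call(i):
        # Hypothetical helper: entering the context blocks until the
        # call fits the window, and yields the time spent waiting
        # (0 when under the limit).
        with limiter as waited:
            print('call %d waited %.3fs' % (i, waited))

    for i in range(12):
        make_call(i)

    # max_calls=None disables throttling: the context manager returns
    # immediately with a wait time of 0 and never sleeps.
    with utils.APIRateLimiter(max_calls=None) as waited:
        assert waited == 0

Because _call_time keeps one timestamp per admitted call and evicts
entries older than period, the limiter enforces a sliding window rather
than fixed one-second buckets, so a burst straddling a window boundary
still cannot exceed max_calls in any period-length interval.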