Merge "Add API Call Collector"

This commit is contained in:
Zuul 2020-08-27 17:22:07 +00:00 committed by Gerrit Code Review
commit 2774cbd6ac
5 changed files with 122 additions and 7 deletions

View File

@ -520,3 +520,43 @@ class APIRateLimiterAIMDTestCase(APIRateLimiterTestCase):
rate_limiter = self.rate_limiter(max_calls=None)
rate_limiter.adjust_rate(wait_time=0.001, status_code=200)
self.assertFalse(hasattr(rate_limiter, '_max_calls'))
class APICallCollectorTestCase(nsxlib_testcase.NsxLibTestCase):
def setUp(self, *args, **kwargs):
super(APICallCollectorTestCase, self).setUp(with_mocks=False)
self.api_collector = utils.APICallCollector('1.2.3.4', max_entry=2)
def test_add_record(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertListEqual(list(self.api_collector._api_log_store),
[record1, record2])
def test_add_record_overflow(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
record3 = utils.APICallRecord('ts3', 'delete', 'uri_3', 429)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.api_collector.add_record(record3)
self.assertListEqual(list(self.api_collector._api_log_store),
[record2, record3])
def test_pop_record(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertEqual(self.api_collector.pop_record(), record1)
self.assertEqual(self.api_collector.pop_record(), record2)
def test_pop_all_records(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertListEqual(self.api_collector.pop_all_records(),
[record1, record2])

View File

@ -369,7 +369,7 @@ class Endpoint(object):
"""
def __init__(self, provider, pool, api_rate_limit=None,
api_rate_mode=None):
api_rate_mode=None, api_call_collector=None):
self.provider = provider
self.pool = pool
self._state = EndpointState.INITIALIZED
@ -379,6 +379,7 @@ class Endpoint(object):
max_calls=api_rate_limit)
else:
self.rate_limiter = utils.APIRateLimiter(max_calls=api_rate_limit)
self.api_call_collector = api_call_collector
def regenerate_pool(self):
self.pool = pools.Pool(min_size=self.pool.min_size,
@ -408,6 +409,15 @@ class Endpoint(object):
return old_state
def add_api_record(self, record):
if self.api_call_collector:
self.api_call_collector.add_record(record)
def pop_all_api_records(self):
if self.api_call_collector:
return self.api_call_collector.pop_all_records()
return []
def __str__(self):
return "[%s] %s" % (self.state, self.provider)
@ -440,17 +450,19 @@ class ClusteredAPI(object):
max_conns_per_pool=20,
keepalive_interval=33,
api_rate_limit=None,
api_rate_mode=None):
api_rate_mode=None,
api_log_mode=None):
self._http_provider = http_provider
self._keepalive_interval = keepalive_interval
self._print_keepalive = 0
self._silent = False
self._api_call_collectors = []
def _init_cluster(*args, **kwargs):
self._init_endpoints(providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit,
api_rate_mode)
api_rate_mode, api_log_mode)
_init_cluster()
@ -463,7 +475,8 @@ class ClusteredAPI(object):
self._silent = silent_mode
def _init_endpoints(self, providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit, api_rate_mode):
max_conns_per_pool, api_rate_limit, api_rate_mode,
api_log_mode):
LOG.debug("Initializing API endpoints")
def _create_conn(p):
@ -472,6 +485,14 @@ class ClusteredAPI(object):
return _conn
self._api_call_collectors = []
api_call_collector = None
if api_log_mode == constants.API_CALL_LOG_PER_CLUSTER:
# Init one instance of collector for the entire cluster
api_call_collector = utils.APICallCollector(
",".join([provider.id for provider in providers]))
self._api_call_collectors.append(api_call_collector)
self._endpoints = {}
for provider in providers:
pool = pools.Pool(
@ -480,7 +501,13 @@ class ClusteredAPI(object):
order_as_stack=True,
create=_create_conn(provider))
endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode)
if api_log_mode == constants.API_CALL_LOG_PER_ENDPOINT:
# Init one instance of collector for each endpoint
api_call_collector = utils.APICallCollector(provider.id)
self._api_call_collectors.append(api_call_collector)
endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode,
api_call_collector)
self._endpoints[provider.id] = endpoint
# service requests using round robin
@ -544,6 +571,10 @@ class ClusteredAPI(object):
def http_provider(self):
return self._http_provider
@property
def api_call_collectors(self):
return self._api_call_collectors
@property
def health(self):
down = 0
@ -741,6 +772,11 @@ class ClusteredAPI(object):
response = do_request(url, *args, **kwargs)
endpoint.set_state(EndpointState.UP)
# add api call log
api_record = utils.APICallRecord(
verb=proxy_for, uri=uri, status=response.status_code)
endpoint.add_api_record(api_record)
# Adjust API Rate Limit before raising HTTP exception
endpoint.rate_limiter.adjust_rate(
wait_time=conn_data.rate_wait,
@ -788,7 +824,8 @@ class NSXClusteredAPI(ClusteredAPI):
max_conns_per_pool=self.nsxlib_config.concurrent_connections,
keepalive_interval=self.nsxlib_config.conn_idle_timeout,
api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint,
api_rate_mode=self.nsxlib_config.api_rate_mode)
api_rate_mode=self.nsxlib_config.api_rate_mode,
api_log_mode=self.nsxlib_config.api_log_mode)
LOG.debug("Created NSX clustered API with '%s' "
"provider", self._http_provider.provider_id)

View File

@ -152,6 +152,12 @@ class NsxLibConfig(object):
by half after 429/503 error for each period.
The rate has hard max limit of min(100/s, param
api_rate_limit_per_endpoint).
:param api_log_mode: Option to collect API call logs within nsxlib.
When set to API_LOG_PER_CLUSTER, API call sent to all
endpoints will be collected at one place.
When set to API_LOG_PER_ENDPOINT, API call sent to
each endpoint will be collected individually.
By default, this option is disabled as set to None.
-- Additional parameters which are relevant only for the Policy manager:
:param allow_passthrough: If True, use nsx manager api for cases which are
@ -191,7 +197,8 @@ class NsxLibConfig(object):
realization_wait_sec=1.0,
api_rate_limit_per_endpoint=None,
api_rate_mode=None,
exception_config=None):
exception_config=None,
api_log_mode=None):
self.nsx_api_managers = nsx_api_managers
self._username = username
@ -222,6 +229,7 @@ class NsxLibConfig(object):
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
self.api_rate_mode = api_rate_mode
self.exception_config = exception_config or ExceptionConfig()
self.api_log_mode = api_log_mode
if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
LOG.warning("When only one endpoint is provided, keepalive probes"

View File

@ -123,3 +123,7 @@ API_REDUCE_RATE_CODES = [429, 503]
# Minimum time in seconds to consider a call as blocked due to rate limit
API_WAIT_MIN_THRESHOLD = 0.01
API_DEFAULT_MAX_RATE = 100
# API Call Log Related const
API_CALL_LOG_PER_CLUSTER = 'API_LOG_PER_CLUSTER'
API_CALL_LOG_PER_ENDPOINT = 'API_LOG_PER_ENDPOINT'

View File

@ -773,3 +773,29 @@ class APIRateLimiterAIMD(APIRateLimiter):
self._pos_sig = 0
self._neg_sig = 0
self._last_adjust_rate = now
class APICallRecord(object):
def __init__(self, verb, uri, status, timestamp=None):
self.timestamp = timestamp or time.time()
self.verb = verb
self.uri = uri
self.status = status
class APICallCollector(object):
def __init__(self, provider, max_entry=50000):
self._api_log_store = collections.deque(maxlen=max_entry)
self.provider = provider
def add_record(self, record):
self._api_log_store.append(record)
def pop_record(self):
return self._api_log_store.popleft()
def pop_all_records(self):
records = []
while len(self._api_log_store) > 0:
records.append(self.pop_record())
return records