From c61adab74ccd3cc14bfda8fb4fc8fb2ef9f1e341 Mon Sep 17 00:00:00 2001 From: Shawn Wang Date: Wed, 26 Aug 2020 17:25:42 -0700 Subject: [PATCH] Add API Call Collector This patch adds an option to collect per cluster or per endpoint API call records during _proxy() call. This enables client side API auditing without the need to rely on NSX support bundles. By default this option is disabled. Change-Id: Ied30d90fc745d5009850c1c83c74eacd46d5fbd9 (cherry picked from commit 22c9a66f05ed5927411073e5403abad301d78256) --- vmware_nsxlib/tests/unit/v3/test_utils.py | 40 ++++++++++++++++++ vmware_nsxlib/v3/cluster.py | 49 ++++++++++++++++++++--- vmware_nsxlib/v3/config.py | 10 ++++- vmware_nsxlib/v3/constants.py | 4 ++ vmware_nsxlib/v3/utils.py | 26 ++++++++++++ 5 files changed, 122 insertions(+), 7 deletions(-) diff --git a/vmware_nsxlib/tests/unit/v3/test_utils.py b/vmware_nsxlib/tests/unit/v3/test_utils.py index 9d20f734..dfea4aac 100644 --- a/vmware_nsxlib/tests/unit/v3/test_utils.py +++ b/vmware_nsxlib/tests/unit/v3/test_utils.py @@ -520,3 +520,43 @@ class APIRateLimiterAIMDTestCase(APIRateLimiterTestCase): rate_limiter = self.rate_limiter(max_calls=None) rate_limiter.adjust_rate(wait_time=0.001, status_code=200) self.assertFalse(hasattr(rate_limiter, '_max_calls')) + + +class APICallCollectorTestCase(nsxlib_testcase.NsxLibTestCase): + def setUp(self, *args, **kwargs): + super(APICallCollectorTestCase, self).setUp(with_mocks=False) + self.api_collector = utils.APICallCollector('1.2.3.4', max_entry=2) + + def test_add_record(self): + record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200) + record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404) + self.api_collector.add_record(record1) + self.api_collector.add_record(record2) + self.assertListEqual(list(self.api_collector._api_log_store), + [record1, record2]) + + def test_add_record_overflow(self): + record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200) + record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404) + record3 = utils.APICallRecord('ts3', 'delete', 'uri_3', 429) + self.api_collector.add_record(record1) + self.api_collector.add_record(record2) + self.api_collector.add_record(record3) + self.assertListEqual(list(self.api_collector._api_log_store), + [record2, record3]) + + def test_pop_record(self): + record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200) + record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404) + self.api_collector.add_record(record1) + self.api_collector.add_record(record2) + self.assertEqual(self.api_collector.pop_record(), record1) + self.assertEqual(self.api_collector.pop_record(), record2) + + def test_pop_all_records(self): + record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200) + record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404) + self.api_collector.add_record(record1) + self.api_collector.add_record(record2) + self.assertListEqual(self.api_collector.pop_all_records(), + [record1, record2]) diff --git a/vmware_nsxlib/v3/cluster.py b/vmware_nsxlib/v3/cluster.py index 4897f400..d10f965d 100644 --- a/vmware_nsxlib/v3/cluster.py +++ b/vmware_nsxlib/v3/cluster.py @@ -366,7 +366,7 @@ class Endpoint(object): """ def __init__(self, provider, pool, api_rate_limit=None, - api_rate_mode=None): + api_rate_mode=None, api_call_collector=None): self.provider = provider self.pool = pool self._state = EndpointState.INITIALIZED @@ -376,6 +376,7 @@ class Endpoint(object): max_calls=api_rate_limit) else: self.rate_limiter = utils.APIRateLimiter(max_calls=api_rate_limit) + self.api_call_collector = api_call_collector def regenerate_pool(self): self.pool = pools.Pool(min_size=self.pool.min_size, @@ -405,6 +406,15 @@ class Endpoint(object): return old_state + def add_api_record(self, record): + if self.api_call_collector: + self.api_call_collector.add_record(record) + + def pop_all_api_records(self): + if self.api_call_collector: + return self.api_call_collector.pop_all_records() + return [] + def __str__(self): return "[%s] %s" % (self.state, self.provider) @@ -437,17 +447,19 @@ class ClusteredAPI(object): max_conns_per_pool=20, keepalive_interval=33, api_rate_limit=None, - api_rate_mode=None): + api_rate_mode=None, + api_log_mode=None): self._http_provider = http_provider self._keepalive_interval = keepalive_interval self._print_keepalive = 0 self._silent = False + self._api_call_collectors = [] def _init_cluster(*args, **kwargs): self._init_endpoints(providers, min_conns_per_pool, max_conns_per_pool, api_rate_limit, - api_rate_mode) + api_rate_mode, api_log_mode) _init_cluster() @@ -460,7 +472,8 @@ class ClusteredAPI(object): self._silent = silent_mode def _init_endpoints(self, providers, min_conns_per_pool, - max_conns_per_pool, api_rate_limit, api_rate_mode): + max_conns_per_pool, api_rate_limit, api_rate_mode, + api_log_mode): LOG.debug("Initializing API endpoints") def _create_conn(p): @@ -469,6 +482,14 @@ class ClusteredAPI(object): return _conn + self._api_call_collectors = [] + api_call_collector = None + if api_log_mode == constants.API_CALL_LOG_PER_CLUSTER: + # Init one instance of collector for the entire cluster + api_call_collector = utils.APICallCollector( + ",".join([provider.id for provider in providers])) + self._api_call_collectors.append(api_call_collector) + self._endpoints = {} for provider in providers: pool = pools.Pool( @@ -477,7 +498,13 @@ class ClusteredAPI(object): order_as_stack=True, create=_create_conn(provider)) - endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode) + if api_log_mode == constants.API_CALL_LOG_PER_ENDPOINT: + # Init one instance of collector for each endpoint + api_call_collector = utils.APICallCollector(provider.id) + self._api_call_collectors.append(api_call_collector) + + endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode, + api_call_collector) self._endpoints[provider.id] = endpoint # service requests using round robin @@ -541,6 +568,10 @@ class ClusteredAPI(object): def http_provider(self): return self._http_provider + @property + def api_call_collectors(self): + return self._api_call_collectors + @property def health(self): down = 0 @@ -738,6 +769,11 @@ class ClusteredAPI(object): response = do_request(url, *args, **kwargs) endpoint.set_state(EndpointState.UP) + # add api call log + api_record = utils.APICallRecord( + verb=proxy_for, uri=uri, status=response.status_code) + endpoint.add_api_record(api_record) + # Adjust API Rate Limit before raising HTTP exception endpoint.rate_limiter.adjust_rate( wait_time=conn_data.rate_wait, @@ -785,7 +821,8 @@ class NSXClusteredAPI(ClusteredAPI): max_conns_per_pool=self.nsxlib_config.concurrent_connections, keepalive_interval=self.nsxlib_config.conn_idle_timeout, api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint, - api_rate_mode=self.nsxlib_config.api_rate_mode) + api_rate_mode=self.nsxlib_config.api_rate_mode, + api_log_mode=self.nsxlib_config.api_log_mode) LOG.debug("Created NSX clustered API with '%s' " "provider", self._http_provider.provider_id) diff --git a/vmware_nsxlib/v3/config.py b/vmware_nsxlib/v3/config.py index 7ce69637..c1036c48 100644 --- a/vmware_nsxlib/v3/config.py +++ b/vmware_nsxlib/v3/config.py @@ -152,6 +152,12 @@ class NsxLibConfig(object): by half after 429/503 error for each period. The rate has hard max limit of min(100/s, param api_rate_limit_per_endpoint). + :param api_log_mode: Option to collect API call logs within nsxlib. + When set to API_LOG_PER_CLUSTER, API call sent to all + endpoints will be collected at one place. + When set to API_LOG_PER_ENDPOINT, API call sent to + each endpoint will be collected individually. + By default, this option is disabled as set to None. -- Additional parameters which are relevant only for the Policy manager: :param allow_passthrough: If True, use nsx manager api for cases which are @@ -191,7 +197,8 @@ class NsxLibConfig(object): realization_wait_sec=1.0, api_rate_limit_per_endpoint=None, api_rate_mode=None, - exception_config=None): + exception_config=None, + api_log_mode=None): self.nsx_api_managers = nsx_api_managers self._username = username @@ -222,6 +229,7 @@ class NsxLibConfig(object): self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint self.api_rate_mode = api_rate_mode self.exception_config = exception_config or ExceptionConfig() + self.api_log_mode = api_log_mode if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry: LOG.warning("When only one endpoint is provided, keepalive probes" diff --git a/vmware_nsxlib/v3/constants.py b/vmware_nsxlib/v3/constants.py index 6ac183fb..5809c1e7 100644 --- a/vmware_nsxlib/v3/constants.py +++ b/vmware_nsxlib/v3/constants.py @@ -123,3 +123,7 @@ API_REDUCE_RATE_CODES = [429, 503] # Minimum time in seconds to consider a call as blocked due to rate limit API_WAIT_MIN_THRESHOLD = 0.01 API_DEFAULT_MAX_RATE = 100 + +# API Call Log Related const +API_CALL_LOG_PER_CLUSTER = 'API_LOG_PER_CLUSTER' +API_CALL_LOG_PER_ENDPOINT = 'API_LOG_PER_ENDPOINT' diff --git a/vmware_nsxlib/v3/utils.py b/vmware_nsxlib/v3/utils.py index fdc88563..ddfeeb52 100644 --- a/vmware_nsxlib/v3/utils.py +++ b/vmware_nsxlib/v3/utils.py @@ -773,3 +773,29 @@ class APIRateLimiterAIMD(APIRateLimiter): self._pos_sig = 0 self._neg_sig = 0 self._last_adjust_rate = now + + +class APICallRecord(object): + def __init__(self, verb, uri, status, timestamp=None): + self.timestamp = timestamp or time.time() + self.verb = verb + self.uri = uri + self.status = status + + +class APICallCollector(object): + def __init__(self, provider, max_entry=50000): + self._api_log_store = collections.deque(maxlen=max_entry) + self.provider = provider + + def add_record(self, record): + self._api_log_store.append(record) + + def pop_record(self): + return self._api_log_store.popleft() + + def pop_all_records(self): + records = [] + while len(self._api_log_store) > 0: + records.append(self.pop_record()) + return records