Browse Source

Add API Call Collector

This patch adds an option to collect per cluster or per endpoint API
call records during _proxy() call. This enables client side API auditing
without the need to rely on NSX support bundles. By default this option
is disabled.

Change-Id: Ied30d90fc745d5009850c1c83c74eacd46d5fbd9
(cherry picked from commit 22c9a66f05)
tags/16.1.0
Shawn Wang 1 month ago
parent
commit
c61adab74c
5 changed files with 122 additions and 7 deletions
  1. +40
    -0
      vmware_nsxlib/tests/unit/v3/test_utils.py
  2. +43
    -6
      vmware_nsxlib/v3/cluster.py
  3. +9
    -1
      vmware_nsxlib/v3/config.py
  4. +4
    -0
      vmware_nsxlib/v3/constants.py
  5. +26
    -0
      vmware_nsxlib/v3/utils.py

+ 40
- 0
vmware_nsxlib/tests/unit/v3/test_utils.py View File

@@ -520,3 +520,43 @@ class APIRateLimiterAIMDTestCase(APIRateLimiterTestCase):
rate_limiter = self.rate_limiter(max_calls=None)
rate_limiter.adjust_rate(wait_time=0.001, status_code=200)
self.assertFalse(hasattr(rate_limiter, '_max_calls'))


class APICallCollectorTestCase(nsxlib_testcase.NsxLibTestCase):
def setUp(self, *args, **kwargs):
super(APICallCollectorTestCase, self).setUp(with_mocks=False)
self.api_collector = utils.APICallCollector('1.2.3.4', max_entry=2)

def test_add_record(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertListEqual(list(self.api_collector._api_log_store),
[record1, record2])

def test_add_record_overflow(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
record3 = utils.APICallRecord('ts3', 'delete', 'uri_3', 429)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.api_collector.add_record(record3)
self.assertListEqual(list(self.api_collector._api_log_store),
[record2, record3])

def test_pop_record(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertEqual(self.api_collector.pop_record(), record1)
self.assertEqual(self.api_collector.pop_record(), record2)

def test_pop_all_records(self):
record1 = utils.APICallRecord('ts1', 'get', 'uri_1', 200)
record2 = utils.APICallRecord('ts2', 'post', 'uri_2', 404)
self.api_collector.add_record(record1)
self.api_collector.add_record(record2)
self.assertListEqual(self.api_collector.pop_all_records(),
[record1, record2])

+ 43
- 6
vmware_nsxlib/v3/cluster.py View File

@@ -366,7 +366,7 @@ class Endpoint(object):
"""

def __init__(self, provider, pool, api_rate_limit=None,
api_rate_mode=None):
api_rate_mode=None, api_call_collector=None):
self.provider = provider
self.pool = pool
self._state = EndpointState.INITIALIZED
@@ -376,6 +376,7 @@ class Endpoint(object):
max_calls=api_rate_limit)
else:
self.rate_limiter = utils.APIRateLimiter(max_calls=api_rate_limit)
self.api_call_collector = api_call_collector

def regenerate_pool(self):
self.pool = pools.Pool(min_size=self.pool.min_size,
@@ -405,6 +406,15 @@ class Endpoint(object):

return old_state

def add_api_record(self, record):
if self.api_call_collector:
self.api_call_collector.add_record(record)

def pop_all_api_records(self):
if self.api_call_collector:
return self.api_call_collector.pop_all_records()
return []

def __str__(self):
return "[%s] %s" % (self.state, self.provider)

@@ -437,17 +447,19 @@ class ClusteredAPI(object):
max_conns_per_pool=20,
keepalive_interval=33,
api_rate_limit=None,
api_rate_mode=None):
api_rate_mode=None,
api_log_mode=None):

self._http_provider = http_provider
self._keepalive_interval = keepalive_interval
self._print_keepalive = 0
self._silent = False
self._api_call_collectors = []

def _init_cluster(*args, **kwargs):
self._init_endpoints(providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit,
api_rate_mode)
api_rate_mode, api_log_mode)

_init_cluster()

@@ -460,7 +472,8 @@ class ClusteredAPI(object):
self._silent = silent_mode

def _init_endpoints(self, providers, min_conns_per_pool,
max_conns_per_pool, api_rate_limit, api_rate_mode):
max_conns_per_pool, api_rate_limit, api_rate_mode,
api_log_mode):
LOG.debug("Initializing API endpoints")

def _create_conn(p):
@@ -469,6 +482,14 @@ class ClusteredAPI(object):

return _conn

self._api_call_collectors = []
api_call_collector = None
if api_log_mode == constants.API_CALL_LOG_PER_CLUSTER:
# Init one instance of collector for the entire cluster
api_call_collector = utils.APICallCollector(
",".join([provider.id for provider in providers]))
self._api_call_collectors.append(api_call_collector)

self._endpoints = {}
for provider in providers:
pool = pools.Pool(
@@ -477,7 +498,13 @@ class ClusteredAPI(object):
order_as_stack=True,
create=_create_conn(provider))

endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode)
if api_log_mode == constants.API_CALL_LOG_PER_ENDPOINT:
# Init one instance of collector for each endpoint
api_call_collector = utils.APICallCollector(provider.id)
self._api_call_collectors.append(api_call_collector)

endpoint = Endpoint(provider, pool, api_rate_limit, api_rate_mode,
api_call_collector)
self._endpoints[provider.id] = endpoint

# service requests using round robin
@@ -541,6 +568,10 @@ class ClusteredAPI(object):
def http_provider(self):
return self._http_provider

@property
def api_call_collectors(self):
return self._api_call_collectors

@property
def health(self):
down = 0
@@ -738,6 +769,11 @@ class ClusteredAPI(object):
response = do_request(url, *args, **kwargs)
endpoint.set_state(EndpointState.UP)

# add api call log
api_record = utils.APICallRecord(
verb=proxy_for, uri=uri, status=response.status_code)
endpoint.add_api_record(api_record)

# Adjust API Rate Limit before raising HTTP exception
endpoint.rate_limiter.adjust_rate(
wait_time=conn_data.rate_wait,
@@ -785,7 +821,8 @@ class NSXClusteredAPI(ClusteredAPI):
max_conns_per_pool=self.nsxlib_config.concurrent_connections,
keepalive_interval=self.nsxlib_config.conn_idle_timeout,
api_rate_limit=self.nsxlib_config.api_rate_limit_per_endpoint,
api_rate_mode=self.nsxlib_config.api_rate_mode)
api_rate_mode=self.nsxlib_config.api_rate_mode,
api_log_mode=self.nsxlib_config.api_log_mode)

LOG.debug("Created NSX clustered API with '%s' "
"provider", self._http_provider.provider_id)


+ 9
- 1
vmware_nsxlib/v3/config.py View File

@@ -152,6 +152,12 @@ class NsxLibConfig(object):
by half after 429/503 error for each period.
The rate has hard max limit of min(100/s, param
api_rate_limit_per_endpoint).
:param api_log_mode: Option to collect API call logs within nsxlib.
When set to API_LOG_PER_CLUSTER, API call sent to all
endpoints will be collected at one place.
When set to API_LOG_PER_ENDPOINT, API call sent to
each endpoint will be collected individually.
By default, this option is disabled as set to None.

-- Additional parameters which are relevant only for the Policy manager:
:param allow_passthrough: If True, use nsx manager api for cases which are
@@ -191,7 +197,8 @@ class NsxLibConfig(object):
realization_wait_sec=1.0,
api_rate_limit_per_endpoint=None,
api_rate_mode=None,
exception_config=None):
exception_config=None,
api_log_mode=None):

self.nsx_api_managers = nsx_api_managers
self._username = username
@@ -222,6 +229,7 @@ class NsxLibConfig(object):
self.api_rate_limit_per_endpoint = api_rate_limit_per_endpoint
self.api_rate_mode = api_rate_mode
self.exception_config = exception_config or ExceptionConfig()
self.api_log_mode = api_log_mode

if len(nsx_api_managers) == 1 and not self.cluster_unavailable_retry:
LOG.warning("When only one endpoint is provided, keepalive probes"


+ 4
- 0
vmware_nsxlib/v3/constants.py View File

@@ -123,3 +123,7 @@ API_REDUCE_RATE_CODES = [429, 503]
# Minimum time in seconds to consider a call as blocked due to rate limit
API_WAIT_MIN_THRESHOLD = 0.01
API_DEFAULT_MAX_RATE = 100

# API Call Log Related const
API_CALL_LOG_PER_CLUSTER = 'API_LOG_PER_CLUSTER'
API_CALL_LOG_PER_ENDPOINT = 'API_LOG_PER_ENDPOINT'

+ 26
- 0
vmware_nsxlib/v3/utils.py View File

@@ -773,3 +773,29 @@ class APIRateLimiterAIMD(APIRateLimiter):
self._pos_sig = 0
self._neg_sig = 0
self._last_adjust_rate = now


class APICallRecord(object):
def __init__(self, verb, uri, status, timestamp=None):
self.timestamp = timestamp or time.time()
self.verb = verb
self.uri = uri
self.status = status


class APICallCollector(object):
def __init__(self, provider, max_entry=50000):
self._api_log_store = collections.deque(maxlen=max_entry)
self.provider = provider

def add_record(self, record):
self._api_log_store.append(record)

def pop_record(self):
return self._api_log_store.popleft()

def pop_all_records(self):
records = []
while len(self._api_log_store) > 0:
records.append(self.pop_record())
return records

Loading…
Cancel
Save