Optimize cert-mon token cache sizing and re-use

During testing it was found that caching the DC endpoint keystone tokens
consumes an excessive amount of memory on systems with a large number of
subclouds. This is because the raw token data contains data across all
subclouds.

There are two related parts to this commit:

- Re-use the keystone Token across all DC subclouds. This single token
  is also cached.
    - Since the DC token is a "universal token" (project-scoped), we can
      re-use this token for all subclouds instead of getting a new token
      for each subcloud.
    - The catalog data in this token also contains the endpoint URLs for
      all subclouds (required for the second point below)

- cert-mon utility functions which extract endpoint URLs from token data
  are updated to use an existing token rather than requesting a new
  token. This reduces system resource usage on large DC systems.

Test Plan:
- PASS: Verify token cache does not significantly impact memory
  consumption
- PASS: Verify token re-use in cert-mon utils maintains existing
  URL-extraction behaviour
- PASS: Verify cert-mon/keystone resource consumption during cert-mon
  daily audits

Failure Path:
- PASS: Verify token expiry is correctly handled

Story: 2008960
Task: 43504

Signed-off-by: Kyle MacLeod <kyle.macleod@windriver.com>
Change-Id: I0327eb50db1cb01a6f57c8675a2436389c0b7c94
This commit is contained in:
Kyle MacLeod 2021-10-01 14:01:06 -04:00
parent ed12890557
commit 7af9200325
5 changed files with 191 additions and 234 deletions

View File

@ -77,8 +77,8 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
super(CertificateMonManager, self).__init__(CONF)
self.mon_threads = []
self.audit_thread = None
self.token_cache = utils.TokenCache()
self.dc_token_cache = utils.DCTokenCache()
self.token_cache = utils.TokenCache('internal')
self.dc_token_cache = utils.TokenCache('dc')
self.dc_monitor = None
self.restapicert_monitor = None
self.registrycert_monitor = None
@ -206,11 +206,11 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
def my_dc_token():
"""Ensure we always have a valid token"""
return self.dc_token_cache.get_dc_token(subcloud_name)
return self.dc_token_cache.get_token()
try:
subcloud_sysinv_url = utils.dc_get_subcloud_sysinv_url(
subcloud_name)
subcloud_name, my_dc_token())
sc_ssl_cert = utils.get_endpoint_certificate(subcloud_sysinv_url)
except Exception:

View File

@ -26,9 +26,6 @@ import tempfile
import requests
from eventlet.green import subprocess
from six.moves.urllib.parse import urlparse
from keystoneclient.v3 import client as keystone_client
from keystoneauth1 import session
from keystoneclient.auth.identity import v3
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils
@ -143,76 +140,31 @@ def verify_adminep_cert_chain():
return False
def dc_get_subcloud_sysinv_url(subcloud_name):
user_auth = v3.Password(
auth_url=CONF.endpoint_cache.auth_uri,
username=CONF.endpoint_cache.username,
user_domain_name=CONF.endpoint_cache.user_domain_name,
password=CONF.endpoint_cache.password,
project_name=CONF.endpoint_cache.project_name,
project_domain_name=CONF.endpoint_cache.project_domain_name,
)
timeout = CONF.endpoint_cache.http_connect_timeout
admin_session = session.Session(auth=user_auth, timeout=timeout)
ks_client = keystone_client.Client(
session=admin_session,
region_name=constants.REGION_ONE_NAME,
interface=constants.OS_INTERFACE_INTERNAL)
services = ks_client.services.list(name='sysinv')
if len(services) == 0:
raise Exception('Cannot find sysinv service')
s_id = services[0].id
sc_sysinv_urls = ks_client.endpoints.list(
service=s_id,
interface=constants.OS_INTERFACE_ADMIN,
region=subcloud_name)
if len(sc_sysinv_urls) == 0:
raise Exception('Cannot find sysinv endpoint for %s' %
subcloud_name)
sc_sysinv_url = sc_sysinv_urls[0].url
LOG.info('%s sysinv endpoint %s' % (subcloud_name, sc_sysinv_url))
if not sc_sysinv_url:
raise Exception('{} sysinv endpoint is None'.format(subcloud_name))
return sc_sysinv_url
def dc_get_subcloud_sysinv_url(subcloud_name, dc_token):
    """Look up a subcloud's sysinv admin endpoint URL using a DC token.

    The URL comes from dc_token.get_service_admin_url(); this function
    itself does not request a new token.

    :param subcloud_name: subcloud name, passed as the region to look up
    :param dc_token: token object providing get_service_admin_url()
    :returns: the sysinv admin endpoint URL for the subcloud
    :raises Exception: if the lookup yields no URL
    """
    sysinv_url = dc_token.get_service_admin_url(
        constants.SERVICE_TYPE_PLATFORM,
        constants.SYSINV_USERNAME,
        subcloud_name)

    if not sysinv_url:
        # The same text is logged and raised, so build it only once
        not_found_msg = 'Cannot find sysinv endpoint for %s' % subcloud_name
        LOG.error(not_found_msg)
        raise Exception(not_found_msg)

    LOG.debug('%s sysinv endpoint %s' % (subcloud_name, sysinv_url))
    return sysinv_url
def dc_get_service_endpoint_url(region, service_name, endpoint_type):
user_auth = v3.Password(
auth_url=CONF.endpoint_cache.auth_uri,
username=CONF.endpoint_cache.username,
user_domain_name=CONF.endpoint_cache.user_domain_name,
password=CONF.endpoint_cache.password,
project_name=CONF.endpoint_cache.project_name,
project_domain_name=CONF.endpoint_cache.project_domain_name,
)
timeout = CONF.endpoint_cache.http_connect_timeout
admin_session = session.Session(auth=user_auth, timeout=timeout)
ks_client = keystone_client.Client(
session=admin_session,
region_name=constants.REGION_ONE_NAME,
interface=constants.OS_INTERFACE_INTERNAL)
services = ks_client.services.list(name=service_name)
if len(services) == 0:
raise Exception('Cannot find %s service' % service_name)
s_id = services[0].id
endpoint_urls = ks_client.endpoints.list(
service=s_id,
interface=endpoint_type,
region=region)
if len(endpoint_urls) == 0:
raise Exception('Cannot find %s endpoint for %s' %
(service_name, region))
endpoint_url = endpoint_urls[0].url
LOG.info('%s %s endpoint %s' % (region, service_name, endpoint_url))
return endpoint_url
def dc_get_service_endpoint_url(token,
                                service_name='dcmanager',
                                service_type='dcmanager',
                                region=constants.SYSTEM_CONTROLLER_REGION):
    """Look up a service's internal endpoint URL using the given token.

    With the default arguments this resolves the dcmanager service in
    constants.SYSTEM_CONTROLLER_REGION. The URL comes from
    token.get_service_internal_url(); this function itself does not
    request a new token.

    :param token: token object providing get_service_internal_url()
    :param service_name: service name to look up
    :param service_type: service type to look up
    :param region: region name to look up
    :returns: the internal endpoint URL of the service
    :raises Exception: if the lookup yields no URL
    """
    endpoint_url = token.get_service_internal_url(service_type,
                                                  service_name,
                                                  region)

    if not endpoint_url:
        # The same text is logged and raised, so build it only once
        not_found_msg = ('Cannot find %s endpoint for %s'
                         % (service_name, region))
        LOG.error(not_found_msg)
        raise Exception(not_found_msg)

    LOG.debug('%s %s endpoint %s' % (region, service_name, endpoint_url))
    return endpoint_url
def update_subcloud_ca_cert(
@ -238,10 +190,7 @@ def update_subcloud_ca_cert(
def get_subcloud(token, subcloud_name):
service_name = 'dcmanager'
api_url = dc_get_service_endpoint_url(constants.SYSTEM_CONTROLLER_REGION,
service_name,
constants.OS_INTERFACE_INTERNAL)
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds/%s' % subcloud_name
LOG.info('api_cmd %s' % api_cmd)
resp = rest_api_request(token, "GET", api_cmd)
@ -250,10 +199,8 @@ def get_subcloud(token, subcloud_name):
def load_subclouds(resp):
data = resp
print(data)
list = []
for obj in data['subclouds']:
sc_list = []
for obj in resp['subclouds']:
sc = {}
sc['name'] = obj['name']
sc['management-state'] = obj['management-state']
@ -261,16 +208,13 @@ def load_subclouds(resp):
sc['sync_status'] = obj['sync_status']
for ss in obj['endpoint_sync_status']:
sc[ss['endpoint_type']] = ss['sync_status']
list.append(sc)
sc_list.append(sc)
return list
return sc_list
def get_subclouds_from_dcmanager(token):
service_name = 'dcmanager'
api_url = dc_get_service_endpoint_url(constants.SYSTEM_CONTROLLER_REGION,
service_name,
constants.OS_INTERFACE_INTERNAL)
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds'
LOG.debug('api_cmd %s' % api_cmd)
resp = rest_api_request(token, "GET", api_cmd)
@ -290,10 +234,7 @@ def is_subcloud_online(subcloud_name, token=None):
def update_subcloud_status(token, subcloud_name, status):
service_name = 'dcmanager'
api_url = dc_get_service_endpoint_url(constants.SYSTEM_CONTROLLER_REGION,
service_name,
constants.OS_INTERFACE_INTERNAL)
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds/%s/update_status' % subcloud_name
api_cmd_payload = dict()
api_cmd_payload['endpoint'] = ENDPOINT_TYPE_DC_CERT
@ -413,6 +354,7 @@ def get_system(token, method, api_cmd, api_cmd_headers=None,
def get_token():
"""Get token for the sysinv user."""
token = _get_token(
CONF.keystone_authtoken.auth_url + '/v3/auth/tokens',
CONF.keystone_authtoken.project_name,
@ -421,11 +363,17 @@ def get_token():
CONF.keystone_authtoken.user_domain_name,
CONF.keystone_authtoken.project_domain_name,
CONF.keystone_authtoken.region_name)
return token
def get_dc_token(region_name):
def get_dc_token(region_name=constants.SYSTEM_CONTROLLER_REGION):
"""Get token for the dcmanager user.
Note: Although region_name can be specified, the token used here is a
"project-scoped" token (i.e., not specific to the subcloud/region name).
A token obtained using one region_name can be re-used across any
subcloud. We take advantage of this in our DC token caching strategy.
"""
token = _get_token(
CONF.endpoint_cache.auth_uri + '/auth/tokens',
CONF.endpoint_cache.project_name,
@ -434,12 +382,17 @@ def get_dc_token(region_name):
CONF.endpoint_cache.user_domain_name,
CONF.endpoint_cache.project_domain_name,
region_name)
return token
def _get_token(auth_url, auth_project, username, password, user_domain,
project_domain, region_name, timeout=60):
def _get_token(auth_url,
auth_project,
username,
password,
user_domain,
project_domain,
region_name,
timeout=60):
"""
Ask OpenStack Keystone for a token
Returns: token object or None on failure
@ -474,8 +427,10 @@ def _get_token(auth_url, auth_project, username, password, user_domain,
# Identity API v3 returns token id in X-Subject-Token
# response header.
token_id = request.headers.get('X-Subject-Token')
response = json.loads(request.read())
json_response = request.read()
response = json.loads(json_response)
request.close()
# save the region name for service url lookup
return Token(response, token_id, region_name)
@ -717,34 +672,17 @@ def update_platform_cert(token, cert_type, pem_file_path, force=False):
class TokenCache(object):
"""Simple token cache. This class holds one keystone token.
"""
def __init__(self):
token_getters = {'internal': get_token, 'dc': get_dc_token}
def __init__(self, token_type):
self._token = None
self._token_type = token_type
self._getter_func = self.token_getters[token_type]
def get_token(self):
"""Get a new token if required; otherwise use the cached token"""
if not self._token or self._token.is_expired():
LOG.debug("%s, Acquiring new token, previous token: %s",
self.__class__.__name__, self._token)
self._token = get_token()
LOG.debug("TokenCache %s, Acquiring new token, previous token: %s",
self._token_type, self._token)
self._token = self._getter_func()
return self._token
class DCTokenCache(object):
"""Simple token cache for DC tokens (keyed by region_name).
For the DC keystone user (e.g. 'dcmanager').
"""
def __init__(self):
self._dc_tokens = {}
def get_dc_token(self, region_name):
"""Get a new token if required; otherwise use the cached token"""
if region_name in self._dc_tokens:
dc_token = self._dc_tokens[region_name]
else:
dc_token = None
if not dc_token or dc_token.is_expired():
LOG.debug("Acquiring new DC token, region_name: %s, "
"previous token: %s", region_name, dc_token)
dc_token = get_dc_token(region_name)
self._dc_tokens[region_name] = dc_token
return dc_token

View File

@ -51,8 +51,8 @@ class MonitorContext(object):
# Reuse cached tokens across all contexts
# (i.e. all watches reuse these caches)
token_cache = utils.TokenCache()
dc_token_cache = utils.DCTokenCache()
token_cache = utils.TokenCache('internal')
dc_token_cache = utils.TokenCache('dc')
def __init__(self):
self.dc_role = None
@ -67,9 +67,9 @@ class MonitorContext(object):
return MonitorContext.token_cache.get_token()
@staticmethod
def get_dc_token(region_name):
def get_dc_token():
"""Uses the cached DC token (shared across all subclouds)"""
return MonitorContext.dc_token_cache.get_dc_token(region_name)
return MonitorContext.dc_token_cache.get_token()
class CertUpdateEventData(object):
@ -482,10 +482,12 @@ class DCIntermediateCertRenew(CertificateRenew):
def update_certificate(self, event_data):
subcloud_name = self._get_subcloud_name(event_data)
LOG.info('update_certificate: subcloud %s %s' % (subcloud_name, event_data))
LOG.info('update_certificate: subcloud %s %s', subcloud_name,
event_data)
token = self.context.get_dc_token(subcloud_name)
subcloud_sysinv_url = utils.dc_get_subcloud_sysinv_url(subcloud_name)
token = self.context.get_dc_token()
subcloud_sysinv_url = utils.dc_get_subcloud_sysinv_url(subcloud_name,
token)
utils.update_subcloud_ca_cert(token,
subcloud_name,
subcloud_sysinv_url,
@ -523,7 +525,7 @@ class DCIntermediateCertRenew(CertificateRenew):
# update subcloud to dc-cert out-of-sync b/c last intermediate
# CA cert was not updated successfully
# an audit (default within 24 hours) will pick up and reattempt
dc_token = self.context.get_dc_token(constants.SYSTEM_CONTROLLER_REGION)
dc_token = self.context.get_dc_token()
utils.update_subcloud_status(dc_token, sc_name,
utils.SYNC_STATUS_OUT_OF_SYNC)
break

View File

@ -12,16 +12,30 @@ LOG = log.getLogger(__name__)
class Token(object):
def __init__(self, token_data, token_id, region_name):
"""Represents a keystone token"""
def __init__(self,
token_data,
token_id,
region_name):
"""Initialize the token.
token_data: The token data, converted from json
token_id: The id
region_name: The default region name for this token
Note: universal (project-scoped) tokens will contain
other region names in the token catalog data.
"""
self.expired = False
self.data = token_data
self.token_id = token_id
self.region_name = region_name
def set_expired(self):
"""Mark this token as expired"""
self.expired = True
def is_expired(self, within_seconds=300):
"""Check if expired or will expire within given number of seconds"""
if not self.expired:
end = iso8601.parse_date(self.data['token']['expires_at'])
now = iso8601.parse_date(datetime.datetime.utcnow().isoformat())
@ -32,49 +46,73 @@ class Token(object):
return True
def get_id(self):
"""
Get the identifier of the token.
"""Get the identifier of the token.
"""
return self.token_id
def _get_service_url(self, service_type, service_name, interface_type):
def _get_service_url(self,
service_type,
service_name,
interface_type,
region_name):
"""
Search the catalog of a service for the url based on the interface
Returns: url or None on failure
"""
if region_name is None:
region_name = self.region_name
for catalog in self.data['token']['catalog']:
if catalog['type'] == service_type:
if catalog['name'] == service_name:
if len(catalog['endpoints']) != 0:
for endpoint in catalog['endpoints']:
if ((endpoint['interface'] == interface_type) and
(endpoint['region'] == self.region_name)):
return endpoint['url']
if (catalog['type'] == service_type
and catalog['name'] == service_name
and len(catalog['endpoints']) != 0):
for endpoint in catalog['endpoints']:
if (endpoint['interface'] == interface_type
and endpoint['region'] == region_name):
return endpoint['url']
return None
def get_service_admin_url(self, service_type, service_name):
"""
Search the catalog of a service for the administrative url
def get_service_admin_url(self,
service_type,
service_name,
region_name=None):
"""Search the catalog of a service for the administrative url
Returns: admin url or None on failure
"""
return self._get_service_url(service_type, service_name, 'admin')
return self._get_service_url(service_type, service_name, 'admin',
region_name)
def get_service_internal_url(self, service_type, service_name):
def get_service_internal_url(self,
service_type,
service_name,
region_name=None):
"""Search the catalog of a service for the internal url
Returns: internal url or None on failure
"""
Search the catalog of a service for the administrative url
return self._get_service_url(service_type, service_name, 'internal',
region_name)
def get_service_public_url(self,
service_type,
service_name,
region_name=None):
"""Search the catalog of a service for the public url
Returns: public url or None on failure
"""
return self._get_service_url(service_type, service_name, 'public',
region_name)
def get_service_url(self, service_type, service_name, region_name=None):
"""Search the catalog of a service for the administrative url
Returns: admin url or None on failure
"""
return self._get_service_url(service_type, service_name, 'internal')
return self.get_service_admin_url(service_type, service_name,
region_name)
def get_service_public_url(self, service_type, service_name):
"""
Search the catalog of a service for the administrative url
Returns: admin url or None on failure
"""
return self._get_service_url(service_type, service_name, 'public')
def get_service_url(self, service_type, service_name):
return self.get_service_admin_url(service_type, service_name)
def get_full_str(self):
"""Formats the entire token, used only for debugging."""
return "id: {}, expired: {}, region_name: {}, expires_at: {}, data: {}"\
.format(self.token_id, self.expired, self.region_name,
self.data['token']['expires_at'], self.data)
def __str__(self):
return "id: {}, expired: {}, region_name: {}, expires_at: {}".format(

View File

@ -238,38 +238,48 @@ class CertMonTestCase(base.DbTestCase):
get_endpoint_certificate=mock.DEFAULT,
get_sc_intermediate_ca_secret=mock.DEFAULT,
is_subcloud_online=mock.DEFAULT,
get_token=mock.DEFAULT,
get_dc_token=mock.DEFAULT,
update_subcloud_status=mock.DEFAULT,
update_subcloud_ca_cert=mock.DEFAULT) as mocks:
update_subcloud_ca_cert=mock.DEFAULT) \
as utils_mock:
# returns an SSL cert in PEM-encoded string
mocks["dc_get_subcloud_sysinv_url"].return_value \
utils_mock["dc_get_subcloud_sysinv_url"].return_value \
= "https://example.com"
mocks["get_endpoint_certificate"].return_value \
utils_mock["get_endpoint_certificate"].return_value \
= self._get_valid_certificate_pem()
mocks["get_sc_intermediate_ca_secret"].return_value \
utils_mock["get_sc_intermediate_ca_secret"].return_value \
= self._get_sc_intermediate_ca_secret()
mocks["is_subcloud_online"].return_value = True
mocks["get_dc_token"].return_value = None # don"t care
mocks["update_subcloud_status"].return_value = None
mocks["update_subcloud_ca_cert"].return_value = None
utils_mock["is_subcloud_online"].return_value = True
utils_mock["get_dc_token"].return_value = None # don"t care
utils_mock["update_subcloud_status"].return_value = None
utils_mock["update_subcloud_ca_cert"].return_value = None
cmgr = cert_mon_manager.CertificateMonManager()
cmgr.use_sc_audit_pool = False # easier for testing in serial
# also need to mock the TokenCache
with mock.patch.multiple("sysinv.cert_mon.utils.TokenCache",
get_token=mock.DEFAULT) \
as token_cache_mock:
token_cache_mock["get_token"].return_value = None # don"t care
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"), delay_secs=1)
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"), delay_secs=2)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
cmgr = cert_mon_manager.CertificateMonManager()
cmgr.use_sc_audit_pool = False # easier for testing in serial
# Run audit immediately, it should not have picked up anything
cmgr.audit_sc_cert_task(None)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"),
delay_secs=1)
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"),
delay_secs=2)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
time.sleep(3)
cmgr.audit_sc_cert_task(None)
# It should now be drained:
self.assertEqual(cmgr.sc_audit_queue.qsize(), 0)
# Run audit immediately, it should not have picked up anything
cmgr.audit_sc_cert_task(None)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
time.sleep(3)
cmgr.audit_sc_cert_task(None)
# It should now be drained:
self.assertEqual(cmgr.sc_audit_queue.qsize(), 0)
def test_token_cache(self):
"""Basic test case for TokenCache"""
@ -282,58 +292,27 @@ class CertMonTestCase(base.DbTestCase):
self.token_cache_num += 1
return token
with mock.patch("sysinv.cert_mon.utils.get_token") as mock_get_token:
mock_get_token.side_effect = get_cache_test_token
token_cache = cert_mon_utils.TokenCache()
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token1")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_token().get_id(), "token1")
token.set_expired()
self.assertTrue(token.is_expired())
# should now get a new, unexpired token:
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token2")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_token().get_id(), "token2")
token_cache.get_token().set_expired()
self.assertTrue(token.is_expired())
token_cache = cert_mon_utils.TokenCache('internal')
def test_dc_token_cache(self):
"""Basic test case for DCTokenCache"""
def get_cache_test_token(region_name):
"""Increments the token id each invocation
This method is called in place of utils.get_token() for this test."""
token = self.get_keystone_token()
token.token_id = "token{}".format(self.token_cache_num)
token.region_name = region_name
self.token_cache_num += 1
return token
# override the cache getter function for our test:
token_cache._getter_func = get_cache_test_token
with mock.patch(
"sysinv.cert_mon.utils.get_dc_token") as mock_get_dc_token:
mock_get_dc_token.side_effect = get_cache_test_token
token_cache = cert_mon_utils.DCTokenCache()
region = "RegionOne"
token = token_cache.get_dc_token(region)
self.assertEqual(token.get_id(), "token1")
self.assertEqual(token.region_name, "RegionOne")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_dc_token(region).get_id(), "token1")
token.set_expired()
self.assertTrue(token.is_expired())
# should now get a new, unexpired token:
token = token_cache.get_dc_token(region)
self.assertEqual(token.get_id(), "token2")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_dc_token(region).get_id(), "token2")
token_cache.get_dc_token(region).set_expired()
self.assertTrue(token.is_expired())
token = token_cache.get_dc_token(region)
self.assertEqual(token.get_id(), "token3")
self.assertFalse(token.is_expired())
# Test getting token for a different region:
region = "RegionTwo"
token = token_cache.get_dc_token(region)
self.assertEqual(token.get_id(), "token4")
self.assertFalse(token.is_expired())
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token1")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_token().get_id(), "token1")
token.set_expired()
self.assertTrue(token.is_expired())
# should now get a new, unexpired token:
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token2")
self.assertFalse(token.is_expired())
self.assertEqual(token_cache.get_token().get_id(), "token2")
token_cache.get_token().set_expired()
self.assertTrue(token.is_expired())
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token3")
self.assertFalse(token.is_expired())
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token3")
self.assertFalse(token.is_expired())