diff --git a/distributedcloud/dcagent/common/config.py b/distributedcloud/dcagent/common/config.py index 702a7fd5f..1393ca4dd 100644 --- a/distributedcloud/dcagent/common/config.py +++ b/distributedcloud/dcagent/common/config.py @@ -107,6 +107,11 @@ endpoint_cache_opts = [ "http_connect_timeout", help="Request timeout value for communicating with Identity API server.", ), + cfg.IntOpt( + "token_cache_size", + default=5000, + help="Maximum number of entries in the in-memory token cache", + ), ] scheduler_opts = [ diff --git a/distributedcloud/dccommon/drivers/openstack/dcagent_v1.py b/distributedcloud/dccommon/drivers/openstack/dcagent_v1.py index fd9f582f7..41bb670be 100644 --- a/distributedcloud/dccommon/drivers/openstack/dcagent_v1.py +++ b/distributedcloud/dccommon/drivers/openstack/dcagent_v1.py @@ -6,7 +6,6 @@ from keystoneauth1.session import Session as keystone_session from oslo_log import log -import requests from dccommon import consts from dccommon.drivers import base @@ -26,6 +25,8 @@ class DcagentClient(base.DriverBase): session: keystone_session, endpoint: str = None, ): + self.session = session + # Get an endpoint and token. 
if endpoint is None: self.endpoint = session.get_endpoint( @@ -36,14 +37,11 @@ class DcagentClient(base.DriverBase): else: self.endpoint = endpoint - self.token = session.get_token() - def audit(self, audit_data, timeout=DCAGENT_REST_DEFAULT_TIMEOUT): """Audit subcloud""" url = self.endpoint + "/v1/dcaudit" - headers = {"X-Auth-Token": self.token} - response = requests.patch( - url, headers=headers, json=audit_data, timeout=timeout + response = self.session.patch( + url, json=audit_data, timeout=timeout, raise_exc=False ) if response.status_code == 200: diff --git a/distributedcloud/dccommon/drivers/openstack/dcmanager_v1.py b/distributedcloud/dccommon/drivers/openstack/dcmanager_v1.py index 3461693b9..b02ee364e 100644 --- a/distributedcloud/dccommon/drivers/openstack/dcmanager_v1.py +++ b/distributedcloud/dccommon/drivers/openstack/dcmanager_v1.py @@ -3,8 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 # +from keystoneauth1 import session as ks_session from oslo_log import log -import requests from requests_toolbelt import MultipartEncoder from dccommon import consts @@ -22,8 +22,8 @@ class DcmanagerClient(base.DriverBase): def __init__( self, - region, - session, + region: str, + session: ks_session.Session, timeout=DCMANAGER_CLIENT_REST_DEFAULT_TIMEOUT, endpoint_type=consts.KS_ENDPOINT_PUBLIC, endpoint=None, @@ -33,8 +33,8 @@ class DcmanagerClient(base.DriverBase): service_type="dcmanager", region_name=region, interface=endpoint_type ) self.endpoint = endpoint - self.token = session.get_token() self.timeout = timeout + self.session = session def get_system_peer(self, system_peer_uuid): """Get system peer.""" @@ -42,8 +42,7 @@ class DcmanagerClient(base.DriverBase): raise ValueError("system_peer_uuid is required.") url = f"{self.endpoint}/system-peers/{system_peer_uuid}" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if 
response.status_code == 200: return response.json() @@ -63,10 +62,14 @@ class DcmanagerClient(base.DriverBase): raise ValueError("subcloud_ref is required.") url = f"{self.endpoint}/subclouds/{subcloud_ref}/detail" - headers = {"X-Auth-Token": self.token} - if is_region_name: - headers["User-Agent"] = consts.DCMANAGER_V1_HTTP_AGENT - response = requests.get(url, headers=headers, timeout=self.timeout) + user_agent = consts.DCMANAGER_V1_HTTP_AGENT if is_region_name else None + + response = self.session.get( + url, + timeout=self.timeout, + user_agent=user_agent, + raise_exc=False, + ) if response.status_code == 200: return response.json() @@ -84,8 +87,7 @@ class DcmanagerClient(base.DriverBase): """Get subcloud list.""" url = f"{self.endpoint}/subclouds" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -99,8 +101,7 @@ class DcmanagerClient(base.DriverBase): """Get subcloud group list.""" url = f"{self.endpoint}/subcloud-groups" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -116,8 +117,7 @@ class DcmanagerClient(base.DriverBase): """Get subcloud peer group list.""" url = f"{self.endpoint}/subcloud-peer-groups" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -136,8 +136,7 @@ class DcmanagerClient(base.DriverBase): raise ValueError("peer_group_ref is required.") url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, 
headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: return response.json() @@ -162,8 +161,7 @@ class DcmanagerClient(base.DriverBase): raise ValueError("peer_group_ref is required.") url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}/subclouds" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -196,8 +194,7 @@ class DcmanagerClient(base.DriverBase): """Get peer group association list.""" url = f"{self.endpoint}/peer-group-associations" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=self.timeout) + response = self.session.get(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -214,9 +211,9 @@ class DcmanagerClient(base.DriverBase): """Add a subcloud peer group.""" url = f"{self.endpoint}/subcloud-peer-groups" - headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"} - response = requests.post( - url, json=kwargs, headers=headers, timeout=self.timeout + headers = {"Content-Type": "application/json"} + response = self.session.post( + url, json=kwargs, timeout=self.timeout, headers=headers, raise_exc=False ) if response.status_code == 200: @@ -256,11 +253,12 @@ class DcmanagerClient(base.DriverBase): fields.update(data) enc = MultipartEncoder(fields=fields) headers = { - "X-Auth-Token": self.token, "Content-Type": enc.content_type, "User-Agent": consts.DCMANAGER_V1_HTTP_AGENT, } - response = requests.post(url, headers=headers, data=enc, timeout=self.timeout) + response = self.session.post( + url, data=enc, timeout=self.timeout, headers=headers, raise_exc=False + ) if response.status_code == 200: return response.json() @@ -276,9 +274,8 @@ class 
DcmanagerClient(base.DriverBase): """Add a peer group association.""" url = f"{self.endpoint}/peer-group-associations" - headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"} - response = requests.post( - url, json=kwargs, headers=headers, timeout=self.timeout + response = self.session.post( + url, json=kwargs, timeout=self.timeout, raise_exc=False ) if response.status_code == 200: @@ -298,9 +295,8 @@ class DcmanagerClient(base.DriverBase): url = f"{self.endpoint}/peer-group-associations/{association_id}" update_kwargs = {"sync_status": sync_status} - headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"} - response = requests.patch( - url, json=update_kwargs, headers=headers, timeout=self.timeout + response = self.session.patch( + url, json=update_kwargs, timeout=self.timeout, raise_exc=False ) if response.status_code == 200: @@ -327,13 +323,12 @@ class DcmanagerClient(base.DriverBase): raise ValueError("peer_group_ref is required.") url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}" - headers = { - "X-Auth-Token": self.token, - "Content-Type": "application/json", - "User-Agent": consts.DCMANAGER_V1_HTTP_AGENT, - } - response = requests.patch( - url, json=kwargs, headers=headers, timeout=self.timeout + response = self.session.patch( + url, + json=kwargs, + timeout=self.timeout, + user_agent=consts.DCMANAGER_V1_HTTP_AGENT, + raise_exc=False, ) if response.status_code == 200: @@ -359,9 +354,8 @@ class DcmanagerClient(base.DriverBase): raise ValueError("peer_group_ref is required.") url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}/audit" - headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"} - response = requests.patch( - url, json=kwargs, headers=headers, timeout=self.timeout + response = self.session.patch( + url, json=kwargs, timeout=self.timeout, raise_exc=False ) if response.status_code == 200: @@ -402,12 +396,12 @@ class DcmanagerClient(base.DriverBase): fields.update(data) 
enc = MultipartEncoder(fields=fields) - headers = {"X-Auth-Token": self.token, "Content-Type": enc.content_type} - # Add header to flag the request is from another DC, - # server will treat subcloud_ref as a region_name + headers = {"Content-Type": enc.content_type} if is_region_name: headers["User-Agent"] = consts.DCMANAGER_V1_HTTP_AGENT - response = requests.patch(url, headers=headers, data=enc, timeout=self.timeout) + response = self.session.patch( + url, data=enc, timeout=self.timeout, headers=headers, raise_exc=False + ) if response.status_code == 200: return response.json() @@ -428,8 +422,7 @@ class DcmanagerClient(base.DriverBase): raise ValueError("association_id is required.") url = f"{self.endpoint}/peer-group-associations/{association_id}" - headers = {"X-Auth-Token": self.token} - response = requests.delete(url, headers=headers, timeout=self.timeout) + response = self.session.delete(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: return response.json() @@ -454,8 +447,7 @@ class DcmanagerClient(base.DriverBase): raise ValueError("peer_group_ref is required.") url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}" - headers = {"X-Auth-Token": self.token} - response = requests.delete(url, headers=headers, timeout=self.timeout) + response = self.session.delete(url, timeout=self.timeout, raise_exc=False) if response.status_code == 200: return response.json() @@ -488,11 +480,12 @@ class DcmanagerClient(base.DriverBase): raise ValueError("subcloud_ref is required.") url = f"{self.endpoint}/subclouds/{subcloud_ref}" - headers = { - "X-Auth-Token": self.token, - "User-Agent": consts.DCMANAGER_V1_HTTP_AGENT, - } - response = requests.delete(url, headers=headers, timeout=self.timeout) + response = self.session.delete( + url, + timeout=self.timeout, + user_agent=consts.DCMANAGER_V1_HTTP_AGENT, + raise_exc=False, + ) if response.status_code == 200: return response.json() diff --git 
a/distributedcloud/dccommon/drivers/openstack/fm.py b/distributedcloud/dccommon/drivers/openstack/fm.py index af6a84edb..5ca4f1943 100644 --- a/distributedcloud/dccommon/drivers/openstack/fm.py +++ b/distributedcloud/dccommon/drivers/openstack/fm.py @@ -14,11 +14,11 @@ # import fmclient +from keystoneauth1 import session as ks_session from oslo_log import log from dccommon import consts as dccommon_consts from dccommon.drivers import base -from dccommon import exceptions LOG = log.getLogger(__name__) API_VERSION = "1" @@ -29,30 +29,32 @@ class FmClient(base.DriverBase): def __init__( self, - region, - session, + region: str, + session: ks_session.Session, endpoint_type=dccommon_consts.KS_ENDPOINT_DEFAULT, - endpoint=None, - token=None, + endpoint: str = None, + token: str = None, ): self.region_name = region - token = token if token else session.get_token() - if not endpoint: - endpoint = session.get_endpoint( - service_type=dccommon_consts.ENDPOINT_TYPE_FM, - region_name=region, - interface=endpoint_type, - ) - try: - self.fm = fmclient.Client( - API_VERSION, - region_name=region, - endpoint_type=endpoint_type, - endpoint=endpoint, - auth_token=token, - ) - except exceptions.ServiceUnavailable: - raise + + # If the token is specified, use it instead of using the session + if token: + if not endpoint: + endpoint = session.get_endpoint( + service_type=dccommon_consts.ENDPOINT_TYPE_FM, + region_name=region, + interface=endpoint_type, + ) + session = None + + self.fm = fmclient.Client( + API_VERSION, + session=session, + region_name=region, + endpoint_type=endpoint_type, + endpoint=endpoint, + auth_token=token, + ) def get_alarm_summary(self): """Get this region alarm summary""" diff --git a/distributedcloud/dccommon/drivers/openstack/patching_v1.py b/distributedcloud/dccommon/drivers/openstack/patching_v1.py index bd60957d8..654d80c1b 100644 --- a/distributedcloud/dccommon/drivers/openstack/patching_v1.py +++ 
b/distributedcloud/dccommon/drivers/openstack/patching_v1.py @@ -13,8 +13,8 @@ # under the License. # +from keystoneauth1 import session as ks_session from oslo_log import log -import requests from requests_toolbelt import MultipartEncoder from dccommon import consts @@ -35,18 +35,16 @@ PATCH_REST_DEFAULT_TIMEOUT = 900 class PatchingClient(base.DriverBase): """Patching V1 driver.""" - def __init__(self, region, session, endpoint=None): - # Get an endpoint and token. - if endpoint is None: + def __init__(self, region: str, session: ks_session.Session, endpoint: str = None): + self.session = session + self.endpoint = endpoint + + if not self.endpoint: self.endpoint = session.get_endpoint( service_type="patching", region_name=region, interface=consts.KS_ENDPOINT_ADMIN, ) - else: - self.endpoint = endpoint - - self.token = session.get_token() def query(self, state=None, release=None, timeout=PATCH_REST_DEFAULT_TIMEOUT): """Query patches""" @@ -55,8 +53,7 @@ class PatchingClient(base.DriverBase): url += "?show=%s" % state.lower() if release is not None: url += "&release=%s" % release - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=timeout) + response = self.session.get(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -74,8 +71,7 @@ class PatchingClient(base.DriverBase): def query_hosts(self, timeout=PATCH_REST_DEFAULT_TIMEOUT): """Query hosts""" url = self.endpoint + "/v1/query_hosts" - headers = {"X-Auth-Token": self.token} - response = requests.get(url, headers=headers, timeout=timeout) + response = self.session.get(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -94,8 +90,7 @@ class PatchingClient(base.DriverBase): """Apply patches""" patch_str = "/".join(patches) url = self.endpoint + "/v1/apply/%s" % patch_str - headers = {"X-Auth-Token": self.token} - response = requests.post(url, headers=headers, timeout=timeout) + 
response = self.session.post(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -114,8 +109,7 @@ class PatchingClient(base.DriverBase): """Remove patches""" patch_str = "/".join(patches) url = self.endpoint + "/v1/remove/%s" % patch_str - headers = {"X-Auth-Token": self.token} - response = requests.post(url, headers=headers, timeout=timeout) + response = self.session.post(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -134,8 +128,7 @@ class PatchingClient(base.DriverBase): """Delete patches""" patch_str = "/".join(patches) url = self.endpoint + "/v1/delete/%s" % patch_str - headers = {"X-Auth-Token": self.token} - response = requests.post(url, headers=headers, timeout=timeout) + response = self.session.post(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -154,8 +147,7 @@ class PatchingClient(base.DriverBase): """Commit patches""" patch_str = "/".join(patches) url = self.endpoint + "/v1/commit/%s" % patch_str - headers = {"X-Auth-Token": self.token} - response = requests.post(url, headers=headers, timeout=timeout) + response = self.session.post(url, timeout=timeout, raise_exc=False) if response.status_code == 200: data = response.json() @@ -183,8 +175,10 @@ class PatchingClient(base.DriverBase): } ) url = self.endpoint + "/v1/upload" - headers = {"X-Auth-Token": self.token, "Content-Type": enc.content_type} - response = requests.post(url, data=enc, headers=headers, timeout=timeout) + headers = {"Content-Type": enc.content_type} + response = self.session.post( + url, data=enc, headers=headers, timeout=timeout, raise_exc=False + ) if response.status_code == 200: data = response.json() diff --git a/distributedcloud/dccommon/drivers/openstack/software_v1.py b/distributedcloud/dccommon/drivers/openstack/software_v1.py index 43b890e67..f6144f926 100644 --- a/distributedcloud/dccommon/drivers/openstack/software_v1.py +++ 
b/distributedcloud/dccommon/drivers/openstack/software_v1.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 # +from urllib import parse from keystoneauth1.session import Session as keystone_session from oslo_log import log @@ -40,59 +41,66 @@ class SoftwareClient(base.DriverBase): endpoint_type: str = consts.KS_ENDPOINT_ADMIN, token: str = None, ): - # Get an endpoint and token. - if not endpoint: + self.session = session + self.endpoint = endpoint + self.token = token + + if not self.endpoint: self.endpoint = session.get_endpoint( service_type=consts.ENDPOINT_TYPE_USM, region_name=region, interface=endpoint_type, ) - else: - self.endpoint = endpoint - # The usm systemcontroller endpoint ends with a slash but the regionone - # and the subcloud endpoint don't. The slash is removed to standardize - # with the other endpoints. - self.endpoint = self.endpoint.rstrip("/") + "/v1" - self.token = token if token else session.get_token() - self.headers = {"X-Auth-Token": self.token} + self.endpoint = parse.urljoin(self.endpoint, "/v1") + self.headers = {"X-Auth-Token": self.token} if token else None + + def request(self, url: str, method: str, timeout: int): + """Request directly if token is passed, otherwise use the session""" + if self.token: + return requests.request( + method=method, url=url, headers=self.headers, timeout=timeout + ) + return self.session.request( + url, method=method, timeout=timeout, raise_exc=False + ) def list(self, timeout=REST_DEFAULT_TIMEOUT): """List releases""" url = self.endpoint + "/release" - response = requests.get(url, headers=self.headers, timeout=timeout) + response = self.request(url, "GET", timeout) return self._handle_response(response, operation="List") def show(self, release, timeout=REST_SHOW_TIMEOUT): """Show release""" url = self.endpoint + f"/release/{release}" - response = requests.get(url, headers=self.headers, timeout=timeout) + response = self.request(url, "GET", timeout) return self._handle_response(response, 
operation="Show") def delete(self, releases, timeout=REST_DELETE_TIMEOUT): """Delete release""" release_str = "/".join(releases) url = self.endpoint + f"/release/{release_str}" - response = requests.delete(url, headers=self.headers, timeout=timeout) + response = self.request(url, "DELETE", timeout) return self._handle_response(response, operation="Delete") def deploy_precheck(self, deployment, timeout=REST_DEFAULT_TIMEOUT): """Deploy precheck""" url = self.endpoint + f"/deploy/{deployment}/precheck" - response = requests.post(url, headers=self.headers, timeout=timeout) + response = self.request(url, "POST", timeout) return self._handle_response(response, operation="Deploy precheck") def show_deploy(self, timeout=REST_DEFAULT_TIMEOUT): """Show deploy""" url = self.endpoint + "/deploy" - response = requests.get(url, headers=self.headers, timeout=timeout) + response = self.request(url, "GET", timeout) return self._handle_response(response, operation="Show deploy") def commit_patch(self, releases, timeout=REST_DEFAULT_TIMEOUT): """Commit patch""" release_str = "/".join(releases) url = self.endpoint + f"/commit_patch/{release_str}" - response = requests.post(url, headers=self.headers, timeout=timeout) + response = self.request(url, "POST", timeout) return self._handle_response(response, operation="Commit patch") def _handle_response(self, response, operation): diff --git a/distributedcloud/dccommon/drivers/openstack/sysinv_v1.py b/distributedcloud/dccommon/drivers/openstack/sysinv_v1.py index 668cc2696..d70f9ccf7 100644 --- a/distributedcloud/dccommon/drivers/openstack/sysinv_v1.py +++ b/distributedcloud/dccommon/drivers/openstack/sysinv_v1.py @@ -126,26 +126,32 @@ class SysinvClient(base.DriverBase): self, region: str, session: keystone_session, - timeout: int = consts.SYSINV_CLIENT_REST_DEFAULT_TIMEOUT, + timeout: float = consts.SYSINV_CLIENT_REST_DEFAULT_TIMEOUT, endpoint_type: str = consts.KS_ENDPOINT_ADMIN, endpoint: str = None, token: str = None, ): 
self.region_name = region - # The sysinv client doesn't support a session, so we need to - # get an endpoint and token. - if not endpoint: - endpoint = session.get_endpoint( - service_type=consts.ENDPOINT_TYPE_PLATFORM, - region_name=region, - interface=endpoint_type, - ) + kwargs = {} - token = token if token else session.get_token() - self.sysinv_client = client.Client( - API_VERSION, endpoint=endpoint, token=token, timeout=timeout - ) + # If the token is specified, use it instead of using the session + if token: + kwargs["token"] = token + kwargs["timeout"] = timeout + if not endpoint: + endpoint = session.get_endpoint( + service_type=consts.ENDPOINT_TYPE_PLATFORM, + region_name=region, + interface=endpoint_type, + ) + else: + session.timeout = timeout + kwargs["session"] = session + + kwargs["endpoint"] = endpoint + + self.sysinv_client = client.Client(API_VERSION, **kwargs) def get_host(self, hostname_or_id): """Get a host by its hostname or id.""" diff --git a/distributedcloud/dccommon/drivers/openstack/vim.py b/distributedcloud/dccommon/drivers/openstack/vim.py index 56bb343d5..ddab0f106 100644 --- a/distributedcloud/dccommon/drivers/openstack/vim.py +++ b/distributedcloud/dccommon/drivers/openstack/vim.py @@ -13,8 +13,10 @@ # under the License. 
# +from functools import wraps import json +from keystoneauth1 import session as ks_session from nfv_client.openstack import rest_api from nfv_client.openstack import sw_update from oslo_log import log @@ -80,36 +82,63 @@ TRANSITORY_STATES = [ VIM_AUTHORIZATION_FAILED = "Authorization failed" +# VIM API returns a 403 instead of a 401 for unauthenticated requests, so we +# can't use the internal re-auth functionality of the keystone session, we must +# manually check for the VIM_AUTHORIZATION_FAILED string in the raised exception +def retry_on_auth_failure(): + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except Exception as e: + # Invalidate token cache and retry + if VIM_AUTHORIZATION_FAILED in str(e): + self.session.invalidate() + return func(self, *args, **kwargs) + # Raise any other type of exception + raise e + + return wrapper + + return decorator + + +# TODO(gherzmann): Enhance VIM client to use session-based connections, +# enabling TCP connection reuse for improved efficiency class VimClient(base.DriverBase): """VIM driver.""" - def __init__(self, region, session, endpoint=None): - try: - # The nfv_client doesn't support a session, so we need to - # get an endpoint and token. 
- if endpoint is None: - self.endpoint = session.get_endpoint( - service_type="nfv", - region_name=region, - interface=consts.KS_ENDPOINT_ADMIN, - ) - else: - self.endpoint = endpoint + @property + def token(self): + # The property is used to guarantee we always get the most recent token + return self.session.get_token() - self.token = session.get_token() - # session.get_user_id() returns a UUID - # that always corresponds to 'dcmanager' - self.username = consts.DCMANAGER_USER_NAME - # session object does not provide a domain query - # The only domain used for dcmanager is 'default' - self.user_domain_name = "default" - # session.get_project_id() returns a UUID - # that always corresponds to 'services' - self.tenant = consts.SERVICES_USER_NAME + def __init__(self, region: str, session: ks_session.Session, endpoint: str = None): + self.session = session - except exceptions.ServiceUnavailable: - raise + # The nfv_client doesn't support a session, so we need to + # get an endpoint and token. 
+ if endpoint is None: + self.endpoint = session.get_endpoint( + service_type="nfv", + region_name=region, + interface=consts.KS_ENDPOINT_ADMIN, + ) + else: + self.endpoint = endpoint + # session.get_user_id() returns a UUID + # that always corresponds to 'dcmanager' + self.username = consts.DCMANAGER_USER_NAME + # session object does not provide a domain query + # The only domain used for dcmanager is 'default' + self.user_domain_name = "default" + # session.get_project_id() returns a UUID + # that always corresponds to 'services' + self.tenant = consts.SERVICES_USER_NAME + + @retry_on_auth_failure() def create_strategy( self, strategy_name, @@ -193,6 +222,7 @@ class VimClient(base.DriverBase): ) return self._add_strategy_response(response) + @retry_on_auth_failure() def get_strategy(self, strategy_name, raise_error_if_missing=True): """Get VIM orchestration strategy""" @@ -231,6 +261,7 @@ class VimClient(base.DriverBase): ) return self._add_strategy_response(response) + @retry_on_auth_failure() def get_current_strategy(self): """Get the current active VIM orchestration strategy""" @@ -243,6 +274,7 @@ class VimClient(base.DriverBase): LOG.debug("Strategy: %s" % strategy) return strategy + @retry_on_auth_failure() def delete_strategy(self, strategy_name): """Delete the current VIM orchestration strategy""" @@ -265,6 +297,7 @@ class VimClient(base.DriverBase): LOG.debug("Strategy deleted") + @retry_on_auth_failure() def apply_strategy(self, strategy_name): """Apply the current orchestration strategy""" @@ -332,6 +365,7 @@ class VimClient(base.DriverBase): return sw_update._get_strategy_object_from_response(response) + @retry_on_auth_failure() def abort_strategy(self, strategy_name): """Abort the current orchestration strategy""" diff --git a/distributedcloud/dccommon/endpoint_cache.py b/distributedcloud/dccommon/endpoint_cache.py index fa0054bd7..b906be92c 100644 --- a/distributedcloud/dccommon/endpoint_cache.py +++ b/distributedcloud/dccommon/endpoint_cache.py @@ 
-16,11 +16,12 @@ # import collections -from typing import Callable -from typing import List -from typing import Tuple +from collections.abc import Callable +from typing import Any +from typing import Optional from typing import Union +from keystoneauth1 import access from keystoneauth1.identity import v3 from keystoneauth1 import loading from keystoneauth1 import session @@ -37,6 +38,120 @@ LOG = logging.getLogger(__name__) LOCK_NAME = "dc-keystone-endpoint-cache" +class TCPKeepAliveSingleConnectionAdapter(session.TCPKeepAliveAdapter): + def __init__(self, *args, **kwargs): + # Set the maximum connections to 1 to reduce the number of open file descriptors + kwargs["pool_connections"] = 1 + kwargs["pool_maxsize"] = 1 + super().__init__(*args, **kwargs) + + +class BoundedFIFOCache(collections.OrderedDict): + """A First-In-First-Out (FIFO) cache with a maximum size limit. + + This cache maintains insertion order and automatically removes the oldest + items when the maximum size is reached. + """ + + def __init__(self, *args, **kwargs) -> None: + """Initialize the FIFO cache. + + :param args: Additional positional arguments passed to OrderedDict constructor. + :param kwargs: Additional keyword arguments passed to OrderedDict constructor. + """ + self._maxsize = None + super().__init__(*args, **kwargs) + + def __setitem__(self, key: Any, value: Any) -> None: + """Set an item in the cache. + + If the cache is at maximum capacity, the oldest item is discarded. + + :param key: The key of the item. + :param value: The value of the item. 
+ """ + super().__setitem__(key, value) + self.move_to_end(key) + + # The CONF endpoint_cache section doesn't exist at the + # time the class is defined, so we define it here instead + if self._maxsize is None: + self._maxsize = CONF.endpoint_cache.token_cache_size + + if self._maxsize > 0 and len(self) > self._maxsize: + discarded = self.popitem(last=False) + LOG.info(f"Maximum cache size reached, discarding token for {discarded[0]}") + + +class CachedV3Password(v3.Password): + """Cached v3.Password authentication class that caches auth tokens. + + This class uses a bounded FIFO cache to store and retrieve auth tokens, + reducing the number of token requests made to the authentication server. + """ + + _CACHE = BoundedFIFOCache() + _CACHE_LOCK = lockutils.ReaderWriterLock() + + def _get_from_cache(self) -> Optional[tuple[dict, str]]: + """Retrieve the cached auth info for the current auth_url. + + :return: The cached authentication information, if available. + """ + with CachedV3Password._CACHE_LOCK.read_lock(): + return CachedV3Password._CACHE.get(self.auth_url) + + def _update_cache(self, access_info: access.AccessInfoV3) -> None: + """Update the cache with new auth info. + + :param access_info: The access information to cache. + """ + with CachedV3Password._CACHE_LOCK.write_lock(): + # pylint: disable=protected-access + CachedV3Password._CACHE[self.auth_url] = ( + access_info._data, + access_info._auth_token, + ) + + def _remove_from_cache(self) -> Optional[tuple[dict, str]]: + """Remove the auth info for the current auth_url from the cache.""" + with CachedV3Password._CACHE_LOCK.write_lock(): + return CachedV3Password._CACHE.pop(self.auth_url, None) + + def get_auth_ref(self, _session: session.Session, **kwargs) -> access.AccessInfoV3: + """Get the authentication reference, using the cache if possible. + + This method first checks the cache for a valid token. If found and not + expiring soon, it returns the cached token. 
Otherwise, it requests a new + token from the auth server and updates the cache. + + :param session: The session to use for authentication. + :param kwargs: Additional keyword arguments passed to the parent method. + :return: The authentication reference. + """ + cached_data = self._get_from_cache() + if cached_data and not utils.is_token_expiring_soon(cached_data[0]["token"]): + LOG.debug("Reuse cached token for %s", self.auth_url) + return access.AccessInfoV3(*cached_data) + + # If not in cache or expired, fetch new token and update cache + LOG.debug("Getting a new token from %s", self.auth_url) + new_access_info = super().get_auth_ref(_session, **kwargs) + self._update_cache(new_access_info) + return new_access_info + + def invalidate(self) -> bool: + """Remove token from cache when the parent invalidate method is called. + + This method is called by the session when a request returns a 401 (Unauthorized) + + :return: The result of the parent invalidate method. + """ + LOG.debug("Invalidating token for %s", self.auth_url) + self._remove_from_cache() + return super().invalidate() + + class EndpointCache(object): """Cache for storing endpoint information. @@ -196,7 +311,7 @@ class EndpointCache(object): :rtype: session.Session """ - user_auth = v3.Password( + user_auth = CachedV3Password( auth_url=auth_url, username=user_name, user_domain_name=user_domain_name, @@ -215,12 +330,18 @@ class EndpointCache(object): CONF.endpoint_cache.http_connect_timeout if timeout is None else timeout ) - return session.Session( + ks_session = session.Session( auth=user_auth, additional_headers=consts.USER_HEADER, timeout=(discovery_timeout, read_timeout), ) + # Mount the custom adapters + ks_session.session.mount("http://", TCPKeepAliveSingleConnectionAdapter()) + ks_session.session.mount("https://", TCPKeepAliveSingleConnectionAdapter()) + + return ks_session + @staticmethod def _is_central_cloud(region_name: str) -> bool: """Check if the region is a central cloud. 
@@ -287,7 +408,7 @@ class EndpointCache(object): return endpoint @lockutils.synchronized(LOCK_NAME) - def get_all_regions(self) -> List[str]: + def get_all_regions(self) -> list[str]: """Get region list. return: List of regions @@ -382,7 +503,7 @@ class EndpointCache(object): @lockutils.synchronized(LOCK_NAME) def get_cached_master_keystone_client_and_region_endpoint_map( self, region_name: str - ) -> Tuple[ks_client.Client, dict]: + ) -> tuple[ks_client.Client, dict]: """Get the cached master Keystone client and region endpoint map. :param region_name: The name of the region. diff --git a/distributedcloud/dccommon/tests/unit/drivers/test_dcmanager_v1.py b/distributedcloud/dccommon/tests/unit/drivers/test_dcmanager_v1.py index 07831b4cd..271e72a34 100644 --- a/distributedcloud/dccommon/tests/unit/drivers/test_dcmanager_v1.py +++ b/distributedcloud/dccommon/tests/unit/drivers/test_dcmanager_v1.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 # -import os +from io import BytesIO import uuid import yaml @@ -62,342 +62,181 @@ class TestDcmanagerClient(base.DCCommonTestCase): def setUp(self): super(TestDcmanagerClient, self).setUp() - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = FAKE_SUBCLOUD_DATA - mock_get.return_value = mock_response + self.mock_response = mock.MagicMock() + self.mock_response.status_code = 200 + self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None + self.mock_session = mock.MagicMock() + + self.client = dcmanager_v1.DcmanagerClient( + dccommon_consts.SYSTEM_CONTROLLER_NAME, + session=self.mock_session, + timeout=FAKE_TIMEOUT, + endpoint=FAKE_ENDPOINT, ) - client.endpoint = FAKE_ENDPOINT - 
client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - actual_subcloud = client.get_subcloud(SUBCLOUD_NAME) + def test_get_subcloud(self): + self.mock_response.json.return_value = FAKE_SUBCLOUD_DATA + self.mock_session.get.return_value = self.mock_response + + actual_subcloud = self.client.get_subcloud(SUBCLOUD_NAME) self.assertEqual(SUBCLOUD_NAME, actual_subcloud.get("name")) - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_not_found(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 404 - mock_response.text = "Subcloud not found" - mock_get.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + def test_get_subcloud_not_found(self): + self.mock_response.status_code = 404 + self.mock_response.text = "Subcloud not found" + self.mock_session.get.return_value = self.mock_response self.assertRaises( - dccommon_exceptions.SubcloudNotFound, client.get_subcloud, SUBCLOUD_NAME + dccommon_exceptions.SubcloudNotFound, + self.client.get_subcloud, + SUBCLOUD_NAME, ) - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_list(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]} - mock_get.return_value = mock_response + def test_get_subcloud_list(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]} + self.mock_session.get.return_value = self.mock_response - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = 
FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - actual_subclouds = client.get_subcloud_list() + actual_subclouds = self.client.get_subcloud_list() self.assertEqual(1, len(actual_subclouds)) self.assertEqual(SUBCLOUD_NAME, actual_subclouds[0].get("name")) - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_group_list(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + def test_get_subcloud_group_list(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = { "subcloud_groups": [{"name": SUBCLOUD_GROUP_NAME}] } - mock_get.return_value = mock_response + self.mock_session.get.return_value = self.mock_response - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + actual_subcloud_groups = self.client.get_subcloud_group_list() + self.assertListEqual(actual_subcloud_groups, [{"name": SUBCLOUD_GROUP_NAME}]) - actual_subcloud_groups = client.get_subcloud_group_list() - self.assertEqual(1, len(actual_subcloud_groups)) - self.assertEqual(SUBCLOUD_GROUP_NAME, actual_subcloud_groups[0].get("name")) - - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_peer_group_list(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = { + def test_get_subcloud_peer_group_list(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = { "subcloud_peer_groups": [FAKE_SUBCLOUD_PEER_GROUP_DATA] } - mock_get.return_value = mock_response + self.mock_session.get.return_value = self.mock_response - mock_client_init.return_value = None - client 
= dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + actual_peer_group = self.client.get_subcloud_peer_group_list() + self.assertListEqual(actual_peer_group, [FAKE_SUBCLOUD_PEER_GROUP_DATA]) - actual_peer_group = client.get_subcloud_peer_group_list() - self.assertEqual(1, len(actual_peer_group)) - self.assertEqual( - SUBCLOUD_PEER_GROUP_NAME, actual_peer_group[0].get("peer-group-name") - ) + def test_get_subcloud_peer_group(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA + self.mock_session.get.return_value = self.mock_response - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_peer_group(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA - mock_get.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - actual_peer_group = client.get_subcloud_peer_group(SUBCLOUD_PEER_GROUP_NAME) - self.assertEqual( - SUBCLOUD_PEER_GROUP_NAME, actual_peer_group.get("peer-group-name") - ) - - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_peer_group_not_found(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 404 - mock_response.text = "Subcloud Peer Group not found" - mock_get.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = 
FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - self.assertRaises( - dccommon_exceptions.SubcloudPeerGroupNotFound, - client.get_subcloud_peer_group, - SUBCLOUD_PEER_GROUP_NAME, - ) - - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_list_by_peer_group(self, mock_client_init, mock_get): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]} - mock_get.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - actual_subclouds = client.get_subcloud_list_by_peer_group( + actual_peer_group = self.client.get_subcloud_peer_group( SUBCLOUD_PEER_GROUP_NAME ) - self.assertEqual(1, len(actual_subclouds)) - self.assertEqual(SUBCLOUD_NAME, actual_subclouds[0].get("name")) + self.assertDictEqual(actual_peer_group, FAKE_SUBCLOUD_PEER_GROUP_DATA) - @mock.patch("requests.get") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_get_subcloud_list_by_peer_group_not_found( - self, mock_client_init, mock_get - ): - mock_response = mock.MagicMock() - mock_response.status_code = 404 - mock_response.text = "Subcloud Peer Group not found" - mock_get.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + def test_get_subcloud_peer_group_not_found(self): + self.mock_response.status_code = 404 + self.mock_response.text = "Subcloud Peer Group not found" + self.mock_session.get.return_value = self.mock_response self.assertRaises( dccommon_exceptions.SubcloudPeerGroupNotFound, - client.get_subcloud_list_by_peer_group, + 
self.client.get_subcloud_peer_group, SUBCLOUD_PEER_GROUP_NAME, ) - @mock.patch("requests.post") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_add_subcloud_peer_group(self, mock_client_init, mock_post): + def test_get_subcloud_list_by_peer_group(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]} + self.mock_session.get.return_value = self.mock_response + + actual_subclouds = self.client.get_subcloud_list_by_peer_group( + SUBCLOUD_PEER_GROUP_NAME + ) + self.assertListEqual(actual_subclouds, [FAKE_SUBCLOUD_DATA]) + + def test_get_subcloud_list_by_peer_group_not_found(self): + self.mock_response.status_code = 404 + self.mock_response.text = "Subcloud Peer Group not found" + self.mock_session.get.return_value = self.mock_response + + self.assertRaises( + dccommon_exceptions.SubcloudPeerGroupNotFound, + self.client.get_subcloud_list_by_peer_group, + SUBCLOUD_PEER_GROUP_NAME, + ) + + def test_add_subcloud_peer_group(self): peer_group_kwargs = {"peer-group-name": SUBCLOUD_PEER_GROUP_NAME} - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA - mock_post.return_value = mock_response + self.mock_response.status_code = 200 + self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA + self.mock_session.post.return_value = self.mock_response - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + actual_peer_group = self.client.add_subcloud_peer_group(**peer_group_kwargs) + self.assertDictEqual(actual_peer_group, FAKE_SUBCLOUD_PEER_GROUP_DATA) - actual_peer_group = client.add_subcloud_peer_group(**peer_group_kwargs) - self.assertEqual( - SUBCLOUD_PEER_GROUP_NAME, actual_peer_group.get("peer-group-name") - ) + 
@mock.patch("builtins.open", new_callable=mock.mock_open) + def test_add_subcloud_with_secondary_status(self, mock_open): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = FAKE_SUBCLOUD_DATA + self.mock_session.post.return_value = self.mock_response - @mock.patch("requests.post") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_add_subcloud_with_secondary_status(self, mock_client_init, mock_post): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = FAKE_SUBCLOUD_DATA - mock_post.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - # create the cache file for subcloud create - yaml_data = yaml.dump(FAKE_SUBCLOUD_DATA) - with open(SUBCLOUD_BOOTSTRAP_VALUE_PATH, "w") as file: - file.write(yaml_data) + # Mock the file content to be returned when reading + yaml_data = yaml.dump(FAKE_SUBCLOUD_DATA).encode("utf-8") + mock_open.return_value = BytesIO(yaml_data) subcloud_kwargs = { "data": {"bootstrap-address": SUBCLOUD_BOOTSTRAP_ADDRESS}, "files": {"bootstrap_values": SUBCLOUD_BOOTSTRAP_VALUE_PATH}, } - actual_subcloud = client.add_subcloud_with_secondary_status(**subcloud_kwargs) - self.assertEqual(SUBCLOUD_NAME, actual_subcloud.get("name")) - - # purge the cache file - os.remove(SUBCLOUD_BOOTSTRAP_VALUE_PATH) - - @mock.patch("requests.delete") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_delete_subcloud_peer_group(self, mock_client_init, mock_delete): - mock_response = mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = "" - mock_delete.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None + 
actual_subcloud = self.client.add_subcloud_with_secondary_status( + **subcloud_kwargs ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + self.assertDictEqual(actual_subcloud, FAKE_SUBCLOUD_DATA) - result = client.delete_subcloud_peer_group(SUBCLOUD_PEER_GROUP_NAME) - mock_delete.assert_called_once_with( + def test_delete_subcloud_peer_group(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = "" + self.mock_session.delete.return_value = self.mock_response + + result = self.client.delete_subcloud_peer_group(SUBCLOUD_PEER_GROUP_NAME) + self.mock_session.delete.assert_called_once_with( FAKE_ENDPOINT + "/subcloud-peer-groups/" + SUBCLOUD_PEER_GROUP_NAME, - headers={"X-Auth-Token": FAKE_TOKEN}, timeout=FAKE_TIMEOUT, + raise_exc=False, ) self.assertEqual(result, "") - @mock.patch("requests.delete") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_delete_subcloud_peer_group_not_found(self, mock_client_init, mock_delete): - mock_response = mock.MagicMock() - mock_response.status_code = 404 - mock_response.text = "Subcloud Peer Group not found" - mock_delete.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + def test_delete_subcloud_peer_group_not_found(self): + self.mock_response.status_code = 404 + self.mock_response.text = "Subcloud Peer Group not found" + self.mock_session.delete.return_value = self.mock_response self.assertRaises( dccommon_exceptions.SubcloudPeerGroupNotFound, - client.delete_subcloud_peer_group, + self.client.delete_subcloud_peer_group, SUBCLOUD_PEER_GROUP_NAME, ) - @mock.patch("requests.delete") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_delete_subcloud(self, mock_client_init, mock_delete): - mock_response = 
mock.MagicMock() - mock_response.status_code = 200 - mock_response.json.return_value = "" - mock_delete.return_value = mock_response + def test_delete_subcloud(self): + self.mock_response.status_code = 200 + self.mock_response.json.return_value = "" + self.mock_session.delete.return_value = self.mock_response - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT - - result = client.delete_subcloud(SUBCLOUD_NAME) - mock_delete.assert_called_once_with( + result = self.client.delete_subcloud(SUBCLOUD_NAME) + self.mock_session.delete.assert_called_once_with( FAKE_ENDPOINT + "/subclouds/" + SUBCLOUD_NAME, - headers={ - "X-Auth-Token": FAKE_TOKEN, - "User-Agent": dccommon_consts.DCMANAGER_V1_HTTP_AGENT, - }, timeout=FAKE_TIMEOUT, + user_agent=dccommon_consts.DCMANAGER_V1_HTTP_AGENT, + raise_exc=False, ) self.assertEqual(result, "") - @mock.patch("requests.delete") - @mock.patch.object(dcmanager_v1.DcmanagerClient, "__init__") - def test_delete_subcloud_not_found(self, mock_client_init, mock_delete): - mock_response = mock.MagicMock() - mock_response.status_code = 404 - mock_response.text = "Subcloud not found" - mock_delete.return_value = mock_response - - mock_client_init.return_value = None - client = dcmanager_v1.DcmanagerClient( - dccommon_consts.SYSTEM_CONTROLLER_NAME, None - ) - client.endpoint = FAKE_ENDPOINT - client.token = FAKE_TOKEN - client.timeout = FAKE_TIMEOUT + def test_delete_subcloud_not_found(self): + self.mock_response.status_code = 404 + self.mock_response.text = "Subcloud not found" + self.mock_session.delete.return_value = self.mock_response self.assertRaises( - dccommon_exceptions.SubcloudNotFound, client.delete_subcloud, SUBCLOUD_NAME + dccommon_exceptions.SubcloudNotFound, + self.client.delete_subcloud, + SUBCLOUD_NAME, ) diff --git 
a/distributedcloud/dccommon/tests/unit/drivers/test_software_v1.py b/distributedcloud/dccommon/tests/unit/drivers/test_software_v1.py index b6526e100..f21049da8 100644 --- a/distributedcloud/dccommon/tests/unit/drivers/test_software_v1.py +++ b/distributedcloud/dccommon/tests/unit/drivers/test_software_v1.py @@ -79,13 +79,13 @@ URLS = ["/deploy", "/commit_patch"] def mocked_requests_success(*args, **kwargs): response_content = None - if args[0].endswith("/release"): + if kwargs["url"].endswith("/release"): response_content = json.dumps(LIST_RESPONSE) - elif args[0].endswith("/release/DC.1"): + elif kwargs["url"].endswith("/release/DC.1"): response_content = json.dumps(SHOW_RESPONSE) - elif args[0].endswith("/release/DC.1/DC.2"): + elif kwargs["url"].endswith("/release/DC.1/DC.2"): response_content = json.dumps(INFO_RESPONSE) - elif any([url in args[0] for url in URLS]): + elif any([url in kwargs["url"] for url in URLS]): response_content = json.dumps(INFO_RESPONSE) response = requests.Response() response.status_code = 200 @@ -108,66 +108,65 @@ class TestSoftwareClient(base.DCCommonTestCase): self.ctx = utils.dummy_context() self.session = mock.MagicMock() self.software_client = SoftwareClient( - session=mock.MagicMock(), - endpoint=FAKE_ENDPOINT, + session=mock.MagicMock(), endpoint=FAKE_ENDPOINT, token="TOKEN" ) - @mock.patch("requests.get") - def test_list_success(self, mock_get): - mock_get.side_effect = mocked_requests_success + @mock.patch("requests.request") + def test_list_success(self, mock_request): + mock_request.side_effect = mocked_requests_success response = self.software_client.list() self.assertEqual(response, CLIENT_LIST_RESPONSE) - @mock.patch("requests.get") - def test_list_failure(self, mock_get): - mock_get.side_effect = mocked_requests_failure + @mock.patch("requests.request") + def test_list_failure(self, mock_request): + mock_request.side_effect = mocked_requests_failure exc = self.assertRaises(exceptions.ApiException, 
self.software_client.list) - self.assertTrue("List failed with status code: 500" in str(exc)) + self.assertIn("List failed with status code: 500", str(exc)) - @mock.patch("requests.get") - def test_show_success(self, mock_get): - mock_get.side_effect = mocked_requests_success + @mock.patch("requests.request") + def test_show_success(self, mock_request): + mock_request.side_effect = mocked_requests_success release = "DC.1" response = self.software_client.show(release) self.assertEqual(response, SHOW_RESPONSE) - @mock.patch("requests.get") - def test_show_failure(self, mock_get): - mock_get.side_effect = mocked_requests_failure + @mock.patch("requests.request") + def test_show_failure(self, mock_request): + mock_request.side_effect = mocked_requests_failure release = "DC.1" exc = self.assertRaises( exceptions.ApiException, self.software_client.show, release ) - self.assertTrue("Show failed with status code: 500" in str(exc)) + self.assertIn("Show failed with status code: 500", str(exc)) - @mock.patch("requests.delete") - def test_delete_success(self, mock_delete): - mock_delete.side_effect = mocked_requests_success + @mock.patch("requests.request") + def test_delete_success(self, mock_request): + mock_request.side_effect = mocked_requests_success releases = ["DC.1", "DC.2"] response = self.software_client.delete(releases) self.assertEqual(response, INFO_RESPONSE) - @mock.patch("requests.delete") - def test_delete_failure(self, mock_delete): - mock_delete.side_effect = mocked_requests_failure + @mock.patch("requests.request") + def test_delete_failure(self, mock_request): + mock_request.side_effect = mocked_requests_failure releases = ["DC.1", "DC.2"] exc = self.assertRaises( exceptions.ApiException, self.software_client.delete, releases ) - self.assertTrue("Delete failed with status code: 500" in str(exc)) + self.assertIn("Delete failed with status code: 500", str(exc)) - @mock.patch("requests.post") - def test_commit_patch_success(self, mock_post): - 
mock_post.side_effect = mocked_requests_success + @mock.patch("requests.request") + def test_commit_patch_success(self, mock_request): + mock_request.side_effect = mocked_requests_success releases = ["DC.1", "DC.2"] response = self.software_client.commit_patch(releases) self.assertEqual(response, INFO_RESPONSE) - @mock.patch("requests.post") - def test_commit_patch_failure(self, mock_post): - mock_post.side_effect = mocked_requests_failure + @mock.patch("requests.request") + def test_commit_patch_failure(self, mock_request): + mock_request.side_effect = mocked_requests_failure releases = ["DC.1", "DC.2"] exc = self.assertRaises( exceptions.ApiException, self.software_client.commit_patch, releases ) - self.assertTrue("Commit patch failed with status code: 500" in str(exc)) + self.assertIn("Commit patch failed with status code: 500", str(exc)) diff --git a/distributedcloud/dccommon/tests/unit/test_endpoint_cache.py b/distributedcloud/dccommon/tests/unit/test_endpoint_cache.py index 03a32186c..dbbe95250 100644 --- a/distributedcloud/dccommon/tests/unit/test_endpoint_cache.py +++ b/distributedcloud/dccommon/tests/unit/test_endpoint_cache.py @@ -17,11 +17,14 @@ import collections import copy -import netaddr +import threading +import time +from keystoneauth1 import access from keystoneclient.v3 import services from keystoneclient.v3 import tokens import mock +import netaddr from oslo_config import cfg from dccommon import endpoint_cache @@ -77,10 +80,12 @@ FAKE_SERVICES_LIST = [ FakeService(7, "dcorch", "dcorch", True), ] +FAKE_AUTH_URL = "http://fake.auth/url" + class EndpointCacheTest(base.DCCommonTestCase): def setUp(self): - super(EndpointCacheTest, self).setUp() + super().setUp() auth_uri_opts = [ cfg.StrOpt("auth_uri", default="fake_auth_uri"), cfg.StrOpt("username", default="fake_user"), @@ -231,3 +236,123 @@ class EndpointCacheTest(base.DCCommonTestCase): expected_endpoints = {} actual_endpoints = utils.build_subcloud_endpoints(empty_ips) 
self.assertEqual(expected_endpoints, actual_endpoints) + + +class TestBoundedFIFOCache(base.DCCommonTestCase): + def setUp(self): + # pylint: disable=protected-access + super().setUp() + self.cache = endpoint_cache.BoundedFIFOCache() + self.cache._maxsize = 3 # Set a small max size for testing + + def test_insertion_and_order(self): + self.cache["a"] = 1 + self.cache["b"] = 2 + self.cache["c"] = 3 + self.assertEqual(list(self.cache.keys()), ["a", "b", "c"]) + + def test_max_size_limit(self): + self.cache["a"] = 1 + self.cache["b"] = 2 + self.cache["c"] = 3 + self.cache["d"] = 4 + self.assertEqual(len(self.cache), 3) + self.assertEqual(list(self.cache.keys()), ["b", "c", "d"]) + + def test_update_existing_key(self): + self.cache["a"] = 1 + self.cache["b"] = 2 + self.cache["a"] = 3 + self.assertEqual(list(self.cache.keys()), ["b", "a"]) + self.assertEqual(self.cache["a"], 3) + + +class TestCachedV3Password(base.DCCommonTestCase): + # pylint: disable=protected-access + def setUp(self): + super().setUp() + self.auth = endpoint_cache.CachedV3Password(auth_url=FAKE_AUTH_URL) + endpoint_cache.CachedV3Password._CACHE.clear() + + # Set a maxsize value so it doesn't try to read from the config file + endpoint_cache.CachedV3Password._CACHE._maxsize = 50 + + mock_get_auth_ref_object = mock.patch("endpoint_cache.v3.Password.get_auth_ref") + self.mock_parent_get_auth_ref = mock_get_auth_ref_object.start() + self.addCleanup(mock_get_auth_ref_object.stop) + + @mock.patch("endpoint_cache.utils.is_token_expiring_soon") + def test_get_auth_ref_cached(self, mock_is_expiring): + mock_is_expiring.return_value = False + mock_session = mock.MagicMock() + + # Simulate a cached token + cached_data = ({"token": "fake_token"}, "auth_token") + self.auth._CACHE[FAKE_AUTH_URL] = cached_data + + result = self.auth.get_auth_ref(mock_session) + + self.assertIsInstance(result, access.AccessInfoV3) + self.assertEqual(result._auth_token, "auth_token") + # Ensure we didn't call the parent method + 
self.mock_parent_get_auth_ref.assert_not_called() + + def test_get_auth_ref_new_token(self): + mock_session = mock.MagicMock() + mock_access_info = mock.MagicMock(spec=access.AccessInfoV3) + mock_access_info._data = {"token": "new_token"} + mock_access_info._auth_token = "new_auth_token" + self.mock_parent_get_auth_ref.return_value = mock_access_info + + result = self.auth.get_auth_ref(mock_session) + + self.assertEqual(result, mock_access_info) + self.mock_parent_get_auth_ref.assert_called_once_with(mock_session) + self.assertEqual( + self.auth._CACHE[FAKE_AUTH_URL], + (mock_access_info._data, mock_access_info._auth_token), + ) + + def test_get_auth_concurrent_access(self): + auth_obj_list = [ + endpoint_cache.CachedV3Password(auth_url=f"{FAKE_AUTH_URL}/{i}") + for i in range(1, 51) + ] + call_count = 0 + generated_tokens = [] + + def mock_get_auth_ref(_, **__): + nonlocal call_count, generated_tokens + time.sleep(0.1) # Simulate network delay + call_count += 1 + token = f"auth_token_{call_count}" + generated_tokens.append(token) + return access.AccessInfoV3({"token": token}, token) + + self.mock_parent_get_auth_ref.side_effect = mock_get_auth_ref + + threads = [ + threading.Thread(target=auth.get_auth_ref, args=(None,)) + for auth in auth_obj_list + ] + + for t in threads: + t.start() + for t in threads: + t.join() + + # All URLs should have generated their own tokens + self.assertEqual(len(endpoint_cache.CachedV3Password._CACHE), 50) + cached_tokens = [v[1] for v in endpoint_cache.CachedV3Password._CACHE.values()] + self.assertCountEqual(generated_tokens, cached_tokens) + self.assertEqual(self.mock_parent_get_auth_ref.call_count, 50) + + @mock.patch("endpoint_cache.v3.Password.invalidate") + def test_invalidate(self, mock_parent_invalidate): + # Set up a fake cached token + self.auth._CACHE[FAKE_AUTH_URL] = ({"token": "fake_token"}, "auth_token") + + self.auth.invalidate() + + self.assertNotIn(FAKE_AUTH_URL, self.auth._CACHE) + 
mock_parent_invalidate.assert_called_once() diff --git a/distributedcloud/dcdbsync/dbsyncclient/httpclient.py b/distributedcloud/dcdbsync/dbsyncclient/httpclient.py index 43bf7f2c9..2292eb774 100644 --- a/distributedcloud/dcdbsync/dbsyncclient/httpclient.py +++ b/distributedcloud/dcdbsync/dbsyncclient/httpclient.py @@ -24,6 +24,8 @@ import logging import os import requests +from keystoneauth1 import exceptions as ks_exceptions +from keystoneauth1 import session as ks_session from oslo_utils import importutils from dcdbsync.dbsyncclient import exceptions @@ -52,6 +54,7 @@ class HTTPClient(object): cacert=None, insecure=False, request_timeout=None, + session=None, ): self.base_url = base_url self.token = token @@ -59,6 +62,7 @@ class HTTPClient(object): self.user_id = user_id self.ssl_options = {} self.request_timeout = request_timeout + self.session: ks_session.Session = session if self.base_url.startswith("https"): if cacert and not os.path.exists(cacert): @@ -72,95 +76,73 @@ class HTTPClient(object): self.ssl_options["verify"] = not insecure self.ssl_options["cert"] = cacert + def request(self, url: str, method: str, data=None, **kwargs): + """Request directly if session is not passed, otherwise use the session""" + try: + if self.session: + return self.session.request( + url, + method=method, + data=data, + timeout=self.request_timeout, + raise_exc=False, + **kwargs, + ) + return requests.request( + method=method, + url=url, + data=data, + timeout=self.request_timeout, + **kwargs, + ) + except ( + requests.exceptions.Timeout, + ks_exceptions.ConnectTimeout, + ) as e: + msg = f"Request to {url} timed out" + raise exceptions.ConnectTimeout(msg) from e + except ( + requests.exceptions.ConnectionError, + ks_exceptions.ConnectionError, + ) as e: + msg = f"Unable to establish connection to {url}: {e}" + raise exceptions.ConnectFailure(msg) from e + except ( + requests.exceptions.RequestException, + ks_exceptions.ClientException, + ) as e: + msg = f"Unexpected exception 
for {url}: {e}" + raise exceptions.UnknownConnectionError(msg) from e + @log_request def get(self, url, headers=None): options = self._get_request_options("get", headers) - - try: - url = self.base_url + url - timeout = self.request_timeout - return requests.get(url, timeout=timeout, **options) - except requests.exceptions.Timeout: - msg = "Request to %s timed out" % url - raise exceptions.ConnectTimeout(msg) - except requests.exceptions.ConnectionError as e: - msg = "Unable to establish connection to %s: %s" % (url, e) - raise exceptions.ConnectFailure(msg) - except requests.exceptions.RequestException as e: - msg = "Unexpected exception for %s: %s" % (url, e) - raise exceptions.UnknownConnectionError(msg) + url = self.base_url + url + return self.request(url, "GET", **options) @log_request def post(self, url, body, headers=None): options = self._get_request_options("post", headers) - - try: - url = self.base_url + url - timeout = self.request_timeout - return requests.post(url, body, timeout=timeout, **options) - except requests.exceptions.Timeout: - msg = "Request to %s timed out" % url - raise exceptions.ConnectTimeout(msg) - except requests.exceptions.ConnectionError as e: - msg = "Unable to establish connection to %s: %s" % (url, e) - raise exceptions.ConnectFailure(msg) - except requests.exceptions.RequestException as e: - msg = "Unexpected exception for %s: %s" % (url, e) - raise exceptions.UnknownConnectionError(msg) + url = self.base_url + url + return self.request(url, "POST", data=body, **options) @log_request def put(self, url, body, headers=None): options = self._get_request_options("put", headers) - - try: - url = self.base_url + url - timeout = self.request_timeout - return requests.put(url, body, timeout=timeout, **options) - except requests.exceptions.Timeout: - msg = "Request to %s timed out" % url - raise exceptions.ConnectTimeout(msg) - except requests.exceptions.ConnectionError as e: - msg = "Unable to establish connection to %s: %s" % (url, 
e) - raise exceptions.ConnectFailure(msg) - except requests.exceptions.RequestException as e: - msg = "Unexpected exception for %s: %s" % (url, e) - raise exceptions.UnknownConnectionError(msg) + url = self.base_url + url + return self.request(url, "PUT", data=body, **options) @log_request def patch(self, url, body, headers=None): options = self._get_request_options("patch", headers) - - try: - url = self.base_url + url - timeout = self.request_timeout - return requests.patch(url, body, timeout=timeout, **options) - except requests.exceptions.Timeout: - msg = "Request to %s timed out" % url - raise exceptions.ConnectTimeout(msg) - except requests.exceptions.ConnectionError as e: - msg = "Unable to establish connection to %s: %s" % (url, e) - raise exceptions.ConnectFailure(msg) - except requests.exceptions.RequestException as e: - msg = "Unexpected exception for %s: %s" % (url, e) - raise exceptions.UnknownConnectionError(msg) + url = self.base_url + url + return self.request(url, "PATCH", data=body, **options) @log_request def delete(self, url, headers=None): options = self._get_request_options("delete", headers) - - try: - url = self.base_url + url - timeout = self.request_timeout - return requests.delete(url, timeout=timeout, **options) - except requests.exceptions.Timeout: - msg = "Request to %s timed out" % url - raise exceptions.ConnectTimeout(msg) - except requests.exceptions.ConnectionError as e: - msg = "Unable to establish connection to %s: %s" % (url, e) - raise exceptions.ConnectFailure(msg) - except requests.exceptions.RequestException as e: - msg = "Unexpected exception for %s: %s" % (url, e) - raise exceptions.UnknownConnectionError(msg) + url = self.base_url + url + return self.request(url, "DELETE", **options) def _get_request_options(self, method, headers): headers = self._update_headers(headers) @@ -178,9 +160,11 @@ class HTTPClient(object): if not headers: headers = {} - token = headers.get("x-auth-token", self.token) - if token: - 
headers["x-auth-token"] = token + # If the session exists, let it handle the token + if not self.session: + token = headers.get("x-auth-token", self.token) + if token: + headers["x-auth-token"] = token project_id = headers.get("X-Project-Id", self.project_id) if project_id: diff --git a/distributedcloud/dcdbsync/dbsyncclient/v1/client.py b/distributedcloud/dcdbsync/dbsyncclient/v1/client.py index d344c2478..bcb44c477 100644 --- a/distributedcloud/dcdbsync/dbsyncclient/v1/client.py +++ b/distributedcloud/dcdbsync/dbsyncclient/v1/client.py @@ -103,6 +103,7 @@ class Client(object): cacert=cacert, insecure=insecure, request_timeout=_DEFAULT_REQUEST_TIMEOUT, + session=session, ) # Create all managers diff --git a/distributedcloud/dcmanager/audit/patch_audit.py b/distributedcloud/dcmanager/audit/patch_audit.py index 493c15610..f0d6aaff8 100644 --- a/distributedcloud/dcmanager/audit/patch_audit.py +++ b/distributedcloud/dcmanager/audit/patch_audit.py @@ -42,7 +42,7 @@ class PatchAudit(object): def get_regionone_audit_data(self): return PatchAuditData() - def subcloud_patch_audit(self, keystone_session, subcloud): + def subcloud_patch_audit(self, keystone_client, subcloud): LOG.info("Triggered patch audit for: %s." % subcloud.name) # NOTE(nicodemos): Patch audit not supported for 24.09 subcloud @@ -51,7 +51,7 @@ class PatchAudit(object): # NOTE(nicodemos): If the subcloud is on the 22.12 release with USM enabled, # skip the patch audit. 
- if utils.has_usm_service(subcloud.region_name, keystone_session): + if utils.has_usm_service(subcloud.region_name, keystone_client): return consts.SYNC_STATUS_NOT_AVAILABLE # NOTE(nicodemos): As of version 24.09, the patching orchestration only diff --git a/distributedcloud/dcmanager/audit/service.py b/distributedcloud/dcmanager/audit/service.py index 848c75289..ea4f352b9 100644 --- a/distributedcloud/dcmanager/audit/service.py +++ b/distributedcloud/dcmanager/audit/service.py @@ -65,7 +65,7 @@ class DCManagerAuditService(service.Service): self.subcloud_audit_manager = None def start(self): - utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile) + utils.set_open_file_limit(cfg.CONF.audit_worker_rlimit_nofile) target = oslo_messaging.Target( version=self.rpc_api_version, server=self.host, topic=self.topic ) @@ -193,7 +193,7 @@ class DCManagerAuditWorkerService(service.Service): self.subcloud_audit_worker_manager = None def start(self): - utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile) + utils.set_open_file_limit(cfg.CONF.audit_worker_rlimit_nofile) self.init_tgm() self.init_audit_managers() target = oslo_messaging.Target( diff --git a/distributedcloud/dcmanager/audit/software_audit.py b/distributedcloud/dcmanager/audit/software_audit.py index 0080fb5d0..85bd14893 100644 --- a/distributedcloud/dcmanager/audit/software_audit.py +++ b/distributedcloud/dcmanager/audit/software_audit.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: Apache-2.0 # -from keystoneauth1.session import Session as keystone_session +from keystoneclient.v3.client import Client as KeystoneClient from oslo_log import log as logging from tsconfig.tsconfig import SW_VERSION @@ -142,7 +142,7 @@ class SoftwareAudit(object): def subcloud_software_audit( self, - keystone_session: keystone_session, + keystone_client: KeystoneClient, subcloud: models.Subcloud, audit_data: SoftwareAuditData, ): @@ -150,7 +150,7 @@ class SoftwareAudit(object): # TODO(nicodemos): Remove this method after all support 
to patching is removed # NOTE(nicodemos): Software audit not support on 22.12 subcloud without USM if subcloud.software_version != SW_VERSION and not utils.has_usm_service( - subcloud.region_name, keystone_session + subcloud.region_name, keystone_client ): LOG.info(f"Software audit not supported for {subcloud.name} without USM.") return dccommon_consts.SYNC_STATUS_NOT_AVAILABLE @@ -160,7 +160,7 @@ class SoftwareAudit(object): subcloud.management_start_ip, dccommon_consts.ENDPOINT_NAME_USM ) software_client = SoftwareClient( - keystone_session, endpoint=software_endpoint + keystone_client.session, endpoint=software_endpoint ) except Exception: LOG.exception( diff --git a/distributedcloud/dcmanager/audit/subcloud_audit_worker_manager.py b/distributedcloud/dcmanager/audit/subcloud_audit_worker_manager.py index 029bcb669..9abbf4f7e 100644 --- a/distributedcloud/dcmanager/audit/subcloud_audit_worker_manager.py +++ b/distributedcloud/dcmanager/audit/subcloud_audit_worker_manager.py @@ -561,7 +561,7 @@ class SubcloudAuditWorkerManager(manager.Manager): try: endpoint_data[dccommon_consts.ENDPOINT_TYPE_PATCHING] = ( self.patch_audit.subcloud_patch_audit( - keystone_client.session, + keystone_client.keystone_client, subcloud, ) ) @@ -685,7 +685,7 @@ class SubcloudAuditWorkerManager(manager.Manager): try: endpoint_data[dccommon_consts.ENDPOINT_TYPE_PATCHING] = ( self.patch_audit.subcloud_patch_audit( - keystone_client.session, + keystone_client.keystone_client, subcloud, ) ) @@ -772,7 +772,7 @@ class SubcloudAuditWorkerManager(manager.Manager): try: endpoint_data[dccommon_consts.AUDIT_TYPE_SOFTWARE] = ( self.software_audit.subcloud_software_audit( - keystone_client.session, + keystone_client.keystone_client, subcloud, software_audit_data, ) diff --git a/distributedcloud/dcmanager/common/config.py b/distributedcloud/dcmanager/common/config.py index ea917f1a9..cd74ba544 100644 --- a/distributedcloud/dcmanager/common/config.py +++ b/distributedcloud/dcmanager/common/config.py @@ 
-33,11 +33,6 @@ global_opts = [ default=60, help="Seconds between running periodic reporting tasks.", ), - cfg.IntOpt( - "worker_rlimit_nofile", - default=4096, - help="Maximum number of open files per worker process.", - ), ] # OpenStack credentials used for Endpoint Cache @@ -133,6 +128,11 @@ endpoint_cache_opts = [ "http_connect_timeout", help="Request timeout value for communicating with Identity API server.", ), + cfg.IntOpt( + "token_cache_size", + default=5000, + help="Maximum number of entries in the in-memory token cache", + ), ] scheduler_opts = [ @@ -180,6 +180,26 @@ common_opts = [ "1:enabled via rvmc_debug_level, 2:globally enabled" ), ), + cfg.IntOpt( + "dcmanager_worker_rlimit_nofile", + default=4096, + help="Maximum number of open files per dcmanager_manager worker process.", + ), + cfg.IntOpt( + "orchestrator_worker_rlimit_nofile", + default=8192, + help="Maximum number of open files per dcmanager_orchestrator worker process.", + ), + cfg.IntOpt( + "audit_worker_rlimit_nofile", + default=4096, + help="Maximum number of open files per dcmanager_audit worker process.", + ), + cfg.IntOpt( + "state_worker_rlimit_nofile", + default=4096, + help="Maximum number of open files per dcmanager_state worker process.", + ), ] scheduler_opt_group = cfg.OptGroup( diff --git a/distributedcloud/dcmanager/common/utils.py b/distributedcloud/dcmanager/common/utils.py index 3229c6964..c27ea1794 100644 --- a/distributedcloud/dcmanager/common/utils.py +++ b/distributedcloud/dcmanager/common/utils.py @@ -32,6 +32,7 @@ import uuid import xml.etree.ElementTree as ElementTree from keystoneauth1 import exceptions as keystone_exceptions +from keystoneclient.v3.client import Client as KeystoneClient import netaddr from oslo_concurrency import lockutils from oslo_config import cfg @@ -1200,7 +1201,7 @@ def get_system_controller_software_list( ) return software_client.list() - except requests.exceptions.ConnectionError: + except (requests.exceptions.ConnectionError, 
keystone_exceptions.ConnectionError): LOG.exception("Failed to get software list for %s", region_name) raise except Exception: @@ -2254,27 +2255,28 @@ def validate_software_strategy(release_id: str): pecan.abort(400, _(message)) -def has_usm_service(subcloud_region, keystone_session=None): +def has_usm_service( + subcloud_region: str, keystone_client: KeystoneClient = None +) -> bool: # Lookup keystone client session if not specified - if not keystone_session: + if not keystone_client: try: - os_client = OpenStackDriver( + keystone_client = OpenStackDriver( region_name=subcloud_region, region_clients=None, fetch_subcloud_ips=fetch_subcloud_mgmt_ips, - ) - keystone_session = os_client.keystone_client.session - except Exception: + ).keystone_client.keystone_client + except Exception as e: LOG.exception( f"Failed to get keystone client for subcloud_region: {subcloud_region}" ) - raise exceptions.InternalError() + raise exceptions.InternalError() from e try: - # Try to get the usm endpoint for the subcloud. - software_v1.SoftwareClient(keystone_session, region=subcloud_region) + # Try to get the USM service for the subcloud. 
+ keystone_client.services.find(name=dccommon_consts.ENDPOINT_NAME_USM) return True - except keystone_exceptions.EndpointNotFound: + except keystone_exceptions.NotFound: LOG.warning("USM service not found for subcloud_region: %s", subcloud_region) return False diff --git a/distributedcloud/dcmanager/manager/service.py b/distributedcloud/dcmanager/manager/service.py index e620bb576..fbf1fb8f7 100644 --- a/distributedcloud/dcmanager/manager/service.py +++ b/distributedcloud/dcmanager/manager/service.py @@ -100,7 +100,7 @@ class DCManagerService(service.Service): self.system_peer_manager = SystemPeerManager(self.peer_monitor_manager) def start(self): - utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile) + utils.set_open_file_limit(cfg.CONF.dcmanager_worker_rlimit_nofile) self.dcmanager_id = uuidutils.generate_uuid() self.init_managers() target = oslo_messaging.Target( diff --git a/distributedcloud/dcmanager/orchestrator/orch_thread.py b/distributedcloud/dcmanager/orchestrator/orch_thread.py index 815d2299d..43670df93 100644 --- a/distributedcloud/dcmanager/orchestrator/orch_thread.py +++ b/distributedcloud/dcmanager/orchestrator/orch_thread.py @@ -552,7 +552,8 @@ class OrchThread(threading.Thread): # First check if the strategy has been created. 
try: - subcloud_strategy = OrchThread.get_vim_client(region).get_strategy( + vim_client = OrchThread.get_vim_client(region) + subcloud_strategy = vim_client.get_strategy( strategy_name=self.vim_strategy_name ) except (keystone_exceptions.EndpointNotFound, IndexError): @@ -586,9 +587,7 @@ class OrchThread(threading.Thread): # If we are here, we need to delete the strategy try: - OrchThread.get_vim_client(region).delete_strategy( - strategy_name=self.vim_strategy_name - ) + vim_client.delete_strategy(strategy_name=self.vim_strategy_name) except Exception: message = "(%s) Vim strategy:(%s) delete failed for region:(%s)" % ( self.update_type, diff --git a/distributedcloud/dcmanager/orchestrator/service.py b/distributedcloud/dcmanager/orchestrator/service.py index e9800b468..a65888329 100644 --- a/distributedcloud/dcmanager/orchestrator/service.py +++ b/distributedcloud/dcmanager/orchestrator/service.py @@ -64,7 +64,7 @@ class DCManagerOrchestratorService(service.Service): self.sw_update_manager = None def start(self): - utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile) + utils.set_open_file_limit(cfg.CONF.orchestrator_worker_rlimit_nofile) self.init_tgm() self.init_manager() target = oslo_messaging.Target( diff --git a/distributedcloud/dcmanager/orchestrator/states/base.py b/distributedcloud/dcmanager/orchestrator/states/base.py index 8c988ef44..adb898109 100644 --- a/distributedcloud/dcmanager/orchestrator/states/base.py +++ b/distributedcloud/dcmanager/orchestrator/states/base.py @@ -5,6 +5,7 @@ # import abc +from functools import lru_cache from typing import Optional from typing import Type @@ -26,6 +27,11 @@ from dcmanager.db import api as db_api LOG = logging.getLogger(__name__) +# Each client getter caches only its most recent result: lru_cache keys +# include the state object and the region name, so repeated calls for the +# same subcloud reuse one client. This reduces redundant clients and minimizes +# the number of unnecessary TCP connections. +# NOTE(review): lru_cache on instance methods holds a reference to the state +# object for the cache's lifetime (flake8-bugbear B019) — confirm this matches +# the intended strategy-state lifecycle.
+CLIENT_CACHE_SIZE = 1 + class BaseState(object, metaclass=abc.ABCMeta): @@ -164,60 +170,73 @@ class BaseState(object, metaclass=abc.ABCMeta): return strategy_step.subcloud.name @staticmethod - def get_keystone_client(region_name=dccommon_consts.DEFAULT_REGION_NAME): + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_keystone_client(region_name: str = dccommon_consts.DEFAULT_REGION_NAME): """Construct a (cached) keystone client (and token)""" - try: - os_client = OpenStackDriver( + return OpenStackDriver( region_name=region_name, region_clients=None, fetch_subcloud_ips=utils.fetch_subcloud_mgmt_ips, - ) - return os_client.keystone_client + ).keystone_client except Exception: LOG.warning( f"Failure initializing KeystoneClient for region: {region_name}" ) raise - def get_sysinv_client(self, region_name): - """construct a sysinv client""" + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_sysinv_client(self, region_name: str) -> SysinvClient: + """Get the Sysinv client for the given region.""" keystone_client = self.get_keystone_client(region_name) endpoint = keystone_client.endpoint_cache.get_endpoint("sysinv") return SysinvClient(region_name, keystone_client.session, endpoint=endpoint) - def get_fm_client(self, region_name): + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_fm_client(self, region_name: str) -> FmClient: + """Get the FM client for the given region.""" keystone_client = self.get_keystone_client(region_name) endpoint = keystone_client.endpoint_cache.get_endpoint("fm") return FmClient(region_name, keystone_client.session, endpoint=endpoint) - def get_patching_client(self, region_name=dccommon_consts.DEFAULT_REGION_NAME): + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_patching_client( + self, region_name: str = dccommon_consts.DEFAULT_REGION_NAME + ) -> PatchingClient: + """Get the Patching client for the given region.""" keystone_client = self.get_keystone_client(region_name) return PatchingClient(region_name, keystone_client.session) - def 
get_software_client(self, region_name=dccommon_consts.DEFAULT_REGION_NAME): + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_software_client( + self, region_name: str = dccommon_consts.DEFAULT_REGION_NAME + ) -> SoftwareClient: + """Get the Software client for the given region.""" keystone_client = self.get_keystone_client(region_name) return SoftwareClient(keystone_client.session, region_name) + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_barbican_client(self, region_name: str) -> BarbicanClient: + """Get the Barbican client for the given region.""" + keystone_client = self.get_keystone_client(region_name) + return BarbicanClient(region_name, keystone_client.session) + + @lru_cache(maxsize=CLIENT_CACHE_SIZE) + def get_vim_client(self, region_name: str) -> VimClient: + """Get the Vim client for the given region.""" + keystone_client = self.get_keystone_client(region_name) + return VimClient(region_name, keystone_client.session) + @property - def local_sysinv(self): + def local_sysinv(self) -> SysinvClient: + """Return the local Sysinv client.""" return self.get_sysinv_client(dccommon_consts.DEFAULT_REGION_NAME) @property - def subcloud_sysinv(self): + def subcloud_sysinv(self) -> SysinvClient: + """Return the subcloud Sysinv client.""" return self.get_sysinv_client(self.region_name) - def get_barbican_client(self, region_name): - """construct a barbican client""" - keystone_client = self.get_keystone_client(region_name) - - return BarbicanClient(region_name, keystone_client.session) - - def get_vim_client(self, region_name): - """construct a vim client for a region.""" - keystone_client = self.get_keystone_client(region_name) - return VimClient(region_name, keystone_client.session) - def add_shared_caches(self, shared_caches): # Shared caches not required by all states, so instantiate only if necessary self._shared_caches = shared_caches diff --git a/distributedcloud/dcmanager/state/service.py b/distributedcloud/dcmanager/state/service.py index 
6e78cb419..a2772c7bc 100644 --- a/distributedcloud/dcmanager/state/service.py +++ b/distributedcloud/dcmanager/state/service.py @@ -78,7 +78,7 @@ class DCManagerStateService(service.Service): def start(self): LOG.info("Starting %s", self.__class__.__name__) - utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile) + utils.set_open_file_limit(cfg.CONF.state_worker_rlimit_nofile) self._init_managers() target = oslo_messaging.Target( version=self.rpc_api_version, server=self.host, topic=self.topic diff --git a/distributedcloud/dcmanager/tests/unit/audit/test_patch_audit_manager.py b/distributedcloud/dcmanager/tests/unit/audit/test_patch_audit_manager.py index 5b1e00afe..ac905180c 100644 --- a/distributedcloud/dcmanager/tests/unit/audit/test_patch_audit_manager.py +++ b/distributedcloud/dcmanager/tests/unit/audit/test_patch_audit_manager.py @@ -15,13 +15,13 @@ # from keystoneauth1 import exceptions as keystone_exceptions +import mock from dccommon import consts as dccommon_consts from dcmanager.audit import patch_audit from dcmanager.audit import subcloud_audit_manager from dcmanager.tests import base from dcmanager.tests.unit.common.fake_subcloud import create_fake_subcloud -from dcmanager.tests.unit import fakes class TestPatchAudit(base.DCManagerTestCase): @@ -39,14 +39,12 @@ class TestPatchAudit(base.DCManagerTestCase): self.am = subcloud_audit_manager.SubcloudAuditManager() self.am.patch_audit = self.pm - # Mock KeystoneClient.session - self.keystone_session = fakes.FakeKeystone().session + self.keystone_client = mock.MagicMock() def test_patch_audit_previous_release_usm_enabled(self): subcloud = create_fake_subcloud(self.ctx) - self.keystone_session.get_endpoint.return_value = "http://fake_endpoint" patch_response = self.pm.subcloud_patch_audit( - self.keystone_session, + self.keystone_client, subcloud, ) load_response = self.pm.subcloud_load_audit() @@ -54,18 +52,16 @@ class TestPatchAudit(base.DCManagerTestCase): expected_patch_response = 
dccommon_consts.SYNC_STATUS_NOT_AVAILABLE expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE - self.assertTrue(self.keystone_session.get_endpoint.called) + self.keystone_client.services.find.assert_called_once() self.assertEqual(patch_response, expected_patch_response) self.assertEqual(load_response, expected_load_response) def test_patch_audit_previous_release(self): subcloud = create_fake_subcloud(self.ctx) - self.keystone_session.get_endpoint.side_effect = ( - keystone_exceptions.EndpointNotFound - ) + self.keystone_client.services.find.side_effect = keystone_exceptions.NotFound patch_response = self.pm.subcloud_patch_audit( - self.keystone_session, + self.keystone_client, subcloud, ) load_response = self.pm.subcloud_load_audit() @@ -73,14 +69,14 @@ class TestPatchAudit(base.DCManagerTestCase): expected_patch_response = dccommon_consts.SYNC_STATUS_OUT_OF_SYNC expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE - self.assertTrue(self.keystone_session.get_endpoint.called) + self.keystone_client.services.find.assert_called_once() self.assertEqual(patch_response, expected_patch_response) self.assertEqual(load_response, expected_load_response) def test_patch_audit_current_release(self): subcloud = create_fake_subcloud(self.ctx, software_version="TEST.SW.VERSION") patch_response = self.pm.subcloud_patch_audit( - self.keystone_session, + self.keystone_client, subcloud, ) load_response = self.pm.subcloud_load_audit() @@ -88,6 +84,6 @@ class TestPatchAudit(base.DCManagerTestCase): expected_patch_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE - self.assertFalse(self.keystone_session.get_endpoint.called) + self.assertFalse(self.keystone_client.get_endpoint.called) self.assertEqual(patch_response, expected_patch_response) self.assertEqual(load_response, expected_load_response) diff --git a/distributedcloud/dcmanager/tests/unit/audit/test_software_audit.py 
b/distributedcloud/dcmanager/tests/unit/audit/test_software_audit.py index e13af3483..a03d999e0 100644 --- a/distributedcloud/dcmanager/tests/unit/audit/test_software_audit.py +++ b/distributedcloud/dcmanager/tests/unit/audit/test_software_audit.py @@ -5,13 +5,13 @@ # from keystoneauth1 import exceptions as keystone_exceptions +import mock from dccommon import consts as dccommon_consts from dcmanager.audit import software_audit from dcmanager.audit import subcloud_audit_manager from dcmanager.tests import base from dcmanager.tests.unit.common.fake_subcloud import create_fake_subcloud -from dcmanager.tests.unit import fakes FAKE_REGIONONE_RELEASES = [ { @@ -98,8 +98,7 @@ class TestSoftwareAudit(base.DCManagerTestCase): self.audit_manager = subcloud_audit_manager.SubcloudAuditManager() self.audit_manager.software_audit = self.software_audit - # Mock KeystoneClient.session - self.keystone_session = fakes.FakeKeystone().session + self.keystone_client = mock.MagicMock() # Mock RegionOne SoftwareClient's list method regionone_software_client = self.mock_software_client.return_value @@ -116,11 +115,9 @@ class TestSoftwareAudit(base.DCManagerTestCase): def test_software_audit_previous_release_not_usm(self): software_audit_data = self.get_software_audit_data() subcloud = create_fake_subcloud(self.ctx) - self.keystone_session.get_endpoint.side_effect = ( - keystone_exceptions.EndpointNotFound - ) + self.keystone_client.services.find.side_effect = keystone_exceptions.NotFound software_response = self.software_audit.subcloud_software_audit( - self.keystone_session, + self.keystone_client, subcloud, software_audit_data, ) @@ -130,14 +127,13 @@ class TestSoftwareAudit(base.DCManagerTestCase): def test_software_audit_previous_release_usm(self): software_audit_data = self.get_software_audit_data() subcloud = create_fake_subcloud(self.ctx) - self.keystone_session.get_endpoint.return_value = "http://fake_endpoint" sc_software_client = self.mock_software_client(subcloud.region_name) 
sc_software_client.list.return_value = ( FAKE_SUBCLOUD_RELEASES_MISSING_OUT_OF_SYNC ) software_response = self.software_audit.subcloud_software_audit( - self.keystone_session, + self.keystone_client, subcloud, software_audit_data, ) @@ -152,7 +148,7 @@ class TestSoftwareAudit(base.DCManagerTestCase): sc_software_client = self.mock_software_client(subcloud.region_name) sc_software_client.list.return_value = FAKE_SUBCLOUD_RELEASES_IN_SYNC software_response = self.software_audit.subcloud_software_audit( - self.keystone_session, + self.keystone_client, subcloud, software_audit_data, ) @@ -169,7 +165,7 @@ class TestSoftwareAudit(base.DCManagerTestCase): FAKE_SUBCLOUD_RELEASES_MISSING_OUT_OF_SYNC ) software_response = self.software_audit.subcloud_software_audit( - self.keystone_session, + self.keystone_client, subcloud, software_audit_data, ) @@ -184,7 +180,7 @@ class TestSoftwareAudit(base.DCManagerTestCase): sc_software_client = self.mock_software_client(subcloud.region_name) sc_software_client.list.return_value = FAKE_SUBCLOUD_RELEASES_EXTRA_OUT_OF_SYNC software_response = self.software_audit.subcloud_software_audit( - self.keystone_session, + self.keystone_client, subcloud, software_audit_data, ) diff --git a/distributedcloud/dcorch/common/config.py b/distributedcloud/dcorch/common/config.py index de617ea18..e3e2453da 100644 --- a/distributedcloud/dcorch/common/config.py +++ b/distributedcloud/dcorch/common/config.py @@ -225,6 +225,11 @@ endpoint_cache_opts = [ "http_connect_timeout", help="Request timeout value for communicating with Identity API server.", ), + cfg.IntOpt( + "token_cache_size", + default=5000, + help="Maximum number of entries in the in-memory token cache", + ), ] scheduler_opts = [ diff --git a/distributedcloud/dcorch/common/utils.py b/distributedcloud/dcorch/common/utils.py index bbb4949aa..d2bc2b4b8 100644 --- a/distributedcloud/dcorch/common/utils.py +++ b/distributedcloud/dcorch/common/utils.py @@ -17,6 +17,7 @@ import itertools import uuid 
+from keystoneauth1 import session as ks_session from oslo_db import exception as oslo_db_exception from oslo_log import log as logging @@ -236,3 +237,15 @@ def enqueue_work( # pylint: disable-next=no-member f"{rsrc.id}/{resource_type}/{source_resource_id}/{operation_type}" ) + + +def close_session(session: ks_session.Session, operation: str, region_ref: str) -> None: + if session: + try: + LOG.debug("Closing session after %s for %s", operation, region_ref) + session.session.close() + except Exception: + LOG.warning( + f"Failed to close session for {region_ref} after {operation}", + exc_info=True, + ) diff --git a/distributedcloud/dcorch/engine/generic_sync_worker_manager.py b/distributedcloud/dcorch/engine/generic_sync_worker_manager.py index db183c479..9a1320520 100644 --- a/distributedcloud/dcorch/engine/generic_sync_worker_manager.py +++ b/distributedcloud/dcorch/engine/generic_sync_worker_manager.py @@ -11,6 +11,7 @@ from dccommon import consts as dccommon_consts from dcorch.common import consts as dco_consts from dcorch.common import context from dcorch.common import exceptions +from dcorch.common import utils from dcorch.db import api as db_api from dcorch.engine import scheduler from dcorch.engine.sync_services.identity import IdentitySyncThread @@ -107,7 +108,8 @@ class GenericSyncWorkerManager(object): def _sync_subcloud( self, context, subcloud_name, endpoint_type, management_ip, software_version ): - LOG.info(f"Start to sync subcloud {subcloud_name}/{endpoint_type}.") + subcloud_ref_str = f"{subcloud_name}/{endpoint_type}" + LOG.info(f"Start to sync subcloud {subcloud_ref_str}.") sync_obj = sync_object_class_map[endpoint_type]( subcloud_name, endpoint_type, management_ip, software_version ) @@ -115,8 +117,10 @@ class GenericSyncWorkerManager(object): try: sync_obj.sync() except Exception: - LOG.exception(f"Sync failed for {subcloud_name}/{endpoint_type}") + LOG.exception(f"Sync failed for {subcloud_ref_str}") new_state = dco_consts.SYNC_STATUS_FAILED + 
finally: + utils.close_session(sync_obj.sc_admin_session, "sync", subcloud_ref_str) db_api.subcloud_sync_update( context, subcloud_name, endpoint_type, values={"sync_request": new_state} diff --git a/distributedcloud/dcorch/engine/initial_sync_worker_manager.py b/distributedcloud/dcorch/engine/initial_sync_worker_manager.py index f6bf3c1aa..5114dc68d 100644 --- a/distributedcloud/dcorch/engine/initial_sync_worker_manager.py +++ b/distributedcloud/dcorch/engine/initial_sync_worker_manager.py @@ -10,6 +10,7 @@ from oslo_log import log as logging from dcorch.common import consts from dcorch.common import context +from dcorch.common import utils from dcorch.db import api as db_api from dcorch.engine.fernet_key_manager import FernetKeyManager from dcorch.engine import scheduler @@ -208,4 +209,9 @@ class InitialSyncWorkerManager(object): def initial_sync(self, subcloud_name, sync_objs): LOG.debug(f"Initial sync subcloud {subcloud_name} {self.engine_id}") for sync_obj in sync_objs.values(): - sync_obj.initial_sync() + try: + sync_obj.initial_sync() + finally: + utils.close_session( + sync_obj.sc_admin_session, "initial sync", subcloud_name + ) diff --git a/distributedcloud/dcorch/engine/sync_services/sysinv.py b/distributedcloud/dcorch/engine/sync_services/sysinv.py index 4cd1cbfec..d6fddcb61 100644 --- a/distributedcloud/dcorch/engine/sync_services/sysinv.py +++ b/distributedcloud/dcorch/engine/sync_services/sysinv.py @@ -588,8 +588,7 @@ class SysinvSyncThread(SyncThread): return None def post_audit(self): - # TODO(lzhu1): This should be revisited once the master cache service - # is implemented. 
+ super().post_audit() OpenStackDriver.delete_region_clients_for_thread(self.region_name, "audit") OpenStackDriver.delete_region_clients_for_thread( dccommon_consts.CLOUD_0, "audit" @@ -780,7 +779,7 @@ class SysinvSyncThread(SyncThread): if finding == AUDIT_RESOURCE_MISSING: # The missing resource should be created by underlying subcloud # thus action is to update for a 'missing' resource - # should not get here since audit discrepency will handle this + # should not get here since audit discrepancy will handle this resource_id = self.get_resource_id(resource_type, resource) self.schedule_work( self.endpoint_type, diff --git a/distributedcloud/dcorch/engine/sync_thread.py b/distributedcloud/dcorch/engine/sync_thread.py index a37dfe0d8..779ecb067 100644 --- a/distributedcloud/dcorch/engine/sync_thread.py +++ b/distributedcloud/dcorch/engine/sync_thread.py @@ -569,7 +569,7 @@ class SyncThread(object): # If the request was aborted due to an expired certificate, # update the status to 'out-of-sync' and just return so the # sync_request is updated to "completed". This way, the sync - # job won't attemp to retry the sync in the next cycle. + # job won't attempt to retry the sync in the next cycle. 
if request_aborted: self.set_sync_status(dccommon_consts.SYNC_STATUS_OUT_OF_SYNC) LOG.info( @@ -620,7 +620,10 @@ class SyncThread(object): LOG.debug( "Engine id={}: sync_audit started".format(engine_id), extra=self.log_extra ) - self.sync_audit(engine_id) + try: + self.sync_audit(engine_id) + finally: + self.post_audit() def sync_audit(self, engine_id): LOG.debug( @@ -776,11 +779,12 @@ class SyncThread(object): "{}: done sync audit".format(threading.currentThread().getName()), extra=self.log_extra, ) - self.post_audit() def post_audit(self): # Some specific SyncThread subclasses may perform post audit actions - pass + utils.close_session( + self.sc_admin_session, "audit", f"{self.subcloud_name}/{self.endpoint_type}" + ) @classmethod @lockutils.synchronized(AUDIT_LOCK_NAME) diff --git a/distributedcloud/dcorch/tests/base.py b/distributedcloud/dcorch/tests/base.py index 67d79141a..58fba0e0b 100644 --- a/distributedcloud/dcorch/tests/base.py +++ b/distributedcloud/dcorch/tests/base.py @@ -35,7 +35,7 @@ from dcorch.tests import utils get_engine = api.get_engine -CAPABILITES = { +CAPABILITIES = { "endpoint_types": [ dccommon_consts.ENDPOINT_TYPE_PLATFORM, dccommon_consts.ENDPOINT_TYPE_IDENTITY, diff --git a/distributedcloud/dcorch/tests/unit/engine/test_generic_sync_worker_manager.py b/distributedcloud/dcorch/tests/unit/engine/test_generic_sync_worker_manager.py index 03d37c21d..bf83ae28e 100644 --- a/distributedcloud/dcorch/tests/unit/engine/test_generic_sync_worker_manager.py +++ b/distributedcloud/dcorch/tests/unit/engine/test_generic_sync_worker_manager.py @@ -58,7 +58,7 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase): def test_create_sync_objects(self): sync_objs = self.gswm.create_sync_objects( - "subcloud1", base.CAPABILITES, "192.168.1.11", "24.09" + "subcloud1", base.CAPABILITIES, "192.168.1.11", "24.09" ) # Verify both endpoint types have corresponding sync object diff --git 
a/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_manager.py b/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_manager.py index 6d478362f..facf1dab8 100644 --- a/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_manager.py +++ b/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_manager.py @@ -113,7 +113,7 @@ class TestInitialSyncManager(base.OrchestratorTestCase): management_ip="192.168.1." + str(i), ) chunks[chunk_num][subcloud.region_name] = ( - base.CAPABILITES, + base.CAPABILITIES, subcloud.management_ip, subcloud.software_version, False, diff --git a/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_worker_manager.py b/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_worker_manager.py index 76f7cf555..f172b14f3 100644 --- a/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_worker_manager.py +++ b/distributedcloud/dcorch/tests/unit/engine/test_initial_sync_worker_manager.py @@ -16,6 +16,10 @@ from dcorch.tests import utils class FakeSyncObject(object): + def __init__(self): + # sc_admin_session is used when attempting to close the session + self.sc_admin_session = mock.MagicMock() + def initial_sync(self): pass @@ -136,7 +140,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud( self.ctx, subcloud.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud.management_ip, subcloud.software_version, False, @@ -168,7 +172,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud( self.ctx, subcloud.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud.management_ip, subcloud.software_version, False, @@ -198,7 +202,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud( self.ctx, subcloud.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud.management_ip, subcloud.software_version, True, @@ -233,7 +237,7 @@ class 
TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud( self.ctx, subcloud.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud.management_ip, subcloud.software_version, False, @@ -294,13 +298,13 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): ) subcloud_capabilities = { subcloud1.region_name: ( - base.CAPABILITES, + base.CAPABILITIES, subcloud1.management_ip, subcloud1.software_version, False, ), subcloud2.region_name: ( - base.CAPABILITES, + base.CAPABILITIES, subcloud2.management_ip, subcloud2.software_version, False, @@ -314,7 +318,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud, mock.ANY, subcloud1.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud1.management_ip, subcloud1.software_version, False, @@ -323,7 +327,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase): self.iswm._initial_sync_subcloud, mock.ANY, subcloud2.region_name, - base.CAPABILITES, + base.CAPABILITIES, subcloud2.management_ip, subcloud2.software_version, False, diff --git a/distributedcloud/dcorch/tests/utils.py b/distributedcloud/dcorch/tests/utils.py index f2d2f0965..dbd426794 100644 --- a/distributedcloud/dcorch/tests/utils.py +++ b/distributedcloud/dcorch/tests/utils.py @@ -106,7 +106,7 @@ def create_subcloud_static(ctxt, name, **kwargs): "management_state": dccommon_consts.MANAGEMENT_MANAGED, "availability_status": dccommon_consts.AVAILABILITY_ONLINE, "initial_sync_state": "", - "capabilities": base.CAPABILITES, + "capabilities": base.CAPABILITIES, "management_ip": "192.168.0.1", } values.update(kwargs)