From ffc92c1d10f5354f6b6747c85885e7bd6b9bc4e0 Mon Sep 17 00:00:00 2001 From: Gustavo Herzmann Date: Mon, 6 May 2024 09:38:14 -0300 Subject: [PATCH] Implement optimized OpenStackDriver This commit implements an optimized OpenStackDriver that builds the endpoints for subclouds directly using their management IPs instead of retrieving them from the keystone database. Subcloud endpoints will be removed from Keystone due to performance reasons in a future commit. - The driver now accepts a fetch_subcloud_ips function as an argument. - This function retrieves a dictionary of subcloud region names to their management IPs (without a region argument) or a specific subcloud's management IP (with a region argument). - Dcmanager services and dcorch should implement their own fetch_subcloud_ips function to provide the driver with subcloud IP information. This approach improves performance and prepares for the removal of subcloud endpoints from Keystone. NOTE: The original OpenStackDriver, KeystoneClient and EndpointCache will be removed in a future commit, after the DC services are updated to use the new optimized OpenStackDriver. The optimized one will be integrated with the DC services in separate commits. Test Plan: Remove the subcloud endpoints from the keystone DB, modify the dcmanager-audit service to use the new classes and then run the following tests: 1. PASS - Verify that audit is able to get both the RegionOne and subclouds endpoints without issues using the new driver. 2. PASS - Verify that the hourly token refresh only triggers the refresh of central region token and endpoints. 3. PASS - Verify that when adding a new subcloud, the endpoint cache is updated to include the endpoints for the new subcloud. Story: 2011106 Task: 50035 Change-Id: I146592eb17f6a5433eae25f20e8de2f01c813055 Signed-off-by: Gustavo Herzmann --- .../dccommon/drivers/openstack/keystone_v3.py | 109 +++- .../drivers/openstack/sdk_platform.py | 326 +++++++++++- distributedcloud/dccommon/endpoint_cache.py | 466 +++++++++++++++++- distributedcloud/dcmanager/common/utils.py | 21 + 4 files changed, 918 insertions(+), 4 deletions(-) diff --git a/distributedcloud/dccommon/drivers/openstack/keystone_v3.py b/distributedcloud/dccommon/drivers/openstack/keystone_v3.py index 6d3ad8be5..a8b45e4d2 100644 --- a/distributedcloud/dccommon/drivers/openstack/keystone_v3.py +++ b/distributedcloud/dccommon/drivers/openstack/keystone_v3.py @@ -1,5 +1,5 @@ # Copyright 2012-2013 OpenStack Foundation -# Copyright (c) 2017-2021 Wind River Systems, Inc. +# Copyright (c) 2017-2021, 2024 Wind River Systems, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -20,6 +20,7 @@ from oslo_utils import importutils from dccommon import consts from dccommon.drivers import base from dccommon.endpoint_cache import EndpointCache +from dccommon.endpoint_cache import OptimizedEndpointCache from dccommon import exceptions # Ensure keystonemiddleware options are imported @@ -139,3 +140,109 @@ class KeystoneClient(base.DriverBase): self.keystone_client.regions.delete(region_name) except keystone_exceptions.NotFound: pass + + +class OptimizedKeystoneClient(base.DriverBase): + """Keystone V3 Driver. + + :param region_name: The name of the region. + :type region_name: str + :param auth_url: The authentication URL. + :type auth_url: str + :param fetch_subcloud_ips: A function to fetch subcloud IPs. + :type fetch_subcloud_ips: Callable + """ + + def __init__(self, region_name=None, auth_url=None, fetch_subcloud_ips=None): + self.endpoint_cache = OptimizedEndpointCache( + region_name, auth_url, fetch_subcloud_ips + ) + self.session = self.endpoint_cache.admin_session + self.keystone_client = self.endpoint_cache.keystone_client + if region_name in [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD]: + self.services_list = OptimizedEndpointCache.master_services_list + else: + self.services_list = self.keystone_client.services.list() + + def get_enabled_projects(self, id_only=True): + project_list = self.keystone_client.projects.list() + if id_only: + return [ + current_project.id + for current_project in project_list + if current_project.enabled + ] + else: + return [ + current_project + for current_project in project_list + if current_project.enabled + ] + + def get_project_by_id(self, projectid): + if not projectid: + return None + return self.keystone_client.projects.get(projectid) + + def get_project_by_name(self, projectname): + if not projectname: + return None + project_list = self.get_enabled_projects(id_only=False) + for project in project_list: + if project.name == projectname: + return project + + def get_enabled_users(self, id_only=True): + user_list = self.keystone_client.users.list() + if id_only: + return [ + current_user.id for current_user in user_list if current_user.enabled + ] + else: + return [ + current_user for current_user in user_list if current_user.enabled + ] + + def get_user_by_id(self, userid): + if not userid: + return None + return self.keystone_client.users.get(userid) + + def get_user_by_name(self, username): + if not username: + return None + user_list = self.get_enabled_users(id_only=False) + for user in user_list: + if user.name == username: + return user + + def is_service_enabled(self, service): + for current_service in self.services_list: + if service in current_service.type: + return True + return False + + # Returns list of regions if endpoint filter is applied for the project + def get_filtered_region(self, project_id): + try: + region_list = [] + endpoint_manager = endpoint_filter.EndpointFilterManager( + self.keystone_client + ) + endpoint_lists = endpoint_manager.list_endpoints_for_project(project_id) + for endpoint in endpoint_lists: + region_list.append(endpoint.region) + return region_list + except keystone_exceptions.NotFound: + raise exceptions.ProjectNotFound(project_id=project_id) + + def delete_endpoints(self, region_name): + endpoints = self.keystone_client.endpoints.list(region=region_name) + for endpoint in endpoints: + self.keystone_client.endpoints.delete(endpoint) + + def delete_region(self, region_name): + try: + self.keystone_client.regions.delete(region_name) + except keystone_exceptions.NotFound: + pass diff --git a/distributedcloud/dccommon/drivers/openstack/sdk_platform.py b/distributedcloud/dccommon/drivers/openstack/sdk_platform.py index 28f1e37d5..4201475d2 100644 --- a/distributedcloud/dccommon/drivers/openstack/sdk_platform.py +++ b/distributedcloud/dccommon/drivers/openstack/sdk_platform.py @@ -16,6 +16,9 @@ OpenStack Driver """ import collections +from typing import Callable +from typing import List + from keystoneauth1 import exceptions as keystone_exceptions from oslo_concurrency import lockutils from oslo_log import log @@ -24,6 +27,7 @@ from dccommon import consts from dccommon.drivers.openstack.barbican import BarbicanClient from dccommon.drivers.openstack.fm import FmClient from dccommon.drivers.openstack.keystone_v3 import KeystoneClient +from dccommon.drivers.openstack.keystone_v3 import OptimizedKeystoneClient from dccommon.drivers.openstack.sysinv_v1 import SysinvClient from dccommon import exceptions from dccommon.utils import is_token_expiring_soon @@ -40,12 +44,12 @@ LOG = log.getLogger(__name__) LOCK_NAME = 'dc-openstackdriver-platform' -SUPPORTED_REGION_CLIENTS = [ +SUPPORTED_REGION_CLIENTS = ( SYSINV_CLIENT_NAME, FM_CLIENT_NAME, BARBICAN_CLIENT_NAME, DBSYNC_CLIENT_NAME, -] +) # Region client type and class mappings region_client_class_map = { @@ -246,3 +250,321 @@ class OpenStackDriver(object): return False else: return True + + +class OptimizedOpenStackDriver(object): + """An OpenStack driver for managing external services clients. + + :param region_name: The name of the region. Defaults to "RegionOne". + :type region_name: str + :param thread_name: The name of the thread. Defaults to "dcorch". + :type thread_name: str + :param auth_url: The authentication URL. + :type auth_url: str + :param region_clients: The list of region clients to initialize. + :type region_clients: list + :param endpoint_type: The type of endpoint. Defaults to "admin". + :type endpoint_type: str + :param fetch_subcloud_ips: A function to fetch subcloud management IPs. + :type fetch_subcloud_ips: Callable + """ + + os_clients_dict = collections.defaultdict(dict) + _identity_tokens = {} + + def __init__( + self, + region_name: str = consts.CLOUD_0, + thread_name: str = "dc", + auth_url: str = None, + region_clients: List[str] = SUPPORTED_REGION_CLIENTS, + endpoint_type: str = consts.KS_ENDPOINT_DEFAULT, + fetch_subcloud_ips: Callable = None, + ): + self.region_name = region_name + self.keystone_client = None + + # These clients are created dynamically by initialize_region_clients + self.sysinv_client = None + self.fm_client = None + self.barbican_client = None + self.dbsync_client = None + + self.get_cached_keystone_client(region_name, auth_url, fetch_subcloud_ips) + + if self.keystone_client is None: + self.initialize_keystone_client(auth_url, fetch_subcloud_ips) + + OptimizedOpenStackDriver.update_region_clients_cache( + region_name, KEYSTONE_CLIENT_NAME, self.keystone_client + ) + # Clear client object cache + if region_name != consts.CLOUD_0: + OptimizedOpenStackDriver.os_clients_dict[region_name] = ( + collections.defaultdict(dict) + ) + + if region_clients: + self.initialize_region_clients( + region_clients, thread_name, endpoint_type + ) + + def initialize_region_clients( + self, region_clients: List[str], thread_name: str, endpoint_type: str + ) -> None: + """Initialize region clients dynamically setting them as attributes + + :param region_clients: The list of region clients to initialize. + :type region_clients: list + :param thread_name: The name of the thread. + :type thread_name: str + :param endpoint_type: The type of endpoint. + :type endpoint_type: str + """ + self.get_cached_region_clients_for_thread( + self.region_name, thread_name, region_clients + ) + for client_name in region_clients: + client_obj_name = f"{client_name}_client" + + # If the clien object already exists, do nothing + if getattr(self, client_obj_name) is not None: + continue + + # Create new client object and cache it + try: + try: + client_class = region_client_class_map[client_name] + except KeyError as e: + msg = f"Requested region client is not supported: {client_name}" + LOG.error(msg) + raise exceptions.InvalidInputError from e + + args = { + "region": self.region_name, + "session": self.keystone_client.session, + "endpoint_type": endpoint_type, + } + + # Since SysinvClient (cgtsclient) does not support session, + # also pass the cached endpoint so it does not need to + # retrieve it from keystone. + if client_name == "sysinv": + args["endpoint"] = ( + self.keystone_client.endpoint_cache.get_endpoint("sysinv") + ) + + client_object = client_class(**args) + + # Store the new client + setattr(self, client_obj_name, client_object) + OptimizedOpenStackDriver.update_region_clients_cache( + self.region_name, client_name, client_object, thread_name + ) + except Exception as exception: + LOG.error( + f"Region {self.region_name} client {client_name} " + f"thread {thread_name} error: {str(exception)}" + ) + raise exception + + def initialize_keystone_client( + self, auth_url: str, fetch_subcloud_ips: Callable + ) -> None: + """Initialize a new Keystone client. + + :param auth_url: The authentication URL. + :type auth_url: str + :param fetch_subcloud_ips: A function to fetch subcloud management IPs. + :type fetch_subcloud_ips: Callable + """ + LOG.debug(f"get new keystone client for region {self.region_name}") + try: + self.keystone_client = OptimizedKeystoneClient( + self.region_name, auth_url, fetch_subcloud_ips + ) + except ( + keystone_exceptions.ConnectFailure, + keystone_exceptions.ConnectTimeout, + keystone_exceptions.NotFound, + keystone_exceptions.ServiceUnavailable, + keystone_exceptions.ConnectFailure, + ) as exception: + LOG.error( + f"keystone_client region {self.region_name} error: {str(exception)}" + ) + raise exception + except Exception as exception: + LOG.exception( + f"Unable to get a new keystone client for region: {self.region_name}" + ) + raise exception + + @lockutils.synchronized(LOCK_NAME) + def get_cached_keystone_client( + self, region_name: str, auth_url: str, fetch_subcloud_ips: Callable + ) -> None: + """Get the cached Keystone client if it exists + + :param region_name: The name of the region. + :type region_name: str + :param auth_url: The authentication URL. + :type auth_url: str + :param fetch_subcloud_ips: A function to fetch subcloud management IPs. + :type fetch_subcloud_ips: Callable + """ + os_clients_dict = OptimizedOpenStackDriver.os_clients_dict + keystone_client = os_clients_dict.get(region_name, {}).get( + KEYSTONE_CLIENT_NAME + ) + + # If there's a cached keystone client and the token is valid, use it + if keystone_client and self._is_token_valid(region_name): + self.keystone_client = keystone_client + # Else if master region, create a new keystone client + elif region_name in (consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD): + self.initialize_keystone_client(auth_url, fetch_subcloud_ips) + os_clients_dict[region_name][KEYSTONE_CLIENT_NAME] = self.keystone_client + + @lockutils.synchronized(LOCK_NAME) + def get_cached_region_clients_for_thread( + self, region_name: str, thread_name: str, clients: List[str] + ) -> None: + """Get and assign the cached region clients as object attributes. + + Also initializes the os_clients_dict region and + thread dictionaries if they don't already exist. + + :param region_name: The name of the region. + :type region_name: str + :param thread_name: The name of the thread. + :type thread_name: str + :param clients: The list of client names. + :type clients: list + """ + os_clients = OpenStackDriver.os_clients_dict + + for client in clients: + client_obj = ( + os_clients.setdefault(region_name, {}) + .setdefault(thread_name, {}) + .get(client) + ) + if client_obj is not None: + LOG.debug( + "Using cached OS {client} client objects " + f"{region_name} {thread_name}" + ) + setattr(self, f"{client}_client", client_obj) + + @classmethod + @lockutils.synchronized(LOCK_NAME) + def update_region_clients_cache( + cls, + region_name: str, + client_name: str, + client_object: object, + thread_name: str = None, + ) -> None: + """Update the region clients cache. + + :param region_name: The name of the region. + :type region_name: str + :param client_name: The name of the client. + :type client_name: str + :param client_object: The client object. + :param thread_name: The name of the thread. Defaults to None. + :type thread_name: str + """ + region_dict = cls.os_clients_dict[region_name] + if thread_name is None: + region_dict[client_name] = client_object + else: + region_dict[thread_name][client_name] = client_object + + @classmethod + @lockutils.synchronized(LOCK_NAME) + def delete_region_clients( + cls, region_name: str, clear_token: bool = False + ) -> None: + """Delete region clients from cache. + + :param region_name: The name of the region. + :type region_name: str + :param clear_token: Whether to clear the token cache. Defaults to False. + :type clear_token: bool + """ + LOG.warn(f"delete_region_clients={region_name}, clear_token={clear_token}") + try: + del cls.os_clients_dict[region_name] + except KeyError: + pass + + if clear_token: + cls._identity_tokens[region_name] = None + + @classmethod + @lockutils.synchronized(LOCK_NAME) + def delete_region_clients_for_thread( + cls, region_name: str, thread_name: str + ) -> None: + """Delete region clients for a specific thread from cache. + + :param region_name: The name of the region. + :type region_name: str + :param thread_name: The name of the thread. + :type thread_name: str + """ + LOG.debug(f"delete_region_clients={region_name}, thread_name={thread_name}") + try: + del cls.os_clients_dict[region_name][thread_name] + except KeyError: + pass + + @staticmethod + def _reset_cached_clients_and_token(region_name: str) -> None: + OptimizedOpenStackDriver.os_clients_dict[region_name] = ( + collections.defaultdict(dict) + ) + OptimizedOpenStackDriver._identity_tokens[region_name] = None + + def _is_token_valid(self, region_name: str) -> bool: + """Check if the cached token is valid. + + :param region_name: The name of the region. + :type region_name: str + """ + cached_os_clients = OptimizedOpenStackDriver.os_clients_dict + + # If the token is not cached, validate the session token and cache it + try: + keystone = cached_os_clients[region_name]["keystone"].keystone_client + cached_tokens = OptimizedOpenStackDriver._identity_tokens + if not cached_tokens.get(region_name): + cached_tokens[region_name] = keystone.tokens.validate( + keystone.session.get_token(), include_catalog=False + ) + + LOG.info( + f"Token for subcloud {region_name} expires_at=" + f"{cached_tokens[region_name]['expires_at']}" + ) + except Exception as exception: + LOG.info( + f"_is_token_valid handle: region: {region_name} " + f"error: {str(exception)}" + ) + self._reset_cached_clients_and_token(region_name) + return False + + # If token is expiring soon, reset cached data and return False. + if is_token_expiring_soon(token=cached_tokens[region_name]): + LOG.info( + f"The cached keystone token for subcloud {region_name} will " + f"expire soon {cached_tokens[region_name]['expires_at']}" + ) + # Reset the cached dictionary + self._reset_cached_clients_and_token(region_name) + return False + + return True diff --git a/distributedcloud/dccommon/endpoint_cache.py b/distributedcloud/dccommon/endpoint_cache.py index 4f2adbbfe..b8b7a7a2d 100644 --- a/distributedcloud/dccommon/endpoint_cache.py +++ b/distributedcloud/dccommon/endpoint_cache.py @@ -17,9 +17,14 @@ import collections import threading +from typing import Callable +from typing import List +from typing import Tuple +from typing import Union from keystoneauth1 import loading from keystoneauth1 import session +import netaddr from keystoneclient.v3 import client as ks_client @@ -30,11 +35,21 @@ from oslo_log import log as logging from dccommon import consts from dccommon.utils import is_token_expiring_soon + CONF = cfg.CONF LOG = logging.getLogger(__name__) -LOCK_NAME = 'dc-keystone-endpoint-cache' +LOCK_NAME = "dc-keystone-endpoint-cache" + +ENDPOINT_URLS = { + "fm": "https://{}:18003", + "keystone": "https://{}:5001/v3", + "patching": "https://{}:5492", + "sysinv": "https://{}:6386/v1", + "usm": "https://{}:5498", + "vim": "https://{}:4546", +} class EndpointCache(object): @@ -276,3 +291,452 @@ class EndpointCache(object): EndpointCache.master_keystone_client.services.list()) EndpointCache.master_service_endpoint_map = ( self._generate_master_service_endpoint_map(self)) + + +def build_subcloud_endpoint_map(ip): + """Builds a mapping of service endpoints for a given IP address. + + :param ip: The IP address for which service endpoints need to be mapped. + :type ip: str + :return: A dictionary containing service names as keys and formatted + endpoint URLs as values. + :rtype: dict + """ + endpoint_map = {} + for service, endpoint in ENDPOINT_URLS.items(): + if netaddr.IPAddress(ip).version == 6: + ip = f"[{ip}]" + endpoint_map[service] = endpoint.format(ip) + return endpoint_map + + +def build_subcloud_endpoints(subcloud_mgmt_ips: dict) -> dict: + """Builds a dictionary of service endpoints for multiple subcloud management IPs. + + :param subcloud_mgmt_ips: A dictionary containing subcloud regions as keys + and the corresponding management IP as value. + :type subcloud_mgmt_ips: dict + :return: A dictionary with subcloud regions as keys and their respective + service endpoints as values. + :rtype: dict + """ + subcloud_endpoints = {} + for region, ip in subcloud_mgmt_ips.items(): + subcloud_endpoints[region] = build_subcloud_endpoint_map(ip) + return subcloud_endpoints + + +class OptimizedEndpointCache(object): + """Cache for storing endpoint information. + + :param region_name: The name of the region. + :type region_name: str + :param auth_url: The authentication URL. + :type auth_url: str + :param fetch_subcloud_ips: A function to fetch subcloud IPs. It should + accept the region_name as an optional argument. If it's called without + the region_name, it should return a dictionary where the key is the + region_name and the value is the subclouds management IP. If it's called + with the region_name, it should return the management IP of the + specified region. + :type fetch_subcloud_ips: Callable[[str], Union[str, dict]] + """ + + plugin_loader = None + plugin_lock = threading.Lock() + master_keystone_client = None + master_token = {} + master_services_list = None + master_service_endpoint_map = collections.defaultdict(dict) + subcloud_endpoints: dict = None + fetch_subcloud_ips: Callable[[str], Union[str, dict]] = None + + def __init__( + self, + region_name: str = None, + auth_url: str = None, + fetch_subcloud_ips: Callable[[str], Union[str, dict]] = None, + ): + # Region specific service endpoint map + self.service_endpoint_map = collections.defaultdict(dict) + self.admin_session = None + self.keystone_client = None + + # Cache the fetch_subcloud_ips function + if fetch_subcloud_ips: + OptimizedEndpointCache.fetch_subcloud_ips = fetch_subcloud_ips + + self._initialize_subcloud_endpoints() + + # if auth_url is provided use that otherwise use the one + # defined in the config + if auth_url: + self.external_auth_url = auth_url + else: + self.external_auth_url = CONF.endpoint_cache.auth_uri + + self._initialize_keystone_client(region_name, auth_url) + + @lockutils.synchronized("subcloud_endpoints") + def _initialize_subcloud_endpoints(self): + # Initialize and cache the subcloud endpoints + if ( + OptimizedEndpointCache.subcloud_endpoints is None + and OptimizedEndpointCache.fetch_subcloud_ips + ): + LOG.info("Initializing and caching subcloud endpoints") + # pylint: disable=not-callable + OptimizedEndpointCache.subcloud_endpoints = build_subcloud_endpoints( + OptimizedEndpointCache.fetch_subcloud_ips() + ) + + def _initialize_keystone_client( + self, region_name: str = None, auth_url: str = None + ) -> None: + """Initialize the Keystone client. + + :param region_name: The name of the region. + :type region_name: str + :param auth_url: The authentication URL. + :type auth_url: str + """ + self.admin_session = OptimizedEndpointCache.get_admin_session( + self.external_auth_url, + CONF.endpoint_cache.username, + CONF.endpoint_cache.user_domain_name, + CONF.endpoint_cache.password, + CONF.endpoint_cache.project_name, + CONF.endpoint_cache.project_domain_name, + ) + + self.keystone_client, self.service_endpoint_map = ( + self.get_cached_master_keystone_client_and_region_endpoint_map( + region_name + ) + ) + + # If endpoint cache is intended for a subcloud then we need to + # retrieve the subcloud token and session. Skip this if auth_url + # was provided as its assumed that the auth_url would correspond + # to a subcloud so session was set up above + if ( + not auth_url + and region_name + and region_name not in [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD] + ): + try: + sc_auth_url = self.service_endpoint_map["keystone"] + except KeyError: + # Should not be here... + LOG.exception( + f"Endpoint not found for {region_name=}." + "Refreshing cached data..." + ) + self.re_initialize_master_keystone_client() + raise + + # We assume that the dcmanager user names and passwords are the + # same on this subcloud since this is an audited resource + self.admin_session = OptimizedEndpointCache.get_admin_session( + sc_auth_url, + CONF.endpoint_cache.username, + CONF.endpoint_cache.user_domain_name, + CONF.endpoint_cache.password, + CONF.endpoint_cache.project_name, + CONF.endpoint_cache.project_domain_name, + ) + + try: + self.keystone_client = ks_client.Client( + session=self.admin_session, region_name=region_name + ) + except Exception: + LOG.error(f"Retrying keystone client creation for {region_name}") + self.keystone_client = ks_client.Client( + session=self.admin_session, region_name=region_name + ) + self.external_auth_url = sc_auth_url + + @classmethod + def get_admin_session( + cls, + auth_url: str, + user_name: str, + user_domain_name: str, + user_password: str, + user_project: str, + user_project_domain: str, + timeout: float = None, + ) -> None: + """Get the admin session. + + :param auth_url: The authentication URL. + :type auth_url: str + :param user_name: The user name. + :type user_name: str + :param user_domain_name: The user domain name. + :type user_domain_name: str + :param user_password: The user password. + :type user_password: str + :param user_project: The user project. + :type user_project: str + :param user_project_domain: The user project domain. + :type user_project_domain: str + :param timeout: The timeout. + :type timeout: int + :return: The admin session. + :rtype: session.Session + """ + with OptimizedEndpointCache.plugin_lock: + if OptimizedEndpointCache.plugin_loader is None: + OptimizedEndpointCache.plugin_loader = loading.get_plugin_loader( + CONF.endpoint_cache.auth_plugin + ) + + user_auth = OptimizedEndpointCache.plugin_loader.load_from_options( + auth_url=auth_url, + username=user_name, + user_domain_name=user_domain_name, + password=user_password, + project_name=user_project, + project_domain_name=user_project_domain, + ) + timeout = ( + CONF.endpoint_cache.http_connect_timeout if timeout is None else timeout + ) + return session.Session( + auth=user_auth, additional_headers=consts.USER_HEADER, timeout=timeout + ) + + @staticmethod + def _is_central_cloud(region_name: str) -> None: + """Check if the region is a central cloud. + + :param region_id: The region ID. + :type region_id: str + :return: True if the region is a central cloud, False otherwise. + :rtype: bool + """ + central_cloud_regions = [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD] + return region_name in central_cloud_regions + + @staticmethod + def _get_master_endpoint_map() -> dict: + service_id_name_map = {} + + # pylint: disable-next=not-an-iterable + for service in OptimizedEndpointCache.master_services_list: + service_id_name_map[service.id] = service.name + + service_endpoint_map = collections.defaultdict(dict) + for ( + endpoint + ) in OptimizedEndpointCache.master_keystone_client.endpoints.list(): + # Within central cloud, use only internal endpoints + if OptimizedEndpointCache._is_central_cloud(endpoint.region): + if endpoint.interface != consts.KS_ENDPOINT_INTERNAL: + continue + + # For other regions store only admin endpoints + elif endpoint.interface != consts.KS_ENDPOINT_ADMIN: + continue + + # Add the endpoint url to the service endpoint map + service_name = service_id_name_map[endpoint.service_id] + service_endpoint_map[endpoint.region][service_name] = endpoint.url + + return service_endpoint_map + + @staticmethod + def _generate_master_service_endpoint_map() -> dict: + LOG.info("Generating service endpoint map") + # Get the master endpoint map using keystone + service_endpoint_map = OptimizedEndpointCache._get_master_endpoint_map() + + # Insert the subcloud endpoints into the service_endpoint_map + if OptimizedEndpointCache.subcloud_endpoints: + LOG.debug("Inserting subcloud endpoints into service_endpoint_map") + service_endpoint_map.update(OptimizedEndpointCache.subcloud_endpoints) + + return service_endpoint_map + + def get_endpoint(self, service: str) -> Union[str, None]: + """Get the endpoint for the specified service. + + :param service: The service name. + :type service: str + return: service url or None + """ + try: + endpoint = self.service_endpoint_map[service] + except KeyError: + LOG.error(f"Unknown service: {service}") + endpoint = None + + return endpoint + + @lockutils.synchronized(LOCK_NAME) + def get_all_regions(self) -> List[str]: + """Get region list. + + return: List of regions + """ + return list(OptimizedEndpointCache.master_service_endpoint_map.keys()) + + def get_session_from_token(self, token: str, project_id: str) -> session.Session: + """Get session based on token to communicate with openstack services. + + :param token: token with which the request is triggered. + :type token: str + :param project_id: UUID of the project. + :type project_id: str + + :return: session object. + """ + loader = loading.get_plugin_loader("token") + auth = loader.load_from_options( + auth_url=self.external_auth_url, token=token, project_id=project_id + ) + return session.Session(auth=auth) + + @lockutils.synchronized(LOCK_NAME) + def update_master_service_endpoint_region( + self, region_name: str, endpoint_values: dict + ) -> None: + """Update the master endpoint map for a specific region. + + :param region_name: The name of the region. + :type region_name: str + :param endpoint_values: The endpoint values. + :type endpoint_values: dict + """ + LOG.info( + "Updating service endpoint map for region: " + f"{region_name} with endpoints: {endpoint_values}" + ) + # Update the current endpoint map + OptimizedEndpointCache.master_service_endpoint_map[region_name] = ( + endpoint_values + ) + + # Update the cached subcloud endpoit map + if OptimizedEndpointCache.subcloud_endpoints and not self._is_central_cloud( + region_name + ): + LOG.debug( + "Updating subcloud_endpoints for region: " + f"{region_name} with endpoints: {endpoint_values}" + ) + # pylint: disable-next=unsupported-assignment-operation + OptimizedEndpointCache.subcloud_endpoints[region_name] = endpoint_values + + def refresh_subcloud_endpoints(self, region_name: str) -> None: + """Refresh the subcloud endpoints. + + :param region_name: The name of the region. + :type region_name: str + """ + LOG.info(f"Refreshing subcloud endpoinds of region_name: {region_name}") + if not OptimizedEndpointCache.fetch_subcloud_ips: + raise Exception( + f"Unable to fetch endpoints for region {region_name}: " + "missing fetch_subcloud_ips" + ) + # pylint: disable-next=not-callable + subcloud_ip = OptimizedEndpointCache.fetch_subcloud_ips(region_name) + endpoint_map = build_subcloud_endpoint_map(subcloud_ip) + # pylint: disable-next=unsupported-assignment-operation + OptimizedEndpointCache.subcloud_endpoints[region_name] = endpoint_map + + @lockutils.synchronized(LOCK_NAME) + def get_cached_master_keystone_client_and_region_endpoint_map( + self, region_name: str + ) -> Tuple[ks_client.Client, dict]: + """Get the cached master Keystone client and region endpoint map. + + :param region_name: The name of the region. + :type region_name: str + :return: The master Keystone client and region endpoint map. + :rtype: tuple + """ + if OptimizedEndpointCache.master_keystone_client is None: + self._create_master_cached_data() + LOG.info( + "Generated Master keystone client and master token the " + "very first time" + ) + else: + token_expiring_soon = is_token_expiring_soon( + token=OptimizedEndpointCache.master_token + ) + + # If token is expiring soon, initialize a new master keystone + # client + if token_expiring_soon: + LOG.info( + f"The cached keystone token for '{consts.CLOUD_0}' will expire " + f"soon: {OptimizedEndpointCache.master_token['expires_at']}" + ) + self._create_master_cached_data() + LOG.info( + "Generated Master keystone client and master token as they " + "are expiring soon" + ) + else: + # Check if the cached master service endpoint map needs to be + # refreshed + if region_name not in self.master_service_endpoint_map: + previous_size = len( + OptimizedEndpointCache.master_service_endpoint_map + ) + + if not self._is_central_cloud(region_name): + self.refresh_subcloud_endpoints(region_name) + + OptimizedEndpointCache.master_service_endpoint_map = ( + self._generate_master_service_endpoint_map() + ) + current_size = len( + OptimizedEndpointCache.master_service_endpoint_map + ) + LOG.info( + "Master endpoints list refreshed to include " + f"region {region_name}: " + f"prev_size={previous_size}, current_size={current_size}" + ) + + if region_name is not None: + region_service_endpoint_map = ( + OptimizedEndpointCache.master_service_endpoint_map[region_name] + ) + else: + region_service_endpoint_map = collections.defaultdict(dict) + + return ( + OptimizedEndpointCache.master_keystone_client, + region_service_endpoint_map, + ) + + @lockutils.synchronized(LOCK_NAME) + def re_initialize_master_keystone_client(self) -> None: + """Reinitialize the master Keystone client.""" + self._create_master_cached_data() + LOG.info("Generated Master keystone client and master token upon exception") + + def _create_master_cached_data(self) -> None: + OptimizedEndpointCache.master_keystone_client = ks_client.Client( + session=self.admin_session, region_name=consts.CLOUD_0 + ) + OptimizedEndpointCache.master_token = ( + OptimizedEndpointCache.master_keystone_client.tokens.validate( + OptimizedEndpointCache.master_keystone_client.session.get_token(), + include_catalog=False, + ) + ) + if OptimizedEndpointCache.master_services_list is None: + OptimizedEndpointCache.master_services_list = ( + OptimizedEndpointCache.master_keystone_client.services.list() + ) + OptimizedEndpointCache.master_service_endpoint_map = ( + self._generate_master_service_endpoint_map() + ) diff --git a/distributedcloud/dcmanager/common/utils.py b/distributedcloud/dcmanager/common/utils.py index 1c681ceea..ac7cc0685 100644 --- a/distributedcloud/dcmanager/common/utils.py +++ b/distributedcloud/dcmanager/common/utils.py @@ -46,6 +46,7 @@ from dccommon.drivers.openstack import vim from dccommon import exceptions as dccommon_exceptions from dccommon import kubeoperator from dcmanager.common import consts +from dcmanager.common import context from dcmanager.common import exceptions from dcmanager.common.i18n import _ from dcmanager.db import api as db_api @@ -1613,3 +1614,23 @@ def generate_sync_info_message(association_ids): info_message += (f"$ dcmanager peer-group-association" f" sync {association_id}\n") return info_message + + +def fetch_subcloud_mgmt_ips(region_name: str = None): + """Fetch the subcloud(s) management IP(s). + + :param region_name: The subcloud region name, defaults to None + :return: A dictionary of region names to IPs (if no region provided) + or a single IP string (for specific region). + """ + LOG.info(f"Fetching subcloud(s) management IP(s) ({region_name=})") + ctx = context.get_admin_context() + if region_name: + subcloud = db_api.subcloud_get_by_region_name(ctx, region_name) + return subcloud.management_start_ip + + ip_map = {} + subclouds = db_api.subcloud_get_all(ctx) + for subcloud in subclouds: + ip_map[subcloud.region_name] = subcloud.management_start_ip + return ip_map