Merge "Improve subcloud endpoint update"
This commit is contained in:
commit
c772c904a5
@ -606,9 +606,10 @@ class OptimizedEndpointCache(object):
|
||||
)
|
||||
return session.Session(auth=auth)
|
||||
|
||||
@classmethod
|
||||
@lockutils.synchronized(LOCK_NAME)
|
||||
def update_master_service_endpoint_region(
|
||||
self, region_name: str, endpoint_values: dict
|
||||
cls, region_name: str, endpoint_values: dict
|
||||
) -> None:
|
||||
"""Update the master endpoint map for a specific region.
|
||||
|
||||
@ -622,12 +623,13 @@ class OptimizedEndpointCache(object):
|
||||
f"{region_name} with endpoints: {endpoint_values}"
|
||||
)
|
||||
# Update the current endpoint map
|
||||
if OptimizedEndpointCache.master_service_endpoint_map:
|
||||
OptimizedEndpointCache.master_service_endpoint_map[region_name] = (
|
||||
endpoint_values
|
||||
)
|
||||
|
||||
# Update the cached subcloud endpoit map
|
||||
if OptimizedEndpointCache.subcloud_endpoints and not self._is_central_cloud(
|
||||
if OptimizedEndpointCache.subcloud_endpoints and not cls._is_central_cloud(
|
||||
region_name
|
||||
):
|
||||
LOG.debug(
|
||||
|
@ -26,7 +26,7 @@ from dccommon.drivers.openstack.sdk_platform import (
|
||||
OptimizedOpenStackDriver as OpenStackDriver
|
||||
)
|
||||
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
|
||||
from dccommon.endpoint_cache import build_subcloud_endpoint
|
||||
from dccommon import endpoint_cache
|
||||
from dcmanager.audit import alarm_aggregation
|
||||
from dcmanager.audit import firmware_audit
|
||||
from dcmanager.audit import kube_rootca_update_audit
|
||||
@ -150,20 +150,11 @@ class SubcloudAuditWorkerManager(manager.Manager):
|
||||
do_software_audit)
|
||||
|
||||
def update_subcloud_endpoints(self, context, subcloud_name, endpoints):
|
||||
try:
|
||||
LOG.info("Updating service endpoints for subcloud %s "
|
||||
"in endpoint cache" % subcloud_name)
|
||||
endpoint_cache = OpenStackDriver(
|
||||
region_name=dccommon_consts.CLOUD_0,
|
||||
fetch_subcloud_ips=utils.fetch_subcloud_mgmt_ips,
|
||||
).keystone_client.endpoint_cache
|
||||
endpoint_cache.update_master_service_endpoint_region(
|
||||
subcloud_name, endpoints)
|
||||
except (keystone_exceptions.EndpointNotFound,
|
||||
keystone_exceptions.ConnectFailure,
|
||||
IndexError):
|
||||
LOG.error("Failed to update the service endpoints "
|
||||
"for subcloud %s." % subcloud_name)
|
||||
LOG.info(f"Updating service endpoints for subcloud {subcloud_name} "
|
||||
"in endpoint cache")
|
||||
endpoint_cache.OptimizedEndpointCache.update_master_service_endpoint_region(
|
||||
subcloud_name, endpoints
|
||||
)
|
||||
|
||||
def _update_subcloud_audit_fail_count(self, subcloud,
|
||||
audit_fail_count):
|
||||
@ -371,9 +362,11 @@ class SubcloudAuditWorkerManager(manager.Manager):
|
||||
).keystone_client
|
||||
admin_session = keystone_client.session
|
||||
sysinv_client = SysinvClient(
|
||||
subcloud_region, admin_session, endpoint=build_subcloud_endpoint(
|
||||
subcloud_region,
|
||||
admin_session,
|
||||
endpoint=endpoint_cache.build_subcloud_endpoint(
|
||||
subcloud_management_ip, "sysinv"
|
||||
)
|
||||
),
|
||||
)
|
||||
fm_client = FmClient(subcloud_region, admin_session)
|
||||
except keystone_exceptions.ConnectTimeout:
|
||||
|
@ -42,10 +42,8 @@ import tsconfig.tsconfig as tsc
|
||||
import yaml
|
||||
|
||||
from dccommon import consts as dccommon_consts
|
||||
# TODO(gherzman): Replace OpenStackDriver with OptimizedOpenStackDriver
|
||||
# during dcmanager OptimizedOpenStackDriver integration
|
||||
from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
|
||||
from dccommon.drivers.openstack.sdk_platform import OptimizedOpenStackDriver
|
||||
from dccommon.drivers.openstack.sdk_platform import OptimizedOpenStackDriver as \
|
||||
OpenStackDriver
|
||||
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
|
||||
from dccommon.drivers.openstack import vim
|
||||
from dccommon import exceptions as dccommon_exceptions
|
||||
@ -1048,8 +1046,11 @@ def is_subcloud_healthy(subcloud_region):
|
||||
|
||||
system_health = ""
|
||||
try:
|
||||
os_client = OpenStackDriver(region_name=subcloud_region,
|
||||
region_clients=None)
|
||||
os_client = OpenStackDriver(
|
||||
region_name=subcloud_region,
|
||||
region_clients=None,
|
||||
fetch_subcloud_ips=fetch_subcloud_mgmt_ips,
|
||||
)
|
||||
keystone_client = os_client.keystone_client
|
||||
endpoint = keystone_client.endpoint_cache.get_endpoint('sysinv')
|
||||
sysinv_client = SysinvClient(subcloud_region,
|
||||
@ -1078,7 +1079,7 @@ def is_subcloud_healthy(subcloud_region):
|
||||
|
||||
def get_systemcontroller_installed_loads():
|
||||
try:
|
||||
os_client = OptimizedOpenStackDriver(
|
||||
os_client = OpenStackDriver(
|
||||
region_name=dccommon_consts.SYSTEM_CONTROLLER_NAME,
|
||||
region_clients=None,
|
||||
fetch_subcloud_ips=fetch_subcloud_mgmt_ips,
|
||||
@ -1359,7 +1360,7 @@ def decode_and_normalize_passwd(input_passwd):
|
||||
|
||||
def get_failure_msg(subcloud_region):
|
||||
try:
|
||||
os_client = OptimizedOpenStackDriver(
|
||||
os_client = OpenStackDriver(
|
||||
region_name=subcloud_region,
|
||||
region_clients=None,
|
||||
fetch_subcloud_ips=fetch_subcloud_mgmt_ips,
|
||||
@ -1618,7 +1619,9 @@ def validate_name(name, prohibited_name_list=[],
|
||||
def get_local_system():
|
||||
m_ks_client = OpenStackDriver(
|
||||
region_name=dccommon_consts.DEFAULT_REGION_NAME,
|
||||
region_clients=None).keystone_client
|
||||
region_clients=None,
|
||||
fetch_subcloud_ips=fetch_subcloud_mgmt_ips,
|
||||
).keystone_client
|
||||
endpoint = m_ks_client.endpoint_cache.get_endpoint('sysinv')
|
||||
sysinv_client = SysinvClient(dccommon_consts.DEFAULT_REGION_NAME,
|
||||
m_ks_client.session,
|
||||
|
@ -47,6 +47,7 @@ from dccommon.drivers.openstack.sdk_platform import (
|
||||
OptimizedOpenStackDriver as OpenStackDriver
|
||||
)
|
||||
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
|
||||
from dccommon.endpoint_cache import OptimizedEndpointCache as EndpointCache
|
||||
from dccommon.exceptions import PlaybookExecutionFailed
|
||||
from dccommon.exceptions import SubcloudNotFound
|
||||
from dccommon import kubeoperator
|
||||
@ -3451,11 +3452,7 @@ class SubcloudManager(manager.Manager):
|
||||
# Update service URLs in subcloud endpoint cache
|
||||
self.audit_rpc_client.trigger_subcloud_endpoints_update(
|
||||
context, subcloud_region, services_endpoints)
|
||||
# TODO(gherzm): Remove the update_subcloud_endpoints call once
|
||||
# the OpenStackDriver is moved to be used only in the master process
|
||||
self.dcorch_rpc_client.update_subcloud_endpoints(
|
||||
context, subcloud_region, services_endpoints)
|
||||
# Update the management ip inside dcorch database
|
||||
# Update the management ip inside dcorch database (triggers endpoint update)
|
||||
self.dcorch_rpc_client.update_subcloud_management_ip(
|
||||
context, subcloud_region, endpoint_ip)
|
||||
# Update sysinv URL in cert-mon cache
|
||||
@ -3463,6 +3460,11 @@ class SubcloudManager(manager.Manager):
|
||||
dc_notification.subcloud_sysinv_endpoint_update(
|
||||
context, subcloud_region, services_endpoints.get("sysinv"))
|
||||
|
||||
# Update dcmanager endpoint cache
|
||||
EndpointCache.update_master_service_endpoint_region(
|
||||
subcloud_region, services_endpoints
|
||||
)
|
||||
|
||||
def _create_subcloud_update_overrides_file(
|
||||
self, payload, subcloud_name, filename_suffix):
|
||||
update_overrides_file = os.path.join(
|
||||
|
@ -2354,12 +2354,6 @@ class TestSubcloudsPatchPrestage(BaseTestSubcloudsPatch):
|
||||
def test_patch_prestage_fails_with_invalid_release(self):
|
||||
"""Test patch prestage fails with invalid release"""
|
||||
|
||||
# TODO(gherzmann): Remove the following mock when the OpenStackDriver
|
||||
# is fully replaced by OptimizedOpenStackDriver
|
||||
mock_patch_object = mock.patch.object(cutils, 'OptimizedOpenStackDriver')
|
||||
self.mock_openstack_driver = mock_patch_object.start()
|
||||
self.addCleanup(mock_patch_object.stop)
|
||||
|
||||
self.params["release"] = "21.12"
|
||||
|
||||
self.mock_sysinv_client().get_loads.return_values = FakeLoad(
|
||||
|
@ -30,7 +30,8 @@ import webob.dec
|
||||
import webob.exc
|
||||
|
||||
from dccommon import consts as dccommon_consts
|
||||
from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
|
||||
from dccommon.drivers.openstack.sdk_platform import OptimizedOpenStackDriver as \
|
||||
OpenStackDriver
|
||||
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
|
||||
from dcmanager.rpc import client as dcmanager_rpc_client
|
||||
from dcorch.api.proxy.apps.dispatcher import APIDispatcher
|
||||
|
@ -24,7 +24,9 @@ from keystoneauth1 import exceptions as keystone_exceptions
|
||||
from oslo_log import log as logging
|
||||
|
||||
from dccommon import consts as dccommon_consts
|
||||
from dccommon.drivers.openstack import sdk_platform as sdk
|
||||
from dccommon.drivers.openstack.sdk_platform import (
|
||||
OptimizedOpenStackDriver as OpenStackDriver
|
||||
)
|
||||
from dcorch.common import consts
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -149,8 +151,11 @@ def set_request_forward_environ(req, remote_host, remote_port):
|
||||
|
||||
def _get_fernet_keys():
|
||||
"""Get fernet keys from sysinv."""
|
||||
os_client = sdk.OpenStackDriver(region_name=dccommon_consts.CLOUD_0,
|
||||
thread_name='proxy')
|
||||
os_client = OpenStackDriver(
|
||||
region_name=dccommon_consts.CLOUD_0,
|
||||
region_clients=("sysinv",),
|
||||
thread_name="proxy",
|
||||
)
|
||||
try:
|
||||
key_list = os_client.sysinv_client.get_fernet_keys()
|
||||
return [str(getattr(key, 'key')) for key in key_list]
|
||||
@ -158,12 +163,13 @@ def _get_fernet_keys():
|
||||
keystone_exceptions.ConnectFailure) as e:
|
||||
LOG.info("get_fernet_keys: cloud {} is not reachable [{}]"
|
||||
.format(dccommon_consts.CLOUD_0, str(e)))
|
||||
sdk.OpenStackDriver.delete_region_clients(dccommon_consts.CLOUD_0)
|
||||
OpenStackDriver.delete_region_clients(dccommon_consts.CLOUD_0)
|
||||
return None
|
||||
except (AttributeError, TypeError) as e:
|
||||
LOG.info("get_fernet_keys error {}".format(e))
|
||||
sdk.OpenStackDriver.delete_region_clients(dccommon_consts.CLOUD_0,
|
||||
clear_token=True)
|
||||
OpenStackDriver.delete_region_clients(
|
||||
dccommon_consts.CLOUD_0, clear_token=True
|
||||
)
|
||||
return None
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
@ -5,7 +5,6 @@
|
||||
#
|
||||
|
||||
import eventlet
|
||||
from keystoneauth1 import exceptions as keystone_exceptions
|
||||
from oslo_log import log as logging
|
||||
|
||||
from dccommon import consts as dccommon_consts
|
||||
@ -13,7 +12,6 @@ from dcorch.common import consts as dco_consts
|
||||
from dcorch.common import context
|
||||
from dcorch.common import exceptions
|
||||
from dcorch.db import api as db_api
|
||||
from dcorch.drivers.openstack import sdk
|
||||
from dcorch.engine import scheduler
|
||||
from dcorch.engine.sync_services.identity import IdentitySyncThread
|
||||
from dcorch.engine.sync_services.sysinv import SysinvSyncThread
|
||||
@ -290,20 +288,6 @@ class GenericSyncWorkerManager(object):
|
||||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
def update_subcloud_endpoints(self, context, subcloud_name, endpoints):
|
||||
try:
|
||||
LOG.info(f"Updating service endpoints for subcloud {subcloud_name} "
|
||||
f"in endpoint cache")
|
||||
endpoint_cache = sdk.OpenStackDriver(
|
||||
region_name=dccommon_consts.CLOUD_0).keystone_client.endpoint_cache
|
||||
endpoint_cache.update_master_service_endpoint_region(
|
||||
subcloud_name, endpoints)
|
||||
except (keystone_exceptions.EndpointNotFound,
|
||||
keystone_exceptions.ConnectFailure,
|
||||
IndexError):
|
||||
LOG.error(f"Failed to update services endpoints for "
|
||||
f"subcloud: {subcloud_name} in dcorch.")
|
||||
|
||||
def update_subcloud_management_ip(self, context, subcloud_name, management_ip):
|
||||
try:
|
||||
sc = subcloud.Subcloud.get_by_name(context, subcloud_name)
|
||||
|
@ -322,10 +322,6 @@ class EngineWorkerService(service.Service):
|
||||
def update_subcloud_version(self, ctxt, subcloud_name, sw_version):
|
||||
self.gswm.update_subcloud_version(ctxt, subcloud_name, sw_version)
|
||||
|
||||
@request_context
|
||||
def update_subcloud_endpoints(self, ctxt, subcloud_name, endpoints):
|
||||
self.gswm.update_subcloud_endpoints(ctxt, subcloud_name, endpoints)
|
||||
|
||||
@request_context
|
||||
def update_subcloud_management_ip(self, ctxt, subcloud_name, management_ip):
|
||||
self.gswm.update_subcloud_management_ip(ctxt, subcloud_name, management_ip)
|
||||
@ -341,12 +337,12 @@ class EngineWorkerService(service.Service):
|
||||
except Exception as ex:
|
||||
LOG.error(f"Failed to stop engine-worker service: {six.text_type(ex)}")
|
||||
|
||||
def stop(self):
|
||||
def stop(self, graceful=False):
|
||||
self._stop_rpc_server()
|
||||
|
||||
# Terminate the engine process
|
||||
LOG.info("All threads were gone, terminating engine-worker")
|
||||
super(EngineWorkerService, self).stop()
|
||||
super(EngineWorkerService, self).stop(graceful)
|
||||
|
||||
@request_context
|
||||
# The sync job info has been written to the DB, alert the sync engine
|
||||
|
@ -28,7 +28,7 @@ from dcdbsync.dbsyncclient.client import Client
|
||||
from dcdbsync.dbsyncclient import exceptions as dbsync_exceptions
|
||||
from dcorch.common import consts
|
||||
from dcorch.common import exceptions
|
||||
from dcorch.engine.sync_thread import get_os_client
|
||||
from dcorch.engine.sync_thread import get_master_os_client
|
||||
from dcorch.engine.sync_thread import SyncThread
|
||||
from dcorch.objects import resource
|
||||
|
||||
@ -109,11 +109,10 @@ class IdentitySyncThread(SyncThread):
|
||||
session=self.sc_admin_session)
|
||||
|
||||
def get_master_ks_client(self):
|
||||
return get_os_client(self.master_region_name, ['dbsync']).\
|
||||
keystone_client.keystone_client
|
||||
return get_master_os_client().keystone_client.keystone_client
|
||||
|
||||
def get_master_dbs_client(self):
|
||||
return get_os_client(self.master_region_name, ['dbsync']).dbsync_client
|
||||
return get_master_os_client(["dbsync"]).dbsync_client
|
||||
|
||||
def get_sc_ks_client(self):
|
||||
if self.sc_ks_client is None:
|
||||
|
@ -24,7 +24,9 @@ from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from dccommon import consts as dccommon_consts
|
||||
from dccommon.drivers.openstack import sdk_platform as sdk
|
||||
from dccommon.drivers.openstack.sdk_platform import (
|
||||
OptimizedOpenStackDriver as OpenStackDriver
|
||||
)
|
||||
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
|
||||
from dccommon.endpoint_cache import build_subcloud_endpoint
|
||||
from dccommon import exceptions as dccommon_exceptions
|
||||
@ -35,7 +37,7 @@ from dcorch.engine.fernet_key_manager import FERNET_REPO_MASTER_ID
|
||||
from dcorch.engine.fernet_key_manager import FernetKeyManager
|
||||
from dcorch.engine.sync_thread import AUDIT_RESOURCE_EXTRA
|
||||
from dcorch.engine.sync_thread import AUDIT_RESOURCE_MISSING
|
||||
from dcorch.engine.sync_thread import get_os_client
|
||||
from dcorch.engine.sync_thread import get_master_os_client
|
||||
from dcorch.engine.sync_thread import SyncThread
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -98,7 +100,7 @@ class SysinvSyncThread(SyncThread):
|
||||
endpoint=sc_sysinv_url)
|
||||
|
||||
def get_master_sysinv_client(self):
|
||||
return get_os_client(self.master_region_name, ['sysinv']).sysinv_client
|
||||
return get_master_os_client(['sysinv']).sysinv_client
|
||||
|
||||
def get_sc_sysinv_client(self):
|
||||
if self.sc_sysinv_client is None:
|
||||
@ -133,7 +135,7 @@ class SysinvSyncThread(SyncThread):
|
||||
LOG.info("{} {} region_name {} not authorized".format(
|
||||
request.orch_job.operation_type, rsrc.resource_type,
|
||||
self.region_name), extra=self.log_extra)
|
||||
sdk.OpenStackDriver.delete_region_clients(self.region_name)
|
||||
OpenStackDriver.delete_region_clients(self.region_name)
|
||||
raise exceptions.SyncRequestFailedRetry
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
@ -469,7 +471,7 @@ class SysinvSyncThread(SyncThread):
|
||||
"[{}]".format(resource_type,
|
||||
self.region_name,
|
||||
str(e)), extra=self.log_extra)
|
||||
sdk.OpenStackDriver.delete_region_clients(self.region_name)
|
||||
OpenStackDriver.delete_region_clients(self.region_name)
|
||||
return None
|
||||
except (AttributeError, TypeError) as e:
|
||||
LOG.info("get subcloud_resources {} error {}".format(
|
||||
@ -482,9 +484,9 @@ class SysinvSyncThread(SyncThread):
|
||||
def post_audit(self):
|
||||
# TODO(lzhu1): This should be revisited once the master cache service
|
||||
# is implemented.
|
||||
sdk.OpenStackDriver.delete_region_clients_for_thread(
|
||||
OpenStackDriver.delete_region_clients_for_thread(
|
||||
self.region_name, 'audit')
|
||||
sdk.OpenStackDriver.delete_region_clients_for_thread(
|
||||
OpenStackDriver.delete_region_clients_for_thread(
|
||||
dccommon_consts.CLOUD_0, 'audit')
|
||||
|
||||
def get_certificates_resources(self, sysinv_client):
|
||||
|
@ -64,17 +64,18 @@ AUDIT_RESOURCE_EXTRA = 'extra_resource'
|
||||
AUDIT_LOCK_NAME = 'dcorch-audit'
|
||||
|
||||
|
||||
def get_os_client(region, region_clients):
|
||||
def get_master_os_client(region_clients=None):
|
||||
# Used by the master clients only. The subcloud clients don't need to be
|
||||
# cached in the openstack driver, because we don't want to hold the admin
|
||||
# sessions for the subclouds.
|
||||
try:
|
||||
os_client = sdk.OptimizedOpenStackDriver(
|
||||
region_name=region,
|
||||
region_name=dccommon_consts.CLOUD_0,
|
||||
region_clients=region_clients)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
f"Failed to get os_client for {region}/{region_clients}: {e}.")
|
||||
"Failed to get os_client for "
|
||||
f"{dccommon_consts.CLOUD_0}/{region_clients}: {e}.")
|
||||
raise e
|
||||
return os_client
|
||||
|
||||
|
@ -171,11 +171,6 @@ class EngineWorkerClient(object):
|
||||
self.make_msg('update_subcloud_version',
|
||||
subcloud_name=subcloud_name, sw_version=sw_version))
|
||||
|
||||
def update_subcloud_endpoints(self, ctxt, subcloud_name, endpoints):
|
||||
return self.cast(ctxt, self.make_msg(
|
||||
'update_subcloud_endpoints', subcloud_name=subcloud_name,
|
||||
endpoints=endpoints), fanout=True, version=self.BASE_RPC_API_VERSION)
|
||||
|
||||
def update_subcloud_management_ip(self, ctxt, subcloud_name, management_ip):
|
||||
return self.call(
|
||||
ctxt,
|
||||
|
Loading…
x
Reference in New Issue
Block a user