Add in-memory token caching for DC services

This commit introduces an in-memory, dictionary-based token caching
mechanism to reduce the number of token requests made to subclouds'
identity APIs.
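
The cache itself is a size-bounded dictionary that evicts its oldest
entry first. A minimal self-contained sketch of that structure (the
class name mirrors the one added by this commit, but the constructor is
simplified: the real cache reads its limit from the token_cache_size
config option rather than taking it as an argument):

```python
import collections


class BoundedFIFOCache(collections.OrderedDict):
    """OrderedDict that discards its oldest entry once maxsize is exceeded."""

    def __init__(self, maxsize, *args, **kwargs):
        self._maxsize = maxsize
        super().__init__(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)  # re-inserting a key refreshes its position
        if len(self) > self._maxsize:
            # Drop the oldest (first-inserted) entry
            self.popitem(last=False)


cache = BoundedFIFOCache(maxsize=2)
cache["region1"] = "token-a"
cache["region2"] = "token-b"
cache["region3"] = "token-c"  # evicts region1
```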

The caching is implemented by subclassing the v3.Password
authentication class, which normally issues the HTTP token requests to
the identity API. The subclass first checks the cache for a valid,
non-expired token and returns it if one is found; otherwise, it
performs the actual request and caches the new token for future use.

Tokens can become invalid before their expiration time when all fernet
keys are rotated (e.g., during the initial sync between a subcloud and
the system controller). The cache leverages the keystone session's
reauthentication mechanism to automatically invalidate cached tokens
when that happens.

This commit also raises the open file descriptor limit for the DC
orchestrator service, since sessions keep TCP connections open for
reuse instead of closing them immediately after each request.
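
The limit itself is raised in the service's configuration rather than in
Python code, but the effect can be verified at runtime with the standard
resource module (the target value below is illustrative, not the actual
service limit):

```python
import resource

# Current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

# A process may move its soft limit anywhere up to the hard limit
# without extra privileges; cap the illustrative target at the hard limit.
target = min(4096, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))

new_soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
```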

Test Plan:
01. PASS - Deploy a subcloud and verify token caching behavior.
02. PASS - Deploy a subcloud with remote install, ensuring the token
    cache works.
03. PASS - Prestage a subcloud for install and software deployment,
    validating token caching during the process.
04. PASS - Run prestage orchestration and verify proper use of the
    token cache.
05. PASS - Manage a subcloud for the first time and verify that the
    initial sync functions as expected. Ensure fernet key rotation
    causes cached tokens to invalidate, and confirm reauthentication
    requests are made.
06. PASS - Unmanage a subcloud, rotate all fernet keys manually, then
    manage the subcloud again. Verify token invalidation and
    reauthentication function as expected.
07. PASS - Create a subcloud backup and ensure no token cache issues
    arise.
08. PASS - Restore a subcloud from backup and verify proper
    functionality of the token cache.
09. PASS - Deploy an N-1 subcloud and validate token caching for this
    subcloud.
10. PASS - Verify that audits correctly identify an N-1 subcloud
    without the USM patch as missing the USM service.
11. PASS - Apply the USM patch to the N-1 subcloud and verify that
    the audit detects the USM service and prestage orchestration for
    software deployment functions correctly.
12. PASS - Test DC orchestration audit and sync by creating a new
    OpenStack user, and verify the user is replicated to the subcloud.
13. PASS - Apply a patch to subclouds using software deployment
    orchestration, verifying token cache performance.
14. PASS - Test dcmanager API commands that send requests to
    subclouds (e.g., 'dcmanager subcloud show <subcloud> --details'),
    ensuring token cache is used.
15. PASS - Conduct a soak test of all DC services to verify token
    expiration, renewal, and cache behavior over extended use.
16. PASS - Monitor TCP connections to ensure they are properly
    closed after each use, preventing lingering open connections during
    token caching or HTTP request handling.
17. PASS - Run end-to-end geo-redundancy operation and verify that it
    completes successfully.
18. PASS - Run kube rootca update orchestration and verify that it
    completes successfully.
19. PASS - Verify that the number of POST token requests made by the DC
    audit to the subcloud per hour is equal to the number of DC audit
    workers on the system controller.
20. PASS - Monitor the number of open file descriptors to ensure it
    does not reach the new limit while executing a DC kube rootca
    update strategy with the maximum number of supported subclouds.
    Additionally, verify that all sessions are closed after the
    strategy is complete.

Closes-Bug: 2084490

Change-Id: Ie3c17f58c09ae08df8cd9f0c92f50ab0c556c263
Signed-off-by: Gustavo Herzmann <gustavo.herzmann@windriver.com>
Gustavo Herzmann 2024-10-08 14:38:15 -03:00
parent 4429018879
commit 2ac4be0d5a
38 changed files with 833 additions and 657 deletions


@@ -107,6 +107,11 @@ endpoint_cache_opts = [
"http_connect_timeout",
help="Request timeout value for communicating with Identity API server.",
),
cfg.IntOpt(
"token_cache_size",
default=5000,
help="Maximum number of entries in the in-memory token cache",
),
]
scheduler_opts = [


@@ -6,7 +6,6 @@
from keystoneauth1.session import Session as keystone_session
from oslo_log import log
import requests
from dccommon import consts
from dccommon.drivers import base
@@ -26,6 +25,8 @@ class DcagentClient(base.DriverBase):
session: keystone_session,
endpoint: str = None,
):
self.session = session
# Get an endpoint and token.
if endpoint is None:
self.endpoint = session.get_endpoint(
@@ -36,14 +37,11 @@ class DcagentClient(base.DriverBase):
else:
self.endpoint = endpoint
self.token = session.get_token()
def audit(self, audit_data, timeout=DCAGENT_REST_DEFAULT_TIMEOUT):
"""Audit subcloud"""
url = self.endpoint + "/v1/dcaudit"
headers = {"X-Auth-Token": self.token}
response = requests.patch(
url, headers=headers, json=audit_data, timeout=timeout
response = self.session.patch(
url, json=audit_data, timeout=timeout, raise_exc=False
)
if response.status_code == 200:


@@ -3,8 +3,8 @@
# SPDX-License-Identifier: Apache-2.0
#
from keystoneauth1 import session as ks_session
from oslo_log import log
import requests
from requests_toolbelt import MultipartEncoder
from dccommon import consts
@@ -22,8 +22,8 @@ class DcmanagerClient(base.DriverBase):
def __init__(
self,
region,
session,
region: str,
session: ks_session.Session,
timeout=DCMANAGER_CLIENT_REST_DEFAULT_TIMEOUT,
endpoint_type=consts.KS_ENDPOINT_PUBLIC,
endpoint=None,
@@ -33,8 +33,8 @@ class DcmanagerClient(base.DriverBase):
service_type="dcmanager", region_name=region, interface=endpoint_type
)
self.endpoint = endpoint
self.token = session.get_token()
self.timeout = timeout
self.session = session
def get_system_peer(self, system_peer_uuid):
"""Get system peer."""
@@ -42,8 +42,7 @@
raise ValueError("system_peer_uuid is required.")
url = f"{self.endpoint}/system-peers/{system_peer_uuid}"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
return response.json()
@@ -63,10 +62,14 @@
raise ValueError("subcloud_ref is required.")
url = f"{self.endpoint}/subclouds/{subcloud_ref}/detail"
headers = {"X-Auth-Token": self.token}
if is_region_name:
headers["User-Agent"] = consts.DCMANAGER_V1_HTTP_AGENT
response = requests.get(url, headers=headers, timeout=self.timeout)
user_agent = consts.DCMANAGER_V1_HTTP_AGENT if is_region_name else None
response = self.session.get(
url,
timeout=self.timeout,
user_agent=user_agent,
raise_exc=False,
)
if response.status_code == 200:
return response.json()
@@ -84,8 +87,7 @@
"""Get subcloud list."""
url = f"{self.endpoint}/subclouds"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -99,8 +101,7 @@
"""Get subcloud group list."""
url = f"{self.endpoint}/subcloud-groups"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -116,8 +117,7 @@
"""Get subcloud peer group list."""
url = f"{self.endpoint}/subcloud-peer-groups"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -136,8 +136,7 @@
raise ValueError("peer_group_ref is required.")
url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
return response.json()
@@ -162,8 +161,7 @@
raise ValueError("peer_group_ref is required.")
url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}/subclouds"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -196,8 +194,7 @@
"""Get peer group association list."""
url = f"{self.endpoint}/peer-group-associations"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=self.timeout)
response = self.session.get(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -214,9 +211,9 @@
"""Add a subcloud peer group."""
url = f"{self.endpoint}/subcloud-peer-groups"
headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"}
response = requests.post(
url, json=kwargs, headers=headers, timeout=self.timeout
headers = {"Content-Type": "application/json"}
response = self.session.post(
url, json=kwargs, timeout=self.timeout, headers=headers, raise_exc=False
)
if response.status_code == 200:
@@ -256,11 +253,12 @@
fields.update(data)
enc = MultipartEncoder(fields=fields)
headers = {
"X-Auth-Token": self.token,
"Content-Type": enc.content_type,
"User-Agent": consts.DCMANAGER_V1_HTTP_AGENT,
}
response = requests.post(url, headers=headers, data=enc, timeout=self.timeout)
response = self.session.post(
url, data=enc, timeout=self.timeout, headers=headers, raise_exc=False
)
if response.status_code == 200:
return response.json()
@@ -276,9 +274,8 @@
"""Add a peer group association."""
url = f"{self.endpoint}/peer-group-associations"
headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"}
response = requests.post(
url, json=kwargs, headers=headers, timeout=self.timeout
response = self.session.post(
url, json=kwargs, timeout=self.timeout, raise_exc=False
)
if response.status_code == 200:
@@ -298,9 +295,8 @@
url = f"{self.endpoint}/peer-group-associations/{association_id}"
update_kwargs = {"sync_status": sync_status}
headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"}
response = requests.patch(
url, json=update_kwargs, headers=headers, timeout=self.timeout
response = self.session.patch(
url, json=update_kwargs, timeout=self.timeout, raise_exc=False
)
if response.status_code == 200:
@@ -327,13 +323,12 @@
raise ValueError("peer_group_ref is required.")
url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}"
headers = {
"X-Auth-Token": self.token,
"Content-Type": "application/json",
"User-Agent": consts.DCMANAGER_V1_HTTP_AGENT,
}
response = requests.patch(
url, json=kwargs, headers=headers, timeout=self.timeout
response = self.session.patch(
url,
json=kwargs,
timeout=self.timeout,
user_agent=consts.DCMANAGER_V1_HTTP_AGENT,
raise_exc=False,
)
if response.status_code == 200:
@@ -359,9 +354,8 @@
raise ValueError("peer_group_ref is required.")
url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}/audit"
headers = {"X-Auth-Token": self.token, "Content-Type": "application/json"}
response = requests.patch(
url, json=kwargs, headers=headers, timeout=self.timeout
response = self.session.patch(
url, json=kwargs, timeout=self.timeout, raise_exc=False
)
if response.status_code == 200:
@@ -402,12 +396,12 @@
fields.update(data)
enc = MultipartEncoder(fields=fields)
headers = {"X-Auth-Token": self.token, "Content-Type": enc.content_type}
# Add header to flag the request is from another DC,
# server will treat subcloud_ref as a region_name
headers = {"Content-Type": enc.content_type}
if is_region_name:
headers["User-Agent"] = consts.DCMANAGER_V1_HTTP_AGENT
response = requests.patch(url, headers=headers, data=enc, timeout=self.timeout)
response = self.session.patch(
url, data=enc, timeout=self.timeout, headers=headers, raise_exc=False
)
if response.status_code == 200:
return response.json()
@@ -428,8 +422,7 @@
raise ValueError("association_id is required.")
url = f"{self.endpoint}/peer-group-associations/{association_id}"
headers = {"X-Auth-Token": self.token}
response = requests.delete(url, headers=headers, timeout=self.timeout)
response = self.session.delete(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
return response.json()
@@ -454,8 +447,7 @@
raise ValueError("peer_group_ref is required.")
url = f"{self.endpoint}/subcloud-peer-groups/{peer_group_ref}"
headers = {"X-Auth-Token": self.token}
response = requests.delete(url, headers=headers, timeout=self.timeout)
response = self.session.delete(url, timeout=self.timeout, raise_exc=False)
if response.status_code == 200:
return response.json()
@@ -488,11 +480,12 @@
raise ValueError("subcloud_ref is required.")
url = f"{self.endpoint}/subclouds/{subcloud_ref}"
headers = {
"X-Auth-Token": self.token,
"User-Agent": consts.DCMANAGER_V1_HTTP_AGENT,
}
response = requests.delete(url, headers=headers, timeout=self.timeout)
response = self.session.delete(
url,
timeout=self.timeout,
user_agent=consts.DCMANAGER_V1_HTTP_AGENT,
raise_exc=False,
)
if response.status_code == 200:
return response.json()


@@ -14,11 +14,11 @@
#
import fmclient
from keystoneauth1 import session as ks_session
from oslo_log import log
from dccommon import consts as dccommon_consts
from dccommon.drivers import base
from dccommon import exceptions
LOG = log.getLogger(__name__)
API_VERSION = "1"
@@ -29,30 +29,32 @@ class FmClient(base.DriverBase):
def __init__(
self,
region,
session,
region: str,
session: ks_session.Session,
endpoint_type=dccommon_consts.KS_ENDPOINT_DEFAULT,
endpoint=None,
token=None,
endpoint: str = None,
token: str = None,
):
self.region_name = region
token = token if token else session.get_token()
if not endpoint:
endpoint = session.get_endpoint(
service_type=dccommon_consts.ENDPOINT_TYPE_FM,
region_name=region,
interface=endpoint_type,
)
try:
self.fm = fmclient.Client(
API_VERSION,
region_name=region,
endpoint_type=endpoint_type,
endpoint=endpoint,
auth_token=token,
)
except exceptions.ServiceUnavailable:
raise
# If the token is specified, use it instead of using the session
if token:
if not endpoint:
endpoint = session.get_endpoint(
service_type=dccommon_consts.ENDPOINT_TYPE_FM,
region_name=region,
interface=endpoint_type,
)
session = None
self.fm = fmclient.Client(
API_VERSION,
session=session,
region_name=region,
endpoint_type=endpoint_type,
endpoint=endpoint,
auth_token=token,
)
def get_alarm_summary(self):
"""Get this region alarm summary"""


@@ -13,8 +13,8 @@
# under the License.
#
from keystoneauth1 import session as ks_session
from oslo_log import log
import requests
from requests_toolbelt import MultipartEncoder
from dccommon import consts
@@ -35,18 +35,16 @@ PATCH_REST_DEFAULT_TIMEOUT = 900
class PatchingClient(base.DriverBase):
"""Patching V1 driver."""
def __init__(self, region, session, endpoint=None):
# Get an endpoint and token.
if endpoint is None:
def __init__(self, region: str, session: ks_session.Session, endpoint: str = None):
self.session = session
self.endpoint = endpoint
if not self.endpoint:
self.endpoint = session.get_endpoint(
service_type="patching",
region_name=region,
interface=consts.KS_ENDPOINT_ADMIN,
)
else:
self.endpoint = endpoint
self.token = session.get_token()
def query(self, state=None, release=None, timeout=PATCH_REST_DEFAULT_TIMEOUT):
"""Query patches"""
@@ -55,8 +53,7 @@ class PatchingClient(base.DriverBase):
url += "?show=%s" % state.lower()
if release is not None:
url += "&release=%s" % release
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=timeout)
response = self.session.get(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -74,8 +71,7 @@
def query_hosts(self, timeout=PATCH_REST_DEFAULT_TIMEOUT):
"""Query hosts"""
url = self.endpoint + "/v1/query_hosts"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers, timeout=timeout)
response = self.session.get(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -94,8 +90,7 @@
"""Apply patches"""
patch_str = "/".join(patches)
url = self.endpoint + "/v1/apply/%s" % patch_str
headers = {"X-Auth-Token": self.token}
response = requests.post(url, headers=headers, timeout=timeout)
response = self.session.post(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -114,8 +109,7 @@
"""Remove patches"""
patch_str = "/".join(patches)
url = self.endpoint + "/v1/remove/%s" % patch_str
headers = {"X-Auth-Token": self.token}
response = requests.post(url, headers=headers, timeout=timeout)
response = self.session.post(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -134,8 +128,7 @@
"""Delete patches"""
patch_str = "/".join(patches)
url = self.endpoint + "/v1/delete/%s" % patch_str
headers = {"X-Auth-Token": self.token}
response = requests.post(url, headers=headers, timeout=timeout)
response = self.session.post(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -154,8 +147,7 @@
"""Commit patches"""
patch_str = "/".join(patches)
url = self.endpoint + "/v1/commit/%s" % patch_str
headers = {"X-Auth-Token": self.token}
response = requests.post(url, headers=headers, timeout=timeout)
response = self.session.post(url, timeout=timeout, raise_exc=False)
if response.status_code == 200:
data = response.json()
@@ -183,8 +175,10 @@
}
)
url = self.endpoint + "/v1/upload"
headers = {"X-Auth-Token": self.token, "Content-Type": enc.content_type}
response = requests.post(url, data=enc, headers=headers, timeout=timeout)
headers = {"Content-Type": enc.content_type}
response = self.session.post(
url, data=enc, headers=headers, timeout=timeout, raise_exc=False
)
if response.status_code == 200:
data = response.json()


@@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0
#
from urllib import parse
from keystoneauth1.session import Session as keystone_session
from oslo_log import log
@@ -40,59 +41,66 @@ class SoftwareClient(base.DriverBase):
endpoint_type: str = consts.KS_ENDPOINT_ADMIN,
token: str = None,
):
# Get an endpoint and token.
if not endpoint:
self.session = session
self.endpoint = endpoint
self.token = token
if not self.endpoint:
self.endpoint = session.get_endpoint(
service_type=consts.ENDPOINT_TYPE_USM,
region_name=region,
interface=endpoint_type,
)
else:
self.endpoint = endpoint
# The usm systemcontroller endpoint ends with a slash but the regionone
# and the subcloud endpoint don't. The slash is removed to standardize
# with the other endpoints.
self.endpoint = self.endpoint.rstrip("/") + "/v1"
self.token = token if token else session.get_token()
self.headers = {"X-Auth-Token": self.token}
self.endpoint = parse.urljoin(self.endpoint, "/v1")
self.headers = {"X-Auth-Token": self.token} if token else None
def request(self, url: str, method: str, timeout: int):
"""Request directly if token is passed, otherwise use the session"""
if self.token:
return requests.request(
method=method, url=url, headers=self.headers, timeout=timeout
)
return self.session.request(
url, method=method, timeout=timeout, raise_exc=False
)
def list(self, timeout=REST_DEFAULT_TIMEOUT):
"""List releases"""
url = self.endpoint + "/release"
response = requests.get(url, headers=self.headers, timeout=timeout)
response = self.request(url, "GET", timeout)
return self._handle_response(response, operation="List")
def show(self, release, timeout=REST_SHOW_TIMEOUT):
"""Show release"""
url = self.endpoint + f"/release/{release}"
response = requests.get(url, headers=self.headers, timeout=timeout)
response = self.request(url, "GET", timeout)
return self._handle_response(response, operation="Show")
def delete(self, releases, timeout=REST_DELETE_TIMEOUT):
"""Delete release"""
release_str = "/".join(releases)
url = self.endpoint + f"/release/{release_str}"
response = requests.delete(url, headers=self.headers, timeout=timeout)
response = self.request(url, "DELETE", timeout)
return self._handle_response(response, operation="Delete")
def deploy_precheck(self, deployment, timeout=REST_DEFAULT_TIMEOUT):
"""Deploy precheck"""
url = self.endpoint + f"/deploy/{deployment}/precheck"
response = requests.post(url, headers=self.headers, timeout=timeout)
response = self.request(url, "POST", timeout)
return self._handle_response(response, operation="Deploy precheck")
def show_deploy(self, timeout=REST_DEFAULT_TIMEOUT):
"""Show deploy"""
url = self.endpoint + "/deploy"
response = requests.get(url, headers=self.headers, timeout=timeout)
response = self.request(url, "GET", timeout)
return self._handle_response(response, operation="Show deploy")
def commit_patch(self, releases, timeout=REST_DEFAULT_TIMEOUT):
"""Commit patch"""
release_str = "/".join(releases)
url = self.endpoint + f"/commit_patch/{release_str}"
response = requests.post(url, headers=self.headers, timeout=timeout)
response = self.request(url, "POST", timeout)
return self._handle_response(response, operation="Commit patch")
def _handle_response(self, response, operation):


@@ -126,26 +126,32 @@ class SysinvClient(base.DriverBase):
self,
region: str,
session: keystone_session,
timeout: int = consts.SYSINV_CLIENT_REST_DEFAULT_TIMEOUT,
timeout: float = consts.SYSINV_CLIENT_REST_DEFAULT_TIMEOUT,
endpoint_type: str = consts.KS_ENDPOINT_ADMIN,
endpoint: str = None,
token: str = None,
):
self.region_name = region
# The sysinv client doesn't support a session, so we need to
# get an endpoint and token.
if not endpoint:
endpoint = session.get_endpoint(
service_type=consts.ENDPOINT_TYPE_PLATFORM,
region_name=region,
interface=endpoint_type,
)
kwargs = {}
token = token if token else session.get_token()
self.sysinv_client = client.Client(
API_VERSION, endpoint=endpoint, token=token, timeout=timeout
)
# If the token is specified, use it instead of using the session
if token:
kwargs["token"] = token
kwargs["timeout"] = timeout
if not endpoint:
endpoint = session.get_endpoint(
service_type=consts.ENDPOINT_TYPE_PLATFORM,
region_name=region,
interface=endpoint_type,
)
else:
session.timeout = timeout
kwargs["session"] = session
kwargs["endpoint"] = endpoint
self.sysinv_client = client.Client(API_VERSION, **kwargs)
def get_host(self, hostname_or_id):
"""Get a host by its hostname or id."""


@@ -13,8 +13,10 @@
# under the License.
#
from functools import wraps
import json
from keystoneauth1 import session as ks_session
from nfv_client.openstack import rest_api
from nfv_client.openstack import sw_update
from oslo_log import log
@@ -80,36 +82,63 @@ TRANSITORY_STATES = [
VIM_AUTHORIZATION_FAILED = "Authorization failed"
# VIM API returns a 403 instead of a 401 for unauthenticated requests, so we
# can't use the internal re-auth functionality of the keystone session, we must
# manually check for the VIM_AUTHORIZATION_FAILED string in the raised exception
def retry_on_auth_failure():
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except Exception as e:
# Invalidate token cache and retry
if VIM_AUTHORIZATION_FAILED in str(e):
self.session.invalidate()
return func(self, *args, **kwargs)
# Raise any other type of exception
raise e
return wrapper
return decorator
# TODO(gherzmann): Enhance VIM client to use session-based connections,
# enabling TCP connection reuse for improved efficiency
class VimClient(base.DriverBase):
"""VIM driver."""
def __init__(self, region, session, endpoint=None):
try:
# The nfv_client doesn't support a session, so we need to
# get an endpoint and token.
if endpoint is None:
self.endpoint = session.get_endpoint(
service_type="nfv",
region_name=region,
interface=consts.KS_ENDPOINT_ADMIN,
)
else:
self.endpoint = endpoint
@property
def token(self):
# The property is used to guarantee we always get the most recent token
return self.session.get_token()
self.token = session.get_token()
# session.get_user_id() returns a UUID
# that always corresponds to 'dcmanager'
self.username = consts.DCMANAGER_USER_NAME
# session object does not provide a domain query
# The only domain used for dcmanager is 'default'
self.user_domain_name = "default"
# session.get_project_id() returns a UUID
# that always corresponds to 'services'
self.tenant = consts.SERVICES_USER_NAME
def __init__(self, region: str, session: ks_session.Session, endpoint: str = None):
self.session = session
except exceptions.ServiceUnavailable:
raise
# The nfv_client doesn't support a session, so we need to
# get an endpoint and token.
if endpoint is None:
self.endpoint = session.get_endpoint(
service_type="nfv",
region_name=region,
interface=consts.KS_ENDPOINT_ADMIN,
)
else:
self.endpoint = endpoint
# session.get_user_id() returns a UUID
# that always corresponds to 'dcmanager'
self.username = consts.DCMANAGER_USER_NAME
# session object does not provide a domain query
# The only domain used for dcmanager is 'default'
self.user_domain_name = "default"
# session.get_project_id() returns a UUID
# that always corresponds to 'services'
self.tenant = consts.SERVICES_USER_NAME
@retry_on_auth_failure()
def create_strategy(
self,
strategy_name,
@@ -193,6 +222,7 @@ class VimClient(base.DriverBase):
)
return self._add_strategy_response(response)
@retry_on_auth_failure()
def get_strategy(self, strategy_name, raise_error_if_missing=True):
"""Get VIM orchestration strategy"""
@@ -231,6 +261,7 @@
)
return self._add_strategy_response(response)
@retry_on_auth_failure()
def get_current_strategy(self):
"""Get the current active VIM orchestration strategy"""
@@ -243,6 +274,7 @@
LOG.debug("Strategy: %s" % strategy)
return strategy
@retry_on_auth_failure()
def delete_strategy(self, strategy_name):
"""Delete the current VIM orchestration strategy"""
@@ -265,6 +297,7 @@
LOG.debug("Strategy deleted")
@retry_on_auth_failure()
def apply_strategy(self, strategy_name):
"""Apply the current orchestration strategy"""
@@ -332,6 +365,7 @@
return sw_update._get_strategy_object_from_response(response)
@retry_on_auth_failure()
def abort_strategy(self, strategy_name):
"""Abort the current orchestration strategy"""


@@ -16,11 +16,12 @@
#
import collections
from typing import Callable
from typing import List
from typing import Tuple
from collections.abc import Callable
from typing import Any
from typing import Optional
from typing import Union
from keystoneauth1 import access
from keystoneauth1.identity import v3
from keystoneauth1 import loading
from keystoneauth1 import session
@@ -37,6 +38,120 @@ LOG = logging.getLogger(__name__)
LOCK_NAME = "dc-keystone-endpoint-cache"
class TCPKeepAliveSingleConnectionAdapter(session.TCPKeepAliveAdapter):
def __init__(self, *args, **kwargs):
# Set the maximum connections to 1 to reduce the number of open file descriptors
kwargs["pool_connections"] = 1
kwargs["pool_maxsize"] = 1
super().__init__(*args, **kwargs)
class BoundedFIFOCache(collections.OrderedDict):
"""A First-In-First-Out (FIFO) cache with a maximum size limit.
This cache maintains insertion order and automatically removes the oldest
items when the maximum size is reached.
"""
def __init__(self, *args, **kwargs) -> None:
"""Initialize the FIFO cache.
:param args: Additional positional arguments passed to OrderedDict constructor.
:param kwargs: Additional keyword arguments passed to OrderedDict constructor.
"""
self._maxsize = None
super().__init__(*args, **kwargs)
def __setitem__(self, key: Any, value: Any) -> None:
"""Set an item in the cache.
If the cache is at maximum capacity, the oldest item is discarded.
:param key: The key of the item.
:param value: The value of the item.
"""
super().__setitem__(key, value)
self.move_to_end(key)
# The CONF endpoint_cache section doesn't exist at the
# time the class is defined, so we define it here instead
if self._maxsize is None:
self._maxsize = CONF.endpoint_cache.token_cache_size
if self._maxsize > 0 and len(self) > self._maxsize:
discarded = self.popitem(last=False)
LOG.info(f"Maximum cache size reached, discarding token for {discarded[0]}")
class CachedV3Password(v3.Password):
"""Cached v3.Password authentication class that caches auth tokens.
This class uses a bounded FIFO cache to store and retrieve auth tokens,
reducing the number of token requests made to the authentication server.
"""
_CACHE = BoundedFIFOCache()
_CACHE_LOCK = lockutils.ReaderWriterLock()
def _get_from_cache(self) -> Optional[tuple[dict, str]]:
"""Retrieve the cached auth info for the current auth_url.
:return: The cached authentication information, if available.
"""
with CachedV3Password._CACHE_LOCK.read_lock():
return CachedV3Password._CACHE.get(self.auth_url)
def _update_cache(self, access_info: access.AccessInfoV3) -> None:
"""Update the cache with new auth info.
:param access_info: The access information to cache.
"""
with CachedV3Password._CACHE_LOCK.write_lock():
# pylint: disable=protected-access
CachedV3Password._CACHE[self.auth_url] = (
access_info._data,
access_info._auth_token,
)
def _remove_from_cache(self) -> Optional[tuple[dict, str]]:
"""Remove the auth info for the current auth_url from the cache."""
with CachedV3Password._CACHE_LOCK.write_lock():
return CachedV3Password._CACHE.pop(self.auth_url, None)
def get_auth_ref(self, _session: session.Session, **kwargs) -> access.AccessInfoV3:
"""Get the authentication reference, using the cache if possible.
This method first checks the cache for a valid token. If found and not
expiring soon, it returns the cached token. Otherwise, it requests a new
token from the auth server and updates the cache.
:param session: The session to use for authentication.
:param kwargs: Additional keyword arguments passed to the parent method.
:return: The authentication reference.
"""
cached_data = self._get_from_cache()
if cached_data and not utils.is_token_expiring_soon(cached_data[0]["token"]):
LOG.debug("Reuse cached token for %s", self.auth_url)
return access.AccessInfoV3(*cached_data)
# If not in cache or expired, fetch new token and update cache
LOG.debug("Getting a new token from %s", self.auth_url)
new_access_info = super().get_auth_ref(_session, **kwargs)
self._update_cache(new_access_info)
return new_access_info
def invalidate(self) -> bool:
"""Remove token from cache when the parent invalidate method is called.
This method is called by the session when a request returns a 401 (Unauthorized)
:return: The result of the parent invalidate method.
"""
LOG.debug("Invalidating token for %s", self.auth_url)
self._remove_from_cache()
return super().invalidate()
class EndpointCache(object):
"""Cache for storing endpoint information.
@@ -196,7 +311,7 @@ class EndpointCache(object):
:rtype: session.Session
"""
user_auth = v3.Password(
user_auth = CachedV3Password(
auth_url=auth_url,
username=user_name,
user_domain_name=user_domain_name,
@@ -215,12 +330,18 @@ class EndpointCache(object):
CONF.endpoint_cache.http_connect_timeout if timeout is None else timeout
)
ks_session = session.Session(
auth=user_auth,
additional_headers=consts.USER_HEADER,
timeout=(discovery_timeout, read_timeout),
)
# Mount the custom adapters
ks_session.session.mount("http://", TCPKeepAliveSingleConnectionAdapter())
ks_session.session.mount("https://", TCPKeepAliveSingleConnectionAdapter())
return ks_session
@staticmethod
def _is_central_cloud(region_name: str) -> bool:
"""Check if the region is a central cloud.
@@ -287,7 +408,7 @@ class EndpointCache(object):
return endpoint
@lockutils.synchronized(LOCK_NAME)
def get_all_regions(self) -> list[str]:
"""Get region list.
:return: List of regions
@@ -382,7 +503,7 @@ class EndpointCache(object):
@lockutils.synchronized(LOCK_NAME)
def get_cached_master_keystone_client_and_region_endpoint_map(
self, region_name: str
) -> tuple[ks_client.Client, dict]:
"""Get the cached master Keystone client and region endpoint map.
:param region_name: The name of the region.

View File

@@ -3,7 +3,7 @@
# SPDX-License-Identifier: Apache-2.0
#
from io import BytesIO
import uuid
import yaml
@@ -62,342 +62,181 @@ class TestDcmanagerClient(base.DCCommonTestCase):
def setUp(self):
super(TestDcmanagerClient, self).setUp()
self.mock_response = mock.MagicMock()
self.mock_response.status_code = 200
self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA
self.mock_session = mock.MagicMock()
self.client = dcmanager_v1.DcmanagerClient(
dccommon_consts.SYSTEM_CONTROLLER_NAME,
session=self.mock_session,
timeout=FAKE_TIMEOUT,
endpoint=FAKE_ENDPOINT,
)
def test_get_subcloud(self):
self.mock_response.json.return_value = FAKE_SUBCLOUD_DATA
self.mock_session.get.return_value = self.mock_response
actual_subcloud = self.client.get_subcloud(SUBCLOUD_NAME)
self.assertEqual(SUBCLOUD_NAME, actual_subcloud.get("name"))
def test_get_subcloud_not_found(self):
self.mock_response.status_code = 404
self.mock_response.text = "Subcloud not found"
self.mock_session.get.return_value = self.mock_response
self.assertRaises(
dccommon_exceptions.SubcloudNotFound,
self.client.get_subcloud,
SUBCLOUD_NAME,
)
def test_get_subcloud_list(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]}
self.mock_session.get.return_value = self.mock_response
actual_subclouds = self.client.get_subcloud_list()
self.assertEqual(1, len(actual_subclouds))
self.assertEqual(SUBCLOUD_NAME, actual_subclouds[0].get("name"))
def test_get_subcloud_group_list(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = {
"subcloud_groups": [{"name": SUBCLOUD_GROUP_NAME}]
}
self.mock_session.get.return_value = self.mock_response
actual_subcloud_groups = self.client.get_subcloud_group_list()
self.assertListEqual(actual_subcloud_groups, [{"name": SUBCLOUD_GROUP_NAME}])
def test_get_subcloud_peer_group_list(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = {
"subcloud_peer_groups": [FAKE_SUBCLOUD_PEER_GROUP_DATA]
}
self.mock_session.get.return_value = self.mock_response
actual_peer_group = self.client.get_subcloud_peer_group_list()
self.assertListEqual(actual_peer_group, [FAKE_SUBCLOUD_PEER_GROUP_DATA])
def test_get_subcloud_peer_group(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA
self.mock_session.get.return_value = self.mock_response
actual_peer_group = self.client.get_subcloud_peer_group(
SUBCLOUD_PEER_GROUP_NAME
)
self.assertDictEqual(actual_peer_group, FAKE_SUBCLOUD_PEER_GROUP_DATA)
def test_get_subcloud_peer_group_not_found(self):
self.mock_response.status_code = 404
self.mock_response.text = "Subcloud Peer Group not found"
self.mock_session.get.return_value = self.mock_response
self.assertRaises(
dccommon_exceptions.SubcloudPeerGroupNotFound,
self.client.get_subcloud_peer_group,
SUBCLOUD_PEER_GROUP_NAME,
)
def test_get_subcloud_list_by_peer_group(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = {"subclouds": [FAKE_SUBCLOUD_DATA]}
self.mock_session.get.return_value = self.mock_response
actual_subclouds = self.client.get_subcloud_list_by_peer_group(
SUBCLOUD_PEER_GROUP_NAME
)
self.assertListEqual(actual_subclouds, [FAKE_SUBCLOUD_DATA])
def test_get_subcloud_list_by_peer_group_not_found(self):
self.mock_response.status_code = 404
self.mock_response.text = "Subcloud Peer Group not found"
self.mock_session.get.return_value = self.mock_response
self.assertRaises(
dccommon_exceptions.SubcloudPeerGroupNotFound,
self.client.get_subcloud_list_by_peer_group,
SUBCLOUD_PEER_GROUP_NAME,
)
def test_add_subcloud_peer_group(self):
peer_group_kwargs = {"peer-group-name": SUBCLOUD_PEER_GROUP_NAME}
self.mock_response.status_code = 200
self.mock_response.json.return_value = FAKE_SUBCLOUD_PEER_GROUP_DATA
self.mock_session.post.return_value = self.mock_response
actual_peer_group = self.client.add_subcloud_peer_group(**peer_group_kwargs)
self.assertDictEqual(actual_peer_group, FAKE_SUBCLOUD_PEER_GROUP_DATA)
@mock.patch("builtins.open", new_callable=mock.mock_open)
def test_add_subcloud_with_secondary_status(self, mock_open):
self.mock_response.status_code = 200
self.mock_response.json.return_value = FAKE_SUBCLOUD_DATA
self.mock_session.post.return_value = self.mock_response
# Mock the file content to be returned when reading
yaml_data = yaml.dump(FAKE_SUBCLOUD_DATA).encode("utf-8")
mock_open.return_value = BytesIO(yaml_data)
subcloud_kwargs = {
"data": {"bootstrap-address": SUBCLOUD_BOOTSTRAP_ADDRESS},
"files": {"bootstrap_values": SUBCLOUD_BOOTSTRAP_VALUE_PATH},
}
actual_subcloud = self.client.add_subcloud_with_secondary_status(
**subcloud_kwargs
)
self.assertDictEqual(actual_subcloud, FAKE_SUBCLOUD_DATA)
def test_delete_subcloud_peer_group(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = ""
self.mock_session.delete.return_value = self.mock_response
result = self.client.delete_subcloud_peer_group(SUBCLOUD_PEER_GROUP_NAME)
self.mock_session.delete.assert_called_once_with(
FAKE_ENDPOINT + "/subcloud-peer-groups/" + SUBCLOUD_PEER_GROUP_NAME,
timeout=FAKE_TIMEOUT,
raise_exc=False,
)
self.assertEqual(result, "")
def test_delete_subcloud_peer_group_not_found(self):
self.mock_response.status_code = 404
self.mock_response.text = "Subcloud Peer Group not found"
self.mock_session.delete.return_value = self.mock_response
self.assertRaises(
dccommon_exceptions.SubcloudPeerGroupNotFound,
self.client.delete_subcloud_peer_group,
SUBCLOUD_PEER_GROUP_NAME,
)
def test_delete_subcloud(self):
self.mock_response.status_code = 200
self.mock_response.json.return_value = ""
self.mock_session.delete.return_value = self.mock_response
result = self.client.delete_subcloud(SUBCLOUD_NAME)
self.mock_session.delete.assert_called_once_with(
FAKE_ENDPOINT + "/subclouds/" + SUBCLOUD_NAME,
timeout=FAKE_TIMEOUT,
user_agent=dccommon_consts.DCMANAGER_V1_HTTP_AGENT,
raise_exc=False,
)
self.assertEqual(result, "")
def test_delete_subcloud_not_found(self):
self.mock_response.status_code = 404
self.mock_response.text = "Subcloud not found"
self.mock_session.delete.return_value = self.mock_response
self.assertRaises(
dccommon_exceptions.SubcloudNotFound,
self.client.delete_subcloud,
SUBCLOUD_NAME,
)

View File

@@ -79,13 +79,13 @@ URLS = ["/deploy", "/commit_patch"]
def mocked_requests_success(*args, **kwargs):
response_content = None
if kwargs["url"].endswith("/release"):
response_content = json.dumps(LIST_RESPONSE)
elif kwargs["url"].endswith("/release/DC.1"):
response_content = json.dumps(SHOW_RESPONSE)
elif kwargs["url"].endswith("/release/DC.1/DC.2"):
response_content = json.dumps(INFO_RESPONSE)
elif any([url in kwargs["url"] for url in URLS]):
response_content = json.dumps(INFO_RESPONSE)
response = requests.Response()
response.status_code = 200
@@ -108,66 +108,65 @@ class TestSoftwareClient(base.DCCommonTestCase):
self.ctx = utils.dummy_context()
self.session = mock.MagicMock()
self.software_client = SoftwareClient(
session=mock.MagicMock(), endpoint=FAKE_ENDPOINT, token="TOKEN"
)
@mock.patch("requests.request")
def test_list_success(self, mock_request):
mock_request.side_effect = mocked_requests_success
response = self.software_client.list()
self.assertEqual(response, CLIENT_LIST_RESPONSE)
@mock.patch("requests.request")
def test_list_failure(self, mock_request):
mock_request.side_effect = mocked_requests_failure
exc = self.assertRaises(exceptions.ApiException, self.software_client.list)
self.assertIn("List failed with status code: 500", str(exc))
@mock.patch("requests.request")
def test_show_success(self, mock_request):
mock_request.side_effect = mocked_requests_success
release = "DC.1"
response = self.software_client.show(release)
self.assertEqual(response, SHOW_RESPONSE)
@mock.patch("requests.request")
def test_show_failure(self, mock_request):
mock_request.side_effect = mocked_requests_failure
release = "DC.1"
exc = self.assertRaises(
exceptions.ApiException, self.software_client.show, release
)
self.assertIn("Show failed with status code: 500", str(exc))
@mock.patch("requests.request")
def test_delete_success(self, mock_request):
mock_request.side_effect = mocked_requests_success
releases = ["DC.1", "DC.2"]
response = self.software_client.delete(releases)
self.assertEqual(response, INFO_RESPONSE)
@mock.patch("requests.request")
def test_delete_failure(self, mock_request):
mock_request.side_effect = mocked_requests_failure
releases = ["DC.1", "DC.2"]
exc = self.assertRaises(
exceptions.ApiException, self.software_client.delete, releases
)
self.assertIn("Delete failed with status code: 500", str(exc))
@mock.patch("requests.request")
def test_commit_patch_success(self, mock_request):
mock_request.side_effect = mocked_requests_success
releases = ["DC.1", "DC.2"]
response = self.software_client.commit_patch(releases)
self.assertEqual(response, INFO_RESPONSE)
@mock.patch("requests.request")
def test_commit_patch_failure(self, mock_request):
mock_request.side_effect = mocked_requests_failure
releases = ["DC.1", "DC.2"]
exc = self.assertRaises(
exceptions.ApiException, self.software_client.commit_patch, releases
)
self.assertIn("Commit patch failed with status code: 500", str(exc))

View File

@@ -17,11 +17,14 @@
import collections
import copy
import threading
import time
from keystoneauth1 import access
from keystoneclient.v3 import services
from keystoneclient.v3 import tokens
import mock
import netaddr
from oslo_config import cfg
from dccommon import endpoint_cache
@@ -77,10 +80,12 @@ FAKE_SERVICES_LIST = [
FakeService(7, "dcorch", "dcorch", True),
]
FAKE_AUTH_URL = "http://fake.auth/url"
class EndpointCacheTest(base.DCCommonTestCase):
def setUp(self):
super().setUp()
auth_uri_opts = [
cfg.StrOpt("auth_uri", default="fake_auth_uri"),
cfg.StrOpt("username", default="fake_user"),
@@ -231,3 +236,123 @@ class EndpointCacheTest(base.DCCommonTestCase):
expected_endpoints = {}
actual_endpoints = utils.build_subcloud_endpoints(empty_ips)
self.assertEqual(expected_endpoints, actual_endpoints)
class TestBoundedFIFOCache(base.DCCommonTestCase):
def setUp(self):
# pylint: disable=protected-access
super().setUp()
self.cache = endpoint_cache.BoundedFIFOCache()
self.cache._maxsize = 3 # Set a small max size for testing
def test_insertion_and_order(self):
self.cache["a"] = 1
self.cache["b"] = 2
self.cache["c"] = 3
self.assertEqual(list(self.cache.keys()), ["a", "b", "c"])
def test_max_size_limit(self):
self.cache["a"] = 1
self.cache["b"] = 2
self.cache["c"] = 3
self.cache["d"] = 4
self.assertEqual(len(self.cache), 3)
self.assertEqual(list(self.cache.keys()), ["b", "c", "d"])
def test_update_existing_key(self):
self.cache["a"] = 1
self.cache["b"] = 2
self.cache["a"] = 3
self.assertEqual(list(self.cache.keys()), ["b", "a"])
self.assertEqual(self.cache["a"], 3)
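The behavior these three tests pin down (FIFO eviction plus move-to-end on update) can be sketched on top of `collections.OrderedDict`. This is a minimal stand-in; the config-driven `_maxsize` lookup of the real `BoundedFIFOCache` is omitted and hard-coded here.

```python
from collections import OrderedDict

class BoundedFIFOCache(OrderedDict):
    """Dict that evicts its oldest entry once _maxsize is exceeded."""

    _maxsize = 3  # assumption: the real class reads this from service config

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        # Updating an existing key re-queues it at the newest position
        self.move_to_end(key)
        while len(self) > self._maxsize:
            self.popitem(last=False)  # drop the oldest (FIFO) entry

cache = BoundedFIFOCache()
for key in "abcd":
    cache[key] = key.upper()
print(list(cache))  # → ['b', 'c', 'd']
```

FIFO (rather than LRU) keeps the eviction cost on writes only, so cached token reads stay lock-light.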
class TestCachedV3Password(base.DCCommonTestCase):
# pylint: disable=protected-access
def setUp(self):
super().setUp()
self.auth = endpoint_cache.CachedV3Password(auth_url=FAKE_AUTH_URL)
endpoint_cache.CachedV3Password._CACHE.clear()
# Set a maxsize value so it doesn't try to read from the config file
endpoint_cache.CachedV3Password._CACHE._maxsize = 50
mock_get_auth_ref_object = mock.patch("endpoint_cache.v3.Password.get_auth_ref")
self.mock_parent_get_auth_ref = mock_get_auth_ref_object.start()
self.addCleanup(mock_get_auth_ref_object.stop)
@mock.patch("endpoint_cache.utils.is_token_expiring_soon")
def test_get_auth_ref_cached(self, mock_is_expiring):
mock_is_expiring.return_value = False
mock_session = mock.MagicMock()
# Simulate a cached token
cached_data = ({"token": "fake_token"}, "auth_token")
self.auth._CACHE[FAKE_AUTH_URL] = cached_data
result = self.auth.get_auth_ref(mock_session)
self.assertIsInstance(result, access.AccessInfoV3)
self.assertEqual(result._auth_token, "auth_token")
# Ensure we didn't call the parent method
self.mock_parent_get_auth_ref.assert_not_called()
def test_get_auth_ref_new_token(self):
mock_session = mock.MagicMock()
mock_access_info = mock.MagicMock(spec=access.AccessInfoV3)
mock_access_info._data = {"token": "new_token"}
mock_access_info._auth_token = "new_auth_token"
self.mock_parent_get_auth_ref.return_value = mock_access_info
result = self.auth.get_auth_ref(mock_session)
self.assertEqual(result, mock_access_info)
self.mock_parent_get_auth_ref.assert_called_once_with(mock_session)
self.assertEqual(
self.auth._CACHE[FAKE_AUTH_URL],
(mock_access_info._data, mock_access_info._auth_token),
)
def test_get_auth_concurrent_access(self):
auth_obj_list = [
endpoint_cache.CachedV3Password(auth_url=f"{FAKE_AUTH_URL}/{i}")
for i in range(1, 51)
]
call_count = 0
generated_tokens = []
def mock_get_auth_ref(_, **__):
nonlocal call_count, generated_tokens
time.sleep(0.1) # Simulate network delay
call_count += 1
token = f"auth_token_{call_count}"
generated_tokens.append(token)
return access.AccessInfoV3({"token": token}, token)
self.mock_parent_get_auth_ref.side_effect = mock_get_auth_ref
threads = [
threading.Thread(target=auth.get_auth_ref, args=(None,))
for auth in auth_obj_list
]
for t in threads:
t.start()
for t in threads:
t.join()
# All URLs should have generated their own tokens
self.assertEqual(len(endpoint_cache.CachedV3Password._CACHE), 50)
cached_tokens = [v[1] for v in endpoint_cache.CachedV3Password._CACHE.values()]
self.assertCountEqual(generated_tokens, cached_tokens)
self.assertEqual(self.mock_parent_get_auth_ref.call_count, 50)
@mock.patch("endpoint_cache.v3.Password.invalidate")
def test_invalidate(self, mock_parent_invalidate):
# Set up a fake cached token
self.auth._CACHE[FAKE_AUTH_URL] = ({"token": "fake_token"}, "auth_token")
self.auth.invalidate()
self.assertNotIn(FAKE_AUTH_URL, self.auth._CACHE)
mock_parent_invalidate.assert_called_once()

View File

@@ -24,6 +24,8 @@ import logging
import os
import requests
from keystoneauth1 import exceptions as ks_exceptions
from keystoneauth1 import session as ks_session
from oslo_utils import importutils
from dcdbsync.dbsyncclient import exceptions
@@ -52,6 +54,7 @@ class HTTPClient(object):
cacert=None,
insecure=False,
request_timeout=None,
session=None,
):
self.base_url = base_url
self.token = token
@@ -59,6 +62,7 @@ class HTTPClient(object):
self.user_id = user_id
self.ssl_options = {}
self.request_timeout = request_timeout
self.session: ks_session.Session = session
if self.base_url.startswith("https"):
if cacert and not os.path.exists(cacert):
@@ -72,95 +76,73 @@ class HTTPClient(object):
self.ssl_options["verify"] = not insecure
self.ssl_options["cert"] = cacert
def request(self, url: str, method: str, data=None, **kwargs):
"""Request directly if session is not passed, otherwise use the session"""
try:
if self.session:
return self.session.request(
url,
method=method,
data=data,
timeout=self.request_timeout,
raise_exc=False,
**kwargs,
)
return requests.request(
method=method,
url=url,
data=data,
timeout=self.request_timeout,
**kwargs,
)
except (
requests.exceptions.Timeout,
ks_exceptions.ConnectTimeout,
) as e:
msg = f"Request to {url} timed out"
raise exceptions.ConnectTimeout(msg) from e
except (
requests.exceptions.ConnectionError,
ks_exceptions.ConnectionError,
) as e:
msg = f"Unable to establish connection to {url}: {e}"
raise exceptions.ConnectFailure(msg) from e
except (
requests.exceptions.RequestException,
ks_exceptions.ClientException,
) as e:
msg = f"Unexpected exception for {url}: {e}"
raise exceptions.UnknownConnectionError(msg) from e
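The routing decision in `request()` reduces to "session wins, plain `requests` is the fallback". A stdlib-only sketch with hypothetical stand-in transports (the real method passes `timeout` and `raise_exc=False` through to the Keystone session):

```python
class HTTPClientSketch:
    """Mimics HTTPClient.request's transport selection."""

    def __init__(self, session=None):
        self.session = session

    def request(self, url, method):
        if self.session:
            # Session path: auth token injection and 401-driven
            # reauthentication are handled by keystoneauth1 itself.
            return self.session(url, method)
        # Legacy path: a caller-supplied token travels in the headers.
        return ("direct", method, url)

via_session = HTTPClientSketch(session=lambda u, m: ("session", m, u))
print(via_session.request("/v1.0/subclouds", "GET"))   # → ('session', 'GET', '/v1.0/subclouds')
print(HTTPClientSketch().request("/v1.0/subclouds", "GET"))  # → ('direct', 'GET', '/v1.0/subclouds')
```

Keeping both paths behind one `request()` lets the five HTTP verb helpers below share a single exception-mapping block instead of repeating it per verb.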
@log_request
def get(self, url, headers=None):
options = self._get_request_options("get", headers)
url = self.base_url + url
return self.request(url, "GET", **options)
@log_request
def post(self, url, body, headers=None):
options = self._get_request_options("post", headers)
url = self.base_url + url
return self.request(url, "POST", data=body, **options)
@log_request
def put(self, url, body, headers=None):
options = self._get_request_options("put", headers)
url = self.base_url + url
return self.request(url, "PUT", data=body, **options)
@log_request
def patch(self, url, body, headers=None):
options = self._get_request_options("patch", headers)
url = self.base_url + url
return self.request(url, "PATCH", data=body, **options)
@log_request
def delete(self, url, headers=None):
options = self._get_request_options("delete", headers)
url = self.base_url + url
return self.request(url, "DELETE", **options)
def _get_request_options(self, method, headers):
headers = self._update_headers(headers)
@@ -178,9 +160,11 @@ class HTTPClient(object):
if not headers:
headers = {}
# If the session exists, let it handle the token
if not self.session:
token = headers.get("x-auth-token", self.token)
if token:
headers["x-auth-token"] = token
project_id = headers.get("X-Project-Id", self.project_id)
if project_id:


@ -103,6 +103,7 @@ class Client(object):
cacert=cacert,
insecure=insecure,
request_timeout=_DEFAULT_REQUEST_TIMEOUT,
session=session,
)
# Create all managers


@ -42,7 +42,7 @@ class PatchAudit(object):
def get_regionone_audit_data(self):
return PatchAuditData()
def subcloud_patch_audit(self, keystone_session, subcloud):
def subcloud_patch_audit(self, keystone_client, subcloud):
LOG.info("Triggered patch audit for: %s." % subcloud.name)
# NOTE(nicodemos): Patch audit not supported for 24.09 subcloud
@ -51,7 +51,7 @@ class PatchAudit(object):
# NOTE(nicodemos): If the subcloud is on the 22.12 release with USM enabled,
# skip the patch audit.
if utils.has_usm_service(subcloud.region_name, keystone_session):
if utils.has_usm_service(subcloud.region_name, keystone_client):
return consts.SYNC_STATUS_NOT_AVAILABLE
# NOTE(nicodemos): As of version 24.09, the patching orchestration only


@ -65,7 +65,7 @@ class DCManagerAuditService(service.Service):
self.subcloud_audit_manager = None
def start(self):
utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile)
utils.set_open_file_limit(cfg.CONF.audit_worker_rlimit_nofile)
target = oslo_messaging.Target(
version=self.rpc_api_version, server=self.host, topic=self.topic
)
@ -193,7 +193,7 @@ class DCManagerAuditWorkerService(service.Service):
self.subcloud_audit_worker_manager = None
def start(self):
utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile)
utils.set_open_file_limit(cfg.CONF.audit_worker_rlimit_nofile)
self.init_tgm()
self.init_audit_managers()
target = oslo_messaging.Target(


@ -4,7 +4,7 @@
# SPDX-License-Identifier: Apache-2.0
#
from keystoneauth1.session import Session as keystone_session
from keystoneclient.v3.client import Client as KeystoneClient
from oslo_log import log as logging
from tsconfig.tsconfig import SW_VERSION
@ -142,7 +142,7 @@ class SoftwareAudit(object):
def subcloud_software_audit(
self,
keystone_session: keystone_session,
keystone_client: KeystoneClient,
subcloud: models.Subcloud,
audit_data: SoftwareAuditData,
):
@ -150,7 +150,7 @@ class SoftwareAudit(object):
# TODO(nicodemos): Remove this method after all support to patching is removed
# NOTE(nicodemos): Software audit not support on 22.12 subcloud without USM
if subcloud.software_version != SW_VERSION and not utils.has_usm_service(
subcloud.region_name, keystone_session
subcloud.region_name, keystone_client
):
LOG.info(f"Software audit not supported for {subcloud.name} without USM.")
return dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
@ -160,7 +160,7 @@ class SoftwareAudit(object):
subcloud.management_start_ip, dccommon_consts.ENDPOINT_NAME_USM
)
software_client = SoftwareClient(
keystone_session, endpoint=software_endpoint
keystone_client.session, endpoint=software_endpoint
)
except Exception:
LOG.exception(


@ -561,7 +561,7 @@ class SubcloudAuditWorkerManager(manager.Manager):
try:
endpoint_data[dccommon_consts.ENDPOINT_TYPE_PATCHING] = (
self.patch_audit.subcloud_patch_audit(
keystone_client.session,
keystone_client.keystone_client,
subcloud,
)
)
@ -685,7 +685,7 @@ class SubcloudAuditWorkerManager(manager.Manager):
try:
endpoint_data[dccommon_consts.ENDPOINT_TYPE_PATCHING] = (
self.patch_audit.subcloud_patch_audit(
keystone_client.session,
keystone_client.keystone_client,
subcloud,
)
)
@ -772,7 +772,7 @@ class SubcloudAuditWorkerManager(manager.Manager):
try:
endpoint_data[dccommon_consts.AUDIT_TYPE_SOFTWARE] = (
self.software_audit.subcloud_software_audit(
keystone_client.session,
keystone_client.keystone_client,
subcloud,
software_audit_data,
)


@ -33,11 +33,6 @@ global_opts = [
default=60,
help="Seconds between running periodic reporting tasks.",
),
cfg.IntOpt(
"worker_rlimit_nofile",
default=4096,
help="Maximum number of open files per worker process.",
),
]
# OpenStack credentials used for Endpoint Cache
@ -133,6 +128,11 @@ endpoint_cache_opts = [
"http_connect_timeout",
help="Request timeout value for communicating with Identity API server.",
),
cfg.IntOpt(
"token_cache_size",
default=5000,
help="Maximum number of entries in the in-memory token cache.",
),
]
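The `v3.Password` subclass that implements the cache is not visible in this diff; based on the commit message (a dictionary-based in-memory cache bounded by `token_cache_size`, with expiry checks and invalidation driven by the session's reauthentication path), its behavior can be sketched roughly as follows. All class, method, and variable names here are illustrative assumptions, not the actual dcmanager implementation:

```python
import time

class CachedPassword:
    """Sketch of a password auth plugin wrapper that caches issued tokens."""

    _cache = {}  # auth_url -> (token, expires_at); shared in-process cache
    _max_entries = 5000  # mirrors the token_cache_size config option

    def __init__(self, auth_url, fetch_token):
        self.auth_url = auth_url
        self._fetch_token = fetch_token  # callable doing the real HTTP request

    def get_token(self):
        entry = self._cache.get(self.auth_url)
        if entry and entry[1] > time.time():
            # Valid, non-expired token: skip the HTTP round trip entirely.
            return entry[0]
        token, expires_at = self._fetch_token()
        if len(self._cache) >= self._max_entries:
            # Simple insertion-order eviction to keep the cache bounded.
            self._cache.pop(next(iter(self._cache)))
        self._cache[self.auth_url] = (token, expires_at)
        return token

    def invalidate(self):
        # Hooked into the session's reauthentication path: when a cached
        # token is rejected (e.g. after all fernet keys are rotated during
        # the initial sync), drop it so the next call re-authenticates.
        self._cache.pop(self.auth_url, None)
```

A second `get_token()` call within the token lifetime returns the cached token without touching the subcloud's identity API, which is the request reduction the commit is after.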
scheduler_opts = [
@ -180,6 +180,26 @@ common_opts = [
"1:enabled via rvmc_debug_level, 2:globally enabled"
),
),
cfg.IntOpt(
"dcmanager_worker_rlimit_nofile",
default=4096,
help="Maximum number of open files per dcmanager_manager worker process.",
),
cfg.IntOpt(
"orchestrator_worker_rlimit_nofile",
default=8192,
help="Maximum number of open files per dcmanager_orchestrator worker process.",
),
cfg.IntOpt(
"audit_worker_rlimit_nofile",
default=4096,
help="Maximum number of open files per dcmanager_audit worker process.",
),
cfg.IntOpt(
"state_worker_rlimit_nofile",
default=4096,
help="Maximum number of open files per dcmanager_state worker process.",
),
]
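Each service's `start()` below passes one of these options to `utils.set_open_file_limit`, whose body is not part of this diff. A plausible sketch of such a helper (the real dcmanager utility may differ) raises the soft `RLIMIT_NOFILE` so that the long-lived, reused session TCP connections do not exhaust file descriptors:

```python
import resource

def set_open_file_limit(new_soft_limit):
    # Sketch only: the actual dcmanager utility is not shown in this diff.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if new_soft_limit <= soft:
        return  # already high enough, nothing to do
    # An unprivileged process may raise its soft limit only up to the hard
    # limit, so clamp to avoid a ValueError from setrlimit.
    if hard != resource.RLIM_INFINITY:
        new_soft_limit = min(new_soft_limit, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft_limit, hard))
```

This is also why the orchestrator gets a higher default (8192) than the other workers: it holds the most concurrent per-subcloud sessions.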
scheduler_opt_group = cfg.OptGroup(


@ -32,6 +32,7 @@ import uuid
import xml.etree.ElementTree as ElementTree
from keystoneauth1 import exceptions as keystone_exceptions
from keystoneclient.v3.client import Client as KeystoneClient
import netaddr
from oslo_concurrency import lockutils
from oslo_config import cfg
@ -1200,7 +1201,7 @@ def get_system_controller_software_list(
)
return software_client.list()
except requests.exceptions.ConnectionError:
except (requests.exceptions.ConnectionError, keystone_exceptions.ConnectionError):
LOG.exception("Failed to get software list for %s", region_name)
raise
except Exception:
@ -2254,27 +2255,28 @@ def validate_software_strategy(release_id: str):
pecan.abort(400, _(message))
def has_usm_service(subcloud_region, keystone_session=None):
def has_usm_service(
subcloud_region: str, keystone_client: KeystoneClient = None
) -> bool:
# Lookup the keystone client if not specified
if not keystone_session:
if not keystone_client:
try:
os_client = OpenStackDriver(
keystone_client = OpenStackDriver(
region_name=subcloud_region,
region_clients=None,
fetch_subcloud_ips=fetch_subcloud_mgmt_ips,
)
keystone_session = os_client.keystone_client.session
except Exception:
).keystone_client.keystone_client
except Exception as e:
LOG.exception(
f"Failed to get keystone client for subcloud_region: {subcloud_region}"
)
raise exceptions.InternalError()
raise exceptions.InternalError() from e
try:
# Try to get the usm endpoint for the subcloud.
software_v1.SoftwareClient(keystone_session, region=subcloud_region)
# Try to get the USM service for the subcloud.
keystone_client.services.find(name=dccommon_consts.ENDPOINT_NAME_USM)
return True
except keystone_exceptions.EndpointNotFound:
except keystone_exceptions.NotFound:
LOG.warning("USM service not found for subcloud_region: %s", subcloud_region)
return False
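The reworked check above queries the service catalog directly instead of constructing a `SoftwareClient` and waiting for an endpoint lookup to fail. A reduced, self-contained sketch of that logic (using a stand-in `NotFound` in place of `keystoneauth1.exceptions.NotFound`, and assuming `"usm"` as the service name):

```python
from unittest import mock

class NotFound(Exception):
    """Stand-in for keystoneauth1.exceptions.NotFound."""

def has_usm_service(keystone_client, service_name="usm"):
    # A NotFound from the service catalog means the subcloud does not
    # run USM; any other outcome means the service exists.
    try:
        keystone_client.services.find(name=service_name)
        return True
    except NotFound:
        return False
```

Because the check now needs only a keystone client rather than a session, the audit call sites in this commit pass `keystone_client` through instead of `keystone_client.session`.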


@ -100,7 +100,7 @@ class DCManagerService(service.Service):
self.system_peer_manager = SystemPeerManager(self.peer_monitor_manager)
def start(self):
utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile)
utils.set_open_file_limit(cfg.CONF.dcmanager_worker_rlimit_nofile)
self.dcmanager_id = uuidutils.generate_uuid()
self.init_managers()
target = oslo_messaging.Target(


@ -552,7 +552,8 @@ class OrchThread(threading.Thread):
# First check if the strategy has been created.
try:
subcloud_strategy = OrchThread.get_vim_client(region).get_strategy(
vim_client = OrchThread.get_vim_client(region)
subcloud_strategy = vim_client.get_strategy(
strategy_name=self.vim_strategy_name
)
except (keystone_exceptions.EndpointNotFound, IndexError):
@ -586,9 +587,7 @@ class OrchThread(threading.Thread):
# If we are here, we need to delete the strategy
try:
OrchThread.get_vim_client(region).delete_strategy(
strategy_name=self.vim_strategy_name
)
vim_client.delete_strategy(strategy_name=self.vim_strategy_name)
except Exception:
message = "(%s) Vim strategy:(%s) delete failed for region:(%s)" % (
self.update_type,


@ -64,7 +64,7 @@ class DCManagerOrchestratorService(service.Service):
self.sw_update_manager = None
def start(self):
utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile)
utils.set_open_file_limit(cfg.CONF.orchestrator_worker_rlimit_nofile)
self.init_tgm()
self.init_manager()
target = oslo_messaging.Target(


@ -5,6 +5,7 @@
#
import abc
from functools import lru_cache
from typing import Optional
from typing import Type
@ -26,6 +27,11 @@ from dcmanager.db import api as db_api
LOG = logging.getLogger(__name__)
# The cache is scoped to the strategy state object, so we only cache clients
# for the subcloud region. This reduces redundant clients and minimizes
# the number of unnecessary TCP connections.
CLIENT_CACHE_SIZE = 1
class BaseState(object, metaclass=abc.ABCMeta):
@ -164,60 +170,73 @@ class BaseState(object, metaclass=abc.ABCMeta):
return strategy_step.subcloud.name
@staticmethod
def get_keystone_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_keystone_client(region_name: str = dccommon_consts.DEFAULT_REGION_NAME):
"""Construct a (cached) keystone client (and token)"""
try:
os_client = OpenStackDriver(
return OpenStackDriver(
region_name=region_name,
region_clients=None,
fetch_subcloud_ips=utils.fetch_subcloud_mgmt_ips,
)
return os_client.keystone_client
).keystone_client
except Exception:
LOG.warning(
f"Failure initializing KeystoneClient for region: {region_name}"
)
raise
def get_sysinv_client(self, region_name):
"""construct a sysinv client"""
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_sysinv_client(self, region_name: str) -> SysinvClient:
"""Get the Sysinv client for the given region."""
keystone_client = self.get_keystone_client(region_name)
endpoint = keystone_client.endpoint_cache.get_endpoint("sysinv")
return SysinvClient(region_name, keystone_client.session, endpoint=endpoint)
def get_fm_client(self, region_name):
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_fm_client(self, region_name: str) -> FmClient:
"""Get the FM client for the given region."""
keystone_client = self.get_keystone_client(region_name)
endpoint = keystone_client.endpoint_cache.get_endpoint("fm")
return FmClient(region_name, keystone_client.session, endpoint=endpoint)
def get_patching_client(self, region_name=dccommon_consts.DEFAULT_REGION_NAME):
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_patching_client(
self, region_name: str = dccommon_consts.DEFAULT_REGION_NAME
) -> PatchingClient:
"""Get the Patching client for the given region."""
keystone_client = self.get_keystone_client(region_name)
return PatchingClient(region_name, keystone_client.session)
def get_software_client(self, region_name=dccommon_consts.DEFAULT_REGION_NAME):
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_software_client(
self, region_name: str = dccommon_consts.DEFAULT_REGION_NAME
) -> SoftwareClient:
"""Get the Software client for the given region."""
keystone_client = self.get_keystone_client(region_name)
return SoftwareClient(keystone_client.session, region_name)
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_barbican_client(self, region_name: str) -> BarbicanClient:
"""Get the Barbican client for the given region."""
keystone_client = self.get_keystone_client(region_name)
return BarbicanClient(region_name, keystone_client.session)
@lru_cache(maxsize=CLIENT_CACHE_SIZE)
def get_vim_client(self, region_name: str) -> VimClient:
"""Get the Vim client for the given region."""
keystone_client = self.get_keystone_client(region_name)
return VimClient(region_name, keystone_client.session)
@property
def local_sysinv(self):
def local_sysinv(self) -> SysinvClient:
"""Return the local Sysinv client."""
return self.get_sysinv_client(dccommon_consts.DEFAULT_REGION_NAME)
@property
def subcloud_sysinv(self):
def subcloud_sysinv(self) -> SysinvClient:
"""Return the subcloud Sysinv client."""
return self.get_sysinv_client(self.region_name)
def get_barbican_client(self, region_name):
"""construct a barbican client"""
keystone_client = self.get_keystone_client(region_name)
return BarbicanClient(region_name, keystone_client.session)
def get_vim_client(self, region_name):
"""construct a vim client for a region."""
keystone_client = self.get_keystone_client(region_name)
return VimClient(region_name, keystone_client.session)
def add_shared_caches(self, shared_caches):
# Shared caches not required by all states, so instantiate only if necessary
self._shared_caches = shared_caches
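The per-method `lru_cache(maxsize=CLIENT_CACHE_SIZE)` pattern above can be reduced to a small self-contained sketch (`StateSketch` and the dict it returns are illustrative stand-ins for `BaseState` and the Sysinv/FM/etc. clients):

```python
from functools import lru_cache

# Mirrors CLIENT_CACHE_SIZE above: one entry per method, so repeated calls
# for the same region reuse a single client and its TCP connections.
CLIENT_CACHE_SIZE = 1

class StateSketch:
    def __init__(self):
        self.clients_built = 0

    @lru_cache(maxsize=CLIENT_CACHE_SIZE)
    def get_client(self, region_name):
        # The cache key is (self, region_name); with maxsize=1 only the
        # most recently used region keeps its client alive.
        self.clients_built += 1
        return {"region": region_name}
```

One caveat of decorating instance methods with `lru_cache` is that the cache holds a reference to `self`; here that is bounded, since the cache is scoped to the strategy state object and holds at most one entry per method.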


@ -78,7 +78,7 @@ class DCManagerStateService(service.Service):
def start(self):
LOG.info("Starting %s", self.__class__.__name__)
utils.set_open_file_limit(cfg.CONF.worker_rlimit_nofile)
utils.set_open_file_limit(cfg.CONF.state_worker_rlimit_nofile)
self._init_managers()
target = oslo_messaging.Target(
version=self.rpc_api_version, server=self.host, topic=self.topic


@ -15,13 +15,13 @@
#
from keystoneauth1 import exceptions as keystone_exceptions
import mock
from dccommon import consts as dccommon_consts
from dcmanager.audit import patch_audit
from dcmanager.audit import subcloud_audit_manager
from dcmanager.tests import base
from dcmanager.tests.unit.common.fake_subcloud import create_fake_subcloud
from dcmanager.tests.unit import fakes
class TestPatchAudit(base.DCManagerTestCase):
@ -39,14 +39,12 @@ class TestPatchAudit(base.DCManagerTestCase):
self.am = subcloud_audit_manager.SubcloudAuditManager()
self.am.patch_audit = self.pm
# Mock KeystoneClient.session
self.keystone_session = fakes.FakeKeystone().session
self.keystone_client = mock.MagicMock()
def test_patch_audit_previous_release_usm_enabled(self):
subcloud = create_fake_subcloud(self.ctx)
self.keystone_session.get_endpoint.return_value = "http://fake_endpoint"
patch_response = self.pm.subcloud_patch_audit(
self.keystone_session,
self.keystone_client,
subcloud,
)
load_response = self.pm.subcloud_load_audit()
@ -54,18 +52,16 @@ class TestPatchAudit(base.DCManagerTestCase):
expected_patch_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
self.assertTrue(self.keystone_session.get_endpoint.called)
self.keystone_client.services.find.assert_called_once()
self.assertEqual(patch_response, expected_patch_response)
self.assertEqual(load_response, expected_load_response)
def test_patch_audit_previous_release(self):
subcloud = create_fake_subcloud(self.ctx)
self.keystone_session.get_endpoint.side_effect = (
keystone_exceptions.EndpointNotFound
)
self.keystone_client.services.find.side_effect = keystone_exceptions.NotFound
patch_response = self.pm.subcloud_patch_audit(
self.keystone_session,
self.keystone_client,
subcloud,
)
load_response = self.pm.subcloud_load_audit()
@ -73,14 +69,14 @@ class TestPatchAudit(base.DCManagerTestCase):
expected_patch_response = dccommon_consts.SYNC_STATUS_OUT_OF_SYNC
expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
self.assertTrue(self.keystone_session.get_endpoint.called)
self.keystone_client.services.find.assert_called_once()
self.assertEqual(patch_response, expected_patch_response)
self.assertEqual(load_response, expected_load_response)
def test_patch_audit_current_release(self):
subcloud = create_fake_subcloud(self.ctx, software_version="TEST.SW.VERSION")
patch_response = self.pm.subcloud_patch_audit(
self.keystone_session,
self.keystone_client,
subcloud,
)
load_response = self.pm.subcloud_load_audit()
@ -88,6 +84,6 @@ class TestPatchAudit(base.DCManagerTestCase):
expected_patch_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
expected_load_response = dccommon_consts.SYNC_STATUS_NOT_AVAILABLE
self.assertFalse(self.keystone_session.get_endpoint.called)
self.assertFalse(self.keystone_client.get_endpoint.called)
self.assertEqual(patch_response, expected_patch_response)
self.assertEqual(load_response, expected_load_response)


@ -5,13 +5,13 @@
#
from keystoneauth1 import exceptions as keystone_exceptions
import mock
from dccommon import consts as dccommon_consts
from dcmanager.audit import software_audit
from dcmanager.audit import subcloud_audit_manager
from dcmanager.tests import base
from dcmanager.tests.unit.common.fake_subcloud import create_fake_subcloud
from dcmanager.tests.unit import fakes
FAKE_REGIONONE_RELEASES = [
{
@ -98,8 +98,7 @@ class TestSoftwareAudit(base.DCManagerTestCase):
self.audit_manager = subcloud_audit_manager.SubcloudAuditManager()
self.audit_manager.software_audit = self.software_audit
# Mock KeystoneClient.session
self.keystone_session = fakes.FakeKeystone().session
self.keystone_client = mock.MagicMock()
# Mock RegionOne SoftwareClient's list method
regionone_software_client = self.mock_software_client.return_value
@ -116,11 +115,9 @@ class TestSoftwareAudit(base.DCManagerTestCase):
def test_software_audit_previous_release_not_usm(self):
software_audit_data = self.get_software_audit_data()
subcloud = create_fake_subcloud(self.ctx)
self.keystone_session.get_endpoint.side_effect = (
keystone_exceptions.EndpointNotFound
)
self.keystone_client.services.find.side_effect = keystone_exceptions.NotFound
software_response = self.software_audit.subcloud_software_audit(
self.keystone_session,
self.keystone_client,
subcloud,
software_audit_data,
)
@ -130,14 +127,13 @@ class TestSoftwareAudit(base.DCManagerTestCase):
def test_software_audit_previous_release_usm(self):
software_audit_data = self.get_software_audit_data()
subcloud = create_fake_subcloud(self.ctx)
self.keystone_session.get_endpoint.return_value = "http://fake_endpoint"
sc_software_client = self.mock_software_client(subcloud.region_name)
sc_software_client.list.return_value = (
FAKE_SUBCLOUD_RELEASES_MISSING_OUT_OF_SYNC
)
software_response = self.software_audit.subcloud_software_audit(
self.keystone_session,
self.keystone_client,
subcloud,
software_audit_data,
)
@ -152,7 +148,7 @@ class TestSoftwareAudit(base.DCManagerTestCase):
sc_software_client = self.mock_software_client(subcloud.region_name)
sc_software_client.list.return_value = FAKE_SUBCLOUD_RELEASES_IN_SYNC
software_response = self.software_audit.subcloud_software_audit(
self.keystone_session,
self.keystone_client,
subcloud,
software_audit_data,
)
@ -169,7 +165,7 @@ class TestSoftwareAudit(base.DCManagerTestCase):
FAKE_SUBCLOUD_RELEASES_MISSING_OUT_OF_SYNC
)
software_response = self.software_audit.subcloud_software_audit(
self.keystone_session,
self.keystone_client,
subcloud,
software_audit_data,
)
@ -184,7 +180,7 @@ class TestSoftwareAudit(base.DCManagerTestCase):
sc_software_client = self.mock_software_client(subcloud.region_name)
sc_software_client.list.return_value = FAKE_SUBCLOUD_RELEASES_EXTRA_OUT_OF_SYNC
software_response = self.software_audit.subcloud_software_audit(
self.keystone_session,
self.keystone_client,
subcloud,
software_audit_data,
)


@ -225,6 +225,11 @@ endpoint_cache_opts = [
"http_connect_timeout",
help="Request timeout value for communicating with Identity API server.",
),
cfg.IntOpt(
"token_cache_size",
default=5000,
help="Maximum number of entries in the in-memory token cache.",
),
]
scheduler_opts = [


@ -17,6 +17,7 @@
import itertools
import uuid
from keystoneauth1 import session as ks_session
from oslo_db import exception as oslo_db_exception
from oslo_log import log as logging
@ -236,3 +237,15 @@ def enqueue_work(
# pylint: disable-next=no-member
f"{rsrc.id}/{resource_type}/{source_resource_id}/{operation_type}"
)
def close_session(session: ks_session.Session, operation: str, region_ref: str) -> None:
if session:
try:
LOG.debug("Closing session after %s for %s", operation, region_ref)
session.session.close()
except Exception:
LOG.warning(
f"Failed to close session for {region_ref} after {operation}",
exc_info=True,
)
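The sync and audit paths below call this helper from `finally` blocks so the reused TCP connections are released even when the operation raises. A minimal usage sketch (the helper body is reproduced so the sketch is self-contained; the mock stands in for the `sc_admin_session` wrapper, whose `.session` attribute is the underlying keystoneauth1 Session):

```python
import logging
from unittest import mock

LOG = logging.getLogger(__name__)

def close_session(session, operation, region_ref):
    # Same logic as the helper above: close defensively, log on failure.
    if session:
        try:
            LOG.debug("Closing session after %s for %s", operation, region_ref)
            session.session.close()
        except Exception:
            LOG.warning(
                "Failed to close session for %s after %s",
                region_ref, operation, exc_info=True,
            )

sc_admin_session = mock.MagicMock()
try:
    pass  # sync work would happen here
finally:
    # Runs whether or not the sync raised.
    close_session(sc_admin_session, "sync", "subcloud1/platform")
```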


@ -11,6 +11,7 @@ from dccommon import consts as dccommon_consts
from dcorch.common import consts as dco_consts
from dcorch.common import context
from dcorch.common import exceptions
from dcorch.common import utils
from dcorch.db import api as db_api
from dcorch.engine import scheduler
from dcorch.engine.sync_services.identity import IdentitySyncThread
@ -107,7 +108,8 @@ class GenericSyncWorkerManager(object):
def _sync_subcloud(
self, context, subcloud_name, endpoint_type, management_ip, software_version
):
LOG.info(f"Start to sync subcloud {subcloud_name}/{endpoint_type}.")
subcloud_ref_str = f"{subcloud_name}/{endpoint_type}"
LOG.info(f"Start to sync subcloud {subcloud_ref_str}.")
sync_obj = sync_object_class_map[endpoint_type](
subcloud_name, endpoint_type, management_ip, software_version
)
@ -115,8 +117,10 @@ class GenericSyncWorkerManager(object):
try:
sync_obj.sync()
except Exception:
LOG.exception(f"Sync failed for {subcloud_name}/{endpoint_type}")
LOG.exception(f"Sync failed for {subcloud_ref_str}")
new_state = dco_consts.SYNC_STATUS_FAILED
finally:
utils.close_session(sync_obj.sc_admin_session, "sync", subcloud_ref_str)
db_api.subcloud_sync_update(
context, subcloud_name, endpoint_type, values={"sync_request": new_state}


@ -10,6 +10,7 @@ from oslo_log import log as logging
from dcorch.common import consts
from dcorch.common import context
from dcorch.common import utils
from dcorch.db import api as db_api
from dcorch.engine.fernet_key_manager import FernetKeyManager
from dcorch.engine import scheduler
@ -208,4 +209,9 @@ class InitialSyncWorkerManager(object):
def initial_sync(self, subcloud_name, sync_objs):
LOG.debug(f"Initial sync subcloud {subcloud_name} {self.engine_id}")
for sync_obj in sync_objs.values():
sync_obj.initial_sync()
try:
sync_obj.initial_sync()
finally:
utils.close_session(
sync_obj.sc_admin_session, "initial sync", subcloud_name
)


@ -588,8 +588,7 @@ class SysinvSyncThread(SyncThread):
return None
def post_audit(self):
# TODO(lzhu1): This should be revisited once the master cache service
# is implemented.
super().post_audit()
OpenStackDriver.delete_region_clients_for_thread(self.region_name, "audit")
OpenStackDriver.delete_region_clients_for_thread(
dccommon_consts.CLOUD_0, "audit"
@ -780,7 +779,7 @@ class SysinvSyncThread(SyncThread):
if finding == AUDIT_RESOURCE_MISSING:
# The missing resource should be created by underlying subcloud
# thus action is to update for a 'missing' resource
# should not get here since audit discrepency will handle this
# should not get here since audit discrepancy will handle this
resource_id = self.get_resource_id(resource_type, resource)
self.schedule_work(
self.endpoint_type,


@ -569,7 +569,7 @@ class SyncThread(object):
# If the request was aborted due to an expired certificate,
# update the status to 'out-of-sync' and just return so the
# sync_request is updated to "completed". This way, the sync
# job won't attemp to retry the sync in the next cycle.
# job won't attempt to retry the sync in the next cycle.
if request_aborted:
self.set_sync_status(dccommon_consts.SYNC_STATUS_OUT_OF_SYNC)
LOG.info(
@ -620,7 +620,10 @@ class SyncThread(object):
LOG.debug(
"Engine id={}: sync_audit started".format(engine_id), extra=self.log_extra
)
self.sync_audit(engine_id)
try:
self.sync_audit(engine_id)
finally:
self.post_audit()
def sync_audit(self, engine_id):
LOG.debug(
@ -776,11 +779,12 @@ class SyncThread(object):
"{}: done sync audit".format(threading.currentThread().getName()),
extra=self.log_extra,
)
self.post_audit()
def post_audit(self):
# Some specific SyncThread subclasses may perform post audit actions
pass
utils.close_session(
self.sc_admin_session, "audit", f"{self.subcloud_name}/{self.endpoint_type}"
)
@classmethod
@lockutils.synchronized(AUDIT_LOCK_NAME)


@ -35,7 +35,7 @@ from dcorch.tests import utils
get_engine = api.get_engine
CAPABILITES = {
CAPABILITIES = {
"endpoint_types": [
dccommon_consts.ENDPOINT_TYPE_PLATFORM,
dccommon_consts.ENDPOINT_TYPE_IDENTITY,


@ -58,7 +58,7 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase):
def test_create_sync_objects(self):
sync_objs = self.gswm.create_sync_objects(
"subcloud1", base.CAPABILITES, "192.168.1.11", "24.09"
"subcloud1", base.CAPABILITIES, "192.168.1.11", "24.09"
)
# Verify both endpoint types have corresponding sync object


@ -113,7 +113,7 @@ class TestInitialSyncManager(base.OrchestratorTestCase):
management_ip="192.168.1." + str(i),
)
chunks[chunk_num][subcloud.region_name] = (
base.CAPABILITES,
base.CAPABILITIES,
subcloud.management_ip,
subcloud.software_version,
False,


@ -16,6 +16,10 @@ from dcorch.tests import utils
class FakeSyncObject(object):
def __init__(self):
# sc_admin_session is used when attempting to close the session
self.sc_admin_session = mock.MagicMock()
def initial_sync(self):
pass
@ -136,7 +140,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud(
self.ctx,
subcloud.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud.management_ip,
subcloud.software_version,
False,
@ -168,7 +172,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud(
self.ctx,
subcloud.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud.management_ip,
subcloud.software_version,
False,
@ -198,7 +202,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud(
self.ctx,
subcloud.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud.management_ip,
subcloud.software_version,
True,
@ -233,7 +237,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud(
self.ctx,
subcloud.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud.management_ip,
subcloud.software_version,
False,
@ -294,13 +298,13 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
)
subcloud_capabilities = {
subcloud1.region_name: (
base.CAPABILITES,
base.CAPABILITIES,
subcloud1.management_ip,
subcloud1.software_version,
False,
),
subcloud2.region_name: (
base.CAPABILITES,
base.CAPABILITIES,
subcloud2.management_ip,
subcloud2.software_version,
False,
@ -314,7 +318,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud,
mock.ANY,
subcloud1.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud1.management_ip,
subcloud1.software_version,
False,
@ -323,7 +327,7 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.iswm._initial_sync_subcloud,
mock.ANY,
subcloud2.region_name,
base.CAPABILITES,
base.CAPABILITIES,
subcloud2.management_ip,
subcloud2.software_version,
False,


@ -106,7 +106,7 @@ def create_subcloud_static(ctxt, name, **kwargs):
"management_state": dccommon_consts.MANAGEMENT_MANAGED,
"availability_status": dccommon_consts.AVAILABILITY_ONLINE,
"initial_sync_state": "",
"capabilities": base.CAPABILITES,
"capabilities": base.CAPABILITIES,
"management_ip": "192.168.0.1",
}
values.update(kwargs)