Cache RegionOne k8s versions during DC orchestrator k8s upgrade
The RegionOne Kubernetes version was being requested twice per subcloud
during a Kubernetes upgrade orchestration: once during the pre-check
state and again during the create VIM strategy state. This can cause
issues when running the orchestration with a large number of parallel
subclouds.

This commit moves the cache implementation [1] to the orchestrator
folder and uses it in the Kubernetes orchestrator to cache the
RegionOne k8s versions.

Test Plan:
1. PASS - Run the Kubernetes upgrade strategy and verify that it
   completes successfully and that the RegionOne k8s versions sysinv
   query is called a single time during the strategy apply.

Story: 2011106
Task: 50491

[1]: https://review.opendev.org/c/starlingx/distcloud/+/841093

Change-Id: I56c353977ef5476942d9e6090b29fbbe4330c553
Signed-off-by: Gustavo Herzmann <gustavo.herzmann@windriver.com>
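At its core, the change swaps a direct sysinv query in each state for a shared, per-strategy cache read. A simplified before/after of the state-side code (abbreviated from the diffs below):

# Before (simplified): every subcloud state queried RegionOne sysinv itself.
sys_kube_versions = self.get_sysinv_client(DEFAULT_REGION_NAME).get_kube_versions()

# After (simplified): states read through the shared cache, so sysinv is
# queried once per strategy apply regardless of how many subclouds run in parallel.
sys_kube_versions = self._read_from_cache(REGION_ONE_KUBERNETES_CACHE_TYPE)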
distributedcloud/dcmanager/orchestrator/cache/__init__.py (new file, empty)
distributedcloud/dcmanager/orchestrator/cache/cache_specifications.py (new file, 67 lines)
@@ -0,0 +1,67 @@
+#
+# Copyright (c) 2024 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+from dataclasses import dataclass
+import typing
+
+from dcmanager.common import consts
+from dcmanager.orchestrator.cache import clients
+
+
+@dataclass
+class CacheSpecification(object):
+    """A cache specification.
+
+    :param fetch_implementation: implementation on how to retrieve data from
+    client
+    :type fetch_implementation: function
+    :param post_filter_implementation: implementation on how to post-filter
+    cached data, if any
+    :type post_filter_implementation: function
+    :param valid_filters: valid post-filter parameters
+    :type valid_filters: set
+    :param retry_on_exception: exceptions to be retried on client read
+    :type retry_on_exception: type|tuple
+    :param max_attempts: Maximum number of client read attempts if retryable
+    exceptions occur
+    :param retry_sleep_msecs: Fixed backoff interval
+    """
+
+    fetch_implementation: typing.Callable
+    post_filter_implementation: typing.Optional[typing.Callable] = None
+    valid_filters: typing.Set = frozenset()
+    retry_on_exception: typing.Tuple[typing.Type[Exception], ...] = (
+        clients.CLIENT_READ_EXCEPTIONS
+    )
+    max_attempts: int = clients.CLIENT_READ_MAX_ATTEMPTS
+    retry_sleep_msecs: int = consts.PLATFORM_RETRY_SLEEP_MILLIS
+
+
+# Cache types
+REGION_ONE_LICENSE_CACHE_TYPE = "RegionOne system license"
+REGION_ONE_SYSTEM_INFO_CACHE_TYPE = "RegionOne system info"
+REGION_ONE_RELEASE_USM_CACHE_TYPE = "RegionOne release usm"
+REGION_ONE_KUBERNETES_CACHE_TYPE = "RegionOne kubernetes version"
+
+# Cache specifications
+REGION_ONE_KUBERNETES_CACHE_SPECIFICATION = CacheSpecification(
+    lambda: clients.get_sysinv_client().get_kube_versions()
+)
+
+# Map each expected operation type to its required cache types
+CACHE_TYPES_BY_OPERATION_TYPE = {
+    consts.SW_UPDATE_TYPE_KUBERNETES: {
+        REGION_ONE_KUBERNETES_CACHE_TYPE: REGION_ONE_KUBERNETES_CACHE_SPECIFICATION
+    }
+}
+
+
+def get_specifications_for_operation(operation_type: str):
+    """Retrieve all cache specifications required by an operation type
+
+    :param str operation_type: The software update strategy type
+    :return dict: A mapping between each cache type to its specification
+    """
+    return CACHE_TYPES_BY_OPERATION_TYPE.get(operation_type, {})
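Reviewer note: only the kubernetes specification above uses the defaults with no post-filter. As a hypothetical illustration of the remaining dataclass fields (not part of this commit), a specification that allows post-filtering the cached kube versions by state could look like:

# Hypothetical specification (illustrative only): caches RegionOne kube
# versions and allows callers to post-filter by version state.
EXAMPLE_KUBE_CACHE_SPECIFICATION = CacheSpecification(
    fetch_implementation=lambda: clients.get_sysinv_client().get_kube_versions(),
    post_filter_implementation=lambda data, **filters: [
        version for version in data
        if filters.get("state") is None or version.state == filters["state"]
    ],
    valid_filters={"state"},
)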
distributedcloud/dcmanager/orchestrator/cache/clients.py (new file, 58 lines)
@@ -0,0 +1,58 @@
+#
+# Copyright (c) 2024 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+import socket
+
+from keystoneauth1 import exceptions as keystone_exceptions
+from oslo_log import log as logging
+
+from dccommon import consts as dccommon_consts
+from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
+from dccommon.drivers.openstack.software_v1 import SoftwareClient
+from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
+from dcmanager.common import utils
+
+LOG = logging.getLogger(__name__)
+
+# Default timeout configurations for client reads
+CLIENT_READ_TIMEOUT_SECONDS = 60
+CLIENT_READ_EXCEPTIONS = (socket.timeout, keystone_exceptions.ServiceUnavailable)
+CLIENT_READ_MAX_ATTEMPTS = 2
+
+# Helper functions to retrieve clients for caching
+
+
+def get_sysinv_client():
+    ks_client = get_keystone_client()
+    return SysinvClient(
+        dccommon_consts.DEFAULT_REGION_NAME,
+        ks_client.session,
+        endpoint=ks_client.endpoint_cache.get_endpoint("sysinv"),
+        timeout=CLIENT_READ_TIMEOUT_SECONDS,
+    )
+
+
+def get_software_client():
+    ks_client = get_keystone_client()
+    return SoftwareClient(
+        dccommon_consts.DEFAULT_REGION_NAME,
+        ks_client.session,
+        endpoint=ks_client.endpoint_cache.get_endpoint("usm"),
+    )
+
+
+def get_keystone_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+    """Construct a (cached) keystone client (and token)"""
+
+    try:
+        os_client = OpenStackDriver(
+            region_name=region_name,
+            region_clients=None,
+            fetch_subcloud_ips=utils.fetch_subcloud_mgmt_ips,
+        )
+        return os_client.keystone_client
+    except Exception:
+        LOG.warning("Failure initializing KeystoneClient for region: %s" % region_name)
+        raise
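Reviewer note: these module constants feed the retry wrapper that SharedClientCache (below) builds around each fetch. A minimal sketch of that pattern, assuming the same third-party 'retrying' library (the fixed interval shown is illustrative; the real one comes from consts.PLATFORM_RETRY_SLEEP_MILLIS):

import retrying

@retrying.retry(
    retry_on_exception=lambda ex: isinstance(ex, CLIENT_READ_EXCEPTIONS),
    stop_max_attempt_number=CLIENT_READ_MAX_ATTEMPTS,
    wait_fixed=1000,  # illustrative 1s fixed backoff
)
def fetch_kube_versions():
    # A socket.timeout or ServiceUnavailable on the first attempt triggers
    # exactly one retry (CLIENT_READ_MAX_ATTEMPTS = 2) before failing.
    return get_sysinv_client().get_kube_versions()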
distributedcloud/dcmanager/orchestrator/cache/shared_cache_repository.py (new file, 40 lines)
@@ -0,0 +1,40 @@
+#
+# Copyright (c) 2024 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+from oslo_log import log
+
+from dcmanager.common.exceptions import InvalidParameterValue
+from dcmanager.orchestrator.cache import cache_specifications
+from dcmanager.orchestrator.cache.shared_client_cache import SharedClientCache
+
+LOG = log.getLogger(__name__)
+
+
+class SharedCacheRepository(object):
+
+    def __init__(self, operation_type):
+        self._shared_caches = {}
+        self._operation_type = operation_type
+
+    def initialize_caches(self):
+        # Retrieve specifications for each cache type required by the operation
+        operation_specifications = (
+            cache_specifications.get_specifications_for_operation(self._operation_type)
+        )
+
+        # Create shared caches mapping
+        self._shared_caches = {
+            cache_type: SharedClientCache(cache_type, cache_specification)
+            for cache_type, cache_specification in operation_specifications.items()
+        }
+
+    def read(self, cache_type: str, **filter_params):
+        cache = self._shared_caches.get(cache_type)
+        if cache:
+            return cache.read(**filter_params)
+        raise InvalidParameterValue(
+            err=f"Specified cache type '{cache_type}' not present"
+        )
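Reviewer note: a minimal usage sketch of the repository, assuming the kubernetes operation type registered in cache_specifications.py above:

from dcmanager.common import consts
from dcmanager.orchestrator.cache.cache_specifications import \
    REGION_ONE_KUBERNETES_CACHE_TYPE
from dcmanager.orchestrator.cache.shared_cache_repository import \
    SharedCacheRepository

shared_caches = SharedCacheRepository(consts.SW_UPDATE_TYPE_KUBERNETES)
shared_caches.initialize_caches()

# The first read hits sysinv once; subsequent reads return the cached list.
kube_versions = shared_caches.read(REGION_ONE_KUBERNETES_CACHE_TYPE)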
distributedcloud/dcmanager/orchestrator/cache/shared_client_cache.py (new file, 142 lines)
@@ -0,0 +1,142 @@
+#
+# Copyright (c) 2024 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+from oslo_concurrency import lockutils
+from oslo_log import log
+import retrying
+
+from dcmanager.common.exceptions import InvalidParameterValue
+
+LOG = log.getLogger(__name__)
+
+
+class SharedClientCache(object):
+    """Data cache for sharing client or API data between concurrent threads
+
+    Used to avoid repeated requests and prevent client overload.
+
+    Cache is not self refreshing. User of the cache is responsible for triggering
+    the refresh.
+
+    """
+
+    def __init__(self, cache_type, cache_specification):
+        """Create cache instance.
+
+        :param cache_type: type of data being cached, for logging
+        :type cache_type: str
+        :param cache_specification: specifications on how the cache should
+        operate
+        :type cache_specification: dcmanager.orchestrator.states.software.cache
+        .cache_specifications.CacheSpecification
+        """
+        self._client_lock = lockutils.ReaderWriterLock()
+        self._cache = None
+
+        # Cache configurations
+        self._cache_type = cache_type
+        self._valid_filters = cache_specification.valid_filters
+
+        # Retry configurations
+        self._max_attempts = cache_specification.max_attempts
+        self._retry_sleep_msecs = cache_specification.retry_sleep_msecs
+
+        # Add retry to client read if any retryable exception is provided
+        self._load_data_from_client = cache_specification.fetch_implementation
+        retry_on_exception = cache_specification.retry_on_exception
+        if retry_on_exception:
+            retry = retrying.retry(
+                retry_on_exception=lambda ex: isinstance(ex, retry_on_exception),
+                stop_max_attempt_number=self._max_attempts,
+                wait_fixed=self._retry_sleep_msecs,
+                wait_func=self._retry_client_read,
+            )
+            self._load_data_from_client = retry(
+                cache_specification.fetch_implementation
+            )
+
+        # Use default implementation with no filtering if none is provided
+        self._post_filter_impl = cache_specification.post_filter_implementation or (
+            lambda data, **filter_params: data
+        )
+
+    def read(self, **filter_params):
+        """Retrieve data from cache, if available.
+
+        Read from client and (re)populate cache, if not.
+
+        Only one thread may access the client at a time to prevent overload.
+
+        Concurrent reads are blocked until client read completes/fails. A recheck
+        for updates on the cache is performed afterwards.
+
+        Post-filtering can be applied to the results before returning. Data saved to
+        the cache will not include any filtering applied to returned data.
+
+        :param filter_params: parameters to be used for post-filtering
+        :type filter_params: string
+        :return: Cached data, filtered according to parameters given
+        :raises RuntimeError: If cache read fails due to concurrent client read error
+        :raises InvalidParameterError: If invalid filter parameters are given
+        """
+        # Use data stored in the cache, if present. Otherwise, read and cache
+        # data from client
+        if self._cache is None:
+            self._cache_data_from_client()
+
+        # Filter cached data and return results
+        return self._post_filter(self._cache, **filter_params)
+
+    def _cache_data_from_client(self):
+        # Read from the client and update cache if no concurrent write is in progress
+        if self._client_lock.owner != lockutils.ReaderWriterLock.WRITER:
+            with self._client_lock.write_lock():
+                # Atomically fetch data from client and update the cache
+                LOG.info(f"Reading data from {self._cache_type} client for caching")
+                self._cache = self._load_data_from_client()
+        else:
+            # If a concurrent write is in progress, wait for it and recheck cache
+            with self._client_lock.read_lock():
+                if self._cache is None:
+                    raise RuntimeError(
+                        f"Failed to retrieve data from {self._cache_type} cache. "
+                        "Possible failure on concurrent client read."
+                    )
+
+    def _retry_client_read(self, attempt: int, _: int):
+        """Determines the retry interval
+
+        This function should return the interval between retries in milliseconds,
+        The retrying module calls it passing the attempt number and the delay
+        since the first attempt in milliseconds.
+
+        It is used here as a way to add a log message during retries
+
+        :param int attempt: Number of attempts
+        :param int _: Delay since the first attempt in milliseconds (not used)
+        :return int: Expected delay between attempts in milliseconds
+        """
+        # To be called when a client read operation fails with a retryable error
+        # After this, read operation should be retried
+        LOG.warn(
+            f"Retryable error occurred while reading from {self._cache_type} client "
+            f"(Attempt {attempt}/{self._max_attempts})"
+        )
+        return self._retry_sleep_msecs
+
+    def _post_filter(self, data, **filter_params):
+        # Validate the parameters and apply specified filter implementation
+        self._validate_filter_params(**filter_params)
+        return self._post_filter_impl(data, **filter_params)
+
+    def _validate_filter_params(self, **filter_params):
+        # Compare each passed parameter against the specified valid parameters
+        # Raise an exception if any unexpected parameter is found
+        if filter_params:
+            invalid_params = set(filter_params.keys()) - self._valid_filters
+            if invalid_params:
+                raise InvalidParameterValue(
+                    err=f"Invalid filter parameters: {invalid_params}"
+                )
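Reviewer note: to see the single-flight read behavior in isolation, here is a small self-contained sketch (a stub specification standing in for a real CacheSpecification, not the dcmanager wiring) where ten threads read concurrently:

import threading

from dcmanager.orchestrator.cache.shared_client_cache import SharedClientCache

class StubSpecification:
    valid_filters = frozenset()
    post_filter_implementation = None
    retry_on_exception = ()  # falsy: no retry wrapper is installed
    max_attempts = 1
    retry_sleep_msecs = 0

    def __init__(self):
        self.calls = 0

    def fetch_implementation(self):
        self.calls += 1
        return ["v1.29.1", "v1.30.0"]

spec = StubSpecification()
cache = SharedClientCache("RegionOne kubernetes version", spec)

threads = [threading.Thread(target=cache.read) for _ in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Expected in the common case: spec.calls == 1 -- the first reader populates
# the cache while concurrent readers wait on the lock and then reuse it.
print(spec.calls)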
@@ -1,5 +1,5 @@
 # Copyright 2017 Ericsson AB.
-# Copyright (c) 2017-2021 Wind River Systems, Inc.
+# Copyright (c) 2017-2021, 2024 Wind River Systems, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
 #
 from dccommon.drivers.openstack import vim
 from dcmanager.common import consts
+from dcmanager.orchestrator.cache.shared_cache_repository import \
+    SharedCacheRepository
 from dcmanager.orchestrator.orch_thread import OrchThread
 from dcmanager.orchestrator.states.kube.applying_vim_kube_upgrade_strategy \
     import ApplyingVIMKubeUpgradeStrategyState
@@ -46,6 +48,22 @@ class KubeUpgradeOrchThread(OrchThread):
             vim.STRATEGY_NAME_KUBE_UPGRADE,
             consts.STRATEGY_STATE_KUBE_UPGRADE_PRE_CHECK)
 
+        # Initialize shared cache instances for the states that require them
+        self._shared_caches = SharedCacheRepository(self.update_type)
+        self._shared_caches.initialize_caches()
+
     def trigger_audit(self):
         """Trigger an audit for kubernetes"""
         self.audit_rpc_client.trigger_kubernetes_audit(self.context)
+
+    def pre_apply_setup(self):
+        # Restart caches for next strategy so that we always have the
+        # latest RegionOne data at the moment the strategy is applied
+        self._shared_caches.initialize_caches()
+        super().pre_apply_setup()
+
+    def determine_state_operator(self, strategy_step):
+        state = super().determine_state_operator(strategy_step)
+        # Share the cache with the state object
+        state.add_shared_caches(self._shared_caches)
+        return state
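Reviewer note: the states consume the repository through two small BaseState hooks. A rough sketch of that state-side bridge (BaseState itself is not shown in this diff; the method bodies below are assumed, not the actual code):

class BaseStateSketch(object):

    def add_shared_caches(self, shared_caches):
        # Invoked by determine_state_operator() above, once per state object.
        self._shared_caches = shared_caches

    def _read_from_cache(self, cache_type, **filter_params):
        # States ask the shared repository instead of querying sysinv directly.
        return self._shared_caches.read(cache_type, **filter_params)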
@@ -4,10 +4,11 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
-from dccommon.consts import DEFAULT_REGION_NAME
 from dccommon.drivers.openstack import vim
 from dcmanager.common import consts
 from dcmanager.common import utils as dcmanager_utils
+from dcmanager.orchestrator.cache.cache_specifications import \
+    REGION_ONE_KUBERNETES_CACHE_TYPE
 from dcmanager.orchestrator.states.creating_vim_strategy \
     import CreatingVIMStrategyState
 
@@ -47,11 +48,10 @@ class CreatingVIMKubeUpgradeStrategyState(CreatingVIMStrategyState):
             extra_args = {}
         to_version = extra_args.get('to-version', None)
         if to_version is None:
-            sys_kube_versions = \
-                self.get_sysinv_client(DEFAULT_REGION_NAME).get_kube_versions()
-            to_version = dcmanager_utils.get_active_kube_version(
-                sys_kube_versions
-            )
+            sys_kube_versions = self._read_from_cache(
+                REGION_ONE_KUBERNETES_CACHE_TYPE
+            )
+            to_version = dcmanager_utils.get_active_kube_version(sys_kube_versions)
             if to_version is None:
                 # No active target kube version on the system controller means
                 # the system controller is part-way through a kube upgrade
@@ -6,13 +6,14 @@
 
 import re
 
-from dccommon.consts import DEFAULT_REGION_NAME
 from dcmanager.common.consts import ERROR_DESC_CMD
 from dcmanager.common.consts import STRATEGY_STATE_COMPLETE
 from dcmanager.common.consts \
     import STRATEGY_STATE_KUBE_CREATING_VIM_KUBE_UPGRADE_STRATEGY
 from dcmanager.common import utils
 from dcmanager.db import api as db_api
+from dcmanager.orchestrator.cache.cache_specifications import \
+    REGION_ONE_KUBERNETES_CACHE_TYPE
 from dcmanager.orchestrator.states.base import BaseState
 
 # These following alarms can occur during a vim orchestrated k8s upgrade on the
@@ -86,15 +87,14 @@ class KubeUpgradePreCheckState(BaseState):
 
             # check extra_args for the strategy
             # if there is a to-version, use that when checking against the subcloud
-            # target version, otherwise compare to the sytem controller version
+            # target version, otherwise compare to the system controller version
             # to determine if this subcloud is permitted to upgrade.
             extra_args = utils.get_sw_update_strategy_extra_args(self.context)
             if extra_args is None:
                 extra_args = {}
             to_version = extra_args.get('to-version', None)
             if to_version is None:
-                sys_kube_versions = \
-                    self.get_sysinv_client(DEFAULT_REGION_NAME).get_kube_versions()
+                sys_kube_versions = self._read_from_cache(REGION_ONE_KUBERNETES_CACHE_TYPE)
                 to_version = utils.get_active_kube_version(sys_kube_versions)
                 if to_version is None:
                     # No active target kube version on the system controller means
@@ -127,7 +127,7 @@ class KubeUpgradePreCheckState(BaseState):
 
         # For the to-version, the code currently allows a partial version
         # ie: v1.20 or a version that is much higher than is installed.
-        # This allows flexability when passing in a to-version.
+        # This allows flexibility when passing in a to-version.
 
         # The 'to-version' is the desired version to upgrade the subcloud.
         # The 'target_version' is what the subcloud is allowed to upgrade to.
@@ -8,6 +8,7 @@ import mock
 
 from dccommon.drivers.openstack import vim
 from dcmanager.common import consts
+from dcmanager.orchestrator.states.base import BaseState
 from dcmanager.tests.unit.common import fake_strategy
 from dcmanager.tests.unit.fakes import FakeVimStrategy
 from dcmanager.tests.unit.orchestrator.states.fakes import FakeKubeUpgrade
@@ -102,6 +103,18 @@ class TestCreatingVIMKubeUpgradeStrategyStage(
                             state='available'),
         ]
 
+        self._mock_read_from_cache(BaseState)
+        self.mock_read_from_cache.return_value = [
+            FakeKubeVersion(obj_id=1,
+                            version=PREVIOUS_KUBE_VERSION,
+                            target=True,
+                            state='active'),
+            FakeKubeVersion(obj_id=2,
+                            version=UPGRADED_KUBE_VERSION,
+                            target=False,
+                            state='available'),
+        ]
+
     def mock_and_assert_step_update(
         self, is_upgrade=False, kube_version=None, kube_version_list=None
     ):
@@ -115,6 +128,7 @@ class TestCreatingVIMKubeUpgradeStrategyStage(
             STRATEGY_DONE_BUILDING,
         ]
         self.sysinv_client.get_kube_versions.return_value = KUBE_VERSION_LIST
+        self.mock_read_from_cache.return_value = KUBE_VERSION_LIST
 
         if is_upgrade:
             self.sysinv_client.get_kube_upgrades.return_value = kube_version_list
@@ -128,8 +142,10 @@ class TestCreatingVIMKubeUpgradeStrategyStage(
                 extra_args=extra_args)
         else:
             kube_version = kube_version_list[0].version
-            self.sysinv_client.get_kube_versions.side_effect = \
-                [kube_version_list, KUBE_VERSION_LIST]
+            # Subcloud query
+            self.sysinv_client.get_kube_versions.return_value = KUBE_VERSION_LIST
+            # System controller query
+            self.mock_read_from_cache.return_value = kube_version_list
 
         # API calls acts as expected
         self.vim_client.create_strategy.return_value = STRATEGY_BUILDING
@@ -183,6 +199,8 @@ class TestCreatingVIMKubeUpgradeStrategyStage(
 
         self.sysinv_client.get_kube_versions.return_value = \
             KUBE_VERSION_LIST_WITHOUT_ACTIVE
+        self.mock_read_from_cache.return_value = \
+            KUBE_VERSION_LIST_WITHOUT_ACTIVE
 
         self.worker.perform_state_action(self.strategy_step)
 
@@ -13,6 +13,7 @@ from dcmanager.common.consts \
     import STRATEGY_STATE_KUBE_CREATING_VIM_KUBE_UPGRADE_STRATEGY
 from dcmanager.common.consts import STRATEGY_STATE_KUBE_UPGRADE_PRE_CHECK
 from dcmanager.db.sqlalchemy import api as db_api
+from dcmanager.orchestrator.states.base import BaseState
 from dcmanager.tests.unit.common import fake_strategy
 from dcmanager.tests.unit.orchestrator.states.fakes import FakeAlarm
 from dcmanager.tests.unit.orchestrator.states.fakes import FakeKubeUpgrade
@@ -112,6 +113,18 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # mock the get_kube_versions calls
         self.sysinv_client.get_kube_versions = mock.MagicMock()
         self.sysinv_client.get_kube_versions.return_value = []
+        # mock the cached get_kube_versions calls
+        self._mock_read_from_cache(BaseState)
+        self.mock_read_from_cache.return_value = [
+            FakeKubeVersion(obj_id=1,
+                            version=PREVIOUS_KUBE_VERSION,
+                            target=True,
+                            state='active'),
+            FakeKubeVersion(obj_id=2,
+                            version=UPGRADED_KUBE_VERSION,
+                            target=False,
+                            state='available'),
+        ]
 
     def test_pre_check_subcloud_existing_upgrade(self):
         """Test pre check step where the subcloud has a kube upgrade
@@ -127,7 +140,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
             deploy_status=DEPLOY_STATE_DONE)
         self.sysinv_client.get_kube_upgrades.return_value = [FakeKubeUpgrade()]
         # get kube versions invoked only for the system controller
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
             FakeKubeVersion(obj_id=1,
                             version=UPGRADED_KUBE_VERSION,
                             target=True,
@@ -138,7 +151,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         self.worker.perform_state_action(self.strategy_step)
 
         # Verify the single query (for the system controller)
-        self.sysinv_client.get_kube_versions.assert_called_once()
+        self.mock_read_from_cache.assert_called_once()
 
         # Verify the transition to the expected next state
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
@@ -156,7 +169,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         )
         self.sysinv_client.get_kube_upgrades.return_value = [FakeKubeUpgrade()]
 
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
             FakeKubeVersion(
                 obj_id=1, version=UPGRADED_KUBE_VERSION, target=True, state='active'
             )
@@ -164,7 +177,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
 
         self.worker.perform_state_action(self.strategy_step)
 
-        self.sysinv_client.get_kube_versions.assert_called_once()
+        self.mock_read_from_cache.assert_called_once()
 
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
 
@@ -180,7 +193,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         self.sysinv_client.get_kube_upgrade_health.return_value = (
             KUBERNETES_UPGRADE_HEALTH_RESPONSE_MGMT_AFFECTING_ALARM)
         self.sysinv_client.get_kube_upgrades.return_value = [FakeKubeUpgrade()]
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
            FakeKubeVersion(obj_id=1,
                            version=UPGRADED_KUBE_VERSION,
                            target=True,
@@ -227,7 +240,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         self.sysinv_client.get_kube_upgrade_health.return_value = (
             KUBERNETES_UPGRADE_HEALTH_RESPONSE_MGMT_AFFECTING_ALARM)
         self.sysinv_client.get_kube_upgrades.return_value = [FakeKubeUpgrade()]
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
            FakeKubeVersion(obj_id=1,
                            version=UPGRADED_KUBE_VERSION,
                            target=True,
@@ -249,7 +262,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         self.sysinv_client.get_kube_upgrade_health.return_value = (
             KUBERNETES_UPGRADE_HEALTH_RESPONSE_NON_MGMT_AFFECTING_ALARM)
         self.sysinv_client.get_kube_upgrades.return_value = [FakeKubeUpgrade()]
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
            FakeKubeVersion(obj_id=1,
                            version=UPGRADED_KUBE_VERSION,
                            target=True,
@@ -281,7 +294,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # Query system controller kube versions
         # override the first get, so that there is no active release
         # 'partial' indicates the system controller is still upgrading
-        self.sysinv_client.get_kube_versions.return_value = [
+        self.mock_read_from_cache.return_value = [
            FakeKubeVersion(obj_id=1,
                            version=PREVIOUS_KUBE_VERSION,
                            target=True,
@@ -319,7 +332,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
 
         # No extra args / to-version in the database
         # Query system controller kube versions
-        self.sysinv_client.get_kube_versions.side_effect = [
+        self.mock_read_from_cache.side_effect = [
            [ # first list: (system controller) has an active release
             FakeKubeVersion(obj_id=1,
                             version=PREVIOUS_KUBE_VERSION,
@@ -347,8 +360,8 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # invoke the strategy state operation on the orch thread
         self.worker.perform_state_action(self.strategy_step)
 
-        # get_kube_versions gets called (more than once)
-        self.sysinv_client.get_kube_versions.assert_called()
+        # cached get_kube_versions gets called (more than once)
+        self.mock_read_from_cache.assert_called()
 
         # Verify the expected next state happened
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
@@ -388,7 +401,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # Do not need to mock query kube versions since extra args will be
         # queried to get the info for the system controller
         # and pre-existing upgrade is used for subcloud
-        self.sysinv_client.get_kube_versions.assert_not_called()
+        self.mock_read_from_cache.assert_not_called()
 
         # Verify the transition to the expected next state
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
@@ -426,7 +439,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # Do not need to mock query kube versions since extra args will be
         # queried to get the info for the system controller
         # and pre-existing upgrade is used for subcloud
-        self.sysinv_client.get_kube_versions.assert_not_called()
+        self.mock_read_from_cache.assert_not_called()
 
         # Verify the transition to the expected next state
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
@@ -483,7 +496,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
         # Do not need to mock query kube versions since extra args will be
         # queried to get the info for the system controller
         # and pre-existing upgrade is used for subcloud
-        self.sysinv_client.get_kube_versions.assert_not_called()
+        self.mock_read_from_cache.assert_not_called()
 
         # Verify the transition to the expected next state
         self.assert_step_updated(self.strategy_step.subcloud_id, next_state)
@@ -518,7 +531,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
             deploy_status=DEPLOY_STATE_DONE)
 
         # Setup a fake kube upgrade in progress
-        self.sysinv_client.get_kube_versions.return_value = KUBE_VERSION_LIST
+        self.mock_read_from_cache.return_value = KUBE_VERSION_LIST
 
         # Setup a fake kube upgrade strategy with the to-version specified
         extra_args = {"to-version": "v1.2.4"}
@@ -543,7 +556,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
             deploy_status=DEPLOY_STATE_DONE)
 
         # Setup a fake kube upgrade in progress
-        self.sysinv_client.get_kube_versions.return_value = []
+        self.mock_read_from_cache.return_value = []
 
         # Setup a fake kube upgrade strategy with the to-version specified
         extra_args = {"to-version": "v1.2.4"}
@@ -569,6 +582,7 @@ class TestKubeUpgradePreCheckStage(TestKubeUpgradeState):
 
         # Setup a fake kube upgrade in progress
         self.sysinv_client.get_kube_versions.return_value = KUBE_VERSION_LIST_2
+        self.mock_read_from_cache.return_value = KUBE_VERSION_LIST_2
 
         # Setup a fake kube upgrade strategy with the to-version specified
         extra_args = {"to-version": "v1.2.6"}