Files
update/software/scripts/deploy-precheck
Bin Qian ebe177d918 USM deploy state
This change introduced state machines for release state, deploy state
and deploy host state.

This change removed the direct references to the software metadata from
software-controller and other modules, replacing them with the encapsulated
release_data module.

Also include changes:
1. removed required parameter for software deploy activate and software
deploy complete RestAPI.
2. ensure reload metadata for each request
3. added feed_repo and commit-id to the deploy entity, to be
   subsequently passed to deploy operations.
4. fix issues

TCs:
    passed: software upload major and patching releases
    passed: software deploy start major and patching releases
    passed: software deploy host (mock) major and patching release
    passed: software activate major and patching release
    passed: software complete major release and patching release
    passed: redeploy host after host deploy failed both major and
patching release

Story: 2010676
Task: 49849

Change-Id: I4b1894560eccb8ef4f613633a73bf3887b2b93fb
Signed-off-by: Bin Qian <bin.qian@windriver.com>
2024-04-17 16:40:07 +00:00

340 lines
13 KiB
Python

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Run platform upgrade deploy precheck as a standalone executable
"""
import argparse
import os
import re
import requests
import subprocess
import sys
import tempfile
from lxml import etree as ElementTree
import upgrade_utils
# TODO(heitormatsui) keep updated for every release
# Kubernetes versions that the upgrade precheck accepts as the active version.
SUPPORTED_K8S_VERSIONS = [
    "v1.24.4",
    "v1.25.3",
    "v1.26.1",
    "v1.27.5",
    "v1.28.4",
]

# Exit codes returned by main(): all checks passed / at least one check failed.
RC_SUCCESS = 0
RC_UNHEALTHY = 3
class HealthCheck(object):
    """This class represents a general health check object
    that uses sysinv-client to run system health checks"""

    SUCCESS_MSG = 'OK'
    FAIL_MSG = 'Fail'

    def __init__(self, config):
        """
        Set up the release information and the API handles used by the checks
        :param config: dict of options (as produced by parse_config)
        :raises ValueError: if the script is not located under a
                            ".../rel-<MM.mm.p>/" directory
        """
        self._config = config

        # get target release from script directory location
        self._target_release = self._parse_target_release(__file__)
        self._major_release = self._target_release.rsplit(".", 1)[0]

        # get sysinv token, endpoint and client
        self._sysinv_token, self._sysinv_endpoint = \
            upgrade_utils.get_token_endpoint(config, service_type="platform")
        self._sysinv_client = upgrade_utils.get_sysinv_client(self._sysinv_token,
                                                              self._sysinv_endpoint)

        # get usm token and endpoint
        self._software_token, self._software_endpoint = \
            upgrade_utils.get_token_endpoint(config, service_type="usm")

    @staticmethod
    def _parse_target_release(script_path):
        """
        Extract the release version from a path containing ".../rel-<MM.mm.p>/"
        :param script_path: path of the running script
        :return: release string, e.g. "24.03.1"
        :raises ValueError: if the path has no "rel-<MM.mm.p>" directory
        """
        # raw string + escaped dots: only literal "." separators are accepted
        match = re.match(r"^.*/rel-(\d\d\.\d\d\.\d+)/", script_path)
        if match is None:
            raise ValueError("Cannot determine target release from path: %s"
                             % script_path)
        return match.group(1)

    def _check_license(self, version):
        """
        Validates the current license is valid for the specified version
        :param version: version to be checked against installed license
        :return: True if license is valid for version, False otherwise
        """
        license_dict = self._sysinv_client.license.show()
        if license_dict["error"]:
            return False

        # create temp file with license content to run verify-license binary against it
        with tempfile.NamedTemporaryFile(mode="w", delete=True) as license_file:
            license_file.write(license_dict["content"])
            # the file object is buffered: flush so that the external binary,
            # which opens the file by name, actually sees the content
            license_file.flush()
            try:
                subprocess.check_call(["/usr/bin/verify-license",  # pylint: disable=not-callable
                                       license_file.name,
                                       version])
            except subprocess.CalledProcessError:
                return False
        return True

    # TODO(heitormatsui): implement patch precheck targeted against USM
    #  and implement patch precheck for subcloud
    def _check_deployed_state(self, required_patches):
        """
        Checks if every patch in a list is in 'deployed' state
        :param required_patches: list of patches to be checked
        :return: boolean indicating success/failure and list of patches
                 that are not in the 'deployed' state
        """
        url = self._software_endpoint + '/query?show=deployed'
        headers = {"X-Auth-Token": self._software_token}
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException as err:
            # report an unreachable API as a failed check instead of a traceback
            print("Could not check required patches: %s" % err)
            return False, required_patches

        if response.status_code != 200:
            print("Could not check required patches...")
            return False, required_patches

        applied_patches = {release['release_id'] for release in response.json()}
        # sorted so that the reported list has a deterministic order
        missing_patches = sorted(set(required_patches) - applied_patches)
        return not missing_patches, missing_patches

    def run_health_check(self):
        """
        Run general health check using sysinv client
        :return: boolean indicating overall health and the report text
        """
        force = self._config.get("force", False)

        output = self._sysinv_client.health.get_kube_upgrade(args={}, relaxed=force)
        # any 'Fail' entry in the sysinv report makes the system unhealthy
        health_ok = HealthCheck.FAIL_MSG not in output

        # check installed license
        license_ok = self._check_license(self._major_release)
        output += 'Installed license is valid: [%s]\n' \
                  % (HealthCheck.SUCCESS_MSG if license_ok else HealthCheck.FAIL_MSG)
        health_ok = health_ok and license_ok

        return health_ok, output
class UpgradeHealthCheck(HealthCheck):
    """This class represents an upgrade-specific health check object
    that verifies if system is in a valid state for upgrade"""

    # TODO(heitormatsui): switch from using upgrade metadata xml to
    #  the new USM metadata format
    def _check_valid_upgrade_path(self):
        """
        Checks if active release to specified release is a valid upgrade path
        :return: boolean indicating success/failure, the active release and
                 the list of patches required for upgrading from it
        """
        # Get active release
        active_release = self._sysinv_client.isystem.list()[0].software_version

        # Parse upgrade metadata file for supported upgrade paths
        metadata_path = "/var/www/pages/feed/rel-%s/upgrades/metadata.xml" % self._major_release
        tree = ElementTree.parse(metadata_path)

        # maps each supported source release to its list of required patches
        patches_by_release = {}
        for entry in tree.find("supported_upgrades").findall("upgrade"):
            version_node = entry.find("version")
            patch_node = entry.find("required_patch")
            if patch_node is None:
                patches_by_release[version_node.text] = []
            else:
                patches_by_release[version_node.text] = [patch_node.text]

        is_supported = active_release in patches_by_release
        return is_supported, active_release, patches_by_release.get(active_release, [])

    # TODO(heitormatsui) do we need this check on USM? Remove if we don't
    def _check_active_is_controller_0(self):
        """
        Checks that active controller is controller-0
        :return: True if controller-0 is listed as the active controller
        """
        return any(
            host.hostname == "controller-0" and
            "Controller-Active" in host.capabilities["Personality"]
            for host in self._sysinv_client.ihost.list()
        )

    def _check_kube_version(self, supported_versions):
        """
        Check if active k8s version is in a list of supported versions
        :param supported_versions: list of supported k8s versions
        :return: boolean indicating success/failure and active k8s version
        """
        # first version reported as "active", or None when there is none
        active_version = next(
            (entry.version
             for entry in self._sysinv_client.kube_version.list()
             if entry.state == "active"),
            None)
        return active_version in supported_versions, active_version

    def run_health_check(self):
        """
        Run specific upgrade health checks
        :return: boolean indicating overall health and the report text
        """
        def status(passed):
            return HealthCheck.SUCCESS_MSG if passed else HealthCheck.FAIL_MSG

        report = []

        # check if it is a valid upgrade path
        path_ok, active_release, required_patches = self._check_valid_upgrade_path()
        report.append('Valid upgrade path from release %s to %s: [%s]\n'
                      % (active_release, self._major_release, status(path_ok)))

        # check if required patches are deployed
        patches_ok, missing_patches = self._check_deployed_state(required_patches)
        report.append('Required patches are applied: [%s]\n' % status(patches_ok))
        if not patches_ok:
            report.append('-> Patches not applied: [%s]\n' % ', '.join(missing_patches))

        # check if k8s version is valid
        kube_ok, active_version = self._check_kube_version(SUPPORTED_K8S_VERSIONS)
        supported_text = ", ".join(SUPPORTED_K8S_VERSIONS)
        report.append('Active kubernetes version [%s] is a valid supported version: [%s]\n'
                      % (active_version, status(kube_ok)))
        if not active_version:
            report.append('-> Failed to get version info. Upgrade kubernetes to one of the '
                          'supported versions [%s] and ensure that the kubernetes version '
                          'information is available in the kubeadm configmap.\n'
                          'See "system kube-version-list"\n' % supported_text)
        elif not kube_ok:
            report.append('-> Upgrade active kubernetes version [%s] to one of the '
                          'supported versions [%s]. See "system kube-version-list"\n'
                          % (active_version, supported_text))

        # TODO(heitormatsui) Do we need the following check on USM?
        # The load is only imported to controller-0. An upgrade can only
        # be started when controller-0 is active.
        controller_ok = self._check_active_is_controller_0()
        report.append('Active controller is controller-0: [%s]\n' % status(controller_ok))

        health_ok = path_ok and patches_ok and kube_ok and controller_ok
        return health_ok, "".join(report)
class PatchHealthCheck(HealthCheck):
    """This class represents a patch-specific health check object
    that verifies if system is in valid state to apply a patch"""

    def _get_required_patches(self):
        """
        Get required patches for a target release
        :return: list of patches required by the target release; empty if the
                 release is not found or the USM API could not be queried
        """
        url = self._software_endpoint + '/query'
        headers = {"X-Auth-Token": self._software_token}
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException as err:
            # report an unreachable API the same way as a non-200 answer
            print("Could not get required patches: %s" % err)
            return []

        if response.status_code != 200:
            print("Could not get required patches...")
            return []

        payload = response.json()
        # NOTE(review): this method historically read a dict keyed by "sd",
        # while _check_deployed_state reads the same endpoint as a plain list
        # of releases. Both shapes are accepted here; the list entries are
        # assumed to carry "sw_version" and "requires" -- confirm against the
        # USM API.
        if isinstance(payload, dict):
            releases = payload["sd"].values()
        else:
            releases = payload

        for values in releases:
            if values["sw_version"] == self._target_release:
                # only the first matching release is considered
                return list(values["requires"])
        return []

    def run_health_check(self):
        """
        Run specific patch health checks
        :return: boolean indicating overall health and the report text
        """
        # check required patches for target release
        required_patches = self._get_required_patches()
        success, missing_patches = self._check_deployed_state(required_patches)

        output = 'Required patches are applied: [%s]\n' \
                 % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
        if not success:
            output += '-> Patches not applied: [%s]\n' \
                      % ', '.join(missing_patches)

        return success, output
def parse_config(args=None):
    """
    Parse the parameters passed to the script
    :param args: list of command line arguments; when None, argparse
                 falls back to sys.argv
    :return: dict mapping each option name to its parsed value
    """
    parser = argparse.ArgumentParser(
        description="Run health checks to verify if the system "
                    "meets the requirements to deploy a specific "
                    "release.")

    # authentication options, all mandatory
    mandatory_options = (
        ("--auth_url", "Authentication URL"),
        ("--username", "Username"),
        ("--password", "Password"),
        ("--project_name", "Project Name"),
        ("--user_domain_name", "User Domain Name"),
        ("--project_domain_name", "Project Domain Name"),
    )
    for flag, help_text in mandatory_options:
        parser.add_argument(flag, help=help_text, required=True)

    parser.add_argument("--region_name", help="Region Name", default="RegionOne")

    # boolean switches, False unless present on the command line
    switches = (
        ("--force", "Ignore non-critical health checks"),
        ("--patch", "Set precheck to run against a patch release"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, help=help_text, action="store_true")

    # if args was not passed will use sys.argv by default
    return vars(parser.parse_args(args))
def main(argv=None):
    """
    Run the general and the release-specific health checks and print the report
    :param argv: list of command line arguments, forwarded to parse_config
    :return: RC_SUCCESS if every check passed, RC_UNHEALTHY otherwise
    """
    config = parse_config(argv)

    # execute general health check
    general_ok, general_report = HealthCheck(config).run_health_check()

    # execute release-specific health check
    if config.get("patch", False):
        specific_checker = PatchHealthCheck(config)
    else:
        specific_checker = UpgradeHealthCheck(config)
    specific_ok, specific_report = specific_checker.run_health_check()

    # combine both reports, removing extra line breaks/blank spaces, and print
    print(general_report.strip() + "\n" + specific_report.strip())

    return RC_SUCCESS if general_ok and specific_ok else RC_UNHEALTHY
if __name__ == "__main__":
    # use the value returned by main() as the process exit status
    exit_status = main()
    sys.exit(exit_status)