
263 lines
10 KiB

# -*- encoding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2023 Wind River Systems, Inc.
# SPDX-License-Identifier: Apache-2.0
Run platform upgrade deploy precheck as a standalone executable
import os
import re
import requests
import subprocess
import sys
import tempfile
from lxml import etree as ElementTree
import upgrade_utils
class HealthCheck(object):
"""This class represents a general health check object
that uses sysinv-client to run system health checks"""
FAIL_MSG = 'Fail'
def __init__(self, config):
self._config = config
# get sysinv token, endpoint and client
self._sysinv_token, self._sysinv_endpoint = \
upgrade_utils.get_token_endpoint(config, service_type="platform")
self._sysinv_client = upgrade_utils.get_sysinv_client(self._sysinv_token,
# get usm token and endpoint
self._software_token, self._software_endpoint = \
upgrade_utils.get_token_endpoint(config, service_type="usm")
def run_health_check(self):
force = self._config.get("force", False)
output = self._sysinv_client.health.get_kube_upgrade(args={}, relaxed=force)
if HealthCheck.FAIL_MSG in output:
return False, output
return True, output
class UpgradeHealthCheck(HealthCheck):
"""This class represents a upgrade-specific health check object
that verifies if system is in a valid state for upgrade"""
def _check_valid_upgrade_path(self):
"""Checks if active release to specified release is a valid upgrade path"""
# Get active release
isystem = self._sysinv_client.isystem.list()[0]
active_release = isystem.software_version
# supported_release is a dict with {release: required_patch}
supported_releases = dict()
# Parse upgrade metadata file for supported upgrade paths
root = ElementTree.parse("%s/metadata.xml" % os.path.dirname(__file__))
upgrade_root = root.find("supported_upgrades").findall("upgrade")
for upgrade in upgrade_root:
version = upgrade.find("version")
required_patch = upgrade.find("required_patch")
supported_releases.update({version.text: required_patch.text if
required_patch is not None else None})
success = active_release in supported_releases
return success, active_release, supported_releases.get(active_release, None)
# TODO(heitormatsui): implement patch precheck targeted against USM
# and implement patch precheck for subcloud
def _check_required_patch(self, release, required_patch):
"""Checks if required patch for the supported release is installed"""
url = self._software_endpoint + '/query?show=applied&release=%s' % release
headers = {"X-Auth-Token": self._software_token}
response = requests.get(url, headers=headers, timeout=10)
success = True
required_patch = [required_patch] if required_patch else []
if response.status_code != 200:
print("Could not check required patches...")
return False, required_patch
applied_patches = list(response.json()["sd"].keys())
missing_patch = list(set(required_patch) - set(applied_patches))
if missing_patch:
success = False
return success, missing_patch
# TODO(heitormatsui) do we need this check on USM? Remove if we don't
def _check_active_is_controller_0(self):
"""Checks that active controller is controller-0"""
controllers = self._sysinv_client.ihost.list()
for controller in controllers:
if controller.hostname == "controller-0" and \
"Controller-Active" in controller.capabilities["Personality"]:
return True
return False
def _check_license(self, version):
"""Validates the current license is valid for the specified version"""
license_dict = self._sysinv_client.license.show()
if license_dict["error"]:
return False
# create temp file with license content to run verify-license binary against it
with tempfile.NamedTemporaryFile(mode="w", delete=True) as license_file:
subprocess.check_call(["/usr/bin/verify-license", # pylint: disable=not-callable
except subprocess.CalledProcessError:
return False
return True
def _check_kube_version(self):
"""Check if active k8s version is the latest available"""
kube_versions = self._sysinv_client.kube_version.list()
active_version = None
latest_version = kube_versions[-1].version
for kv in kube_versions:
if kv.state == "active":
active_version = kv.version
success = active_version == latest_version
return success, active_version, latest_version
def run_health_check(self):
health_ok = True
output = ""
# get target release from script directory location
upgrade_release = re.match("^.*/rel-(\d\d.\d\d)/", __file__).group(1)
# check installed license
success = self._check_license(upgrade_release)
output += 'License valid for upgrade: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
# check if it is a valid upgrade path
success, active_release, required_patch = self._check_valid_upgrade_path()
output += 'Valid upgrade path from release %s to %s: [%s]\n' \
% (active_release, upgrade_release,
HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
# check if required patches are applied/committed if is a valid upgrade path
if success:
success, missing_patches = self._check_required_patch(active_release, required_patch)
output += 'Required patches are applied: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if not success:
output += 'Patches not applied: %s\n' \
% ', '.join(missing_patches)
health_ok = health_ok and success
output += 'Invalid upgrade path, skipping required patches check...'
# check k8s version is the latest available
success, active_version, latest_version = self._check_kube_version()
if success:
output += 'Active kubernetes version is the latest supported version: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
if active_version:
output += 'Upgrade kubernetes to the latest version: [%s]. ' \
'See "system kube-version-list"\n' % latest_version
output += 'Failed to get version info. Upgrade kubernetes to' \
' the latest version (%s) and ensure that the ' \
'kubernetes version information is available in ' \
' the kubeadm configmap.\n' \
'Also see "system kube-version-list"\n' % latest_version
health_ok = health_ok and success
# TODO(heitormatsui) Do we need the following check on USM?
# The load is only imported to controller-0. An upgrade can only
# be started when controller-0 is active.
is_controller_0 = self._check_active_is_controller_0()
success = is_controller_0
output += \
'Active controller is controller-0: [%s]\n' \
% (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
health_ok = health_ok and success
return health_ok, output
def parse_config(args):
if not args:
return None
required_keystone_config = ["auth_url", "username", "password", "project_name",
"user_domain_name", "project_domain_name", "region_name"]
config = dict()
for i in range(1, len(args)):
sep = args[i].find("=")
# for params in --key=value format
if sep > 0:
key = args[i][:sep].lstrip("-")
value = args[i][sep+1:]
# for params in --key format
key = args[i].lstrip("-")
value = True
config[key] = value
except ValueError:
print("Invalid parameter format: %s" % args[i])
return None
if not all(cf in config for cf in required_keystone_config):
return None
return config
def main(argv=None):
config = parse_config(argv)
if not config:
print("Please provide keystone_authtoken configuration.\n"
"usage: deploy-precheck --auth_url=<auth_url> --username=<username> "
"--password=<password> --project_name=<project_name> --user_domain_name=<user_domain_name> "
"--project_domain_name=<project_domain_name> --region_name=<region_name> [--force]")
return 1
general_health_check = HealthCheck(config)
upgrade_health_check = UpgradeHealthCheck(config)
# execute general health check
general_health_ok, general_output = general_health_check.run_health_check()
# execute upgrade-specific health check
upgrade_health_ok, upgrade_output = upgrade_health_check.run_health_check()
# combine health check results removing extra line breaks/blank spaces from the output
health_ok = general_health_ok and upgrade_health_ok
output = general_output.strip() + "\n" + upgrade_output.strip()
# print health check output and exit
if health_ok:
return 0
return 1
if __name__ == "__main__":