Merge "Refactor Flux upgrade and rollback"

This commit is contained in:
Zuul
2025-11-18 15:22:15 +00:00
committed by Gerrit Code Review
2 changed files with 91 additions and 442 deletions
@@ -3,26 +3,63 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# This script installs fluxcd controllers in the flux-helm namespace
# This script upgrades fluxcd controllers in the flux-helm namespace
# in kubernetes
#
import logging
import subprocess
import os
import sys
from sysinv.common import exception
from sysinv.common.retrying import retry
from sysinv.common.kubernetes import test_k8s_health
from cgtsclient import client as cgts_client
from software.utilities.utils import configure_logging
from sysinv.common.kubernetes import test_k8s_health
from sysinv.common.retrying import retry
# Logger used by every function in this script.
LOG = logging.getLogger('main_logger')
# Kubernetes namespace targeted by the kubectl/helm commands below.
FLUXCD_NAMESPACE = "flux-helm"
# Name of the Helm release queried with `helm status`.
FLUXCD_HELM_RELEASE = "fluxcd"
# kubeconfig file handed to kubectl and helm through --kubeconfig.
KUBECONFIG = "/etc/kubernetes/admin.conf"
# Value passed to kubectl's --request-timeout option.
KUBECTL_REQUEST_TIMEOUT = "60s"
def get_sysinv_client():
    """Build a cgts client from the credentials found in the environment.

    The token and endpoint are read from the OS_AUTH_TOKEN and SYSTEM_URL
    environment variables; a variable that is not set is passed on as None.

    Returns:
        Client: cgts client object
    """
    auth_token = os.environ.get("OS_AUTH_TOKEN")
    endpoint = os.environ.get("SYSTEM_URL")
    return cgts_client.get_client(
        "1",
        os_auth_token=auth_token,
        system_url=endpoint,
    )
@retry(retry_on_result=lambda x: x is False, stop_max_attempt_number=3)
@test_k8s_health
def upgrade_controllers():
    """Ask sysinv to upgrade the Flux controllers.

    The request goes through the cgts client (client.flux.upgrade_controllers).
    An exception raised by that call is logged and reported as a failure
    instead of being propagated.

    Returns:
        bool: The value returned by the sysinv call (truthy when the upgrade
        is successful), or False when the call raised.
    """
    LOG.info("Upgrading Flux controllers")
    sysinv = get_sysinv_client()

    outcome = False
    try:
        outcome = sysinv.flux.upgrade_controllers()
        if not outcome:
            LOG.error("Error while upgrading flux controllers. "
                      "Check /var/log/sysinv.log for more details.")
        else:
            LOG.info("Flux controllers successfully upgraded")
    except Exception as err:
        LOG.error("Cannot upgrade flux controllers: %s", err)

    return outcome
def main():
@@ -46,194 +83,16 @@ def main():
arg += 1
configure_logging()
if action == "activate" and from_release >= "24.09":
namespace_created = create_fluxcd_namespace()
fluxcd_helm_release_not_installed = helm_release_not_exists()
if action == "activate" and from_release >= "25.09":
LOG.info(
"%s invoked with from_release = %s to_release = %s "
"action = %s" % (sys.argv[0], from_release, to_release, action)
)
enable_fluxcd_controllers(
from_release, namespace_created, fluxcd_helm_release_not_installed
)
if upgrade_controllers():
return 0
def execute_command(
    cmd, success_message, error_message, return_type="exception", output_processor_fn=None
):
    """Run a shell command and either raise on failure or post-process its output.

    Args:
        cmd: Command line to execute. It is run through the shell
            (shell=True), so it must never be assembled from untrusted input.
        success_message: Message logged when the command exits with 0.
        error_message: Prefix of the exception message raised on failure.
        return_type:
            - 'exception': raise SysinvException on a non-zero exit code.
            - 'processed': hand the output to output_processor_fn and return
              whatever it returns.
        output_processor_fn: Callable used when return_type='processed'. It is
            called as fn(stdout) on success and as
            fn(stderr, returned_code=<exit code>) on failure.

    Returns:
        None (for 'exception') or the processor's result (for 'processed').

    Raises:
        SysinvException: If the command exits non-zero and no processor is
            available to interpret the failure, if the processor itself
            raises SysinvException, or if running the command fails for any
            other reason.
    """
    use_processor = return_type == "processed" and output_processor_fn

    try:
        sub = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = sub.communicate()
        stdout = stdout.decode('utf-8').strip()
        stderr = stderr.decode('utf-8').strip()

        if sub.returncode != 0:
            if use_processor:
                return output_processor_fn(stderr, returned_code=sub.returncode)
            # Nobody can interpret the failure, so it must not fall through
            # to the success log below.
            failure = f"{error_message} via {cmd}, stderr: {stderr}"
            LOG.error(failure)
            raise exception.SysinvException(failure)

        LOG.info(f"{success_message} Output: {stdout}")
        if use_processor:
            return output_processor_fn(stdout)
    except exception.SysinvException:
        # Already a well-formed failure (raised above or by the processor);
        # re-wrapping it would only duplicate the message.
        raise
    except Exception as e:
        LOG.error(f"Exception occurred while executing command '{cmd}': {e}")
        raise exception.SysinvException(
            f"{error_message} via {cmd}, reason: {e}"
        ) from e
@retry(
    retry_on_exception=lambda x: isinstance(x, exception.SysinvException),
    stop_max_attempt_number=3,
)
@test_k8s_health
def enable_fluxcd_controllers(from_release, namespace_created, fluxcd_helm_release_not_installed):
    """Run the upgrade-fluxcd-controllers.yml Ansible playbook.

    The three arguments are forwarded to the playbook as extra vars
    (upgrade_activate_from_release, fluxcd_namespace_created and
    fluxcd_helm_release_not_installed). The command is executed through
    execute_command() with return_type="exception".
    """
    playbook = (
        "/usr/share/ansible/stx-ansible/playbooks"
        "/upgrade-fluxcd-controllers.yml"
    )
    extra_vars = " ".join([
        f"upgrade_activate_from_release={from_release}",
        f"fluxcd_namespace_created={namespace_created}",
        f"fluxcd_helm_release_not_installed={fluxcd_helm_release_not_installed}",
    ])
    command = f'ansible-playbook {playbook} -e "{extra_vars}"'

    LOG.info("Enabling FluxCD controllers...")
    execute_command(
        cmd=command,
        success_message="FluxCD controllers enabled.",
        error_message="Error trying to enable fluxcd controllers",
        return_type="exception",
    )
@retry(
    retry_on_exception=lambda x: isinstance(x, exception.SysinvException),
    stop_max_attempt_number=3,
)
def create_fluxcd_namespace():
    """Create the FluxCD namespace with kubectl, tolerating a pre-existing one.

    The kubectl output is interpreted by a nested callback handed to
    execute_command(): exit code 0 means the namespace was created, and a
    failure whose output mentions "AlreadyExists" is treated as success too.

    Returns:
        bool: True if the namespace was created or already exists.

    Raises:
        SysinvException: If kubectl fails for any other reason.
    """

    def check_namespace_already_exists(output, returned_code=0):
        """Map the kubectl output and exit code to True, or raise on real errors."""
        if returned_code == 0:
            LOG.info(f"{FLUXCD_NAMESPACE} namespace created.")
            return True

        if "AlreadyExists" in output:
            LOG.info(f"{FLUXCD_NAMESPACE} namespace already exists.")
            return True

        LOG.error(f"Error trying to create {FLUXCD_NAMESPACE} namespace. Output: {output}")
        raise exception.SysinvException(
            f"Error trying to create {FLUXCD_NAMESPACE} namespace. Output: {output}"
        )

    kubectl_cmd = (
        f"kubectl create namespace {FLUXCD_NAMESPACE} --kubeconfig {KUBECONFIG} "
        f"--request-timeout={KUBECTL_REQUEST_TIMEOUT}"
    )

    LOG.info(f"Creating {FLUXCD_NAMESPACE} namespace...")
    return execute_command(
        cmd=kubectl_cmd,
        success_message=f"{FLUXCD_NAMESPACE} namespace created.",
        error_message=f"Error trying to create {FLUXCD_NAMESPACE} namespace",
        return_type="processed",
        output_processor_fn=check_namespace_already_exists,
    )
@retry(
    retry_on_exception=lambda x: isinstance(x, exception.SysinvException),
    stop_max_attempt_number=3,
)
def helm_release_not_exists():
    """Tell whether the FluxCD Helm release is absent from its namespace.

    Runs `helm status` for FLUXCD_HELM_RELEASE in FLUXCD_NAMESPACE through
    execute_command() and lets a nested callback interpret the result.

    Returns:
        bool: True if the Helm release does not exist, False if it exists.

    Raises:
        SysinvException: If `helm status` fails for a reason other than the
            release not being found.
    """

    def check_helm_status_output_was_not_found(output, returned_code=0):
        """Return True only when helm reports "release: not found"."""
        if returned_code == 0:
            LOG.info(f"Helm release {FLUXCD_HELM_RELEASE} exists in namespace {FLUXCD_NAMESPACE}.")
            return False

        if "release: not found" in output:
            LOG.info(
                f"Helm release {FLUXCD_HELM_RELEASE} not found in namespace {FLUXCD_NAMESPACE}.")
            return True

        LOG.error(
            f"Unexpected error while checking Helm release {FLUXCD_HELM_RELEASE} "
            f"in namespace {FLUXCD_NAMESPACE}. Output: {output}"
        )
        raise exception.SysinvException(
            f"Error trying to check if Helm release {FLUXCD_HELM_RELEASE} "
            f"exists in namespace {FLUXCD_NAMESPACE}."
        )

    helm_cmd = f"helm status -n {FLUXCD_NAMESPACE} {FLUXCD_HELM_RELEASE} --kubeconfig {KUBECONFIG}"

    LOG.info(f"Checking if Helm release {FLUXCD_HELM_RELEASE} "
             f"exists in namespace {FLUXCD_NAMESPACE}...")
    return execute_command(
        cmd=helm_cmd,
        success_message="Flux-helm release already exists.",
        error_message="Error trying to check if flux-helm release exists",
        return_type="processed",
        output_processor_fn=check_helm_status_output_was_not_found,
    )
return 1
if __name__ == "__main__":
@@ -5,249 +5,62 @@
#
# This script rolls back flux controllers in the flux-helm namespace
import json
import logging
import os
import re
import subprocess
import sys
import yaml
from sysinv.common import exception
from sysinv.common.retrying import retry
from sysinv.common.kubernetes import test_k8s_health
from cgtsclient import client as cgts_client
from software.utilities.utils import configure_logging
from sysinv.common.kubernetes import test_k8s_health
from sysinv.common.retrying import retry
# Logger used by every function in this script.
LOG = logging.getLogger('main_logger')
# Helm release name and namespace passed to `helm history` / `helm rollback`.
RELEASE_NAME = "fluxcd"
RELEASE_NAMESPACE = "flux-helm"
# kubeconfig file handed to kubectl and helm through --kubeconfig.
KUBECONFIG = "/etc/kubernetes/admin.conf"
# NOTE(review): not referenced anywhere in the visible part of this script.
TARGET_REVISION = "1"
# Value passed to kubectl's --request-timeout option.
KUBECTL_REQUEST_TIMEOUT = "60s"
# CRD removed by delete_incompatible_crd() before rolling back.
BUCKETS_CRD = "buckets.source.toolkit.fluxcd.io"
# Directory scanned by find_flux_chart() for a flux2-<X.Y.Z>.tgz tarball.
LEGACY_CHART_DIRECTORY = "/usr/local/share/flux2-charts-legacy/"
def find_flux_chart():
""" Find the Flux chart that corresponds to the target release for rollback.
def get_sysinv_client():
""" Get an authenticated cgts client
Returns:
path: Absolute path to the chart tarball.
Client: cgts client object
"""
pattern = re.compile(r'^flux2-(\d+\.\d+\.\d+)\.tgz$')
for filename in os.listdir(LEGACY_CHART_DIRECTORY):
if pattern.match(filename):
return os.path.join(LEGACY_CHART_DIRECTORY, filename)
return None
sysinv_client = cgts_client.get_client(
"1",
os_auth_token=os.environ.get("OS_AUTH_TOKEN"),
system_url=os.environ.get("SYSTEM_URL")
)
return sysinv_client
def get_chart_version(tgz_path):
    """Retrieve the version from a chart tarball.

    Runs `helm show chart` on the tarball and reads the "version" key from
    the YAML it prints.

    Args:
        tgz_path (path): path to the chart tarball

    Returns:
        string: chart version (None if the chart metadata has no "version").

    Raises:
        SysinvException: If helm exits with an error or its output cannot
            be obtained or parsed.
    """
    try:
        result = subprocess.run(
            ["helm", "show", "chart", tgz_path],
            capture_output=True,
            text=True,
            check=True
        )
        chart_yaml = yaml.safe_load(result.stdout)
        return chart_yaml.get("version")
    except subprocess.CalledProcessError as e:
        # Fail loudly like every other error path: returning None here
        # would make callers search the history for a "-None" version.
        raise exception.SysinvException(
            f"Error while retrieving chart version: {e.stderr}"
        ) from e
    except Exception as e:
        raise exception.SysinvException(f"Cannot retrieve chart version: {e}") from e
def get_target_revision(history, target_version):
    """Retrieve the target Helm release revision for rollback.

    The history is searched from the newest record to the oldest, skipping
    the last record, and the first record whose chart name ends with
    "-<target_version>" wins.

    Args:
        history (list): list of records of the release history. Each record
            is a dict with at least the "chart" and "revision" keys.
        target_version (string): target version for rollback.

    Returns:
        integer: Revision number, or None if no earlier record matches.
    """
    # Anchor the match at the end of "<name>-<version>": a plain substring
    # test would let "2.12.4" match a "flux2-2.12.40" chart.
    version_suffix = f"-{target_version}"
    for record in reversed(history[:-1]):
        if record["chart"].endswith(version_suffix):
            return record["revision"]
    return None
def is_target_version_installed(history, target_version):
    """Check whether the rollback can be skipped.

    Args:
        history (list): list of records of the release history. The last
            record must carry the "chart" and "status" keys.
        target_version (string): target version for rollback.

    Returns:
        bool: True if there is no release history at all, or if the last
        record is the target version in 'deployed' status. False otherwise.
    """
    if len(history) == 0:
        # This is ok for stx11 since it means that Flux hasn't been migrated over to
        # Helm when the activation process was interrupted.
        # TODO(ipiresso): should return an error for releases after stx11 where it will be
        # always expected to have a Helm history during rollback.
        LOG.info("Flux not migrated over to Helm. Skipping.")
        return True

    latest = history[-1]
    # Anchor the match at the end of "<name>-<version>": a plain substring
    # test would let "2.12.4" match a "flux2-2.12.40" chart.
    if latest["chart"].endswith(f"-{target_version}") and latest["status"] == 'deployed':
        LOG.warning("Already running target Flux release. Skipping.")
        return True

    return False
def get_history():
    """Fetch the Helm revision history of the Flux release as parsed JSON.

    Returns:
        list: Records of each revision of the fluxcd release. An empty list
        is returned when helm reports that the release was not found.

    Raises:
        SysinvException: If helm fails for another reason, or if its output
            cannot be obtained or parsed.
    """
    helm_cmd = [
        "helm",
        "history", RELEASE_NAME,
        "-n", RELEASE_NAMESPACE,
        "--output", "json",
        "--kubeconfig", KUBECONFIG,
    ]

    try:
        completed = subprocess.run(
            helm_cmd,
            check=True,
            capture_output=True,
            text=True
        )
        return json.loads(completed.stdout)
    except subprocess.CalledProcessError as err:
        if 'not found' not in err.stderr:
            raise exception.SysinvException("Error while attempting to retrieve Helm "
                                            f"release history: {err.stderr}")
        # A missing release is an expected situation, not an error.
        LOG.warning("Helm release %s not found in %s namespace",
                    RELEASE_NAME, RELEASE_NAMESPACE)
        return []
    except Exception as err:
        raise exception.SysinvException(f"Cannot retrieve Helm release history: {err}")
@retry(retry_on_exception=lambda x: isinstance(x, exception.SysinvException),
       stop_max_attempt_number=3)
def delete_incompatible_crd():
    """ Delete buckets.source.toolkit.fluxcd.io CRD as manifests from
    versions 2.13 and 2.15 do not support straightforward rollback.

    The CRD is looked up first; when kubectl reports it as not found the
    function returns without deleting anything.

    Raises:
        SysinvException: If the lookup fails for a reason other than the CRD
            being absent, or if the deletion fails.
    """
    # Arguments shared by the `kubectl get` and `kubectl delete` calls.
    crd_args = [
        "customresourcedefinitions.apiextensions.k8s.io",
        BUCKETS_CRD,
        f"--request-timeout={KUBECTL_REQUEST_TIMEOUT}",
        "--kubeconfig", KUBECONFIG,
    ]

    # First check if the CRD exists
    try:
        subprocess.run(
            ["kubectl", "get"] + crd_args,
            check=True,
            capture_output=True,
            text=True
        )
    except subprocess.CalledProcessError as e:
        if "not found" in e.stderr:
            LOG.warning("CRD %s not found for deletion", BUCKETS_CRD)
            return
        raise exception.SysinvException(f"Error while checking CRD: {e.stderr}")
    except Exception as e:
        raise exception.SysinvException(f"Cannot check if CRD exists: {e}")

    LOG.info("Deleting incompatible CRD %s", BUCKETS_CRD)
    try:
        subprocess.run(
            ["kubectl", "delete"] + crd_args,
            check=True,
            capture_output=True,
            # Decode stderr so the error below is readable text rather
            # than a bytes repr (b'...').
            text=True
        )
    except subprocess.CalledProcessError as e:
        raise exception.SysinvException(f"Error while deleting CRD: {e.stderr}")
    except Exception as e:
        raise exception.SysinvException(f"Cannot delete CRD: {e}")

    LOG.info("CRD successfully deleted")
@retry(retry_on_exception=lambda x: isinstance(x, exception.SysinvException),
stop_max_attempt_number=3)
@retry(retry_on_result=lambda x: x is False, stop_max_attempt_number=3)
@test_k8s_health
def rollback_fluxcd_controllers(revision):
""" Rollback flux controllers via 'helm rollback'
def rollback_controllers():
""" Rollback Flux controllers
Args:
revision (integer): target revision number.
Returns:
bool: True if rollback is successful. False otherwise.
"""
LOG.info("Rolling back Flux release to revision %s", revision)
LOG.info("Rolling back Flux controllers")
client = get_sysinv_client()
result = False
try:
subprocess.run(
["helm", "rollback", RELEASE_NAME, str(revision),
"-n", RELEASE_NAMESPACE,
"--kubeconfig", KUBECONFIG,
"--wait",
"--wait-for-jobs"],
check=True,
capture_output=True
)
except subprocess.CalledProcessError as e:
raise exception.SysinvException(f"Error while rolling back flux controllers: {e.stderr}")
result = client.flux.rollback_controllers()
if result:
LOG.info("Flux controllers successfully rolled back")
else:
LOG.error("Error while rolling back flux controllers. "
"Check /var/log/sysinv.log for more details.")
except Exception as e:
raise exception.SysinvException(f"Cannot rollback flux controllers: {e}")
LOG.error("Cannot roll back flux controllers: %s", e)
LOG.info("Flux release successfully rolled back")
# Workaround for portieris issue when helm-controller is restarting
@test_k8s_health
def wait_helm_controller_pod_ready():
    """Block until the helm-controller pod reports Ready, for at most 60s.

    This is best effort: any failure of the `kubectl wait` call is logged
    as a warning and the function returns normally.
    """
    LOG.info("Waiting for helm-controller pod to be Ready")
    wait_cmd = [
        "kubectl", "wait", "--for=condition=Ready", "pods",
        "-l", "app=helm-controller",
        "-n", RELEASE_NAMESPACE,
        "--timeout=60s",
        "--kubeconfig", KUBECONFIG,
    ]

    try:
        subprocess.run(wait_cmd, check=True)
    except Exception as err:
        # Warning and proceeding with the rollback, as the issue might be fixed by it
        LOG.warning(f"Error waiting for helm-controller pod to be Ready: {err}")
        return

    LOG.info("helm-controller pod is Ready. Proceeding.")
return result
def main():
action = None
from_release = None
to_release = None
@@ -260,47 +73,24 @@ def main():
elif arg == 3:
action = sys.argv[arg]
elif arg == 4:
# Optional postgres port parameter for USM upgrade (not used
# by this script).
# postgres_port = sys.argv[arg]
pass
else:
print(f"Invalid option {sys.argv[arg]}")
print("Invalid option %s." % sys.argv[arg])
return 1
arg += 1
configure_logging()
LOG.info("%s invoked with from_release %s to_release %s and action %s",
sys.argv[0], from_release, to_release, action)
if action == "activate-rollback" and from_release >= "25.09":
LOG.info(
"%s invoked with from_release = %s to_release = %s "
"action = %s" % (sys.argv[0], from_release, to_release, action)
)
if action == 'activate-rollback':
flux_chart = find_flux_chart()
if flux_chart is None:
LOG.error("Flux chart from previous version is not available")
return 1
if rollback_controllers():
return 0
try:
previous_version = get_chart_version(flux_chart)
history = get_history()
# Rollback only if not already in the target version
if not is_target_version_installed(history, previous_version):
target_revision = get_target_revision(history, previous_version)
if target_revision:
delete_incompatible_crd()
wait_helm_controller_pod_ready()
rollback_fluxcd_controllers(target_revision)
else:
LOG.error("Version %s is not available in revision history", previous_version)
return 1
except exception.SysinvException as e:
LOG.exception(e)
return 1
else:
LOG.info("Not an activate-rollback action. Skipping.")
return 0
return 1
if __name__ == "__main__":