From 00c2129a16d94669ce5ba2f9320ed3e27784a788 Mon Sep 17 00:00:00 2001 From: Dan Voiculeasa Date: Mon, 21 Nov 2022 14:08:12 +0200 Subject: [PATCH] AppFwk: Recover apply from helm operation in progress It is observed that when a helm release is in pending state, another helm release can't be started by FluxCD. FluxCD will not try to do steps to apply the newer helm release, but will just error. This prevents us from applying a new helm release over a release with pods stuck in Pending state (just an example). When the specific message for helm operation in progress is detected, attempt to recover by moving the older releases to failed state. Move inspired by [1]. To do so, patch the helm secret for the specific release. As an optimization, trigger the FluxCD HelmRelease reconciliation right after. One future optimization we can do is run an audit to delete the helm releases for which metadata status is a pending operation, but release data is failed (resource that we patched in this commit). Refactor HelmRelease resource reconciliation trigger, smaller size. There are upstream references related to this bug, see [2] and [3]. 
Tests on Debian AIO-SX: PASS: unlocked enabled available PASS: platform-integ-apps applied after reproducing error: PASS: inspect sysinv logs, see recovery is attempted PASS: inspect fluxcd logs, see that HelmRelease reconciliation is triggered part of recovery [1]: https://github.com/porter-dev/porter/pull/1685/files [2]: https://github.com/helm/helm/issues/8987 [3]: https://github.com/helm/helm/issues/4558 Closes-Bug: 1997368 Signed-off-by: Dan Voiculeasa Change-Id: I36116ce8d298cc97194062b75db64541661ce84d --- .../sysinv/sysinv/sysinv/common/constants.py | 11 + .../sysinv/sysinv/sysinv/common/kubernetes.py | 70 +++++++ .../sysinv/sysinv/conductor/kube_app.py | 197 +++++++++++------- sysinv/sysinv/sysinv/sysinv/helm/utils.py | 61 +++++- 4 files changed, 259 insertions(+), 80 deletions(-) diff --git a/sysinv/sysinv/sysinv/sysinv/common/constants.py b/sysinv/sysinv/sysinv/sysinv/common/constants.py index 2bcc4da0c3..ae5ee52949 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/constants.py +++ b/sysinv/sysinv/sysinv/sysinv/common/constants.py @@ -1713,6 +1713,17 @@ FLUXCD_RECOVERY_HELM_CHART_STATUS_ERRORS = [ 'failed to retrieve source:', 'chart pull error:' ] +# Actually beginning of errors, should be used with +# string.startswith(FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS[number]) +# We want to recover from these errors +FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS = [ + 'Helm upgrade failed: another operation (install/upgrade/rollback) is in progress' +] +FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS = [ + 'pending-install', + 'pending-upgrade', + 'pending-rollback' +] # State constants APP_NOT_PRESENT = 'missing' diff --git a/sysinv/sysinv/sysinv/sysinv/common/kubernetes.py b/sysinv/sysinv/sysinv/sysinv/common/kubernetes.py index 6c47c635d1..0feb23e481 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/kubernetes.py +++ b/sysinv/sysinv/sysinv/sysinv/common/kubernetes.py @@ -698,6 +698,76 @@ class KubeOperator(object): % (namespace, e)) raise + def 
get_transform_patch_custom_resource(self, group, version, namespace, + plural, name, transform, raise_error=True): + """ Apply a custom resource after it was transformed by a function + + :param group: Used by k8s API to determine resource + :param version: Used by k8s API to determine resource + :param namespace: Used by k8s API to determine resource + :param plural: Used by k8s API to determine resource + :param name: Used by k8s API to determine resource + :param transform: A function used to transform the resource + For example access the dictionary and change some + fields. + :param raise_error: Control the exception handling here. + If True, log an error and raise errors further. + If False, log a warning and return from function. + + :return: True if everything finished successfully. + False otherwise. + """ + kind = group + '/' + version + try: + custom_resource = self.get_custom_resource( + group, + version, + namespace, + plural, + name) + except Exception as err: + if raise_error: + LOG.error("Failed to get resource kind {}, name {}: {}" + "".format(kind, name, err)) + raise + else: + LOG.warning("Failed to get resource kind {}, name {}: {}" + "".format(kind, name, err)) + return False + + try: + transform(custom_resource) + except Exception as err: + if raise_error: + LOG.error("Failed to transform resource {} using {}: {}" + "".format(custom_resource, transform, err)) + raise + else: + LOG.warning("Failed to transform resource {} using {}: {}" + "".format(custom_resource, transform, err)) + return False + + try: + self.apply_custom_resource( + group, + version, + namespace, + plural, + name, + custom_resource + ) + except Exception as err: + if raise_error: + LOG.error("Failed to patch kind {}, name {}: {}" + "".format(kind, name, err)) + raise + else: + LOG.warning("Failed to patch kind {}, name {}: {}" + "".format(kind, name, err)) + return False + + return True + def kube_get_service_account(self, name, namespace): c = 
self._get_kubernetesclient_core() try: diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/kube_app.py b/sysinv/sysinv/sysinv/sysinv/conductor/kube_app.py index e21e96e849..58f144f849 100644 --- a/sysinv/sysinv/sysinv/sysinv/conductor/kube_app.py +++ b/sysinv/sysinv/sysinv/sysinv/conductor/kube_app.py @@ -1667,6 +1667,16 @@ class AppOperator(object): @retry(retry_on_exception=lambda x: isinstance(x, exception.ApplicationApplyFailure), stop_max_attempt_number=5, wait_fixed=30 * 1000) def _make_fluxcd_operation_with_monitor(self, app, request): + def _patch_flux_suspend_false(resource): + """ Change resource.spec.suspend = False + """ + resource['spec']['suspend'] = False + + def _patch_flux_suspend_true(resource): + """ Change resource.spec.suspend = True + """ + resource['spec']['suspend'] = True + def _recover_from_failed_helm_chart_on_app_apply(metadata_name, namespace): """ Recovery logic for FluxCD on apply @@ -1728,99 +1738,140 @@ class AppOperator(object): "".format(helm_chart_resource['metadata']['name'], err)) return attempt, True - # Need to get the resource again + # Flip to spec.suspended to False from HelmChart try: - helm_chart_resource = self._kube.get_custom_resource( + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_CHART_GROUP, constants.FLUXCD_CRD_HELM_CHART_VERSION, namespace, constants.FLUXCD_CRD_HELM_CHART_PLURAL, - helm_chart_name) - except Exception as err: - LOG.warning("Failed to get HelmChart resource {}: {}" - "".format(helm_chart_name, err)) - return attempt, True - - # Flip to spec.suspended to False from HelmChart - try: - helm_chart_resource['spec']['suspend'] = False - group, version = helm_chart_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_chart_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_CHART_PLURAL, - helm_chart_resource['metadata']['name'], - helm_chart_resource + helm_chart_name, + _patch_flux_suspend_false ) - except Exception as 
err: - LOG.info("Failed to patch HelmChart resource {}: {}" - "".format(helm_chart_resource['metadata']['name'], err)) + except Exception: return attempt, True # Force HelmRelease reconciliation now, saves up to reconciliation # timeout for the specific resource. Same trigger as with HelmChart. - - # Flip to spec.suspended to True from HelmRelease try: - helm_release_resource = self._kube.get_custom_resource( + # Flip to spec.suspended to True from HelmRelease + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_REL_GROUP, constants.FLUXCD_CRD_HELM_REL_VERSION, namespace, constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_name) - except Exception as err: - LOG.warning("Failed to get HelmRelease resource {}: {}" - "".format(helm_release_name, err)) - return attempt, True - - try: - helm_release_resource['spec']['suspend'] = True - group, version = helm_release_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_release_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_resource['metadata']['name'], - helm_release_resource + helm_release_name, + _patch_flux_suspend_true ) - except Exception as err: - LOG.warning("Failed to patch HelmRelease resource {}: {}" - "".format(helm_release_resource['metadata']['name'], err)) - return attempt, True - # Flip to spec.suspended to False from HelmRelease - try: - helm_release_resource = self._kube.get_custom_resource( + # Flip to spec.suspended to False from HelmRelease + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_REL_GROUP, constants.FLUXCD_CRD_HELM_REL_VERSION, namespace, constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_name) - except Exception as err: - LOG.warning("Failed to get HelmRelease resource {}: {}" - "".format(helm_release_name, err)) + helm_release_name, + _patch_flux_suspend_false + ) + except Exception: return attempt, True + return attempt, False + + def 
_recover_from_helm_operation_in_progress_on_app_apply(metadata_name, namespace, + flux_error_message): + """ Recovery logic for FluxCD on apply + + In case a helm operation is already in progress, FluxCD will raise + an error. Recover by patching the helm release secret, forcing + the status to be 'failed'. + + :param metadata_name: metadata name from helmrelease.yaml + :param namespace: namespace from kustomization.yaml + :param flux_error_message: Error message FluxCD encountered + + :return: tuple(attempt, error). + attempt is True if recovery is triggered + error is True if an error was encountered + """ + helm_release_name = metadata_name + attempt = False + + for error_string in constants.FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS: + if flux_error_message.startswith(error_string): + LOG.info("For helm release {} found a matching error string " + "we can attempt to recover from: {}" + "".format(helm_release_name, error_string)) + attempt = True + break + + if not attempt: + return attempt, False + try: - helm_release_resource['spec']['suspend'] = False - group, version = helm_release_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_release_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_resource['metadata']['name'], - helm_release_resource - ) + secret_list = self._kube.kube_list_secret(namespace) except Exception as err: - LOG.warning("Failed to patch HelmRelease resource {}: {}" - "".format(helm_release_resource['metadata']['name'], err)) + LOG.warning("Failed to get secrets in namespace {}: {}" + "".format(namespace, err)) + return attempt, True + + recover_list = [] + for secret in secret_list: + label = secret.metadata.labels + if not label: + continue + if 'owner' not in label: + continue + if 'status' not in label: + continue + if label['owner'] == 'helm' and \ + label['status'] in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS: + LOG.info("Found helm release {} 
in state {}" + "".format(secret.metadata.name, label['status'])) + recover_list.append(secret) + + # Force 'failed' status for helm releases + for secret in recover_list: + release_data = helm_utils.decompress_helm_release_data(secret.data['release']) + + for status in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS: + release_data = release_data.replace('"status":"{}"'.format(status), '"status":"failed"') + + release_data = helm_utils.compress_helm_release_data(release_data) + + secret.data['release'] = release_data + try: + self._kube.kube_patch_secret(secret.metadata.name, + secret.metadata.namespace, secret) + except Exception as err: + LOG.warning("Failed to patch secret {} in namespace {}: {}" + "".format(secret.metadata.name, + secret.metadata.namespace, err)) + return attempt, True + + # Force HelmRelease reconciliation now, saves up to reconciliation + # timeout for the specific resource. Flip suspend True, then False. + try: + self._kube.get_transform_patch_custom_resource( + constants.FLUXCD_CRD_HELM_REL_GROUP, + constants.FLUXCD_CRD_HELM_REL_VERSION, + namespace, + constants.FLUXCD_CRD_HELM_REL_PLURAL, + helm_release_name, + _patch_flux_suspend_true + ) + + self._kube.get_transform_patch_custom_resource( + constants.FLUXCD_CRD_HELM_REL_GROUP, + constants.FLUXCD_CRD_HELM_REL_VERSION, + namespace, + constants.FLUXCD_CRD_HELM_REL_PLURAL, + helm_release_name, + _patch_flux_suspend_false + ) + except Exception: return attempt, True - # TODO(dvoicule): What if we extract repeated get&patch operation to a generic - # get_patch(, lambda_func_transformation) return attempt, False def _check_progress(): @@ -1884,10 +1935,16 @@ class AppOperator(object): if release_status == "False": # If the helm release failed the app must also be in a # failed state - err_msg = ":{}".format(msg) if msg else "" - LOG.exception("Application {}: release {}: Failed during {} {}" - .format(app.name, release_name, request, err_msg)) - return False + err_msg = "{}".format(msg) if msg 
else "" + attempt, _ = _recover_from_helm_operation_in_progress_on_app_apply( + metadata_name=release_name, + namespace=chart_obj['namespace'], + flux_error_message=err_msg) + + if not attempt: + LOG.exception("Application {}: release {}: Failed during {} :{}" + "".format(app.name, release_name, request, err_msg)) + return False elif release_status == "True": # Special validation check needed for AIO-SX only, can # go away once upstream issues are addressed. See method diff --git a/sysinv/sysinv/sysinv/sysinv/helm/utils.py b/sysinv/sysinv/sysinv/sysinv/helm/utils.py index 5dbcd1c71a..c18fffb785 100644 --- a/sysinv/sysinv/sysinv/sysinv/helm/utils.py +++ b/sysinv/sysinv/sysinv/sysinv/helm/utils.py @@ -1,6 +1,6 @@ # sim: tabstop=4 shiftwidth=4 softtabstop=4 # -# Copyright (c) 2019-2021 Wind River Systems, Inc. +# Copyright (c) 2019-2022 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -9,18 +9,21 @@ """Helm utilities and helper functions.""" +import base64 import os -from eventlet.green import subprocess -import ruamel.yaml as yaml -from oslo_log import log as logging -from sysinv.agent import rpcapi as agent_rpcapi -from sysinv.common import kubernetes -from sysinv.common import exception -from sysinv.openstack.common import context -import tempfile -import threading import psutil import retrying +import ruamel.yaml as yaml +import tempfile +import threading +import zlib + +from eventlet.green import subprocess +from oslo_log import log as logging +from sysinv.agent import rpcapi as agent_rpcapi +from sysinv.common import exception +from sysinv.common import kubernetes +from sysinv.openstack.common import context LOG = logging.getLogger(__name__) @@ -321,3 +324,41 @@ def install_helm_chart_with_dry_run(args=None): timer.cancel() os.remove(chartfile) os.rmdir(tmpdir) + + +def decompress_helm_release_data(release_data): + """ Convert release data to format for applying transformations + + :param release_data: Helm release secret data + Format is 
gzip double base64 encoded + :return: string + """ + release_data = base64.b64decode(release_data) + release_data = base64.b64decode(release_data) + # wbits value needs to specify 16 for gzip header/trailer plus window size. + # Window size needs to be at least the one used for compression + # this set the largest + release_data = zlib.decompress(release_data, wbits=16 + zlib.MAX_WBITS).decode('utf-8') + + return str(release_data) + + +def compress_helm_release_data(release_data): + """ Convert release data to format for storing in cluster + + :param release_data: Helm release secret data + :return: string + Format is gzip double base64 encoded + """ + # wbits value of 25 specifies the minimum window size + # and gzip header/trailer. + compressed_object = zlib.compressobj(wbits=25) + + release_data = compressed_object.compress(release_data.encode('utf-8')) + release_data += compressed_object.flush() + release_data = base64.b64encode(release_data) + release_data = base64.b64encode(release_data) + + release_data = release_data.decode('utf-8') + + return release_data