Merge "AppFwk: Recover apply from helm operation in progress"

This commit is contained in:
Zuul 2022-11-24 15:56:56 +00:00 committed by Gerrit Code Review
commit aede2f1492
4 changed files with 259 additions and 80 deletions

View File

@ -1713,6 +1713,17 @@ FLUXCD_RECOVERY_HELM_CHART_STATUS_ERRORS = [
'failed to retrieve source:',
'chart pull error:'
]
# Actually beginning of errors, should be used with
# string.startswith(FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS[number])
# We want to recover from these errors
FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS = [
'Helm upgrade failed: another operation (install/upgrade/rollback) is in progress'
]
FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS = [
'pending-install',
'pending-upgrade',
'pending-rollback'
]
# State constants
APP_NOT_PRESENT = 'missing'

View File

@ -720,6 +720,76 @@ class KubeOperator(object):
% (namespace, e))
raise
def get_transform_patch_custom_resource(self, group, version, namespace,
plural, name, transform, raise_error=True):
""" Apply a custom resource after it was transformed by a function
:param group: Used by k8s API to determine resource
:param version: Used by k8s API to determine resource
:param namespace: Used by k8s API to determine resource
:param plural: Used by k8s API to determine resource
:param name: Used by k8s API to determine resource
:param transform: A function used to transform the resource
For example access the dictionary and change some
fields.
:param raise_error: Control the exception handling here.
If True, log an error and raise errors further.
If False, log a warning and return from function.
:return: True if everything finished successfully.
False otherwise.
"""
kind = group + '/' + version
try:
custom_resource = self.get_custom_resource(
group,
version,
namespace,
plural,
name)
except Exception as err:
if raise_error:
LOG.error("Failed to get resource kind {}, name {}: {}"
"".format(kind, name, err))
raise
else:
LOG.warning("Failed to get resource kind {}, name {}: {}"
"".format(kind, name, err))
return False
try:
transform(custom_resource)
except Exception as err:
if raise_error:
LOG.error("Failed to transform resource {} using {}: {}"
"".format(custom_resource, transform, err))
raise
else:
LOG.warning("Failed to transform resource {} using {}: {}"
"".format(custom_resource, transform, err))
return False
try:
self.apply_custom_resource(
group,
version,
namespace,
plural,
name,
custom_resource
)
except Exception as err:
if raise_error:
LOG.error("Failed to patch kind {}, name {}: {}"
"".format(kind, name, err))
raise
else:
LOG.warning("Failed to patch kind {}, name {}: {}"
"".format(kind, name, err))
return False
return True
def kube_get_service_account(self, name, namespace):
c = self._get_kubernetesclient_core()
try:

View File

@ -1667,6 +1667,16 @@ class AppOperator(object):
@retry(retry_on_exception=lambda x: isinstance(x, exception.ApplicationApplyFailure),
stop_max_attempt_number=5, wait_fixed=30 * 1000)
def _make_fluxcd_operation_with_monitor(self, app, request):
def _patch_flux_suspend_false(resource):
""" Change resource.spec.suspend = False
"""
resource['spec']['suspend'] = False
def _patch_flux_suspend_true(resource):
""" Change resource.spec.suspend = True
"""
resource['spec']['suspend'] = True
def _recover_from_failed_helm_chart_on_app_apply(metadata_name, namespace):
""" Recovery logic for FluxCD on apply
@ -1728,99 +1738,140 @@ class AppOperator(object):
"".format(helm_chart_resource['metadata']['name'], err))
return attempt, True
# Need to get the resource again
# Flip to spec.suspended to False from HelmChart
try:
helm_chart_resource = self._kube.get_custom_resource(
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_CHART_GROUP,
constants.FLUXCD_CRD_HELM_CHART_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_CHART_PLURAL,
helm_chart_name)
except Exception as err:
LOG.warning("Failed to get HelmChart resource {}: {}"
"".format(helm_chart_name, err))
return attempt, True
# Flip to spec.suspended to False from HelmChart
try:
helm_chart_resource['spec']['suspend'] = False
group, version = helm_chart_resource['apiVersion'].split('/')
self._kube.apply_custom_resource(
group,
version,
helm_chart_resource['metadata']['namespace'],
constants.FLUXCD_CRD_HELM_CHART_PLURAL,
helm_chart_resource['metadata']['name'],
helm_chart_resource
helm_chart_name,
_patch_flux_suspend_false
)
except Exception as err:
LOG.info("Failed to patch HelmChart resource {}: {}"
"".format(helm_chart_resource['metadata']['name'], err))
except Exception:
return attempt, True
# Force HelmRelease reconciliation now, saves up to reconciliation
# timeout for the specific resource. Same trigger as with HelmChart.
# Flip to spec.suspended to True from HelmRelease
try:
helm_release_resource = self._kube.get_custom_resource(
# Flip to spec.suspended to True from HelmRelease
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name)
except Exception as err:
LOG.warning("Failed to get HelmRelease resource {}: {}"
"".format(helm_release_name, err))
return attempt, True
try:
helm_release_resource['spec']['suspend'] = True
group, version = helm_release_resource['apiVersion'].split('/')
self._kube.apply_custom_resource(
group,
version,
helm_release_resource['metadata']['namespace'],
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_resource['metadata']['name'],
helm_release_resource
helm_release_name,
_patch_flux_suspend_true
)
except Exception as err:
LOG.warning("Failed to patch HelmRelease resource {}: {}"
"".format(helm_release_resource['metadata']['name'], err))
return attempt, True
# Flip to spec.suspended to False from HelmRelease
try:
helm_release_resource = self._kube.get_custom_resource(
# Flip to spec.suspended to False from HelmRelease
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name)
except Exception as err:
LOG.warning("Failed to get HelmRelease resource {}: {}"
"".format(helm_release_name, err))
helm_release_name,
_patch_flux_suspend_false
)
except Exception:
return attempt, True
return attempt, False
def _recover_from_helm_operation_in_progress_on_app_apply(metadata_name, namespace,
flux_error_message):
""" Recovery logic for FluxCD on apply
In case a helm operation is already in progress, FluxCD will raise
an error. Recover by patching the helm release secret, forcing
the status to be 'failed'.
:param metadata_name: metadata name from helmrelease.yaml
:param namespace: namespace from kustomization.yaml
:param flux_error_message: Error message FluxCD encountered
:return: tuple(attempt, error).
attempt is True if recovery is triggered
error is True if an error was encountered
"""
helm_release_name = metadata_name
attempt = False
for error_string in constants.FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS:
if flux_error_message.startswith(error_string):
LOG.info("For helm release {} found a matching error string "
"we can attempt to recover from: {}"
"".format(helm_release_name, error_string))
attempt = True
break
if not attempt:
return attempt, False
try:
helm_release_resource['spec']['suspend'] = False
group, version = helm_release_resource['apiVersion'].split('/')
self._kube.apply_custom_resource(
group,
version,
helm_release_resource['metadata']['namespace'],
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_resource['metadata']['name'],
helm_release_resource
)
secret_list = self._kube.kube_list_secret(namespace)
except Exception as err:
LOG.warning("Failed to patch HelmRelease resource {}: {}"
"".format(helm_release_resource['metadata']['name'], err))
LOG.warning("Failed to get secrets in namespace {}: {}"
"".format(namespace, err))
return attempt, True
recover_list = []
for secret in secret_list:
label = secret.metadata.labels
if not label:
continue
if 'owner' not in label:
continue
if 'status' not in label:
continue
if label['owner'] == 'helm' and \
label['status'] in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS:
LOG.info("Found helm release {} in state {}"
"".format(secret.metadata.name, label['status']))
recover_list.append(secret)
# Force 'failed' status for helm releases
for secret in recover_list:
release_data = helm_utils.decompress_helm_release_data(secret.data['release'])
for status in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS:
release_data = release_data.replace('"status":"{}"'.format(status), '"status":"failed"')
release_data = helm_utils.compress_helm_release_data(release_data)
secret.data['release'] = release_data
try:
self._kube.kube_patch_secret(secret.metadata.name,
secret.metadata.namespace, secret)
except Exception as err:
LOG.warning("Failed to patch secret {} in namespace {}: {}"
"".format(secret.metadata.name,
secret.metadata.namespace, err))
return attempt, True
# Force HelmRelease reconciliation now, saves up to reconciliation
# timeout for the specific resource. Flip suspend True, then False.
try:
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_true
)
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_false
)
except Exception:
return attempt, True
# TODO(dvoicule): What if we extract repeated get&patch operation to a generic
# get_patch(<how to obtain resource>, lambda_func_transformation)
return attempt, False
def _check_progress():
@ -1884,10 +1935,16 @@ class AppOperator(object):
if release_status == "False":
# If the helm release failed the app must also be in a
# failed state
err_msg = ":{}".format(msg) if msg else ""
LOG.exception("Application {}: release {}: Failed during {} {}"
.format(app.name, release_name, request, err_msg))
return False
err_msg = "{}".format(msg) if msg else ""
attempt, _ = _recover_from_helm_operation_in_progress_on_app_apply(
metadata_name=release_name,
namespace=chart_obj['namespace'],
flux_error_message=err_msg)
if not attempt:
LOG.exception("Application {}: release {}: Failed during {} :{}"
"".format(app.name, release_name, request, err_msg))
return False
elif release_status == "True":
# Special validation check needed for AIO-SX only, can
# go away once upstream issues are addressed. See method

View File

@ -1,6 +1,6 @@
# sim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2019-2021 Wind River Systems, Inc.
# Copyright (c) 2019-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -9,18 +9,21 @@
"""Helm utilities and helper functions."""
import base64
import os
from eventlet.green import subprocess
import ruamel.yaml as yaml
from oslo_log import log as logging
from sysinv.agent import rpcapi as agent_rpcapi
from sysinv.common import kubernetes
from sysinv.common import exception
from sysinv.openstack.common import context
import tempfile
import threading
import psutil
import retrying
import ruamel.yaml as yaml
import tempfile
import threading
import zlib
from eventlet.green import subprocess
from oslo_log import log as logging
from sysinv.agent import rpcapi as agent_rpcapi
from sysinv.common import exception
from sysinv.common import kubernetes
from sysinv.openstack.common import context
LOG = logging.getLogger(__name__)
@ -321,3 +324,41 @@ def install_helm_chart_with_dry_run(args=None):
timer.cancel()
os.remove(chartfile)
os.rmdir(tmpdir)
def decompress_helm_release_data(release_data):
""" Convert release data to format for applying transformations
:param release_data: Helm release secret data
Format is gzip double base64 encoded
:return: string
"""
release_data = base64.b64decode(release_data)
release_data = base64.b64decode(release_data)
# wbits value needs to specify 16 for gzip header/trailer plus window size.
# Window size needs to be at least the one used for compression
# this set the largest
release_data = zlib.decompress(release_data, wbits=16 + zlib.MAX_WBITS).decode('utf-8')
return str(release_data)
def compress_helm_release_data(release_data):
""" Convert release data to format for storing in cluster
:param release_data: Helm release secret data
:return: string
Format is gzip double base64 encoded
"""
# wbits value of 25 specifies the minimum window size
# and gzip header/trailer.
compressed_object = zlib.compressobj(wbits=25)
release_data = compressed_object.compress(release_data.encode('utf-8'))
release_data += compressed_object.flush()
release_data = base64.b64encode(release_data)
release_data = base64.b64encode(release_data)
release_data = release_data.decode('utf-8')
return release_data