config/sysinv/sysinv/sysinv/sysinv/conductor/kube_app.py
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2018-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# All Rights Reserved.
#
""" System Inventory Kubernetes Application Operator."""
import copy
import docker
from eventlet.green import subprocess
import glob
import grp
import functools
import io
import os
import pkg_resources
import pwd
import random
import re
import ruamel.yaml as yaml
import shutil
import site
import six
from six.moves.urllib.parse import urlparse
import sys
import threading
import time
import zipfile
from collections import namedtuple
from distutils.util import strtobool
from eventlet import greenpool
from eventlet import greenthread
from eventlet import queue
from eventlet import Timeout
from fm_api import constants as fm_constants
from fm_api import fm_api
from oslo_log import log as logging
from oslo_serialization import base64
from sysinv._i18n import _
from sysinv.api.controllers.v1 import kube_app
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import kubernetes
from sysinv.common.retrying import retry
from sysinv.common import utils as cutils
from sysinv.conductor import openstack
from sysinv.helm import base as helm_base
from sysinv.helm import common
from sysinv.helm import utils as helm_utils
from sysinv.helm.lifecycle_constants import LifecycleConstants
from sysinv.helm.lifecycle_hook import LifecycleHookInfo
# Log and config
LOG = logging.getLogger(__name__)
# Constants
APPLY_SEARCH_PATTERN = 'Processing Chart,'
ARMADA_NAMESPACE = 'armada'
ARMADA_APPLICATION = 'armada'
ARMADA_CONTAINER_NAME = 'armada-api'
ARMADA_MANIFEST_APPLY_SUCCESS_MSG = 'Done applying manifest'
ARMADA_RELEASE_ROLLBACK_FAILURE_MSG = 'Error while rolling back tiller release'
CONTAINER_ABNORMAL_EXIT_CODE = 137
DELETE_SEARCH_PATTERN = 'Deleting release|no release to delete'
ROLLBACK_SEARCH_PATTERN = 'Helm rollback of release'
INSTALLATION_TIMEOUT = 3600
MAX_DOWNLOAD_THREAD = 5
MAX_DOWNLOAD_ATTEMPTS = 3
DOWNLOAD_WAIT_BEFORE_RETRY = 15
TARFILE_DOWNLOAD_CONNECTION_TIMEOUT = 60
TARFILE_TRANSFER_CHUNK_SIZE = 1024 * 512
ARMADA_LOG_MAX = 10
ARMADA_HOST_LOG_LOCATION = '/var/log/armada'
ARMADA_CONTAINER_LOG_LOCATION = '/logs'
ARMADA_CONTAINER_TMP = '/tmp'
ARMADA_LOCK_GROUP = 'armada.process'
ARMADA_LOCK_VERSION = 'v1'
ARMADA_LOCK_NAMESPACE = 'kube-system'
ARMADA_LOCK_PLURAL = 'locks'
ARMADA_LOCK_NAME = 'lock'
LOCK_NAME_APP_REAPPLY = 'app_reapply'
LOCK_NAME_PROCESS_APP_METADATA = 'process_app_metadata'
STX_APP_PLUGIN_PATH = '/var/stx_app/plugins'
# Helper functions
def generate_armada_service_manifest_fqpn(app_name, app_version, manifest_filename):
return os.path.join('/manifests', app_name, app_version,
app_name + '-' + manifest_filename)
def generate_install_manifest_fqpn(app_name, app_version, manifest_filename):
return os.path.join(constants.APP_INSTALL_PATH,
app_name, app_version, manifest_filename)
def generate_synced_images_fqpn(app_name, app_version):
return os.path.join(
constants.APP_SYNCED_ARMADA_DATA_PATH, app_name, app_version,
app_name + '-images.yaml')
def generate_synced_helm_overrides_dir(app_name, app_version):
return os.path.join(common.HELM_OVERRIDES_PATH, app_name, app_version)
def generate_synced_app_plugins_dir(app_name, app_version):
return os.path.join(
generate_synced_helm_overrides_dir(app_name, app_version),
'plugins')
def generate_synced_fluxcd_images_fqpn(app_name, app_version):
return os.path.join(
constants.APP_FLUXCD_DATA_PATH, app_name, app_version,
app_name + '-images.yaml')
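# Example (illustrative; app name and version are assumed): for an app named
# 'my-app' at version '1.0-1', the path helpers above would produce:
#   generate_armada_service_manifest_fqpn('my-app', '1.0-1', 'manifest.yaml')
#       -> '/manifests/my-app/1.0-1/my-app-manifest.yaml'
#   generate_synced_helm_overrides_dir('my-app', '1.0-1')
#       -> os.path.join(common.HELM_OVERRIDES_PATH, 'my-app', '1.0-1')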
def create_app_path(path):
uid = pwd.getpwnam(constants.SYSINV_USERNAME).pw_uid
gid = os.getgid()
if not os.path.exists(constants.APP_INSTALL_PATH):
os.makedirs(constants.APP_INSTALL_PATH)
os.chown(constants.APP_INSTALL_PATH, uid, gid)
os.makedirs(path)
os.chown(path, uid, gid)
def get_app_install_root_path_ownership():
uid = os.stat(constants.APP_INSTALL_ROOT_PATH).st_uid
gid = os.stat(constants.APP_INSTALL_ROOT_PATH).st_gid
return (uid, gid)
Chart = namedtuple('Chart', 'metadata_name name namespace location release labels sequenced')
FluxCDChart = namedtuple('FluxCDChart', 'metadata_name name namespace location '
'release chart_os_path chart_label')
class AppOperator(object):
"""Class to encapsulate Kubernetes App operations for System Inventory"""
DOCKER_REGISTRY_SECRET = 'default-registry-key'
# List of in progress apps and their abort status
abort_requested = {}
def __init__(self, dbapi, helm_op, apps_metadata):
self._dbapi = dbapi
self._helm = helm_op
self._apps_metadata = apps_metadata
self._plugins = PluginHelper(self._dbapi, self._helm)
self._fm_api = fm_api.FaultAPIs()
self._docker = DockerHelper(self._dbapi)
self._kube = kubernetes.KubeOperator()
self._utils = kube_app.KubeAppHelper(self._dbapi)
self._image = AppImageParser()
self._lock = threading.Lock()
self._armada = ArmadaHelper(self._kube)
self._fluxcd = FluxCDHelper(self._dbapi, self._kube)
if not os.path.isfile(constants.ANSIBLE_BOOTSTRAP_FLAG):
self._clear_stuck_applications()
# Audit discoverable app plugins to remove any stale plugins that may
# have been removed since this host was last tasked to manage
# applications
self._plugins.audit_plugins()
def activate_app_plugins(self, rpc_app):
app = AppOperator.Application(rpc_app)
self._plugins.activate_plugins(app)
def deactivate_app_plugins(self, rpc_app):
app = AppOperator.Application(rpc_app)
self._plugins.deactivate_plugins(app)
def app_has_system_plugins(self, rpc_app):
app = AppOperator.Application(rpc_app)
return app.system_app
def _clear_stuck_applications(self):
apps = self._dbapi.kube_app_get_all()
for app in apps:
if app.status in [constants.APP_UPLOAD_IN_PROGRESS,
constants.APP_APPLY_IN_PROGRESS,
constants.APP_UPDATE_IN_PROGRESS,
constants.APP_RECOVER_IN_PROGRESS,
constants.APP_REMOVE_IN_PROGRESS]:
self._abort_operation(app, app.status, reset_status=True)
else:
continue
# Delete the Armada locks that might have been acquired previously
# for a fresh start. This guarantees that a re-apply, re-update or
# a re-remove attempt following a status reset will not fail due
# to a lock related issue.
self._armada.clear_armada_locks()
def _raise_app_alarm(self, app_name, app_action, alarm_id, severity,
reason_text, alarm_type, repair_action,
service_affecting):
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_APPLICATION,
app_name)
app_alarms = self._fm_api.get_faults(entity_instance_id)
if app_alarms:
if ((app_action == constants.APP_APPLY_FAILURE and
app_alarms[0].alarm_id ==
fm_constants.FM_ALARM_ID_APPLICATION_APPLY_FAILED) or
(app_action == constants.APP_UPLOAD_FAILURE and
app_alarms[0].alarm_id ==
fm_constants.FM_ALARM_ID_APPLICATION_UPLOAD_FAILED) or
(app_action == constants.APP_REMOVE_FAILURE and
app_alarms[0].alarm_id ==
fm_constants.FM_ALARM_ID_APPLICATION_REMOVE_FAILED) or
(app_action == constants.APP_APPLY_IN_PROGRESS and
app_alarms[0].alarm_id ==
fm_constants.FM_ALARM_ID_APPLICATION_APPLYING) or
(app_action == constants.APP_UPDATE_IN_PROGRESS and
app_alarms[0].alarm_id ==
fm_constants.FM_ALARM_ID_APPLICATION_UPDATING)):
# The same alarm was raised before, will re-raise to set the
# latest timestamp.
pass
else:
# Clear existing alarm for this app if it differs than the one to
# be raised.
self._fm_api.clear_fault(app_alarms[0].alarm_id,
app_alarms[0].entity_instance_id)
fault = fm_api.Fault(
alarm_id=alarm_id,
alarm_state=fm_constants.FM_ALARM_STATE_SET,
entity_type_id=fm_constants.FM_ENTITY_TYPE_APPLICATION,
entity_instance_id=entity_instance_id,
severity=severity,
reason_text=reason_text,
alarm_type=alarm_type,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_UNKNOWN,
proposed_repair_action=repair_action,
service_affecting=service_affecting)
self._fm_api.set_fault(fault)
def _clear_app_alarm(self, app_name):
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_APPLICATION,
app_name)
app_alarms = self._fm_api.get_faults(entity_instance_id)
if app_alarms:
# There can only exist one alarm per app
self._fm_api.clear_fault(app_alarms[0].alarm_id,
app_alarms[0].entity_instance_id)
def _register_app_abort(self, app_name):
with self._lock:
AppOperator.abort_requested[app_name] = False
LOG.info("Register the initial abort status of app %s" % app_name)
def _deregister_app_abort(self, app_name):
with self._lock:
try:
del AppOperator.abort_requested[app_name]
except KeyError:
pass
LOG.info("Deregister the abort status of app %s" % app_name)
@staticmethod
def is_app_aborted(app_name):
try:
return AppOperator.abort_requested[app_name]
except KeyError:
return False
def _set_abort_flag(self, app_name):
with self._lock:
AppOperator.abort_requested[app_name] = True
LOG.info("Abort set for app %s" % app_name)
def _cleanup(self, app, app_dir=True):
"""" Remove application directories and override files """
self._plugins.uninstall_plugins(app)
try:
if os.path.exists(app.sync_overrides_dir):
shutil.rmtree(app.sync_overrides_dir)
if app_dir:
shutil.rmtree(os.path.dirname(
app.sync_overrides_dir))
if os.path.exists(app.sync_armada_mfile_dir):
shutil.rmtree(app.sync_armada_mfile_dir)
if app_dir:
shutil.rmtree(os.path.dirname(
app.sync_armada_mfile_dir))
if os.path.exists(app.inst_path):
shutil.rmtree(app.inst_path)
if app_dir:
shutil.rmtree(os.path.dirname(
app.inst_path))
if app.is_fluxcd_app:
if os.path.exists(app.sync_fluxcd_manifest_dir):
shutil.rmtree(app.sync_fluxcd_manifest_dir)
if app_dir:
shutil.rmtree(os.path.dirname(
app.sync_fluxcd_manifest_dir))
except OSError as e:
LOG.error(e)
raise
def _update_app_status(self, app, new_status=None, new_progress=None):
""" Persist new app status """
if new_status is None:
new_status = app.status
with self._lock:
app.update_status(new_status, new_progress)
def _abort_operation(self, app, operation,
progress=constants.APP_PROGRESS_ABORTED,
user_initiated=False, reset_status=False,
forced_operation=False):
if user_initiated:
progress = constants.APP_PROGRESS_ABORTED_BY_USER
if app.status == constants.APP_UPLOAD_IN_PROGRESS:
new_status = constants.APP_UPLOAD_FAILURE
op = 'application-upload'
self._raise_app_alarm(
app.name, constants.APP_UPLOAD_FAILURE,
fm_constants.FM_ALARM_ID_APPLICATION_UPLOAD_FAILED,
fm_constants.FM_ALARM_SEVERITY_WARNING,
_("Application Upload Failure"),
fm_constants.FM_ALARM_TYPE_3,
_("Check system inventory log for cause."),
False)
elif (app.status == constants.APP_APPLY_IN_PROGRESS or
app.status == constants.APP_UPDATE_IN_PROGRESS or
app.status == constants.APP_RECOVER_IN_PROGRESS):
new_status = constants.APP_APPLY_FAILURE
if reset_status:
if app.status == constants.APP_APPLY_IN_PROGRESS:
op = 'application-apply'
else:
op = 'application-update'
if app.name in self._apps_metadata[
constants.APP_METADATA_PLATFORM_MANAGED_APPS].keys():
# For platform core apps, set the new status
# to 'uploaded'. The audit task will kick in with
# all its pre-requisite checks before reapplying.
new_status = constants.APP_UPLOAD_SUCCESS
self._clear_app_alarm(app.name)
if (not reset_status or
app.name not in self._apps_metadata[
constants.APP_METADATA_PLATFORM_MANAGED_APPS].keys()):
self._raise_app_alarm(
app.name, constants.APP_APPLY_FAILURE,
fm_constants.FM_ALARM_ID_APPLICATION_APPLY_FAILED,
fm_constants.FM_ALARM_SEVERITY_MAJOR,
_("Application Apply Failure"),
fm_constants.FM_ALARM_TYPE_3,
_("Retry applying the application. If the issue persists, "
"please check system inventory log for cause."),
True)
elif app.status == constants.APP_REMOVE_IN_PROGRESS:
op = 'application-remove'
if not forced_operation:
new_status = constants.APP_REMOVE_FAILURE
self._raise_app_alarm(
app.name, constants.APP_REMOVE_FAILURE,
fm_constants.FM_ALARM_ID_APPLICATION_REMOVE_FAILED,
fm_constants.FM_ALARM_SEVERITY_MAJOR,
_("Application Remove Failure"),
fm_constants.FM_ALARM_TYPE_3,
_("Retry removing the application. If the issue persists, "
"please check system inventory log for cause. "
"Using --force will set the app status to 'uploaded' "
"in case the error persists."),
True)
else:
# In case there is an existing alarm for previous remove failure
self._clear_app_alarm(app.name)
new_status = constants.APP_UPLOAD_SUCCESS
progress = constants.APP_PROGRESS_REMOVE_FAILED_WARNING.format(new_status)
LOG.warning(progress)
else:
# Should not get here, perhaps a new status was introduced?
LOG.error("No abort handling code for app status = '%s'!" % app.status)
return
if not reset_status:
self._update_app_status(app, new_status, progress)
if not user_initiated:
LOG.error("Application %s aborted!." % operation)
else:
LOG.info("Application %s aborted by user!." % operation)
else:
LOG.info("Resetting status of app %s from '%s' to '%s' " %
(app.name, app.status, new_status))
error_msg = "Unexpected process termination while " + op +\
" was in progress. The application status " +\
"has changed from \'" + app.status +\
"\' to \'" + new_status + "\'."
values = {'progress': error_msg, 'status': new_status}
self._dbapi.kube_app_update(app.id, values)
def _download_tarfile(self, app):
from six.moves.urllib.request import urlopen
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
from socket import timeout as socket_timeout
from six.moves.urllib.parse import urlsplit
def _handle_download_failure(reason):
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason=reason)
try:
remote_file = urlopen(
app.tarfile, timeout=TARFILE_DOWNLOAD_CONNECTION_TIMEOUT)
try:
remote_filename = remote_file.info()['Content-Disposition']
except KeyError:
remote_filename = os.path.basename(
urlsplit(remote_file.url).path)
            filename_avail = False if (remote_filename is None or
                                       remote_filename == '') else True
if filename_avail:
if (not remote_filename.endswith('.tgz') and
not remote_filename.endswith('.tar.gz')):
reason = app.tarfile + ' has unrecognizable tar file ' + \
'extension. Supported extensions are: .tgz and .tar.gz.'
_handle_download_failure(reason)
return None
filename = '/tmp/' + remote_filename
else:
filename = '/tmp/' + app.name + '.tgz'
            with open(filename, 'wb') as dest:
shutil.copyfileobj(remote_file, dest, TARFILE_TRANSFER_CHUNK_SIZE)
return filename
except HTTPError as err:
LOG.error(err)
reason = 'failed to download tarfile ' + app.tarfile + \
', error code = ' + str(err.code)
_handle_download_failure(reason)
except URLError as err:
LOG.error(err)
reason = app.tarfile + ' is unreachable.'
_handle_download_failure(reason)
except shutil.Error as err:
LOG.error(err)
err_file = os.path.basename(filename) if filename_avail else app.tarfile
reason = 'failed to process tarfile ' + err_file
_handle_download_failure(reason)
except socket_timeout as e:
LOG.error(e)
reason = 'failed to download tarfile ' + app.tarfile + \
', connection timed out.'
_handle_download_failure(reason)
def _extract_tarfile(self, app):
def _handle_extract_failure(
reason='failed to extract tarfile content.'):
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason=reason)
orig_uid, orig_gid = get_app_install_root_path_ownership()
try:
if app.is_fluxcd_app:
# One time set up of fluxcd manifest path for the system
if not os.path.isdir(constants.APP_FLUXCD_DATA_PATH):
os.makedirs(constants.APP_FLUXCD_DATA_PATH)
if not os.path.isdir(app.sync_fluxcd_manifest_dir):
os.makedirs(app.sync_fluxcd_manifest_dir)
else:
# One time set up of base armada manifest path for the system
if not os.path.isdir(constants.APP_SYNCED_ARMADA_DATA_PATH):
os.makedirs(constants.APP_SYNCED_ARMADA_DATA_PATH)
if not os.path.isdir(app.sync_armada_mfile_dir):
os.makedirs(app.sync_armada_mfile_dir)
if not os.path.isdir(app.inst_path):
create_app_path(app.inst_path)
# Temporarily change /scratch group ownership to sys_protected
os.chown(constants.APP_INSTALL_ROOT_PATH, orig_uid,
grp.getgrnam(constants.SYSINV_SYSADMIN_GRPNAME).gr_gid)
# Extract the tarfile as sysinv user
if not cutils.extract_tarfile(app.inst_path, app.tarfile, demote_user=True):
_handle_extract_failure()
if app.downloaded_tarfile:
name, version, patches = self._utils._verify_metadata_file(
app.inst_path, app.name, app.version)
if (name != app.name or version != app.version):
# Save the official application info. They will be
# persisted in the next status update
app.regenerate_application_info(name, version, patches)
if not cutils.verify_checksum(app.inst_path):
_handle_extract_failure('checksum validation failed.')
mname, manifest = self._utils._find_manifest(app.inst_path, app.name)
# Save the official manifest file info. They will be persisted
# in the next status update
app.regenerate_manifest_filename(mname, os.path.basename(manifest))
else:
name, version, patches = cutils.find_metadata_file(
app.inst_path, constants.APP_METADATA_FILE)
app.patch_dependencies = patches
self._utils._extract_helm_charts(app.inst_path)
except exception.SysinvException as e:
_handle_extract_failure(str(e))
except OSError as e:
LOG.error(e)
_handle_extract_failure()
finally:
os.chown(constants.APP_INSTALL_ROOT_PATH, orig_uid, orig_gid)
def get_image_tags_by_charts(self, app):
""" Mine the image tags for charts from the images file. Add the
image tags to the manifest file if the image tags from the
charts do not exist in the manifest file. Convert the image
tags in in both override files and manifest file. Intended
for both system and custom apps.
The image tagging conversion(local docker registry address prepended):
${LOCAL_REGISTRY_SERVER}:${REGISTRY_PORT}/<image-name>
(ie..registry.local:9001/docker.io/mariadb:10.2.13)
"""
if app.is_fluxcd_app:
return self._get_image_tags_by_charts_fluxcd(app.sync_imgfile,
app.sync_fluxcd_manifest,
app.sync_overrides_dir)
else:
return self._get_image_tags_by_charts_armada(app.sync_imgfile,
app.sync_armada_mfile,
app.sync_overrides_dir)
def _get_image_tags_by_charts_fluxcd(self, app_images_file, manifest, overrides_dir):
app_imgs = []
images_file = None
if os.path.exists(app_images_file):
with io.open(app_images_file, 'r', encoding='utf-8') as f:
images_file = yaml.safe_load(f)
helmrepo_path = os.path.join(manifest, "base", "helmrepository.yaml")
root_kustomization_path = os.path.join(
manifest, constants.APP_ROOT_KUSTOMIZE_FILE)
for f in (helmrepo_path, root_kustomization_path):
if not os.path.isfile(f):
                raise exception.SysinvException(_(
                    "Mandatory FluxCD yaml file doesn't exist "
                    "%s" % f))
# get namespace
with io.open(root_kustomization_path, 'r', encoding='utf-8') as f:
root_kustomization_yaml = next(yaml.safe_load_all(f))
global_namespace = root_kustomization_yaml["namespace"]
charts_groups = root_kustomization_yaml["resources"]
for chart_group in charts_groups:
if chart_group != "base":
chart_path = os.path.join(manifest, chart_group)
helmrelease_path = os.path.join(chart_path, "helmrelease.yaml")
chart_kustomization_path = os.path.join(chart_path, "kustomization.yaml")
if not os.path.isfile(chart_kustomization_path) or \
not os.path.isfile(helmrelease_path):
continue
with io.open(chart_kustomization_path, 'r', encoding='utf-8') as f:
chart_kustomization_yaml = next(yaml.safe_load_all(f))
chart_namespace = chart_kustomization_yaml.get("namespace", global_namespace)
with io.open(helmrelease_path, 'r', encoding='utf-8') as f:
helmrelease_yaml = next(yaml.safe_load_all(f))
chart_name = helmrelease_yaml["metadata"]["name"]
# Get the image tags by chart from the images file
helm_chart_imgs = {}
if images_file and chart_name in images_file:
helm_chart_imgs = images_file[chart_name]
# Get the image tags from the chart overrides file
overrides = chart_namespace + '-' + chart_name + '.yaml'
app_overrides_file = os.path.join(overrides_dir, overrides)
overrides_file = {}
if os.path.exists(app_overrides_file):
with io.open(app_overrides_file, 'r', encoding='utf-8') as f:
overrides_file = yaml.safe_load(f)
override_imgs = self._image.find_images_in_dict(
overrides_file.get('data', {}).get('values', {}))
override_imgs_copy = copy.deepcopy(override_imgs)
# Get the image tags from the fluxcd static overrides file
static_overrides_path = None
if "valuesFrom" not in helmrelease_yaml["spec"]:
                    raise exception.SysinvException(_(
                        "FluxCD app chart doesn't have overrides files "
                        "defined in helmrelease.yaml: "
                        "%s" % chart_name))
for override_file in helmrelease_yaml["spec"]["valuesFrom"]:
if override_file["valuesKey"].endswith("static-overrides.yaml"):
static_overrides_path = os.path.join(chart_path,
override_file["valuesKey"])
if not static_overrides_path or \
not os.path.isfile(static_overrides_path):
raise exception.SysinvException(_(
"FluxCD app chart static overrides file doesn't exist "
"%s" % chart_name))
with io.open(static_overrides_path, 'r', encoding='utf-8') as f:
static_overrides_file = yaml.safe_load(f) or {}
# get the image tags from the static overrides file
static_overrides_imgs = self._image.find_images_in_dict(static_overrides_file)
static_overrides_imgs_copy = copy.deepcopy(static_overrides_imgs)
static_overrides_imgs = self._image.merge_dict(helm_chart_imgs, static_overrides_imgs)
# Update image tags with local registry prefix
override_imgs = self._image.update_images_with_local_registry(override_imgs)
static_overrides_imgs = self._image.update_images_with_local_registry(static_overrides_imgs)
# Generate a list of required images by chart
download_imgs = copy.deepcopy(static_overrides_imgs)
download_imgs = self._image.merge_dict(download_imgs, override_imgs)
download_imgs_list = self._image.generate_download_images_list(download_imgs, [])
app_imgs.extend(download_imgs_list)
# Update chart override file if needed
if override_imgs != override_imgs_copy:
with open(app_overrides_file, 'w') as f:
try:
overrides_file['data']['values'] = self._image.merge_dict(
overrides_file['data']['values'], override_imgs)
yaml.safe_dump(overrides_file, f, default_flow_style=False)
LOG.info("Overrides file %s updated with new image tags" %
app_overrides_file)
except (TypeError, KeyError):
LOG.error("Overrides file %s fails to update" %
app_overrides_file)
# Update static overrides if needed
if static_overrides_imgs != static_overrides_imgs_copy:
static_overrides_to_dump = self._image.merge_dict(static_overrides_file,
static_overrides_imgs)
with io.open(static_overrides_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(static_overrides_to_dump, f, default_flow_style=False)
return list(set(app_imgs))
def _get_image_tags_by_charts_armada(self, app_images_file, app_manifest_file, overrides_dir):
app_imgs = []
images_file = None
manifest_update_required = False
if os.path.exists(app_images_file):
with io.open(app_images_file, 'r', encoding='utf-8') as f:
images_file = yaml.safe_load(f)
if os.path.exists(app_manifest_file):
with io.open(app_manifest_file, 'r', encoding='utf-8') as f:
                # The RoundTripLoader removes the superfluous quotes by default,
                # which makes the dumped-out charts unreadable by Armada.
                # Set preserve_quotes=True to preserve all the quotes.
charts = list(yaml.load_all(
f, Loader=yaml.RoundTripLoader, preserve_quotes=True))
for chart in charts:
if "armada/Chart/" in chart['schema']:
chart_data = chart['data']
chart_name = chart_data['chart_name']
chart_namespace = chart_data['namespace']
# Get the image tags by chart from the images file
helm_chart_imgs = {}
if images_file and chart_name in images_file:
helm_chart_imgs = images_file[chart_name]
# Get the image tags from the chart overrides file
overrides = chart_namespace + '-' + chart_name + '.yaml'
app_overrides_file = os.path.join(overrides_dir, overrides)
overrides_file = {}
if os.path.exists(app_overrides_file):
with io.open(app_overrides_file, 'r', encoding='utf-8') as f:
overrides_file = yaml.safe_load(f)
override_imgs = self._image.find_images_in_dict(
overrides_file.get('data', {}).get('values', {}))
override_imgs_copy = copy.deepcopy(override_imgs)
# Get the image tags from the armada manifest file
armada_chart_imgs = self._image.find_images_in_dict(
chart_data.get('values', {}))
armada_chart_imgs_copy = copy.deepcopy(armada_chart_imgs)
armada_chart_imgs = self._image.merge_dict(helm_chart_imgs, armada_chart_imgs)
# Update image tags with local registry prefix
override_imgs = self._image.update_images_with_local_registry(override_imgs)
armada_chart_imgs = self._image.update_images_with_local_registry(armada_chart_imgs)
# Generate a list of required images by chart
download_imgs = copy.deepcopy(armada_chart_imgs)
download_imgs = self._image.merge_dict(download_imgs, override_imgs)
download_imgs_list = self._image.generate_download_images_list(download_imgs, [])
app_imgs.extend(download_imgs_list)
# Update chart override file if needed
if override_imgs != override_imgs_copy:
with open(app_overrides_file, 'w') as f:
try:
overrides_file['data']['values'] = self._image.merge_dict(
overrides_file['data']['values'], override_imgs)
yaml.safe_dump(overrides_file, f, default_flow_style=False)
LOG.info("Overrides file %s updated with new image tags" %
app_overrides_file)
except (TypeError, KeyError):
LOG.error("Overrides file %s fails to update" %
app_overrides_file)
# Update armada chart if needed
if armada_chart_imgs != armada_chart_imgs_copy:
                    # This is to convert an empty OrderedDict to a dict
if 'values' in chart_data:
if not chart_data['values']:
chart_data['values'] = {}
chart_data['values'] = self._image.merge_dict(
chart_data.get('values', {}), armada_chart_imgs)
manifest_update_required = True
# Update manifest file if needed
if manifest_update_required:
with open(app_manifest_file, 'w') as f:
try:
yaml.dump_all(charts, f, Dumper=yaml.RoundTripDumper,
explicit_start=True, default_flow_style=False)
LOG.info("Manifest file %s updated with new image tags" %
app_manifest_file)
except Exception as e:
LOG.error("Manifest file %s fails to update with "
"new image tags: %s" % (app_manifest_file, e))
return list(set(app_imgs))
def _register_embedded_images(self, app):
"""
TODO(tngo): When we're ready to support air-gap scenario and private
images, the following need to be done:
a. load the embedded images
            b. tag and push them to the docker registry on the controller
c. find image tag IDs in each chart and replace their values with
new tags. Alternatively, document the image tagging convention
${LOCAL_REGISTRY_SERVER}:${REGISTRY_PORT}/<image-name>
(e.g. registry.local:9001/prom/mysqld-exporter)
to be referenced in the application Helm charts.
"""
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason="embedded images are not yet supported.")
def _save_images_list(self, app):
# Extract the list of images from the charts and overrides where
# applicable. Save the list to the same location as the armada manifest
# so it can be sync'ed.
app.charts = self._get_list_of_charts(app)
self._plugins.activate_plugins(app)
LOG.info("Generating application overrides to discover required images.")
self._helm.generate_helm_application_overrides(
app.sync_overrides_dir, app.name, mode=None, cnamespace=None,
armada_format=True, chart_info=app.charts, combined=True,
is_fluxcd_app=app.is_fluxcd_app)
self._plugins.deactivate_plugins(app)
self._save_images_list_by_charts(app)
# Get the list of images from the updated images overrides
images_to_download = self.get_image_tags_by_charts(app)
if not images_to_download:
# TODO(tngo): We may want to support the deployment of apps that
# set up resources only in the future. In which case, generate
# an info log and let it advance to the next step.
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="charts specify no docker images.")
with open(app.sync_imgfile, 'a') as f:
yaml.safe_dump({"download_images": images_to_download}, f,
default_flow_style=False)
def _save_images_list_by_charts(self, app):
        # Mine the images from values.yaml files in the charts directory.
        # The list of images for each chart is saved to the images file.
images_by_charts = {}
for chart in app.charts:
chart_name = os.path.join(app.inst_charts_dir, chart.name)
if not os.path.exists(chart_name):
# If the helm chart name is not the same as the armada
# chart name in the manifest, try using the source
# to find the chart directory.
try:
# helm charts should be of the standard format:
# <chartname>-X.X.X.tgz
url_path = os.path.basename(urlparse(chart.location).path)
# strip the .tgz
                    chart_and_version = re.sub(r'\.tgz$', '', url_path)
                    # strip the version
                    chart_name_no_version = re.sub(r'-(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)',
                                                   '', chart_and_version)
chart_name = os.path.join(app.inst_charts_dir, chart_name_no_version)
except Exception as e:
LOG.info("Cannot parse chart path: %s" % e)
pass
chart_path = os.path.join(chart_name, 'values.yaml')
if os.path.exists(chart_path):
with io.open(chart_path, 'r', encoding='utf-8') as f:
y = yaml.safe_load(f)
chart_images = self._image.find_images_in_dict(y)
if chart_images:
images_by_charts.update({chart.name: chart_images})
with open(app.sync_imgfile, 'w') as f:
yaml.safe_dump(images_by_charts, f, explicit_start=True,
default_flow_style=False)
def _retrieve_images_list(self, app_images_file):
with io.open(app_images_file, 'r', encoding='utf-8') as f:
images_list = yaml.safe_load(f)
return images_list
def download_images(self, app):
if os.path.isdir(app.inst_images_dir):
return self._register_embedded_images(app)
if app.system_app:
# Some images could have been overwritten via user overrides
# between upload and apply, or between applies. Refresh the
# saved images list.
saved_images_list = self._retrieve_images_list(app.sync_imgfile)
saved_download_images_list = list(saved_images_list.get("download_images"))
images_to_download = self.get_image_tags_by_charts(app)
if set(saved_download_images_list) != set(images_to_download):
saved_images_list.update({"download_images": images_to_download})
with open(app.sync_imgfile, 'w') as f:
yaml.safe_dump(saved_images_list, f, explicit_start=True,
default_flow_style=False)
else:
images_to_download = self._retrieve_images_list(
app.sync_imgfile).get("download_images")
total_count = len(images_to_download)
threads = min(MAX_DOWNLOAD_THREAD, total_count)
start = time.time()
try:
registries_info = self._docker.retrieve_specified_registries()
except Exception as e:
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason=str(e))
for idx in reversed(range(MAX_DOWNLOAD_ATTEMPTS)):
pool = greenpool.GreenPool(size=threads)
for tag, success in pool.imap(
functools.partial(self._docker.download_an_image,
app.name,
registries_info),
images_to_download):
if success:
continue
if AppOperator.is_app_aborted(app.name):
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason="operation aborted by user.")
else:
LOG.info("Failed to download image: %s", tag)
break
else:
elapsed = time.time() - start
LOG.info("All docker images for application %s were successfully "
"downloaded in %d seconds", app.name, elapsed)
break
# don't sleep after last download attempt
if idx:
                # Exponential backoff: wait time = 15s * 2**attempt + random
                # jitter between 0 and 15s, e.g.:
                # 1st retry: 15*2**2 + random, max wait time 75s,
                # 2nd retry: 15*2**3 + random, max wait time 135s.
                # The current max wait time: 15*2**3+15=135s.
                # NOTE(yuxing): the wait time will increase if we add more retries
wait_before_retry = \
DOWNLOAD_WAIT_BEFORE_RETRY * 2 ** (MAX_DOWNLOAD_ATTEMPTS - idx + 1) \
+ random.uniform(0, DOWNLOAD_WAIT_BEFORE_RETRY)
LOG.info("Retry docker images download for application %s "
"after %d seconds", app.name, wait_before_retry)
time.sleep(wait_before_retry)
else:
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason=constants.APP_PROGRESS_IMAGES_DOWNLOAD_FAILED)
def _validate_helm_charts(self, app):
failed_charts = []
for r, f in cutils.get_files_matching(app.inst_charts_dir, 'Chart.yaml'):
# Eliminate redundant validation for system app
if app.system_app and '/charts/helm-toolkit' in r:
continue
try:
output = subprocess.check_output( # pylint: disable=not-callable
['helm', 'lint', r], universal_newlines=True)
if "linted, 0 chart(s) failed" in output:
LOG.info("Helm chart %s validated" % os.path.basename(r))
else:
LOG.error("Validation failed for helm chart %s" %
os.path.basename(r))
failed_charts.append(r)
except Exception as e:
raise exception.KubeAppUploadFailure(
name=app.name, version=app.version, reason=str(e))
if len(failed_charts) > 0:
raise exception.KubeAppUploadFailure(
name=app.name, version=app.version, reason="one or more charts failed validation.")
def _get_chart_data_from_metadata(self, app):
"""Get chart related data from application metadata
        This extracts from the application metadata the helm repo to
        which the charts should be loaded.
This also returns the list of charts that are disabled by default.
:param app: application
"""
repo = common.HELM_REPO_FOR_APPS
disabled_charts = []
lfile = os.path.join(app.inst_path, constants.APP_METADATA_FILE)
if os.path.exists(lfile) and os.path.getsize(lfile) > 0:
with io.open(lfile, 'r', encoding='utf-8') as f:
try:
y = yaml.safe_load(f)
repo = y.get('helm_repo', common.HELM_REPO_FOR_APPS)
disabled_charts = y.get('disabled_charts', [])
except KeyError:
pass
LOG.info("Application %s (%s) will load charts to chart repo %s" % (
app.name, app.version, repo))
LOG.info("Application %s (%s) will disable charts %s by default" % (
app.name, app.version, disabled_charts))
return (repo, disabled_charts)
def _upload_helm_charts(self, app):
# Set env path for helm-upload execution
env = os.environ.copy()
env['PATH'] = '/usr/local/sbin:' + env['PATH']
charts = [os.path.join(r, f)
for r, f in cutils.get_files_matching(app.inst_charts_dir, '.tgz')]
orig_uid, orig_gid = get_app_install_root_path_ownership()
(helm_repo, disabled_charts) = self._get_chart_data_from_metadata(app)
try:
# Temporarily change /scratch group ownership to sys_protected
os.chown(constants.APP_INSTALL_ROOT_PATH, orig_uid,
grp.getgrnam(constants.SYSINV_SYSADMIN_GRPNAME).gr_gid)
with open(os.devnull, "w") as fnull:
for chart in charts:
subprocess.check_call(['helm-upload', helm_repo, chart], # pylint: disable=not-callable
env=env, stdout=fnull, stderr=fnull)
LOG.info("Helm chart %s uploaded" % os.path.basename(chart))
# Make sure any helm repo changes are reflected for the users
helm_utils.refresh_helm_repo_information()
except Exception as e:
raise exception.KubeAppUploadFailure(
name=app.name, version=app.version, reason=str(e))
finally:
os.chown(constants.APP_INSTALL_ROOT_PATH, orig_uid, orig_gid)
# For system applications with plugin support, establish user override
# entries and disable charts based on application metadata.
self._plugins.activate_plugins(app)
db_app = self._dbapi.kube_app_get(app.name)
app_ns = self._helm.get_helm_application_namespaces(db_app.name)
for chart, namespaces in six.iteritems(app_ns):
for namespace in namespaces:
try:
db_chart = self._dbapi.helm_override_get(
db_app.id, chart, namespace)
except exception.HelmOverrideNotFound:
# Create it
try:
db_chart = self._dbapi.helm_override_create(
{'app_id': db_app.id, 'name': chart,
'namespace': namespace})
except Exception as e:
LOG.exception(e)
                # Since we are uploading a fresh application, ensure that
                # charts are disabled based on metadata
system_overrides = db_chart.system_overrides
system_overrides.update({common.HELM_CHART_ATTR_ENABLED:
chart not in disabled_charts})
try:
self._dbapi.helm_override_update(
db_app.id, chart, namespace, {'system_overrides':
system_overrides})
except exception.HelmOverrideNotFound:
LOG.exception("Helm Override Not Found")
self._plugins.deactivate_plugins(app)
def _validate_labels(self, labels):
expr = re.compile(r'[a-z0-9]([-a-z0-9]*[a-z0-9])')
for label in labels:
if not expr.match(label):
return False
return True
def _update_kubernetes_labels(self, hostname, label_dict):
body = {
'metadata': {
'labels': {}
}
}
body['metadata']['labels'].update(label_dict)
if (common.LABEL_COMPUTE_LABEL in label_dict and
label_dict[common.LABEL_COMPUTE_LABEL] is None):
            host = self._dbapi.ihost_get_by_hostname(hostname)
app_isolated_cpus = helm_base._get_host_cpu_list(host,
function=constants.ISOLATED_FUNCTION,
threads=True)
vswitch_cpus = helm_base._get_host_cpu_list(host,
function=constants.VSWITCH_FUNCTION,
threads=True)
if len(app_isolated_cpus) > 0 and len(vswitch_cpus) > 0:
raise exception.SysinvException(_(
"Failed to update kubernetes labels:"
" Only compute nodes may have application-isolated cores"
" and vswitch cores at the same time."))
try:
self._kube.kube_patch_node(hostname, body)
except exception.KubeNodeNotFound:
pass
def _assign_host_labels(self, hosts, labels):
for host in hosts:
if host.administrative != constants.ADMIN_LOCKED:
continue
for label_str in labels:
k, v = label_str.split('=')
try:
self._dbapi.label_create(
host.id, {'host_id': host.id,
'label_key': k,
'label_value': v})
except exception.HostLabelAlreadyExists:
pass
label_dict = {k: v for k, v in (i.split('=') for i in labels)}
try:
self._update_kubernetes_labels(host.hostname, label_dict)
except Exception as e:
LOG.exception(e)
def _find_label(self, host_uuid, label_str):
host_labels = self._dbapi.label_get_by_host(host_uuid)
for label_obj in host_labels:
if label_str == label_obj.label_key + '=' + label_obj.label_value:
return label_obj
return None
def _remove_host_labels(self, hosts, labels):
for host in hosts:
if host.administrative != constants.ADMIN_LOCKED:
continue
null_labels = {}
for label_str in labels:
lbl_obj = self._find_label(host.uuid, label_str)
if lbl_obj:
self._dbapi.label_destroy(lbl_obj.uuid)
key = lbl_obj.label_key
null_labels[key] = None
if null_labels:
try:
self._update_kubernetes_labels(host.hostname, null_labels)
except Exception as e:
LOG.exception(e)
def audit_local_registry_secrets(self, context, username=None):
"""
        The local registry uses the admin's username and password for
        authentication. K8s stores the authentication info in secrets in
        order to access the local registry, while the admin's password is
        saved in keyring.
        The admin's password could be changed by an openstack client command
        outside of sysinv and K8s. That causes a mismatch between keyring
        and k8s's secrets, which leads to authentication failures.
        There are two ways to keep k8s's secrets updated with the data in keyring:
        1. Polling. Use a periodic task to sync info from keyring to secrets.
        2. Notification. Keystone sends out a notification when there is a
           password update, and the notification receiver does the data sync.
        To ensure k8s's secrets are timely and always synced with keyring, both
        methods are used here. This function is called in both cases to audit
        the password info between keyring and registry-local-secret, and to
        update keyring's password to all local registry secrets if needed.
"""
# Use lock to synchronize call from timer and notification
lock_name = "AUDIT_LOCAL_REGISTRY_SECRETS"
@cutils.synchronized(lock_name, external=False)
def _sync_audit_local_registry_secrets(self):
try:
secret = self._kube.kube_get_secret("registry-local-secret", kubernetes.NAMESPACE_KUBE_SYSTEM)
if secret is None:
return
secret_auth_body = base64.decode_as_text(secret.data['.dockerconfigjson'])
secret_auth_info = (secret_auth_body.split('auth":')[1]).split('"')[1]
registry_auth = cutils.get_local_docker_registry_auth()
registry_auth_info = '{0}:{1}'.format(registry_auth['username'],
registry_auth['password'])
if secret_auth_info == base64.encode_as_text(registry_auth_info):
LOG.debug("Auth info is the same, no update is needed for k8s secret.")
return
except Exception as e:
LOG.error(e)
return
try:
# update secret with new auth info
token = '{{\"auths\": {{\"{0}\": {{\"auth\": \"{1}\"}}}}}}'.format(
constants.DOCKER_REGISTRY_SERVER, base64.encode_as_text(registry_auth_info))
secret.data['.dockerconfigjson'] = base64.encode_as_text(token)
self._kube.kube_patch_secret("registry-local-secret", kubernetes.NAMESPACE_KUBE_SYSTEM, secret)
LOG.info("Secret registry-local-secret under Namespace kube-system is updated")
except Exception as e:
LOG.error("Failed to update Secret %s under Namespace kube-system: %s"
% ("registry-local-secret", e))
return
# update "default-registry-key" secret info under all namespaces
try:
ns_list = self._kube.kube_get_namespace_name_list()
for ns in ns_list:
secret = self._kube.kube_get_secret(AppOperator.DOCKER_REGISTRY_SECRET, ns)
if secret is None:
continue
try:
secret_auth_body = base64.decode_as_text(secret.data['.dockerconfigjson'])
if constants.DOCKER_REGISTRY_SERVER in secret_auth_body:
secret.data['.dockerconfigjson'] = base64.encode_as_text(token)
self._kube.kube_patch_secret(AppOperator.DOCKER_REGISTRY_SECRET, ns, secret)
LOG.info("Secret %s under Namespace %s is updated"
% (AppOperator.DOCKER_REGISTRY_SECRET, ns))
except Exception as e:
LOG.error("Failed to update Secret %s under Namespace %s: %s"
% (AppOperator.DOCKER_REGISTRY_SECRET, ns, e))
continue
except Exception as e:
LOG.error(e)
return
_sync_audit_local_registry_secrets(self)
def _wait_for_pod_termination(self, namespace):
loop_timeout = 0
loop_check_interval = 10
timeout = 300
try:
LOG.info("Waiting for pod termination in namespace %s ..." % namespace)
# Pod termination timeout 5mins
            while loop_timeout <= timeout:
if not self._kube.kube_namespaced_pods_exist(namespace):
# Pods have terminated
break
loop_timeout += loop_check_interval
time.sleep(loop_check_interval)
if loop_timeout > timeout:
raise exception.KubePodTerminateTimeout(name=namespace)
LOG.info("Pod termination in Namespace %s completed." % namespace)
except Exception as e:
LOG.error(e)
raise
def _get_list_of_charts(self, app):
if app.is_fluxcd_app:
return self._get_list_of_charts_fluxcd(app.sync_fluxcd_manifest)
else:
return self._get_list_of_charts_armada(app.sync_armada_mfile)
def _get_list_of_charts_fluxcd(self, manifest):
"""Get the charts information from the manifest directory
        The following chart data for each chart in the manifest directory
        are extracted and stored into a namedtuple FluxCDChart object:
- metadata_name
- chart_name
- namespace
- location
- release
"""
helmrepo_path = os.path.join(manifest, "base", "helmrepository.yaml")
root_kustomization_path = os.path.join(
manifest, constants.APP_ROOT_KUSTOMIZE_FILE)
for f in (helmrepo_path, root_kustomization_path):
if not os.path.isfile(f):
                raise exception.SysinvException(_(
                    "Mandatory FluxCD yaml file doesn't exist "
                    "%s" % f))
# get global namespace
with io.open(root_kustomization_path, 'r', encoding='utf-8') as f:
root_kustomization_yaml = next(yaml.safe_load_all(f))
global_namespace = root_kustomization_yaml["namespace"]
charts_groups = root_kustomization_yaml["resources"]
# get the helm repo base url
with io.open(helmrepo_path, 'r', encoding='utf-8') as f:
helm_repo_yaml = next(yaml.safe_load_all(f))
helm_repo_url = helm_repo_yaml["spec"]["url"]
charts = []
for chart_group in charts_groups:
if chart_group != "base":
chart_path = os.path.join(manifest, chart_group)
helmrelease_path = os.path.join(chart_path, "helmrelease.yaml")
chart_kustomization_path = os.path.join(chart_path, "kustomization.yaml")
if not os.path.isfile(chart_kustomization_path) or \
not os.path.isfile(helmrelease_path):
continue
with io.open(chart_kustomization_path, 'r', encoding='utf-8') as f:
chart_kustomization_yaml = next(yaml.safe_load_all(f))
namespace = chart_kustomization_yaml.get("namespace", global_namespace)
with io.open(helmrelease_path, 'r', encoding='utf-8') as f:
helmrelease_yaml = next(yaml.safe_load_all(f))
metadata_name = helmrelease_yaml["metadata"]["name"]
chart_spec = helmrelease_yaml["spec"]["chart"]
chart_name = chart_spec["spec"]["chart"]
location = "%s/%s-%s%s" % (helm_repo_url.rstrip("/"),
chart_name,
chart_spec["spec"]["version"],
".tgz")
release = helmrelease_yaml["spec"]["releaseName"]
                # TODO: determine whether these need to be returned in an
                # order that respects dependsOn, e.g.:
                # dependencies = [dep["name"] for dep in helmrelease_yaml["spec"].get("dependsOn", [])]
chart_obj = FluxCDChart(
metadata_name=metadata_name,
name=metadata_name,
namespace=namespace,
location=location,
release=release,
chart_os_path=chart_path,
chart_label=chart_name
)
charts.append(chart_obj)
return charts
def _get_list_of_charts_armada(self, manifest_file):
"""Get the charts information from the manifest file
The following chart data for each chart in the manifest file
are extracted and stored into a namedtuple Chart object:
- metadata_name
- chart_name
- namespace
- location
- release
- pre-delete job labels
        The method returns a list of namedtuple charts which follow
        the install order in the manifest chart_groups.
:param manifest_file: the manifest file of the application
:return: a list of namedtuple charts
"""
charts = []
release_prefix = ""
chart_group = {}
chart_groups = []
armada_charts = {}
with io.open(manifest_file, 'r', encoding='utf-8') as f:
docs = yaml.safe_load_all(f)
for doc in docs:
                # iterate over the docs in the manifest file to get the
                # required chart information
try:
if "armada/Manifest/" in doc['schema']:
release_prefix = doc['data']['release_prefix']
chart_groups = doc['data']['chart_groups']
elif "armada/ChartGroup/" in doc['schema']:
chart_group.update(
{doc['metadata']['name']: {
'chart_group': doc['data']['chart_group'],
'sequenced': doc.get('data').get('sequenced', False)}})
elif "armada/Chart/" in doc['schema']:
labels = []
delete_resource = \
doc['data'].get('upgrade', {}).get('pre', {}).get('delete', [])
for resource in delete_resource:
if resource.get('type') == 'job':
label = ''
for k, v in resource['labels'].items():
label = k + '=' + v + ',' + label
labels.append(label[:-1])
armada_charts.update(
{doc['metadata']['name']: {
'chart_name': doc['data']['chart_name'],
'namespace': doc['data']['namespace'],
'location': doc['data']['source']['location'],
'release': doc['data']['release'],
'labels': labels}})
LOG.debug("Manifest: Chart: {} Namespace: {} "
"Location: {} Release: {}".format(
doc['data']['chart_name'],
doc['data']['namespace'],
doc['data']['source']['location'],
doc['data']['release']))
except KeyError:
pass
        # Push charts to the list following the order in
        # chart_groups (the install list)
for c_group in chart_groups:
for chart in chart_group[c_group]['chart_group']:
charts.append(Chart(
metadata_name=chart,
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=chart_group[c_group]['sequenced']))
del armada_charts[chart]
del chart_group[c_group]
        # Push charts that are not referenced in
        # chart_groups (the install list)
if chart_group:
for c_group in chart_group:
for chart in chart_group[c_group]['chart_group']:
charts.append(Chart(
metadata_name=chart,
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=chart_group[c_group]['sequenced']))
del armada_charts[chart]
if armada_charts:
for chart in armada_charts:
charts.append(Chart(
metadata_name=chart,
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=False))
        # Update each chart in the list if there is a release prefix
        # for each release
if release_prefix:
for i, chart in enumerate(charts):
charts[i] = chart._replace(
release=release_prefix + "-" + chart.release)
return charts
def _get_overrides_files(self, app, mode):
if app.is_fluxcd_app:
return self._get_overrides_files_fluxcd(app.sync_overrides_dir,
app.charts), []
else:
return self._get_overrides_files_armada(app.sync_overrides_dir,
app.charts,
app.name,
mode)
def _get_overrides_files_fluxcd(self, overrides_dir, charts):
return self._get_overrides_from_charts(overrides_dir, charts)
def _get_overrides_files_armada(self, overrides_dir, charts, app_name, mode):
"""Returns list of override files or None, used in
application-install and application-delete."""
helm_overrides = \
self._get_overrides_from_charts(overrides_dir, charts)
if not helm_overrides:
return None
# Get the armada manifest overrides files
manifest_op = self._helm.get_armada_manifest_operator(app_name)
armada_overrides = manifest_op.load_summary(overrides_dir)
return (helm_overrides, armada_overrides)
def _get_overrides_from_charts(self, overrides_dir, charts):
missing_helm_overrides = []
available_helm_overrides = []
for chart in charts:
overrides = chart.namespace + '-' + chart.name + '.yaml'
overrides_file = os.path.join(overrides_dir, overrides)
if not os.path.exists(overrides_file):
missing_helm_overrides.append(overrides_file)
else:
available_helm_overrides.append(overrides_file)
if missing_helm_overrides:
LOG.error("Missing the following overrides: %s" % missing_helm_overrides)
return None
return available_helm_overrides
def _write_fluxcd_overrides(self, charts, helm_files):
for chart in charts:
override_file = chart.namespace + '-' + chart.name + '.yaml'
for f in os.listdir(chart.chart_os_path):
if f.endswith("system-overrides.yaml"):
chart_system_overrides_path = os.path.join(chart.chart_os_path, f)
break
else:
LOG.error("Missing system-overrides.yaml file for chart %s" % chart.name)
continue
# copy helm chart overrides file to chart's system-overrides.yaml file
for helm_file in helm_files:
if os.path.basename(helm_file) == override_file:
shutil.copy(helm_file, chart_system_overrides_path)
def _generate_armada_overrides_str(self, app_name, app_version,
helm_files, armada_files):
overrides_str = ""
if helm_files:
overrides_str += " ".join([
' --values {0}/overrides/{1}/{2}/{3}'.format(
ARMADA_CONTAINER_TMP,
app_name, app_version, os.path.basename(i))
for i in helm_files
])
if armada_files:
overrides_str += " ".join([
' --values {0}/manifests/{1}/{2}/{3}'.format(
ARMADA_CONTAINER_TMP,
app_name, app_version, os.path.basename(i))
for i in armada_files
])
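        # Example (names assumed): for app 'my-app' version '1.0-1' with one
        # helm override file 'default-chart-a.yaml', the generated string is:
        #   ' --values /tmp/overrides/my-app/1.0-1/default-chart-a.yaml'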
return overrides_str
def _remove_chart_overrides(self, overrides_dir, app):
charts = self._get_list_of_charts(app)
for chart in charts:
if chart.name in self._helm.chart_operators:
self._helm.remove_helm_chart_overrides(overrides_dir,
chart.name,
chart.namespace)
def _update_app_releases_version(self, app_name):
"""Update application helm releases records
This method retrieves the deployed helm releases and updates the
releases records in sysinv db if needed
:param app_name: the name of the application
"""
try:
deployed_releases = helm_utils.retrieve_helm_releases()
LOG.debug('deployed_releases = %s', deployed_releases)
app = self._dbapi.kube_app_get(app_name)
app_releases = self._dbapi.kube_app_chart_release_get_all(app.id)
for r in app_releases:
LOG.debug('app.id=%r, release=%r, version=%r, namespace=%r',
app.id, r.release, r.version, r.namespace)
if (r.release in deployed_releases and
r.namespace in deployed_releases[r.release] and
r.version != deployed_releases[r.release][r.namespace]):
self._dbapi.kube_app_chart_release_update(
app.id, r.release, r.namespace,
{'version': deployed_releases[r.release][r.namespace]})
except Exception as e:
LOG.exception(e)
            raise exception.SysinvException(_(
                "Failed to update application %s releases' versions: %s"
                % (app_name, str(e))))
def _create_app_releases_version(self, app_name, app_charts):
"""Create application helm releases records
This method creates/initializes the helm releases objects for the application.
:param app_name: the name of the application
:param app_charts: the charts of the application
"""
kube_app = self._dbapi.kube_app_get(app_name)
app_releases = self._dbapi.kube_app_chart_release_get_all(kube_app.id)
if app_releases:
return
for chart in app_charts:
values = {
'release': chart.release,
'version': 0,
'namespace': chart.namespace,
'app_id': kube_app.id
}
try:
self._dbapi.kube_app_chart_release_create(values)
except Exception as e:
LOG.exception(e)
def _get_metadata_value(self, app, key_or_keys, default=None,
enforce_type=False):
"""
Get application metadata value from nested dictionary.
If a default value is specified, this will enforce that
the value returned is of the same type.
:param app: application object
:param key_or_keys: single key string, or list of keys
:param default: default value (and type)
:param enforce_type: enforce type check between return value and default
:return: The value from nested dictionary D[key1][key2][...] = value
assuming all keys are present, otherwise default.
"""
value = default
if isinstance(key_or_keys, list):
keys = key_or_keys
else:
keys = [key_or_keys]
metadata_file = os.path.join(app.inst_path,
constants.APP_METADATA_FILE)
if os.path.exists(metadata_file) and os.path.getsize(metadata_file) > 0:
with io.open(metadata_file, 'r', encoding='utf-8') as f:
try:
metadata = yaml.safe_load(f) or {}
value = cutils.deep_get(metadata, keys, default=default)
# TODO(jgauld): There is inconsistent treatment of YAML
# boolean between the module ruamel.yaml and module yaml
# in utils.py, health.py, and kube_app.py. Until these
# usage variants are unified, leave the following check
# as optional.
if enforce_type and default is not None and value is not None:
default_type = type(default)
if type(value) != default_type:
                            raise exception.SysinvException(_(
                                "Invalid {}: {} {!r}; expected type is {}."
                                "".format(metadata_file, '.'.join(keys),
                                          value, default_type)))
except KeyError:
# metadata file does not have the key
pass
LOG.debug('_get_metadata_value: metadata_file=%s, keys=%s, default=%r, value=%r',
metadata_file, keys, default, value)
return value
def _preserve_user_overrides(self, from_app, to_app):
"""Dump user overrides
In the scenario of updating application to a new version, this
method is used to copy the user overrides from the old version
to the new version.
        :param from_app: the application object the update is coming from
        :param to_app: the application object the update is going to
"""
to_db_app = self._dbapi.kube_app_get(to_app.name)
from_db_app = self._dbapi.kube_app_get_inactive_by_name_version(
from_app.name, version=from_app.version)
from_app_db_charts = self._dbapi.helm_override_get_all(from_db_app.id)
from_app_charts = {}
for chart in from_app_db_charts:
from_app_charts.setdefault(chart.name, {}).update(
{chart.namespace: chart.user_overrides})
for chart in to_app.charts:
if (chart.name in from_app_charts and
chart.namespace in from_app_charts[chart.name] and
from_app_charts[chart.name][chart.namespace]):
user_overrides = {'user_overrides': from_app_charts[chart.name][chart.namespace]}
try:
self._dbapi.helm_override_update(
app_id=to_db_app.id, name=chart.name,
namespace=chart.namespace, values=user_overrides)
except exception.HelmOverrideNotFound:
# Unexpected
values = {
'name': chart.name,
'namespace': chart.namespace,
'app_id': to_db_app.id
}
values.update(user_overrides)
self._dbapi.helm_override_create(values=values)
LOG.info("Application %s (%s) will apply the user overrides for"
"Chart %s from version %s" % (to_app.name, to_app.version,
chart.name, from_app.version))
def _make_app_request(self, app, request, overrides_str=None):
if app.is_fluxcd_app:
return self._make_fluxcd_operation_with_monitor(app, request)
else:
return self._make_armada_request_with_monitor(app, request, overrides_str)
@retry(retry_on_exception=lambda x: isinstance(x, exception.ApplicationApplyFailure),
stop_max_attempt_number=5, wait_fixed=30 * 1000)
def _make_fluxcd_operation_with_monitor(self, app, request):
def _patch_flux_suspend_false(resource):
""" Change resource.spec.suspend = False
"""
resource['spec']['suspend'] = False
def _patch_flux_suspend_true(resource):
""" Change resource.spec.suspend = True
"""
resource['spec']['suspend'] = True
def _recover_from_failed_helm_chart_on_app_apply(metadata_name, namespace):
""" Recovery logic for FluxCD on apply
            HelmChart reconciliation needs to be triggered.
            The trigger is to flip spec.suspend from True to False.
:param metadata_name: metadata name from helmrelease.yaml
:param namespace: namespace from kustomization.yaml
:return: tuple(attempt, error).
attempt is True if recovery is triggered
error is True if an error was encountered
"""
helm_chart_name = "{}-{}".format(namespace, metadata_name)
helm_release_name = metadata_name
attempt = False
# Check for condition
try:
helm_chart_resource = self._kube.get_custom_resource(
constants.FLUXCD_CRD_HELM_CHART_GROUP,
constants.FLUXCD_CRD_HELM_CHART_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_CHART_PLURAL,
helm_chart_name)
except Exception as err:
LOG.warning("Failed to get HelmChart resource {}: {}"
"".format(helm_chart_name, err))
return attempt, True
helm_chart_resource_status = \
self._fluxcd.extract_helm_chart_status(helm_chart_resource)
for error_string in constants.FLUXCD_RECOVERY_HELM_CHART_STATUS_ERRORS:
if helm_chart_resource_status.startswith(error_string):
LOG.info("For helm chart {} found a matching error string "
"we can attempt to recover from: {}"
"".format(helm_chart_name, helm_chart_resource_status))
attempt = True
break
if not attempt:
return attempt, False
# Flip spec.suspend to True on the HelmChart
try:
helm_chart_resource['spec']['suspend'] = True
group, version = helm_chart_resource['apiVersion'].split('/')
self._kube.apply_custom_resource(
group,
version,
helm_chart_resource['metadata']['namespace'],
constants.FLUXCD_CRD_HELM_CHART_PLURAL,
helm_chart_resource['metadata']['name'],
helm_chart_resource
)
except Exception as err:
LOG.warning("Failed to patch HelmChart resource {}: {}"
"".format(helm_chart_resource['metadata']['name'], err))
return attempt, True
# Flip spec.suspend back to False on the HelmChart
try:
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_CHART_GROUP,
constants.FLUXCD_CRD_HELM_CHART_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_CHART_PLURAL,
helm_chart_name,
_patch_flux_suspend_false
)
except Exception:
return attempt, True
# Force HelmRelease reconciliation now, saving up to the reconciliation
# timeout for the specific resource. Same trigger as with the HelmChart.
try:
# Flip spec.suspend to True on the HelmRelease
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_true
)
# Flip spec.suspend back to False on the HelmRelease
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_false
)
except Exception:
return attempt, True
return attempt, False
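# Illustrative sketch (not part of this module): the suspend flip above,
# expressed directly against the upstream kubernetes client. The group,
# version and plural values below are assumptions for a FluxCD HelmRelease;
# the method itself takes them from sysinv constants.
def _sketch_flip_suspend(namespace, name):
    from kubernetes import client, config
    config.load_kube_config()
    api = client.CustomObjectsApi()
    for suspend in (True, False):  # True pauses, False resumes and reconciles
        api.patch_namespaced_custom_object(
            "helm.toolkit.fluxcd.io", "v2beta1", namespace,
            "helmreleases", name, {"spec": {"suspend": suspend}})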
def _recover_from_helm_operation_in_progress_on_app_apply(metadata_name, namespace,
flux_error_message):
""" Recovery logic for FluxCD on apply
In case a helm operation is already in progress, FluxCD will raise
an error. Recover by patching the helm release secret, forcing
the status to be 'failed'.
:param metadata_name: metadata name from helmrelease.yaml
:param namespace: namespace from kustomization.yaml
:param flux_error_message: Error message FluxCD encountered
:return: tuple(attempt, error).
attempt is True if recovery is triggered
error is True if an error was encountered
"""
helm_release_name = metadata_name
attempt = False
for error_string in constants.FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS:
if flux_error_message.startswith(error_string):
LOG.info("For helm release {} found a matching error string "
"we can attempt to recover from: {}"
"".format(helm_release_name, error_string))
attempt = True
break
if not attempt:
return attempt, False
try:
secret_list = self._kube.kube_list_secret(namespace)
except Exception as err:
LOG.warning("Failed to get secrets in namespace {}: {}"
"".format(namespace, err))
return attempt, True
recover_list = []
for secret in secret_list:
label = secret.metadata.labels
if not label:
continue
if 'owner' not in label:
continue
if 'status' not in label:
continue
if label['owner'] == 'helm' and \
label['status'] in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS:
LOG.info("Found helm release {} in state {}"
"".format(secret.metadata.name, label['status']))
recover_list.append(secret)
# Force 'failed' status for helm releases
for secret in recover_list:
release_data = helm_utils.decompress_helm_release_data(secret.data['release'])
for status in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS:
release_data = release_data.replace('"status":"{}"'.format(status), '"status":"failed"')
release_data = helm_utils.compress_helm_release_data(release_data)
secret.data['release'] = release_data
try:
self._kube.kube_patch_secret(secret.metadata.name,
secret.metadata.namespace, secret)
except Exception as err:
LOG.warning("Failed to patch secret {} in namespace {}: {}"
"".format(secret.metadata.name,
secret.metadata.namespace, err))
return attempt, True
# Force HelmRelease reconciliation now, saves up to reconciliation
# timeout for the specific resource. Flip suspend True, then False.
try:
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_true
)
self._kube.get_transform_patch_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
namespace,
constants.FLUXCD_CRD_HELM_REL_PLURAL,
helm_release_name,
_patch_flux_suspend_false
)
except Exception:
return attempt, True
return attempt, False
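# Illustrative sketch (not part of this module): an assumption about what the
# helm_utils compress/decompress helpers above do. Helm v3 stores a release
# as base64(gzip(JSON)) in the Secret value, and kubernetes adds one more
# base64 layer on secret data, hence the double decode. Imports are kept
# local so the sketch does not shadow this module's oslo base64 import.
def _sketch_decode_release(secret_release_field):
    import base64
    import gzip
    import json
    raw = base64.b64decode(secret_release_field)   # kubernetes secret layer
    return json.loads(gzip.decompress(base64.b64decode(raw)))

def _sketch_encode_release(release_dict):
    import base64
    import gzip
    import json
    blob = gzip.compress(json.dumps(release_dict).encode())
    return base64.b64encode(base64.b64encode(blob)).decode()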
def _check_progress():
tadjust = 0
adjust = self._get_metadata_value(app,
constants.APP_METADATA_APPLY_PROGRESS_ADJUST,
constants.APP_METADATA_APPLY_PROGRESS_ADJUST_DEFAULT_VALUE)
# Build the list of expected chart releases. Re-read the
# kustomization.yaml file as charts may have been enabled/disabled
# via the plugins (helm or kustomize operator).
charts = {
c.metadata_name: {"namespace": c.namespace, "chart_label": c.chart_label}
for c in self._get_list_of_charts(app)
}
charts_count = len(charts)
if app.system_app:
tadjust = adjust
if tadjust >= charts_count:
LOG.error("Application metadata key '{}'"
"has an invalid value {} (too few charts)".
format(constants.APP_METADATA_APPLY_PROGRESS_ADJUST,
adjust))
tadjust = 0
while charts:
if AppOperator.is_app_aborted(app.name):
return False
num = charts_count - len(charts)
percent = round((float(num) / # pylint: disable=W1619, W1633
(charts_count - tadjust)) * 100)
progress_str = "Applying app {}, overall completion: {}%". \
format(app.name, percent)
if app.progress != progress_str:
LOG.info("%s" % progress_str)
self._update_app_status(app, new_progress=progress_str)
for release_name, chart_obj in list(charts.items()):
# Attempt to recover HelmCharts in some Failed states
_recover_from_failed_helm_chart_on_app_apply(
metadata_name=release_name,
namespace=chart_obj['namespace'])
# Request the helm release info
helm_rel = self._kube.get_custom_resource(
constants.FLUXCD_CRD_HELM_REL_GROUP,
constants.FLUXCD_CRD_HELM_REL_VERSION,
chart_obj["namespace"],
constants.FLUXCD_CRD_HELM_REL_PLURAL,
release_name)
if not helm_rel:
LOG.info("FluxCD Helm release info for {} is not "
"available".format(release_name))
continue
release_status, msg = self._fluxcd.get_helm_release_status(helm_rel)
if release_status == "False":
# If the helm release failed the app must also be in a
# failed state
err_msg = "{}".format(msg) if msg else ""
attempt, _ = _recover_from_helm_operation_in_progress_on_app_apply(
metadata_name=release_name,
namespace=chart_obj['namespace'],
flux_error_message=err_msg)
if not attempt:
LOG.exception("Application {}: release {}: Failed during {} :{}"
"".format(app.name, release_name, request, err_msg))
return False
elif release_status == "True":
# Special validation check needed for AIO-SX only, can
# go away once upstream issues are addressed. See method
# for details.
if self._fluxcd.verify_pods_status_for_release(chart_obj):
charts.pop(release_name)
else:
# Noisy log, so keep it at debug level; still useful when debugging app development.
LOG.debug("Application {}: release {}: Helm release "
"status is unknown. Checking again.".format(
app.name, release_name))
# wait a bit to check again if the charts are ready
time.sleep(1)
return True
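# Illustrative sketch (not part of this module): the completion formula used
# above, isolated. Shrinking the denominator by `adjust` for system apps is
# assumed to account for charts that never report completion, so the bar can
# still reach 100%.
def _sketch_completion_percent(done, total, adjust=0):
    if adjust >= total:  # invalid metadata value; ignore the adjustment
        adjust = 0
    return round(float(done) / (total - adjust) * 100)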
# This check is for cases where an abort is issued while
# this function waits between retries. In such cases, it
# should just return False
if AppOperator.is_app_aborted(app.name):
return False
lifecycle_hook_info = LifecycleHookInfo()
lifecycle_hook_info.operation = request
lifecycle_hook_info.relative_timing = constants.APP_LIFECYCLE_TIMING_PRE
lifecycle_hook_info.lifecycle_type = constants.APP_LIFECYCLE_TYPE_FLUXCD_REQUEST
self.app_lifecycle_actions(None, None, app._kube_app, lifecycle_hook_info)
try:
with Timeout(INSTALLATION_TIMEOUT,
exception.KubeAppProgressMonitorTimeout()):
rc = self._fluxcd.make_fluxcd_operation(request, app.sync_fluxcd_manifest)
# check progress only for apply for now
if rc and request == constants.APP_APPLY_OP:
rc = _check_progress()
except Exception as e:
# timeout or subprocess error
LOG.exception(e)
rc = False
# Here a manifest retry can be performed by throwing ApplicationApplyFailure
lifecycle_hook_info.relative_timing = constants.APP_LIFECYCLE_TIMING_POST
lifecycle_hook_info.lifecycle_type = constants.APP_LIFECYCLE_TYPE_FLUXCD_REQUEST
lifecycle_hook_info[LifecycleConstants.EXTRA][LifecycleConstants.RETURN_CODE] = rc
self.app_lifecycle_actions(None, None, app._kube_app, lifecycle_hook_info)
return rc
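# Illustrative sketch (not part of this module): the @retry decorator used on
# both monitors follows the `retrying` package API (sysinv.common.retrying is
# assumed to be a local copy of it): re-invoke on a matching exception, at
# most 5 attempts, waiting 30 seconds between attempts.
def _sketch_retry_usage():
    from retrying import retry

    @retry(retry_on_exception=lambda e: isinstance(e, RuntimeError),
           stop_max_attempt_number=5, wait_fixed=30 * 1000)
    def flaky_operation():
        raise RuntimeError("transient failure")  # re-raised after 5 attempts

    return flaky_operation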
@retry(retry_on_exception=lambda x: isinstance(x, exception.ApplicationApplyFailure),
stop_max_attempt_number=5, wait_fixed=30 * 1000)
def _make_armada_request_with_monitor(self, app, request, overrides_str=None):
"""Initiate armada request with monitoring
This method delegates the armada request to the docker helper and starts
a monitoring thread to persist status and progress along the way.
:param app: application data object
:param request: type of request (apply or delete)
:param overrides_str: list of overrides in string format to be applied
"""
def _get_armada_log_stats(pattern, logfile):
"""
TODO(tngo): In the absence of an Armada API that provides the current
status of an apply/delete manifest operation, the progress is derived
from specific log entries extracted from the execution logs. This
inner method is to be replaced with an official API call when
it becomes available.
"""
if pattern == ROLLBACK_SEARCH_PATTERN:
print_chart = '{print $10}'
else:
print_chart = '{print $NF}'
p1 = subprocess.Popen(['grep', pattern, logfile],
stdout=subprocess.PIPE)
p2 = subprocess.Popen(['awk', print_chart], stdin=p1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
p1.stdout.close()
result, err = p2.communicate()
if result:
# Scrape information from command output, example 'validate' log:
# 2020-03-26 09:47:58.594 1105 INFO armada.cli [-] Successfully validated:\
# ('/tmp/manifests/oidc-auth-apps/1.0-0/oidc-auth-apps-manifest.yaml',)
# Strip out ANSI color code that might be in the text stream
r = re.compile("\x1b\[[0-9;]*m")
result = r.sub('', result).replace(',', '')
matches = result.split()
num_chart_processed = len(matches)
last_chart_processed = matches[num_chart_processed - 1]
if '=' in last_chart_processed:
last_chart_processed = last_chart_processed.split('=')[1]
return last_chart_processed, num_chart_processed
return None, None
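# Illustrative sketch (not part of this module): the grep/awk pipeline above
# in pure Python, reusing the same ANSI-stripping expression. A plain
# substring match stands in for grep, and the log line format is an
# assumption based on the sample in the method's comment.
def _sketch_armada_log_stats(pattern, lines):
    import re
    ansi = re.compile(r'\x1b\[[0-9;]*m')
    fields = [ansi.sub('', line).replace(',', '').split()
              for line in lines if pattern in line]
    if not fields:
        return None, None
    last = fields[-1][-1]          # awk '{print $NF}' equivalent
    if '=' in last:
        last = last.split('=')[1]
    return last, len(fields)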
def _check_progress(monitor_flag, app, pattern, logfile):
""" Progress monitoring task, to be run in a separate thread """
LOG.info("Starting progress monitoring thread for app %s" % app.name)
try:
adjust = self._get_metadata_value(app,
constants.APP_METADATA_APPLY_PROGRESS_ADJUST,
constants.APP_METADATA_APPLY_PROGRESS_ADJUST_DEFAULT_VALUE)
with Timeout(INSTALLATION_TIMEOUT,
exception.KubeAppProgressMonitorTimeout()):
charts_count = len(app.charts)
while True:
try:
monitor_flag.get_nowait()
LOG.debug("Received monitor stop signal for %s" % app.name)
monitor_flag.task_done()
break
except queue.Empty:
last, num = _get_armada_log_stats(pattern, logfile)
if last:
if charts_count == 0:
percent = 100
else:
tadjust = 0
if app.system_app:
tadjust = adjust
if tadjust >= charts_count:
LOG.error("Application metadata key '{}'"
"has an invalid value {} (too few charts)".
format(constants.APP_METADATA_APPLY_PROGRESS_ADJUST,
adjust))
tadjust = 0
percent = round((float(num) / # pylint: disable=W1619
(charts_count - tadjust)) * 100)
progress_str = "processing chart: {}, overall completion: {}%".\
format(last, percent)
if app.progress != progress_str:
LOG.info("%s" % progress_str)
self._update_app_status(app, new_progress=progress_str)
greenthread.sleep(1)
except Exception as e:
# timeout or subprocess error
LOG.exception(e)
finally:
LOG.info("Exiting progress monitoring thread for app %s" % app.name)
def _cleanup_armada_log(location, app_name, request):
"""Cleanup the oldest armada log if reach the maximum"""
list_of_logs = [os.path.join(location, f) for f in os.listdir(location)
if re.match(r'{}-{}.*\.log'.format(app_name, request), f)]
try:
if len(list_of_logs) > ARMADA_LOG_MAX:
oldest_logfile = min(list_of_logs, key=os.path.getctime)
os.remove(oldest_logfile)
except OSError:
pass
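# Illustrative sketch (not part of this module): the same rotation policy as
# _cleanup_armada_log in standalone form, keeping at most `keep` logs per
# app/request prefix and dropping the file with the oldest ctime.
def _sketch_prune_logs(location, prefix, keep=10):  # keep mirrors ARMADA_LOG_MAX
    import glob
    import os
    logs = glob.glob(os.path.join(location, '%s*.log' % prefix))
    if len(logs) > keep:
        os.remove(min(logs, key=os.path.getctime))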
# Body of the outer method
# This check is for cases where an abort is issued while
# this function waits between retries. In such cases, it
# should just return False
if AppOperator.is_app_aborted(app.name):
return False
# TODO(dvoicule): Maybe pass a hook from outside to this function
# need to change perform_app_recover/rollback/update to support this.
# All the other hooks store the operation of the app itself (apply,
# remove, delete, upload, update) yet this hook stores the armada
# operation in the operation field. This is inconsistent behavior and
# should be changed the moment a hook from outside is passed here.
lifecycle_hook_info = LifecycleHookInfo()
lifecycle_hook_info.operation = request
lifecycle_hook_info.relative_timing = constants.APP_LIFECYCLE_TIMING_PRE
lifecycle_hook_info.lifecycle_type = constants.APP_LIFECYCLE_TYPE_ARMADA_REQUEST
self.app_lifecycle_actions(None, None, app._kube_app, lifecycle_hook_info)
mqueue = queue.Queue()
rc = True
logname = time.strftime(app.name + '-' + request + '_%Y-%m-%d-%H-%M-%S.log')
logfile = ARMADA_HOST_LOG_LOCATION + '/' + logname
if request == constants.APP_APPLY_OP:
pattern = APPLY_SEARCH_PATTERN
elif request == constants.APP_DELETE_OP:
pattern = DELETE_SEARCH_PATTERN
else:
pattern = ROLLBACK_SEARCH_PATTERN
monitor = greenthread.spawn_after(1, _check_progress, mqueue, app,
pattern, logfile)
rc = self._armada.make_armada_request(request, app.armada_service_mfile,
overrides_str, app.releases, logfile)
_cleanup_armada_log(ARMADA_HOST_LOG_LOCATION, app.name, request)
mqueue.put('done')
monitor.kill()
# Here a manifest retry can be performed by throwing ApplicationApplyFailure
lifecycle_hook_info.relative_timing = constants.APP_LIFECYCLE_TIMING_POST
lifecycle_hook_info.lifecycle_type = constants.APP_LIFECYCLE_TYPE_ARMADA_REQUEST
lifecycle_hook_info[LifecycleConstants.EXTRA][LifecycleConstants.RETURN_CODE] = rc
self.app_lifecycle_actions(None, None, app._kube_app, lifecycle_hook_info)
return rc
def _record_auto_update_failed_versions(self, from_app, to_app):
"""Record the new application version in the old application
metadata when the new application fails to be updated"""
new_metadata = copy.deepcopy(from_app.app_metadata)
try:
failed_versions = new_metadata[constants.APP_METADATA_UPGRADES][
constants.APP_METADATA_FAILED_VERSIONS]
if to_app.version not in failed_versions:
failed_versions.append(to_app.version)
except KeyError:
new_metadata.setdefault(constants.APP_METADATA_UPGRADES, {}).update(
{constants.APP_METADATA_FAILED_VERSIONS: [to_app.version]})
with self._lock:
from_app.update_app_metadata(new_metadata)
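# Illustrative sketch (not part of this module): the metadata shape produced
# by _record_auto_update_failed_versions, with plain-dict keys standing in
# for the APP_METADATA_UPGRADES / APP_METADATA_FAILED_VERSIONS constants
# (the literal key names below are assumptions).
def _sketch_record_failed_version(metadata, version):
    upgrades = metadata.setdefault('upgrades', {})
    failed = upgrades.setdefault('failed_versions', [])
    if version not in failed:
        failed.append(version)
    return metadata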
def _perform_app_recover(self, old_app, new_app, armada_process_required=True):
"""Perform application recover
This recover method is triggered when application update failed, it cleans
up the files/data for the new application and recover helm charts for the
old application. If the armada process is required, armada apply is invoked
to recover the application releases for the old version.
The app status will be populated to "apply-failed" if recover fails so that
the user can re-apply app.
:param old_app: the application object that application recovering to
:param new_app: the application object that application recovering from
:param armada_process_required: boolean, whether armada operation is needed
"""
def _activate_old_app_plugins(old_app):
# Enable the old app plugins.
self._plugins.activate_plugins(old_app)
LOG.info("Starting recover Application %s from version: %s to version: %s" %
(old_app.name, new_app.version, old_app.version))
# Ensure that the failed app plugins are disabled prior to cleanup
self._plugins.deactivate_plugins(new_app)
self._update_app_status(
old_app, constants.APP_RECOVER_IN_PROGRESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_IN_PROGRESS.format(old_app.version))
# Set the status for the new app to inactive
self._update_app_status(new_app, constants.APP_INACTIVE_STATE)
try:
self._cleanup(new_app, app_dir=False)
self._utils._patch_report_app_dependencies(
new_app.name + '-' + new_app.version)
self._dbapi.kube_app_destroy(new_app.name,
version=new_app.version,
inactive=True)
LOG.info("Recovering helm charts for Application %s (%s)..."
% (old_app.name, old_app.version))
self._update_app_status(old_app,
new_progress=constants.APP_PROGRESS_RECOVER_CHARTS)
with self._lock:
self._upload_helm_charts(old_app)
rc = True
if armada_process_required:
overrides_str = ''
old_app.charts = self._get_list_of_charts(old_app)
if old_app.system_app:
(helm_files, armada_files) = self._get_overrides_files(
old_app, mode=None)
overrides_str = self._generate_armada_overrides_str(
old_app.name, old_app.version, helm_files, armada_files)
# Ensure that the old app plugins are enabled prior to armada process.
_activate_old_app_plugins(old_app)
if self._make_app_request(old_app, constants.APP_APPLY_OP, overrides_str):
old_app_charts = [c.release for c in old_app.charts]
deployed_releases = helm_utils.retrieve_helm_releases()
for new_chart in new_app.charts:
if (new_chart.release not in old_app_charts and
new_chart.release in deployed_releases):
# Cleanup the releases in the new application version
# but are not in the old application version
helm_utils.delete_helm_release(new_chart.release)
else:
rc = False
except exception.ApplicationApplyFailure:
rc = False
except Exception as e:
# e.g. patch report error, application files cleanup error,
# helm release delete failure
self._update_app_status(
old_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_COMPLETED.format(old_app.version) +
constants.APP_PROGRESS_CLEANUP_FAILED.format(new_app.version) +
'Please check logs for details.')
LOG.error(e)
return
finally:
# Ensure that the old app plugins are enabled after recovery
_activate_old_app_plugins(old_app)
self._record_auto_update_failed_versions(old_app, new_app)
if rc:
self._update_app_status(
old_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_COMPLETED.format(old_app.version) +
'Please check logs for details.')
# Recovery from an app update failure succeeded, clear app alarm
self._clear_app_alarm(old_app.name)
LOG.info("Application %s recover to version %s completed."
% (old_app.name, old_app.version))
else:
self._update_app_status(
old_app, constants.APP_APPLY_FAILURE,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_ABORTED.format(old_app.version) +
'Please check logs for details.')
LOG.error("Application %s recover to version %s aborted!"
% (old_app.name, old_app.version))
def _perform_app_rollback(self, from_app, to_app):
"""Perform application rollback request
This method invokes Armada to roll back the application releases to
the previously installed versions. The jobs for the currently installed
releases must be cleaned up before starting the armada rollback.
:param from_app: the application object being updated from
:param to_app: the application object being updated to
:return boolean: whether application rollback was successful
"""
LOG.info("Application %s (%s) rollback started." % (to_app.name, to_app.version))
try:
if AppOperator.is_app_aborted(to_app.name):
raise exception.KubeAppAbort()
to_db_app = self._dbapi.kube_app_get(to_app.name)
to_app_releases = \
self._dbapi.kube_app_chart_release_get_all(to_db_app.id)
from_db_app = self._dbapi.kube_app_get_inactive_by_name_version(
from_app.name, version=from_app.version)
from_app_releases = \
self._dbapi.kube_app_chart_release_get_all(from_db_app.id)
from_app_r_dict = {r.release: r.version for r in from_app_releases}
self._update_app_status(
to_app, new_progress=constants.APP_PROGRESS_ROLLBACK_RELEASES)
if AppOperator.is_app_aborted(to_app.name):
raise exception.KubeAppAbort()
charts_sequence = {c.release: c.sequenced for c in to_app.charts}
charts_labels = {c.release: c.labels for c in to_app.charts}
for to_app_r in to_app_releases:
if to_app_r.version != 0:
if (to_app_r.release not in from_app_r_dict or
(to_app_r.release in from_app_r_dict and
to_app_r.version != from_app_r_dict[to_app_r.release])):
# Append the release which needs to be rolled back
to_app.releases.append(
{'release': to_app_r.release,
'version': to_app_r.version,
'sequenced': charts_sequence[to_app_r.release]})
# Cleanup the jobs for the current installed release
if to_app_r.release in charts_labels:
for label in charts_labels[to_app_r.release]:
self._kube.kube_delete_collection_namespaced_job(
to_app_r.namespace, label)
LOG.info("Jobs deleted for release %s" % to_app_r.release)
if AppOperator.is_app_aborted(to_app.name):
raise exception.KubeAppAbort()
if self._make_app_request(to_app, constants.APP_ROLLBACK_OP):
self._update_app_status(to_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
LOG.info("Application %s (%s) rollback completed."
% (to_app.name, to_app.version))
return True
except exception.KubeAppAbort:
# If the update operation is aborted before Armada request is made,
# we don't want to return False which would trigger the recovery
# routine with an Armada request.
raise
except Exception as e:
# unexpected KubeAppNotFound, KubeAppInactiveNotFound, KeyError,
# or a k8s exception: failed to clean up release jobs
LOG.exception(e)
LOG.error("Application rollback aborted!")
return False
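# Illustrative sketch (not part of this module): the rollback selection rule
# above. A release is rolled back only when it has been deployed at least
# once (version != 0) and the old app either never had it or had it at a
# different version.
def _sketch_needs_rollback(release, version, from_app_versions):
    return version != 0 and from_app_versions.get(release) != version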
def perform_app_upload(self, rpc_app, tarfile, lifecycle_hook_info_app_upload, images=False):
"""Process application upload request
This method validates the application manifest. If Helm charts are
included, they are validated and uploaded to the local Helm repo. It also
downloads the required docker images for custom apps during the upload
stage.
:param rpc_app: application object in the RPC request
:param tarfile: location of application tarfile
:param lifecycle_hook_info_app_upload: LifecycleHookInfo object
:param images: save application images in the registry as part of app upload
"""
app = AppOperator.Application(rpc_app)
LOG.info("Application %s (%s) upload started." % (app.name, app.version))
try:
# TODO (rchurch): Remove this version check once all applications
# have been decoupled. Since compatible plugins will be delivered
# with the versioned application tarball, no version check will be
# required. For decoupled apps, plugins are loaded later in this
# method and this base class version check is called.
if not self._helm.version_check(app.name, app.version):
LOG.info("Application %s (%s) upload rejected. Unsupported version."
% (app.name, app.version))
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="Unsupported application version.")
app.tarfile = tarfile
if cutils.is_url(app.tarfile):
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_TARFILE_DOWNLOAD)
downloaded_tarfile = self._download_tarfile(app)
if downloaded_tarfile is None:
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="Failed to find the downloaded tarball.")
else:
app.tarfile = downloaded_tarfile
app.downloaded_tarfile = True
# Full extraction of application tarball at /scratch/apps.
# Manifest file is placed under /opt/platform/armada
# which is managed by drbd-sync and visible to Armada.
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_EXTRACT_TARFILE)
with self._lock:
self._extract_tarfile(app)
self._plugins.install_plugins(app)
if app.is_fluxcd_app:
manifest_sync_path = app.sync_fluxcd_manifest
manifest_sync_dir_path = app.sync_fluxcd_manifest_dir
validate_manifest = manifest_sync_path
validate_function = self._fluxcd.make_fluxcd_operation
else:
manifest_sync_path = app.sync_armada_mfile
manifest_sync_dir_path = app.sync_armada_mfile_dir
validate_manifest = app.armada_service_mfile
validate_function = self._armada.make_armada_request
# Copy the manifest and metadata file to the drbd-synced location
if os.path.isdir(app.inst_mfile):
shutil.copytree(app.inst_mfile, manifest_sync_path)
else:
shutil.copy(app.inst_mfile, manifest_sync_path)
inst_metadata_file = os.path.join(
app.inst_path, constants.APP_METADATA_FILE)
if os.path.exists(inst_metadata_file):
sync_metadata_file = os.path.join(
manifest_sync_dir_path, constants.APP_METADATA_FILE)
shutil.copy(inst_metadata_file, sync_metadata_file)
if not validate_function(constants.APP_VALIDATE_OP,
validate_manifest):
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="Failed to validate application manifest.")
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_VALIDATE_UPLOAD_CHARTS)
if os.path.isdir(app.inst_charts_dir):
self._validate_helm_charts(app)
with self._lock:
self._upload_helm_charts(app)
# System overrides will be generated here. Plugins must be activated
# prior to scraping chart/system/armada overrides for images
self._save_images_list(app)
if images:
# We need to download the images at upload_app so that subclouds
# may use the distributed cloud registry
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_DOWNLOAD_IMAGES)
if AppOperator.is_app_aborted(app.name):
raise exception.KubeAppAbort()
self.download_images(app)
if app.patch_dependencies:
self._utils._patch_report_app_dependencies(
app.name + '-' + app.version, app.patch_dependencies)
self._create_app_releases_version(app.name, app.charts)
self._update_app_status(app, constants.APP_UPLOAD_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
LOG.info("Application %s (%s) upload completed." % (app.name, app.version))
return app
except exception.KubeAppUploadFailure as e:
LOG.exception(e)
self._abort_operation(app, constants.APP_UPLOAD_OP, str(e))
raise
except Exception as e:
LOG.exception(e)
self._abort_operation(app, constants.APP_UPLOAD_OP)
raise exception.KubeAppUploadFailure(