Retry application of managed apps after failure

platform-integ-apps is a managed application that is uploaded
and applied automatically. If the image registry it downloads
from is unavailable while the application is being applied,
platform-integ-apps is left in the apply-failed state with no
attempt to recover it. Recovery then requires manually
re-applying the app, but it can be automated as well.

Add two recovery mechanisms:

1. If a single image download fails, restart the entire download
   procedure. Images that were already downloaded are available
   in the local registry, so the overhead of downloading them
   again is low (sketched below).

2. Automatically re-apply managed apps left in the "apply-failed"
   state by image download issues. Until now, managed apps were
   only applied automatically from the "uploaded" state. The
   recovery re-apply runs after a longer timeout so the app can
   still be managed manually (removed, etc.) in the meantime
   (sketched below).

Promote the nested k8s application audit functions to class
scope to improve readability, and add auto-upload/apply/recover
functions for managed apps.

Closes-Bug: 1832032
Change-Id: Ic12b5ba89a28d34995444572d4c30ed583e5f7b4
Signed-off-by: Daniel Badea <daniel.badea@windriver.com>
Daniel Badea 2019-07-16 15:20:53 +00:00
parent 6ce738bf26
commit 7659d4f063
7 changed files with 266 additions and 173 deletions
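The retry in the AppOperator download hunk below is built on Python's
for/else: the attempt loop breaks as soon as every image has been
downloaded, and the else branch, reached only when all
MAX_DOWNLOAD_ATTEMPTS iterations finish without a break, raises
KubeAppApplyFailure. A minimal sketch of that pattern, using
hypothetical stand-ins (download_image, MAX_ATTEMPTS,
WAIT_BEFORE_RETRY) rather than the real sysinv helpers:

    import time

    MAX_ATTEMPTS = 3        # stand-in for MAX_DOWNLOAD_ATTEMPTS
    WAIT_BEFORE_RETRY = 30  # stand-in for DOWNLOAD_WAIT_BEFORE_RETRY, seconds

    def download_image(tag):
        # Stand-in for self._docker.download_an_image(); returns (tag, success).
        return tag, True

    def download_all(tags):
        for attempts_left in reversed(range(MAX_ATTEMPTS)):
            failed = [tag for tag, ok in map(download_image, tags) if not ok]
            if not failed:
                # Every image is in the local registry; stop retrying.
                break
            if attempts_left:
                # Already-downloaded images are cached locally, so rerunning
                # the whole procedure is cheap; wait, then try again.
                time.sleep(WAIT_BEFORE_RETRY)
        else:
            # Reached only when every attempt completed without a break.
            raise RuntimeError("failed to download one or more image(s).")

The real loop additionally checks AppOperator.is_app_aborted() on each
failed image so a user-initiated abort stops the retries immediately.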


@@ -275,6 +275,7 @@ class KubeAppController(rest.RestController):
"while the current status is {}.".format(db_app.status)))
db_app.status = constants.APP_APPLY_IN_PROGRESS
db_app.progress = None
db_app.recovery_attempts = 0
db_app.save()
pecan.request.rpcapi.perform_app_apply(pecan.request.context,
db_app, mode=mode)


@@ -1409,6 +1409,7 @@ APP_PROGRESS_APPLY_MANIFEST = 'applying application manifest'
APP_PROGRESS_COMPLETED = 'completed'
APP_PROGRESS_DELETE_MANIFEST = 'deleting application manifest'
APP_PROGRESS_DOWNLOAD_IMAGES = 'retrieving docker images'
APP_PROGRESS_IMAGES_DOWNLOAD_FAILED = 'failed to download one or more image(s).'
APP_PROGRESS_EXTRACT_TARFILE = 'extracting application tar file'
APP_PROGRESS_GENERATE_OVERRIDES = 'generating application overrides'
APP_PROGRESS_TARFILE_DOWNLOAD = 'downloading tarfile'
@@ -1423,6 +1424,9 @@ APP_PROGRESS_CLEANUP_FAILED = 'Application files/helm release cleanup for versio
APP_PROGRESS_RECOVER_IN_PROGRESS = 'recovering version {} '
APP_PROGRESS_RECOVER_CHARTS = 'recovering helm charts'
# Auto-recovery limits
APP_AUTO_RECOVERY_MAX_COUNT = 5
# Node label operation constants
LABEL_ASSIGN_OP = 'assign'
LABEL_REMOVE_OP = 'remove'


@@ -13,6 +13,7 @@ import base64
import copy
import docker
import grp
import functools
import keyring
import os
import pwd
@@ -68,6 +69,8 @@ DELETE_SEARCH_PATTERN = 'Deleting release'
ROLLBACK_SEARCH_PATTERN = 'Helm rollback of release'
INSTALLATION_TIMEOUT = 3600
MAX_DOWNLOAD_THREAD = 5
MAX_DOWNLOAD_ATTEMPTS = 3
DOWNLOAD_WAIT_BEFORE_RETRY = 30
TARFILE_DOWNLOAD_CONNECTION_TIMEOUT = 60
TARFILE_TRANSFER_CHUNK_SIZE = 1024 * 512
DOCKER_REGISTRY_USER = 'admin'
@@ -726,11 +729,8 @@ class AppOperator(object):
total_count = len(images_to_download)
threads = min(MAX_DOWNLOAD_THREAD, total_count)
failed_downloads = []
start = time.time()
pool = greenpool.GreenPool(size=threads)
try:
local_registry_auth = get_local_docker_registry_auth()
with self._lock:
@@ -740,30 +740,39 @@
name=app.name,
version=app.version,
reason=str(e))
f = lambda x: self._docker.download_an_image(
app.name, local_registry_auth, x)
for tag, rc in pool.imap(f, images_to_download):
if not rc:
failed_downloads.append(tag)
with self._lock:
self._docker._reset_registries_info()
elapsed = time.time() - start
failed_count = len(failed_downloads)
if failed_count > 0:
if not AppOperator.is_app_aborted(app.name):
reason = "failed to download one or more image(s)."
for idx in reversed(range(MAX_DOWNLOAD_ATTEMPTS)):
pool = greenpool.GreenPool(size=threads)
for tag, success in pool.imap(
functools.partial(self._docker.download_an_image,
app.name, local_registry_auth),
images_to_download):
if success:
continue
if AppOperator.is_app_aborted(app.name):
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason="operation aborted by user.")
else:
LOG.info("Failed to download image: %s", tag)
break
else:
reason = "operation aborted by user."
with self._lock:
self._docker._reset_registries_info()
elapsed = time.time() - start
LOG.info("All docker images for application %s were successfully "
"downloaded in %d seconds", app.name, elapsed)
break
# don't sleep after last download attempt
if idx:
LOG.info("Retry docker images download for application %s "
"after %d seconds", app.name, DOWNLOAD_WAIT_BEFORE_RETRY)
time.sleep(DOWNLOAD_WAIT_BEFORE_RETRY)
else:
raise exception.KubeAppApplyFailure(
name=app.name,
version=app.version,
reason=reason)
else:
LOG.info("All docker images for application %s were successfully "
"downloaded in %d seconds" % (app.name, elapsed))
reason=constants.APP_PROGRESS_IMAGES_DOWNLOAD_FAILED)
def _validate_helm_charts(self, app):
failed_charts = []
@@ -1818,7 +1827,7 @@ class AppOperator(object):
self._abort_operation(app, constants.APP_APPLY_OP,
user_initiated=True)
else:
self._abort_operation(app, constants.APP_APPLY_OP)
self._abort_operation(app, constants.APP_APPLY_OP, str(e))
if not caller:
# If apply is not called from update method, deregister the app's
@@ -2225,6 +2234,10 @@ class AppOperator(object):
def active(self):
return self._kube_app.get('active')
@property
def recovery_attempts(self):
return self._kube_app.get('recovery_attempts')
def update_status(self, new_status, new_progress):
self._kube_app.status = new_status
if new_progress:


@@ -44,6 +44,7 @@ import time
import uuid
import xml.etree.ElementTree as ElementTree
from contextlib import contextmanager
from datetime import datetime
import tsconfig.tsconfig as tsc
from collections import namedtuple
@@ -120,6 +121,9 @@ conductor_opts = [
cfg.IntOpt('osd_remove_retry_interval',
default=5,
help='Interval in seconds between retries to remove Ceph OSD.'),
cfg.IntOpt('managed_app_auto_recovery_interval',
default=300,
help='Interval to run managed app auto recovery'),
]
CONF = cfg.CONF
@@ -146,6 +150,11 @@ CONFIG_REBOOT_REQUIRED = (1 << 127)
LOCK_NAME_UPDATE_CONFIG = 'update_config_'
AppTarBall = namedtuple(
'AppTarBall',
"tarball_name app_name app_version manifest_name manifest_file")
class ConductorManager(service.PeriodicService):
"""Sysinv Conductor service main class."""
@@ -4851,92 +4860,184 @@ class ConductorManager(service.PeriodicService):
elif bk.backend in self._stor_bck_op_timeouts:
del self._stor_bck_op_timeouts[bk.backend]
def _auto_upload_managed_app(self, context, app_name):
if self._patching_operation_is_occurring():
return
LOG.info("Platform managed application %s: Creating..." % app_name)
app_data = {'name': app_name,
'app_version': constants.APP_VERSION_PLACEHOLDER,
'manifest_name': constants.APP_MANIFEST_NAME_PLACEHOLDER,
'manifest_file': constants.APP_TARFILE_NAME_PLACEHOLDER,
'status': constants.APP_UPLOAD_IN_PROGRESS}
try:
self.dbapi.kube_app_create(app_data)
app = kubeapp_obj.get_by_name(context, app_name)
except exception.KubeAppAlreadyExists as e:
LOG.exception(e)
return
except exception.KubeAppNotFound as e:
LOG.exception(e)
return
tarball = self._check_tarfile(app_name)
if ((tarball.manifest_name is None) or
(tarball.manifest_file is None)):
app.status = constants.APP_UPLOAD_FAILURE
app.save()
return
app.name = tarball.app_name
app.app_version = tarball.app_version
app.manifest_name = tarball.manifest_name
app.manifest_file = os.path.basename(tarball.manifest_file)
app.save()
# Action: Upload.
# Do not block this audit task or any other periodic task. This
# could be long running. The next audit cycle will pick up the
# latest status.
LOG.info("Platform managed application %s: "
"Uploading..." % app_name)
greenthread.spawn(self._app.perform_app_upload, app,
tarball.tarball_name)
def _auto_apply_managed_app(self, context, app_name):
if not self._met_app_apply_prerequisites(app_name):
LOG.info("Platform managed application %s: Prerequisites "
"not met." % app_name)
return
if self._patching_operation_is_occurring():
return
try:
app = kubeapp_obj.get_by_name(context, app_name)
except exception.KubeAppNotFound as e:
LOG.exception(e)
return
app.status = constants.APP_APPLY_IN_PROGRESS
app.save()
# Action: Apply the application
# Do not block this audit task or any other periodic task. This
# could be long running. The next audit cycle will pick up the
# latest status.
LOG.info("Platform managed application %s: "
"Applying..." % app_name)
greenthread.spawn(self._app.perform_app_apply, app, None)
def _check_tarfile(self, app_name):
tarfiles = []
for f in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH):
if fnmatch.fnmatch(f, '{}-*'.format(app_name)):
tarfiles.append(f)
if not tarfiles:
LOG.error("Failed to find an application tarball for {}.".format(app_name))
return AppTarBall(None, None, None, None, None)
elif len(tarfiles) > 1:
LOG.error("Found multiple application tarballs for {}.".format(app_name))
return AppTarBall(None, None, None, None, None)
tarball_name = '{}/{}'.format(
constants.HELM_APP_ISO_INSTALL_PATH, tarfiles[0])
with kube_api.TempDirectory() as app_path:
if not cutils.extract_tarfile(app_path, tarball_name):
LOG.error("Failed to extract tar file {}.".format(
os.path.basename(tarball_name)))
return AppTarBall(tarball_name, None, None, None, None)
# If checksum file is included in the tarball, verify its contents.
if not cutils.verify_checksum(app_path):
LOG.error("Checksum validation failed for %s." % app_name)
return AppTarBall(tarball_name, None, None, None, None)
try:
name, version, patches = \
self._kube_app_helper._verify_metadata_file(
app_path, app_name, None)
manifest_name, manifest_file = \
self._kube_app_helper._find_manifest_file(app_path)
self._kube_app_helper._extract_helm_charts(app_path)
except exception.SysinvException as e:
LOG.error("Extracting tarfile for %s failed: %s." % (
app_name, str(e)))
return AppTarBall(tarball_name, None, None, None, None)
LOG.debug("Tar file of application %s verified." % app_name)
return AppTarBall(tarball_name, name, version,
manifest_name, manifest_file)
def _patching_operation_is_occurring(self):
# Makes sure a patching operation is not currently underway. We want
# all hosts to be patch-current before taking any application
# actions
#
# Execute this check in a function as the rest_api has info logs on
# the request/response. Call this only when an action will occur and
# not on in every audit cycle
try:
self._kube_app_helper._check_patching_operation()
return False
except exception.SysinvException as e:
LOG.info("{}. Patching operations are in progress. Suspending "
"actions on platform managed application until patching is "
"completed.".format(e))
except Exception as e:
LOG.error("{}. Communication Error with patching subsystem. "
"Preventing managed application actions.".format(e))
return True
def _met_app_apply_prerequisites(self, app_name):
prereqs_met = False
if app_name == constants.HELM_APP_PLATFORM:
# make sure for the ceph related apps that we have ceph access
# and the crushmap is applied to correctly set up related k8s
# resources.
crushmap_flag_file = os.path.join(constants.SYSINV_CONFIG_PATH,
constants.CEPH_CRUSH_MAP_APPLIED)
if (os.path.isfile(crushmap_flag_file) and
self._ceph.have_ceph_monitor_access() and
self._ceph.ceph_status_ok()):
prereqs_met = True
return prereqs_met
def _auto_recover_managed_app(self, context, app_name):
try:
app = kubeapp_obj.get_by_name(context, app_name)
except exception.KubeAppNotFound as e:
LOG.exception(e)
return
if self._app.is_app_aborted(app_name):
return
if constants.APP_PROGRESS_IMAGES_DOWNLOAD_FAILED not in app.progress:
return
if app.recovery_attempts >= constants.APP_AUTO_RECOVERY_MAX_COUNT:
return
tz = app.updated_at.tzinfo
if (datetime.now(tz) - app.updated_at).total_seconds() \
< CONF.conductor.managed_app_auto_recovery_interval:
return
app.status = constants.APP_UPLOAD_SUCCESS
LOG.info("Reset managed application %s status to %s",
app_name, app.status)
app.recovery_attempts += 1
app.save()
self._auto_apply_managed_app(context, app_name)
@periodic_task.periodic_task(spacing=CONF.conductor.audit_interval,
run_immediately=True)
def _k8s_application_audit(self, context):
"""Make sure that the required k8s applications are running"""
AppTarBall = namedtuple(
'AppTarBall',
"tarball_name app_name app_version manifest_name manifest_file")
def _check_tarfile(app_name):
tarfiles = []
for f in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH):
if fnmatch.fnmatch(f, '{}-*'.format(app_name)):
tarfiles.append(f)
if not tarfiles:
LOG.error("Failed to find an application tarball for {}.".format(app_name))
return AppTarBall(None, None, None, None, None)
elif len(tarfiles) > 1:
LOG.error("Found multiple application tarballs for {}.".format(app_name))
return AppTarBall(None, None, None, None, None)
tarball_name = '{}/{}'.format(
constants.HELM_APP_ISO_INSTALL_PATH, tarfiles[0])
with kube_api.TempDirectory() as app_path:
if not cutils.extract_tarfile(app_path, tarball_name):
LOG.error("Failed to extract tar file {}.".format(
os.path.basename(tarball_name)))
return AppTarBall(tarball_name, None, None, None, None)
# If checksum file is included in the tarball, verify its contents.
if not cutils.verify_checksum(app_path):
LOG.error("Checksum validation failed for %s." % app_name)
return AppTarBall(tarball_name, None, None, None, None)
try:
name, version, patches = \
self._kube_app_helper._verify_metadata_file(
app_path, app_name, None)
manifest_name, manifest_file = \
self._kube_app_helper._find_manifest_file(app_path)
self._kube_app_helper._extract_helm_charts(app_path)
except exception.SysinvException as e:
LOG.error("Extracting tarfile for %s failed: %s." % (
app_name, str(e)))
return AppTarBall(tarball_name, None, None, None, None)
LOG.debug("Tar file of application %s verified." % app_name)
return AppTarBall(tarball_name, name, version,
manifest_name, manifest_file)
def _patching_operation_is_occurring():
# Makes sure a patching operation is not currently underway. We want
# all hosts to be patch-current before taking any application
# actions
#
# Execute this check in a function as the rest_api has info logs on
# the request/response. Call this only when an action will occur and
# not on in every audit cycle
try:
self._kube_app_helper._check_patching_operation()
return False
except exception.SysinvException as e:
LOG.info("{}. Patching operations are in progress. Suspending "
"actions on platform managed application until patching is "
"completed.".format(e))
except Exception as e:
LOG.error("{}. Communication Error with patching subsystem. "
"Preventing managed application actions.".format(e))
return True
def _met_app_apply_prerequisites(app_name):
prereqs_met = False
if app_name == constants.HELM_APP_PLATFORM:
# make sure for the ceph related apps that we have ceph access
# and the crushmap is applied to correctly set up related k8s
# resources.
crushmap_flag_file = os.path.join(constants.SYSINV_CONFIG_PATH,
constants.CEPH_CRUSH_MAP_APPLIED)
if (os.path.isfile(crushmap_flag_file) and
self._ceph.have_ceph_monitor_access() and
self._ceph.ceph_status_ok()):
prereqs_met = True
return prereqs_met
LOG.debug("Periodic Task: _k8s_application_audit: Starting")
# Make sure that the active controller is unlocked/enabled. Only
# install an application if the controller has been provisioned.
@@ -4959,47 +5060,7 @@ class ConductorManager(service.PeriodicService):
LOG.debug("Platform managed application %s: %s" % (app_name, status))
if status == constants.APP_NOT_PRESENT:
if _patching_operation_is_occurring():
continue
LOG.info("Platform managed application %s: Creating..." % app_name)
app_data = {'name': app_name,
'app_version': constants.APP_VERSION_PLACEHOLDER,
'manifest_name': constants.APP_MANIFEST_NAME_PLACEHOLDER,
'manifest_file': constants.APP_TARFILE_NAME_PLACEHOLDER,
'status': constants.APP_UPLOAD_IN_PROGRESS}
try:
self.dbapi.kube_app_create(app_data)
app = kubeapp_obj.get_by_name(context, app_name)
except exception.KubeAppAlreadyExists as e:
LOG.exception(e)
continue
except exception.KubeAppNotFound as e:
LOG.exception(e)
continue
tarball = _check_tarfile(app_name)
if ((tarball.manifest_name is None) or
(tarball.manifest_file is None)):
app.status = constants.APP_UPLOAD_FAILURE
app.save()
continue
app.name = tarball.app_name
app.app_version = tarball.app_version
app.manifest_name = tarball.manifest_name
app.manifest_file = os.path.basename(tarball.manifest_file)
app.save()
# Action: Upload.
# Do not block this audit task or any other periodic task. This
# could be long running. The next audit cycle will pick up the
# latest status.
LOG.info("Platform managed application %s: "
"Uploading..." % app_name)
greenthread.spawn(self._app.perform_app_upload, app,
tarball.tarball_name)
self._auto_upload_managed_app(context, app_name)
elif status == constants.APP_UPLOAD_IN_PROGRESS:
# Action: do nothing
pass
@@ -5007,37 +5068,12 @@ class ConductorManager(service.PeriodicService):
# Action: Raise alarm?
pass
elif status == constants.APP_UPLOAD_SUCCESS:
if not _met_app_apply_prerequisites(app_name):
LOG.info("Platform managed application %s: Prerequisites "
"not met." % app_name)
continue
if _patching_operation_is_occurring():
continue
try:
app = kubeapp_obj.get_by_name(context, app_name)
except exception.KubeAppNotFound as e:
LOG.exception(e)
continue
app.status = constants.APP_APPLY_IN_PROGRESS
app.save()
# Action: Apply the application
# Do not block this audit task or any other periodic task. This
# could be long running. The next audit cycle will pick up the
# latest status.
LOG.info("Platform managed application %s: "
"Applying..." % app_name)
greenthread.spawn(self._app.perform_app_apply, app, None)
pass
self._auto_apply_managed_app(context, app_name)
elif status == constants.APP_APPLY_IN_PROGRESS:
# Action: do nothing
pass
elif status == constants.APP_APPLY_FAILURE:
# Action: Raise alarm?
pass
self._auto_recover_managed_app(context, app_name)
elif status == constants.APP_APPLY_SUCCESS:
# Action: do nothing -> done

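The _auto_recover_managed_app() added above only re-applies an app when
four checks pass: the operation was not aborted, the recorded progress
is the image-download failure message, recovery_attempts is still below
APP_AUTO_RECOVERY_MAX_COUNT, and at least
managed_app_auto_recovery_interval seconds (default 300) have passed
since the app was last updated. A condensed, hypothetical sketch of
that gate, detached from the sysinv object model:

    from datetime import datetime

    MAX_RECOVERY_ATTEMPTS = 5  # stand-in for APP_AUTO_RECOVERY_MAX_COUNT
    RECOVERY_INTERVAL = 300    # stand-in for managed_app_auto_recovery_interval
    DOWNLOAD_FAILED = 'failed to download one or more image(s).'

    def should_recover(app):
        # 'app' is a hypothetical object exposing aborted, progress,
        # recovery_attempts and a timezone-aware updated_at timestamp.
        if app.aborted:
            return False
        if DOWNLOAD_FAILED not in app.progress:
            return False  # only image-download failures are recovered
        if app.recovery_attempts >= MAX_RECOVERY_ATTEMPTS:
            return False  # give up once the cap is reached
        elapsed = (datetime.now(app.updated_at.tzinfo) -
                   app.updated_at).total_seconds()
        return elapsed >= RECOVERY_INTERVAL  # leave a window for manual action

When the gate passes, the conductor resets the status to
APP_UPLOAD_SUCCESS, increments recovery_attempts, and calls
_auto_apply_managed_app(), so recovery reuses the same path as the
normal automatic apply.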

@@ -0,0 +1,37 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from sqlalchemy import Column, MetaData, Table
from sqlalchemy import Integer
ENGINE = 'InnoDB'
CHARSET = 'utf8'
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# add recovery_attempts to kube_app table
kube_app = Table(
'kube_app',
meta,
Column('id', Integer,
primary_key=True),
mysql_engine=ENGINE,
mysql_charset=CHARSET,
autoload=True)
col = Column('recovery_attempts', Integer, nullable=True, default=0)
col.create(kube_app)
col.alter(nullable=False)
def downgrade(migrate_engine):
# As per other openstack components, downgrade is
# unsupported in this release.
raise NotImplementedError('SysInv database downgrade is unsupported.')


@@ -1709,6 +1709,7 @@ class KubeApp(Base):
status = Column(String(255), nullable=False)
progress = Column(String(255), nullable=True)
active = Column(Boolean, nullable=False, default=False)
recovery_attempts = Column(Integer, nullable=False, default=0)
UniqueConstraint('name', 'app_version', name='u_app_name_version')


@@ -25,6 +25,7 @@ class KubeApp(base.SysinvObject):
'status': utils.str_or_none,
'progress': utils.str_or_none,
'active': utils.bool_or_none,
'recovery_attempts': utils.int_or_zero,
}
@base.remotable_classmethod