diff --git a/sysinv/sysinv/centos/build_srpm.data b/sysinv/sysinv/centos/build_srpm.data
index bbdd071fd3..1f1ee444b1 100644
--- a/sysinv/sysinv/centos/build_srpm.data
+++ b/sysinv/sysinv/centos/build_srpm.data
@@ -1,2 +1,2 @@
 SRC_DIR="sysinv"
-TIS_PATCH_VER=314
+TIS_PATCH_VER=315
diff --git a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/kube_app.py b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/kube_app.py
index adbb02b24c..475712de37 100644
--- a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/kube_app.py
+++ b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/kube_app.py
@@ -339,8 +339,9 @@ class KubeAppHelper(object):
             )
             query_patches = response['pd']
         except Exception as e:
+            # Assume that a patching operation is underway; raise an exception.
             LOG.error(_("No response from patch api: %s" % e))
-            return
+            raise
 
         for patch in query_patches:
             patch_state = query_patches[patch].get('patchstate', None)
@@ -430,6 +431,10 @@ class KubeAppHelper(object):
             raise exception.SysinvException(_(
                 "{}. Please upload after the patching operation "
                 "is completed.".format(e)))
+        except Exception as e:
+            raise exception.SysinvException(_(
+                "{}. Communication error with the patching subsystem. "
+                "Preventing application upload.".format(e)))
 
         applied = self._check_patch_is_applied(patches)
         if not applied:
diff --git a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/utils.py b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/utils.py
index 8571a8e491..f4292c367c 100644
--- a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/utils.py
+++ b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/utils.py
@@ -32,7 +32,6 @@ import tsconfig.tsconfig as tsc
 from oslo_config import cfg
 from sysinv.common import constants
 from sysinv.common import exception
-from sysinv.common.utils import memoized
 from sysinv.helm import common as helm_common
 from sysinv.openstack.common.gettextutils import _
 from sysinv.openstack.common import log
@@ -338,7 +337,6 @@ class SystemHelper(object):
 
 class HostHelper(object):
     @staticmethod
-    @memoized
     def get_active_controller(dbapi=None):
         """Returns host object for active controller."""
         if not dbapi:
diff --git a/sysinv/sysinv/sysinv/sysinv/common/constants.py b/sysinv/sysinv/sysinv/sysinv/common/constants.py
index 6f96255e25..de56d59333 100644
--- a/sysinv/sysinv/sysinv/sysinv/common/constants.py
+++ b/sysinv/sysinv/sysinv/sysinv/common/constants.py
@@ -1480,6 +1480,12 @@ HELM_APP_APPLY_MODES = {
     HELM_APP_OPENSTACK: OPENSTACK_APP_APPLY_MODES
 }
 
+HELM_APPS_PLATFORM_MANAGED = [
+    HELM_APP_PLATFORM,
+]
+
+HELM_APP_ISO_INSTALL_PATH = '/usr/local/share/applications/helm'
+
 # RBD Provisioner Ceph backend capabilities fields
 K8S_RBD_PROV_STORAGECLASS_NAME = 'rbd_storageclass_name'  # Customer
 K8S_RBD_PROV_NAMESPACES = 'rbd_provisioner_namespaces'  # Customer
@@ -1501,6 +1507,7 @@ APP_SYNCED_DATA_PATH = os.path.join(tsc.PLATFORM_PATH, 'armada',
                                     tsc.SW_VERSION)
 APP_METADATA_FILE = 'metadata.yaml'
 # State constants
+APP_NOT_PRESENT = 'missing'
 APP_UPLOAD_IN_PROGRESS = 'uploading'
 APP_UPLOAD_SUCCESS = 'uploaded'
 APP_UPLOAD_FAILURE = 'upload-failed'
diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py
index 10ee088e39..fda69d14dd 100644
--- a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py
+++ b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py
@@ -31,6 +31,7 @@ collection of inventory data for each host.
 
 import errno
 import filecmp
+import fnmatch
 import glob
 import math
 import os
@@ -45,12 +46,14 @@ import xml.etree.ElementTree as ElementTree
 from contextlib import contextmanager
 
 import tsconfig.tsconfig as tsc
+from collections import namedtuple
 from cgcs_patch.patch_verify import verify_files
 from controllerconfig.upgrades import management as upgrades_management
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives.asymmetric import rsa
+from eventlet import greenthread
 from fm_api import constants as fm_constants
 from fm_api import fm_api
 from netaddr import IPAddress
@@ -62,6 +65,7 @@ from six.moves import http_client as httplib
 from sysinv.agent import rpcapi as agent_rpcapi
 from sysinv.api.controllers.v1 import address_pool
 from sysinv.api.controllers.v1 import cpu_utils
+from sysinv.api.controllers.v1 import kube_app as kube_api
 from sysinv.api.controllers.v1 import mtce_api
 from sysinv.api.controllers.v1 import utils
 from sysinv.api.controllers.v1 import vim_api
@@ -83,6 +87,7 @@ from sysinv.conductor import kube_app
 from sysinv.conductor import openstack
 from sysinv.db import api as dbapi
 from sysinv.objects import base as objects_base
+from sysinv.objects import kube_app as kubeapp_obj
 from sysinv.openstack.common import excutils
 from sysinv.openstack.common import jsonutils
 from sysinv.openstack.common import log
@@ -200,6 +205,7 @@ class ConductorManager(service.PeriodicService):
         self._ceph = iceph.CephOperator(self.dbapi)
         self._helm = helm.HelmOperator(self.dbapi)
         self._kube = kubernetes.KubeOperator(self.dbapi)
+        self._kube_app_helper = kube_api.KubeAppHelper(self.dbapi)
         self._fernet = fernet.FernetOperator()
 
         # Upgrade start tasks
@@ -4999,6 +5005,183 @@ class ConductorManager(service.PeriodicService):
         elif bk.backend in self._stor_bck_op_timeouts:
             del self._stor_bck_op_timeouts[bk.backend]
 
+    @periodic_task.periodic_task(spacing=CONF.conductor.audit_interval,
+                                 run_immediately=True)
+    def _k8s_application_audit(self, context):
+        """Make sure that the required k8s applications are running"""
+
+        AppTarBall = namedtuple(
+            'AppTarBall',
+            "tarball_name app_name app_version manifest_name manifest_file")
+
+        def _check_tarfile(app_name):
+            tarfiles = []
+            for f in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH):
+                if fnmatch.fnmatch(f, '{}-*'.format(app_name)):
+                    tarfiles.append(f)
+
+            if not tarfiles:
+                LOG.error("Failed to find an application tarball for {}.".format(app_name))
+                return AppTarBall(None, None, None, None, None)
+            elif len(tarfiles) > 1:
+                LOG.error("Found multiple application tarballs for {}.".format(app_name))
+                return AppTarBall(None, None, None, None, None)
+
+            tarball_name = '{}/{}'.format(
+                constants.HELM_APP_ISO_INSTALL_PATH, tarfiles[0])
+
+            with kube_api.TempDirectory() as app_path:
+                if not cutils.extract_tarfile(app_path, tarball_name):
+                    LOG.error("Failed to extract tar file {}.".format(
+                        os.path.basename(tarball_name)))
+                    return AppTarBall(tarball_name, None, None, None, None)
+
+                # If a checksum file is included in the tarball, verify its contents.
+                if not cutils.verify_checksum(app_path):
+                    LOG.error("Checksum validation failed for %s." % app_name)
+                    return AppTarBall(tarball_name, None, None, None, None)
+
+                try:
+                    name, version, patches = \
+                        self._kube_app_helper._verify_metadata_file(
+                            app_path, app_name, None)
+                    manifest_name, manifest_file = \
+                        self._kube_app_helper._find_manifest_file(app_path)
+                    self._kube_app_helper._extract_helm_charts(app_path)
+                except exception.SysinvException as e:
+                    LOG.error("Extracting tarfile for %s failed: %s." % (
+                        app_name, str(e)))
+                    return AppTarBall(tarball_name, None, None, None, None)
+
+                LOG.debug("Tar file of application %s verified." % app_name)
+                return AppTarBall(tarball_name, name, version,
+                                  manifest_name, manifest_file)
+
+        def _patching_operation_is_occurring():
+            # Check whether a patching operation is currently underway. We
+            # want all hosts to be patch-current before taking any application
+            # actions.
+            #
+            # Execute this check in a function as the rest_api has info logs on
+            # the request/response. Call this only when an action will occur
+            # and not in every audit cycle.
+            try:
+                self._kube_app_helper._check_patching_operation()
+                return False
+            except exception.SysinvException as e:
+                LOG.info("{}. Patching operations are in progress. Suspending "
+                         "actions on platform managed applications until "
+                         "patching is completed.".format(e))
+            except Exception as e:
+                LOG.error("{}. Communication error with the patching subsystem. "
+                          "Preventing managed application actions.".format(e))
+            return True
+
+        LOG.debug("Periodic Task: _k8s_application_audit: Starting")
+        # Make sure that the active controller is unlocked/enabled. Only
+        # install an application if the controller has been provisioned.
+        active_ctrl = utils.HostHelper.get_active_controller(self.dbapi)
+
+        if (active_ctrl is None or
+                ((active_ctrl.administrative != constants.ADMIN_UNLOCKED) or
+                 (active_ctrl.operational != constants.OPERATIONAL_ENABLED))):
+            return
+
+        # Check the application state and take the appropriate action
+        for app_name in constants.HELM_APPS_PLATFORM_MANAGED:
+
+            # Handle initial loading states
+            try:
+                app = kubeapp_obj.get_by_name(context, app_name)
+                status = app.status
+            except exception.KubeAppNotFound:
+                status = constants.APP_NOT_PRESENT
+
+            LOG.debug("Platform managed application %s: %s" % (app_name, status))
+            if status == constants.APP_NOT_PRESENT:
+
+                LOG.info("Platform managed application %s: Creating..." % app_name)
+                app_data = {'name': app_name,
+                            'app_version': constants.APP_VERSION_PLACEHOLDER,
+                            'manifest_name': constants.APP_MANIFEST_NAME_PLACEHOLDER,
+                            'manifest_file': constants.APP_TARFILE_NAME_PLACEHOLDER,
+                            'status': constants.APP_UPLOAD_IN_PROGRESS}
+                try:
+                    self.dbapi.kube_app_create(app_data)
+                    app = kubeapp_obj.get_by_name(context, app_name)
+                except exception.KubeAppAlreadyExists as e:
+                    LOG.exception(e)
+                    continue
+                except exception.KubeAppNotFound as e:
+                    LOG.exception(e)
+                    continue
+
+                tarball = _check_tarfile(app_name)
+                if ((tarball.manifest_name is None) or
+                        (tarball.manifest_file is None)):
+                    app.status = constants.APP_UPLOAD_FAILURE
+                    app.save()
+                    continue
+
+                app.name = tarball.app_name
+                app.app_version = tarball.app_version
+                app.manifest_name = tarball.manifest_name
+                app.manifest_file = os.path.basename(tarball.manifest_file)
+                app.save()
+
+                if _patching_operation_is_occurring():
+                    continue
+
+                # Action: Upload.
+                # Do not block this audit task or any other periodic task. This
+                # could be long running. The next audit cycle will pick up the
+                # latest status.
+                LOG.info("Platform managed application %s: "
+                         "Uploading..." % app_name)
+                greenthread.spawn(self._app.perform_app_upload, app,
+                                  tarball.tarball_name)
+            elif status == constants.APP_UPLOAD_IN_PROGRESS:
+                # Action: do nothing
+                pass
+            elif status == constants.APP_UPLOAD_FAILURE:
+                # Action: Raise alarm?
+                pass
+            elif status == constants.APP_UPLOAD_SUCCESS:
+                if _patching_operation_is_occurring():
+                    continue
+
+                try:
+                    app = kubeapp_obj.get_by_name(context, app_name)
+                    app.status = constants.APP_APPLY_IN_PROGRESS
+                except exception.KubeAppNotFound as e:
+                    LOG.exception(e)
+                    continue
+
+                # Action: Apply the application.
+                # Do not block this audit task or any other periodic task. This
+                # could be long running. The next audit cycle will pick up the
+                # latest status.
+                LOG.info("Platform managed application %s: "
+                         "Applying..." % app_name)
+                greenthread.spawn(self._app.perform_app_apply, app, None)
+                pass
+            elif status == constants.APP_APPLY_IN_PROGRESS:
+                # Action: do nothing
+                pass
+            elif status == constants.APP_APPLY_FAILURE:
+                # Action: Raise alarm?
+                pass
+            elif status == constants.APP_APPLY_SUCCESS:
+                # Action: do nothing -> done
+
+                # TODO(rchurch): Check to see if an existing application needs
+                # upgrading. Wait for the proper application versioning
+                # support to determine the proper action.
+
+                pass
+
+        LOG.debug("Periodic Task: _k8s_application_audit: Finished")
+
     def get_k8s_namespaces(self, context):
         """ Get Kubernetes namespaces
 
         :returns: list of namespaces