Merge "Introduce application PluginHelper"

Zuul, 2020-05-21 22:26:32 +00:00, committed by Gerrit Code Review
commit 50886e5190
4 changed files with 329 additions and 19 deletions

sysinv/conductor/kube_app.py

@@ -13,16 +13,21 @@ import base64
import copy
import docker
from eventlet.green import subprocess
import glob
import grp
import functools
import os
import pkg_resources
import pwd
import re
import ruamel.yaml as yaml
import shutil
import site
import six
import sys
import threading
import time
import zipfile
from collections import namedtuple
from eventlet import greenpool
@@ -45,7 +50,6 @@ from sysinv.common.storage_backend_conf import StorageBackendConfig
from sysinv.conductor import kube_pod_helper as kube_pod
from sysinv.conductor import openstack
from sysinv.helm import common
from sysinv.helm import helm
from sysinv.helm import utils as helm_utils
@@ -136,11 +140,12 @@ class AppOperator(object):
# List of in progress apps and their abort status
abort_requested = {}
def __init__(self, dbapi):
def __init__(self, dbapi, helm_op):
self._dbapi = dbapi
self._helm = helm_op
self._plugins = PluginHelper(self._dbapi, self._helm)
self._fm_api = fm_api.FaultAPIs()
self._docker = DockerHelper(self._dbapi)
self._helm = helm.HelmOperator(self._dbapi)
self._kube = kubernetes.KubeOperator()
self._utils = kube_app.KubeAppHelper(self._dbapi)
self._image = AppImageParser()
@@ -150,6 +155,11 @@ class AppOperator(object):
if not os.path.isfile(constants.ANSIBLE_BOOTSTRAP_FLAG):
self._clear_stuck_applications()
# Audit discoverable app plugins and clean up any stale plugins left
# behind by applications removed since this host was last tasked to
# manage applications
self._plugins.audit_plugins()
def _clear_armada_locks(self):
lock_name = "{}.{}.{}".format(ARMADA_LOCK_PLURAL,
ARMADA_LOCK_GROUP,
@@ -264,6 +274,7 @@ class AppOperator(object):
def _cleanup(self, app, app_dir=True):
""" Remove application directories and override files """
self._plugins.uninstall_plugins(app)
try:
if os.path.exists(app.sync_overrides_dir):
shutil.rmtree(app.sync_overrides_dir)
@@ -614,10 +625,13 @@ class AppOperator(object):
# applicable. Save the list to the same location as the armada manifest
# so it can be sync'ed.
app.charts = self._get_list_of_charts(app.sync_armada_mfile)
LOG.info("Generating application overrides...")
self._plugins.activate_plugins(app)
LOG.info("Generating application overrides to discover required images.")
self._helm.generate_helm_application_overrides(
app.sync_overrides_dir, app.name, mode=None, cnamespace=None,
armada_format=True, armada_chart_info=app.charts, combined=True)
self._plugins.deactivate_plugins(app)
self._save_images_list_by_charts(app)
# Get the list of images from the updated images overrides
@@ -830,6 +844,7 @@ class AppOperator(object):
# For system applications with plugin support, establish user override
# entries and disable charts based on application metadata.
self._plugins.activate_plugins(app)
db_app = self._dbapi.kube_app_get(app.name)
app_ns = self._helm.get_helm_application_namespaces(db_app.name)
for chart, namespaces in six.iteritems(app_ns):
@@ -858,6 +873,7 @@ class AppOperator(object):
system_overrides})
except exception.HelmOverrideNotFound:
LOG.exception("Helm Override Not Found")
self._plugins.deactivate_plugins(app)
def _validate_labels(self, labels):
expr = re.compile(r'[a-z0-9]([-a-z0-9]*[a-z0-9])')
@@ -1876,6 +1892,11 @@ class AppOperator(object):
LOG.info("Application %s (%s) upload started." % (app.name, app.version))
try:
# TODO (rchurch): Remove this version check once all applications
# have been decoupled. Since compatible plugins will be delivered
# with the versioned application tarball, no version check will be
# required. For decoupled apps, plugins are loaded later in this
# method and this base class version check is called.
if not self._helm.version_check(app.name, app.version):
LOG.info("Application %s (%s) upload rejected. Unsupported version."
% (app.name, app.version))
@@ -1910,6 +1931,7 @@ class AppOperator(object):
with self._lock:
self._extract_tarfile(app)
self._plugins.install_plugins(app)
# Copy the armada manifest and metadata file to the drbd
shutil.copy(app.inst_armada_mfile, app.sync_armada_mfile)
@@ -1935,7 +1957,10 @@ class AppOperator(object):
with self._lock:
self._upload_helm_charts(app)
# System overrides will be generated here. Plugins must be activated
# prior to scraping chart/system/armada overrides for images
self._save_images_list(app)
if app.patch_dependencies:
self._utils._patch_report_app_dependencies(
app.name + '-' + app.version, app.patch_dependencies)
@@ -2093,6 +2118,7 @@ class AppOperator(object):
if self._rbd_provisioner_required(app.name):
self._create_rbd_provisioner_secrets(app.name)
self._create_app_specific_resources(app.name)
self._plugins.activate_plugins(app)
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_GENERATE_OVERRIDES)
@@ -2251,7 +2277,11 @@ class AppOperator(object):
% (to_app.name, from_app.version, to_app.version))
try:
# Upload new app tarball
# Upload new app tarball. The upload will enable the new plugins to
# generate overrides for images. Disable the plugins for the current
# application as the new plugin module will have the same name. Only
# one version of the module can be enabled at any given moment
self._plugins.deactivate_plugins(from_app)
to_app = self.perform_app_upload(to_rpc_app, tarfile)
# Check whether the new application is compatible with the current k8s version
self._utils._check_app_compatibility(to_app.name, to_app.version)
@@ -2268,8 +2298,16 @@ class AppOperator(object):
# Preserve user overrides for the new app
if reuse_overrides:
self._preserve_user_overrides(from_app, to_app)
# The app_apply will generate new versioned overrides for the
# app upgrade and will enable the new plugins for that version.
result = self.perform_app_apply(to_rpc_app, mode=None, caller='update')
elif operation == constants.APP_ROLLBACK_OP:
# The app_rollback will use the previous helm releases known to
# the k8s cluster. Overrides are not generated from any plugins
# in this case. Make sure that the enabled plugins correspond to
# the version expected to be activated.
self._plugins.activate_plugins(to_app)
result = self._perform_app_rollback(from_app, to_app)
if not result:
@@ -2374,6 +2412,7 @@ class AppOperator(object):
if self._rbd_provisioner_required(app.name):
self._delete_rbd_provisioner_secrets(app.name)
self._delete_app_specific_resources(app.name, constants.APP_REMOVE_OP)
self._plugins.deactivate_plugins(app)
except Exception as e:
self._abort_operation(app, constants.APP_REMOVE_OP)
LOG.exception(e)
@@ -3182,3 +3221,193 @@ class AppImageParser(object):
self.generate_download_images_list(v, download_imgs_list)
return list(set(download_imgs_list))
class PluginHelper(object):
""" Utility class to help manage application plugin lifecycle """
# An enabled plugin will have a python path configuration file name with the
# following format: stx_app-platform-integ-apps-1.0-8.pth
PTH_PREFIX = 'stx_app-'
PTH_PATTERN = re.compile(r"{}/([\w-]+)/(\d+\.\d+-\d+)".format(common.HELM_OVERRIDES_PATH))
def __init__(self, dbapi, helm_op):
self._dbapi = dbapi
self._helm_op = helm_op
self._system_path = self._get_python_system_path()
def _get_python_system_path(self):
path = None
try:
path = site.getsitepackages()[0]
except AttributeError:
# Based on https://github.com/pypa/virtualenv/issues/737.
# site.getsitepackages() function is not available in a virtualenv.
# So use a tox friendly method when in a virtualenv
try:
from distutils.sysconfig import get_python_lib
path = get_python_lib()
except Exception as e:
raise exception.SysinvException(_(
"Failed to determine the python site packages path: %s" % str(e)))
if not path:
raise exception.SysinvException(_(
"Failed to determine the python site packages path."))
return path
def _get_pth_fqpn(self, app):
return "{}/{}{}-{}.pth".format(
self._system_path, self.PTH_PREFIX, app.name, app.version)
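# Illustrative sketch (not part of this change): how the naming scheme above
# fits together. The site-packages prefix and overrides path below are assumed
# values used only to keep the example self-contained; the prefix and pattern
# mirror the class attributes defined earlier.
import re

assumed_overrides_path = '/opt/platform/helm/20.06'            # hypothetical
assumed_site_packages = '/usr/lib64/python2.7/site-packages'   # hypothetical
pth_prefix = 'stx_app-'
pth_pattern = re.compile(r"{}/([\w-]+)/(\d+\.\d+-\d+)".format(assumed_overrides_path))

# _get_pth_fqpn() would yield a marker file such as:
#   /usr/lib64/python2.7/site-packages/stx_app-platform-integ-apps-1.0-8.pth
pth_fqpn = "{}/{}{}-{}.pth".format(
    assumed_site_packages, pth_prefix, 'platform-integ-apps', '1.0-8')

# The single line written inside that file is the app's synced plugin
# directory; audit_plugins() recovers the app name and version from it.
contents = '{}/platform-integ-apps/1.0-8/plugins'.format(assumed_overrides_path)
match = pth_pattern.match(contents)
print(pth_fqpn)
print("{} {}".format(match.group(1), match.group(2)))   # -> platform-integ-apps 1.0-8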
def audit_plugins(self):
""" Verify that only enabled application plugins are discoverable """
pattern = '{}/{}*.pth'.format(self._system_path, self.PTH_PREFIX)
discoverable_pths = glob.glob(pattern)
LOG.debug("PluginHelper: Discoverable app plugins: %s" % discoverable_pths)
for pth in discoverable_pths:
with open(pth, 'r') as f:
contents = f.readlines()
if len(contents) == 1:
LOG.debug("PluginHelper: Plugin Path: %s" % contents[0])
match = self.PTH_PATTERN.match(contents[0])
if match:
app = match.group(1)
ver = match.group(2)
try:
app_obj = self._dbapi.kube_app_get(app)
if app_obj.app_version == ver:
LOG.info("PluginHelper: App %s, version %s: Found "
"valid plugin" % (app, ver))
continue
else:
LOG.warning("PluginHelper: Stale plugin pth file "
"found %s: Wrong plugin version "
"enabled %s." % (pth, ver))
except exception.KubeAppNotFound:
LOG.warning("PluginHelper: Stale plugin pth file found"
" %s: App is not active." % pth)
else:
LOG.warning("PluginHelper: Invalid pth file %s: Invalid "
"name or version." % pth)
else:
LOG.warning("PluginHelper: Invalid pth file %s: Only one path"
" is expected." % pth)
LOG.info("PluginHelper: Removing invalid plugin pth: %s" % pth)
os.remove(pth)
def install_plugins(self, app):
""" Install application plugins. """
# An app may be packaged with multiple wheels, discover and install them
# in the synced app plugin directory
pattern = '{}/*.whl'.format(app.inst_plugins_dir)
discovered_whls = glob.glob(pattern)
if not discovered_whls:
LOG.info("PluginHelper: %s does not contain any platform plugins." %
app.name)
return
if not os.path.isdir(app.sync_plugins_dir):
LOG.info("PluginHelper: Creating %s plugin directory %s." % (
app.name, app.sync_plugins_dir))
os.makedirs(app.sync_plugins_dir)
for whl in discovered_whls:
LOG.info("PluginHelper: Installing %s plugin %s to %s." % (
app.name, whl, app.sync_plugins_dir))
with zipfile.ZipFile(whl) as zf:
zf.extractall(app.sync_plugins_dir)
def uninstall_plugins(self, app):
""" Uninstall application plugins."""
if os.path.isdir(app.sync_plugins_dir):
try:
LOG.info("PluginHelper: Removing plugin directory %s" %
app.sync_plugins_dir)
shutil.rmtree(app.sync_plugins_dir)
except OSError:
LOG.exception("PluginHelper: Failed to remove plugin directory:"
" %s" % app.sync_plugins_dir)
else:
LOG.info("PluginHelper: Plugin directory %s does not exist. No "
"need to remove." % app.sync_plugins_dir)
def activate_plugins(self, app):
# Add a .pth file to a site-packages directory so the plugin is picked
# up automatically on a conductor restart
pth_fqpn = self._get_pth_fqpn(app)
with open(pth_fqpn, 'w') as f:
f.write(app.sync_plugins_dir + '\n')
LOG.info("PluginHelper: Enabled plugin directory %s: created %s" % (
app.sync_plugins_dir, pth_fqpn))
# Make sure sys.path reflects the enabled plugins: add the plugin
# directory to sys.path
site.addsitedir(app.sync_plugins_dir)
# Find the distribution and add it to the resources working set
for d in pkg_resources.find_distributions(app.sync_plugins_dir,
only=True):
pkg_resources.working_set.add(d, entry=None, insert=True,
replace=True)
if self._helm_op:
self._helm_op.discover_plugins()
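# Illustrative sketch (not part of this change): the activation mechanics in
# isolation. The directory and .pth path are hypothetical; the entry-point
# namespace is the STEVEDORE_APPS namespace managed by HelmOperator further
# down in this change. Dropping the .pth file covers future interpreter
# starts, while addsitedir() plus working_set.add() make the running
# conductor see the new distribution immediately.
import pkg_resources
import site

plugin_dir = '/opt/platform/helm/20.06/demo-app/1.0-1/plugins'              # hypothetical
pth_fqpn = '/usr/lib64/python2.7/site-packages/stx_app-demo-app-1.0-1.pth'  # hypothetical

# 1. Persist discoverability across conductor restarts.
with open(pth_fqpn, 'w') as f:
    f.write(plugin_dir + '\n')

# 2. Make the directory importable in the running process.
site.addsitedir(plugin_dir)

# 3. Register any distributions found there so subsequent entry-point scans
#    (pkg_resources/stevedore) see them without a restart.
for dist in pkg_resources.find_distributions(plugin_dir, only=True):
    pkg_resources.working_set.add(dist, entry=None, insert=True, replace=True)

# The plugin's helm entry points should now be discoverable:
for ep in pkg_resources.iter_entry_points('systemconfig.helm_applications'):
    print("{} {}".format(ep.name, ep.dist.location))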
def deactivate_plugins(self, app):
pth_fqpn = self._get_pth_fqpn(app)
if os.path.exists(pth_fqpn):
# Remove the pth file, so on a conductor restart this installed
# plugin is not discoverable
try:
os.remove(pth_fqpn)
LOG.info("PluginHelper: Disabled plugin directory %s: removed "
"%s" % (app.sync_plugins_dir, pth_fqpn))
except OSError:
# Not present, should be, but continue on...
pass
# Make sure the sys.path reflects only enabled plugins
try:
sys.path.remove(app.sync_plugins_dir)
except ValueError:
# Not present, should be, but continue on...
LOG.warning("sys.path (%s) is missing plugin (%s)" % (
sys.path, app.sync_plugins_dir))
# Determine distributions installed by this plugin
if app.sync_plugins_dir in pkg_resources.working_set.entry_keys:
plugin_distributions = pkg_resources.working_set.entry_keys[app.sync_plugins_dir]
LOG.info("PluginHelper: Disabling distributions: %s" % plugin_distributions)
# Clean up the distribution(s) module names
module_name_cleanup = []
for module_name, value in six.iteritems(sys.modules):
for distribution in plugin_distributions:
distribution_module_name = distribution.replace('-', '_')
if ((module_name == distribution_module_name) or
(module_name.startswith(distribution_module_name + '.'))):
LOG.debug("PluginHelper: Removing module name: %s: %s" % (module_name, value))
module_name_cleanup.append(module_name)
for module_name in module_name_cleanup:
del sys.modules[module_name]
# Clean up the working set
for distribution in plugin_distributions:
del pkg_resources.working_set.by_key[distribution]
del pkg_resources.working_set.entry_keys[app.sync_plugins_dir]
pkg_resources.working_set.entries.remove(app.sync_plugins_dir)
if self._helm_op:
# purge this plugin from the stevedore plugin cache so that this
# version of the plugin's entry points is no longer discoverable
self._helm_op.purge_cache_by_location(app.sync_plugins_dir)
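# Illustrative usage sketch (not part of this change): the lifecycle that the
# AppOperator hooks above drive, shown end to end. 'dbapi', 'helm_op' and
# 'app' stand in for the real conductor objects.
plugins = PluginHelper(dbapi, helm_op)

plugins.install_plugins(app)       # upload: extract bundled wheels to the sync dir
plugins.activate_plugins(app)      # make this version's entry points discoverable
helm_op.generate_helm_application_overrides(
    app.sync_overrides_dir, app.name, mode=None, cnamespace=None,
    armada_format=True, armada_chart_info=app.charts, combined=True)
plugins.deactivate_plugins(app)    # only one version may be enabled at a time
plugins.uninstall_plugins(app)     # cleanup/remove: drop the synced plugin dir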

sysinv/conductor/manager.py

@@ -242,9 +242,9 @@ class ConductorManager(service.PeriodicService):
# brought up during bootstrap manifest apply and is not restarted
# until host unlock and we need ceph-mon up in order to configure
# ceph for the initial unlock.
self._app = kube_app.AppOperator(self.dbapi)
self._docker = kube_app.DockerHelper(self.dbapi)
self._helm = helm.HelmOperator(self.dbapi)
self._app = kube_app.AppOperator(self.dbapi, self._helm)
self._docker = kube_app.DockerHelper(self.dbapi)
self._kube = kubernetes.KubeOperator()
self._kube_pod = kube_pod.K8sPodOperator(self._kube)
self._kube_app_helper = kube_api.KubeAppHelper(self.dbapi)

sysinv/helm/helm.py

@@ -63,33 +63,98 @@ def suppress_stevedore_errors(manager, entrypoint, exception):
class HelmOperator(object):
"""Class to encapsulate helm override operations for System Inventory"""
# Define the stevedore namespaces that will need to be managed for plugins
STEVEDORE_APPS = 'systemconfig.helm_applications'
STEVEDORE_ARMADA = 'systemconfig.armada.manifest_ops'
def __init__(self, dbapi=None):
self.dbapi = dbapi
# Initialize the plugins
self.helm_system_applications = {}
self.chart_operators = {}
self.armada_manifest_operators = {}
# Find all plugins for apps, charts per app, and armada manifest
# operators
self.discover_plugins()
def discover_plugins(self):
""" Scan for all available plugins """
LOG.info("HelmOperator: Loading available helm and armada plugins.")
# Initialize the plugins
self.helm_system_applications = {}
self.chart_operators = {}
self.armada_manifest_operators = {}
# Need to purge the stevedore plugin cache so that when we discover the
# plugins, new plugin resources are found. If the cache exists, then no
# new plugins are discoverable.
self.purge_cache()
# dict containing sequence of helm charts per app
self.helm_system_applications = self._load_helm_applications()
# dict containing Armada manifest operators per app
self.armada_manifest_operators = self._load_armada_manifest_operators()
def purge_cache_by_location(self, install_location):
"""Purge stevedore entry point cache entries for plugins installed at the given location."""
for armada_ep in extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_ARMADA]:
if armada_ep.dist.location == install_location:
extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_ARMADA].remove(armada_ep)
break
else:
LOG.info("Couldn't find endpoint distribution located at %s for "
"%s" % (install_location, armada_ep.dist))
for app_ep in extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_APPS]:
if app_ep.dist.location == install_location:
namespace = app_ep.module_name
purged_list = []
for helm_ep in extension.ExtensionManager.ENTRY_POINT_CACHE[namespace]:
if helm_ep.dist.location != install_location:
purged_list.append(helm_ep)
if purged_list:
extension.ExtensionManager.ENTRY_POINT_CACHE[namespace] = purged_list
else:
del extension.ExtensionManager.ENTRY_POINT_CACHE[namespace]
extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_APPS].remove(app_ep)
LOG.info("Removed stevedore namespace: %s" % namespace)
def purge_cache(self):
"""Purge the stevedore entry point cache."""
if self.STEVEDORE_APPS in extension.ExtensionManager.ENTRY_POINT_CACHE:
for entry_point in extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_APPS]:
namespace = entry_point.module_name
try:
del extension.ExtensionManager.ENTRY_POINT_CACHE[namespace]
LOG.debug("Deleted entry points for %s." % namespace)
except KeyError:
LOG.info("No entry points for %s found." % namespace)
try:
del extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_APPS]
LOG.debug("Deleted entry points for %s." % self.STEVEDORE_APPS)
except KeyError:
LOG.info("No entry points for %s found." % self.STEVEDORE_APPS)
else:
LOG.info("No entry points for %s found." % self.STEVEDORE_APPS)
try:
del extension.ExtensionManager.ENTRY_POINT_CACHE[self.STEVEDORE_ARMADA]
LOG.debug("Deleted entry points for %s." % self.STEVEDORE_ARMADA)
except KeyError:
LOG.info("No entry points for %s found." % self.STEVEDORE_ARMADA)
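# Illustrative sketch (not part of this change): why the purge matters. This
# assumes a stevedore release that still exposes the class-level
# ENTRY_POINT_CACHE dictionary used above; deleting a namespace key forces
# the next ExtensionManager construction to re-scan installed entry points
# instead of reusing the cached list.
from stevedore import extension

namespace = 'systemconfig.armada.manifest_ops'

extension.ExtensionManager(namespace=namespace)          # populates the cache
assert namespace in extension.ExtensionManager.ENTRY_POINT_CACHE

# A plugin activated after this point would be invisible to a new
# ExtensionManager unless the cached namespace is dropped first:
del extension.ExtensionManager.ENTRY_POINT_CACHE[namespace]
extension.ExtensionManager(namespace=namespace)          # fresh scan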
def _load_armada_manifest_operators(self):
"""Build a dictionary of armada manifest operators"""
operators_dict = {}
dist_info_dict = {}
armada_manifest_operators = extension.ExtensionManager(
namespace='systemconfig.armada.manifest_ops',
namespace=self.STEVEDORE_ARMADA,
invoke_on_load=True, invoke_args=())
sorted_armada_manifest_operators = sorted(
@@ -99,13 +164,21 @@ class HelmOperator(object):
if (op.name[-(ARMADA_PLUGIN_SUFFIX_LENGTH - 1):].isdigit() and
op.name[-ARMADA_PLUGIN_SUFFIX_LENGTH:-3] == '_'):
op_name = op.name[0:-ARMADA_PLUGIN_SUFFIX_LENGTH]
LOG.info("_load_armada_manifest_operators op.name=%s "
"adjust to op_name=%s" % (op.name, op_name))
else:
op_name = op.name
operators_dict[op_name] = op.obj
# Extract distribution information for logging
dist_info_dict[op_name] = {
'name': op.entry_point.dist.project_name,
'location': op.entry_point.dist.location,
}
# Provide some log feedback on plugins being used
for (app_name, info) in iteritems(dist_info_dict):
LOG.info("Plugins for %-20s: loaded from %-20s - %s." % (app_name,
info['name'], info['location']))
return operators_dict
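# Illustrative sketch (not part of this change): the suffix handling above,
# with the suffix length taken to be 4 ('_' plus three digits), as the
# hard-coded -3 slice implies. An entry-point name ending in '_' plus three
# digits is registered without that suffix; other names are used as-is.
# The sample names are hypothetical.
ARMADA_PLUGIN_SUFFIX_LENGTH = 4
for name in ('stx-openstack-armada_001', 'stx-openstack-armada'):
    if (name[-(ARMADA_PLUGIN_SUFFIX_LENGTH - 1):].isdigit() and
            name[-ARMADA_PLUGIN_SUFFIX_LENGTH:-3] == '_'):
        print("{} -> {}".format(name, name[0:-ARMADA_PLUGIN_SUFFIX_LENGTH]))
    else:
        print("{} -> {}".format(name, name))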
def get_armada_manifest_operator(self, app_name):
@@ -122,7 +195,7 @@ class HelmOperator(object):
helm_application_dict = {}
helm_applications = extension.ExtensionManager(
namespace='systemconfig.helm_applications',
namespace=self.STEVEDORE_APPS,
on_load_failure_callback=suppress_stevedore_errors
)
for entry_point in helm_applications.list_entry_points():
@@ -135,6 +208,11 @@ class HelmOperator(object):
namespace=namespace, invoke_on_load=True, invoke_args=(self,))
sorted_helm_plugins = sorted(helm_plugins.extensions, key=lambda x: x.name)
for plugin in sorted_helm_plugins:
LOG.debug("%s: helm plugin %s loaded from %s - %s." % (name,
plugin.name,
plugin.entry_point.dist.project_name,
plugin.entry_point.dist.location))
plugin_name = plugin.name[HELM_PLUGIN_PREFIX_LENGTH:]
self.chart_operators.update({plugin_name: plugin.obj})
# Remove duplicates, keeping last occurrence only
@@ -471,7 +549,7 @@ class HelmOperator(object):
# Extract the info we want.
values = output.split('USER-SUPPLIED VALUES:\n')[1].split(
'\nCOMPUTED VALUES:')[0]
'\nCOMPUTED VALUES:')[0]
except Exception:
raise
finally:

sysinv/tests/conductor/test_kube_app.py

@@ -11,6 +11,7 @@ import fixtures
from sysinv.common import constants
from sysinv.conductor import kube_app
from sysinv.db import api as dbapi
from sysinv.helm import helm
from sysinv.openstack.common import context
from sysinv.objects import kube_app as obj_app
@@ -24,7 +25,9 @@ class AppOperatorTestCase(base.DbTestCase):
super(AppOperatorTestCase, self).setUp()
# Set up objects for testing
self.app_operator = kube_app.AppOperator(dbapi.get_instance())
self.helm_operator = helm.HelmOperator(dbapi.get_instance())
self.app_operator = kube_app.AppOperator(dbapi.get_instance(),
self.helm_operator)
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
self.temp_dir = self.useFixture(fixtures.TempDir())