Merge "Provide progress update for upload, apply & remove"
This commit is contained in:
commit
c548f1ec97
@ -18,11 +18,16 @@ def _print_application_show(app):
|
||||
utils.print_dict(ordereddata, wrap=72)
|
||||
|
||||
|
||||
def _print_reminder_msg(app_name):
|
||||
print("Please use 'system application-list' or 'system "
|
||||
"application-show %s' to view the current progress." % app_name)
|
||||
|
||||
|
||||
def do_application_list(cc, args):
|
||||
"""List all containerized applications"""
|
||||
apps = cc.app.list()
|
||||
labels = ['application', 'manifest name', 'manifest file', 'status']
|
||||
fields = ['name', 'manifest_name', 'manifest_file', 'status']
|
||||
labels = ['application', 'manifest name', 'manifest file', 'status', 'progress']
|
||||
fields = ['name', 'manifest_name', 'manifest_file', 'status', 'progress']
|
||||
utils.print_list(apps, fields, labels, sortby=0)
|
||||
|
||||
|
||||
@ -59,6 +64,7 @@ def do_application_upload(cc, args):
|
||||
'tarfile': tarfile}
|
||||
response = cc.app.upload(data)
|
||||
_print_application_show(response)
|
||||
_print_reminder_msg(args.name)
|
||||
|
||||
|
||||
@utils.arg('name', metavar='<app name>',
|
||||
@ -68,6 +74,7 @@ def do_application_apply(cc, args):
|
||||
try:
|
||||
response = cc.app.apply(args.name)
|
||||
_print_application_show(response)
|
||||
_print_reminder_msg(args.name)
|
||||
except exc.HTTPNotFound:
|
||||
raise exc.CommandError('Application not found: %s' % args.name)
|
||||
|
||||
@ -79,6 +86,7 @@ def do_application_remove(cc, args):
|
||||
try:
|
||||
response = cc.app.remove(args.name)
|
||||
_print_application_show(response)
|
||||
_print_reminder_msg(args.name)
|
||||
except exc.HTTPNotFound:
|
||||
raise exc.CommandError('Application not found: %s' % args.name)
|
||||
|
||||
|
@ -68,6 +68,9 @@ class KubeApp(base.APIBase):
|
||||
status = wtypes.text
|
||||
"Represents the installation status of the application"
|
||||
|
||||
progress = wtypes.text
|
||||
"Represents the installation progress of the application"
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.fields = objects.kube_app.fields.keys()
|
||||
for k in self.fields:
|
||||
@ -80,7 +83,7 @@ class KubeApp(base.APIBase):
|
||||
app = KubeApp(**rpc_app.as_dict())
|
||||
if not expand:
|
||||
app.unset_fields_except(['name', 'manifest_name',
|
||||
'manifest_file', 'status'])
|
||||
'manifest_file', 'status', 'progress'])
|
||||
|
||||
# skip the id
|
||||
app.id = wtypes.Unset
|
||||
@ -320,6 +323,7 @@ class KubeAppController(rest.RestController):
|
||||
"Application-apply rejected: operation is not allowed "
|
||||
"while the current status is {}.".format(db_app.status)))
|
||||
db_app.status = constants.APP_APPLY_IN_PROGRESS
|
||||
db_app.progress = None
|
||||
db_app.save()
|
||||
pecan.request.rpcapi.perform_app_apply(pecan.request.context,
|
||||
db_app)
|
||||
@ -332,6 +336,7 @@ class KubeAppController(rest.RestController):
|
||||
"Application-remove rejected: operation is not allowed while "
|
||||
"the current status is {}.".format(db_app.status)))
|
||||
db_app.status = constants.APP_REMOVE_IN_PROGRESS
|
||||
db_app.progress = None
|
||||
db_app.save()
|
||||
pecan.request.rpcapi.perform_app_remove(pecan.request.context,
|
||||
db_app)
|
||||
|
@ -1507,6 +1507,16 @@ APP_APPLY_OP = 'apply'
|
||||
APP_REMOVE_OP = 'remove'
|
||||
APP_DELETE_OP = 'delete'
|
||||
|
||||
# Progress constants
|
||||
APP_PROGRESS_ABORTED = 'operation aborted, check logs for detail'
|
||||
APP_PROGRESS_APPLY_MANIFEST = 'applying application manifest'
|
||||
APP_PROGRESS_COMPLETED = 'completed'
|
||||
APP_PROGRESS_DELETE_MANIFEST = 'deleting application manifest'
|
||||
APP_PROGRESS_DOWNLOAD_IMAGES = 'retrieving docker images'
|
||||
APP_PROGRESS_EXTRACT_TARFILE = 'extracting application tar file'
|
||||
APP_PROGRESS_GENERATE_OVERRIDES = 'generating application overrides'
|
||||
APP_PROGRESS_VALIDATE_UPLOAD_CHARTS = 'validating and uploading charts'
|
||||
|
||||
# Node label operation constants
|
||||
LABEL_ASSIGN_OP = 'assign'
|
||||
LABEL_REMOVE_OP = 'remove'
|
||||
|
@ -1093,6 +1093,10 @@ class SysInvSignalTimeout(SysinvException):
|
||||
message = "Sysinv Timeout."
|
||||
|
||||
|
||||
class KubeAppProgressMonitorTimeout(SysinvException):
|
||||
message = "Armada execution progress monitor timed out."
|
||||
|
||||
|
||||
class InvalidEndpoint(SysinvException):
|
||||
message = "The provided endpoint is invalid"
|
||||
|
||||
|
@ -10,7 +10,6 @@
|
||||
""" System Inventory Kubernetes Application Operator."""
|
||||
|
||||
import docker
|
||||
import eventlet
|
||||
import grp
|
||||
import os
|
||||
import re
|
||||
@ -22,6 +21,10 @@ import time
|
||||
import yaml
|
||||
|
||||
from collections import namedtuple
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
from eventlet import queue
|
||||
from eventlet import Timeout
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from sysinv.common import constants
|
||||
@ -44,6 +47,9 @@ CONF.register_opts(kube_app_opts)
|
||||
ARMADA_CONTAINER_NAME = 'armada_service'
|
||||
MAX_DOWNLOAD_THREAD = 20
|
||||
INSTALLATION_TIMEOUT = 3600
|
||||
APPLY_SEARCH_PATTERN = 'Processing Chart,'
|
||||
DELETE_SEARCH_PATTERN = 'Deleting release'
|
||||
CONTAINER_ABNORMAL_EXIT_CODE = 137
|
||||
|
||||
|
||||
Chart = namedtuple('Chart', 'name namespace')
|
||||
@ -73,18 +79,28 @@ class AppOperator(object):
|
||||
except OSError as e:
|
||||
LOG.exception(e)
|
||||
|
||||
def _update_app_status(self, app, new_status):
|
||||
def _update_app_status(self, app, new_status=None, new_progress=None):
|
||||
""" Persist new app status """
|
||||
|
||||
if new_status is None:
|
||||
new_status = app.status
|
||||
elif (new_status in [constants.APP_UPLOAD_SUCCESS,
|
||||
constants.APP_APPLY_SUCCESS]):
|
||||
new_progress = constants.APP_PROGRESS_COMPLETED
|
||||
|
||||
with self._lock:
|
||||
app.status = new_status
|
||||
app.update_status(new_status, new_progress)
|
||||
|
||||
def _abort_operation(self, app, operation):
|
||||
if (app.status == constants.APP_UPLOAD_IN_PROGRESS):
|
||||
self._update_app_status(app, constants.APP_UPLOAD_FAILURE)
|
||||
self._update_app_status(app, constants.APP_UPLOAD_FAILURE,
|
||||
constants.APP_PRORESS_ABORTED)
|
||||
elif (app.status == constants.APP_APPLY_IN_PROGRESS):
|
||||
self._update_app_status(app, constants.APP_APPLY_FAILURE)
|
||||
self._update_app_status(app, constants.APP_APPLY_FAILURE,
|
||||
constants.APP_PROGRESS_ABORTED)
|
||||
elif (app.status == constants.APP_REMOVE_IN_PROGRESS):
|
||||
self._update_app_status(app, constants.APP_REMOVE_FAILURE)
|
||||
self._update_app_status(app, constants.APP_REMOVE_FAILURE,
|
||||
constants.APP_PROGRESS_ABORTED)
|
||||
LOG.error("Application %s aborted!." % operation)
|
||||
|
||||
def _extract_tarfile(self, app):
|
||||
@ -163,10 +179,8 @@ class AppOperator(object):
|
||||
|
||||
def _register_embedded_images(self, app):
|
||||
"""
|
||||
TODO(tngo):
|
||||
=============
|
||||
When we're ready to support air-gapped scenario and private images, the
|
||||
following need to be done:
|
||||
TODO(tngo): When we're ready to support air-gap scenario and private
|
||||
images, the following need to be done:
|
||||
a. load the embedded images
|
||||
b. tag and push them to the docker registery on the controller
|
||||
c. find image tag IDs in each chart and replace their values with
|
||||
@ -186,9 +200,8 @@ class AppOperator(object):
|
||||
if app.system_app:
|
||||
# Grab the image tags from the overrides. If they don't exist
|
||||
# then mine them from the chart paths.
|
||||
charts = self._get_list_of_charts(app.mfile_abs)
|
||||
images_to_download = self._get_image_tags_by_charts(app.charts_dir,
|
||||
charts)
|
||||
app.charts)
|
||||
else:
|
||||
# For custom apps, mine image tags from application path
|
||||
images_to_download = self._get_image_tags_by_path(app.path)
|
||||
@ -203,7 +216,7 @@ class AppOperator(object):
|
||||
failed_downloads = []
|
||||
|
||||
start = time.time()
|
||||
pool = eventlet.greenpool.GreenPool(size=threads)
|
||||
pool = greenpool.GreenPool(size=threads)
|
||||
for tag, rc in pool.imap(self._docker.download_an_image,
|
||||
images_to_download):
|
||||
if not rc:
|
||||
@ -437,6 +450,96 @@ class AppOperator(object):
|
||||
self._helm.remove_helm_chart_overrides(chart.name,
|
||||
chart.namespace)
|
||||
|
||||
def _make_armada_request_with_monitor(self, app, request, overrides_str=None):
|
||||
"""Initiate armada request with monitoring
|
||||
|
||||
This method delegates the armada request to docker helper and starts
|
||||
a monitoring thread to persist status and progress along the way.
|
||||
|
||||
:param app: application data object
|
||||
:param request: type of request (apply or delete)
|
||||
:param overrides_str: list of overrides in string format to be applied
|
||||
"""
|
||||
|
||||
def _get_armada_log_stats(pattern, logfile):
|
||||
"""
|
||||
TODO(tngo): In the absence of an Armada API that provides the current
|
||||
status of an apply/delete manifest operation, the progress is derived
|
||||
from specific log entries extracted from the execution logs. This
|
||||
inner method is to be replaced with an official API call when
|
||||
it becomes available.
|
||||
"""
|
||||
p1 = subprocess.Popen(['docker', 'exec', ARMADA_CONTAINER_NAME,
|
||||
'grep', pattern, logfile],
|
||||
stdout=subprocess.PIPE)
|
||||
p2 = subprocess.Popen(['awk', '{print $NF}'], stdin=p1.stdout,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
p1.stdout.close()
|
||||
result, err = p2.communicate()
|
||||
if result:
|
||||
# Strip out ANSI color code that might be in the text stream
|
||||
r = re.compile("\x1b\[[0-9;]*m")
|
||||
result = r.sub('', result)
|
||||
matches = result.split()
|
||||
num_chart_processed = len(matches)
|
||||
last_chart_processed = matches[num_chart_processed - 1]
|
||||
if '=' in last_chart_processed:
|
||||
last_chart_processed = last_chart_processed.split('=')[1]
|
||||
return last_chart_processed, num_chart_processed
|
||||
|
||||
return None, None
|
||||
|
||||
def _check_progress(monitor_flag, app, pattern, logfile):
|
||||
""" Progress monitoring task, to be run in a separate thread """
|
||||
LOG.info("Starting progress monitoring thread for app %s" % app.name)
|
||||
try:
|
||||
with Timeout(INSTALLATION_TIMEOUT,
|
||||
exception.KubeAppProgressMonitorTimeout()):
|
||||
while True:
|
||||
try:
|
||||
monitor_flag.get_nowait()
|
||||
LOG.debug("Received monitor stop signal for %s" % app.name)
|
||||
monitor_flag.task_done()
|
||||
break
|
||||
except queue.Empty:
|
||||
last, num = _get_armada_log_stats(pattern, logfile)
|
||||
if last:
|
||||
if app.system_app:
|
||||
# helm-toolkit doesn't count
|
||||
percent = \
|
||||
round(float(num) / (len(app.charts) - 1) * 100)
|
||||
else:
|
||||
percent = round(float(num) / len(app.charts) * 100)
|
||||
progress_str = 'processing chart: ' + last +\
|
||||
', overall completion: ' + str(percent) + '%'
|
||||
if app.progress != progress_str:
|
||||
LOG.info("%s" % progress_str)
|
||||
self._update_app_status(
|
||||
app, new_progress=progress_str)
|
||||
greenthread.sleep(1)
|
||||
except Exception as e:
|
||||
# timeout or subprocess error
|
||||
LOG.exception(e)
|
||||
finally:
|
||||
LOG.info("Exiting progress monitoring thread for app %s" % app.name)
|
||||
|
||||
# Body of the outer method
|
||||
mqueue = queue.Queue()
|
||||
rc = True
|
||||
logfile = app.name + '-' + request + '.log'
|
||||
if request == constants.APP_APPLY_OP:
|
||||
pattern = APPLY_SEARCH_PATTERN
|
||||
else:
|
||||
pattern = DELETE_SEARCH_PATTERN
|
||||
|
||||
monitor = greenthread.spawn_after(1, _check_progress, mqueue, app,
|
||||
pattern, logfile)
|
||||
rc = self._docker.make_armada_request(request, app.armada_mfile,
|
||||
overrides_str, logfile)
|
||||
mqueue.put('done')
|
||||
monitor.kill()
|
||||
return rc
|
||||
|
||||
def perform_app_upload(self, rpc_app, tarfile):
|
||||
"""Process application upload request
|
||||
|
||||
@ -456,6 +559,8 @@ class AppOperator(object):
|
||||
# Full extraction of application tarball at /scratch/apps.
|
||||
# Manifest file is placed under /opt/platform/armada
|
||||
# which is managed by drbd-sync and visible to Armada.
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_EXTRACT_TARFILE)
|
||||
orig_mode = stat.S_IMODE(os.lstat("/scratch").st_mode)
|
||||
app.tarfile = tarfile
|
||||
self._extract_tarfile(app)
|
||||
@ -463,6 +568,9 @@ class AppOperator(object):
|
||||
|
||||
if not self._docker.make_armada_request('validate', app.armada_mfile):
|
||||
return self._abort_operation(app, constants.APP_UPLOAD_OP)
|
||||
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_VALIDATE_UPLOAD_CHARTS)
|
||||
if os.path.isdir(app.charts_dir):
|
||||
self._validate_helm_charts(app)
|
||||
# Temporarily allow read and execute access to /scratch so www
|
||||
@ -504,13 +612,15 @@ class AppOperator(object):
|
||||
overrides_str = ''
|
||||
ready = True
|
||||
try:
|
||||
app.charts = self._get_list_of_charts(app.mfile_abs)
|
||||
if app.system_app:
|
||||
charts = self._get_list_of_charts(app.mfile_abs)
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_GENERATE_OVERRIDES)
|
||||
LOG.info("Generating application overrides...")
|
||||
self._helm.generate_helm_application_overrides(
|
||||
app.name, cnamespace=None, armada_format=True,
|
||||
combined=True)
|
||||
overrides_files = self._get_overrides_files(charts)
|
||||
overrides_files = self._get_overrides_files(app.charts)
|
||||
if overrides_files:
|
||||
LOG.info("Application overrides generated.")
|
||||
# Ensure all chart overrides are readable by Armada
|
||||
@ -518,18 +628,24 @@ class AppOperator(object):
|
||||
os.chmod(file, 0644)
|
||||
overrides_str =\
|
||||
self._generate_armada_overrides_str(overrides_files)
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_DOWNLOAD_IMAGES)
|
||||
self._download_images(app)
|
||||
else:
|
||||
ready = False
|
||||
else:
|
||||
# No support for custom app overrides at this point, just
|
||||
# download the needed images.
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_DOWNLOAD_IMAGES)
|
||||
self._download_images(app)
|
||||
|
||||
if ready:
|
||||
if self._docker.make_armada_request('apply',
|
||||
app.armada_mfile,
|
||||
overrides_str):
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_APPLY_MANIFEST)
|
||||
if self._make_armada_request_with_monitor(app,
|
||||
constants.APP_APPLY_OP,
|
||||
overrides_str):
|
||||
self._update_app_status(app,
|
||||
constants.APP_APPLY_SUCCESS)
|
||||
LOG.info("Application (%s) apply completed." % app.name)
|
||||
@ -552,7 +668,11 @@ class AppOperator(object):
|
||||
app = AppOperator.Application(rpc_app)
|
||||
LOG.info("Application (%s) remove started." % app.name)
|
||||
|
||||
if self._docker.make_armada_request('delete', app.armada_mfile):
|
||||
app.charts = self._get_list_of_charts(app.mfile_abs)
|
||||
self._update_app_status(
|
||||
app, new_progress=constants.APP_PROGRESS_DELETE_MANIFEST)
|
||||
|
||||
if self._make_armada_request_with_monitor(app, constants.APP_DELETE_OP):
|
||||
if app.system_app:
|
||||
try:
|
||||
p1 = subprocess.Popen(['kubectl', 'get', 'pods', '-n',
|
||||
@ -628,6 +748,7 @@ class AppOperator(object):
|
||||
os.path.join(constants.APP_INSTALL_PATH,
|
||||
self._kube_app.get('name'),
|
||||
self._kube_app.get('manifest_file'))
|
||||
self.charts = []
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@ -641,9 +762,14 @@ class AppOperator(object):
|
||||
def status(self):
|
||||
return self._kube_app.get('status')
|
||||
|
||||
@status.setter
|
||||
def status(self, new_status):
|
||||
@property
|
||||
def progress(self):
|
||||
return self._kube_app.get('progress')
|
||||
|
||||
def update_status(self, new_status, new_progress):
|
||||
self._kube_app.status = new_status
|
||||
if new_progress:
|
||||
self._kube_app.progress = new_progress
|
||||
self._kube_app.save()
|
||||
|
||||
|
||||
@ -696,8 +822,14 @@ class DockerHelper(object):
|
||||
os.unlink(kube_config)
|
||||
return None
|
||||
|
||||
def make_armada_request(self, request, manifest_file, overrides_str=''):
|
||||
def make_armada_request(self, request, manifest_file, overrides_str='',
|
||||
logfile=None):
|
||||
|
||||
if logfile is None:
|
||||
logfile = request + '.log'
|
||||
|
||||
rc = True
|
||||
|
||||
try:
|
||||
client = docker.from_env(timeout=INSTALLATION_TIMEOUT)
|
||||
armada_svc = self._start_armada_service(client)
|
||||
@ -710,10 +842,16 @@ class DockerHelper(object):
|
||||
manifest_file)
|
||||
else:
|
||||
rc = False
|
||||
LOG.error("Validation of the armada manifest %s "
|
||||
"failed: %s" % (manifest_file, exec_logs))
|
||||
elif request == 'apply':
|
||||
cmd = 'armada apply --debug ' + manifest_file + overrides_str
|
||||
if exit_code == CONTAINER_ABNORMAL_EXIT_CODE:
|
||||
LOG.error("Failed to validate application manifest %s. "
|
||||
"Armada service has exited abnormally." %
|
||||
manifest_file)
|
||||
else:
|
||||
LOG.error("Failed to validate application manifest "
|
||||
"%s: %s." % (manifest_file, exec_logs))
|
||||
elif request == constants.APP_APPLY_OP:
|
||||
cmd = "/bin/bash -c 'armada apply --debug " + manifest_file +\
|
||||
overrides_str + " | tee " + logfile + "'"
|
||||
LOG.info("Armada apply command = %s" % cmd)
|
||||
(exit_code, exec_logs) = armada_svc.exec_run(cmd)
|
||||
if exit_code == 0:
|
||||
@ -721,18 +859,29 @@ class DockerHelper(object):
|
||||
"applied/re-applied." % manifest_file)
|
||||
else:
|
||||
rc = False
|
||||
LOG.error("Failed to apply application manifest: %s" %
|
||||
exec_logs)
|
||||
elif request == 'delete':
|
||||
cmd = 'armada delete --debug --manifest ' + manifest_file
|
||||
if exit_code == CONTAINER_ABNORMAL_EXIT_CODE:
|
||||
LOG.error("Failed to apply application manifest %s. "
|
||||
"Armada service has exited abnormally." %
|
||||
manifest_file)
|
||||
else:
|
||||
LOG.error("Failed to apply application manifest %s: "
|
||||
"%s." % (manifest_file, exec_logs))
|
||||
elif request == constants.APP_DELETE_OP:
|
||||
cmd = "/bin/bash -c 'armada delete --debug --manifest " +\
|
||||
manifest_file + " | tee " + logfile + "'"
|
||||
(exit_code, exec_logs) = armada_svc.exec_run(cmd)
|
||||
if exit_code == 0:
|
||||
LOG.info("Application charts were successfully "
|
||||
"deleted.")
|
||||
else:
|
||||
rc = False
|
||||
LOG.error("Delete the application manifest failed: %s" %
|
||||
exec_logs)
|
||||
if exit_code == CONTAINER_ABNORMAL_EXIT_CODE:
|
||||
LOG.error("Failed to delete application manifest %s. "
|
||||
"Armada service has exited abnormally." %
|
||||
manifest_file)
|
||||
else:
|
||||
LOG.error("Failed to delete application manifest %s: "
|
||||
"%s" % (manifest_file, exec_logs))
|
||||
else:
|
||||
rc = False
|
||||
LOG.error("Unsupported armada request: %s." % request)
|
||||
@ -751,8 +900,8 @@ class DockerHelper(object):
|
||||
start = time.time()
|
||||
try:
|
||||
LOG.info("Image %s download started" % img_tag)
|
||||
c = docker.from_env(timeout=INSTALLATION_TIMEOUT)
|
||||
c.images.pull(img_tag)
|
||||
client = docker.from_env(timeout=INSTALLATION_TIMEOUT)
|
||||
client.images.pull(img_tag)
|
||||
except Exception as e:
|
||||
rc = False
|
||||
LOG.error("Image %s download failed: %s" % (img_tag, e))
|
||||
|
@ -36,6 +36,7 @@ def upgrade(migrate_engine):
|
||||
Column('manifest_name', String(255), nullable=False),
|
||||
Column('manifest_file', String(255), nullable=True),
|
||||
Column('status', String(255), nullable=False),
|
||||
Column('progress', String(255), nullable=True),
|
||||
|
||||
mysql_engine=ENGINE,
|
||||
mysql_charset=CHARSET,
|
||||
|
@ -1644,3 +1644,4 @@ class KubeApp(Base):
|
||||
manifest_name = Column(String(255), nullable=False)
|
||||
manifest_file = Column(String(255), nullable=False)
|
||||
status = Column(String(255), nullable=False)
|
||||
progress = Column(String(255), nullable=True)
|
||||
|
@ -21,6 +21,7 @@ class KubeApp(base.SysinvObject):
|
||||
'manifest_name': utils.str_or_none,
|
||||
'manifest_file': utils.str_or_none,
|
||||
'status': utils.str_or_none,
|
||||
'progress': utils.str_or_none,
|
||||
}
|
||||
|
||||
@base.remotable_classmethod
|
||||
|
Loading…
Reference in New Issue
Block a user