Merge "Send application lifecycle notifications for backup and restore."
This commit is contained in:
commit
648151bd3a
|
@ -222,6 +222,28 @@ def create_host_overrides(filename):
|
|||
sys.exit(1)
|
||||
|
||||
|
||||
# Operations accepted by the 'notify' sub-command: the keys of the
# backup/restore hook parameters map.
VALID_NOTIFICATION_VALUES = constants.HOOK_PARAMETERS_MAP.keys()

# Translates the textual 'success' command line argument into the boolean
# that is sent along with the notification.
NOTIFICATION_ACTION_SUCCESS_VALUES = dict(success=True, failure=False)
|
||||
|
||||
|
||||
def send_notification(operation, success):
    """Ask the conductor to run a backup/restore lifecycle notification.

    The process exits with status 2 when ``operation`` is not a valid
    notification or when the conductor reports a failure without naming an
    application. When an application is named, its name is written to stderr
    and the process exits with status 1. On success the function returns.

    :param operation: notification name, one of VALID_NOTIFICATION_VALUES.
    :param success: value forwarded to the conductor with the notification.
    """
    if operation not in VALID_NOTIFICATION_VALUES:
        LOG.error("Invalid notification '{}'.".format(operation))
        sys.exit(2)

    admin_ctx = context.get_admin_context()
    conductor = conductor_rpcapi.ConductorAPI(topic=conductor_rpcapi.MANAGER_TOPIC)
    completed, blocking_app = conductor.backup_restore_lifecycle_actions(
        admin_ctx, operation, success)
    if completed:
        return

    if blocking_app is None:
        # the conductor could not attribute the failure to an application
        LOG.error("Error while performing operation '{}'.".format(operation))
        sys.exit(2)

    LOG.error("Operation '{}' was aborted by '{}' appliction.".format(operation, blocking_app))
    # the caller of this utility reads the application name from stderr
    sys.stderr.write(blocking_app)
    sys.exit(1)
|
||||
|
||||
|
||||
def add_action_parsers(subparsers):
|
||||
|
||||
parser = subparsers.add_parser('create-host-overrides')
|
||||
|
@ -234,6 +256,14 @@ def add_action_parsers(subparsers):
|
|||
parser.add_argument('--all-apps', action='store_true', default=False)
|
||||
parser.add_argument('--apps', nargs='*', required=False, default=None)
|
||||
|
||||
parser = subparsers.add_parser('notify')
|
||||
parser.set_defaults(func=send_notification)
|
||||
parser.add_argument('operation')
|
||||
parser.add_argument('success',
|
||||
choices=NOTIFICATION_ACTION_SUCCESS_VALUES.keys(),
|
||||
default='success',
|
||||
nargs='?')
|
||||
|
||||
|
||||
CONF.register_cli_opt(
|
||||
cfg.SubCommandOpt('action',
|
||||
|
@ -255,5 +285,8 @@ def main():
|
|||
LOG.error("filename is required")
|
||||
else:
|
||||
CONF.action.func(CONF.action.filename, CONF.action.apps, CONF.action.all_apps)
|
||||
elif CONF.action.name == 'notify':
|
||||
success = NOTIFICATION_ACTION_SUCCESS_VALUES[CONF.action.success]
|
||||
CONF.action.func(CONF.action.operation, success)
|
||||
else:
|
||||
CONF.action.func()
|
||||
|
|
|
@ -1575,6 +1575,10 @@ APP_UPDATE_OP = 'update'
|
|||
APP_ROLLBACK_OP = 'rollback'
APP_ABORT_OP = 'abort'
APP_EVALUATE_REAPPLY_OP = 'evaluate-reapply'

# Operations used by the backup/restore lifecycle hooks
APP_BACKUP = 'backup'
APP_ETCD_BACKUP = 'etcd-backup'
APP_RESTORE = 'restore'

# Lifecycle constants
APP_LIFECYCLE_TIMING_PRE = 'pre'
|
||||
|
@ -1593,9 +1597,51 @@ APP_LIFECYCLE_TYPE_ARMADA_REQUEST = 'armada-request'
|
|||
APP_LIFECYCLE_MODE_MANUAL = 'manual'
APP_LIFECYCLE_MODE_AUTO = 'auto'
APP_LIFECYCLE_FORCE_OPERATION = 'force'

APP_LIFECYCLE_OPERATION_MTC_ACTION = 'mtc-action'

# Accepted values of the 'success' argument of a backup/restore notification;
# the first one is also the key under which the outcome is stored in the
# hook's extra data.
BACKUP_ACTION_NOTIFY_SUCCESS = 'success'
BACKUP_ACTION_NOTIFY_FAILURE = 'failure'

# Names of the backup/restore notifications
BACKUP_ACTION_SEMANTIC_CHECK = 'backup-semantic-check'
BACKUP_ACTION_PRE_BACKUP = 'pre-backup-action'
BACKUP_ACTION_PRE_ETCD_BACKUP = 'pre-etcd-backup-action'
BACKUP_ACTION_POST_ETCD_BACKUP = 'post-etcd-backup-action'
BACKUP_ACTION_POST_BACKUP = 'post-backup-action'
BACKUP_ACTION_PRE_RESTORE = 'pre-restore-action'
BACKUP_ACTION_POST_RESTORE = 'post-restore-action'
|
||||
|
||||
# Backup/restore notifications accepted from the command line utility.
# Each value is a list of four items: lifecycle mode, lifecycle type,
# timing, and the backup/restore operation the notification refers to.
HOOK_PARAMETERS_MAP = {
    BACKUP_ACTION_SEMANTIC_CHECK: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_SEMANTIC_CHECK,
        APP_LIFECYCLE_TIMING_PRE, APP_BACKUP],
    BACKUP_ACTION_PRE_BACKUP: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_PRE, APP_BACKUP],
    BACKUP_ACTION_POST_BACKUP: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_POST, APP_BACKUP],
    BACKUP_ACTION_PRE_ETCD_BACKUP: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_PRE, APP_ETCD_BACKUP],
    BACKUP_ACTION_POST_ETCD_BACKUP: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_POST, APP_ETCD_BACKUP],
    BACKUP_ACTION_PRE_RESTORE: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_PRE, APP_RESTORE],
    BACKUP_ACTION_POST_RESTORE: [
        APP_LIFECYCLE_MODE_AUTO, APP_LIFECYCLE_TYPE_OPERATION,
        APP_LIFECYCLE_TIMING_POST, APP_RESTORE],
}
|
||||
|
||||
# Application metadata constants
APP_METADATA_MAINTAIN_USER_OVERRIDES = 'maintain_user_overrides'
APP_METADATA_APPLY_PROGRESS_ADJUST = 'apply_progress_adjust'
|
||||
|
|
|
@ -1554,3 +1554,13 @@ class LifecycleSemanticCheckOpererationBlocked(SysinvException):
|
|||
|
||||
class LifecycleMissingInfo(SysinvException):
    """Error reported when a lifecycle hook is missing information."""

    message = _("Lifecycle hook missing information.")
|
||||
|
||||
|
||||
class BackupRestoreInvalidRevertOperation(SysinvException):
    """Error reported when an operation has no revert action associated.

    The message template expects an ``operation`` keyword argument.
    """

    message = _("Operation %(operation)s has no revert action associated.")
|
||||
|
||||
|
||||
class ApplicationLifecycleNotificationException(Exception):
    """Failure of a lifecycle notification, tagged with an application name.

    :param application_name: stored on the instance as ``application_name``.
    :param message: the exception message.
    """

    def __init__(self, application_name, message):
        super(ApplicationLifecycleNotificationException, self).__init__(message)
        self.application_name = application_name
|
||||
|
|
|
@ -49,9 +49,11 @@ import uuid
|
|||
import xml.etree.ElementTree as ElementTree
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from copy import deepcopy
|
||||
|
||||
import tsconfig.tsconfig as tsc
|
||||
from collections import namedtuple
|
||||
from collections import OrderedDict
|
||||
from cgcs_patch.patch_verify import verify_files
|
||||
from controllerconfig.upgrades import management as upgrades_management
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
@ -231,6 +233,19 @@ class ConductorManager(service.PeriodicService):
|
|||
constants.APP_METADATA_DESIRED_STATES: {},
|
||||
constants.APP_METADATA_ORDERED_APPS: []}
|
||||
|
||||
self._backup_action_map = dict()
|
||||
for action in [constants.BACKUP_ACTION_SEMANTIC_CHECK,
|
||||
constants.BACKUP_ACTION_PRE_BACKUP,
|
||||
constants.BACKUP_ACTION_POST_BACKUP,
|
||||
constants.BACKUP_ACTION_PRE_ETCD_BACKUP,
|
||||
constants.BACKUP_ACTION_POST_ETCD_BACKUP,
|
||||
constants.BACKUP_ACTION_PRE_RESTORE,
|
||||
constants.BACKUP_ACTION_POST_RESTORE]:
|
||||
impl = getattr(self, '_do_' + action.replace('-', '_'))
|
||||
self._backup_action_map[action] = impl
|
||||
|
||||
self._initialize_backup_actions_log()
|
||||
|
||||
def start(self):
|
||||
self._start()
|
||||
# accept API calls and run periodic tasks after
|
||||
|
@ -12311,6 +12326,192 @@ class ConductorManager(service.PeriodicService):
|
|||
LOG.info("Metadata-evaluation: {}".format(e))
|
||||
raise
|
||||
|
||||
def _log_applications_not_reverted(self, operation):
    """Log, at error level, the applications still present in a revert log.

    Nothing is logged when the revert log of ``operation`` is empty.

    :param operation: key of self._backup_actions_log to report on; an
                      unknown key is itself logged as an internal error.
    """
    try:
        pending = self._backup_actions_log[operation]
    except KeyError:
        LOG.error("Internal error, no such revert operation '{}'".format(operation))
        return

    if not pending:
        return

    app_lines = '\n'.join('\t- {}'.format(name) for name in pending.keys())
    LOG.error("{} : {} applications:\n{}".format(operation, len(pending), app_lines))
|
||||
|
||||
def _initialize_backup_actions_log(self, report_operation=None):
    """Reset the log of pending revert callbacks to one empty log per operation.

    The log maps every key of self._backup_action_map to an OrderedDict of
    ``application name -> revert callback``.

    :param report_operation: when not None, the name of the operation whose
        revert failed. Before the log is cleared, the applications still
        listed under the pre-etcd-backup and pre-backup operations are
        reported at error level.
    """
    if report_operation is not None:
        LOG.error("Failed to revert backup from {}.\n"
                  "The following applications were left in an undeterminate state:".
                  format(report_operation))

        self._log_applications_not_reverted(constants.BACKUP_ACTION_PRE_ETCD_BACKUP)
        self._log_applications_not_reverted(constants.BACKUP_ACTION_PRE_BACKUP)

    # Each operation must get its OWN OrderedDict. Building the values with
    # "[OrderedDict()] * n" repeats a reference to a single object, which
    # makes a revert callback recorded for one operation show up under all
    # the others as well.
    self._backup_actions_log = {action: OrderedDict()
                                for action in self._backup_action_map}
|
||||
|
||||
def _revert_backup_operation(self, operation):
    """Run every revert callback recorded for ``operation``.

    Callbacks run in the order they were recorded. A callback that raises is
    logged and skipped so the remaining applications are still reverted; its
    entry stays in the log. Entries whose callback completed are removed.

    :param operation: key of self._backup_actions_log to revert.
    :raises BackupRestoreInvalidRevertOperation: if ``operation`` has no
        revert log.
    """
    if operation not in self._backup_actions_log:
        raise exception.BackupRestoreInvalidRevertOperation(operation=operation)

    operation_log = self._backup_actions_log[operation]
    # items() exists on both Python 2 and 3 (iteritems() is Python 2 only);
    # the list() snapshot makes it safe to delete entries while looping.
    for app_name, callback in list(operation_log.items()):
        LOG.info("Reverting backup of app {} : {}".format(app_name, operation))
        try:
            callback()
        except Exception as ex:
            # we must swallow any exceptions and keep reverting all apps:
            LOG.exception("Unhandled exception {} from app {} while reverting backup.".
                          format(str(ex), app_name))
            continue
        # the callback finished successfully, nothing left to revert for this app
        del operation_log[app_name]
|
||||
|
||||
def _make_backup_hook_info(self, operation, success):
    """Build the LifecycleHookInfo for a backup/restore notification.

    The hook is initialised from the constants.HOOK_PARAMETERS_MAP entry of
    ``operation`` and ``success`` is stored in its extra data.

    :param operation: key of constants.HOOK_PARAMETERS_MAP.
    :param success: value stored under constants.BACKUP_ACTION_NOTIFY_SUCCESS.
    :returns: the initialised LifecycleHookInfo.
    :raises: any exception hit while building the hook, after logging it.
    """
    try:
        parameters = constants.HOOK_PARAMETERS_MAP[operation]
        info = LifecycleHookInfo()
        info.init(*parameters)
        info.extra[constants.BACKUP_ACTION_NOTIFY_SUCCESS] = success
    except KeyError:
        LOG.error("Unexpected action '{}' (success={})".format(operation, success))
        raise
    except Exception as ex:
        LOG.exception("Failed to create a backup/restore hook for operation '{}': {}".
                      format(operation, ex))
        raise
    else:
        return info
|
||||
|
||||
def _get_kube_apps_list(self, context):
    """Return one kube application object per application known to the database.

    :param context: request context used to load each application by name.
    :raises: any exception hit while listing or loading, after logging it.
    """
    try:
        apps = []
        for db_app in self.dbapi.kube_app_get_all():
            apps.append(kubeapp_obj.get_by_name(context, db_app.name))
        return apps
    except Exception as ex:
        LOG.exception("Failed to to get list of kube applications: {}".format(ex))
        raise
|
||||
|
||||
def _do_backup_semantic_check(self, context, success):
    """Send the backup semantic check notification to every application.

    :param context: request context.
    :param success: value stored in the hook's extra data under 'success'.
    :raises ApplicationLifecycleNotificationException: on the first failure;
        it carries the name of the application being notified, or None when
        the failure happened before any application was reached.
    """
    hook_info = self._make_backup_hook_info(constants.BACKUP_ACTION_SEMANTIC_CHECK, success)
    # Bound up front: the handler below reads 'app', and it would otherwise be
    # unbound (UnboundLocalError) when fetching the application list fails.
    app = None
    try:
        for app in self._get_kube_apps_list(context):
            self._app.app_lifecycle_actions(context, self, app, deepcopy(hook_info))
    except Exception as ex:
        app_name = app.name if app is not None else None
        raise exception.ApplicationLifecycleNotificationException(app_name, str(ex))
|
||||
|
||||
def _do_pre_action(self, context, operation, revert_operation, success,
                   continue_on_exception=False):
    """Send a 'pre' notification to every application, reverting on failure.

    Before an application is notified, a callback that sends it
    ``revert_operation`` is recorded in the revert log of ``operation``.
    When the notification round fails, the pending reverts of the
    pre-etcd-backup and pre-backup operations are run, the revert log is
    reported and reset, and the failure is re-raised.

    :param context: request context.
    :param operation: notification to send; also the revert log to record in.
    :param revert_operation: notification sent by the recorded revert callback.
    :param success: value stored in the hook's extra data under 'success'.
    :param continue_on_exception: when True, an exception raised by an
        application is logged and the remaining applications are notified.
    :raises ApplicationLifecycleNotificationException: on failure; it carries
        the name of the application being notified, or None when the failure
        happened before any application was reached.
    """
    hook_info = self._make_backup_hook_info(operation, success)
    # NOTE(review): the revert hook gets the *string* 'failure' as its
    # 'success' value, while the command line path sends a boolean - confirm
    # what the application plugins expect.
    revert_hook_info = self._make_backup_hook_info(revert_operation,
                                                   constants.BACKUP_ACTION_NOTIFY_FAILURE)

    operation_log = self._backup_actions_log[operation]
    # Bound up front: the outer handler reads 'app', and it would otherwise be
    # unbound (UnboundLocalError) when fetching the application list fails.
    app = None
    try:
        for app in self._get_kube_apps_list(context):
            # log the 'revert' operation for this app so we can call it in case something fails:
            operation_log[app.name] = lambda app=app: \
                self._app.app_lifecycle_actions(context, self, app,
                                                deepcopy(revert_hook_info))

            try:
                self._app.app_lifecycle_actions(context, self, app, deepcopy(hook_info))
            except Exception as ex:
                if continue_on_exception:
                    LOG.exception("Application {} raised '{}', ignoring.".
                                  format(app.name, str(ex)))
                    continue
                else:
                    raise
    except Exception as ex:
        # we always revert in the correct order for the backup state machine:
        self._revert_backup_operation(constants.BACKUP_ACTION_PRE_ETCD_BACKUP)
        self._revert_backup_operation(constants.BACKUP_ACTION_PRE_BACKUP)
        # report error and clean all pending reverts
        self._initialize_backup_actions_log(operation)
        app_name = app.name if app is not None else None
        raise exception.ApplicationLifecycleNotificationException(app_name, str(ex))
|
||||
|
||||
def _do_post_action(self, context, operation, success,
                    remove_revert_operations=None):  # noqa 0102
    """Send a 'post' notification to every application.

    Once every application has been notified, the revert logs of the given
    'pre' operations are emptied, since nothing is left to revert for them.

    :param context: request context.
    :param operation: notification to send.
    :param success: value stored in the hook's extra data under 'success'.
    :param remove_revert_operations: operations whose revert log is cleared
        after all applications were notified; None means none.
    :raises ApplicationLifecycleNotificationException: on the first failure;
        it carries the name of the application being notified, or None when
        the failure happened before any application was reached.
    """
    hook_info = self._make_backup_hook_info(operation, success)

    # Bound up front: the handler below reads 'app', and it would otherwise be
    # unbound (UnboundLocalError) when fetching the application list fails.
    app = None
    try:
        for app in self._get_kube_apps_list(context):
            self._app.app_lifecycle_actions(context, self, app, deepcopy(hook_info))
        # if we notified all apps successfully of this POST action, then we need to
        # remove any 'revert' actions from its associated PRE action:
        for op in remove_revert_operations or []:
            self._backup_actions_log[op] = OrderedDict()
    except Exception as ex:
        app_name = app.name if app is not None else None
        raise exception.ApplicationLifecycleNotificationException(app_name, str(ex))
|
||||
|
||||
def _do_pre_backup_action(self, context, success):
    """Run the pre-backup notification; post-backup is its revert notification.

    :param context: request context.
    :param success: value forwarded with the notification.
    """
    self._do_pre_action(context,
                        constants.BACKUP_ACTION_PRE_BACKUP,
                        constants.BACKUP_ACTION_POST_BACKUP,
                        success)
|
||||
|
||||
def _do_post_backup_action(self, context, success):
    """Run the post-backup notification and drop the pre-backup reverts.

    :param context: request context.
    :param success: value forwarded with the notification.
    """
    completed_pre_operations = [constants.BACKUP_ACTION_PRE_BACKUP]
    self._do_post_action(context=context,
                         operation=constants.BACKUP_ACTION_POST_BACKUP,
                         success=success,
                         remove_revert_operations=completed_pre_operations)
|
||||
|
||||
def _do_pre_etcd_backup_action(self, context, success):
    """Run the pre-etcd-backup notification; post-etcd-backup is its revert.

    :param context: request context.
    :param success: value forwarded with the notification.
    """
    self._do_pre_action(context,
                        constants.BACKUP_ACTION_PRE_ETCD_BACKUP,
                        constants.BACKUP_ACTION_POST_ETCD_BACKUP,
                        success)
|
||||
|
||||
def _do_post_etcd_backup_action(self, context, success):
    """Run the post-etcd-backup notification and drop the pre-etcd-backup reverts.

    :param context: request context.
    :param success: value forwarded with the notification.
    """
    completed_pre_operations = [constants.BACKUP_ACTION_PRE_ETCD_BACKUP]
    self._do_post_action(context=context,
                         operation=constants.BACKUP_ACTION_POST_ETCD_BACKUP,
                         success=success,
                         remove_revert_operations=completed_pre_operations)
|
||||
|
||||
def _do_pre_restore_action(self, context, success):
    """Placeholder for the pre-restore notification.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError(
        "{} action not implemented.".format(constants.BACKUP_ACTION_PRE_RESTORE))
|
||||
|
||||
def _do_post_restore_action(self, context, success):
    """Send the post-restore notification to every application.

    An application whose hook raises is logged, has its status set to
    constants.APP_APPLY_FAILURE and saved; the remaining applications are
    still notified.

    :param context: request context.
    :param success: value stored in the hook's extra data under 'success'.
    """
    operation = constants.BACKUP_ACTION_POST_RESTORE
    hook_template = self._make_backup_hook_info(operation, success)

    for kube_app in self._get_kube_apps_list(context):
        try:
            self._app.app_lifecycle_actions(context, self, kube_app,
                                            deepcopy(hook_template))
        except Exception as ex:
            LOG.exception("Application {} raised '{}' during {}, ignoring.".
                          format(kube_app.name, str(ex), operation))
            # record the failure on the application and carry on with the next one
            kube_app.status = constants.APP_APPLY_FAILURE
            kube_app.save()
|
||||
|
||||
def backup_restore_lifecycle_actions(self, context, operation, success):
    """Perform any lifecycle actions for backup and restore operations.

    :param context: request context
    :param operation: operation we are notified about
    :param success: true if the operation was successful, false if it fails.
                    used in post-*-action to indicate that an operation in progress failed.
    :returns: ``(True, None)`` when the action completed; otherwise
              ``(False, name)`` where name is the application name carried by
              an ApplicationLifecycleNotificationException, or None for any
              other exception.
    """
    # TODO (agrosu): if this blocks for too long, it might trigger a RPC timeout.
    # maybe parallelize the calls to pre/post hooks.
    try:
        handler = self._backup_action_map[operation]
        handler(context, success)
    except exception.ApplicationLifecycleNotificationException as ex:
        LOG.exception(ex)
        outcome = (False, ex.application_name)
    except Exception as ex:
        LOG.exception(ex)
        outcome = (False, None)
    else:
        outcome = (True, None)
    return outcome
|
||||
|
||||
def perform_app_upload(self, context, rpc_app, tarfile, lifecycle_hook_info_app_upload):
|
||||
"""Handling of application upload request (via AppOperator)
|
||||
|
||||
|
@ -13213,6 +13414,7 @@ class ConductorManager(service.PeriodicService):
|
|||
else:
|
||||
return constants.RESTORE_PROGRESS_ALREADY_IN_PROGRESS
|
||||
|
||||
# TODO (agrosu): no use case at this point for sending a BACKUP_ACTION_PRE_RESTORE notification.
|
||||
return constants.RESTORE_PROGRESS_STARTED
|
||||
|
||||
def complete_restore(self, context):
|
||||
|
@ -13249,8 +13451,17 @@ class ConductorManager(service.PeriodicService):
|
|||
except exception.NotFound:
|
||||
return constants.RESTORE_PROGRESS_ALREADY_COMPLETED
|
||||
else:
|
||||
ok, app = self.backup_restore_lifecycle_actions(context,
|
||||
constants.BACKUP_ACTION_POST_RESTORE,
|
||||
constants.BACKUP_ACTION_NOTIFY_SUCCESS)
|
||||
state = constants.RESTORE_STATE_COMPLETED
|
||||
if not ok:
|
||||
if app is None:
|
||||
app = 'unknown'
|
||||
LOG.error("Restore action failed because of application '{}'".format(app))
|
||||
|
||||
self.dbapi.restore_update(restore.uuid,
|
||||
values={'state': constants.RESTORE_STATE_COMPLETED})
|
||||
values={'state': state})
|
||||
|
||||
LOG.info("Complete the restore procedure.")
|
||||
|
||||
|
|
|
@ -1881,6 +1881,17 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
|
|||
rpc_app=rpc_app,
|
||||
hook_info=hook_info))
|
||||
|
||||
def backup_restore_lifecycle_actions(self, context, operation, success):
    """Synchronously, perform any lifecycle actions required
    for backup and restore operations

    :param context: request context.
    :param operation: what operation to notify about.
    :param success: True if the operation was successful, False if it fails.
                    used in post-*-action to indicate that an operation in progress failed.
    :returns: whatever the 'backup_restore_lifecycle_actions' call returns.
    """
    request = self.make_msg('backup_restore_lifecycle_actions',
                            operation=operation,
                            success=success)
    return self.call(context, request)
|
||||
|
||||
def perform_app_upload(self, context, rpc_app, tarfile, lifecycle_hook_info):
|
||||
"""Handle application upload request
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@
|
|||
#
|
||||
|
||||
import six
|
||||
from copy import copy
|
||||
from copy import deepcopy
|
||||
|
||||
from sysinv.objects import base
|
||||
from sysinv.objects import utils
|
||||
|
@ -67,3 +69,13 @@ class LifecycleHookInfo(base.SysinvObject):
|
|||
{k: v for k, v in six.iteritems(self)
|
||||
if (k.startswith('_') and k[1:] in self.fields.keys())
|
||||
or k in self.fields.keys()})
|
||||
|
||||
def __copy__(self):
    """Return a new object of the same class holding the same attribute values.

    The instance is created without running __init__ and the attribute
    values themselves are shared with the original, not copied.
    """
    target_class = self.__class__
    duplicate = type(self).__new__(target_class)
    duplicate.__dict__.update(self.__dict__)
    return duplicate
|
||||
|
||||
def __deepcopy__(self, m):
    """Return a copy of this object whose ``extra`` attribute is deep-copied.

    All other attributes are shared with the original, as in a shallow copy.

    :param m: memo dictionary supplied by copy.deepcopy().
    """
    duplicate = copy(self)
    duplicate.extra = deepcopy(self.extra, m)
    return duplicate
|
||||
|
|
Loading…
Reference in New Issue