sahara/sahara/service/ops.py
Zhuang Changkun 1d3b02ee6e Replace the deleted keypair in clusters for API v2
This commit allows users to create a new keypair to regain access to a
running cluster whose original keypair has been deleted. The new keypair
must have the same name as the deleted one, and it is also used for
subsequent cluster scaling. This is implemented via the update_keypair
parameter.

Change-Id: I3bc0ba52ceebff8ae128321aeb9fdbb5769baaf5
Closes-Bug:#1745142
2018-06-13 09:55:41 +08:00


# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import uuidutils
import six

from sahara import conductor as c
from sahara import context
from sahara import exceptions
from sahara.i18n import _
from sahara.plugins import base as plugin_base
from sahara.plugins import utils as u
from sahara.service.edp import job_manager
from sahara.service.edp.utils import shares
from sahara.service.health import verification_base as ver_base
from sahara.service import ntp_service
from sahara.service import trusts
from sahara.utils import cluster as c_u
from sahara.utils.openstack import nova
from sahara.utils import remote
from sahara.utils import rpc as rpc_utils


conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

INFRA = None
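

# INFRA holds the provisioning engine instance (for example the Heat-based
# engine) injected at service start-up via setup_ops(); the module-level
# helpers below use it to create, scale, roll back and shut down clusters.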
def setup_ops(engine):
global INFRA
INFRA = engine
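

# LocalOps runs each operation in this process: context.spawn() starts the
# work in a background (green)thread, so the caller returns immediately
# while provisioning, scaling, termination and EDP handling keep running.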
class LocalOps(object):
    def provision_cluster(self, cluster_id):
        context.spawn("cluster-creating-%s" % cluster_id,
                      _provision_cluster, cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        context.spawn("cluster-scaling-%s" % cluster_id,
                      _provision_scaled_cluster, cluster_id,
                      node_group_id_map, node_group_instance_map)

    def terminate_cluster(self, cluster_id, force=False):
        context.spawn("cluster-terminating-%s" % cluster_id,
                      terminate_cluster, cluster_id, force)

    def run_edp_job(self, job_execution_id):
        context.spawn("Starting Job Execution %s" % job_execution_id,
                      _run_edp_job, job_execution_id)

    def cancel_job_execution(self, job_execution_id):
        context.spawn("Canceling Job Execution %s" % job_execution_id,
                      _cancel_job_execution, job_execution_id)

    def delete_job_execution(self, job_execution_id):
        context.spawn("Deleting Job Execution %s" % job_execution_id,
                      _delete_job_execution, job_execution_id)

    def handle_verification(self, cluster_id, values):
        context.spawn('Handling Verification for cluster %s' % cluster_id,
                      _handle_verification, cluster_id, values)

    def get_engine_type_and_version(self):
        return INFRA.get_type_and_version()

    def job_execution_suspend(self, job_execution_id):
        context.spawn("Suspend Job Execution %s" % job_execution_id,
                      _suspend_job_execution, job_execution_id)
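

# RemoteOps is the RPC client for the same interface: operations are cast
# (fire-and-forget) or called on the 'sahara-ops' topic and executed by an
# OpsServer instance in a separate ops process (e.g. sahara-engine).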
class RemoteOps(rpc_utils.RPCClient):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops', version='1.0')
        super(RemoteOps, self).__init__(target)

    def provision_cluster(self, cluster_id):
        self.cast('provision_cluster', cluster_id=cluster_id)

    def update_keypair(self, cluster_id):
        self.cast('update_keypair', cluster_id=cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        self.cast('provision_scaled_cluster', cluster_id=cluster_id,
                  node_group_id_map=node_group_id_map,
                  node_group_instance_map=node_group_instance_map)

    def terminate_cluster(self, cluster_id, force=False):
        self.cast('terminate_cluster', cluster_id=cluster_id, force=force)

    def run_edp_job(self, job_execution_id):
        self.cast('run_edp_job', job_execution_id=job_execution_id)

    def cancel_job_execution(self, job_execution_id):
        self.cast('cancel_job_execution',
                  job_execution_id=job_execution_id)

    def delete_job_execution(self, job_execution_id):
        self.cast('delete_job_execution',
                  job_execution_id=job_execution_id)

    def handle_verification(self, cluster_id, values):
        self.cast('handle_verification', cluster_id=cluster_id, values=values)

    def get_engine_type_and_version(self):
        return self.call('get_engine_type_and_version')

    def job_execution_suspend(self, job_execution_id):
        self.cast('job_execution_suspend', job_execution_id=job_execution_id)
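

# The caller's security context cannot cross the RPC boundary as an object,
# so it is sent as a dict and rebuilt here before the wrapped handler runs.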
def request_context(func):
@functools.wraps(func)
def wrapped(self, ctx, *args, **kwargs):
context.set_ctx(context.Context(**ctx))
return func(self, *args, **kwargs)
return wrapped
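

# OpsServer is the RPC server side: every endpoint re-establishes the
# caller's context via @request_context and delegates to the module-level
# implementation functions defined below.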
class OpsServer(rpc_utils.RPCServer):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops',
                                  server=uuidutils.generate_uuid(),
                                  version='1.0')
        super(OpsServer, self).__init__(target)

    @request_context
    def provision_cluster(self, cluster_id):
        _provision_cluster(cluster_id)

    @request_context
    def update_keypair(self, cluster_id):
        _update_keypair(cluster_id)

    @request_context
    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        _provision_scaled_cluster(cluster_id, node_group_id_map,
                                  node_group_instance_map)

    @request_context
    def terminate_cluster(self, cluster_id, force=False):
        terminate_cluster(cluster_id, force)

    @request_context
    def run_edp_job(self, job_execution_id):
        _run_edp_job(job_execution_id)

    @request_context
    def cancel_job_execution(self, job_execution_id):
        _cancel_job_execution(job_execution_id)

    @request_context
    def delete_job_execution(self, job_execution_id):
        _delete_job_execution(job_execution_id)

    @request_context
    def handle_verification(self, cluster_id, values):
        _handle_verification(cluster_id, values)

    @request_context
    def get_engine_type_and_version(self):
        return INFRA.get_type_and_version()

    @request_context
    def job_execution_suspend(self, job_execution_id):
        _suspend_job_execution(job_execution_id)
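

# Make sure a Keystone trust exists for the cluster and switch to the
# trust-based admin token, so Sahara can keep operating on the cluster even
# after the user's original token expires.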
def _setup_trust_for_cluster(cluster):
cluster = conductor.cluster_get(context.ctx(), cluster)
trusts.create_trust_for_cluster(cluster)
trusts.use_os_admin_auth_token(cluster)
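

# ops_error_handler wraps every long cluster operation: it clears the old
# status description, runs the operation and, on failure, attempts a rollback.
# A successful rollback returns the cluster to Active (with the failure reason
# recorded); otherwise the cluster is moved to Error.  If the cluster was
# deleted in the meantime, the failure is just logged and ignored.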
def ops_error_handler(description):
def decorator(f):
@functools.wraps(f)
def wrapper(cluster_id, *args, **kwds):
ctx = context.ctx()
try:
# Clearing status description before executing
c_u.change_cluster_status_description(cluster_id, "")
f(cluster_id, *args, **kwds)
except Exception as ex:
# something happened during cluster operation
cluster = conductor.cluster_get(ctx, cluster_id)
# check if cluster still exists (it might have been removed)
if (cluster is None or
cluster.status == c_u.CLUSTER_STATUS_DELETING):
LOG.debug("Cluster was deleted or marked for deletion. "
"Canceling current operation.")
return
msg = six.text_type(ex)
LOG.exception("Error during operating on cluster (reason: "
"{reason})".format(reason=msg))
try:
# trying to rollback
desc = description.format(reason=msg)
if _rollback_cluster(cluster, ex):
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ACTIVE, desc)
else:
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ERROR, desc)
except Exception as rex:
cluster = conductor.cluster_get(ctx, cluster_id)
# check if cluster still exists (it might have been
# removed during rollback)
if (cluster is None or
cluster.status == c_u.CLUSTER_STATUS_DELETING):
LOG.debug("Cluster was deleted or marked for deletion."
" Canceling current operation.")
return
LOG.exception(
"Error during rollback of cluster (reason:"
" {reason})".format(reason=six.text_type(rex)))
desc = "{0}, {1}".format(msg, six.text_type(rex))
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ERROR,
description.format(reason=desc))
return wrapper
return decorator


def _rollback_cluster(cluster, reason):
_setup_trust_for_cluster(cluster)
context.set_step_type(_("Engine: rollback cluster"))
return INFRA.rollback_cluster(cluster, reason)
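

# Common pre-flight work for create and scale: fetch the cluster and its
# plugin, record the image username for every node group, and ensure a trust
# is in place.  Returns the context, the refreshed cluster and the plugin.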
def _prepare_provisioning(cluster_id):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
for nodegroup in cluster.node_groups:
update_dict = {}
update_dict["image_username"] = INFRA.get_node_group_image_username(
nodegroup)
conductor.node_group_update(ctx, nodegroup, update_dict)
_setup_trust_for_cluster(cluster)
cluster = conductor.cluster_get(ctx, cluster_id)
return ctx, cluster, plugin


def _update_sahara_info(ctx, cluster):
sahara_info = {
'infrastructure_engine': INFRA.get_type_and_version(),
'remote': remote.get_remote_type_and_version()}
return conductor.cluster_update(
ctx, cluster, {'sahara_info': sahara_info})
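

# Cluster creation: the cluster moves through InfraUpdating, engine instance
# creation, Configuring and Starting before becoming Active.  Job executions
# queued while the cluster was building are started once it is Active, and a
# health verification is triggered.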
@ops_error_handler(
_("Creating cluster failed for the following reason(s): {reason}"))
def _provision_cluster(cluster_id):
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
cluster = _update_sahara_info(ctx, cluster)
# updating cluster infra
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_INFRAUPDATING)
plugin.update_infra(cluster)
# creating instances and configuring them
cluster = conductor.cluster_get(ctx, cluster_id)
context.set_step_type(_("Engine: create cluster"))
INFRA.create_cluster(cluster)
# configure cluster
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
context.set_step_type(_("Plugin: configure cluster"))
if hasattr(plugin, 'validate_images'):
plugin.validate_images(cluster, test_only=False)
shares.mount_shares(cluster)
plugin.configure_cluster(cluster)
# starting prepared and configured cluster
ntp_service.configure_ntp(cluster_id)
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_STARTING)
context.set_step_type(_("Plugin: start cluster"))
plugin.start_cluster(cluster)
# cluster is now up and ready
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ACTIVE)
# schedule execution pending job for cluster
for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
job_manager.run_job(je.id)
_refresh_health_for_cluster(cluster_id)
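

# Cluster scaling: instances to be removed are decommissioned by the plugin
# (Decommissioning), the engine then resizes the infrastructure (Scaling) and
# the plugin configures any new instances (Configuring).  On failure the
# cluster is returned to Active with the error recorded in the status
# description instead of being moved to Error.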
@ops_error_handler(
_("Scaling cluster failed for the following reason(s): {reason}"))
def _provision_scaled_cluster(cluster_id, node_group_id_map,
node_group_instance_map=None):
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
# Decommissioning surplus nodes with the plugin
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
try:
instances_to_delete = []
for node_group in cluster.node_groups:
new_count = node_group_id_map[node_group.id]
if new_count < node_group.count:
if (node_group_instance_map and
node_group.id in node_group_instance_map):
for instance_ref in node_group_instance_map[
node_group.id]:
instance = _get_instance_obj(node_group.instances,
instance_ref)
instances_to_delete.append(instance)
while node_group.count - new_count > len(instances_to_delete):
instances_to_delete.append(_get_random_instance_from_ng(
node_group.instances, instances_to_delete))
if instances_to_delete:
context.set_step_type(_("Plugin: decommission cluster"))
plugin.decommission_nodes(cluster, instances_to_delete)
# Scaling infrastructure
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_SCALING)
context.set_step_type(_("Engine: scale cluster"))
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map,
instances_to_delete)
# Setting up new nodes with the plugin
if instance_ids:
ntp_service.configure_ntp(cluster_id, instance_ids)
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
instances = c_u.get_instances(cluster, instance_ids)
context.set_step_type(_("Plugin: scale cluster"))
plugin.scale_cluster(cluster, instances)
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
_refresh_health_for_cluster(cluster_id)
except Exception as e:
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE,
six.text_type(e))
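

# Helpers for picking the instances to drop when a node group shrinks: the
# caller may name specific instances (by id or name); any remaining slots are
# filled with the newest instances so the choice matches what Heat deletes.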
def _get_instance_obj(instances, instance_ref):
for instance in instances:
if (instance.instance_id == instance_ref or
instance.instance_name == instance_ref):
return instance
raise exceptions.NotFoundException(str(instance_ref),
_("Instance %s not found"))


def _get_random_instance_from_ng(instances, instances_to_delete):
    # The instances list is not ordered by creation date, so sort it to make
    # sure we delete the same instances that Heat would delete.
insts = sorted(instances,
key=lambda x: int(x['instance_name'].split('-')[-1]))
for instance in reversed(insts):
if instance not in instances_to_delete:
return instance
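

# Cluster termination: ensure the trust is usable, refresh the job execution
# statuses, let the plugin and then the engine shut the cluster down, and
# finally drop the trust and destroy the cluster record.  force=True is
# passed through to the engine to force the infrastructure cleanup.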
@ops_error_handler(
_("Terminating cluster failed for the following reason(s): {reason}"))
def terminate_cluster(cluster_id, force=False):
ctx = context.ctx()
_setup_trust_for_cluster(cluster_id)
job_manager.update_job_statuses(cluster_id=cluster_id)
cluster = conductor.cluster_get(ctx, cluster_id)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
context.set_step_type(_("Plugin: shutdown cluster"))
plugin.on_terminate_cluster(cluster)
context.set_step_type(_("Engine: shutdown cluster"))
INFRA.shutdown_cluster(cluster, force)
trusts.delete_trust_from_cluster(cluster)
conductor.cluster_destroy(ctx, cluster)


def _run_edp_job(job_execution_id):
    job_manager.run_job(job_execution_id)


def _suspend_job_execution(job_execution_id):
    job_manager.suspend_job(job_execution_id)


def _cancel_job_execution(job_execution_id):
    job_manager.cancel_job(job_execution_id)


def _delete_job_execution(job_execution_id):
    try:
        job_execution = job_manager.cancel_job(job_execution_id)
        if not job_execution:
            # job_execution was deleted already, nothing to do
            return
    except exceptions.CancelingFailed:
        LOG.error("Job execution can't be cancelled in time. "
                  "Deleting it anyway.")
    conductor.job_execution_destroy(context.ctx(), job_execution_id)
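

# After creation or scaling the cluster health verification is re-run.
# Failures here are only logged: a broken verification must not push an
# otherwise healthy cluster into the Error state.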
def _refresh_health_for_cluster(cluster_id):
st_dict = {'verification': {'status': 'START'}}
try:
ver_base.validate_verification_start(cluster_id)
ver_base.handle_verification(cluster_id, st_dict)
except ver_base.CannotVerifyError:
LOG.debug("Cannot verify cluster because verifications are disabled "
"or cluster already is verifying")
except Exception:
        # If an unexpected error occurs there is no reason to move the
        # cluster into the Error state.
LOG.debug("Skipping refreshing cluster health")
ver_base.clean_verification_data(cluster_id)


def _handle_verification(cluster_id, values):
ver_base.handle_verification(cluster_id, values)
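

# Added for the API v2 keypair-replacement feature: when the cluster's
# original keypair was deleted and a new one was created under the same name,
# fetch the new public key from Nova and append it to ~/.ssh/authorized_keys
# on every instance, restoring SSH access with the new key.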
def _update_keypair(cluster_id):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster_id)
keypair_name = cluster.user_keypair_id
key = nova.get_keypair(keypair_name)
nodes = u.get_instances(cluster)
for node in nodes:
with node.remote() as r:
r.execute_command(
"echo {keypair} >> ~/.ssh/authorized_keys".
format(keypair=key.public_key))
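

# Illustrative sketch (not part of this module): the expected call path for
# the keypair-replacement feature, assuming the caller holds a RemoteOps
# handle and the user has re-created the keypair in Nova under its original
# name.  The variable names below are only for illustration.
#
#     ops = RemoteOps()               # RPC client defined above
#     ops.update_keypair(cluster_id)  # handled by OpsServer.update_keypair,
#                                     # which runs _update_keypair()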