# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import uuidutils
import six

from sahara import conductor as c
from sahara import context
from sahara import exceptions
from sahara.i18n import _
from sahara.plugins import base as plugin_base
from sahara.plugins import utils as u
from sahara.service.edp import job_manager
from sahara.service.edp.utils import shares
from sahara.service.health import verification_base as ver_base
from sahara.service import ntp_service
from sahara.service import trusts
from sahara.utils import cluster as c_u
from sahara.utils.openstack import nova
from sahara.utils import remote
from sahara.utils import rpc as rpc_utils


conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

INFRA = None


def setup_ops(engine):
    global INFRA

    INFRA = engine


class LocalOps(object):
    def provision_cluster(self, cluster_id):
        context.spawn("cluster-creating-%s" % cluster_id,
                      _provision_cluster, cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        context.spawn("cluster-scaling-%s" % cluster_id,
                      _provision_scaled_cluster, cluster_id,
                      node_group_id_map, node_group_instance_map)

    def terminate_cluster(self, cluster_id, force=False):
        context.spawn("cluster-terminating-%s" % cluster_id,
                      terminate_cluster, cluster_id, force)

    def run_edp_job(self, job_execution_id):
        context.spawn("Starting Job Execution %s" % job_execution_id,
                      _run_edp_job, job_execution_id)

    def cancel_job_execution(self, job_execution_id):
        context.spawn("Canceling Job Execution %s" % job_execution_id,
                      _cancel_job_execution, job_execution_id)

    def delete_job_execution(self, job_execution_id):
        context.spawn("Deleting Job Execution %s" % job_execution_id,
                      _delete_job_execution, job_execution_id)

    def handle_verification(self, cluster_id, values):
        context.spawn('Handling Verification for cluster %s' % cluster_id,
                      _handle_verification, cluster_id, values)

    def get_engine_type_and_version(self):
        return INFRA.get_type_and_version()

    def job_execution_suspend(self, job_execution_id):
        context.spawn("Suspend Job Execution %s" % job_execution_id,
                      _suspend_job_execution, job_execution_id)


class RemoteOps(rpc_utils.RPCClient):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops', version='1.0')
        super(RemoteOps, self).__init__(target)

    def provision_cluster(self, cluster_id):
        self.cast('provision_cluster', cluster_id=cluster_id)

    def update_keypair(self, cluster_id):
        self.cast('update_keypair', cluster_id=cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        self.cast('provision_scaled_cluster', cluster_id=cluster_id,
                  node_group_id_map=node_group_id_map,
                  node_group_instance_map=node_group_instance_map)

    def terminate_cluster(self, cluster_id, force=False):
        self.cast('terminate_cluster', cluster_id=cluster_id, force=force)

    def run_edp_job(self, job_execution_id):
        self.cast('run_edp_job', job_execution_id=job_execution_id)

    def cancel_job_execution(self, job_execution_id):
        self.cast('cancel_job_execution', job_execution_id=job_execution_id)

    def delete_job_execution(self, job_execution_id):
        self.cast('delete_job_execution', job_execution_id=job_execution_id)

    def handle_verification(self, cluster_id, values):
        self.cast('handle_verification', cluster_id=cluster_id, values=values)

    def get_engine_type_and_version(self):
        return self.call('get_engine_type_and_version')

    def job_execution_suspend(self, job_execution_id):
        self.cast('job_execution_suspend', job_execution_id=job_execution_id)


def request_context(func):
    @functools.wraps(func)
    def wrapped(self, ctx, *args, **kwargs):
        context.set_ctx(context.Context(**ctx))
        return func(self, *args, **kwargs)

    return wrapped


class OpsServer(rpc_utils.RPCServer):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops',
                                  server=uuidutils.generate_uuid(),
                                  version='1.0')
        super(OpsServer, self).__init__(target)

    @request_context
    def provision_cluster(self, cluster_id):
        _provision_cluster(cluster_id)

    @request_context
    def update_keypair(self, cluster_id):
        _update_keypair(cluster_id)

    @request_context
    def provision_scaled_cluster(self, cluster_id, node_group_id_map,
                                 node_group_instance_map=None):
        _provision_scaled_cluster(cluster_id, node_group_id_map,
                                  node_group_instance_map)

    @request_context
    def terminate_cluster(self, cluster_id, force=False):
        terminate_cluster(cluster_id, force)

    @request_context
    def run_edp_job(self, job_execution_id):
        _run_edp_job(job_execution_id)

    @request_context
    def cancel_job_execution(self, job_execution_id):
        _cancel_job_execution(job_execution_id)

    @request_context
    def delete_job_execution(self, job_execution_id):
        _delete_job_execution(job_execution_id)

    @request_context
    def handle_verification(self, cluster_id, values):
        _handle_verification(cluster_id, values)

    @request_context
    def get_engine_type_and_version(self):
        return INFRA.get_type_and_version()

    @request_context
    def job_execution_suspend(self, job_execution_id):
        _suspend_job_execution(job_execution_id)


def _setup_trust_for_cluster(cluster):
    cluster = conductor.cluster_get(context.ctx(), cluster)
    trusts.create_trust_for_cluster(cluster)
    trusts.use_os_admin_auth_token(cluster)


def ops_error_handler(description):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(cluster_id, *args, **kwds):
            ctx = context.ctx()
            try:
                # Clearing status description before executing
                c_u.change_cluster_status_description(cluster_id, "")
                f(cluster_id, *args, **kwds)
            except Exception as ex:
                # something happened during cluster operation
                cluster = conductor.cluster_get(ctx, cluster_id)
                # check if cluster still exists (it might have been removed)
                if (cluster is None or
                        cluster.status == c_u.CLUSTER_STATUS_DELETING):
                    LOG.debug("Cluster was deleted or marked for deletion. "
                              "Canceling current operation.")
                    return

                msg = six.text_type(ex)
                LOG.exception("Error during operating on cluster (reason: "
                              "{reason})".format(reason=msg))

                try:
                    # trying to rollback
                    desc = description.format(reason=msg)
                    if _rollback_cluster(cluster, ex):
                        c_u.change_cluster_status(
                            cluster, c_u.CLUSTER_STATUS_ACTIVE, desc)
                    else:
                        c_u.change_cluster_status(
                            cluster, c_u.CLUSTER_STATUS_ERROR, desc)
                except Exception as rex:
                    cluster = conductor.cluster_get(ctx, cluster_id)
                    # check if cluster still exists (it might have been
                    # removed during rollback)
                    if (cluster is None or
                            cluster.status == c_u.CLUSTER_STATUS_DELETING):
                        LOG.debug("Cluster was deleted or marked for deletion."
                                  " Canceling current operation.")
" Canceling current operation.") return LOG.exception( "Error during rollback of cluster (reason:" " {reason})".format(reason=six.text_type(rex))) desc = "{0}, {1}".format(msg, six.text_type(rex)) c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_ERROR, description.format(reason=desc)) return wrapper return decorator def _rollback_cluster(cluster, reason): _setup_trust_for_cluster(cluster) context.set_step_type(_("Engine: rollback cluster")) return INFRA.rollback_cluster(cluster, reason) def _prepare_provisioning(cluster_id): ctx = context.ctx() cluster = conductor.cluster_get(ctx, cluster_id) plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) for nodegroup in cluster.node_groups: update_dict = {} update_dict["image_username"] = INFRA.get_node_group_image_username( nodegroup) conductor.node_group_update(ctx, nodegroup, update_dict) _setup_trust_for_cluster(cluster) cluster = conductor.cluster_get(ctx, cluster_id) return ctx, cluster, plugin def _update_sahara_info(ctx, cluster): sahara_info = { 'infrastructure_engine': INFRA.get_type_and_version(), 'remote': remote.get_remote_type_and_version()} return conductor.cluster_update( ctx, cluster, {'sahara_info': sahara_info}) @ops_error_handler( _("Creating cluster failed for the following reason(s): {reason}")) def _provision_cluster(cluster_id): ctx, cluster, plugin = _prepare_provisioning(cluster_id) cluster = _update_sahara_info(ctx, cluster) # updating cluster infra cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_INFRAUPDATING) plugin.update_infra(cluster) # creating instances and configuring them cluster = conductor.cluster_get(ctx, cluster_id) context.set_step_type(_("Engine: create cluster")) INFRA.create_cluster(cluster) # configure cluster cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_CONFIGURING) context.set_step_type(_("Plugin: configure cluster")) if hasattr(plugin, 'validate_images'): plugin.validate_images(cluster, test_only=False) shares.mount_shares(cluster) plugin.configure_cluster(cluster) # starting prepared and configured cluster ntp_service.configure_ntp(cluster_id) cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_STARTING) context.set_step_type(_("Plugin: start cluster")) plugin.start_cluster(cluster) # cluster is now up and ready cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_ACTIVE) # schedule execution pending job for cluster for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id): job_manager.run_job(je.id) _refresh_health_for_cluster(cluster_id) @ops_error_handler( _("Scaling cluster failed for the following reason(s): {reason}")) def _provision_scaled_cluster(cluster_id, node_group_id_map, node_group_instance_map=None): ctx, cluster, plugin = _prepare_provisioning(cluster_id) # Decommissioning surplus nodes with the plugin cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING) try: instances_to_delete = [] for node_group in cluster.node_groups: new_count = node_group_id_map[node_group.id] if new_count < node_group.count: if (node_group_instance_map and node_group.id in node_group_instance_map): for instance_ref in node_group_instance_map[ node_group.id]: instance = _get_instance_obj(node_group.instances, instance_ref) instances_to_delete.append(instance) while node_group.count - new_count > len(instances_to_delete): instances_to_delete.append(_get_random_instance_from_ng( node_group.instances, instances_to_delete)) if instances_to_delete: context.set_step_type(_("Plugin: decommission 
cluster")) plugin.decommission_nodes(cluster, instances_to_delete) # Scaling infrastructure cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_SCALING) context.set_step_type(_("Engine: scale cluster")) instance_ids = INFRA.scale_cluster(cluster, node_group_id_map, instances_to_delete) # Setting up new nodes with the plugin if instance_ids: ntp_service.configure_ntp(cluster_id, instance_ids) cluster = c_u.change_cluster_status( cluster, c_u.CLUSTER_STATUS_CONFIGURING) instances = c_u.get_instances(cluster, instance_ids) context.set_step_type(_("Plugin: scale cluster")) plugin.scale_cluster(cluster, instances) c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE) _refresh_health_for_cluster(cluster_id) except Exception as e: c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE, six.text_type(e)) def _get_instance_obj(instances, instance_ref): for instance in instances: if (instance.instance_id == instance_ref or instance.instance_name == instance_ref): return instance raise exceptions.NotFoundException(str(instance_ref), _("Instance %s not found")) def _get_random_instance_from_ng(instances, instances_to_delete): # instances list doesn't order by creating date, so we should # sort it to make sure deleted instances same as heat deleted. insts = sorted(instances, key=lambda x: int(x['instance_name'].split('-')[-1])) for instance in reversed(insts): if instance not in instances_to_delete: return instance @ops_error_handler( _("Terminating cluster failed for the following reason(s): {reason}")) def terminate_cluster(cluster_id, force=False): ctx = context.ctx() _setup_trust_for_cluster(cluster_id) job_manager.update_job_statuses(cluster_id=cluster_id) cluster = conductor.cluster_get(ctx, cluster_id) plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) context.set_step_type(_("Plugin: shutdown cluster")) plugin.on_terminate_cluster(cluster) context.set_step_type(_("Engine: shutdown cluster")) INFRA.shutdown_cluster(cluster, force) trusts.delete_trust_from_cluster(cluster) conductor.cluster_destroy(ctx, cluster) def _run_edp_job(job_execution_id): job_manager.run_job(job_execution_id) def _suspend_job_execution(job_execution_id): job_manager.suspend_job(job_execution_id) def _cancel_job_execution(job_execution_id): job_manager.cancel_job(job_execution_id) def _delete_job_execution(job_execution_id): try: job_execution = job_manager.cancel_job(job_execution_id) if not job_execution: # job_execution was deleted already, nothing to do return except exceptions.CancelingFailed: LOG.error("Job execution can't be cancelled in time. 
" "Deleting it anyway.") conductor.job_execution_destroy(context.ctx(), job_execution_id) def _refresh_health_for_cluster(cluster_id): st_dict = {'verification': {'status': 'START'}} try: ver_base.validate_verification_start(cluster_id) ver_base.handle_verification(cluster_id, st_dict) except ver_base.CannotVerifyError: LOG.debug("Cannot verify cluster because verifications are disabled " "or cluster already is verifying") except Exception: # if occasional error occurred, there is no reason to move # cluster into error state LOG.debug("Skipping refreshing cluster health") ver_base.clean_verification_data(cluster_id) def _handle_verification(cluster_id, values): ver_base.handle_verification(cluster_id, values) def _update_keypair(cluster_id): ctx = context.ctx() cluster = conductor.cluster_get(ctx, cluster_id) keypair_name = cluster.user_keypair_id key = nova.get_keypair(keypair_name) nodes = u.get_instances(cluster) for node in nodes: with node.remote() as r: r.execute_command( "echo {keypair} >> ~/.ssh/authorized_keys". format(keypair=key.public_key))