e1f5bcf08c
We should define a set of CLUSTER_STATUS in stead of using direct string in code. 1. Add cluster.py in utils/ 2. Add cluster status. 3. move cluster operation related methods from general.py to cluster.py Change-Id: Id95d982a911ab5d0f789265e03bff2256cf75856
373 lines
13 KiB
Python
373 lines
13 KiB
Python
# Copyright (c) 2014 Mirantis Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import functools
|
|
import uuid
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import oslo_messaging as messaging
|
|
import six
|
|
|
|
from sahara import conductor as c
|
|
from sahara import context
|
|
from sahara import exceptions
|
|
from sahara.i18n import _
|
|
from sahara.i18n import _LE
|
|
from sahara.plugins import base as plugin_base
|
|
from sahara.service.edp import job_manager
|
|
from sahara.service import ntp_service
|
|
from sahara.service import shares
|
|
from sahara.service import trusts
|
|
from sahara.utils import cluster as c_u
|
|
from sahara.utils import remote
|
|
from sahara.utils import rpc as rpc_utils
|
|
|
|
|
|
conductor = c.API
|
|
CONF = cfg.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
INFRA = None
|
|
|
|
|
|
def setup_ops(engine):
|
|
global INFRA
|
|
|
|
INFRA = engine
|
|
|
|
|
|
class LocalOps(object):
|
|
def provision_cluster(self, cluster_id):
|
|
context.spawn("cluster-creating-%s" % cluster_id,
|
|
_provision_cluster, cluster_id)
|
|
|
|
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
|
context.spawn("cluster-scaling-%s" % cluster_id,
|
|
_provision_scaled_cluster, cluster_id, node_group_id_map)
|
|
|
|
def terminate_cluster(self, cluster_id):
|
|
context.spawn("cluster-terminating-%s" % cluster_id,
|
|
terminate_cluster, cluster_id)
|
|
|
|
def run_edp_job(self, job_execution_id):
|
|
context.spawn("Starting Job Execution %s" % job_execution_id,
|
|
_run_edp_job, job_execution_id)
|
|
|
|
def cancel_job_execution(self, job_execution_id):
|
|
context.spawn("Canceling Job Execution %s" % job_execution_id,
|
|
_cancel_job_execution, job_execution_id)
|
|
|
|
def delete_job_execution(self, job_execution_id):
|
|
context.spawn("Deleting Job Execution %s" % job_execution_id,
|
|
_delete_job_execution, job_execution_id)
|
|
|
|
def get_engine_type_and_version(self):
|
|
return INFRA.get_type_and_version()
|
|
|
|
|
|
class RemoteOps(rpc_utils.RPCClient):
|
|
def __init__(self):
|
|
target = messaging.Target(topic='sahara-ops', version='1.0')
|
|
super(RemoteOps, self).__init__(target)
|
|
|
|
def provision_cluster(self, cluster_id):
|
|
self.cast('provision_cluster', cluster_id=cluster_id)
|
|
|
|
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
|
self.cast('provision_scaled_cluster', cluster_id=cluster_id,
|
|
node_group_id_map=node_group_id_map)
|
|
|
|
def terminate_cluster(self, cluster_id):
|
|
self.cast('terminate_cluster', cluster_id=cluster_id)
|
|
|
|
def run_edp_job(self, job_execution_id):
|
|
self.cast('run_edp_job', job_execution_id=job_execution_id)
|
|
|
|
def cancel_job_execution(self, job_execution_id):
|
|
self.cast('cancel_job_execution',
|
|
job_execution_id=job_execution_id)
|
|
|
|
def delete_job_execution(self, job_execution_id):
|
|
self.cast('delete_job_execution',
|
|
job_execution_id=job_execution_id)
|
|
|
|
def get_engine_type_and_version(self):
|
|
return self.call('get_engine_type_and_version')
|
|
|
|
|
|
def request_context(func):
|
|
@functools.wraps(func)
|
|
def wrapped(self, ctx, *args, **kwargs):
|
|
context.set_ctx(context.Context(**ctx))
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
class OpsServer(rpc_utils.RPCServer):
|
|
def __init__(self):
|
|
target = messaging.Target(topic='sahara-ops', server=uuid.uuid4(),
|
|
version='1.0')
|
|
super(OpsServer, self).__init__(target)
|
|
|
|
@request_context
|
|
def provision_cluster(self, cluster_id):
|
|
_provision_cluster(cluster_id)
|
|
|
|
@request_context
|
|
def provision_scaled_cluster(self, cluster_id, node_group_id_map):
|
|
_provision_scaled_cluster(cluster_id, node_group_id_map)
|
|
|
|
@request_context
|
|
def terminate_cluster(self, cluster_id):
|
|
terminate_cluster(cluster_id)
|
|
|
|
@request_context
|
|
def run_edp_job(self, job_execution_id):
|
|
_run_edp_job(job_execution_id)
|
|
|
|
@request_context
|
|
def cancel_job_execution(self, job_execution_id):
|
|
_cancel_job_execution(job_execution_id)
|
|
|
|
@request_context
|
|
def delete_job_execution(self, job_execution_id):
|
|
_delete_job_execution(job_execution_id)
|
|
|
|
@request_context
|
|
def get_engine_type_and_version(self):
|
|
return INFRA.get_type_and_version()
|
|
|
|
|
|
def ops_error_handler(description):
|
|
def decorator(f):
|
|
@functools.wraps(f)
|
|
def wrapper(cluster_id, *args, **kwds):
|
|
ctx = context.ctx()
|
|
try:
|
|
# Clearing status description before executing
|
|
c_u.change_cluster_status_description(cluster_id, "")
|
|
f(cluster_id, *args, **kwds)
|
|
except Exception as ex:
|
|
# something happened during cluster operation
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
# check if cluster still exists (it might have been removed)
|
|
if (cluster is None or
|
|
cluster.status == c_u.CLUSTER_STATUS_DELETING):
|
|
LOG.debug("Cluster was deleted or marked for deletion. "
|
|
"Canceling current operation.")
|
|
return
|
|
|
|
msg = six.text_type(ex)
|
|
LOG.exception(_LE("Error during operating on cluster (reason: "
|
|
"{reason})").format(reason=msg))
|
|
|
|
try:
|
|
# trying to rollback
|
|
desc = description.format(reason=msg)
|
|
if _rollback_cluster(cluster, ex):
|
|
c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_ACTIVE, desc)
|
|
else:
|
|
c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_ERROR, desc)
|
|
except Exception as rex:
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
# check if cluster still exists (it might have been
|
|
# removed during rollback)
|
|
if (cluster is None or
|
|
cluster.status == c_u.CLUSTER_STATUS_DELETING):
|
|
LOG.debug("Cluster was deleted or marked for deletion."
|
|
" Canceling current operation.")
|
|
return
|
|
|
|
LOG.exception(
|
|
_LE("Error during rollback of cluster (reason:"
|
|
" {reason})").format(reason=six.text_type(rex)))
|
|
desc = "{0}, {1}".format(msg, six.text_type(rex))
|
|
c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_ERROR,
|
|
description.format(reason=desc))
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def _rollback_cluster(cluster, reason):
|
|
context.set_step_type(_("Engine: rollback cluster"))
|
|
return INFRA.rollback_cluster(cluster, reason)
|
|
|
|
|
|
def _prepare_provisioning(cluster_id):
|
|
ctx = context.ctx()
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
|
|
|
for nodegroup in cluster.node_groups:
|
|
update_dict = {}
|
|
update_dict["image_username"] = INFRA.get_node_group_image_username(
|
|
nodegroup)
|
|
conductor.node_group_update(ctx, nodegroup, update_dict)
|
|
|
|
if CONF.use_identity_api_v3:
|
|
trusts.create_trust_for_cluster(cluster,
|
|
expires=not cluster.is_transient)
|
|
trusts.use_os_admin_auth_token(cluster)
|
|
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
|
|
return ctx, cluster, plugin
|
|
|
|
|
|
def _update_sahara_info(ctx, cluster):
|
|
sahara_info = {
|
|
'infrastructure_engine': INFRA.get_type_and_version(),
|
|
'remote': remote.get_remote_type_and_version()}
|
|
|
|
return conductor.cluster_update(
|
|
ctx, cluster, {'sahara_info': sahara_info})
|
|
|
|
|
|
@ops_error_handler(
|
|
_("Creating cluster failed for the following reason(s): {reason}"))
|
|
def _provision_cluster(cluster_id):
|
|
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
|
|
|
|
try:
|
|
cluster = _update_sahara_info(ctx, cluster)
|
|
|
|
# updating cluster infra
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_INFRAUPDATING)
|
|
plugin.update_infra(cluster)
|
|
|
|
# creating instances and configuring them
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
context.set_step_type(_("Engine: create cluster"))
|
|
INFRA.create_cluster(cluster)
|
|
|
|
# configure cluster
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
|
|
shares.mount_shares(cluster)
|
|
|
|
context.set_step_type(_("Plugin: configure cluster"))
|
|
plugin.configure_cluster(cluster)
|
|
|
|
# starting prepared and configured cluster
|
|
ntp_service.configure_ntp(cluster_id)
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_STARTING)
|
|
|
|
context.set_step_type(_("Plugin: start cluster"))
|
|
plugin.start_cluster(cluster)
|
|
|
|
# cluster is now up and ready
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_ACTIVE)
|
|
|
|
# schedule execution pending job for cluster
|
|
for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
|
|
job_manager.run_job(je.id)
|
|
|
|
finally:
|
|
if CONF.use_identity_api_v3 and not cluster.is_transient:
|
|
trusts.delete_trust_from_cluster(cluster)
|
|
|
|
|
|
@ops_error_handler(
|
|
_("Scaling cluster failed for the following reason(s): {reason}"))
|
|
def _provision_scaled_cluster(cluster_id, node_group_id_map):
|
|
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
|
|
|
|
try:
|
|
# Decommissioning surplus nodes with the plugin
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
|
|
|
|
instances_to_delete = []
|
|
|
|
for node_group in cluster.node_groups:
|
|
new_count = node_group_id_map[node_group.id]
|
|
if new_count < node_group.count:
|
|
instances_to_delete += node_group.instances[new_count:
|
|
node_group.count]
|
|
|
|
if instances_to_delete:
|
|
context.set_step_type(_("Plugin: decommission cluster"))
|
|
plugin.decommission_nodes(cluster, instances_to_delete)
|
|
|
|
# Scaling infrastructure
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_SCALING)
|
|
context.set_step_type(_("Engine: scale cluster"))
|
|
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
|
|
|
|
# Setting up new nodes with the plugin
|
|
if instance_ids:
|
|
ntp_service.configure_ntp(cluster_id)
|
|
cluster = c_u.change_cluster_status(
|
|
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
|
|
instances = c_u.get_instances(cluster, instance_ids)
|
|
context.set_step_type(_("Plugin: scale cluster"))
|
|
plugin.scale_cluster(cluster, instances)
|
|
|
|
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
|
|
|
|
finally:
|
|
if CONF.use_identity_api_v3 and not cluster.is_transient:
|
|
trusts.delete_trust_from_cluster(cluster)
|
|
|
|
|
|
@ops_error_handler(
|
|
_("Terminating cluster failed for the following reason(s): {reason}"))
|
|
def terminate_cluster(cluster_id):
|
|
ctx = context.ctx()
|
|
job_manager.update_job_statuses(cluster_id=cluster_id)
|
|
cluster = conductor.cluster_get(ctx, cluster_id)
|
|
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
|
|
|
context.set_step_type(_("Plugin: shutdown cluster"))
|
|
plugin.on_terminate_cluster(cluster)
|
|
|
|
context.set_step_type(_("Engine: shutdown cluster"))
|
|
INFRA.shutdown_cluster(cluster)
|
|
|
|
if CONF.use_identity_api_v3:
|
|
trusts.delete_trust_from_cluster(cluster)
|
|
|
|
conductor.cluster_destroy(ctx, cluster)
|
|
|
|
|
|
def _run_edp_job(job_execution_id):
|
|
job_manager.run_job(job_execution_id)
|
|
|
|
|
|
def _cancel_job_execution(job_execution_id):
|
|
job_manager.cancel_job(job_execution_id)
|
|
|
|
|
|
def _delete_job_execution(job_execution_id):
|
|
try:
|
|
job_execution = job_manager.cancel_job(job_execution_id)
|
|
if not job_execution:
|
|
# job_execution was deleted already, nothing to do
|
|
return
|
|
except exceptions.CancelingFailed:
|
|
LOG.error(_LE("Job execution can't be cancelled in time. "
|
|
"Deleting it anyway."))
|
|
conductor.job_execution_destroy(context.ctx(), job_execution_id)
|