Add CLUSTER_STATUS

We should define a set of CLUSTER_STATUS constants instead of using
raw strings directly in the code.

1. Add cluster.py in utils/.
2. Add the cluster status constants.
3. Move cluster-operation related methods from general.py to cluster.py
   (a usage sketch follows below).
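
The pattern at every call site is the same; a minimal before/after sketch
(the c_u alias matches the diffs below, and the surrounding assignment is
illustrative only):

    # before: the status is a raw string, easy to typo and hard to grep
    cluster = g.change_cluster_status(cluster, "Validating")

    # after: the status is a named constant from the new sahara/utils/cluster.py
    from sahara.utils import cluster as c_u
    cluster = c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_VALIDATING)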

Change-Id: Id95d982a911ab5d0f789265e03bff2256cf75856
Li, Chen 2015-07-09 15:10:27 +08:00
parent bff7110acf
commit e1f5bcf08c
27 changed files with 400 additions and 280 deletions


@ -23,6 +23,7 @@ from sahara import conductor as c
from sahara import context
from sahara.plugins import base as plugin_base
from sahara.service import quotas
from sahara.utils import cluster as c_u
from sahara.utils import general as g
from sahara.utils.notification import sender
from sahara.utils.openstack import base as b
@ -74,14 +75,16 @@ def scale_cluster(id, data):
_add_ports_for_auto_sg(ctx, cluster, plugin)
try:
cluster = g.change_cluster_status(cluster, "Validating")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_VALIDATING)
quotas.check_scaling(cluster, to_be_enlarged, additional)
plugin.recommend_configs(cluster)
plugin.validate_scaling(cluster, to_be_enlarged, additional)
except Exception as e:
with excutils.save_and_reraise_exception():
g.clean_cluster_from_empty_ng(cluster)
g.change_cluster_status(cluster, "Active", six.text_type(e))
c_u.clean_cluster_from_empty_ng(cluster)
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ACTIVE, six.text_type(e))
# If we are here validation is successful.
# So let's update to_be_enlarged map:
@ -129,13 +132,14 @@ def _cluster_create(values, plugin):
# validating cluster
try:
plugin.recommend_configs(cluster)
cluster = g.change_cluster_status(cluster, "Validating")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_VALIDATING)
quotas.check_cluster(cluster)
plugin.validate(cluster)
except Exception as e:
with excutils.save_and_reraise_exception():
g.change_cluster_status(cluster, "Error",
six.text_type(e))
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ERROR, six.text_type(e))
OPS.provision_cluster(cluster.id)
@ -155,7 +159,7 @@ def _add_ports_for_auto_sg(ctx, cluster, plugin):
def terminate_cluster(id):
context.set_current_cluster_id(id)
cluster = g.change_cluster_status(id, "Deleting")
cluster = c_u.change_cluster_status(id, c_u.CLUSTER_STATUS_DELETING)
if cluster is None:
return


@ -26,6 +26,7 @@ from sahara.i18n import _LW
from sahara.service import engine as e
from sahara.service import networks
from sahara.service import volumes
from sahara.utils import cluster as c_u
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g
from sahara.utils.openstack import neutron
@ -55,12 +56,14 @@ class DirectEngine(e.Engine):
self._update_rollback_strategy(cluster, shutdown=True)
# create all instances
cluster = g.change_cluster_status(cluster, "Spawning")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_SPAWNING)
self._create_instances(cluster)
# wait for all instances are up and networks ready
cluster = g.change_cluster_status(cluster, "Waiting")
instances = g.get_instances(cluster)
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_WAITING)
instances = c_u.get_instances(cluster)
self._await_active(cluster, instances)
@ -71,10 +74,11 @@ class DirectEngine(e.Engine):
cluster = conductor.cluster_get(ctx, cluster)
# attach volumes
volumes.attach_to_instances(g.get_instances(cluster))
volumes.attach_to_instances(c_u.get_instances(cluster))
# prepare all instances
cluster = g.change_cluster_status(cluster, "Preparing")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_PREPARING)
self._configure_instances(cluster)
@ -83,7 +87,8 @@ class DirectEngine(e.Engine):
def scale_cluster(self, cluster, node_group_id_map):
_warning_logger()
ctx = context.ctx()
cluster = g.change_cluster_status(cluster, "Scaling: Spawning")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_SCALING_SPAWNING)
instance_ids = self._scale_cluster_instances(cluster,
node_group_id_map)
@ -91,10 +96,10 @@ class DirectEngine(e.Engine):
self._update_rollback_strategy(cluster, instance_ids=instance_ids)
cluster = conductor.cluster_get(ctx, cluster)
g.clean_cluster_from_empty_ng(cluster)
c_u.clean_cluster_from_empty_ng(cluster)
cluster = conductor.cluster_get(ctx, cluster)
instances = g.get_instances(cluster, instance_ids)
instances = c_u.get_instances(cluster, instance_ids)
self._await_active(cluster, instances)
@ -105,7 +110,7 @@ class DirectEngine(e.Engine):
cluster = conductor.cluster_get(ctx, cluster)
volumes.attach_to_instances(
g.get_instances(cluster, instance_ids))
c_u.get_instances(cluster, instance_ids))
# we should be here with valid cluster: if instances creation
# was not successful all extra-instances will be removed above
@ -130,7 +135,8 @@ class DirectEngine(e.Engine):
instance_ids = rollback_info.get('instance_ids', [])
if instance_ids:
self._rollback_cluster_scaling(
cluster, g.get_instances(cluster, instance_ids), reason)
cluster,
c_u.get_instances(cluster, instance_ids), reason)
LOG.warning(_LW("Cluster scaling rollback "
"(reason: {reason})").format(reason=reason))
@ -175,7 +181,8 @@ class DirectEngine(e.Engine):
if cluster.anti_affinity:
aa_group = self._create_aa_server_group(cluster)
cpo.add_provisioning_step(
cluster.id, _("Run instances"), g.count_instances(cluster))
cluster.id, _("Run instances"),
c_u.count_instances(cluster))
for node_group in cluster.node_groups:
count = node_group.count
@ -276,7 +283,8 @@ class DirectEngine(e.Engine):
self._create_auto_security_group(node_group)
if instances_to_delete:
cluster = g.change_cluster_status(cluster, "Deleting Instances")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_DELETING_INSTANCES)
for instance in instances_to_delete:
with context.set_current_instance_id(instance.instance_id):
@ -296,7 +304,8 @@ class DirectEngine(e.Engine):
self._count_instances_to_scale(
node_groups_to_enlarge, node_group_id_map, cluster))
cluster = g.change_cluster_status(cluster, "Adding Instances")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ADDING_INSTANCES)
for ng in cluster.node_groups:
if ng.id in node_groups_to_enlarge:
count = node_group_id_map[ng.id]
@ -426,7 +435,7 @@ class DirectEngine(e.Engine):
'await_for_instances_active',
_("Wait for instances to become active"), sleep=1)
def _check_active(self, active_ids, cluster, instances):
if not g.check_cluster_exists(cluster):
if not c_u.check_cluster_exists(cluster):
return True
for instance in instances:
if instance.id not in active_ids:
@ -471,7 +480,7 @@ class DirectEngine(e.Engine):
self._shutdown_instance(i)
cluster = conductor.cluster_get(context.ctx(), cluster)
g.clean_cluster_from_empty_ng(cluster)
c_u.clean_cluster_from_empty_ng(cluster)
def shutdown_cluster(self, cluster):
"""Shutdown specified cluster and all related resources."""


@ -22,7 +22,7 @@ from sahara import conductor as c
from sahara import context
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as g
from sahara.utils import cluster as cluster_utils
conductor = c.API
@ -82,7 +82,7 @@ def _get_cluster_hosts_information(host, cluster):
for i in u.get_instances(clust):
if i.instance_name == host:
return g.generate_etc_hosts(clust)
return cluster_utils.generate_etc_hosts(clust)
return None


@ -30,6 +30,7 @@ from sahara.service.edp import job_utils
from sahara.service.edp.oozie import engine as oozie_engine
from sahara.service.edp.spark import engine as spark_engine
from sahara.service.edp.storm import engine as storm_engine
from sahara.utils import cluster as c_u
from sahara.utils import edp
from sahara.utils import proxy as p
@ -90,7 +91,7 @@ def _run_job(job_execution_id):
ctx = context.ctx()
job_execution = conductor.job_execution_get(ctx, job_execution_id)
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
if cluster.status != 'Active':
if cluster.status != c_u.CLUSTER_STATUS_ACTIVE:
return
eng = _get_job_engine(cluster, job_execution)
@ -183,7 +184,8 @@ def get_job_status(job_execution_id):
ctx = context.ctx()
job_execution = conductor.job_execution_get(ctx, job_execution_id)
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
if cluster is not None and cluster.status == 'Active':
if (cluster is not None and
cluster.status == c_u.CLUSTER_STATUS_ACTIVE):
engine = _get_job_engine(cluster, job_execution)
if engine is not None:
job_execution = _update_job_status(engine,


@ -31,9 +31,9 @@ from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils import cluster as c_u
from sahara.utils import edp
from sahara.utils import files
from sahara.utils import general
from sahara.utils import remote
from sahara.utils import xmlutils
@ -73,7 +73,7 @@ class SparkJobEngine(base_engine.JobEngine):
# is gone, we should probably change the status somehow.
# For now, do nothing.
try:
instance = general.get_instances(self.cluster, [inst_id])[0]
instance = c_u.get_instances(self.cluster, [inst_id])[0]
except Exception:
instance = None
return pid, instance


@ -29,9 +29,9 @@ from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.utils import cluster as cluster_utils
from sahara.utils import edp
from sahara.utils import files
from sahara.utils import general
from sahara.utils import remote
conductor = c.API
@ -63,7 +63,7 @@ class StormJobEngine(base_engine.JobEngine):
# is gone, we should probably change the status somehow.
# For now, do nothing.
try:
instance = general.get_instances(self.cluster, [inst_id])[0]
instance = cluster_utils.get_instances(self.cluster, [inst_id])[0]
except Exception:
instance = None
return topology_name, instance


@ -29,6 +29,7 @@ from sahara.i18n import _LI
from sahara.i18n import _LW
from sahara.service import networks
from sahara.service import volumes
from sahara.utils import cluster as cluster_utils
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp
from sahara.utils import general as g
@ -73,7 +74,7 @@ class Engine(object):
@poll_utils.poll_status('ips_assign_timeout', _("Assign IPs"), sleep=1)
def _ips_assign(self, ips_assigned, cluster, instances):
if not g.check_cluster_exists(cluster):
if not cluster_utils.check_cluster_exists(cluster):
return True
for instance in instances:
if instance.id not in ips_assigned:
@ -96,7 +97,7 @@ class Engine(object):
_LI("All instances have IPs assigned"))
cluster = conductor.cluster_get(context.ctx(), cluster)
instances = g.get_instances(cluster, ips_assigned)
instances = cluster_utils.get_instances(cluster, ips_assigned)
cpo.add_provisioning_step(
cluster.id, _("Wait for instance accessibility"), len(instances))
@ -113,7 +114,7 @@ class Engine(object):
'wait_until_accessible', _("Wait for instance accessibility"),
sleep=5)
def _is_accessible(self, instance):
if not g.check_cluster_exists(instance.cluster):
if not cluster_utils.check_cluster_exists(instance.cluster):
return True
try:
# check if ssh is accessible and cloud-init
@ -143,9 +144,10 @@ class Engine(object):
* setup passwordless login
* etc.
"""
hosts_file = g.generate_etc_hosts(cluster)
hosts_file = cluster_utils.generate_etc_hosts(cluster)
cpo.add_provisioning_step(
cluster.id, _("Configure instances"), g.count_instances(cluster))
cluster.id, _("Configure instances"),
cluster_utils.count_instances(cluster))
with context.ThreadGroup() as tg:
for node_group in cluster.node_groups:
@ -271,7 +273,7 @@ sed '/^Defaults requiretty*/ s/^/#/' -i /etc/sudoers\n
'delete_instances_timeout',
_("Wait for instances to be deleted"), sleep=1)
def _check_deleted(self, deleted_ids, cluster, instances):
if not g.check_cluster_exists(cluster):
if not cluster_utils.check_cluster_exists(cluster):
return True
for instance in instances:
@ -305,6 +307,6 @@ sed '/^Defaults requiretty*/ s/^/#/' -i /etc/sudoers\n
def _remove_db_objects(self, cluster):
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, cluster)
instances = g.get_instances(cluster)
instances = cluster_utils.get_instances(cluster)
for inst in instances:
conductor.instance_remove(ctx, inst)


@ -24,8 +24,8 @@ from sahara.i18n import _LW
from sahara.service import engine as e
from sahara.service.heat import templates as ht
from sahara.service import volumes
from sahara.utils import cluster as c_u
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g
from sahara.utils.openstack import base as b
from sahara.utils.openstack import heat
@ -33,10 +33,14 @@ conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CREATE_STAGES = ["Spawning", "Waiting", "Preparing"]
SCALE_STAGES = ["Scaling: Spawning", "Scaling: Waiting", "Scaling: Preparing"]
ROLLBACK_STAGES = ["Rollback: Spawning", "Rollback: Waiting",
"Rollback: Preparing"]
CREATE_STAGES = [c_u.CLUSTER_STATUS_SPAWNING, c_u.CLUSTER_STATUS_WAITING,
c_u.CLUSTER_STATUS_PREPARING]
SCALE_STAGES = [c_u.CLUSTER_STATUS_SCALING_SPAWNING,
c_u.CLUSTER_STATUS_SCALING_WAITING,
c_u.CLUSTER_STATUS_SCALING_PREPARING]
ROLLBACK_STAGES = [c_u.CLUSTER_STATUS_ROLLBACK_SPAWNING,
c_u.CLUSTER_STATUS_ROLLBACK_WAITING,
c_u.CLUSTER_STATUS_ROLLBACK__PREPARING]
class HeatEngine(e.Engine):
@ -78,7 +82,7 @@ class HeatEngine(e.Engine):
update_stack=True, disable_rollback=False)
cluster = conductor.cluster_get(ctx, cluster)
g.clean_cluster_from_empty_ng(cluster)
c_u.clean_cluster_from_empty_ng(cluster)
self._update_rollback_strategy(cluster)
@ -126,7 +130,7 @@ class HeatEngine(e.Engine):
def _populate_cluster(self, cluster, stack):
ctx = context.ctx()
old_ids = [i.instance_id for i in g.get_instances(cluster)]
old_ids = [i.instance_id for i in c_u.get_instances(cluster)]
new_ids = []
for node_group in cluster.node_groups:
@ -198,22 +202,22 @@ class HeatEngine(e.Engine):
def _launch_instances(self, cluster, target_count, stages,
update_stack=False, disable_rollback=True):
# create all instances
cluster = g.change_cluster_status(cluster, stages[0])
cluster = c_u.change_cluster_status(cluster, stages[0])
inst_ids = self._create_instances(
cluster, target_count, update_stack, disable_rollback)
# wait for all instances are up and networks ready
cluster = g.change_cluster_status(cluster, stages[1])
cluster = c_u.change_cluster_status(cluster, stages[1])
instances = g.get_instances(cluster, inst_ids)
instances = c_u.get_instances(cluster, inst_ids)
self._await_networks(cluster, instances)
# prepare all instances
cluster = g.change_cluster_status(cluster, stages[2])
cluster = c_u.change_cluster_status(cluster, stages[2])
instances = g.get_instances(cluster, inst_ids)
instances = c_u.get_instances(cluster, inst_ids)
volumes.mount_to_instances(instances)
self._configure_instances(cluster)


@ -21,7 +21,7 @@ from sahara import context
from sahara.i18n import _LI
from sahara.i18n import _LW
from sahara.plugins import provisioning as common_configs
from sahara.utils import general as g
from sahara.utils import cluster as c_u
CONF = cfg.CONF
@ -115,7 +115,7 @@ def configure_ntp(cluster_id):
if not is_ntp_enabled(cluster):
LOG.debug("Don't configure NTP on cluster")
return
instances = g.get_instances(cluster)
instances = c_u.get_instances(cluster)
url = retrieve_ntp_server_url(cluster)
with context.ThreadGroup() as tg:
for instance in instances:


@ -31,7 +31,7 @@ from sahara.service.edp import job_manager
from sahara.service import ntp_service
from sahara.service import shares
from sahara.service import trusts
from sahara.utils import general as g
from sahara.utils import cluster as c_u
from sahara.utils import remote
from sahara.utils import rpc as rpc_utils
@ -160,13 +160,14 @@ def ops_error_handler(description):
ctx = context.ctx()
try:
# Clearing status description before executing
g.change_cluster_status_description(cluster_id, "")
c_u.change_cluster_status_description(cluster_id, "")
f(cluster_id, *args, **kwds)
except Exception as ex:
# something happened during cluster operation
cluster = conductor.cluster_get(ctx, cluster_id)
# check if cluster still exists (it might have been removed)
if cluster is None or cluster.status == 'Deleting':
if (cluster is None or
cluster.status == c_u.CLUSTER_STATUS_DELETING):
LOG.debug("Cluster was deleted or marked for deletion. "
"Canceling current operation.")
return
@ -179,14 +180,17 @@ def ops_error_handler(description):
# trying to rollback
desc = description.format(reason=msg)
if _rollback_cluster(cluster, ex):
g.change_cluster_status(cluster, "Active", desc)
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ACTIVE, desc)
else:
g.change_cluster_status(cluster, "Error", desc)
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ERROR, desc)
except Exception as rex:
cluster = conductor.cluster_get(ctx, cluster_id)
# check if cluster still exists (it might have been
# removed during rollback)
if cluster is None or cluster.status == 'Deleting':
if (cluster is None or
cluster.status == c_u.CLUSTER_STATUS_DELETING):
LOG.debug("Cluster was deleted or marked for deletion."
" Canceling current operation.")
return
@ -195,8 +199,9 @@ def ops_error_handler(description):
_LE("Error during rollback of cluster (reason:"
" {reason})").format(reason=six.text_type(rex)))
desc = "{0}, {1}".format(msg, six.text_type(rex))
g.change_cluster_status(
cluster, "Error", description.format(reason=desc))
c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ERROR,
description.format(reason=desc))
return wrapper
return decorator
@ -245,7 +250,8 @@ def _provision_cluster(cluster_id):
cluster = _update_sahara_info(ctx, cluster)
# updating cluster infra
cluster = g.change_cluster_status(cluster, "InfraUpdating")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_INFRAUPDATING)
plugin.update_infra(cluster)
# creating instances and configuring them
@ -254,19 +260,24 @@ def _provision_cluster(cluster_id):
INFRA.create_cluster(cluster)
# configure cluster
cluster = g.change_cluster_status(cluster, "Configuring")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
shares.mount_shares(cluster)
context.set_step_type(_("Plugin: configure cluster"))
plugin.configure_cluster(cluster)
# starting prepared and configured cluster
ntp_service.configure_ntp(cluster_id)
cluster = g.change_cluster_status(cluster, "Starting")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_STARTING)
context.set_step_type(_("Plugin: start cluster"))
plugin.start_cluster(cluster)
# cluster is now up and ready
cluster = g.change_cluster_status(cluster, "Active")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_ACTIVE)
# schedule execution pending job for cluster
for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
@ -284,7 +295,8 @@ def _provision_scaled_cluster(cluster_id, node_group_id_map):
try:
# Decommissioning surplus nodes with the plugin
cluster = g.change_cluster_status(cluster, "Decommissioning")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
instances_to_delete = []
@ -299,19 +311,21 @@ def _provision_scaled_cluster(cluster_id, node_group_id_map):
plugin.decommission_nodes(cluster, instances_to_delete)
# Scaling infrastructure
cluster = g.change_cluster_status(cluster, "Scaling")
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_SCALING)
context.set_step_type(_("Engine: scale cluster"))
instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
# Setting up new nodes with the plugin
if instance_ids:
ntp_service.configure_ntp(cluster_id)
cluster = g.change_cluster_status(cluster, "Configuring")
instances = g.get_instances(cluster, instance_ids)
cluster = c_u.change_cluster_status(
cluster, c_u.CLUSTER_STATUS_CONFIGURING)
instances = c_u.get_instances(cluster, instance_ids)
context.set_step_type(_("Plugin: scale cluster"))
plugin.scale_cluster(cluster, instances)
g.change_cluster_status(cluster, "Active")
c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
finally:
if CONF.use_identity_api_v3 and not cluster.is_transient:


@ -29,6 +29,7 @@ from sahara.i18n import _LW
from sahara.service.edp import job_manager
from sahara.service import ops
from sahara.service import trusts
from sahara.utils import cluster as c_u
from sahara.utils import edp
from sahara.utils import proxy as p
@ -98,11 +99,11 @@ def terminate_cluster(ctx, cluster, description):
description=description))
else:
if cluster.status != 'AwaitingTermination':
if (cluster.status !=
c_u.CLUSTER_STATUS_AWAITINGTERMINATION):
conductor.cluster_update(
ctx,
cluster,
{'status': 'AwaitingTermination'})
ctx, cluster,
{'status': c_u.CLUSTER_STATUS_AWAITINGTERMINATION})
def set_context(func):
@ -139,7 +140,8 @@ def _make_periodic_tasks():
@set_context
def terminate_unneeded_transient_clusters(self, ctx):
LOG.debug('Terminating unneeded transient clusters')
for cluster in conductor.cluster_get_all(ctx, status='Active'):
for cluster in conductor.cluster_get_all(
ctx, status=c_u.CLUSTER_STATUS_ACTIVE):
if not cluster.is_transient:
continue
@ -183,7 +185,10 @@ def _make_periodic_tasks():
# Criteria support need to be implemented in sahara db API to
# have SQL filtering.
for cluster in conductor.cluster_get_all(ctx):
if cluster.status in ['Active', 'Error', 'Deleting']:
if (cluster.status in
[c_u.CLUSTER_STATUS_ACTIVE,
c_u.CLUSTER_STATUS_ERROR,
c_u.CLUSTER_STATUS_DELETING]):
continue
spacing = get_time_since_last_update(cluster)


@ -21,6 +21,7 @@ import sahara.plugins.base as plugin_base
import sahara.service.api as api
import sahara.service.validations.base as b
import sahara.service.validations.cluster_template_schema as ct_schema
from sahara.utils import cluster as c_u
def _build_node_groups_schema():
@ -99,7 +100,7 @@ def check_cluster_scaling(data, cluster_id, **kwargs):
_("Requested plugin '%s' doesn't support cluster scaling feature")
% cluster.plugin_name)
if cluster.status != 'Active':
if cluster.status != c_u.CLUSTER_STATUS_ACTIVE:
raise ex.InvalidReferenceException(
_("Cluster cannot be scaled not in 'Active' status. "
"Cluster status: %s") % cluster.status)


@ -59,7 +59,7 @@ class TestSpark(base.SaharaTestCase):
pid, inst_id = eng._get_pid_and_inst_id("pid@instance")
self.assertEqual(("pid", "instance"), (pid, inst_id))
@mock.patch('sahara.utils.general.get_instances')
@mock.patch('sahara.utils.cluster.get_instances')
def test_get_instance_if_running(self, get_instances):
'''Test retrieval of pid and instance object for running job


@ -51,7 +51,7 @@ class TestStorm(base.SaharaTestCase):
self.assertEqual(("topology_name", "instance"),
(topology_name, inst_id))
@mock.patch('sahara.utils.general.get_instances')
@mock.patch('sahara.utils.cluster.get_instances')
def test_get_instance_if_running(self, get_instances):
'''Test retrieval of topology_name and instance object for running job
@ -96,7 +96,7 @@ class TestStorm(base.SaharaTestCase):
self.assertIsNone(instance)
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.utils.general.get_instances')
@mock.patch('sahara.utils.cluster.get_instances')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
@ -184,7 +184,7 @@ class TestStorm(base.SaharaTestCase):
'_get_job_status_from_remote',
autospec=True,
return_value={"status": edp.JOB_STATUS_KILLED})
@mock.patch('sahara.utils.general.get_instances')
@mock.patch('sahara.utils.cluster.get_instances')
@mock.patch('sahara.plugins.utils.get_instance')
@mock.patch('sahara.utils.remote.get_remote')
def test_cancel_job(self, get_remote, get_instance, get_instances,


@ -71,7 +71,7 @@ class HDFSHelperTestCase(base.SaharaTestCase):
self.cluster.execute_command.assert_called_once_with(
'sudo su - -c "hadoop dfs -mkdir -p Earth" BigBang')
@mock.patch('sahara.utils.general.generate_etc_hosts')
@mock.patch('sahara.utils.cluster.generate_etc_hosts')
@mock.patch('sahara.plugins.utils.get_instances')
@mock.patch('sahara.conductor.api.LocalApi.cluster_get_all')
def test_get_cluster_hosts_information_smthg_wrong(self, mock_get_all,
@ -81,7 +81,7 @@ class HDFSHelperTestCase(base.SaharaTestCase):
self.assertIsNone(res)
@mock.patch('sahara.context.ctx')
@mock.patch('sahara.utils.general.generate_etc_hosts')
@mock.patch('sahara.utils.cluster.generate_etc_hosts')
@mock.patch('sahara.plugins.utils.get_instances')
@mock.patch('sahara.conductor.api.LocalApi.cluster_get_all')
def test_get_cluster_hosts_information_c_id(self, mock_get_all,
@ -97,7 +97,7 @@ class HDFSHelperTestCase(base.SaharaTestCase):
self.assertIsNone(res)
@mock.patch('sahara.context.ctx')
@mock.patch('sahara.utils.general.generate_etc_hosts')
@mock.patch('sahara.utils.cluster.generate_etc_hosts')
@mock.patch('sahara.plugins.utils.get_instances')
@mock.patch('sahara.conductor.api.LocalApi.cluster_get_all')
def test_get_cluster_hosts_information_i_name(self, mock_get_all,


@ -64,7 +64,7 @@ class TestEngine(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.networks.init_instances_ips',
return_value=True)
@mock.patch('sahara.context.set_current_instance_id')
@mock.patch('sahara.utils.general.check_cluster_exists', return_value=True)
@mock.patch('sahara.utils.cluster.check_cluster_exists', return_value=True)
def test_ips_assign(self, g, ctx, init, ops):
cluster = mock.Mock()
instances = [mock.Mock(id='1'), mock.Mock(id='2')]


@ -21,8 +21,8 @@ from sahara import context
from sahara.service import direct_engine as e
from sahara.service import ops
from sahara.tests.unit import base
import sahara.utils.crypto as c
from sahara.utils import general as g
from sahara.utils import cluster as cluster_utils
from sahara.utils import crypto as c
conductor = cond.API
@ -220,7 +220,7 @@ class IpManagementTest(AbstractInstanceTest):
self.engine._create_instances(cluster)
cluster = conductor.cluster_get(ctx, cluster)
instances_list = g.get_instances(cluster)
instances_list = cluster_utils.get_instances(cluster)
self.engine._assign_floating_ips(instances_list)
@ -245,7 +245,7 @@ class ShutdownClusterTest(AbstractInstanceTest):
self.engine._create_instances(cluster)
cluster = conductor.cluster_get(ctx, cluster)
instances_list = g.get_instances(cluster)
instances_list = cluster_utils.get_instances(cluster)
self.engine._assign_floating_ips(instances_list)


@ -76,12 +76,12 @@ class FakeINFRA(object):
class TestOPS(base.SaharaWithDbTestCase):
SEQUENCE = []
@mock.patch('sahara.utils.general.change_cluster_status_description',
@mock.patch('sahara.utils.cluster.change_cluster_status_description',
return_value=FakeCluster())
@mock.patch('sahara.service.ops._update_sahara_info')
@mock.patch('sahara.service.ops._prepare_provisioning',
return_value=(mock.Mock(), mock.Mock(), FakePlugin()))
@mock.patch('sahara.utils.general.change_cluster_status')
@mock.patch('sahara.utils.cluster.change_cluster_status')
@mock.patch('sahara.conductor.API.cluster_get')
@mock.patch('sahara.service.ops.CONF')
@mock.patch('sahara.service.trusts.create_trust_for_cluster')
@ -103,9 +103,9 @@ class TestOPS(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.ops.CONF')
@mock.patch('sahara.service.ops._prepare_provisioning',
return_value=(mock.Mock(), mock.Mock(), FakePlugin()))
@mock.patch('sahara.utils.general.change_cluster_status',
@mock.patch('sahara.utils.cluster.change_cluster_status',
return_value=FakePlugin())
@mock.patch('sahara.utils.general.get_instances')
@mock.patch('sahara.utils.cluster.get_instances')
def test_provision_scaled_cluster(self, p_get_instances, p_change_status,
p_prep_provisioning, p_conf, p_ntp):
del self.SEQUENCE[:]
@ -132,9 +132,9 @@ class TestOPS(base.SaharaWithDbTestCase):
'cluster_destroy'], self.SEQUENCE,
'Order of calls is wrong')
@mock.patch('sahara.utils.general.change_cluster_status_description')
@mock.patch('sahara.utils.cluster.change_cluster_status_description')
@mock.patch('sahara.service.ops._prepare_provisioning')
@mock.patch('sahara.utils.general.change_cluster_status')
@mock.patch('sahara.utils.cluster.change_cluster_status')
@mock.patch('sahara.service.ops._rollback_cluster')
@mock.patch('sahara.conductor.API.cluster_get')
def test_ops_error_hadler_success_rollback(
@ -156,9 +156,9 @@ class TestOPS(base.SaharaWithDbTestCase):
ops._provision_scaled_cluster(fake_cluster.id, {'id': 1})
self.assertEqual(expected, p_change_cluster_status.call_args_list)
@mock.patch('sahara.utils.general.change_cluster_status_description')
@mock.patch('sahara.utils.cluster.change_cluster_status_description')
@mock.patch('sahara.service.ops._prepare_provisioning')
@mock.patch('sahara.utils.general.change_cluster_status')
@mock.patch('sahara.utils.cluster.change_cluster_status')
@mock.patch('sahara.service.ops._rollback_cluster')
@mock.patch('sahara.conductor.API.cluster_get')
def test_ops_error_hadler_failed_rollback(


@ -21,7 +21,7 @@ from sahara.conductor import resource as r
from sahara import exceptions as ex
from sahara.service import volumes
from sahara.tests.unit import base
from sahara.utils import general as g
from sahara.utils import cluster as cluster_utils
class TestAttachVolume(base.SaharaWithDbTestCase):
@ -112,7 +112,7 @@ class TestAttachVolume(base.SaharaWithDbTestCase):
cluster = r.ClusterResource({'node_groups': [ng]})
volumes.attach_to_instances(g.get_instances(cluster))
volumes.attach_to_instances(cluster_utils.get_instances(cluster))
self.assertEqual(4, p_create_attach_vol.call_count)
self.assertEqual(2, p_await.call_count)
self.assertEqual(4, p_mount.call_count)


@ -0,0 +1,113 @@
# Copyright (c) 2015 Intel Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor
from sahara import context
from sahara.tests.unit import base
from sahara.tests.unit.conductor import test_api
from sahara.utils import cluster as cluster_utils
class UtilsClusterTest(base.SaharaWithDbTestCase):
def setUp(self):
super(UtilsClusterTest, self).setUp()
self.api = conductor.API
def _make_sample(self):
ctx = context.ctx()
cluster = self.api.cluster_create(ctx, test_api.SAMPLE_CLUSTER)
return cluster
def test_change_cluster_status(self):
cluster = self._make_sample()
cluster = cluster_utils.change_cluster_status(
cluster, "Deleting", "desc")
self.assertEqual("Deleting", cluster.status)
self.assertEqual("desc", cluster.status_description)
cluster_utils.change_cluster_status(cluster, "Spawning")
self.assertEqual("Deleting", cluster.status)
def test_change_status_description(self):
ctx = context.ctx()
cluster = self._make_sample()
cluster_id = cluster.id
cluster = cluster_utils.change_cluster_status_description(
cluster, "desc")
self.assertEqual('desc', cluster.status_description)
self.api.cluster_destroy(ctx, cluster)
cluster = cluster_utils.change_cluster_status_description(
cluster_id, "desc")
self.assertIsNone(cluster)
def test_get_instances(self):
cluster = self._make_sample()
ctx = context.ctx()
idx = 0
ids = []
for ng in cluster.node_groups:
for i in range(ng.count):
idx += 1
ids.append(self.api.instance_add(ctx, ng, {
'instance_id': str(idx),
'instance_name': str(idx),
}))
cluster = self.api.cluster_get(ctx, cluster)
instances = cluster_utils.get_instances(cluster, ids)
ids = set()
for inst in instances:
ids.add(inst.instance_id)
self.assertEqual(idx, len(ids))
for i in range(1, idx):
self.assertIn(str(i), ids)
instances = cluster_utils.get_instances(cluster)
ids = set()
for inst in instances:
ids.add(inst.instance_id)
self.assertEqual(idx, len(ids))
for i in range(1, idx):
self.assertIn(str(i), ids)
def test_clean_cluster_from_empty_ng(self):
ctx = context.ctx()
cluster = self._make_sample()
ng = cluster.node_groups[0]
ng_len = len(cluster.node_groups)
self.api.node_group_update(ctx, ng, {'count': 0})
cluster = self.api.cluster_get(ctx, cluster.id)
cluster_utils.clean_cluster_from_empty_ng(cluster)
cluster = self.api.cluster_get(ctx, cluster.id)
self.assertEqual(ng_len - 1, len(cluster.node_groups))
def test_generate_etc_hosts(self):
cluster = self._make_sample()
ctx = context.ctx()
idx = 0
for ng in cluster.node_groups:
for i in range(ng.count):
idx += 1
self.api.instance_add(ctx, ng, {
'instance_id': str(idx),
'instance_name': str(idx),
'internal_ip': str(idx),
})
cluster = self.api.cluster_get(ctx, cluster)
value = cluster_utils.generate_etc_hosts(cluster)
expected = ("127.0.0.1 localhost\n"
"1 1.novalocal 1\n"
"2 2.novalocal 2\n"
"3 3.novalocal 3\n"
"4 4.novalocal 4\n")
self.assertEqual(expected, value)


@ -165,7 +165,7 @@ class ClusterProgressOpsTest(base.SaharaWithDbTestCase):
pass
@mock.patch('sahara.utils.cluster_progress_ops._find_in_args')
@mock.patch('sahara.utils.general.check_cluster_exists')
@mock.patch('sahara.utils.cluster.check_cluster_exists')
def test_event_wrapper(self, p_check_cluster_exists, p_find):
self.override_config("disable_event_log", True)
self._do_nothing()


@ -15,22 +15,13 @@
import mock
from sahara import conductor
from sahara import context
from sahara.tests.unit import base
from sahara.tests.unit.conductor import test_api
from sahara.utils import general
class UtilsGeneralTest(base.SaharaWithDbTestCase):
def setUp(self):
super(UtilsGeneralTest, self).setUp()
self.api = conductor.API
def _make_sample(self):
ctx = context.ctx()
cluster = self.api.cluster_create(ctx, test_api.SAMPLE_CLUSTER)
return cluster
def test_find_dict(self):
iterable = [
@ -74,82 +65,3 @@ class UtilsGeneralTest(base.SaharaWithDbTestCase):
self.assertIsNone(general.get_by_id(lst, 9))
self.assertEqual(lst[0], general.get_by_id(lst, 5))
self.assertEqual(lst[1], general.get_by_id(lst, 7))
def test_change_cluster_status(self):
cluster = self._make_sample()
cluster = general.change_cluster_status(cluster, "Deleting", "desc")
self.assertEqual("Deleting", cluster.status)
self.assertEqual("desc", cluster.status_description)
general.change_cluster_status(cluster, "Spawning")
self.assertEqual("Deleting", cluster.status)
def test_change_status_description(self):
ctx = context.ctx()
cluster = self._make_sample()
cluster_id = cluster.id
cluster = general.change_cluster_status_description(cluster, "desc")
self.assertEqual('desc', cluster.status_description)
self.api.cluster_destroy(ctx, cluster)
cluster = general.change_cluster_status_description(cluster_id, "desc")
self.assertIsNone(cluster)
def test_get_instances(self):
cluster = self._make_sample()
ctx = context.ctx()
idx = 0
ids = []
for ng in cluster.node_groups:
for i in range(ng.count):
idx += 1
ids.append(self.api.instance_add(context.ctx(), ng, {
'instance_id': str(idx),
'instance_name': str(idx),
}))
cluster = self.api.cluster_get(ctx, cluster)
instances = general.get_instances(cluster, ids)
ids = set()
for inst in instances:
ids.add(inst.instance_id)
self.assertEqual(idx, len(ids))
for i in range(1, idx):
self.assertIn(str(i), ids)
instances = general.get_instances(cluster)
ids = set()
for inst in instances:
ids.add(inst.instance_id)
self.assertEqual(idx, len(ids))
for i in range(1, idx):
self.assertIn(str(i), ids)
def test_clean_cluster_from_empty_ng(self):
ctx = context.ctx()
cluster = self._make_sample()
ng = cluster.node_groups[0]
ng_len = len(cluster.node_groups)
self.api.node_group_update(ctx, ng, {'count': 0})
cluster = self.api.cluster_get(ctx, cluster.id)
general.clean_cluster_from_empty_ng(cluster)
cluster = self.api.cluster_get(ctx, cluster.id)
self.assertEqual(ng_len - 1, len(cluster.node_groups))
def test_generate_etc_hosts(self):
cluster = self._make_sample()
ctx = context.ctx()
idx = 0
for ng in cluster.node_groups:
for i in range(ng.count):
idx += 1
self.api.instance_add(ctx, ng, {
'instance_id': str(idx),
'instance_name': str(idx),
'internal_ip': str(idx),
})
cluster = self.api.cluster_get(ctx, cluster)
value = general.generate_etc_hosts(cluster)
expected = ("127.0.0.1 localhost\n"
"1 1.novalocal 1\n"
"2 2.novalocal 2\n"
"3 3.novalocal 3\n"
"4 4.novalocal 4\n")
self.assertEqual(expected, value)


@ -84,7 +84,7 @@ class TestPollUtils(base.SaharaTestCase):
self.assertEqual(expected_message, message)
@mock.patch('sahara.utils.poll_utils.LOG.debug')
@mock.patch('sahara.utils.general.check_cluster_exists')
@mock.patch('sahara.utils.cluster.check_cluster_exists')
def test_plugin_poll_first_scenario(self, cluster_exists, logger):
cluster_exists.return_value = True
fake_get_status = mock.Mock()
@ -98,7 +98,7 @@ class TestPollUtils(base.SaharaTestCase):
self.assertEqual([expected_call], logger.call_args_list)
@mock.patch('sahara.utils.poll_utils.LOG.debug')
@mock.patch('sahara.utils.general.check_cluster_exists')
@mock.patch('sahara.utils.cluster.check_cluster_exists')
def test_plugin_poll_second_scenario(self, cluster_exists, logger):
cluster_exists.return_value = False
fake_get_status = mock.Mock()

sahara/utils/cluster.py (new file, 138 lines)

@ -0,0 +1,138 @@
# Copyright (c) 2015 Intel Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _LI
from sahara.utils.notification import sender
conductor = c.API
LOG = logging.getLogger(__name__)
# cluster status
CLUSTER_STATUS_VALIDATING = "Validating"
CLUSTER_STATUS_INFRAUPDATING = "InfraUpdating"
CLUSTER_STATUS_SPAWNING = "Spawning"
CLUSTER_STATUS_WAITING = "Waiting"
CLUSTER_STATUS_PREPARING = "Preparing"
CLUSTER_STATUS_CONFIGURING = "Configuring"
CLUSTER_STATUS_STARTING = "Starting"
CLUSTER_STATUS_ACTIVE = "Active"
CLUSTER_STATUS_DECOMMISSIONING = "Decommissioning"
CLUSTER_STATUS_ERROR = "Error"
CLUSTER_STATUS_DELETING = "Deleting"
CLUSTER_STATUS_AWAITINGTERMINATION = "AwaitingTermination"
# cluster status -- Instances
CLUSTER_STATUS_DELETING_INSTANCES = "Deleting Instances"
CLUSTER_STATUS_ADDING_INSTANCES = "Adding Instances"
# Scaling status
CLUSTER_STATUS_SCALING = "Scaling"
CLUSTER_STATUS_SCALING_SPAWNING = (CLUSTER_STATUS_SCALING +
": " + CLUSTER_STATUS_SPAWNING)
CLUSTER_STATUS_SCALING_WAITING = (CLUSTER_STATUS_SCALING +
": " + CLUSTER_STATUS_WAITING)
CLUSTER_STATUS_SCALING_PREPARING = (CLUSTER_STATUS_SCALING +
": " + CLUSTER_STATUS_PREPARING)
# Rollback status
CLUSTER_STATUS_ROLLBACK = "Rollback"
CLUSTER_STATUS_ROLLBACK_SPAWNING = (CLUSTER_STATUS_ROLLBACK +
": " + CLUSTER_STATUS_SPAWNING)
CLUSTER_STATUS_ROLLBACK_WAITING = (CLUSTER_STATUS_ROLLBACK +
": " + CLUSTER_STATUS_WAITING)
CLUSTER_STATUS_ROLLBACK__PREPARING = (CLUSTER_STATUS_ROLLBACK +
": " + CLUSTER_STATUS_PREPARING)
def change_cluster_status_description(cluster, status_description):
try:
ctx = context.ctx()
return conductor.cluster_update(
ctx, cluster, {'status_description': status_description})
except e.NotFoundException:
return None
def change_cluster_status(cluster, status, status_description=None):
ctx = context.ctx()
# Update cluster status. Race conditions with deletion are still possible,
# but this reduces probability at least.
cluster = conductor.cluster_get(ctx, cluster) if cluster else None
if status_description is not None:
change_cluster_status_description(cluster, status_description)
# 'Deleting' is final and can't be changed
if cluster is None or cluster.status == CLUSTER_STATUS_DELETING:
return cluster
update_dict = {"status": status}
cluster = conductor.cluster_update(ctx, cluster, update_dict)
conductor.cluster_provision_progress_update(ctx, cluster.id)
LOG.info(_LI("Cluster status has been changed. New status="
"{status}").format(status=cluster.status))
sender.notify(ctx, cluster.id, cluster.name, cluster.status,
"update")
return cluster
def count_instances(cluster):
return sum([node_group.count for node_group in cluster.node_groups])
def check_cluster_exists(cluster):
ctx = context.ctx()
# check if cluster still exists (it might have been removed)
cluster = conductor.cluster_get(ctx, cluster)
return cluster is not None
def get_instances(cluster, instances_ids=None):
inst_map = {}
for node_group in cluster.node_groups:
for instance in node_group.instances:
inst_map[instance.id] = instance
if instances_ids is not None:
return [inst_map[id] for id in instances_ids]
else:
return [v for v in inst_map.values()]
def clean_cluster_from_empty_ng(cluster):
ctx = context.ctx()
for ng in cluster.node_groups:
if ng.count == 0:
conductor.node_group_remove(ctx, ng)
def generate_etc_hosts(cluster):
hosts = "127.0.0.1 localhost\n"
for node_group in cluster.node_groups:
for instance in node_group.instances:
hosts += "%s %s %s\n" % (instance.internal_ip,
instance.fqdn(),
instance.hostname())
return hosts
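
For reference, a short, hedged sketch of how callers are expected to drive
this module once it is in place (the mark_started helper is hypothetical;
the flow mirrors the _provision_cluster changes in sahara/service/ops.py
above):

    from sahara.utils import cluster as c_u

    def mark_started(cluster):
        # Hypothetical wrapper: walk a configured cluster to Active using
        # the named constants instead of raw status strings.
        cluster = c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_STARTING)
        # plugin.start_cluster(cluster) would run here in the real flow
        return c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)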


@ -23,7 +23,7 @@ import six
from sahara import conductor as c
from sahara.conductor import resource
from sahara import context
from sahara.utils import general as g
from sahara.utils import cluster as cluster_utils
conductor = c.API
CONF = cfg.CONF
@ -73,7 +73,8 @@ def add_fail_event(instance, exception):
def add_provisioning_step(cluster_id, step_name, total):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
if (CONF.disable_event_log or
not cluster_utils.check_cluster_exists(cluster_id)):
return
prev_step = get_current_provisioning_step(cluster_id)
@ -93,7 +94,8 @@ def add_provisioning_step(cluster_id, step_name, total):
def get_current_provisioning_step(cluster_id):
if CONF.disable_event_log or not g.check_cluster_exists(cluster_id):
if (CONF.disable_event_log or
not cluster_utils.check_cluster_exists(cluster_id)):
return None
current_instance_info = context.ctx().current_instance_info
return current_instance_info.step_id
@ -123,7 +125,7 @@ def event_wrapper(mark_successful_on_exit, **spec):
instance = _find_in_args(spec, *args, **kwargs)
cluster_id = instance.cluster_id
if not g.check_cluster_exists(cluster_id):
if not cluster_utils.check_cluster_exists(cluster_id):
return func(*args, **kwargs)
if step_name:


@ -15,17 +15,8 @@
import re
from oslo_log import log as logging
import six
from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _LI
from sahara.utils.notification import sender
conductor = c.API
LOG = logging.getLogger(__name__)
NATURAL_SORT_RE = re.compile('([0-9]+)')
@ -73,83 +64,6 @@ def natural_sort_key(s):
for text in re.split(NATURAL_SORT_RE, s)]
def change_cluster_status_description(cluster, status_description):
try:
ctx = context.ctx()
return conductor.cluster_update(
ctx, cluster, {'status_description': status_description})
except e.NotFoundException:
return None
def change_cluster_status(cluster, status, status_description=None):
ctx = context.ctx()
# Update cluster status. Race conditions with deletion are still possible,
# but this reduces probability at least.
cluster = conductor.cluster_get(ctx, cluster) if cluster else None
if status_description is not None:
change_cluster_status_description(cluster, status_description)
# 'Deleting' is final and can't be changed
if cluster is None or cluster.status == 'Deleting':
return cluster
update_dict = {"status": status}
cluster = conductor.cluster_update(ctx, cluster, update_dict)
conductor.cluster_provision_progress_update(ctx, cluster.id)
LOG.info(_LI("Cluster status has been changed. New status="
"{status}").format(status=cluster.status))
sender.notify(ctx, cluster.id, cluster.name, cluster.status,
"update")
return cluster
def count_instances(cluster):
return sum([node_group.count for node_group in cluster.node_groups])
def check_cluster_exists(cluster):
ctx = context.ctx()
# check if cluster still exists (it might have been removed)
cluster = conductor.cluster_get(ctx, cluster)
return cluster is not None
def get_instances(cluster, instances_ids=None):
inst_map = {}
for node_group in cluster.node_groups:
for instance in node_group.instances:
inst_map[instance.id] = instance
if instances_ids is not None:
return [inst_map[id] for id in instances_ids]
else:
return [v for v in six.itervalues(inst_map)]
def clean_cluster_from_empty_ng(cluster):
ctx = context.ctx()
for ng in cluster.node_groups:
if ng.count == 0:
conductor.node_group_remove(ctx, ng)
def generate_etc_hosts(cluster):
hosts = "127.0.0.1 localhost\n"
for node_group in cluster.node_groups:
for instance in node_group.instances:
hosts += "%s %s %s\n" % (instance.internal_ip,
instance.fqdn(),
instance.hostname())
return hosts
def generate_instance_name(cluster_name, node_group_name, index):
return ("%s-%s-%03d" % (cluster_name, node_group_name, index)).lower()


@ -21,7 +21,7 @@ from oslo_utils import timeutils
from sahara import context
from sahara import exceptions as ex
from sahara.utils import general
from sahara.utils import cluster as cluster_utils
LOG = logging.getLogger(__name__)
@ -140,7 +140,7 @@ def plugin_option_poll(cluster, get_status, option, operation_name, sleep_time,
kwargs):
def _get(n_cluster, n_kwargs):
if not general.check_cluster_exists(n_cluster):
if not cluster_utils.check_cluster_exists(n_cluster):
return True
return get_status(**n_kwargs)