Merge "Implement OS::Sahara::Cluster resource"
This commit is contained in:
commit
859c16c033
185
heat/engine/resources/sahara_cluster.py
Normal file
185
heat/engine/resources/sahara_cluster.py
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
# Copyright (c) 2014 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from heat.common import exception
|
||||||
|
from heat.engine import attributes
|
||||||
|
from heat.engine import constraints
|
||||||
|
from heat.engine import properties
|
||||||
|
from heat.engine import resource
|
||||||
|
from heat.openstack.common.gettextutils import _
|
||||||
|
from heat.openstack.common import log as logging
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SaharaCluster(resource.Resource):
|
||||||
|
|
||||||
|
PROPERTIES = (
|
||||||
|
NAME, PLUGIN_NAME, HADOOP_VERSION, CLUSTER_TEMPLATE_ID,
|
||||||
|
KEY_NAME, IMAGE, MANAGEMENT_NETWORK,
|
||||||
|
) = (
|
||||||
|
'name', 'plugin_name', 'hadoop_version', 'cluster_template_id',
|
||||||
|
'key_name', 'image', 'neutron_management_network',
|
||||||
|
)
|
||||||
|
|
||||||
|
ATTRIBUTES = (
|
||||||
|
STATUS, INFO,
|
||||||
|
) = (
|
||||||
|
"status", "info",
|
||||||
|
)
|
||||||
|
|
||||||
|
properties_schema = {
|
||||||
|
NAME: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Hadoop cluster name.'),
|
||||||
|
),
|
||||||
|
PLUGIN_NAME: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Plugin name.'),
|
||||||
|
required=True,
|
||||||
|
),
|
||||||
|
HADOOP_VERSION: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Version of Hadoop running on instances.'),
|
||||||
|
required=True,
|
||||||
|
),
|
||||||
|
CLUSTER_TEMPLATE_ID: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('ID of the Cluster Template used for '
|
||||||
|
'Node Groups and configurations.'),
|
||||||
|
required=True,
|
||||||
|
),
|
||||||
|
KEY_NAME: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Keypair added to instances to make them accessible for user.'),
|
||||||
|
constraints=[
|
||||||
|
constraints.CustomConstraint('nova.keypair')
|
||||||
|
],
|
||||||
|
),
|
||||||
|
IMAGE: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Name or UUID of the image used to boot Hadoop nodes.'),
|
||||||
|
constraints=[
|
||||||
|
constraints.CustomConstraint('glance.image')
|
||||||
|
],
|
||||||
|
),
|
||||||
|
MANAGEMENT_NETWORK: properties.Schema(
|
||||||
|
properties.Schema.STRING,
|
||||||
|
_('Name or UUID of Neutron network.'),
|
||||||
|
constraints=[
|
||||||
|
constraints.CustomConstraint('neutron.network')
|
||||||
|
],
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
attributes_schema = {
|
||||||
|
STATUS: attributes.Schema(
|
||||||
|
_("Cluster status."),
|
||||||
|
),
|
||||||
|
INFO: attributes.Schema(
|
||||||
|
_("Cluster information."),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
default_client_name = 'sahara'
|
||||||
|
|
||||||
|
def _cluster_name(self):
|
||||||
|
name = self.properties.get(self.NAME)
|
||||||
|
if name:
|
||||||
|
return name
|
||||||
|
return self.physical_resource_name()
|
||||||
|
|
||||||
|
def handle_create(self):
|
||||||
|
plugin_name = self.properties[self.PLUGIN_NAME]
|
||||||
|
hadoop_version = self.properties[self.HADOOP_VERSION]
|
||||||
|
cluster_template_id = self.properties[self.CLUSTER_TEMPLATE_ID]
|
||||||
|
image_id = self.properties.get(self.IMAGE)
|
||||||
|
if image_id:
|
||||||
|
image_id = self.client_plugin('glance').get_image_id(image_id)
|
||||||
|
|
||||||
|
# check that image is provided in case when
|
||||||
|
# cluster template is missing one
|
||||||
|
cluster_template = self.client().cluster_templates.get(
|
||||||
|
cluster_template_id)
|
||||||
|
if cluster_template.default_image_id is None and not image_id:
|
||||||
|
msg = _("%(img)s must be provided: Referenced cluster template "
|
||||||
|
"%(tmpl)s has no default_image_id defined.") % {
|
||||||
|
'img': self.IMAGE, 'tmpl': cluster_template_id}
|
||||||
|
raise exception.StackValidationFailed(message=msg)
|
||||||
|
|
||||||
|
key_name = self.properties.get(self.KEY_NAME)
|
||||||
|
net_id = self.properties.get(self.MANAGEMENT_NETWORK)
|
||||||
|
if net_id:
|
||||||
|
net_id = self.client_plugin('neutron').find_neutron_resource(
|
||||||
|
self.properties, self.MANAGEMENT_NETWORK, 'network')
|
||||||
|
|
||||||
|
cluster = self.client().clusters.create(
|
||||||
|
self._cluster_name(),
|
||||||
|
plugin_name, hadoop_version,
|
||||||
|
cluster_template_id=cluster_template_id,
|
||||||
|
user_keypair_id=key_name,
|
||||||
|
default_image_id=image_id,
|
||||||
|
net_id=net_id)
|
||||||
|
LOG.info(_('Cluster "%s" is being started.') % cluster.name)
|
||||||
|
self.resource_id_set(cluster.id)
|
||||||
|
return self.resource_id
|
||||||
|
|
||||||
|
def check_create_complete(self, cluster_id):
|
||||||
|
cluster = self.client().clusters.get(cluster_id)
|
||||||
|
if cluster.status == 'Error':
|
||||||
|
raise resource.ResourceInError(resource_status=cluster.status)
|
||||||
|
|
||||||
|
if cluster.status != 'Active':
|
||||||
|
return False
|
||||||
|
|
||||||
|
LOG.info(_("Cluster '%s' has been created") % cluster.name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def handle_delete(self):
|
||||||
|
if not self.resource_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.client().clusters.delete(self.resource_id)
|
||||||
|
except Exception as ex:
|
||||||
|
self.client_plugin().ignore_not_found(ex)
|
||||||
|
|
||||||
|
LOG.info(_("Cluster '%s' has been deleted")
|
||||||
|
% self._cluster_name())
|
||||||
|
|
||||||
|
def _resolve_attribute(self, name):
|
||||||
|
cluster = self.client().clusters.get(self.resource_id)
|
||||||
|
return getattr(cluster, name, None)
|
||||||
|
|
||||||
|
def validate(self):
|
||||||
|
res = super(SaharaCluster, self).validate()
|
||||||
|
if res:
|
||||||
|
return res
|
||||||
|
|
||||||
|
# check if running on neutron and MANAGEMENT_NETWORK missing
|
||||||
|
#NOTE(pshchelo): on nova-network with MANAGEMENT_NETWORK present
|
||||||
|
# overall stack validation will fail due to neutron.network constraint,
|
||||||
|
# although the message will be not really relevant.
|
||||||
|
if (self.is_using_neutron() and
|
||||||
|
not self.properties.get(self.MANAGEMENT_NETWORK)):
|
||||||
|
msg = _("%s must be provided"
|
||||||
|
) % self.MANAGEMENT_NETWORK
|
||||||
|
raise exception.StackValidationFailed(message=msg)
|
||||||
|
|
||||||
|
|
||||||
|
def resource_mapping():
|
||||||
|
return {
|
||||||
|
'OS::Sahara::Cluster': SaharaCluster,
|
||||||
|
}
|
172
heat/tests/test_sahara_cluster.py
Normal file
172
heat/tests/test_sahara_cluster.py
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
# Copyright (c) 2014 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo.config import cfg
|
||||||
|
import six
|
||||||
|
|
||||||
|
from heat.common import exception
|
||||||
|
from heat.common import template_format
|
||||||
|
from heat.engine.clients.os import glance
|
||||||
|
from heat.engine.clients.os import neutron
|
||||||
|
from heat.engine.clients.os import sahara
|
||||||
|
from heat.engine.resources import sahara_cluster as sc
|
||||||
|
from heat.engine import scheduler
|
||||||
|
from heat.tests.common import HeatTestCase
|
||||||
|
from heat.tests import utils
|
||||||
|
|
||||||
|
|
||||||
|
cluster_stack_template = """
|
||||||
|
heat_template_version: 2013-05-23
|
||||||
|
description: Hadoop Cluster by Sahara
|
||||||
|
resources:
|
||||||
|
super-cluster:
|
||||||
|
type: OS::Sahara::Cluster
|
||||||
|
properties:
|
||||||
|
name: super-cluster
|
||||||
|
plugin_name: vanilla
|
||||||
|
hadoop_version: 2.3.0
|
||||||
|
cluster_template_id: some_cluster_template_id
|
||||||
|
image: some_image
|
||||||
|
key_name: admin
|
||||||
|
neutron_management_network: some_network
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class FakeCluster(object):
|
||||||
|
def __init__(self, status='Active'):
|
||||||
|
self.status = status
|
||||||
|
self.id = "some_id"
|
||||||
|
self.name = "super-cluster"
|
||||||
|
self.info = {"HDFS": {"NameNode": "hdfs://hostname:port",
|
||||||
|
"Web UI": "http://host_ip:port"}}
|
||||||
|
|
||||||
|
|
||||||
|
class SaharaClusterTest(HeatTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(SaharaClusterTest, self).setUp()
|
||||||
|
self.patchobject(sc.constraints.CustomConstraint, '_is_valid'
|
||||||
|
).return_value = True
|
||||||
|
self.patchobject(glance.GlanceClientPlugin, 'get_image_id'
|
||||||
|
).return_value = 'some_image_id'
|
||||||
|
self.patchobject(neutron.NeutronClientPlugin, '_create')
|
||||||
|
self.patchobject(neutron.NeutronClientPlugin, 'find_neutron_resource'
|
||||||
|
).return_value = 'some_network_id'
|
||||||
|
self.sahara_mock = mock.MagicMock()
|
||||||
|
self.patchobject(sahara.SaharaClientPlugin, '_create'
|
||||||
|
).return_value = self.sahara_mock
|
||||||
|
self.cl_mgr = self.sahara_mock.clusters
|
||||||
|
self.fake_cl = FakeCluster()
|
||||||
|
|
||||||
|
self.t = template_format.parse(cluster_stack_template)
|
||||||
|
|
||||||
|
def _init_cluster(self, template):
|
||||||
|
stack = utils.parse_stack(template)
|
||||||
|
cluster = stack['super-cluster']
|
||||||
|
return cluster
|
||||||
|
|
||||||
|
def _create_cluster(self, template):
|
||||||
|
cluster = self._init_cluster(template)
|
||||||
|
self.cl_mgr.create.return_value = self.fake_cl
|
||||||
|
self.cl_mgr.get.return_value = self.fake_cl
|
||||||
|
scheduler.TaskRunner(cluster.create)()
|
||||||
|
self.assertEqual((cluster.CREATE, cluster.COMPLETE),
|
||||||
|
cluster.state)
|
||||||
|
self.assertEqual(self.fake_cl.id, cluster.resource_id)
|
||||||
|
return cluster
|
||||||
|
|
||||||
|
def test_cluster_create(self):
|
||||||
|
self._create_cluster(self.t)
|
||||||
|
expected_args = ('super-cluster', 'vanilla', '2.3.0')
|
||||||
|
expected_kwargs = {'cluster_template_id': 'some_cluster_template_id',
|
||||||
|
'user_keypair_id': 'admin',
|
||||||
|
'default_image_id': 'some_image_id',
|
||||||
|
'net_id': 'some_network_id'}
|
||||||
|
self.cl_mgr.create.assert_called_once_with(*expected_args,
|
||||||
|
**expected_kwargs)
|
||||||
|
self.cl_mgr.get.assert_called_once_with(self.fake_cl.id)
|
||||||
|
|
||||||
|
def test_cluster_delete(self):
|
||||||
|
cluster = self._create_cluster(self.t)
|
||||||
|
scheduler.TaskRunner(cluster.delete)()
|
||||||
|
self.assertEqual((cluster.DELETE, cluster.COMPLETE),
|
||||||
|
cluster.state)
|
||||||
|
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||||
|
|
||||||
|
def test_cluster_create_fails(self):
|
||||||
|
cfg.CONF.set_override('action_retry_limit', 0)
|
||||||
|
cluster = self._init_cluster(self.t)
|
||||||
|
self.cl_mgr.create.return_value = self.fake_cl
|
||||||
|
self.cl_mgr.get.return_value = FakeCluster(status='Error')
|
||||||
|
create_task = scheduler.TaskRunner(cluster.create)
|
||||||
|
ex = self.assertRaises(exception.ResourceFailure, create_task)
|
||||||
|
expected = 'ResourceInError: Went to status Error due to "Unknown"'
|
||||||
|
self.assertEqual(expected, six.text_type(ex))
|
||||||
|
|
||||||
|
def test_cluster_delete_fails(self):
|
||||||
|
cluster = self._create_cluster(self.t)
|
||||||
|
self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException()
|
||||||
|
delete_task = scheduler.TaskRunner(cluster.delete)
|
||||||
|
ex = self.assertRaises(exception.ResourceFailure, delete_task)
|
||||||
|
expected = "APIException: None"
|
||||||
|
self.assertEqual(expected, six.text_type(ex))
|
||||||
|
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||||
|
|
||||||
|
def test_cluster_not_found_in_delete(self):
|
||||||
|
cluster = self._create_cluster(self.t)
|
||||||
|
self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException(
|
||||||
|
error_code=404)
|
||||||
|
scheduler.TaskRunner(cluster.delete)()
|
||||||
|
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||||
|
|
||||||
|
def test_cluster_resolve_attribute(self):
|
||||||
|
cluster = self._create_cluster(self.t)
|
||||||
|
self.cl_mgr.get.reset_mock()
|
||||||
|
self.assertEqual(self.fake_cl.info,
|
||||||
|
cluster._resolve_attribute('info'))
|
||||||
|
self.assertEqual(self.fake_cl.status,
|
||||||
|
cluster._resolve_attribute('status'))
|
||||||
|
self.assertEqual(2, self.cl_mgr.get.call_count)
|
||||||
|
|
||||||
|
def test_cluster_resource_mapping(self):
|
||||||
|
cluster = self._init_cluster(self.t)
|
||||||
|
mapping = sc.resource_mapping()
|
||||||
|
self.assertEqual(1, len(mapping))
|
||||||
|
self.assertEqual(sc.SaharaCluster,
|
||||||
|
mapping['OS::Sahara::Cluster'])
|
||||||
|
self.assertIsInstance(cluster, sc.SaharaCluster)
|
||||||
|
|
||||||
|
def test_cluster_create_no_image_anywhere_fails(self):
|
||||||
|
self.t['resources']['super-cluster']['properties'].pop('image')
|
||||||
|
self.sahara_mock.cluster_templates.get.return_value = mock.Mock(
|
||||||
|
default_image_id=None)
|
||||||
|
cluster = self._init_cluster(self.t)
|
||||||
|
ex = self.assertRaises(exception.ResourceFailure,
|
||||||
|
scheduler.TaskRunner(cluster.create))
|
||||||
|
self.assertIsInstance(ex.exc, exception.StackValidationFailed)
|
||||||
|
self.assertIn("image must be provided: "
|
||||||
|
"Referenced cluster template some_cluster_template_id "
|
||||||
|
"has no default_image_id defined.",
|
||||||
|
six.text_type(ex.message))
|
||||||
|
|
||||||
|
def test_cluster_validate_no_network_on_neutron_fails(self):
|
||||||
|
self.t['resources']['super-cluster']['properties'].pop(
|
||||||
|
'neutron_management_network')
|
||||||
|
cluster = self._init_cluster(self.t)
|
||||||
|
self.patchobject(cluster, 'is_using_neutron', return_value=True)
|
||||||
|
ex = self.assertRaises(exception.StackValidationFailed,
|
||||||
|
cluster.validate)
|
||||||
|
self.assertEqual("neutron_management_network must be provided",
|
||||||
|
six.text_type(ex))
|
Loading…
Reference in New Issue
Block a user