Implement OS::Sahara::Cluster resource
* This patch implements resource for Hadoop cluster provisioned by Sahara based on Sahara's cluster template. * Heat template for stack based on this resource should have minimal set of fields needed to provision cluster. * To successfully provision cluster the following steps should be done: - Sahara enabled and working in your env - Sahara contains valid cluster template [1] - Sahara's image registry contains registered image with preinstalled Hadoop components [2] [1] http://docs.openstack.org/developer/sahara/restapi/rest_api_v1.0.html#create-cluster-template [2] http://docs.openstack.org/developer/sahara/devref/quickstart.html#upload-image-to-glance Co-Authored-By: Pavlo Shchelokovskyy <pshchelokovskyy@mirantis.com> Implements: blueprint sahara-as-heat-resource Change-Id: If470511cbb88349f47a61a35b1a19ba7c68efd83
This commit is contained in:
parent
2dca2007a4
commit
f15f4abd69
185
heat/engine/resources/sahara_cluster.py
Normal file
185
heat/engine/resources/sahara_cluster.py
Normal file
@ -0,0 +1,185 @@
|
||||
# Copyright (c) 2014 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from heat.common import exception
|
||||
from heat.engine import attributes
|
||||
from heat.engine import constraints
|
||||
from heat.engine import properties
|
||||
from heat.engine import resource
|
||||
from heat.openstack.common.gettextutils import _
|
||||
from heat.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SaharaCluster(resource.Resource):
|
||||
|
||||
PROPERTIES = (
|
||||
NAME, PLUGIN_NAME, HADOOP_VERSION, CLUSTER_TEMPLATE_ID,
|
||||
KEY_NAME, IMAGE, MANAGEMENT_NETWORK,
|
||||
) = (
|
||||
'name', 'plugin_name', 'hadoop_version', 'cluster_template_id',
|
||||
'key_name', 'image', 'neutron_management_network',
|
||||
)
|
||||
|
||||
ATTRIBUTES = (
|
||||
STATUS, INFO,
|
||||
) = (
|
||||
"status", "info",
|
||||
)
|
||||
|
||||
properties_schema = {
|
||||
NAME: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Hadoop cluster name.'),
|
||||
),
|
||||
PLUGIN_NAME: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Plugin name.'),
|
||||
required=True,
|
||||
),
|
||||
HADOOP_VERSION: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Version of Hadoop running on instances.'),
|
||||
required=True,
|
||||
),
|
||||
CLUSTER_TEMPLATE_ID: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('ID of the Cluster Template used for '
|
||||
'Node Groups and configurations.'),
|
||||
required=True,
|
||||
),
|
||||
KEY_NAME: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Keypair added to instances to make them accessible for user.'),
|
||||
constraints=[
|
||||
constraints.CustomConstraint('nova.keypair')
|
||||
],
|
||||
),
|
||||
IMAGE: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Name or UUID of the image used to boot Hadoop nodes.'),
|
||||
constraints=[
|
||||
constraints.CustomConstraint('glance.image')
|
||||
],
|
||||
),
|
||||
MANAGEMENT_NETWORK: properties.Schema(
|
||||
properties.Schema.STRING,
|
||||
_('Name or UUID of Neutron network.'),
|
||||
constraints=[
|
||||
constraints.CustomConstraint('neutron.network')
|
||||
],
|
||||
),
|
||||
}
|
||||
|
||||
attributes_schema = {
|
||||
STATUS: attributes.Schema(
|
||||
_("Cluster status."),
|
||||
),
|
||||
INFO: attributes.Schema(
|
||||
_("Cluster information."),
|
||||
),
|
||||
}
|
||||
|
||||
default_client_name = 'sahara'
|
||||
|
||||
def _cluster_name(self):
|
||||
name = self.properties.get(self.NAME)
|
||||
if name:
|
||||
return name
|
||||
return self.physical_resource_name()
|
||||
|
||||
def handle_create(self):
|
||||
plugin_name = self.properties[self.PLUGIN_NAME]
|
||||
hadoop_version = self.properties[self.HADOOP_VERSION]
|
||||
cluster_template_id = self.properties[self.CLUSTER_TEMPLATE_ID]
|
||||
image_id = self.properties.get(self.IMAGE)
|
||||
if image_id:
|
||||
image_id = self.client_plugin('glance').get_image_id(image_id)
|
||||
|
||||
# check that image is provided in case when
|
||||
# cluster template is missing one
|
||||
cluster_template = self.client().cluster_templates.get(
|
||||
cluster_template_id)
|
||||
if cluster_template.default_image_id is None and not image_id:
|
||||
msg = _("%(img)s must be provided: Referenced cluster template "
|
||||
"%(tmpl)s has no default_image_id defined.") % {
|
||||
'img': self.IMAGE, 'tmpl': cluster_template_id}
|
||||
raise exception.StackValidationFailed(message=msg)
|
||||
|
||||
key_name = self.properties.get(self.KEY_NAME)
|
||||
net_id = self.properties.get(self.MANAGEMENT_NETWORK)
|
||||
if net_id:
|
||||
net_id = self.client_plugin('neutron').find_neutron_resource(
|
||||
self.properties, self.MANAGEMENT_NETWORK, 'network')
|
||||
|
||||
cluster = self.client().clusters.create(
|
||||
self._cluster_name(),
|
||||
plugin_name, hadoop_version,
|
||||
cluster_template_id=cluster_template_id,
|
||||
user_keypair_id=key_name,
|
||||
default_image_id=image_id,
|
||||
net_id=net_id)
|
||||
LOG.info(_('Cluster "%s" is being started.') % cluster.name)
|
||||
self.resource_id_set(cluster.id)
|
||||
return self.resource_id
|
||||
|
||||
def check_create_complete(self, cluster_id):
|
||||
cluster = self.client().clusters.get(cluster_id)
|
||||
if cluster.status == 'Error':
|
||||
raise resource.ResourceInError(resource_status=cluster.status)
|
||||
|
||||
if cluster.status != 'Active':
|
||||
return False
|
||||
|
||||
LOG.info(_("Cluster '%s' has been created") % cluster.name)
|
||||
return True
|
||||
|
||||
def handle_delete(self):
|
||||
if not self.resource_id:
|
||||
return
|
||||
|
||||
try:
|
||||
self.client().clusters.delete(self.resource_id)
|
||||
except Exception as ex:
|
||||
self.client_plugin().ignore_not_found(ex)
|
||||
|
||||
LOG.info(_("Cluster '%s' has been deleted")
|
||||
% self._cluster_name())
|
||||
|
||||
def _resolve_attribute(self, name):
|
||||
cluster = self.client().clusters.get(self.resource_id)
|
||||
return getattr(cluster, name, None)
|
||||
|
||||
def validate(self):
|
||||
res = super(SaharaCluster, self).validate()
|
||||
if res:
|
||||
return res
|
||||
|
||||
# check if running on neutron and MANAGEMENT_NETWORK missing
|
||||
#NOTE(pshchelo): on nova-network with MANAGEMENT_NETWORK present
|
||||
# overall stack validation will fail due to neutron.network constraint,
|
||||
# although the message will be not really relevant.
|
||||
if (self.is_using_neutron() and
|
||||
not self.properties.get(self.MANAGEMENT_NETWORK)):
|
||||
msg = _("%s must be provided"
|
||||
) % self.MANAGEMENT_NETWORK
|
||||
raise exception.StackValidationFailed(message=msg)
|
||||
|
||||
|
||||
def resource_mapping():
|
||||
return {
|
||||
'OS::Sahara::Cluster': SaharaCluster,
|
||||
}
|
172
heat/tests/test_sahara_cluster.py
Normal file
172
heat/tests/test_sahara_cluster.py
Normal file
@ -0,0 +1,172 @@
|
||||
# Copyright (c) 2014 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import template_format
|
||||
from heat.engine.clients.os import glance
|
||||
from heat.engine.clients.os import neutron
|
||||
from heat.engine.clients.os import sahara
|
||||
from heat.engine.resources import sahara_cluster as sc
|
||||
from heat.engine import scheduler
|
||||
from heat.tests.common import HeatTestCase
|
||||
from heat.tests import utils
|
||||
|
||||
|
||||
cluster_stack_template = """
|
||||
heat_template_version: 2013-05-23
|
||||
description: Hadoop Cluster by Sahara
|
||||
resources:
|
||||
super-cluster:
|
||||
type: OS::Sahara::Cluster
|
||||
properties:
|
||||
name: super-cluster
|
||||
plugin_name: vanilla
|
||||
hadoop_version: 2.3.0
|
||||
cluster_template_id: some_cluster_template_id
|
||||
image: some_image
|
||||
key_name: admin
|
||||
neutron_management_network: some_network
|
||||
"""
|
||||
|
||||
|
||||
class FakeCluster(object):
|
||||
def __init__(self, status='Active'):
|
||||
self.status = status
|
||||
self.id = "some_id"
|
||||
self.name = "super-cluster"
|
||||
self.info = {"HDFS": {"NameNode": "hdfs://hostname:port",
|
||||
"Web UI": "http://host_ip:port"}}
|
||||
|
||||
|
||||
class SaharaClusterTest(HeatTestCase):
|
||||
def setUp(self):
|
||||
super(SaharaClusterTest, self).setUp()
|
||||
self.patchobject(sc.constraints.CustomConstraint, '_is_valid'
|
||||
).return_value = True
|
||||
self.patchobject(glance.GlanceClientPlugin, 'get_image_id'
|
||||
).return_value = 'some_image_id'
|
||||
self.patchobject(neutron.NeutronClientPlugin, '_create')
|
||||
self.patchobject(neutron.NeutronClientPlugin, 'find_neutron_resource'
|
||||
).return_value = 'some_network_id'
|
||||
self.sahara_mock = mock.MagicMock()
|
||||
self.patchobject(sahara.SaharaClientPlugin, '_create'
|
||||
).return_value = self.sahara_mock
|
||||
self.cl_mgr = self.sahara_mock.clusters
|
||||
self.fake_cl = FakeCluster()
|
||||
|
||||
self.t = template_format.parse(cluster_stack_template)
|
||||
|
||||
def _init_cluster(self, template):
|
||||
stack = utils.parse_stack(template)
|
||||
cluster = stack['super-cluster']
|
||||
return cluster
|
||||
|
||||
def _create_cluster(self, template):
|
||||
cluster = self._init_cluster(template)
|
||||
self.cl_mgr.create.return_value = self.fake_cl
|
||||
self.cl_mgr.get.return_value = self.fake_cl
|
||||
scheduler.TaskRunner(cluster.create)()
|
||||
self.assertEqual((cluster.CREATE, cluster.COMPLETE),
|
||||
cluster.state)
|
||||
self.assertEqual(self.fake_cl.id, cluster.resource_id)
|
||||
return cluster
|
||||
|
||||
def test_cluster_create(self):
|
||||
self._create_cluster(self.t)
|
||||
expected_args = ('super-cluster', 'vanilla', '2.3.0')
|
||||
expected_kwargs = {'cluster_template_id': 'some_cluster_template_id',
|
||||
'user_keypair_id': 'admin',
|
||||
'default_image_id': 'some_image_id',
|
||||
'net_id': 'some_network_id'}
|
||||
self.cl_mgr.create.assert_called_once_with(*expected_args,
|
||||
**expected_kwargs)
|
||||
self.cl_mgr.get.assert_called_once_with(self.fake_cl.id)
|
||||
|
||||
def test_cluster_delete(self):
|
||||
cluster = self._create_cluster(self.t)
|
||||
scheduler.TaskRunner(cluster.delete)()
|
||||
self.assertEqual((cluster.DELETE, cluster.COMPLETE),
|
||||
cluster.state)
|
||||
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||
|
||||
def test_cluster_create_fails(self):
|
||||
cfg.CONF.set_override('action_retry_limit', 0)
|
||||
cluster = self._init_cluster(self.t)
|
||||
self.cl_mgr.create.return_value = self.fake_cl
|
||||
self.cl_mgr.get.return_value = FakeCluster(status='Error')
|
||||
create_task = scheduler.TaskRunner(cluster.create)
|
||||
ex = self.assertRaises(exception.ResourceFailure, create_task)
|
||||
expected = 'ResourceInError: Went to status Error due to "Unknown"'
|
||||
self.assertEqual(expected, six.text_type(ex))
|
||||
|
||||
def test_cluster_delete_fails(self):
|
||||
cluster = self._create_cluster(self.t)
|
||||
self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException()
|
||||
delete_task = scheduler.TaskRunner(cluster.delete)
|
||||
ex = self.assertRaises(exception.ResourceFailure, delete_task)
|
||||
expected = "APIException: None"
|
||||
self.assertEqual(expected, six.text_type(ex))
|
||||
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||
|
||||
def test_cluster_not_found_in_delete(self):
|
||||
cluster = self._create_cluster(self.t)
|
||||
self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException(
|
||||
error_code=404)
|
||||
scheduler.TaskRunner(cluster.delete)()
|
||||
self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
|
||||
|
||||
def test_cluster_resolve_attribute(self):
|
||||
cluster = self._create_cluster(self.t)
|
||||
self.cl_mgr.get.reset_mock()
|
||||
self.assertEqual(self.fake_cl.info,
|
||||
cluster._resolve_attribute('info'))
|
||||
self.assertEqual(self.fake_cl.status,
|
||||
cluster._resolve_attribute('status'))
|
||||
self.assertEqual(2, self.cl_mgr.get.call_count)
|
||||
|
||||
def test_cluster_resource_mapping(self):
|
||||
cluster = self._init_cluster(self.t)
|
||||
mapping = sc.resource_mapping()
|
||||
self.assertEqual(1, len(mapping))
|
||||
self.assertEqual(sc.SaharaCluster,
|
||||
mapping['OS::Sahara::Cluster'])
|
||||
self.assertIsInstance(cluster, sc.SaharaCluster)
|
||||
|
||||
def test_cluster_create_no_image_anywhere_fails(self):
|
||||
self.t['resources']['super-cluster']['properties'].pop('image')
|
||||
self.sahara_mock.cluster_templates.get.return_value = mock.Mock(
|
||||
default_image_id=None)
|
||||
cluster = self._init_cluster(self.t)
|
||||
ex = self.assertRaises(exception.ResourceFailure,
|
||||
scheduler.TaskRunner(cluster.create))
|
||||
self.assertIsInstance(ex.exc, exception.StackValidationFailed)
|
||||
self.assertIn("image must be provided: "
|
||||
"Referenced cluster template some_cluster_template_id "
|
||||
"has no default_image_id defined.",
|
||||
six.text_type(ex.message))
|
||||
|
||||
def test_cluster_validate_no_network_on_neutron_fails(self):
|
||||
self.t['resources']['super-cluster']['properties'].pop(
|
||||
'neutron_management_network')
|
||||
cluster = self._init_cluster(self.t)
|
||||
self.patchobject(cluster, 'is_using_neutron', return_value=True)
|
||||
ex = self.assertRaises(exception.StackValidationFailed,
|
||||
cluster.validate)
|
||||
self.assertEqual("neutron_management_network must be provided",
|
||||
six.text_type(ex))
|
Loading…
Reference in New Issue
Block a user