From f15f4abd69f0c27f1d163650874b3374faea5352 Mon Sep 17 00:00:00 2001
From: Alexander Ignatov
Date: Mon, 10 Feb 2014 16:44:42 +0400
Subject: [PATCH] Implement OS::Sahara::Cluster resource

* This patch implements a resource for a Hadoop cluster provisioned by
  Sahara, based on a Sahara cluster template.
* A Heat template for a stack based on this resource needs only a minimal
  set of fields to provision a cluster (a minimal example is sketched
  below, after the '---' separator).
* To successfully provision a cluster, the following prerequisites must
  be met:
  - Sahara is enabled and working in your environment
  - Sahara contains a valid cluster template [1]
  - Sahara's image registry contains a registered image with preinstalled
    Hadoop components [2]

[1] http://docs.openstack.org/developer/sahara/restapi/rest_api_v1.0.html#create-cluster-template
[2] http://docs.openstack.org/developer/sahara/devref/quickstart.html#upload-image-to-glance

Co-Authored-By: Pavlo Shchelokovskyy
Implements: blueprint sahara-as-heat-resource
Change-Id: If470511cbb88349f47a61a35b1a19ba7c68efd83
---
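
Note (not part of the commit message): a minimal stack template using the
new resource type could look like the sketch below. It mirrors the fixture
template in the unit tests; the plugin name, Hadoop version and all
bracketed values are placeholders that must refer to objects already
registered in your environment (see the prerequisites above). The 'status'
and 'info' attributes exposed by the resource can be read back with
get_attr, for example in an outputs section:

  heat_template_version: 2013-05-23
  description: Hadoop cluster provisioned by Sahara (illustrative sketch)
  resources:
    hadoop-cluster:
      type: OS::Sahara::Cluster
      properties:
        name: hadoop-cluster
        plugin_name: vanilla
        hadoop_version: 2.3.0
        cluster_template_id: <existing Sahara cluster template ID>
        image: <registered Hadoop image name or UUID>
        key_name: <existing Nova keypair name>
        neutron_management_network: <Neutron network name or UUID>
  outputs:
    cluster_info:
      description: Endpoints of the provisioned Hadoop cluster
      value: { get_attr: [ hadoop-cluster, info ] }

A stack built from such a template can then be created as usual with the
heat CLI (heat stack-create) or the orchestration API.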

 heat/engine/resources/sahara_cluster.py | 185 ++++++++++++++++++++++++
 heat/tests/test_sahara_cluster.py       | 172 ++++++++++++++++++++++
 2 files changed, 357 insertions(+)
 create mode 100644 heat/engine/resources/sahara_cluster.py
 create mode 100644 heat/tests/test_sahara_cluster.py

diff --git a/heat/engine/resources/sahara_cluster.py b/heat/engine/resources/sahara_cluster.py
new file mode 100644
index 000000000..7cc018429
--- /dev/null
+++ b/heat/engine/resources/sahara_cluster.py
@@ -0,0 +1,185 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from heat.common import exception
+from heat.engine import attributes
+from heat.engine import constraints
+from heat.engine import properties
+from heat.engine import resource
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class SaharaCluster(resource.Resource):
+
+    PROPERTIES = (
+        NAME, PLUGIN_NAME, HADOOP_VERSION, CLUSTER_TEMPLATE_ID,
+        KEY_NAME, IMAGE, MANAGEMENT_NETWORK,
+    ) = (
+        'name', 'plugin_name', 'hadoop_version', 'cluster_template_id',
+        'key_name', 'image', 'neutron_management_network',
+    )
+
+    ATTRIBUTES = (
+        STATUS, INFO,
+    ) = (
+        "status", "info",
+    )
+
+    properties_schema = {
+        NAME: properties.Schema(
+            properties.Schema.STRING,
+            _('Hadoop cluster name.'),
+        ),
+        PLUGIN_NAME: properties.Schema(
+            properties.Schema.STRING,
+            _('Plugin name.'),
+            required=True,
+        ),
+        HADOOP_VERSION: properties.Schema(
+            properties.Schema.STRING,
+            _('Version of Hadoop running on instances.'),
+            required=True,
+        ),
+        CLUSTER_TEMPLATE_ID: properties.Schema(
+            properties.Schema.STRING,
+            _('ID of the Cluster Template used for '
+              'Node Groups and configurations.'),
+            required=True,
+        ),
+        KEY_NAME: properties.Schema(
+            properties.Schema.STRING,
+            _('Keypair added to instances to make them accessible for user.'),
+            constraints=[
+                constraints.CustomConstraint('nova.keypair')
+            ],
+        ),
+        IMAGE: properties.Schema(
+            properties.Schema.STRING,
+            _('Name or UUID of the image used to boot Hadoop nodes.'),
+            constraints=[
+                constraints.CustomConstraint('glance.image')
+            ],
+        ),
+        MANAGEMENT_NETWORK: properties.Schema(
+            properties.Schema.STRING,
+            _('Name or UUID of Neutron network.'),
+            constraints=[
+                constraints.CustomConstraint('neutron.network')
+            ],
+        ),
+    }
+
+    attributes_schema = {
+        STATUS: attributes.Schema(
+            _("Cluster status."),
+        ),
+        INFO: attributes.Schema(
+            _("Cluster information."),
+        ),
+    }
+
+    default_client_name = 'sahara'
+
+    def _cluster_name(self):
+        name = self.properties.get(self.NAME)
+        if name:
+            return name
+        return self.physical_resource_name()
+
+    def handle_create(self):
+        plugin_name = self.properties[self.PLUGIN_NAME]
+        hadoop_version = self.properties[self.HADOOP_VERSION]
+        cluster_template_id = self.properties[self.CLUSTER_TEMPLATE_ID]
+        image_id = self.properties.get(self.IMAGE)
+        if image_id:
+            image_id = self.client_plugin('glance').get_image_id(image_id)
+
+        # check that an image is provided in case
+        # the cluster template is missing one
+        cluster_template = self.client().cluster_templates.get(
+            cluster_template_id)
+        if cluster_template.default_image_id is None and not image_id:
+            msg = _("%(img)s must be provided: Referenced cluster template "
+                    "%(tmpl)s has no default_image_id defined.") % {
+                        'img': self.IMAGE, 'tmpl': cluster_template_id}
+            raise exception.StackValidationFailed(message=msg)
+
+        key_name = self.properties.get(self.KEY_NAME)
+        net_id = self.properties.get(self.MANAGEMENT_NETWORK)
+        if net_id:
+            net_id = self.client_plugin('neutron').find_neutron_resource(
+                self.properties, self.MANAGEMENT_NETWORK, 'network')
+
+        cluster = self.client().clusters.create(
+            self._cluster_name(),
+            plugin_name, hadoop_version,
+            cluster_template_id=cluster_template_id,
+            user_keypair_id=key_name,
+            default_image_id=image_id,
+            net_id=net_id)
+        LOG.info(_('Cluster "%s" is being started.') % cluster.name)
+        self.resource_id_set(cluster.id)
+        return self.resource_id
+
+    def check_create_complete(self, cluster_id):
+        cluster = self.client().clusters.get(cluster_id)
+        if cluster.status == 'Error':
+            raise resource.ResourceInError(resource_status=cluster.status)
+
+        if cluster.status != 'Active':
+            return False
+
+        LOG.info(_("Cluster '%s' has been created") % cluster.name)
+        return True
+
+    def handle_delete(self):
+        if not self.resource_id:
+            return
+
+        try:
+            self.client().clusters.delete(self.resource_id)
+        except Exception as ex:
+            self.client_plugin().ignore_not_found(ex)
+
+        LOG.info(_("Cluster '%s' has been deleted")
+                 % self._cluster_name())
+
+    def _resolve_attribute(self, name):
+        cluster = self.client().clusters.get(self.resource_id)
+        return getattr(cluster, name, None)
+
+    def validate(self):
+        res = super(SaharaCluster, self).validate()
+        if res:
+            return res
+
+        # check if running on neutron and MANAGEMENT_NETWORK missing
+        # NOTE(pshchelo): on nova-network with MANAGEMENT_NETWORK present,
+        # overall stack validation will fail due to the neutron.network
+        # constraint, although the message will not be really relevant.
+        if (self.is_using_neutron() and
+                not self.properties.get(self.MANAGEMENT_NETWORK)):
+            msg = _("%s must be provided"
+                    ) % self.MANAGEMENT_NETWORK
+            raise exception.StackValidationFailed(message=msg)
+
+
+def resource_mapping():
+    return {
+        'OS::Sahara::Cluster': SaharaCluster,
+    }
diff --git a/heat/tests/test_sahara_cluster.py b/heat/tests/test_sahara_cluster.py
new file mode 100644
index 000000000..86106cb09
--- /dev/null
+++ b/heat/tests/test_sahara_cluster.py
@@ -0,0 +1,172 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from oslo.config import cfg
+import six
+
+from heat.common import exception
+from heat.common import template_format
+from heat.engine.clients.os import glance
+from heat.engine.clients.os import neutron
+from heat.engine.clients.os import sahara
+from heat.engine.resources import sahara_cluster as sc
+from heat.engine import scheduler
+from heat.tests.common import HeatTestCase
+from heat.tests import utils
+
+
+cluster_stack_template = """
+heat_template_version: 2013-05-23
+description: Hadoop Cluster by Sahara
+resources:
+  super-cluster:
+    type: OS::Sahara::Cluster
+    properties:
+      name: super-cluster
+      plugin_name: vanilla
+      hadoop_version: 2.3.0
+      cluster_template_id: some_cluster_template_id
+      image: some_image
+      key_name: admin
+      neutron_management_network: some_network
+"""
+
+
+class FakeCluster(object):
+    def __init__(self, status='Active'):
+        self.status = status
+        self.id = "some_id"
+        self.name = "super-cluster"
+        self.info = {"HDFS": {"NameNode": "hdfs://hostname:port",
+                              "Web UI": "http://host_ip:port"}}
+
+
+class SaharaClusterTest(HeatTestCase):
+    def setUp(self):
+        super(SaharaClusterTest, self).setUp()
+        self.patchobject(sc.constraints.CustomConstraint, '_is_valid'
+                         ).return_value = True
+        self.patchobject(glance.GlanceClientPlugin, 'get_image_id'
+                         ).return_value = 'some_image_id'
+        self.patchobject(neutron.NeutronClientPlugin, '_create')
+        self.patchobject(neutron.NeutronClientPlugin, 'find_neutron_resource'
+                         ).return_value = 'some_network_id'
+        self.sahara_mock = mock.MagicMock()
+        self.patchobject(sahara.SaharaClientPlugin, '_create'
+                         ).return_value = self.sahara_mock
+        self.cl_mgr = self.sahara_mock.clusters
+        self.fake_cl = FakeCluster()
+
+        self.t = template_format.parse(cluster_stack_template)
+
+    def _init_cluster(self, template):
+        stack = utils.parse_stack(template)
+        cluster = stack['super-cluster']
+        return cluster
+
+    def _create_cluster(self, template):
+        cluster = self._init_cluster(template)
+        self.cl_mgr.create.return_value = self.fake_cl
+        self.cl_mgr.get.return_value = self.fake_cl
+        scheduler.TaskRunner(cluster.create)()
+        self.assertEqual((cluster.CREATE, cluster.COMPLETE),
+                         cluster.state)
+        self.assertEqual(self.fake_cl.id, cluster.resource_id)
+        return cluster
+
+    def test_cluster_create(self):
+        self._create_cluster(self.t)
+        expected_args = ('super-cluster', 'vanilla', '2.3.0')
+        expected_kwargs = {'cluster_template_id': 'some_cluster_template_id',
+                           'user_keypair_id': 'admin',
+                           'default_image_id': 'some_image_id',
+                           'net_id': 'some_network_id'}
+        self.cl_mgr.create.assert_called_once_with(*expected_args,
+                                                   **expected_kwargs)
+        self.cl_mgr.get.assert_called_once_with(self.fake_cl.id)
+
+    def test_cluster_delete(self):
+        cluster = self._create_cluster(self.t)
+        scheduler.TaskRunner(cluster.delete)()
+        self.assertEqual((cluster.DELETE, cluster.COMPLETE),
+                         cluster.state)
+        self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
+
+    def test_cluster_create_fails(self):
+        cfg.CONF.set_override('action_retry_limit', 0)
+        cluster = self._init_cluster(self.t)
+        self.cl_mgr.create.return_value = self.fake_cl
+        self.cl_mgr.get.return_value = FakeCluster(status='Error')
+        create_task = scheduler.TaskRunner(cluster.create)
+        ex = self.assertRaises(exception.ResourceFailure, create_task)
+        expected = 'ResourceInError: Went to status Error due to "Unknown"'
+        self.assertEqual(expected, six.text_type(ex))
+
+    def test_cluster_delete_fails(self):
+        cluster = self._create_cluster(self.t)
+        self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException()
+        delete_task = scheduler.TaskRunner(cluster.delete)
+        ex = self.assertRaises(exception.ResourceFailure, delete_task)
+        expected = "APIException: None"
+        self.assertEqual(expected, six.text_type(ex))
+        self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
+
+    def test_cluster_not_found_in_delete(self):
+        cluster = self._create_cluster(self.t)
+        self.cl_mgr.delete.side_effect = sahara.sahara_base.APIException(
+            error_code=404)
+        scheduler.TaskRunner(cluster.delete)()
+        self.cl_mgr.delete.assert_called_once_with(self.fake_cl.id)
+
+    def test_cluster_resolve_attribute(self):
+        cluster = self._create_cluster(self.t)
+        self.cl_mgr.get.reset_mock()
+        self.assertEqual(self.fake_cl.info,
+                         cluster._resolve_attribute('info'))
+        self.assertEqual(self.fake_cl.status,
+                         cluster._resolve_attribute('status'))
+        self.assertEqual(2, self.cl_mgr.get.call_count)
+
+    def test_cluster_resource_mapping(self):
+        cluster = self._init_cluster(self.t)
+        mapping = sc.resource_mapping()
+        self.assertEqual(1, len(mapping))
+        self.assertEqual(sc.SaharaCluster,
+                         mapping['OS::Sahara::Cluster'])
+        self.assertIsInstance(cluster, sc.SaharaCluster)
+
+    def test_cluster_create_no_image_anywhere_fails(self):
+        self.t['resources']['super-cluster']['properties'].pop('image')
+        self.sahara_mock.cluster_templates.get.return_value = mock.Mock(
+            default_image_id=None)
+        cluster = self._init_cluster(self.t)
+        ex = self.assertRaises(exception.ResourceFailure,
+                               scheduler.TaskRunner(cluster.create))
+        self.assertIsInstance(ex.exc, exception.StackValidationFailed)
+        self.assertIn("image must be provided: "
+                      "Referenced cluster template some_cluster_template_id "
+                      "has no default_image_id defined.",
+                      six.text_type(ex.message))
+
+    def test_cluster_validate_no_network_on_neutron_fails(self):
+        self.t['resources']['super-cluster']['properties'].pop(
+            'neutron_management_network')
+        cluster = self._init_cluster(self.t)
+        self.patchobject(cluster, 'is_using_neutron', return_value=True)
+        ex = self.assertRaises(exception.StackValidationFailed,
+                               cluster.validate)
+        self.assertEqual("neutron_management_network must be provided",
+                         six.text_type(ex))