From 4696bd3613749d018471505f04cb9720df66497d Mon Sep 17 00:00:00 2001 From: Telles Nobrega Date: Wed, 10 Jun 2015 14:38:24 -0300 Subject: [PATCH] Allow multiple clusters creation This patch enables the creation of multiple clusters simultaneously using the same configuration. Change-Id: If739c1f5883582ab969afe1c363e35f280721c8c Partially-implements: bp simultaneously-creating-multiple-clusters --- sahara/api/v10.py | 7 +++++ sahara/service/api.py | 28 +++++++++++++++++- sahara/service/validations/clusters.py | 19 ++++++++++++ sahara/tests/unit/service/test_api.py | 40 ++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/sahara/api/v10.py b/sahara/api/v10.py index b427adb4..5df8da71 100644 --- a/sahara/api/v10.py +++ b/sahara/api/v10.py @@ -50,6 +50,13 @@ def clusters_create(data): return u.render(api.create_cluster(data).to_wrapped_dict()) +@rest.post('/clusters/multiple') +@acl.enforce("data-processing:clusters:create") +@v.validate(v_c.MULTIPLE_CLUSTER_SCHEMA, v_c.check_multiple_clusters_create) +def clusters_create_multiple(data): + return u.render(api.create_multiple_clusters(data)) + + @rest.put('/clusters/') @acl.enforce("data-processing:clusters:scale") @v.check_exists(api.get_cluster, 'cluster_id') diff --git a/sahara/service/api.py b/sahara/service/api.py index 77841bee..e091eed7 100644 --- a/sahara/service/api.py +++ b/sahara/service/api.py @@ -96,12 +96,34 @@ def scale_cluster(id, data): def create_cluster(values): + plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name']) + return _cluster_create(values, plugin) + + +def create_multiple_clusters(values): + num_of_clusters = values['count'] + clusters = [] + plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name']) + for counter in range(num_of_clusters): + cluster_dict = values.copy() + cluster_name = cluster_dict['name'] + cluster_dict['name'] = get_multiple_cluster_name(num_of_clusters, + cluster_name, + counter + 1) + cluster = _cluster_create(cluster_dict, plugin) + + clusters.append(cluster.id) + + clusters_dict = {'clusters': clusters} + return clusters_dict + + +def _cluster_create(values, plugin): ctx = context.ctx() cluster = conductor.cluster_create(ctx, values) context.set_current_cluster_id(cluster.id) sender.notify(ctx, cluster.id, cluster.name, "New", "create") - plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) _add_ports_for_auto_sg(ctx, cluster, plugin) # validating cluster @@ -119,6 +141,10 @@ def create_cluster(values): return cluster +def get_multiple_cluster_name(num_of_clusters, name, counter): + return "%%s-%%0%dd" % len(str(num_of_clusters)) % (name, counter) + + def _add_ports_for_auto_sg(ctx, cluster, plugin): for ng in cluster.node_groups: if ng.auto_security_group: diff --git a/sahara/service/validations/clusters.py b/sahara/service/validations/clusters.py index 486464f1..3a9abf4a 100644 --- a/sahara/service/validations/clusters.py +++ b/sahara/service/validations/clusters.py @@ -45,10 +45,29 @@ def _build_cluster_schema(): CLUSTER_SCHEMA = _build_cluster_schema() +MULTIPLE_CLUSTER_SCHEMA = copy.deepcopy(CLUSTER_SCHEMA) +MULTIPLE_CLUSTER_SCHEMA['properties'].update({ + "count": { + "type": "integer" + }}) +MULTIPLE_CLUSTER_SCHEMA['required'].append('count') def check_cluster_create(data, **kwargs): b.check_cluster_unique_name(data['name']) + _check_cluster_create(data) + + +def check_multiple_clusters_create(data, **kwargs): + _check_cluster_create(data) + for counter in range(data['count']): + cluster_name = api.get_multiple_cluster_name(data['count'], + data['name'], + counter + 1) + b.check_cluster_unique_name(cluster_name) + + +def _check_cluster_create(data): b.check_plugin_name_exists(data['plugin_name']) b.check_plugin_supports_version(data['plugin_name'], data['hadoop_version']) diff --git a/sahara/tests/unit/service/test_api.py b/sahara/tests/unit/service/test_api.py index cc5025f4..6af0d0df 100644 --- a/sahara/tests/unit/service/test_api.py +++ b/sahara/tests/unit/service/test_api.py @@ -181,6 +181,46 @@ class TestApi(base.SaharaWithDbTestCase): 'ops.provision_cluster', 'ops.terminate_cluster'], self.calls_order) + @mock.patch('sahara.service.quotas.check_cluster', return_value=None) + def test_create_multiple_clusters_success(self, check_cluster): + MULTIPLE_CLUSTERS = SAMPLE_CLUSTER.copy() + MULTIPLE_CLUSTERS['count'] = 2 + clusters = api.create_multiple_clusters(MULTIPLE_CLUSTERS) + self.assertEqual(2, check_cluster.call_count) + result_cluster1 = api.get_cluster(clusters['clusters'][0]) + result_cluster2 = api.get_cluster(clusters['clusters'][1]) + self.assertEqual('Active', result_cluster1.status) + self.assertEqual('Active', result_cluster2.status) + expected_count = { + 'ng_1': 1, + 'ng_2': 3, + 'ng_3': 1, + } + ng_count = 0 + for ng in result_cluster1.node_groups: + self.assertEqual(expected_count[ng.name], ng.count) + ng_count += 1 + self.assertEqual(3, ng_count) + api.terminate_cluster(result_cluster1.id) + api.terminate_cluster(result_cluster2.id) + self.assertEqual( + ['get_open_ports', 'validate', + 'ops.provision_cluster', + 'get_open_ports', 'validate', + 'ops.provision_cluster', + 'ops.terminate_cluster', + 'ops.terminate_cluster'], self.calls_order) + + @mock.patch('sahara.service.quotas.check_cluster') + def test_create_multiple_clusters_failed(self, check_cluster): + MULTIPLE_CLUSTERS = SAMPLE_CLUSTER.copy() + MULTIPLE_CLUSTERS['count'] = 2 + check_cluster.side_effect = exc.QuotaException( + 'resource', 'requested', 'available') + with testtools.ExpectedException(exc.QuotaException): + api.create_cluster(SAMPLE_CLUSTER) + self.assertEqual('Error', api.get_clusters()[0].status) + @mock.patch('sahara.service.quotas.check_cluster') def test_create_cluster_failed(self, check_cluster): check_cluster.side_effect = exc.QuotaException(