Merge "Allow multiple clusters creation"
This commit is contained in:
commit
0c657293a3
@ -50,6 +50,13 @@ def clusters_create(data):
|
||||
return u.render(api.create_cluster(data).to_wrapped_dict())
|
||||
|
||||
|
||||
@rest.post('/clusters/multiple')
|
||||
@acl.enforce("data-processing:clusters:create")
|
||||
@v.validate(v_c.MULTIPLE_CLUSTER_SCHEMA, v_c.check_multiple_clusters_create)
|
||||
def clusters_create_multiple(data):
|
||||
return u.render(api.create_multiple_clusters(data))
|
||||
|
||||
|
||||
@rest.put('/clusters/<cluster_id>')
|
||||
@acl.enforce("data-processing:clusters:scale")
|
||||
@v.check_exists(api.get_cluster, 'cluster_id')
|
||||
|
@ -95,12 +95,34 @@ def scale_cluster(id, data):
|
||||
|
||||
|
||||
def create_cluster(values):
|
||||
plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name'])
|
||||
return _cluster_create(values, plugin)
|
||||
|
||||
|
||||
def create_multiple_clusters(values):
|
||||
num_of_clusters = values['count']
|
||||
clusters = []
|
||||
plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name'])
|
||||
for counter in range(num_of_clusters):
|
||||
cluster_dict = values.copy()
|
||||
cluster_name = cluster_dict['name']
|
||||
cluster_dict['name'] = get_multiple_cluster_name(num_of_clusters,
|
||||
cluster_name,
|
||||
counter + 1)
|
||||
cluster = _cluster_create(cluster_dict, plugin)
|
||||
|
||||
clusters.append(cluster.id)
|
||||
|
||||
clusters_dict = {'clusters': clusters}
|
||||
return clusters_dict
|
||||
|
||||
|
||||
def _cluster_create(values, plugin):
|
||||
ctx = context.ctx()
|
||||
cluster = conductor.cluster_create(ctx, values)
|
||||
context.set_current_cluster_id(cluster.id)
|
||||
sender.notify(ctx, cluster.id, cluster.name, "New",
|
||||
"create")
|
||||
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
||||
_add_ports_for_auto_sg(ctx, cluster, plugin)
|
||||
|
||||
# validating cluster
|
||||
@ -118,6 +140,10 @@ def create_cluster(values):
|
||||
return cluster
|
||||
|
||||
|
||||
def get_multiple_cluster_name(num_of_clusters, name, counter):
|
||||
return "%%s-%%0%dd" % len(str(num_of_clusters)) % (name, counter)
|
||||
|
||||
|
||||
def _add_ports_for_auto_sg(ctx, cluster, plugin):
|
||||
for ng in cluster.node_groups:
|
||||
if ng.auto_security_group:
|
||||
|
@ -45,10 +45,29 @@ def _build_cluster_schema():
|
||||
|
||||
|
||||
CLUSTER_SCHEMA = _build_cluster_schema()
|
||||
MULTIPLE_CLUSTER_SCHEMA = copy.deepcopy(CLUSTER_SCHEMA)
|
||||
MULTIPLE_CLUSTER_SCHEMA['properties'].update({
|
||||
"count": {
|
||||
"type": "integer"
|
||||
}})
|
||||
MULTIPLE_CLUSTER_SCHEMA['required'].append('count')
|
||||
|
||||
|
||||
def check_cluster_create(data, **kwargs):
|
||||
b.check_cluster_unique_name(data['name'])
|
||||
_check_cluster_create(data)
|
||||
|
||||
|
||||
def check_multiple_clusters_create(data, **kwargs):
|
||||
_check_cluster_create(data)
|
||||
for counter in range(data['count']):
|
||||
cluster_name = api.get_multiple_cluster_name(data['count'],
|
||||
data['name'],
|
||||
counter + 1)
|
||||
b.check_cluster_unique_name(cluster_name)
|
||||
|
||||
|
||||
def _check_cluster_create(data):
|
||||
b.check_plugin_name_exists(data['plugin_name'])
|
||||
b.check_plugin_supports_version(data['plugin_name'],
|
||||
data['hadoop_version'])
|
||||
|
@ -181,6 +181,46 @@ class TestApi(base.SaharaWithDbTestCase):
|
||||
'ops.provision_cluster',
|
||||
'ops.terminate_cluster'], self.calls_order)
|
||||
|
||||
@mock.patch('sahara.service.quotas.check_cluster', return_value=None)
|
||||
def test_create_multiple_clusters_success(self, check_cluster):
|
||||
MULTIPLE_CLUSTERS = SAMPLE_CLUSTER.copy()
|
||||
MULTIPLE_CLUSTERS['count'] = 2
|
||||
clusters = api.create_multiple_clusters(MULTIPLE_CLUSTERS)
|
||||
self.assertEqual(2, check_cluster.call_count)
|
||||
result_cluster1 = api.get_cluster(clusters['clusters'][0])
|
||||
result_cluster2 = api.get_cluster(clusters['clusters'][1])
|
||||
self.assertEqual('Active', result_cluster1.status)
|
||||
self.assertEqual('Active', result_cluster2.status)
|
||||
expected_count = {
|
||||
'ng_1': 1,
|
||||
'ng_2': 3,
|
||||
'ng_3': 1,
|
||||
}
|
||||
ng_count = 0
|
||||
for ng in result_cluster1.node_groups:
|
||||
self.assertEqual(expected_count[ng.name], ng.count)
|
||||
ng_count += 1
|
||||
self.assertEqual(3, ng_count)
|
||||
api.terminate_cluster(result_cluster1.id)
|
||||
api.terminate_cluster(result_cluster2.id)
|
||||
self.assertEqual(
|
||||
['get_open_ports', 'validate',
|
||||
'ops.provision_cluster',
|
||||
'get_open_ports', 'validate',
|
||||
'ops.provision_cluster',
|
||||
'ops.terminate_cluster',
|
||||
'ops.terminate_cluster'], self.calls_order)
|
||||
|
||||
@mock.patch('sahara.service.quotas.check_cluster')
|
||||
def test_create_multiple_clusters_failed(self, check_cluster):
|
||||
MULTIPLE_CLUSTERS = SAMPLE_CLUSTER.copy()
|
||||
MULTIPLE_CLUSTERS['count'] = 2
|
||||
check_cluster.side_effect = exc.QuotaException(
|
||||
'resource', 'requested', 'available')
|
||||
with testtools.ExpectedException(exc.QuotaException):
|
||||
api.create_cluster(SAMPLE_CLUSTER)
|
||||
self.assertEqual('Error', api.get_clusters()[0].status)
|
||||
|
||||
@mock.patch('sahara.service.quotas.check_cluster')
|
||||
def test_create_cluster_failed(self, check_cluster):
|
||||
check_cluster.side_effect = exc.QuotaException(
|
||||
|
Loading…
Reference in New Issue
Block a user