Sahara aims to provide users with simple means to provision a Hadoop cluster by specifying several parameters like Hadoop version, cluster topology, nodes hardware details and a few more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

clusters.py 5.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # Copyright (c) 2016 Red Hat, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from oslo_utils import excutils
  16. import six
  17. from sahara import conductor as c
  18. from sahara import context
  19. from sahara.plugins import base as plugin_base
  20. from sahara.service import api
  21. from sahara.service.health import verification_base
  22. from sahara.service import quotas
  23. from sahara.utils import cluster as c_u
  24. from sahara.utils import general as g
  25. from sahara.utils.notification import sender
  26. conductor = c.API
  27. # Cluster ops
  28. def get_clusters(**kwargs):
  29. return conductor.cluster_get_all(context.ctx(),
  30. regex_search=True, **kwargs)
  31. def get_cluster(id, show_progress=False):
  32. return conductor.cluster_get(context.ctx(), id, show_progress)
  33. def scale_cluster(id, data):
  34. context.set_current_cluster_id(id)
  35. ctx = context.ctx()
  36. cluster = conductor.cluster_get(ctx, id)
  37. plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
  38. existing_node_groups = data.get('resize_node_groups', [])
  39. additional_node_groups = data.get('add_node_groups', [])
  40. # the next map is the main object we will work with
  41. # to_be_enlarged : {node_group_id: desired_amount_of_instances}
  42. to_be_enlarged = {}
  43. node_group_instance_map = {}
  44. for ng in existing_node_groups:
  45. ng_id = g.find(cluster.node_groups, name=ng['name'])['id']
  46. to_be_enlarged.update({ng_id: ng['count']})
  47. if 'instances' in ng:
  48. node_group_instance_map.update({ng_id: ng['instances']})
  49. additional = construct_ngs_for_scaling(cluster, additional_node_groups)
  50. cluster = conductor.cluster_get(ctx, cluster)
  51. _add_ports_for_auto_sg(ctx, cluster, plugin)
  52. try:
  53. cluster = c_u.change_cluster_status(
  54. cluster, c_u.CLUSTER_STATUS_VALIDATING)
  55. quotas.check_scaling(cluster, to_be_enlarged, additional)
  56. plugin.recommend_configs(cluster, scaling=True)
  57. plugin.validate_scaling(cluster, to_be_enlarged, additional)
  58. except Exception as e:
  59. with excutils.save_and_reraise_exception():
  60. c_u.clean_cluster_from_empty_ng(cluster)
  61. c_u.change_cluster_status(
  62. cluster, c_u.CLUSTER_STATUS_ACTIVE, six.text_type(e))
  63. # If we are here validation is successful.
  64. # So let's update to_be_enlarged map:
  65. to_be_enlarged.update(additional)
  66. for node_group in cluster.node_groups:
  67. if node_group.id not in to_be_enlarged:
  68. to_be_enlarged[node_group.id] = node_group.count
  69. api.OPS.provision_scaled_cluster(id, to_be_enlarged,
  70. node_group_instance_map)
  71. return cluster
  72. def create_cluster(values):
  73. plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name'])
  74. return _cluster_create(values, plugin)
  75. def create_multiple_clusters(values):
  76. num_of_clusters = values['count']
  77. clusters = []
  78. plugin = plugin_base.PLUGINS.get_plugin(values['plugin_name'])
  79. for counter in range(num_of_clusters):
  80. cluster_dict = values.copy()
  81. cluster_name = cluster_dict['name']
  82. cluster_dict['name'] = get_multiple_cluster_name(num_of_clusters,
  83. cluster_name,
  84. counter + 1)
  85. cluster = _cluster_create(cluster_dict, plugin).to_wrapped_dict()
  86. clusters.append(cluster)
  87. clusters_dict = {'clusters': clusters}
  88. return clusters_dict
  89. def _cluster_create(values, plugin):
  90. ctx = context.ctx()
  91. cluster = conductor.cluster_create(ctx, values)
  92. context.set_current_cluster_id(cluster.id)
  93. sender.status_notify(cluster.id, cluster.name, "New",
  94. "create")
  95. _add_ports_for_auto_sg(ctx, cluster, plugin)
  96. # validating cluster
  97. try:
  98. plugin.recommend_configs(cluster)
  99. cluster = c_u.change_cluster_status(
  100. cluster, c_u.CLUSTER_STATUS_VALIDATING)
  101. quotas.check_cluster(cluster)
  102. plugin.validate(cluster)
  103. except Exception as e:
  104. with excutils.save_and_reraise_exception():
  105. c_u.change_cluster_status(
  106. cluster, c_u.CLUSTER_STATUS_ERROR, six.text_type(e))
  107. api.OPS.provision_cluster(cluster.id)
  108. return cluster
  109. def get_multiple_cluster_name(num_of_clusters, name, counter):
  110. return "%%s-%%0%dd" % len(str(num_of_clusters)) % (name, counter)
  111. def _add_ports_for_auto_sg(ctx, cluster, plugin):
  112. for ng in cluster.node_groups:
  113. if ng.auto_security_group:
  114. ports = {'open_ports': plugin.get_open_ports(ng)}
  115. conductor.node_group_update(ctx, ng, ports)
  116. def terminate_cluster(id, force=False):
  117. context.set_current_cluster_id(id)
  118. cluster = c_u.change_cluster_status(id, c_u.CLUSTER_STATUS_DELETING)
  119. if cluster is None:
  120. return
  121. api.OPS.terminate_cluster(id, force)
  122. sender.status_notify(cluster.id, cluster.name, cluster.status,
  123. "delete")
  124. def update_cluster(id, values):
  125. if verification_base.update_verification_required(values):
  126. api.OPS.handle_verification(id, values)
  127. return conductor.cluster_get(context.ctx(), id)
  128. return conductor.cluster_update(context.ctx(), id, values)
  129. def construct_ngs_for_scaling(cluster, additional_node_groups):
  130. ctx = context.ctx()
  131. additional = {}
  132. for ng in additional_node_groups:
  133. count = ng['count']
  134. ng['count'] = 0
  135. ng_id = conductor.node_group_add(ctx, cluster, ng)
  136. additional.update({ng_id: count})
  137. return additional