diff --git a/etc/rest-api-samples/scale_cluster.json b/etc/rest-api-samples/scale_cluster.json
new file mode 100644
index 00000000..1b9db90d
--- /dev/null
+++ b/etc/rest-api-samples/scale_cluster.json
@@ -0,0 +1,18 @@
+{
+    "add_node_groups": [
+        {
+            "node_group_template_id": "9e1225e0-b7ce-425b-9555-3a61166167c8",
+            "count": 1,
+            "name": "test_ng"
+
+        }
+    ],
+
+    "resize_node_groups": [
+        {
+            "name": "worker",
+            "count": 2
+
+        }
+    ]
+}
\ No newline at end of file
diff --git a/savanna/api/v10.py b/savanna/api/v10.py
index 5237881b..c04d8e43 100644
--- a/savanna/api/v10.py
+++ b/savanna/api/v10.py
@@ -40,18 +40,18 @@ def clusters_create(data):
     return u.render(api.create_cluster(data).wrapped_dict)
 
 
+@rest.put('/clusters/<cluster_id>')
+@v.check_exists(api.get_cluster, 'cluster_id')
+def clusters_scale(cluster_id, data):
+    return u.render(api.scale_cluster(cluster_id, data).wrapped_dict)
+
+
 @rest.get('/clusters/<cluster_id>')
 @v.check_exists(api.get_cluster, 'cluster_id')
 def clusters_get(cluster_id):
     return u.render(api.get_cluster(id=cluster_id).wrapped_dict)
 
 
-@rest.put('/clusters/<cluster_id>')
-@v.check_exists(api.get_cluster, 'cluster_id')
-def clusters_update(cluster_id):
-    return _not_implemented()
-
-
 @rest.delete('/clusters/<cluster_id>')
 @v.check_exists(api.get_cluster, 'cluster_id')
 def clusters_delete(cluster_id):
diff --git a/savanna/plugins/provisioning.py b/savanna/plugins/provisioning.py
index 5442cf26..bdec8b5e 100644
--- a/savanna/plugins/provisioning.py
+++ b/savanna/plugins/provisioning.py
@@ -49,6 +49,9 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
     def validate(self, cluster):
         pass
 
+    def validate_scaling(self, cluster, existing, additional):
+        pass
+
     def update_infra(self, cluster):
         pass
 
@@ -60,6 +63,9 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
     def start_cluster(self, cluster):
         pass
 
+    def scale_cluster(self, cluster, instances):
+        pass
+
     def convert(self, hadoop_version, config_file):
         pass
diff --git a/savanna/plugins/vanilla/exceptions.py b/savanna/plugins/vanilla/exceptions.py
index f2ac068c..5232051f 100644
--- a/savanna/plugins/vanilla/exceptions.py
+++ b/savanna/plugins/vanilla/exceptions.py
@@ -34,3 +34,19 @@ class TaskTrackersWithoutJobTracker(e.SavannaException):
     def __init__(self):
         self.message = "TaskTrackers cannot be configures without JobTracker"
         self.code = "TASK_TRACKERS_WITHOUT_JOB_TRACKER"
+
+
+class NodeGroupsDoNotExist(e.SavannaException):
+    def __init__(self, ng_names):
+        names = ', '.join(ng_names)
+        self.message = "Cluster does not contain required node groups: " + \
+            names
+        self.code = "NODE_GROUP_DOES_NOT_EXIST"
+
+
+class NodeGroupCannotBeScaled(e.SavannaException):
+    def __init__(self, ng_name):
+        self.message = "Chosen node group " + ng_name + " cannot be " \
+            "scaled: Vanilla plugin supports only datanode and " \
+            "tasktracker scaling"
+        self.code = "NODE_GROUP_CANNOT_BE_SCALED"
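Note: the sample body in etc/rest-api-samples/scale_cluster.json above is meant for the new PUT route added in savanna/api/v10.py. A minimal sketch of exercising it from Python 2 follows; the host, port, tenant id, cluster id, and token are placeholders, and the http://<host>:8386/v1.0/<tenant_id>/... URL layout is an assumption based on Savanna's usual REST conventions, not something this patch defines:

    import urllib2

    AUTH_TOKEN = 'keystone-token'   # placeholder
    TENANT_ID = 'tenant-id'         # placeholder
    CLUSTER_ID = 'cluster-id'       # placeholder

    with open('etc/rest-api-samples/scale_cluster.json') as f:
        body = f.read()

    req = urllib2.Request(
        'http://savanna-host:8386/v1.0/%s/clusters/%s'
        % (TENANT_ID, CLUSTER_ID),
        data=body,
        headers={'X-Auth-Token': AUTH_TOKEN,
                 'Content-Type': 'application/json'})
    req.get_method = lambda: 'PUT'  # urllib2 issues GET/POST by default
    print urllib2.urlopen(req).read()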
diff --git a/savanna/plugins/vanilla/plugin.py b/savanna/plugins/vanilla/plugin.py
index ddaaeead..29421660 100644
--- a/savanna/plugins/vanilla/plugin.py
+++ b/savanna/plugins/vanilla/plugin.py
@@ -74,15 +74,14 @@ class VanillaProvider(p.ProvisioningPluginBase):
         pass
 
     def configure_cluster(self, cluster):
-        for ng in cluster.node_groups:
-            for inst in ng.instances:
-                inst.remote.execute_command(
-                    'sudo chown -R $USER:$USER /etc/hadoop'
-                )
-
         self._extract_configs(cluster)
         self._push_configs_to_nodes(cluster)
-        self._write_hadoop_user_keys(cluster)
+        self._write_hadoop_user_keys(cluster.private_key,
+                                     utils.get_instances(cluster))
+
+        nn = utils.get_namenode(cluster)
+        nn.remote.execute_command(
+            "sudo su -c 'hadoop namenode -format' hadoop")
 
     def start_cluster(self, cluster):
         nn_instance = utils.get_namenode(cluster)
@@ -121,26 +120,76 @@
             )
         }
 
-    def _push_configs_to_nodes(self, cluster):
+    def validate_scaling(self, cluster, existing, additional):
+        ng_names = existing.copy()
+        allowed = ["datanode", "tasktracker"]
+        # validate scaling of existing node groups first:
         for ng in cluster.node_groups:
+            if ng.name in ng_names:
+                del ng_names[ng.name]
+                if not set(ng.node_processes).issubset(allowed):
+                    raise ex.NodeGroupCannotBeScaled(ng.name)
+        if len(ng_names) != 0:
+            raise ex.NodeGroupsDoNotExist(ng_names.keys())
+        # validate additional node groups
+        jt = utils.get_jobtracker(cluster)
+        nn = utils.get_namenode(cluster)
+        for ng in additional:
+            if (not set(ng.node_processes).issubset(allowed)) or (
+                    not jt and 'tasktracker' in ng.node_processes) or (
+                    not nn and 'datanode' in ng.node_processes):
+                raise ex.NodeGroupCannotBeScaled(ng.name)
+
+    def scale_cluster(self, cluster, instances):
+        self._extract_configs(cluster)
+        self._push_configs_to_nodes(cluster, instances=instances)
+        self._write_hadoop_user_keys(cluster.private_key,
+                                     instances)
+
+        for i in instances:
+            with i.remote as remote:
+                if "datanode" in i.node_group.node_processes:
+                    remote.execute_command('sudo su -c '
+                                           '"/usr/sbin/hadoop-daemon.sh '
+                                           'start datanode" hadoop'
+                                           '>> /tmp/savanna-start-datanode.log'
+                                           ' 2>&1')
+
+                if "tasktracker" in i.node_group.node_processes:
+                    remote.execute_command('sudo su -c '
+                                           '"/usr/sbin/hadoop-daemon.sh '
+                                           'start tasktracker" hadoop'
+                                           '>> /tmp/savanna-start-'
+                                           'tasktracker.log 2>&1')
+
+    def _push_configs_to_nodes(self, cluster, instances=None):
+        if not instances:
+            instances = utils.get_instances(cluster)
+
+        for inst in instances:
             files = {
-                '/etc/hadoop/core-site.xml': ng.extra['xml']['core-site'],
-                '/etc/hadoop/mapred-site.xml': ng.extra['xml']['mapred-site'],
-                '/etc/hadoop/hdfs-site.xml': ng.extra['xml']['hdfs-site'],
-                '/tmp/savanna-hadoop-init.sh': ng.extra['setup_script']
+                '/etc/hadoop/core-site.xml': inst.node_group.extra['xml'][
+                    'core-site'],
+                '/etc/hadoop/mapred-site.xml': inst.node_group.extra['xml'][
+                    'mapred-site'],
+                '/etc/hadoop/hdfs-site.xml': inst.node_group.extra['xml'][
+                    'hdfs-site'],
+                '/tmp/savanna-hadoop-init.sh': inst.node_group.extra[
+                    'setup_script']
             }
-            for inst in ng.instances:
-                inst.remote.write_files_to(files)
-                inst.remote.execute_command(
+            with inst.remote as r:
+                r.execute_command(
+                    'sudo chown -R $USER:$USER /etc/hadoop'
+                )
+                r.write_files_to(files)
+                r.execute_command(
                     'sudo chmod 0500 /tmp/savanna-hadoop-init.sh'
                 )
-                inst.remote.execute_command(
+                r.execute_command(
                     'sudo /tmp/savanna-hadoop-init.sh '
                     '>> /tmp/savanna-hadoop-init.log 2>&1')
-
         nn = utils.get_namenode(cluster)
         jt = utils.get_jobtracker(cluster)
-
         nn.remote.write_files_to({
             '/etc/hadoop/slaves': utils.generate_host_names(
                 utils.get_datanodes(cluster)),
@@ -148,9 +197,6 @@
             '/etc/hadoop/masters': utils.generate_host_names(
                 utils.get_secondarynamenodes(cluster))
         })
-        nn.remote.execute_command(
-            "sudo su -c 'hadoop namenode -format' hadoop")
-
         if jt and nn.instance_id != jt.instance_id:
             jt.remote.write_file_to('/etc/hadoop/slaves',
                                     utils.generate_host_names(
@@ -171,9 +217,8 @@
             'Web UI': 'http://%s:50070' % nn.management_ip
         }
 
-    def _write_hadoop_user_keys(self, cluster):
-        private_key = cluster.private_key
-        public_key = crypto.private_key_to_public_key(cluster.private_key)
+    def _write_hadoop_user_keys(self, private_key, instances):
+        public_key = crypto.private_key_to_public_key(private_key)
 
         files = {
             'id_rsa': private_key,
@@ -185,8 +230,7 @@
             'sudo chown -R hadoop:hadoop /home/hadoop/.ssh; ' \
             'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}'
 
-        for node_group in cluster.node_groups:
-            for instance in node_group.instances:
-                with instance.remote as remote:
-                    remote.write_files_to(files)
-                    remote.execute_command(mv_cmd)
+        for instance in instances:
+            with instance.remote as remote:
+                remote.write_files_to(files)
+                remote.execute_command(mv_cmd)
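A minimal usage sketch of the vanilla plugin's new validation hook (illustration only: `cluster` is assumed to be an already-loaded cluster model, and the plugin is instantiated directly here, whereas the service layer resolves it via plugin_base.PLUGINS.get_plugin(cluster.plugin_name)):

    from savanna.plugins.vanilla import exceptions as ex
    from savanna.plugins.vanilla.plugin import VanillaProvider

    plugin = VanillaProvider()
    existing = {'worker': 4}  # resize the existing "worker" group to 4
    additional = {}           # no brand-new node groups in this request

    try:
        plugin.validate_scaling(cluster, existing, additional)
    except ex.NodeGroupsDoNotExist:
        raise  # a name in `existing` is not among cluster.node_groups
    except ex.NodeGroupCannotBeScaled:
        raise  # a group runs processes other than datanode/tasktracker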
diff --git a/savanna/service/api.py b/savanna/service/api.py
index 97d9070a..3fa90280 100644
--- a/savanna/service/api.py
+++ b/savanna/service/api.py
@@ -33,6 +33,41 @@ get_clusters = s.get_clusters
 get_cluster = s.get_cluster
 
 
+def scale_cluster(cluster_id, data):
+    cluster = get_cluster(id=cluster_id)
+    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+    existing_node_groups = data.get('resize_node_groups', [])
+    additional_node_groups = data.get('add_node_groups', [])
+
+    # the next map is the main object we will work with;
+    # to_be_enlarged: {node_group_name: desired_amount_of_instances}
+    to_be_enlarged = {}
+    for ng in existing_node_groups:
+        to_be_enlarged.update({ng['name']: ng['count']})
+
+    additional = construct_ngs_for_scaling(additional_node_groups)
+
+    try:
+        cluster.status = 'Validating'
+        context.model_save(cluster)
+        _validate_cluster(cluster, plugin, additional)
+        plugin.validate_scaling(cluster, to_be_enlarged, additional)
+    except Exception:
+        with excutils.save_and_reraise_exception():
+            cluster.status = 'Active'
+            context.model_save(cluster)
+
+    # If we are here, validation was successful.
+    # So let's update the db and the to_be_enlarged map:
+    for add_n_g in additional:
+        cluster.node_groups.append(add_n_g)
+        to_be_enlarged.update({add_n_g.name: additional[add_n_g]})
+    context.model_save(cluster)
+
+    context.spawn(_provision_nodes, cluster_id, to_be_enlarged)
+    return cluster
+
+
 def create_cluster(values):
     cluster = s.create_cluster(values)
     plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
@@ -55,6 +90,24 @@ def create_cluster(values):
     return cluster
 
 
+# node_group_names_map = {node_group_name: desired_amount_of_instances}
+def _provision_nodes(cluster_id, node_group_names_map):
+    cluster = get_cluster(id=cluster_id)
+    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+
+    cluster.status = 'Scaling'
+    context.model_save(cluster)
+    instances = i.scale_cluster(cluster, node_group_names_map)
+
+    cluster.status = 'Configuring'
+    context.model_save(cluster)
+    plugin.scale_cluster(cluster, instances)
+
+    # cluster is now up and ready
+    cluster.status = 'Active'
+    context.model_save(cluster)
+
+
 def _provision_cluster(cluster_id):
     cluster = get_cluster(id=cluster_id)
     plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
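To make the data flow concrete, here is an illustration built from the sample request body above (plain Python, no Savanna objects involved):

    data = {
        'resize_node_groups': [{'name': 'worker', 'count': 2}],
        'add_node_groups': [
            {'node_group_template_id':
                 '9e1225e0-b7ce-425b-9555-3a61166167c8',
             'count': 1, 'name': 'test_ng'}],
    }
    to_be_enlarged = {}
    for ng in data.get('resize_node_groups', []):
        to_be_enlarged.update({ng['name']: ng['count']})
    assert to_be_enlarged == {'worker': 2}
    # once validation passes, scale_cluster() appends 'test_ng' to the
    # cluster and adds it to this map too, then _provision_nodes() walks
    # the cluster through 'Scaling' -> 'Configuring' -> 'Active'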
@@ -145,27 +198,60 @@ def convert_to_cluster_template(plugin_name, version, config_file):
 
 def _validate(cluster, plugin):
     # Validate that user configs are not included in plugin configs set
+    pl_confs = _get_plugin_configs(cluster, plugin)
+    for ng in cluster.node_groups:
+        _validate_node_group(pl_confs, ng)
+
+
+def _validate_cluster(cluster, plugin, node_groups):
+    # Validate that user configs are not included in plugin configs set
+    pl_confs = _get_plugin_configs(cluster, plugin)
+    for ng in node_groups:
+        ng.cluster = cluster
+        _validate_node_group(pl_confs, ng)
+        ng.cluster = None
+
+
+def _get_plugin_configs(cluster, plugin):
     pl_confs = {}
     for config in plugin.get_configs(cluster.hadoop_version):
         if pl_confs.get(config.applicable_target):
             pl_confs[config.applicable_target].append(config.name)
         else:
            pl_confs[config.applicable_target] = [config.name]
+    return pl_confs
 
-    for ng in cluster.node_groups:
-        for app_target, configs in ng.configuration.items():
-            if app_target not in pl_confs:
-                raise RuntimeError("Plugin doesn't contain applicable "
-                                   "target '%s'" % app_target)
-            for name, values in configs.items():
-                if name not in pl_confs[app_target]:
-                    raise RuntimeError("Plugin's applicable target '%s' "
-                                       "doesn't contain config with name '%s'"
-                                       % (app_target, name))
+
+def _validate_node_group(pl_confs, node_group):
+    for app_target, configs in node_group.configuration.items():
+        if app_target not in pl_confs:
+            raise RuntimeError("Plugin doesn't contain applicable "
+                               "target '%s'" % app_target)
+        for name, values in configs.items():
+            if name not in pl_confs[app_target]:
+                raise RuntimeError("Plugin's applicable target '%s' "
+                                   "doesn't contain config with name '%s'"
+                                   % (app_target, name))
+
+
+def construct_ngs_for_scaling(additional_node_groups):
+    additional = {}
+    for ng in additional_node_groups:
+        tmpl_id = ng['node_group_template_id']
+        count = ng['count']
+        if tmpl_id:
+            tmpl = get_node_group_template(id=tmpl_id)
+            node_group = tmpl.to_object(ng, m.NodeGroup)
+        else:
+            node_group = m.NodeGroup(**ng)
+        # need to set 0 because tmpl.to_object overwrites count
+        node_group.count = 0
+        additional.update({node_group: count})
+    return additional
 
 
 ## Image Registry
 
+
 def get_images(tags):
     return nova.client().images.list_registered(tags)
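A sketch of what construct_ngs_for_scaling() above returns for the sample request; the template lookup needs a live DB session, so this is illustrative rather than runnable standalone:

    additional = construct_ngs_for_scaling([
        {'node_group_template_id': '9e1225e0-b7ce-425b-9555-3a61166167c8',
         'count': 1, 'name': 'test_ng'},
    ])
    for node_group, desired in additional.items():
        assert node_group.count == 0  # reset so instance numbering starts at 1
        assert desired == 1           # the target size lives only in the map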
diff --git a/savanna/service/instances.py b/savanna/service/instances.py
index 15645e1c..2368339f 100644
--- a/savanna/service/instances.py
+++ b/savanna/service/instances.py
@@ -46,30 +46,81 @@ def create_cluster(cluster):
         _rollback_cluster_creation(cluster, ex)
 
 
+# node_group_names_map: {node_group_name: desired_amount_of_instances}
+def scale_cluster(cluster, node_group_names_map):
+    # Now let's work with real node_groups, not names:
+    node_groups_map = {}
+    for ng in cluster.node_groups:
+        if ng.name in node_group_names_map:
+            node_groups_map.update({ng: node_group_names_map[ng.name]})
+
+    instances_list = []
+    try:
+        instances_list = _create_cluster_instances(
+            cluster, node_groups_map)
+        _await_instances(cluster)
+    except Exception as ex:
+        LOG.warn("Can't scale cluster: %s", ex)
+        with excutils.save_and_reraise_exception():
+            _rollback_cluster_scaling(instances_list)
+            instances_list = []
+            cluster.status = 'Active'
+            context.model_save(cluster)
+
+    # we should be here with a valid cluster: if instance creation
+    # was not successful, all extra instances were removed above
+    _configure_instances(cluster)
+    return instances_list
+
+
 def _create_instances(cluster):
-    """Create all instances using nova client and persist them into DB."""
-    session = context.ctx().session
     aa_groups = _generate_anti_affinity_groups(cluster)
     for node_group in cluster.node_groups:
         userdata = _generate_user_data_script(node_group)
         for idx in xrange(1, node_group.count + 1):
-            name = '%s-%s-%03d' % (cluster.name, node_group.name, idx)
-            aa_group = node_group.anti_affinity_group
-            ids = aa_groups[aa_group]
-            hints = {'different_host': list(ids)} if ids else None
+            _run_instance(cluster, node_group, idx, aa_groups, userdata)
 
-            nova_instance = nova.client().servers.create(
-                name, node_group.get_image_id(), node_group.flavor_id,
-                scheduler_hints=hints, userdata=userdata,
-                key_name=cluster.user_keypair_id)
 
-            with session.begin():
-                instance = m.Instance(node_group.id, nova_instance.id, name)
-                node_group.instances.append(instance)
-                session.add(instance)
+def _create_cluster_instances(cluster, node_groups_map):
+    aa_groups = _generate_anti_affinity_groups(cluster)
+    instances = []
+    for node_group in node_groups_map:
+        count = node_groups_map[node_group]
+        userdata = _generate_user_data_script(node_group)
+        for idx in xrange(node_group.count + 1, count + 1):
+            instance = _run_instance(cluster, node_group, idx,
+                                     aa_groups, userdata)
+            instances.append(instance)
 
-            if aa_group:
-                aa_groups[aa_group].append(nova_instance.id)
+        node_group.count = count
+        context.model_save(node_group)
+    context.model_save(cluster)
+
+    return instances
+
+
+# extracted method. Returns the created instance.
+def _run_instance(cluster, node_group, idx, aa_groups, userdata):
+    """Create an instance using the nova client and persist it into the DB."""
+    session = context.ctx().session
+    name = '%s-%s-%03d' % (cluster.name, node_group.name, idx)
+    aa_group = node_group.anti_affinity_group
+    ids = aa_groups[aa_group]
+    hints = {'different_host': list(ids)} if ids else None
+    context.model_save(node_group)
+    nova_instance = nova.client().servers.create(
+        name, node_group.get_image_id(), node_group.flavor_id,
+        scheduler_hints=hints, userdata=userdata,
+        key_name=cluster.user_keypair_id)
+
+    with session.begin():
+        instance = m.Instance(node_group.id, nova_instance.id, name)
+        node_group.instances.append(instance)
+        session.add(instance)
+
+    if aa_group:
+        aa_groups[aa_group].append(nova_instance.id)
+
+    return instance
 
 
 def _generate_user_data_script(node_group):
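Worth noting: _create_cluster_instances() numbers new instances after the existing ones, so names stay unique per node group. A tiny illustration with hypothetical counts:

    existing_count, target_count = 2, 5
    new_indices = list(xrange(existing_count + 1, target_count + 1))
    assert new_indices == [3, 4, 5]  # yields names like <cluster>-worker-003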
@@ -77,7 +128,6 @@
 echo "%(public_key)s" >> %(user_home)s/.ssh/authorized_keys
 echo "%(private_key)s" > %(user_home)s/.ssh/id_rsa
 """
-    cluster = node_group.cluster
 
     if node_group.username == "root":
         user_home = "/root/"
@@ -169,13 +219,8 @@ def _rollback_cluster_creation(cluster, ex):
     """Shutdown all instances and update cluster status."""
     # update cluster status
     # update cluster status description
-    _shutdown_instances(cluster, True)
-
-
-def _shutdown_instances(cluster, quiet=False):
-    """Shutdown all instances related to the specified cluster."""
     session = context.ctx().session
-
+    _shutdown_instances(cluster, True)
     alive_instances = set([srv.id for srv in nova.client().servers.list()])
 
     for node_group in cluster.node_groups:
@@ -186,6 +231,34 @@
                 session.delete(instance)
 
 
+def _rollback_cluster_scaling(instances):
+    # if some nodes are up, we should shut them down and update "count" in
+    # the node_group
+    session = context.ctx().session
+    for i in instances:
+        ng = i.node_group
+        _shutdown_instance(i)
+        ng.count -= 1
+        context.model_save(ng)
+        if ng.count == 0:
+            with session.begin():
+                session.delete(ng)
+
+
+def _shutdown_instances(cluster, quiet=False):
+    for node_group in cluster.node_groups:
+        for instance in node_group.instances:
+            _shutdown_instance(instance)
+
+
+def _shutdown_instance(instance):
+    session = context.ctx().session
+    nova.client().servers.delete(instance.instance_id)
+    with session.begin():
+        instance.node_group = None
+        session.delete(instance)
+
+
 def shutdown_cluster(cluster):
     """Shutdown specified cluster and all related resources."""
     _shutdown_instances(cluster)
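Finally, a self-contained sketch (fake object, no nova or DB calls) of the count bookkeeping that _rollback_cluster_scaling() performs for each instance it removes:

    class FakeNodeGroup(object):
        def __init__(self, count):
            self.count = count

    ng = FakeNodeGroup(count=3)  # two pre-existing + one added by scaling
    ng.count -= 1                # the scaled-up instance is shut down
    assert ng.count == 2         # a group whose count drops to 0 was created
                                 # by this request and is deleted outright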