From a4f27a88de089d58a0138af2cc132b0202892562 Mon Sep 17 00:00:00 2001 From: Nadya Privalova Date: Tue, 9 Jul 2013 20:12:26 +0400 Subject: [PATCH] Cluster scaling: deletion Inmplemented: * exclude/include files mechanism is implemented * adding instances was change * removing instances works ok * "slaves" and "masters" are deleted Implements blueprint delete-instances Implements blueprint get-rid-of-slaves-file Fixes: bug #1199884 Change-Id: Ie1bfaa421fce22efd3115ba13758932b6b592d7a --- savanna/db/models.py | 3 +- savanna/plugins/provisioning.py | 4 + savanna/plugins/vanilla/config_helper.py | 15 +- savanna/plugins/vanilla/plugin.py | 166 +++++++++++------- savanna/plugins/vanilla/run_scripts.py | 28 +++ savanna/plugins/vanilla/scaling.py | 94 ++++++++++ savanna/plugins/vanilla/utils.py | 4 + savanna/service/api.py | 2 +- savanna/service/instances.py | 79 ++++++--- .../service/validations/clusters_scaling.py | 8 +- .../unit/plugins/test_dfsadmin_parsing.py | 61 +++++++ .../unit/resources/dfs_admin_0_nodes.txt | 11 ++ .../unit/resources/dfs_admin_1_nodes.txt | 15 ++ .../unit/resources/dfs_admin_3_nodes.txt | 23 +++ setup.py | 3 +- 15 files changed, 420 insertions(+), 96 deletions(-) create mode 100644 savanna/plugins/vanilla/run_scripts.py create mode 100644 savanna/plugins/vanilla/scaling.py create mode 100644 savanna/tests/unit/plugins/test_dfsadmin_parsing.py create mode 100644 savanna/tests/unit/resources/dfs_admin_0_nodes.txt create mode 100644 savanna/tests/unit/resources/dfs_admin_1_nodes.txt create mode 100644 savanna/tests/unit/resources/dfs_admin_3_nodes.txt diff --git a/savanna/db/models.py b/savanna/db/models.py index 485acfbbb0..68670fde5e 100644 --- a/savanna/db/models.py +++ b/savanna/db/models.py @@ -129,7 +129,8 @@ class NodeGroup(mb.SavannaBase, NodeGroupMixin): count = sa.Column(sa.Integer, nullable=False) instances = relationship('Instance', cascade="all,delete", - backref='node_group') + backref='node_group', + order_by="Instance.instance_name") cluster_id = sa.Column(sa.String(36), sa.ForeignKey('Cluster.id')) diff --git a/savanna/plugins/provisioning.py b/savanna/plugins/provisioning.py index fd101cf9bc..0df8a2c6b7 100644 --- a/savanna/plugins/provisioning.py +++ b/savanna/plugins/provisioning.py @@ -70,6 +70,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface): def scale_cluster(self, cluster, instances): pass + @plugins_base.required_with_default + def decommission_nodes(self, cluster, instances): + pass + @plugins_base.optional def convert(self, hadoop_version, config_file): pass diff --git a/savanna/plugins/vanilla/config_helper.py b/savanna/plugins/vanilla/config_helper.py index 9d1e0e2838..1c4b09b836 100644 --- a/savanna/plugins/vanilla/config_helper.py +++ b/savanna/plugins/vanilla/config_helper.py @@ -128,6 +128,8 @@ def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname=None): '/lib/hadoop/hdfs/namenode'), 'dfs.data.dir': extract_hadoop_path(storage_path, '/lib/hadoop/hdfs/datanode'), + 'dfs.hosts': '/etc/hadoop/dn.incl', + 'dfs.hosts.exclude': '/etc/hadoop/dn.excl', } if jt_hostname: @@ -136,7 +138,9 @@ def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname=None): 'mapred.system.dir': extract_hadoop_path(storage_path, '/mapred/mapredsystem'), 'mapred.local.dir': extract_hadoop_path(storage_path, - '/lib/hadoop/mapred') + '/lib/hadoop/mapred'), + 'mapred.hosts': '/etc/hadoop/tt.incl', + 'mapred.hosts.exclude': '/etc/hadoop/tt.excl', } cfg.update(mr_cfg) @@ -227,3 +231,12 @@ def extract_name_values(configs): def extract_hadoop_path(lst, hadoop_dir): return ",".join([p + hadoop_dir for p in lst]) + + +def determine_cluster_config(cluster, config_name): + if config_name in cluster.cluster_configs: + cluster.cluster_configs.get(config_name) + all_conf = get_plugin_configs() + for conf in all_conf: + if conf.name == config_name: + return conf.default_value diff --git a/savanna/plugins/vanilla/plugin.py b/savanna/plugins/vanilla/plugin.py index fdc1cc9b87..d265f0da59 100644 --- a/savanna/plugins/vanilla/plugin.py +++ b/savanna/plugins/vanilla/plugin.py @@ -17,9 +17,12 @@ from savanna.openstack.common import log as logging from savanna.plugins import provisioning as p from savanna.plugins.vanilla import config_helper as c_helper from savanna.plugins.vanilla import exceptions as ex +from savanna.plugins.vanilla import run_scripts as run +from savanna.plugins.vanilla import scaling as sc from savanna.plugins.vanilla import utils from savanna.utils import crypto + LOG = logging.getLogger(__name__) @@ -79,34 +82,39 @@ class VanillaProvider(p.ProvisioningPluginBase): self._write_hadoop_user_keys(cluster.private_key, utils.get_instances(cluster)) - nn = utils.get_namenode(cluster) - nn.remote.execute_command( - "sudo su -c 'hadoop namenode -format' hadoop") - def start_cluster(self, cluster): nn_instance = utils.get_namenode(cluster) + datanodes = utils.get_datanodes(cluster) jt_instance = utils.get_jobtracker(cluster) + tasktrackers = utils.get_tasktrackers(cluster) - nn_instance.remote.execute_command( - 'sudo su -c /usr/sbin/start-dfs.sh hadoop >>' - ' /tmp/savanna-hadoop-start-dfs.log') + with nn_instance.remote as remote: + run.format_namenode(remote) + run.start_process(remote, "namenode") - LOG.info("HDFS service at '%s' has been started", nn_instance.hostname) + snns = utils.get_secondarynamenodes(cluster) + if snns: + for snn in snns: + run.start_process(snn.remote, "secondarynamenode") + for dn in datanodes: + run.start_process(dn.remote, "datanode") + LOG.info("HDFS service at '%s' has been started", + nn_instance.hostname) if jt_instance: - jt_instance.remote.execute_command( - 'sudo su -c /usr/sbin/start-mapred.sh hadoop >>' - ' /tmp/savanna-hadoop-start-mapred.log') + run.start_process(jt_instance.remote, "jobtracker") + for tt in tasktrackers: + run.start_process(tt.remote, "tasktracker") LOG.info("MapReduce service at '%s' has been started", jt_instance.hostname) LOG.info('Cluster %s has been started successfully' % cluster.name) - self._set_cluster_info(cluster) def _extract_configs(self, cluster): nn = utils.get_namenode(cluster) jt = utils.get_jobtracker(cluster) + for ng in cluster.node_groups: ng.extra = { 'xml': c_helper.generate_xml_configs(ng.configuration, @@ -120,58 +128,47 @@ class VanillaProvider(p.ProvisioningPluginBase): ) } - def validate_scaling(self, cluster, existing, additional): - ng_names = existing.copy() - allowed = ["datanode", "tasktracker"] - #validate existing n_g scaling at first: - for ng in cluster.node_groups: - #we do not support deletion now - if ng.name in ng_names: - del ng_names[ng.name] - #we do not support deletion now - if ng.count > existing[ng.name]: - raise ex.NodeGroupCannotBeScaled( - ng.name, "Vanilla plugin cannot shrink node_group") - if not set(ng.node_processes).issubset(allowed): - raise ex.NodeGroupCannotBeScaled( - ng.name, "Vanilla plugin cannot scale nodegroup" - " with processes: " + - ' '.join(ng.node_processes)) - if len(ng_names) != 0: - raise ex.NodeGroupsDoNotExist(ng_names.keys()) - #validate additional n_g - jt = utils.get_jobtracker(cluster) + def decommission_nodes(self, cluster, instances): + tts = utils.get_tasktrackers(cluster) + dns = utils.get_datanodes(cluster) + decommission_dns = False + decommission_tts = False + + for i in instances: + if 'datanode' in i.node_group.node_processes: + dns.remove(i) + decommission_dns = True + if 'tasktracker' in i.node_group.node_processes: + tts.remove(i) + decommission_tts = True + nn = utils.get_namenode(cluster) - for ng in additional: - if (not set(ng.node_processes).issubset(allowed)) or ( - not jt and 'tasktracker' in ng.node_processes) or ( - not nn and 'datanode' in ng.node_processes): - raise ex.NodeGroupCannotBeScaled( - ng.name, "Vanilla plugin cannot scale node group with " - "processes which have no master-processes run " - "in cluster") + jt = utils.get_jobtracker(cluster) + + if decommission_tts: + sc.decommission_tt(jt, instances, tts) + if decommission_dns: + sc.decommission_dn(nn, instances, dns) + + def validate_scaling(self, cluster, existing, additional): + self._validate_existing_ng_scaling(cluster, existing) + self._validate_additional_ng_scaling(cluster, additional) def scale_cluster(self, cluster, instances): self._extract_configs(cluster) self._push_configs_to_nodes(cluster, instances=instances) self._write_hadoop_user_keys(cluster.private_key, instances) + run.refresh_nodes(utils.get_namenode(cluster).remote, "dfsadmin") + run.refresh_nodes(utils.get_jobtracker(cluster).remote, "mradmin") for i in instances: with i.remote as remote: if "datanode" in i.node_group.node_processes: - remote.execute_command('sudo su -c ' - '"/usr/sbin/hadoop-daemon.sh ' - 'start datanode" hadoop' - '>> /tmp/savanna-start-datanode.log' - ' 2>&1') + run.start_process(remote, "datanode") if "tasktracker" in i.node_group.node_processes: - remote.execute_command('sudo su -c ' - '"/usr/sbin/hadoop-daemon.sh ' - 'start tasktracker" hadoop' - '>> /tmp/savanna-start-' - 'tasktracker.log 2>&1') + run.start_process(remote, "tasktracker") def _push_configs_to_nodes(self, cluster, instances=None): if instances is None: @@ -199,19 +196,19 @@ class VanillaProvider(p.ProvisioningPluginBase): r.execute_command( 'sudo /tmp/savanna-hadoop-init.sh ' '>> /tmp/savanna-hadoop-init.log 2>&1') + nn = utils.get_namenode(cluster) jt = utils.get_jobtracker(cluster) - nn.remote.write_files_to({ - '/etc/hadoop/slaves': utils.generate_host_names( - utils.get_datanodes(cluster)), - '/etc/hadoop/masters': utils.generate_host_names( - utils.get_secondarynamenodes(cluster)) - }) - if jt and nn.instance_id != jt.instance_id: - jt.remote.write_file_to('/etc/hadoop/slaves', - utils.generate_host_names( - utils.get_tasktrackers(cluster))) + with nn.remote as r: + r.write_file_to('/etc/hadoop/dn.incl', utils. + generate_fqdn_host_names( + utils.get_datanodes(cluster))) + if jt: + with jt.remote as r: + r.write_file_to('/etc/hadoop/tt.incl', utils. + generate_fqdn_host_names( + utils.get_tasktrackers(cluster))) def _set_cluster_info(self, cluster): nn = utils.get_namenode(cluster) @@ -245,3 +242,50 @@ class VanillaProvider(p.ProvisioningPluginBase): with instance.remote as remote: remote.write_files_to(files) remote.execute_command(mv_cmd) + + def _get_scalable_processes(self): + return ["datanode", "tasktracker"] + + def _validate_additional_ng_scaling(self, cluster, additional): + jt = utils.get_jobtracker(cluster) + scalable_processes = self._get_scalable_processes() + + for ng in additional: + if not set(ng.node_processes).issubset(scalable_processes): + raise ex.NodeGroupCannotBeScaled( + ng.name, "Vanilla plugin cannot scale nodegroup" + " with processes: " + + ' '.join(ng.node_processes)) + if not jt and 'tasktracker' in ng.node_processes: + raise ex.NodeGroupCannotBeScaled( + ng.name, "Vanilla plugin cannot scale node group with " + "processes which have no master-processes run " + "in cluster") + + def _validate_existing_ng_scaling(self, cluster, existing): + ng_names = existing.copy() + scalable_processes = self._get_scalable_processes() + dn_to_delete = 0 + for ng in cluster.node_groups: + if ng.name in existing: + del ng_names[ng.name] + if ng.count > existing[ng.name] and "datanode" in \ + ng.node_processes: + dn_to_delete += 1 + if not set(ng.node_processes).issubset(scalable_processes): + raise ex.NodeGroupCannotBeScaled( + ng.name, "Vanilla plugin cannot scale nodegroup" + " with processes: " + + ' '.join(ng.node_processes)) + + dn_amount = len(utils.get_datanodes(cluster)) + rep_factor = c_helper.determine_cluster_config(cluster, + "dfs.replication") + + if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor: + raise Exception("Vanilla plugin cannot shrink cluster because " + "it would be not enough nodes for replicas " + "(replication factor is %s )" % rep_factor) + + if len(ng_names) != 0: + raise ex.NodeGroupsDoNotExist(ng_names.keys()) diff --git a/savanna/plugins/vanilla/run_scripts.py b/savanna/plugins/vanilla/run_scripts.py new file mode 100644 index 0000000000..2e652772f6 --- /dev/null +++ b/savanna/plugins/vanilla/run_scripts.py @@ -0,0 +1,28 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def start_process(remote, process): + remote.execute_command('sudo su -c "/usr/sbin/hadoop-daemon.sh start %s" ' + 'hadoop' % process) + + +def refresh_nodes(remote, service): + remote.execute_command("sudo su -c 'hadoop %s -refreshNodes' hadoop" + % service) + + +def format_namenode(nn_remote): + nn_remote.execute_command("sudo su -c 'hadoop namenode -format' hadoop") diff --git a/savanna/plugins/vanilla/scaling.py b/savanna/plugins/vanilla/scaling.py new file mode 100644 index 0000000000..c5c9e12e06 --- /dev/null +++ b/savanna/plugins/vanilla/scaling.py @@ -0,0 +1,94 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from savanna import context +from savanna.plugins.vanilla import run_scripts as run +from savanna.plugins.vanilla import utils + + +def decommission_tt(jt, inst_to_be_deleted, survived_inst): + with jt.remote as r: + r.write_file_to('/etc/hadoop/tt.excl', + utils.generate_fqdn_host_names( + inst_to_be_deleted)) + run.refresh_nodes(jt.remote, "mradmin") + context.sleep(3) + r.write_files_to({'/etc/hadoop/tt.incl': + utils.generate_fqdn_host_names(survived_inst), + '/etc/hadoop/tt.excl': "", + }) + + +def decommission_dn(nn, inst_to_be_deleted, survived_inst): + with nn.remote as r: + r.write_file_to('/etc/hadoop/dn.excl', + utils.generate_fqdn_host_names( + inst_to_be_deleted)) + run.refresh_nodes(nn.remote, "dfsadmin") + context.sleep(3) + + att_amount = 10 + while att_amount: + cmd = r.execute_command( + "sudo su -c 'hadoop dfsadmin -report' hadoop") + all_found = True + + datanodes_info = parse_dfs_report(cmd[1]) + + for i in inst_to_be_deleted: + for dn in datanodes_info: + if (dn["Name"].startswith(i.internal_ip)) and ( + dn["Decommission Status"] != "Decommissioned"): + all_found = False + break + + if all_found: + r.write_files_to({'/etc/hadoop/dn.incl': + utils. + generate_fqdn_host_names(survived_inst), + '/etc/hadoop/dn.excl': "", + }) + break + else: + att_amount -= 1 + + if not att_amount: + raise Exception("Cannot finish decommission") + + +def parse_dfs_report(report): + array = [] + started = False + for line in report: + if started: + array.append(line) + if line.startswith("---"): + started = True + + res = [] + i = 0 + while i < len(array) - 1: + i += 2 + datanode_info = {} + d = array[i] + while d != '\n': + idx = str.find(d, ':') + name = d[0:idx] + value = d[idx + 2:len(d) - 1] + datanode_info[name.strip()] = value.strip() + i += 1 + d = array[i] + res.append(datanode_info) + return res diff --git a/savanna/plugins/vanilla/utils.py b/savanna/plugins/vanilla/utils.py index db8217e1b5..af09f573ec 100644 --- a/savanna/plugins/vanilla/utils.py +++ b/savanna/plugins/vanilla/utils.py @@ -49,3 +49,7 @@ def get_secondarynamenodes(cluster): def generate_host_names(nodes): return "\n".join([n.hostname for n in nodes]) + + +def generate_fqdn_host_names(nodes): + return "\n".join([n.fqdn for n in nodes]) diff --git a/savanna/service/api.py b/savanna/service/api.py index aa174ab92a..7deb329cab 100644 --- a/savanna/service/api.py +++ b/savanna/service/api.py @@ -89,7 +89,7 @@ def _provision_nodes(cluster_id, node_group_names_map): plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) context.model_update(cluster, status='Scaling') - instances = i.scale_cluster(cluster, node_group_names_map) + instances = i.scale_cluster(cluster, node_group_names_map, plugin) if instances: context.model_update(cluster, status='Configuring') diff --git a/savanna/service/instances.py b/savanna/service/instances.py index 84ad010aee..eb871b6d76 100644 --- a/savanna/service/instances.py +++ b/savanna/service/instances.py @@ -49,30 +49,30 @@ def create_cluster(cluster): _rollback_cluster_creation(cluster, ex) -def scale_cluster(cluster, node_group_names_map): +def scale_cluster(cluster, node_group_names_map, plugin): # Now let's work with real node_groups, not names: node_groups_map = {} - session = context.ctx().session for ng in cluster.node_groups: if ng.name in node_group_names_map: node_groups_map.update({ng: node_group_names_map[ng.name]}) - + instances_list = [] try: instances_list = _scale_cluster_instances( - cluster, node_groups_map) + cluster, node_groups_map, plugin) + _clean_cluster_from_empty_ng(cluster) _await_instances(cluster) volumes.attach_to_instances(instances_list) + except Exception as ex: LOG.warn("Can't scale cluster '%s' (reason: %s)", cluster.name, ex) with excutils.save_and_reraise_exception(): - ng_to_delete = _rollback_cluster_scaling(cluster, instances_list, - ex) + _rollback_cluster_scaling(cluster, instances_list, ex) instances_list = [] - with session.begin(): - for ng in ng_to_delete: - session.delete(ng) + _clean_cluster_from_empty_ng(cluster) + if cluster.status == 'Decommissioning': + cluster.status = 'Error' + else: cluster.status = 'Active' - # we should be here with valid cluster: if instances creation # was not successful all extra-instances will be removed above if instances_list: @@ -103,22 +103,42 @@ def _create_instances(cluster): _run_instance(cluster, node_group, idx, aa_groups, userdata) -def _scale_cluster_instances(cluster, node_groups_map): +def _scale_cluster_instances(cluster, node_groups_map, plugin): aa_groups = _generate_anti_affinity_groups(cluster) - instances = [] + instances_to_delete = [] + node_groups_to_enlarge = [] + for node_group in node_groups_map: count = node_groups_map[node_group] - userdata = _generate_user_data_script(node_group) - for idx in xrange(node_group.count + 1, count + 1): - instance = _run_instance(cluster, node_group, idx, - aa_groups, userdata) - instances.append(instance) + if count < node_group.count: + instances_to_delete += node_group.instances[count:node_group.count] + else: + node_groups_to_enlarge.append(node_group) - node_group.count = count - context.model_save(node_group) - context.model_save(cluster) + if instances_to_delete: + cluster.status = 'Decommissioning' + plugin.decommission_nodes(cluster, instances_to_delete) + cluster.status = 'Deleting Instances' + for instance in instances_to_delete: + node_group = instance.node_group + node_group.instances.remove(instance) + _shutdown_instance(instance) + node_group.count -= 1 + context.model_save(node_group) - return instances + instances_to_add = [] + if node_groups_to_enlarge: + cluster.status = 'Adding Instances' + for node_group in node_groups_to_enlarge: + count = node_groups_map[node_group] + userdata = _generate_user_data_script(node_group) + for idx in xrange(node_group.count + 1, count + 1): + instance = _run_instance(cluster, node_group, idx, + aa_groups, userdata) + instances_to_add.append(instance) + node_group.count = count + + return instances_to_add def _run_instance(cluster, node_group, idx, aa_groups, userdata): @@ -264,22 +284,15 @@ def _rollback_cluster_creation(cluster, ex): def _rollback_cluster_scaling(cluster, instances, ex): """Attempt to rollback cluster scaling.""" LOG.info("Cluster '%s' scaling rollback (reason: %s)", cluster.name, ex) - try: volumes.detach_from_instances(instances) except Exception: raise finally: - #if some nodes are up we should shut them down and update "count" in - # node_group - ng_to_delete = [] for i in instances: ng = i.node_group _shutdown_instance(i) ng.count -= 1 - if ng.count == 0: - ng_to_delete.append(ng) - return ng_to_delete def _shutdown_instances(cluster, quiet=False): @@ -299,3 +312,13 @@ def shutdown_cluster(cluster): """Shutdown specified cluster and all related resources.""" volumes.detach(cluster) _shutdown_instances(cluster) + + +def _clean_cluster_from_empty_ng(cluster): + session = context.ctx().session + with session.begin(): + all_ng = cluster.node_groups + for ng in all_ng: + if ng.count == 0: + session.delete(ng) + cluster.node_groups.remove(ng) diff --git a/savanna/service/validations/clusters_scaling.py b/savanna/service/validations/clusters_scaling.py index b3be2fcbd4..50e3139cdf 100644 --- a/savanna/service/validations/clusters_scaling.py +++ b/savanna/service/validations/clusters_scaling.py @@ -40,7 +40,7 @@ CLUSTER_SCALING_SCHEMA = { }, "count": { "type": "integer", - "minimum": 1, + "minimum": 0, }, }, "additionalProperties": False, @@ -68,8 +68,10 @@ CLUSTER_SCALING_SCHEMA = { def check_cluster_scaling(data, cluster_id, **kwargs): cluster = api.get_cluster(id=cluster_id) - if not plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, - 'scale_cluster'): + if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, + 'scale_cluster') and ( + plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, + 'decommission_nodes'))): raise ex.InvalidException( "Requested plugin '%s' doesn't support cluster scaling feature" % cluster.plugin_name) diff --git a/savanna/tests/unit/plugins/test_dfsadmin_parsing.py b/savanna/tests/unit/plugins/test_dfsadmin_parsing.py new file mode 100644 index 0000000000..3fb7ee720f --- /dev/null +++ b/savanna/tests/unit/plugins/test_dfsadmin_parsing.py @@ -0,0 +1,61 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pkg_resources as pkg +import unittest2 + +from savanna.plugins.vanilla import scaling as sc +from savanna import version + + +class ProvisioningPluginBaseTest(unittest2.TestCase): + def test_result_for_3_nodes(self): + ins = open(pkg.resource_filename( + version.version_info.package, "tests/unit/resources/" + "dfs_admin_3_nodes.txt"), "r") + array = [] + for line in ins: + array.append(line) + + exp1 = {"Name": "10.155.0.94:50010", "Decommission Status": "Normal"} + exp2 = {"Name": "10.155.0.90:50010", "Last contact": "Tue Jul 16 12:" + "00:07 UTC 2013"} + exp3 = {"Configured Capacity": "10568916992 (9.84 GB)", "DFS " + "Remaining%": "93.42%"} + expected = [exp1, exp2, exp3] + res = sc.parse_dfs_report(array) + self.assertItemsEqual(expected, res) + + def test_result_for_0_nodes(self): + ins = open(pkg.resource_filename( + version.version_info.package, "tests/unit/resources/" + "dfs_admin_0_nodes.txt"), "r") + array = [] + for line in ins: + array.append(line) + + res = sc.parse_dfs_report(array) + self.assertEqual(0, len(res)) + + def test_result_for_1_node(self): + ins = open(pkg.resource_filename( + version.version_info.package, "tests/unit/resources/" + "dfs_admin_1_nodes.txt"), "r") + array = [] + for line in ins: + array.append(line) + exp = {"Name": "10.155.0.94:50010", "Decommission Status": "Normal"} + res = sc.parse_dfs_report(array) + self.assertIn(exp, res) diff --git a/savanna/tests/unit/resources/dfs_admin_0_nodes.txt b/savanna/tests/unit/resources/dfs_admin_0_nodes.txt new file mode 100644 index 0000000000..82b69be706 --- /dev/null +++ b/savanna/tests/unit/resources/dfs_admin_0_nodes.txt @@ -0,0 +1,11 @@ +Configured Capacity: 0 (0 KB) +Present Capacity: 0 (0 KB) +DFS Remaining: 0 (0 KB) +DFS Used: 0 (0 KB) +DFS Used%: �% +Under replicated blocks: 0 +Blocks with corrupt replicas: 0 +Missing blocks: 0 + +------------------------------------------------- +Datanodes available: 0 (0 total, 0 dead) diff --git a/savanna/tests/unit/resources/dfs_admin_1_nodes.txt b/savanna/tests/unit/resources/dfs_admin_1_nodes.txt new file mode 100644 index 0000000000..b13123ab2c --- /dev/null +++ b/savanna/tests/unit/resources/dfs_admin_1_nodes.txt @@ -0,0 +1,15 @@ +Configured Capacity: 31706750976 (29.53 GB) +Present Capacity: 29622116382 (27.59 GB) +DFS Remaining: 29622018048 (27.59 GB) +DFS Used: 98334 (96.03 KB) +DFS Used%: 0% +Under replicated blocks: 0 +Blocks with corrupt replicas: 0 +Missing blocks: 0 + +------------------------------------------------- +Datanodes available: 3 (3 total, 0 dead) + +Name: 10.155.0.94:50010 +Decommission Status : Normal + diff --git a/savanna/tests/unit/resources/dfs_admin_3_nodes.txt b/savanna/tests/unit/resources/dfs_admin_3_nodes.txt new file mode 100644 index 0000000000..b906b498c2 --- /dev/null +++ b/savanna/tests/unit/resources/dfs_admin_3_nodes.txt @@ -0,0 +1,23 @@ +Configured Capacity: 31706750976 (29.53 GB) +Present Capacity: 29622116382 (27.59 GB) +DFS Remaining: 29622018048 (27.59 GB) +DFS Used: 98334 (96.03 KB) +DFS Used%: 0% +Under replicated blocks: 0 +Blocks with corrupt replicas: 0 +Missing blocks: 0 + +------------------------------------------------- +Datanodes available: 3 (3 total, 0 dead) + +Name: 10.155.0.94:50010 +Decommission Status : Normal + + +Name: 10.155.0.90:50010 +Last contact: Tue Jul 16 12:00:07 UTC 2013 + + +Configured Capacity: 10568916992 (9.84 GB) +DFS Remaining%: 93.42% + diff --git a/setup.py b/setup.py index 4f8721e126..2a57d2e7e0 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,8 @@ setuptools.setup( package_data={'savanna': [ 'plugins/vanilla/resources/*.xml', 'swift/resources/*.xml', - 'tests/unit/resources/*.xml' + 'tests/unit/resources/*.xml', + 'tests/unit/resources/*.txt', ]}, install_requires=requires, dependency_links=depend_links,