Cluster scaling: deletion

Implemented:
* exclude/include files mechanism (see the sketch below)
* adding instances was reworked
* removing instances works correctly
* the "slaves" and "masters" files are deleted

Implements blueprint delete-instances
Implements blueprint get-rid-of-slaves-file
Fixes: bug #1199884

Change-Id: Ie1bfaa421fce22efd3115ba13758932b6b592d7a
Nadya Privalova 2013-07-09 20:12:26 +04:00
parent c80b956307
commit a4f27a88de
15 changed files with 420 additions and 96 deletions
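The mechanism behind the first bullet is the standard Hadoop include/exclude file approach: hosts to be removed are listed in an exclude file (dn.excl for HDFS, tt.excl for MapReduce), the namenode/jobtracker is told to re-read its host lists with -refreshNodes, and 'hadoop dfsadmin -report' is polled until the retiring datanodes report "Decommissioned". Below is a minimal standalone sketch of the HDFS side; write_file and run_cmd stand in for Savanna's remote helpers and are placeholder names, not APIs from this change.

import time


def decommission_datanodes(write_file, run_cmd, hosts_to_remove, survivors,
                           attempts=10, delay=3):
    # 1. List the retiring hosts in the file referenced by dfs.hosts.exclude
    #    (/etc/hadoop/dn.excl in this commit) and ask the namenode to re-read
    #    its host lists, which starts re-replicating their blocks elsewhere.
    write_file('/etc/hadoop/dn.excl', '\n'.join(hosts_to_remove))
    run_cmd("sudo su -c 'hadoop dfsadmin -refreshNodes' hadoop")

    # 2. Poll the dfsadmin report until every retiring host shows
    #    "Decommission Status : Decommissioned".
    for _ in range(attempts):
        report = run_cmd("sudo su -c 'hadoop dfsadmin -report' hadoop")
        sections = [s for s in report.split('\n\n') if 'Name:' in s]
        if all(any(host in s and 'Decommissioned' in s for s in sections)
               for host in hosts_to_remove):
            break
        time.sleep(delay)
    else:
        raise RuntimeError('datanodes did not finish decommissioning')

    # 3. Shrink the include file (dfs.hosts) to the surviving hosts and clear
    #    the exclude file; the removed VMs can then be shut down safely.
    write_file('/etc/hadoop/dn.incl', '\n'.join(survivors))
    write_file('/etc/hadoop/dn.excl', '')

The tasktracker side is symmetric: tt.incl/tt.excl plus 'hadoop mradmin -refreshNodes' against the jobtracker.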

View File

@@ -129,7 +129,8 @@ class NodeGroup(mb.SavannaBase, NodeGroupMixin):
count = sa.Column(sa.Integer, nullable=False)
instances = relationship('Instance', cascade="all,delete",
backref='node_group')
backref='node_group',
order_by="Instance.instance_name")
cluster_id = sa.Column(sa.String(36), sa.ForeignKey('Cluster.id'))

View File

@@ -70,6 +70,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def scale_cluster(self, cluster, instances):
pass
@plugins_base.required_with_default
def decommission_nodes(self, cluster, instances):
pass
@plugins_base.optional
def convert(self, hadoop_version, config_file):
pass

View File

@@ -128,6 +128,8 @@ def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname=None):
'/lib/hadoop/hdfs/namenode'),
'dfs.data.dir': extract_hadoop_path(storage_path,
'/lib/hadoop/hdfs/datanode'),
'dfs.hosts': '/etc/hadoop/dn.incl',
'dfs.hosts.exclude': '/etc/hadoop/dn.excl',
}
if jt_hostname:
@@ -136,7 +138,9 @@ def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname=None):
'mapred.system.dir': extract_hadoop_path(storage_path,
'/mapred/mapredsystem'),
'mapred.local.dir': extract_hadoop_path(storage_path,
'/lib/hadoop/mapred')
'/lib/hadoop/mapred'),
'mapred.hosts': '/etc/hadoop/tt.incl',
'mapred.hosts.exclude': '/etc/hadoop/tt.excl',
}
cfg.update(mr_cfg)
@@ -227,3 +231,12 @@ def extract_name_values(configs):
def extract_hadoop_path(lst, hadoop_dir):
return ",".join([p + hadoop_dir for p in lst])
def determine_cluster_config(cluster, config_name):
if config_name in cluster.cluster_configs:
return cluster.cluster_configs.get(config_name)
all_conf = get_plugin_configs()
for conf in all_conf:
if conf.name == config_name:
return conf.default_value

View File

@@ -17,9 +17,12 @@ from savanna.openstack.common import log as logging
from savanna.plugins import provisioning as p
from savanna.plugins.vanilla import config_helper as c_helper
from savanna.plugins.vanilla import exceptions as ex
from savanna.plugins.vanilla import run_scripts as run
from savanna.plugins.vanilla import scaling as sc
from savanna.plugins.vanilla import utils
from savanna.utils import crypto
LOG = logging.getLogger(__name__)
@@ -79,34 +82,39 @@ class VanillaProvider(p.ProvisioningPluginBase):
self._write_hadoop_user_keys(cluster.private_key,
utils.get_instances(cluster))
nn = utils.get_namenode(cluster)
nn.remote.execute_command(
"sudo su -c 'hadoop namenode -format' hadoop")
def start_cluster(self, cluster):
nn_instance = utils.get_namenode(cluster)
datanodes = utils.get_datanodes(cluster)
jt_instance = utils.get_jobtracker(cluster)
tasktrackers = utils.get_tasktrackers(cluster)
nn_instance.remote.execute_command(
'sudo su -c /usr/sbin/start-dfs.sh hadoop >>'
' /tmp/savanna-hadoop-start-dfs.log')
with nn_instance.remote as remote:
run.format_namenode(remote)
run.start_process(remote, "namenode")
LOG.info("HDFS service at '%s' has been started", nn_instance.hostname)
snns = utils.get_secondarynamenodes(cluster)
if snns:
for snn in snns:
run.start_process(snn.remote, "secondarynamenode")
for dn in datanodes:
run.start_process(dn.remote, "datanode")
LOG.info("HDFS service at '%s' has been started",
nn_instance.hostname)
if jt_instance:
jt_instance.remote.execute_command(
'sudo su -c /usr/sbin/start-mapred.sh hadoop >>'
' /tmp/savanna-hadoop-start-mapred.log')
run.start_process(jt_instance.remote, "jobtracker")
for tt in tasktrackers:
run.start_process(tt.remote, "tasktracker")
LOG.info("MapReduce service at '%s' has been started",
jt_instance.hostname)
LOG.info('Cluster %s has been started successfully' % cluster.name)
self._set_cluster_info(cluster)
def _extract_configs(self, cluster):
nn = utils.get_namenode(cluster)
jt = utils.get_jobtracker(cluster)
for ng in cluster.node_groups:
ng.extra = {
'xml': c_helper.generate_xml_configs(ng.configuration,
@@ -120,58 +128,47 @@ class VanillaProvider(p.ProvisioningPluginBase):
)
}
def validate_scaling(self, cluster, existing, additional):
ng_names = existing.copy()
allowed = ["datanode", "tasktracker"]
#validate existing n_g scaling at first:
for ng in cluster.node_groups:
#we do not support deletion now
if ng.name in ng_names:
del ng_names[ng.name]
#we do not support deletion now
if ng.count > existing[ng.name]:
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot shrink node_group")
if not set(ng.node_processes).issubset(allowed):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot scale nodegroup"
" with processes: " +
' '.join(ng.node_processes))
if len(ng_names) != 0:
raise ex.NodeGroupsDoNotExist(ng_names.keys())
#validate additional n_g
jt = utils.get_jobtracker(cluster)
def decommission_nodes(self, cluster, instances):
tts = utils.get_tasktrackers(cluster)
dns = utils.get_datanodes(cluster)
decommission_dns = False
decommission_tts = False
for i in instances:
if 'datanode' in i.node_group.node_processes:
dns.remove(i)
decommission_dns = True
if 'tasktracker' in i.node_group.node_processes:
tts.remove(i)
decommission_tts = True
nn = utils.get_namenode(cluster)
for ng in additional:
if (not set(ng.node_processes).issubset(allowed)) or (
not jt and 'tasktracker' in ng.node_processes) or (
not nn and 'datanode' in ng.node_processes):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot scale node group with "
"processes which have no master-processes run "
"in cluster")
jt = utils.get_jobtracker(cluster)
if decommission_tts:
sc.decommission_tt(jt, instances, tts)
if decommission_dns:
sc.decommission_dn(nn, instances, dns)
def validate_scaling(self, cluster, existing, additional):
self._validate_existing_ng_scaling(cluster, existing)
self._validate_additional_ng_scaling(cluster, additional)
def scale_cluster(self, cluster, instances):
self._extract_configs(cluster)
self._push_configs_to_nodes(cluster, instances=instances)
self._write_hadoop_user_keys(cluster.private_key,
instances)
run.refresh_nodes(utils.get_namenode(cluster).remote, "dfsadmin")
run.refresh_nodes(utils.get_jobtracker(cluster).remote, "mradmin")
for i in instances:
with i.remote as remote:
if "datanode" in i.node_group.node_processes:
remote.execute_command('sudo su -c '
'"/usr/sbin/hadoop-daemon.sh '
'start datanode" hadoop'
'>> /tmp/savanna-start-datanode.log'
' 2>&1')
run.start_process(remote, "datanode")
if "tasktracker" in i.node_group.node_processes:
remote.execute_command('sudo su -c '
'"/usr/sbin/hadoop-daemon.sh '
'start tasktracker" hadoop'
'>> /tmp/savanna-start-'
'tasktracker.log 2>&1')
run.start_process(remote, "tasktracker")
def _push_configs_to_nodes(self, cluster, instances=None):
if instances is None:
@@ -199,19 +196,19 @@ class VanillaProvider(p.ProvisioningPluginBase):
r.execute_command(
'sudo /tmp/savanna-hadoop-init.sh '
'>> /tmp/savanna-hadoop-init.log 2>&1')
nn = utils.get_namenode(cluster)
jt = utils.get_jobtracker(cluster)
nn.remote.write_files_to({
'/etc/hadoop/slaves': utils.generate_host_names(
utils.get_datanodes(cluster)),
'/etc/hadoop/masters': utils.generate_host_names(
utils.get_secondarynamenodes(cluster))
})
if jt and nn.instance_id != jt.instance_id:
jt.remote.write_file_to('/etc/hadoop/slaves',
utils.generate_host_names(
utils.get_tasktrackers(cluster)))
with nn.remote as r:
r.write_file_to('/etc/hadoop/dn.incl', utils.
generate_fqdn_host_names(
utils.get_datanodes(cluster)))
if jt:
with jt.remote as r:
r.write_file_to('/etc/hadoop/tt.incl', utils.
generate_fqdn_host_names(
utils.get_tasktrackers(cluster)))
def _set_cluster_info(self, cluster):
nn = utils.get_namenode(cluster)
@@ -245,3 +242,50 @@ class VanillaProvider(p.ProvisioningPluginBase):
with instance.remote as remote:
remote.write_files_to(files)
remote.execute_command(mv_cmd)
def _get_scalable_processes(self):
return ["datanode", "tasktracker"]
def _validate_additional_ng_scaling(self, cluster, additional):
jt = utils.get_jobtracker(cluster)
scalable_processes = self._get_scalable_processes()
for ng in additional:
if not set(ng.node_processes).issubset(scalable_processes):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot scale nodegroup"
" with processes: " +
' '.join(ng.node_processes))
if not jt and 'tasktracker' in ng.node_processes:
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot scale node group with "
"processes which have no master-processes run "
"in cluster")
def _validate_existing_ng_scaling(self, cluster, existing):
ng_names = existing.copy()
scalable_processes = self._get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.name in existing:
del ng_names[ng.name]
if ng.count > existing[ng.name] and "datanode" in \
ng.node_processes:
dn_to_delete += 1
if not set(ng.node_processes).issubset(scalable_processes):
raise ex.NodeGroupCannotBeScaled(
ng.name, "Vanilla plugin cannot scale nodegroup"
" with processes: " +
' '.join(ng.node_processes))
dn_amount = len(utils.get_datanodes(cluster))
rep_factor = c_helper.determine_cluster_config(cluster,
"dfs.replication")
if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
raise Exception("Vanilla plugin cannot shrink cluster because "
"it would be not enough nodes for replicas "
"(replication factor is %s )" % rep_factor)
if len(ng_names) != 0:
raise ex.NodeGroupsDoNotExist(ng_names.keys())
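To make the shrink guard in _validate_existing_ng_scaling concrete (the numbers below are illustrative, not from the commit): with three datanodes and dfs.replication equal to 3, removing even one datanode would leave fewer live datanodes than replicas, so the request must fail.

# Worked example of the replication-factor check above (illustrative values).
dn_amount, dn_to_delete, rep_factor = 3, 1, 3
if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
    # 3 - 1 = 2 < 3, so validation raises before any instance is touched
    print("cannot shrink: %d nodes left for %d replicas"
          % (dn_amount - dn_to_delete, rep_factor))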

View File

@@ -0,0 +1,28 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def start_process(remote, process):
remote.execute_command('sudo su -c "/usr/sbin/hadoop-daemon.sh start %s" '
'hadoop' % process)
def refresh_nodes(remote, service):
remote.execute_command("sudo su -c 'hadoop %s -refreshNodes' hadoop"
% service)
def format_namenode(nn_remote):
nn_remote.execute_command("sudo su -c 'hadoop namenode -format' hadoop")

View File

@@ -0,0 +1,94 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import context
from savanna.plugins.vanilla import run_scripts as run
from savanna.plugins.vanilla import utils
def decommission_tt(jt, inst_to_be_deleted, survived_inst):
with jt.remote as r:
r.write_file_to('/etc/hadoop/tt.excl',
utils.generate_fqdn_host_names(
inst_to_be_deleted))
run.refresh_nodes(jt.remote, "mradmin")
context.sleep(3)
r.write_files_to({'/etc/hadoop/tt.incl':
utils.generate_fqdn_host_names(survived_inst),
'/etc/hadoop/tt.excl': "",
})
def decommission_dn(nn, inst_to_be_deleted, survived_inst):
with nn.remote as r:
r.write_file_to('/etc/hadoop/dn.excl',
utils.generate_fqdn_host_names(
inst_to_be_deleted))
run.refresh_nodes(nn.remote, "dfsadmin")
context.sleep(3)
att_amount = 10
while att_amount:
cmd = r.execute_command(
"sudo su -c 'hadoop dfsadmin -report' hadoop")
all_found = True
datanodes_info = parse_dfs_report(cmd[1])
for i in inst_to_be_deleted:
for dn in datanodes_info:
if (dn["Name"].startswith(i.internal_ip)) and (
dn["Decommission Status"] != "Decommissioned"):
all_found = False
break
if all_found:
r.write_files_to({'/etc/hadoop/dn.incl':
utils.
generate_fqdn_host_names(survived_inst),
'/etc/hadoop/dn.excl': "",
})
break
else:
att_amount -= 1
if not att_amount:
raise Exception("Cannot finish decommission")
def parse_dfs_report(report):
array = []
started = False
for line in report:
if started:
array.append(line)
if line.startswith("---"):
started = True
res = []
i = 0
while i < len(array) - 1:
i += 2
datanode_info = {}
d = array[i]
while d != '\n':
idx = str.find(d, ':')
name = d[0:idx]
value = d[idx + 2:len(d) - 1]
datanode_info[name.strip()] = value.strip()
i += 1
d = array[i]
res.append(datanode_info)
return res
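For reference, parse_dfs_report takes the report as a list of lines and builds roughly one dict per datanode record found after the dashed separator, keyed by the report's own field names. The expected shape for the 3-node fixture (values taken from the unit test further down):

# Result of sc.parse_dfs_report() for dfs_admin_3_nodes.txt, as asserted by
# the unit test below.
expected = [
    {'Name': '10.155.0.94:50010', 'Decommission Status': 'Normal'},
    {'Name': '10.155.0.90:50010',
     'Last contact': 'Tue Jul 16 12:00:07 UTC 2013'},
    {'Configured Capacity': '10568916992 (9.84 GB)',
     'DFS Remaining%': '93.42%'},
]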

View File

@@ -49,3 +49,7 @@ def get_secondarynamenodes(cluster):
def generate_host_names(nodes):
return "\n".join([n.hostname for n in nodes])
def generate_fqdn_host_names(nodes):
return "\n".join([n.fqdn for n in nodes])

View File

@@ -89,7 +89,7 @@ def _provision_nodes(cluster_id, node_group_names_map):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
context.model_update(cluster, status='Scaling')
instances = i.scale_cluster(cluster, node_group_names_map)
instances = i.scale_cluster(cluster, node_group_names_map, plugin)
if instances:
context.model_update(cluster, status='Configuring')

View File

@@ -49,30 +49,30 @@ def create_cluster(cluster):
_rollback_cluster_creation(cluster, ex)
def scale_cluster(cluster, node_group_names_map):
def scale_cluster(cluster, node_group_names_map, plugin):
# Now let's work with real node_groups, not names:
node_groups_map = {}
session = context.ctx().session
for ng in cluster.node_groups:
if ng.name in node_group_names_map:
node_groups_map.update({ng: node_group_names_map[ng.name]})
instances_list = []
try:
instances_list = _scale_cluster_instances(
cluster, node_groups_map)
cluster, node_groups_map, plugin)
_clean_cluster_from_empty_ng(cluster)
_await_instances(cluster)
volumes.attach_to_instances(instances_list)
except Exception as ex:
LOG.warn("Can't scale cluster '%s' (reason: %s)", cluster.name, ex)
with excutils.save_and_reraise_exception():
ng_to_delete = _rollback_cluster_scaling(cluster, instances_list,
ex)
_rollback_cluster_scaling(cluster, instances_list, ex)
instances_list = []
with session.begin():
for ng in ng_to_delete:
session.delete(ng)
_clean_cluster_from_empty_ng(cluster)
if cluster.status == 'Decommissioning':
cluster.status = 'Error'
else:
cluster.status = 'Active'
# we should be here with valid cluster: if instances creation
# was not successful all extra-instances will be removed above
if instances_list:
@@ -103,22 +103,42 @@ def _create_instances(cluster):
_run_instance(cluster, node_group, idx, aa_groups, userdata)
def _scale_cluster_instances(cluster, node_groups_map):
def _scale_cluster_instances(cluster, node_groups_map, plugin):
aa_groups = _generate_anti_affinity_groups(cluster)
instances = []
instances_to_delete = []
node_groups_to_enlarge = []
for node_group in node_groups_map:
count = node_groups_map[node_group]
userdata = _generate_user_data_script(node_group)
for idx in xrange(node_group.count + 1, count + 1):
instance = _run_instance(cluster, node_group, idx,
aa_groups, userdata)
instances.append(instance)
if count < node_group.count:
instances_to_delete += node_group.instances[count:node_group.count]
else:
node_groups_to_enlarge.append(node_group)
node_group.count = count
context.model_save(node_group)
context.model_save(cluster)
if instances_to_delete:
cluster.status = 'Decommissioning'
plugin.decommission_nodes(cluster, instances_to_delete)
cluster.status = 'Deleting Instances'
for instance in instances_to_delete:
node_group = instance.node_group
node_group.instances.remove(instance)
_shutdown_instance(instance)
node_group.count -= 1
context.model_save(node_group)
return instances
instances_to_add = []
if node_groups_to_enlarge:
cluster.status = 'Adding Instances'
for node_group in node_groups_to_enlarge:
count = node_groups_map[node_group]
userdata = _generate_user_data_script(node_group)
for idx in xrange(node_group.count + 1, count + 1):
instance = _run_instance(cluster, node_group, idx,
aa_groups, userdata)
instances_to_add.append(instance)
node_group.count = count
return instances_to_add
def _run_instance(cluster, node_group, idx, aa_groups, userdata):
@@ -264,22 +284,15 @@ def _rollback_cluster_creation(cluster, ex):
def _rollback_cluster_scaling(cluster, instances, ex):
"""Attempt to rollback cluster scaling."""
LOG.info("Cluster '%s' scaling rollback (reason: %s)", cluster.name, ex)
try:
volumes.detach_from_instances(instances)
except Exception:
raise
finally:
#if some nodes are up we should shut them down and update "count" in
# node_group
ng_to_delete = []
for i in instances:
ng = i.node_group
_shutdown_instance(i)
ng.count -= 1
if ng.count == 0:
ng_to_delete.append(ng)
return ng_to_delete
def _shutdown_instances(cluster, quiet=False):
@@ -299,3 +312,13 @@ def shutdown_cluster(cluster):
"""Shutdown specified cluster and all related resources."""
volumes.detach(cluster)
_shutdown_instances(cluster)
def _clean_cluster_from_empty_ng(cluster):
session = context.ctx().session
with session.begin():
all_ng = cluster.node_groups
for ng in all_ng:
if ng.count == 0:
session.delete(ng)
cluster.node_groups.remove(ng)

View File

@@ -40,7 +40,7 @@ CLUSTER_SCALING_SCHEMA = {
},
"count": {
"type": "integer",
"minimum": 1,
"minimum": 0,
},
},
"additionalProperties": False,
@@ -68,8 +68,10 @@ CLUSTER_SCALING_SCHEMA = {
def check_cluster_scaling(data, cluster_id, **kwargs):
cluster = api.get_cluster(id=cluster_id)
if not plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,
'scale_cluster'):
if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,
'scale_cluster') and (
plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,
'decommission_nodes'))):
raise ex.InvalidException(
"Requested plugin '%s' doesn't support cluster scaling feature"
% cluster.plugin_name)

View File

@@ -0,0 +1,61 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pkg_resources as pkg
import unittest2
from savanna.plugins.vanilla import scaling as sc
from savanna import version
class ProvisioningPluginBaseTest(unittest2.TestCase):
def test_result_for_3_nodes(self):
ins = open(pkg.resource_filename(
version.version_info.package, "tests/unit/resources/"
"dfs_admin_3_nodes.txt"), "r")
array = []
for line in ins:
array.append(line)
exp1 = {"Name": "10.155.0.94:50010", "Decommission Status": "Normal"}
exp2 = {"Name": "10.155.0.90:50010", "Last contact": "Tue Jul 16 12:"
"00:07 UTC 2013"}
exp3 = {"Configured Capacity": "10568916992 (9.84 GB)", "DFS "
"Remaining%": "93.42%"}
expected = [exp1, exp2, exp3]
res = sc.parse_dfs_report(array)
self.assertItemsEqual(expected, res)
def test_result_for_0_nodes(self):
ins = open(pkg.resource_filename(
version.version_info.package, "tests/unit/resources/"
"dfs_admin_0_nodes.txt"), "r")
array = []
for line in ins:
array.append(line)
res = sc.parse_dfs_report(array)
self.assertEqual(0, len(res))
def test_result_for_1_node(self):
ins = open(pkg.resource_filename(
version.version_info.package, "tests/unit/resources/"
"dfs_admin_1_nodes.txt"), "r")
array = []
for line in ins:
array.append(line)
exp = {"Name": "10.155.0.94:50010", "Decommission Status": "Normal"}
res = sc.parse_dfs_report(array)
self.assertIn(exp, res)

View File

@@ -0,0 +1,11 @@
Configured Capacity: 0 (0 KB)
Present Capacity: 0 (0 KB)
DFS Remaining: 0 (0 KB)
DFS Used: 0 (0 KB)
DFS Used%: <20>%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 0 (0 total, 0 dead)

View File

@@ -0,0 +1,15 @@
Configured Capacity: 31706750976 (29.53 GB)
Present Capacity: 29622116382 (27.59 GB)
DFS Remaining: 29622018048 (27.59 GB)
DFS Used: 98334 (96.03 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 3 (3 total, 0 dead)
Name: 10.155.0.94:50010
Decommission Status : Normal

View File

@@ -0,0 +1,23 @@
Configured Capacity: 31706750976 (29.53 GB)
Present Capacity: 29622116382 (27.59 GB)
DFS Remaining: 29622018048 (27.59 GB)
DFS Used: 98334 (96.03 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 3 (3 total, 0 dead)
Name: 10.155.0.94:50010
Decommission Status : Normal
Name: 10.155.0.90:50010
Last contact: Tue Jul 16 12:00:07 UTC 2013
Configured Capacity: 10568916992 (9.84 GB)
DFS Remaining%: 93.42%

View File

@@ -44,7 +44,8 @@ setuptools.setup(
package_data={'savanna': [
'plugins/vanilla/resources/*.xml',
'swift/resources/*.xml',
'tests/unit/resources/*.xml'
'tests/unit/resources/*.xml',
'tests/unit/resources/*.txt',
]},
install_requires=requires,
dependency_links=depend_links,