Add scaling support to vanilla 2 plugin

Partially implements blueprint: vanilla-plugin-hadoop-2

Change-Id: Ic694fb48a8d42abddee719fb533023241c041416
Sergey Reshetnyak 2014-02-21 18:20:05 +04:00
parent 7b37c6f517
commit fa72234b48
10 changed files with 390 additions and 38 deletions

View File

@@ -27,6 +27,7 @@ include savanna/plugins/hdp/versions/2_0/resources/*.json
 include savanna/resources/*.heat
 include savanna/service/edp/resources/*.xml
 include savanna/swift/resources/*.xml
+include savanna/tests/unit/plugins/vanilla/v2_3_0/resources/*.txt
 include savanna/tests/unit/resources/*.heat
 include savanna/tests/unit/resources/*.xml
 include savanna/tests/unit/resources/*.txt

View File

@@ -30,15 +30,24 @@ HADOOP_GROUP = 'hadoop'
 def configure_cluster(cluster):
     LOG.debug("Configuring cluster \"%s\"", cluster.name)
+    instances = []
     for node_group in cluster.node_groups:
-        _provisioning_configs(node_group)
-        _post_configuration(node_group)
+        for instance in node_group.instances:
+            instances.append(instance)
+
+    configure_instances(instances)
+
+
+def configure_instances(instances):
+    for instance in instances:
+        _provisioning_configs(instance)
+        _post_configuration(instance)


-def _provisioning_configs(node_group):
-    xmls, env = _generate_configs(node_group)
-    _push_xml_configs(node_group, xmls)
-    _push_env_configs(node_group, env)
+def _provisioning_configs(instance):
+    xmls, env = _generate_configs(instance.node_group)
+    _push_xml_configs(instance, xmls)
+    _push_env_configs(instance, env)


 def _generate_configs(node_group):
@@ -62,11 +71,17 @@ def _get_hadoop_configs(node_group):
         },
         'HDFS': {
             'dfs.namenode.name.dir': ','.join(dirs['hadoop_name_dirs']),
-            'dfs.namenode.data.dir': ','.join(dirs['hadoop_data_dirs'])
+            'dfs.namenode.data.dir': ','.join(dirs['hadoop_data_dirs']),
+            'dfs.hosts': '%s/dn-include' % HADOOP_CONF_DIR,
+            'dfs.hosts.exclude': '%s/dn-exclude' % HADOOP_CONF_DIR
         },
         'YARN': {
             'yarn.nodemanager.aux-services': 'mapreduce_shuffle',
-            'yarn.resourcemanager.hostname': '%s' % res_hostname
+            'yarn.resourcemanager.hostname': '%s' % res_hostname,
+            'yarn.resourcemanager.nodes.include-path': '%s/nm-include' % (
+                HADOOP_CONF_DIR),
+            'yarn.resourcemanager.nodes.exclude-path': '%s/nm-exclude' % (
+                HADOOP_CONF_DIR)
         },
         'MapReduce': {
             'mapreduce.framework.name': 'yarn'
@@ -114,33 +129,32 @@ def _generate_xml(configs):
     return xml_confs


-def _push_env_configs(node_group, configs):
+def _push_env_configs(instance, configs):
     nn_heap = configs['HDFS']['NameNode Heap Size']
     dn_heap = configs['HDFS']['DataNode Heap Size']
     rm_heap = configs['YARN']['ResourceManager Heap Size']
     nm_heap = configs['YARN']['NodeManager Heap Size']
-    for instance in node_group.instances:
-        with instance.remote() as r:
-            r.replace_remote_string(
-                '%s/hadoop-env.sh' % HADOOP_CONF_DIR,
-                'export HADOOP_NAMENODE_OPTS=.*',
-                'export HADOOP_NAMENODE_OPTS="-Xmx%dm"' % nn_heap)
-            r.replace_remote_string(
-                '%s/hadoop-env.sh' % HADOOP_CONF_DIR,
-                'export HADOOP_DATANODE_OPTS=.*',
-                'export HADOOP_DATANODE_OPTS="-Xmx%dm"' % dn_heap)
-            r.replace_remote_string(
-                '%s/yarn-env.sh' % HADOOP_CONF_DIR,
-                '\\#export YARN_RESOURCEMANAGER_HEAPSIZE=.*',
-                'export YARN_RESOURCEMANAGER_HEAPSIZE=%d' % rm_heap)
-            r.replace_remote_string(
-                '%s/yarn-env.sh' % HADOOP_CONF_DIR,
-                '\\#export YARN_NODEMANAGER_HEAPSIZE=.*',
-                'export YARN_NODEMANAGER_HEAPSIZE=%d' % nm_heap)
+    with instance.remote() as r:
+        r.replace_remote_string(
+            '%s/hadoop-env.sh' % HADOOP_CONF_DIR,
+            'export HADOOP_NAMENODE_OPTS=.*',
+            'export HADOOP_NAMENODE_OPTS="-Xmx%dm"' % nn_heap)
+        r.replace_remote_string(
+            '%s/hadoop-env.sh' % HADOOP_CONF_DIR,
+            'export HADOOP_DATANODE_OPTS=.*',
+            'export HADOOP_DATANODE_OPTS="-Xmx%dm"' % dn_heap)
+        r.replace_remote_string(
+            '%s/yarn-env.sh' % HADOOP_CONF_DIR,
+            '\\#export YARN_RESOURCEMANAGER_HEAPSIZE=.*',
+            'export YARN_RESOURCEMANAGER_HEAPSIZE=%d' % rm_heap)
+        r.replace_remote_string(
+            '%s/yarn-env.sh' % HADOOP_CONF_DIR,
+            '\\#export YARN_NODEMANAGER_HEAPSIZE=.*',
+            'export YARN_NODEMANAGER_HEAPSIZE=%d' % nm_heap)


-def _push_xml_configs(node_group, configs):
+def _push_xml_configs(instance, configs):
     xmls = _generate_xml(configs)
     service_to_conf_map = {
         'Hadoop': '%s/core-site.xml' % HADOOP_CONF_DIR,
@@ -152,8 +166,7 @@ def _push_xml_configs(node_group, configs):
     for service, confs in six.iteritems(xmls):
         xml_confs[service_to_conf_map[service]] = confs

-    for instance in node_group.instances:
-        _push_configs_to_instance(instance, xml_confs)
+    _push_configs_to_instance(instance, xml_confs)


 def _push_configs_to_instance(instance, configs):
@@ -163,7 +176,8 @@ def _push_configs_to_instance(instance, configs):
             r.write_file_to(fl, data, run_as_root=True)


-def _post_configuration(node_group):
+def _post_configuration(instance):
+    node_group = instance.node_group
     dirs = _get_hadoop_dirs(node_group)
     args = {
         'hadoop_user': HADOOP_USER,
@@ -179,11 +193,10 @@ def _post_configuration(node_group):
         'plugins/vanilla/v2_3_0/resources/post_conf.template')
     post_conf_script = post_conf_script.format(**args)

-    for instance in node_group.instances:
-        with instance.remote() as r:
-            r.write_file_to('/tmp/post_conf.sh', post_conf_script)
-            r.execute_command('chmod +x /tmp/post_conf.sh')
-            r.execute_command('sudo /tmp/post_conf.sh')
+    with instance.remote() as r:
+        r.write_file_to('/tmp/post_conf.sh', post_conf_script)
+        r.execute_command('chmod +x /tmp/post_conf.sh')
+        r.execute_command('sudo /tmp/post_conf.sh')


 def _get_hadoop_dirs(node_group):
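
Note: the dfs.hosts / dfs.hosts.exclude and yarn.resourcemanager.nodes.*-path
properties added in the HDFS/YARN hunk above are the heart of the scaling
mechanism. They point the NameNode and ResourceManager at plain-text files
listing which DataNodes/NodeManagers may join (or must leave) the cluster, and
the daemons re-read those files on a refreshNodes admin command. A minimal
sketch of how such a file is produced, assuming the files hold one FQDN per
line (the helper name here is illustrative, not part of the commit):

    def render_host_file(instances):
        # one fully-qualified hostname per line, e.g.
        #   cluster-worker-001.novalocal
        #   cluster-worker-002.novalocal
        return '\n'.join(inst.fqdn() for inst in instances)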

View File

@@ -21,3 +21,11 @@ sed -i "s,export HADOOP_SECURE_DN_LOG_DIR=.*,export HADOOP_SECURE_DN_LOG_DIR={ha
 # change yarn log dir
 sed -i "s,YARN_LOG_DIR=.*,YARN_LOG_DIR={yarn_log_dir}," {hadoop_conf_dir}/yarn-env.sh
+
+# prepare scaling files
+sc_all_files=('dn-include' 'nm-include' 'dn-exclude' 'nm-exclude')
+for file in "${{sc_all_files[@]}}"
+do
+    touch {hadoop_conf_dir}/$file
+    chown {hadoop_group}:{hadoop_user} {hadoop_conf_dir}/$file
+done
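
Note on the doubled braces: post_conf.template is rendered with Python's
str.format (post_conf_script.format(**args) in config.py above), so
single-brace placeholders such as {hadoop_conf_dir} are substituted while
doubled braces like ${{sc_all_files[@]}} collapse to the literal bash
${sc_all_files[@]}. A quick illustration:

    # str.format() brace escaping, as used by the template above
    template = 'touch {hadoop_conf_dir}/$file  # literal: ${{file}}'
    print(template.format(hadoop_conf_dir='/opt/hadoop/etc/hadoop'))
    # -> touch /opt/hadoop/etc/hadoop/$file  # literal: ${file}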

View File

@@ -13,6 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from savanna.plugins.general import utils as u
+
+
+def start_instance(instance):
+    processes = instance.node_group.node_processes
+    for process in processes:
+        if process in ['namenode', 'datanode']:
+            start_hadoop_process(instance, process)
+        elif process in ['resourcemanager', 'nodemanager']:
+            start_yarn_process(instance, process)
+        else:
+            raise RuntimeError("Process is not supported")
+

 def start_hadoop_process(instance, process):
     instance.remote().execute_command(
@@ -27,3 +40,15 @@ def start_yarn_process(instance, process):
 def format_namenode(instance):
     instance.remote().execute_command(
         'sudo su - -c "hdfs namenode -format" hadoop')
+
+
+def refresh_hadoop_nodes(cluster):
+    nn = u.get_namenode(cluster)
+    nn.remote().execute_command(
+        'sudo su - -c "hdfs dfsadmin -refreshNodes" hadoop')
+
+
+def refresh_yarn_nodes(cluster):
+    rm = u.get_resourcemanager(cluster)
+    rm.remote().execute_command(
+        'sudo su - -c "yarn rmadmin -refreshNodes" hadoop')
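
Every remote call in this module follows one pattern: run a command on the
instance as the hadoop user through sudo su. A condensed sketch of that
pattern (the helper name is illustrative, not part of the commit):

    def run_as_hadoop(instance, command):
        # instance.remote() is savanna's remote-execution wrapper;
        # execute_command returns an (exit_code, stdout) tuple
        instance.remote().execute_command(
            'sudo su - -c "%s" hadoop' % command)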

View File

@@ -0,0 +1,124 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from savanna import context
from savanna import exceptions as ex
from savanna.openstack.common import timeutils
from savanna.plugins.general import utils as u
from savanna.plugins.vanilla.v2_3_0 import config
from savanna.plugins.vanilla.v2_3_0 import run_scripts as run
from savanna.plugins.vanilla.v2_3_0 import utils as pu

HADOOP_CONF_DIR = config.HADOOP_CONF_DIR


def scale_cluster(cluster, instances):
    config.configure_instances(instances)
    _update_include_files(cluster)
    run.refresh_hadoop_nodes(cluster)
    run.refresh_yarn_nodes(cluster)
    for instance in instances:
        run.start_instance(instance)


def _get_instances_with_service(instances, service):
    ret = []
    for instance in instances:
        if service in instance.node_group.node_processes:
            ret.append(instance)

    return ret


def _update_include_files(cluster):
    instances = u.get_instances(cluster)

    datanodes = u.get_datanodes(cluster)
    nodemanagers = u.get_nodemanagers(cluster)
    dn_hosts = u.generate_fqdn_host_names(datanodes)
    nm_hosts = u.generate_fqdn_host_names(nodemanagers)
    for instance in instances:
        with instance.remote() as r:
            r.execute_command(
                'sudo su - -c "echo \'%s\' > %s/dn-include" hadoop' % (
                    dn_hosts, HADOOP_CONF_DIR))
            r.execute_command(
                'sudo su - -c "echo \'%s\' > %s/nm-include" hadoop' % (
                    nm_hosts, HADOOP_CONF_DIR))


def decommission_nodes(cluster, instances):
    datanodes = _get_instances_with_service(instances, 'datanode')
    nodemanagers = _get_instances_with_service(instances, 'nodemanager')
    _update_exclude_files(cluster, instances)

    run.refresh_hadoop_nodes(cluster)
    run.refresh_yarn_nodes(cluster)

    _check_nodemanagers_decommission(cluster, nodemanagers)
    _check_datanodes_decommission(cluster, datanodes)

    _update_include_files(cluster)
    _clear_exclude_files(cluster)


def _update_exclude_files(cluster, instances):
    datanodes = _get_instances_with_service(instances, 'datanode')
    nodemanagers = _get_instances_with_service(instances, 'nodemanager')
    dn_hosts = u.generate_fqdn_host_names(datanodes)
    nm_hosts = u.generate_fqdn_host_names(nodemanagers)
    for instance in u.get_instances(cluster):
        with instance.remote() as r:
            r.execute_command(
                'sudo su - -c "echo \'%s\' > %s/dn-exclude" hadoop' % (
                    dn_hosts, HADOOP_CONF_DIR))
            r.execute_command(
                'sudo su - -c "echo \'%s\' > %s/nm-exclude" hadoop' % (
                    nm_hosts, HADOOP_CONF_DIR))


def _clear_exclude_files(cluster):
    for instance in u.get_instances(cluster):
        with instance.remote() as r:
            r.execute_command(
                'sudo su - -c "echo > %s/dn-exclude" hadoop' % HADOOP_CONF_DIR)
            r.execute_command(
                'sudo su - -c "echo > %s/nm-exclude" hadoop' % HADOOP_CONF_DIR)

def _check_decommission(cluster, instances, check_func, timeout):
    s_time = timeutils.utcnow()
    while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
        statuses = check_func(cluster)
        dec_ok = True
        for instance in instances:
            if statuses[instance.fqdn()] != 'decommissioned':
                dec_ok = False

        if dec_ok:
            return
        else:
            context.sleep(5)
    else:
        raise ex.SavannaException("Cannot finish decommission in %d seconds" %
                                  timeout)


def _check_nodemanagers_decommission(cluster, instances):
    _check_decommission(cluster, instances, pu.get_nodemanagers_status, 300)


def _check_datanodes_decommission(cluster, instances):
    _check_decommission(cluster, instances, pu.get_datanodes_status, 3600 * 4)
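
The decommission wait above polls check_func every 5 seconds until every
target instance reports 'decommissioned', allowing NodeManagers 5 minutes and
DataNodes 4 hours (HDFS has to re-replicate a node's blocks before the node
finishes decommissioning, which can take a long time). The same loop, reduced
to a standalone sketch:

    import time

    def wait_decommissioned(hosts, get_statuses, timeout, interval=5):
        # poll until every host reports 'decommissioned' or time runs out
        deadline = time.time() + timeout
        while time.time() < deadline:
            statuses = get_statuses()
            if all(statuses.get(h) == 'decommissioned' for h in hosts):
                return
            time.sleep(interval)
        raise RuntimeError('Cannot finish decommission in %d seconds' % timeout)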

View File

@@ -0,0 +1,46 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from savanna.plugins.general import utils as u


def get_datanodes_status(cluster):
    statuses = {}
    namenode = u.get_namenode(cluster)
    status_regexp = r'^Hostname: (.*)\nDecommission Status : (.*)$'
    matcher = re.compile(status_regexp, re.MULTILINE)
    dfs_report = namenode.remote().execute_command(
        'sudo su - -c "hdfs dfsadmin -report" hadoop')[1]

    for host, status in matcher.findall(dfs_report):
        statuses[host] = status.lower()

    return statuses


def get_nodemanagers_status(cluster):
    statuses = {}
    resourcemanager = u.get_resourcemanager(cluster)
    status_regexp = r'^(\S+):\d+\s+(\w+)'
    matcher = re.compile(status_regexp, re.MULTILINE)
    yarn_report = resourcemanager.remote().execute_command(
        'sudo su - -c "yarn node -all -list" hadoop')[1]

    for host, status in matcher.findall(yarn_report):
        statuses[host] = status.lower()

    return statuses
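
Both parsers scrape the stdout of the Hadoop admin CLIs: the HDFS pattern keys
on the adjacent report lines "Hostname: ..." / "Decommission Status : ...",
while the YARN pattern grabs host:port/state pairs from the node list. A quick
self-contained check of the HDFS pattern against a report fragment:

    import re

    sample = ('Name: 10.50.0.22:50010 (cluster-worker-001.novalocal)\n'
              'Hostname: cluster-worker-001.novalocal\n'
              'Decommission Status : Normal\n')
    matcher = re.compile(r'^Hostname: (.*)\nDecommission Status : (.*)$',
                         re.MULTILINE)
    print(matcher.findall(sample))
    # -> [('cluster-worker-001.novalocal', 'Normal')]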

View File

@@ -23,6 +23,7 @@ from savanna.plugins.vanilla import abstractversionhandler as avm
 from savanna.plugins.vanilla.v2_3_0 import config as c
 from savanna.plugins.vanilla.v2_3_0 import config_helper as c_helper
 from savanna.plugins.vanilla.v2_3_0 import run_scripts as run
+from savanna.plugins.vanilla.v2_3_0 import scaling as sc

 conductor = conductor.API
 LOG = logging.getLogger(__name__)
@@ -67,13 +68,13 @@ class VersionHandler(avm.AbstractVersionHandler):
         self._set_cluster_info(cluster)

     def decommission_nodes(self, cluster, instances):
-        pass
+        sc.decommission_nodes(cluster, instances)

     def validate_scaling(self, cluster, existing, additional):
         pass

     def scale_cluster(self, cluster, instances):
-        pass
+        sc.scale_cluster(cluster, instances)

     def _set_cluster_info(self, cluster):
         nn = utils.get_namenode(cluster)

View File

@@ -0,0 +1,62 @@
Configured Capacity: 60249329664 (56.11 GB)
Present Capacity: 50438139904 (46.97 GB)
DFS Remaining: 50438041600 (46.97 GB)
DFS Used: 98304 (96 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 4 (4 total, 0 dead)
Live datanodes:
Name: 10.50.0.22:50010 (cluster-worker-001.novalocal)
Hostname: cluster-worker-001.novalocal
Decommission Status : Normal
Configured Capacity: 20083101696 (18.70 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 3270406144 (3.05 GB)
DFS Remaining: 16812670976 (15.66 GB)
DFS Used%: 0.00%
DFS Remaining%: 83.72%
Last contact: Mon Feb 24 13:41:13 UTC 2014
Name: 10.50.0.36:50010 (cluster-worker-003.novalocal)
Hostname: cluster-worker-003.novalocal
Decommission Status : Normal
Configured Capacity: 20083101696 (18.70 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 3270393856 (3.05 GB)
DFS Remaining: 16812683264 (15.66 GB)
DFS Used%: 0.00%
DFS Remaining%: 83.72%
Last contact: Mon Feb 24 13:41:11 UTC 2014
Name: 10.50.0.25:50010 (cluster-worker-002.novalocal)
Hostname: cluster-worker-002.novalocal
Decommission Status : Normal
Configured Capacity: 20083101696 (18.70 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 3270389760 (3.05 GB)
DFS Remaining: 16812687360 (15.66 GB)
DFS Used%: 0.00%
DFS Remaining%: 83.72%
Last contact: Mon Feb 24 13:41:12 UTC 2014
Name: 10.50.0.60:50010 (cluster-worker-004.novalocal)
Hostname: cluster-worker-004.novalocal
Decommission Status : Decommissioned
Configured Capacity: 20083101696 (18.70 GB)
DFS Used: 24576 (24 KB)
Non DFS Used: 3270316032 (3.05 GB)
DFS Remaining: 16812761088 (15.66 GB)
DFS Used%: 0.00%
DFS Remaining%: 83.72%
Last contact: Mon Feb 24 13:33:33 UTC 2014

View File

@@ -0,0 +1,6 @@
Total Nodes:4
Node-Id Node-State Node-Http-Address Number-of-Running-Containers
cluster-worker-001.novalocal:54746 RUNNING cluster-worker-001.novalocal:8042 0
cluster-worker-002.novalocal:53509 RUNNING cluster-worker-002.novalocal:8042 0
cluster-worker-003.novalocal:60418 RUNNING cluster-worker-003.novalocal:8042 0
cluster-worker-004.novalocal:33876 DECOMMISSIONED cluster-worker-004.novalocal:8042 0

View File

@@ -0,0 +1,66 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from savanna.plugins.vanilla.v2_3_0 import utils as u
from savanna.tests.unit import base
from savanna.utils import files


class UtilsTestCase(base.SavannaTestCase):
    @mock.patch('savanna.plugins.general.utils.get_namenode')
    def test_datanodes_status(self, nn):
        report = files.get_file_text(
            'tests/unit/plugins/vanilla/v2_3_0/resources/dfs-report.txt')
        nn.return_value = self._get_instance(report)
        statuses = u.get_datanodes_status(None)
        expected = {
            'cluster-worker-001.novalocal': 'normal',
            'cluster-worker-002.novalocal': 'normal',
            'cluster-worker-003.novalocal': 'normal',
            'cluster-worker-004.novalocal': 'decommissioned'
        }
        self.assertDictEqual(statuses, expected)

    @mock.patch('savanna.plugins.general.utils.get_resourcemanager')
    def test_nodemanagers_status(self, rm):
        report = files.get_file_text(
            'tests/unit/plugins/vanilla/v2_3_0/resources/yarn-report.txt')
        rm.return_value = self._get_instance(report)
        statuses = u.get_nodemanagers_status(None)
        expected = {
            'cluster-worker-001.novalocal': 'running',
            'cluster-worker-002.novalocal': 'running',
            'cluster-worker-003.novalocal': 'running',
            'cluster-worker-004.novalocal': 'decommissioned'
        }
        self.assertDictEqual(statuses, expected)

    def _get_instance(self, out):
        inst_remote = mock.MagicMock()
        inst_remote.execute_command.return_value = 0, out
        inst_remote.__enter__.return_value = inst_remote

        inst = mock.MagicMock()
        inst.remote.return_value = inst_remote

        return inst
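
_get_instance fakes both usage styles that appear in this commit: the direct
instance.remote().execute_command(...) call and the context-manager form
"with instance.remote() as r:". Pointing __enter__ back at the mock itself is
what makes the with-statement yield the same object, and execute_command
returns the (exit_code, stdout) pair that the parsers index with [1]. The
trick in isolation:

    import mock

    remote = mock.MagicMock()
    remote.execute_command.return_value = (0, 'report text')
    remote.__enter__.return_value = remote  # "with remote as r" yields remote

    with remote as r:
        code, out = r.execute_command('hdfs dfsadmin -report')
    # code == 0, out == 'report text'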