Added config tests

Added tests for Hadoop configs and for CRUD operations on clusters created
from cluster templates.
Rewrote the Hadoop test.

Partially implements blueprint itest-savanna-0-2

Change-Id: I195bc845133d3bffb58181926abd04c7987bdf02
Author: Vadim Rovachev  2013-07-09 18:35:08 +04:00
parent d929c45c9b
commit f0545ccb6c
9 changed files with 384 additions and 86 deletions

.gitignore

@@ -35,7 +35,7 @@ etc/local.cfg
 etc/savanna/*.conf
 etc/savanna.conf
 ChangeLog
-savanna/tests/integration/config.py
+savanna/tests/integration/configs/config.py
 cscope.out
 tools/lintstack.head.py
 tools/pylint_exceptions

savanna/tests/integration/base.py

@@ -15,6 +15,7 @@
 import contextlib
 import json
+import os
 import telnetlib
 import time
@@ -149,7 +150,7 @@ class ITestCase(unittest2.TestCase):
             data = data[crud_object]
             object_id = data.get('id')
             if crud_object == 'cluster':
-                self.await_cluster_active(get_url, object_id)
+                self.await_cluster_active(object_id)
         except Exception as e:
             self.fail('failure: ' + str(e))
         finally:
@@ -353,8 +354,8 @@ class ITestCase(unittest2.TestCase):
 #------------------------------helper_methods----------------------------------

-    def await_cluster_active(self, get_url, object_id):
-        get_data = self.get_object(get_url, object_id, 200)
+    def await_cluster_active(self, object_id):
+        get_data = self.get_object(self.url_cluster_with_slash, object_id, 200)
         get_data = get_data['cluster']
         i = 1
         while get_data['status'] != 'Active':
@@ -364,7 +365,8 @@ class ITestCase(unittest2.TestCase):
                 self.fail(
                     'cluster is not getting status \'Active\', '
                     'passed %d minutes' % param.TIMEOUT)
-            get_data = self.get_object(get_url, object_id, 200)
+            get_data = self.get_object(
+                self.url_cluster_with_slash, object_id, 200)
             get_data = get_data['cluster']
             time.sleep(10)
             i += 1
@@ -389,10 +391,12 @@ class ITestCase(unittest2.TestCase):
         with contextlib.closing(self.ssh_connection(host)) as ssh:
             return remote.read_file_from(ssh.open_sftp(), remote_file)

-    def transfer_script_to_node(self, host, directory, folder, script):
-        self.write_file_to(
-            str(host), 'script.sh',
-            open('%s/integration/%s/%s' % (directory, folder, script)).read())
+    def transfer_script_to_node(self, host,
+                                script='hadoop_test/hadoop_test_script.sh'):
+        self.write_file_to(str(host),
+                           'script.sh',
+                           open('%s/integration/%s' % (os.getcwd(),
+                                                       script)).read())
         self.execute_command(str(host), 'chmod 777 script.sh')

     def try_telnet(self, host, port):
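For illustration, the two call styles the new default-argument signature
supports, both taken from test code later in this commit:

    # default: push the Hadoop test script
    self.transfer_script_to_node(management_ip)
    # explicit: push the config-check script added by this commit
    self.transfer_script_to_node(
        management_ip, 'test_config/config_test_script.sh')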
@@ -407,10 +411,25 @@ class ITestCase(unittest2.TestCase):
         data = self.post_object(self.url_cluster, cluster_body, 202)
         cluster_id = data['cluster']['id']
-        self.await_cluster_active(self.url_cluster_with_slash, cluster_id)
+        self.await_cluster_active(cluster_id)
         return cluster_id

+    def create_cluster_using_ngt_and_get_id(self, node_list, name):
+        cl_tmpl_id = None
+        try:
+            cl_tmpl_body = self.make_cluster_template('cl-tmpl', node_list)
+            cl_tmpl_id = self.get_object_id(
+                'cluster_template', self.post_object(self.url_cl_tmpl,
+                                                     cl_tmpl_body, 202))
+            clstr_body = self.make_cl_body_cluster_template(cl_tmpl_id)
+            clstr_body['name'] = name
+            return self.create_cluster_and_get_id(clstr_body)
+        except Exception as e:
+            print(str(e))
+        finally:
+            self.del_object(self.url_cl_tmpl_with_slash, cl_tmpl_id, 204)
+
     def get_instances_ip_and_node_processes_list(self, cluster_id):
         get_data = self.get_object(
             self.url_cluster_with_slash, cluster_id, 200, True)
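A sketch of the mapping this helper is expected to build, inferred from how
the rewritten tests below consume it (the IP addresses are invented for
illustration):

    ip_instances = {
        '192.168.1.10': ['jobtracker', 'namenode'],
        '192.168.1.11': ['tasktracker', 'datanode'],
    }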

savanna/tests/integration/configs/parameters.py

@@ -50,6 +50,14 @@ PATH_TO_SSH = _get_conf('PATH_TO_SSH', '/home/user/.ssh/id_rsa')
 PLUGIN_NAME = _get_conf('PLUGIN_NAME', 'vanilla')

+NAMENODE_CONFIG = _get_conf('NAMENODE_CONFIG', {})
+JOBTRACKER_CONFIG = _get_conf('JOBTRACKER_CONFIG', {})
+DATANODE_CONFIG = _get_conf('DATANODE_CONFIG', {})
+TASKTRACKER_CONFIG = _get_conf('TASKTRACKER_CONFIG', {})
+GENERAL_CONFIG = _get_conf('GENERAL_CONFIG', {})
+CLUSTER_HDFS_CONFIG = _get_conf('CLUSTER_HDFS_CONFIG', {})
+CLUSTER_MAPREDUCE_CONFIG = _get_conf('CLUSTER_MAPREDUCE_CONFIG', {})
+
 JT_PORT = _get_conf('JT_PORT', 50030)
 NN_PORT = _get_conf('NN_PORT', 50070)
 TT_PORT = _get_conf('TT_PORT', 50060)
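Since configs/config.py is gitignored (see above), these new parameters all
default to empty dicts. A minimal sketch of what a local config.py might set;
the config names are assumed to match the vanilla plugin and the case labels
in config_test_script.sh below, and every value is invented for illustration:

    NAMENODE_CONFIG = {'Name Node Heap Size': 512}
    JOBTRACKER_CONFIG = {'Job Tracker Heap Size': 1024}
    DATANODE_CONFIG = {'Data Node Heap Size': 512}
    TASKTRACKER_CONFIG = {'Task Tracker Heap Size': 512}
    GENERAL_CONFIG = {'Enable Swift': True}
    CLUSTER_HDFS_CONFIG = {'dfs.replication': 2}
    CLUSTER_MAPREDUCE_CONFIG = {
        'mapred.map.tasks.speculative.execution': 'false',
        'mapred.child.java.opts': '-Xmx500m',
    }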


@@ -0,0 +1,53 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import telnetlib

from savanna.tests.integration import base


class ClusterFromClusterTemplateCrudTest(base.ITestCase):

    def setUp(self):
        super(ClusterFromClusterTemplateCrudTest, self).setUp()
        telnetlib.Telnet(self.host, self.port)
        self.create_node_group_templates()

    def crud_clstr_cltr_tmpl(self, node_list):
        cl_tmpl_id = ''
        try:
            cl_tmpl_body = self.make_cluster_template('cl-tmpl', node_list)
            cl_tmpl_id = self.get_object_id(
                'cluster_template', self.post_object(self.url_cl_tmpl,
                                                     cl_tmpl_body, 202))
            clstr_body = self.make_cl_body_cluster_template(cl_tmpl_id)
            self.crud_object(clstr_body, self.url_cluster)
        except Exception as e:
            self.fail('fail: ' + str(e))
        finally:
            self.del_object(self.url_cl_tmpl_with_slash, cl_tmpl_id, 204)

    def test_cluster_nnttdn_jt(self):
        node_list = {self.id_nn_tt_dn: 1, self.id_jt: 1}
        self.crud_clstr_cltr_tmpl(node_list)

    def test_cluster_jtttdn_nn(self):
        node_list = {self.id_jt_tt_dn: 1, self.id_nn: 1}
        self.crud_clstr_cltr_tmpl(node_list)

    def tearDown(self):
        self.delete_node_group_templates()
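For context, a plausible shape of the cluster body that
make_cl_body_cluster_template builds; the helper is defined elsewhere in
base.py, so every key here other than cluster_template_id is an assumption:

    clstr_body = {
        'name': 'cluster-name',
        'plugin_name': param.PLUGIN_NAME,
        'hadoop_version': '1.1.2',
        'cluster_template_id': cl_tmpl_id,
    }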

savanna/tests/integration/hadoop_test/hadoop_test_script.sh

@@ -14,12 +14,6 @@ case $1 in
     gn)
         FUNC="get_job_name"
     ;;
-    lt)
-        FUNC="get_list_active_trackers"
-    ;;
-    ld)
-        FUNC="get_list_active_datanodes"
-    ;;
     ed)
         FUNC="check_exist_directory"
     ;;
@@ -130,18 +124,6 @@ get_job_name() {
     sudo su -c "cd $HADOOP_DIRECTORY && hadoop job -list all | tail -n1" hadoop | awk '{print $1}' 2>>$log
 }

-get_list_active_trackers() {
-    f_create_log_dir
-    f_var_check v_hadoop_directory
-    sudo su -c "cd $HADOOP_DIRECTORY && hadoop job -list-active-trackers" hadoop | wc -l 2>>$log
-}
-
-get_list_active_datanodes() {
-    f_create_log_dir
-    f_var_check v_hadoop_directory
-    sudo su -c "hadoop dfsadmin -report" hadoop | grep "Datanodes available:.*" | awk '{print $3}' 2>>$log
-}
-
 check_exist_directory() {
     f_var_check v_job_name
     f_var_check v_hadoop_log_directory


@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import os
 import telnetlib
 import time
@@ -29,70 +28,26 @@ class TestHadoop(base.ITestCase):
         self.create_node_group_templates()

     def _hadoop_testing(self, node_list):
-        cl_tmpl_id = None
         cluster_id = None
         try:
-            cl_tmpl_body = self.make_cluster_template('cl-tmpl', node_list)
-            cl_tmpl_id = self.get_object_id(
-                'cluster_template', self.post_object(self.url_cl_tmpl,
-                                                     cl_tmpl_body, 202))
-            clstr_body = self.make_cl_body_cluster_template(cl_tmpl_id)
-            clstr_body['name'] = param.CLUSTER_NAME_HADOOP
-            data = self.post_object(self.url_cluster, clstr_body, 202)
-            data = data['cluster']
-            cluster_id = data.pop('id')
-            self.await_cluster_active(self.url_cluster_with_slash, cluster_id)
-            time.sleep(30)
-            get_data = self.get_object(
-                self.url_cluster_with_slash, cluster_id, 200, True)
-            get_data = get_data['cluster']
-            node_groups = get_data['node_groups']
-            ip_instances = {}
-            for node_group in node_groups:
-                instances = node_group['instances']
-                for instans in instances:
-                    management_ip = instans['management_ip']
-                    ip_instances[management_ip] = node_group[
-                        'node_processes']
-            namenode_ip = None
-            tasktracker_count = 0
-            datanode_count = 0
-            node_count = 0
+            cluster_id = self.create_cluster_using_ngt_and_get_id(
+                node_list, param.CLUSTER_NAME_HADOOP)
+            ip_instances = self.get_instances_ip_and_node_processes_list(
+                cluster_id)
             try:
-                for key, value in ip_instances.items():
-                    telnetlib.Telnet(key, '22')
-                    if 'namenode' in value:
-                        namenode_ip = key
-                        telnetlib.Telnet(key, '50070')
-                    if 'tasktracker' in value:
-                        tasktracker_count += 1
-                        telnetlib.Telnet(key, '50060')
-                    if 'datanode' in value:
-                        datanode_count += 1
-                        telnetlib.Telnet(key, '50075')
-                    if 'jobtracker' in value:
-                        telnetlib.Telnet(key, '50030')
-                    node_count += 1
+                clstr_info = self.get_namenode_ip_and_tt_dn_count(ip_instances)
+                namenode_ip = clstr_info['namenode_ip']
+                node_count = clstr_info['node_count']
+                self.await_active_workers_for_namenode(clstr_info)
             except Exception as e:
-                self.fail('telnet instances has failure: ' + str(e))
-            this_dir = os.getcwd()
+                self.fail(str(e))

             try:
                 for key in ip_instances:
-                    self.transfer_script_to_node(key, this_dir, 'hadoop_test',
-                                                 'hadoop_test_script.sh')
+                    self.transfer_script_to_node(key)
             except Exception as e:
                 self.fail('failure in transfer script: ' + str(e))

-            self.assertEqual(int(self.execute_command(
-                namenode_ip, './script.sh lt -hd %s'
-                % param.HADOOP_DIRECTORY)[1]), tasktracker_count,
-                msg='compare number active trackers is failure: ')
-            self.assertEqual(int(self.execute_command(
-                namenode_ip, './script.sh ld -hd %s' %
-                param.HADOOP_DIRECTORY)[1]), datanode_count,
-                msg='compare number active datanodes is failure:')
-
             try:
                 self.execute_command(
                     namenode_ip, './script.sh pi -nc %s -hv %s -hd %s'
@@ -101,8 +56,9 @@ class TestHadoop(base.ITestCase):
             except Exception as e:
                 print(self.read_file_from(namenode_ip,
                                           '/tmp/outputTestMapReduce/log.txt'))
-                self.fail('run pi script is failure: ' + str(e))
+                self.fail(
+                    'run pi script has failed: '
+                    + str(e))

             try:
                 job_name = self.execute_command(
                     namenode_ip, './script.sh gn -hd %s'
@@ -137,7 +93,6 @@ class TestHadoop(base.ITestCase):
         finally:
             self.del_object(self.url_cluster_with_slash, cluster_id, 204)
             time.sleep(5)
-            self.del_object(self.url_cl_tmpl_with_slash, cl_tmpl_id, 204)

     def test_hadoop_single_master(self):
         """This test checks hadoop work

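An aside on the rewritten flow above: get_namenode_ip_and_tt_dn_count
collapses the old per-port telnet bookkeeping into one dict. Only
namenode_ip and node_count are read directly here, and
await_active_workers_for_namenode replaces the old lt/ld assertions; the
tracker/datanode counter fields are an assumption, as is every value shown:

    clstr_info = {
        'namenode_ip': '192.168.1.10',
        'tasktracker_count': 2,
        'datanode_count': 2,
        'node_count': 3,
    }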
savanna/tests/integration/test_config/config_test_script.sh

@@ -0,0 +1,131 @@
#!/bin/bash

dir=/tmp
log=$dir/log_config.txt

case $1 in
    NameNodeHeapSize)
        FUNC="nn_size"
    ;;
    JobTrackerHeapSize)
        FUNC="jt_size"
    ;;
    DataNodeHeapSize)
        FUNC="dn_size"
    ;;
    TaskTrackerHeapSize)
        FUNC="tt_size"
    ;;
    EnableSwift)
        FUNC="check_swift"
    ;;
    dfs.replication)
        FUNC="dfs_replication"
    ;;
    mapred.map.tasks.speculative.execution)
        FUNC="mapred_map_tasks_speculative_execution"
    ;;
    mapred.child.java.opts)
        FUNC="mapred_child_java_opts"
    ;;
esac
shift

until [ -z "$1" ]
do
    if [ "$1" = "-val" ]
    then
        VALUE="$2"
    fi
    if [ "$1" = "-url" ]
    then
        URL="$2"
    fi
    shift
done
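# Example invocations, as issued from the Python tests via execute_command.
# The flag values below are invented for illustration; note that
# assertConfigOnNode strips the spaces from config names before passing
# them as $1:
#   ./script.sh NameNodeHeapSize -val 512
#   ./script.sh EnableSwift -val True -url http://127.0.0.1:35357/v2.0/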
f_var_check() {
    case "$1" in
        config_value)
            if [ -z "$VALUE" ]
            then
                echo "config_value_is_not_specified"
                exit 0
            fi
        ;;
        v_url)
            if [ -z "$URL" ]
            then
                echo "url_is_not_specified"
                exit 0
            fi
        ;;
    esac
}

compare_and_exit() {
    f_var_check config_value
    if [ "$VALUE" = "$1" ]; then exit 0; else echo "$VALUE != $1" && exit 1; fi
}

check_size() {
    s=`ps aux | grep java | grep $1 | grep -o 'Xmx[0-9]\{1,10\}m' | tail -n 1 | grep -o '[0-9]\{1,100\}'`
    compare_and_exit "$s"
}

nn_size() {
    check_size "namenode"
}

jt_size() {
    check_size "jobtracker"
}

dn_size() {
    check_size "datanode"
}

tt_size() {
    check_size "tasktracker"
}

check_swift() {
    f_var_check config_value
    f_var_check v_url
    sudo apt-get -y --force-yes install python-pip
    sleep 1
    sudo pip install python-swiftclient==1.2.0
    sleep 1
    sudo pip install python-keystoneclient
    sleep 1
    echo "$URL"
    export ST_AUTH="$URL"
    export ST_USER="ci:admin"
    export ST_KEY="swordfish"
    sleep 1
    swift -V2.0 delete configTesting
    swift -V2.0 post configTesting
    echo "Test hadoop config - Enable Swift" > /tmp/swiftlog.txt
    sudo su -c "hadoop dfs -copyFromLocal /tmp/swiftlog.txt /" hadoop
    sudo su -c "hadoop distcp -D fs.swift.service.savanna.username=admin -D fs.swift.service.savanna.tenant=ci -D fs.swift.service.savanna.password=swordfish /swiftlog.txt swift://configTesting.savanna/" hadoop
    if [ -z `swift -V2.0 list configTesting | grep -o "swiftlog.txt"` ]; then val="False"; else val="True"; fi
    compare_and_exit "$val"
}

dfs_replication() {
    s=`cat /etc/hadoop/hdfs-site.xml | grep -A 1 '.*dfs.replication.*' | tail -n 1 | grep -o "[0-9]\{1,10\}"`
    compare_and_exit "$s"
}

mapred_map_tasks_speculative_execution() {
    s=`cat /etc/hadoop/mapred-site.xml | grep -A 1 '.*mapred.map.tasks.speculative.execution.*' | tail -n 1 | grep -o "[a-z]\{4,5\}" | grep -v "value"`
    compare_and_exit "$s"
}

mapred_child_java_opts() {
    s=`cat /etc/hadoop/mapred-site.xml | grep -A 1 '.*mapred.child.java.opts.*' | tail -n 1 | grep -o "\-Xmx[0-9]\{1,10\}m"`
    compare_and_exit "$s"
}

$FUNC


@@ -0,0 +1,150 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import telnetlib

from savanna.tests.integration import base
import savanna.tests.integration.configs.parameters as param


def _add_config(body, config):
    if config in [param.NAMENODE_CONFIG, param.DATANODE_CONFIG]:
        body['node_configs']['HDFS'] = config
    elif config == param.GENERAL_CONFIG:
        body['cluster_configs']['general'] = config
    elif config == param.CLUSTER_HDFS_CONFIG:
        body['cluster_configs']['HDFS'] = config
    elif config == param.CLUSTER_MAPREDUCE_CONFIG:
        body['cluster_configs']['MapReduce'] = config
    else:
        body['node_configs']['MapReduce'] = config
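For illustration, the state a cluster body is left in after the three
cluster-level _add_config calls in _cluster_config_testing below; the keys
mirror this function, and the values come from the parameters module:

    cluster_body['cluster_configs'] = {
        'general': param.GENERAL_CONFIG,
        'HDFS': param.CLUSTER_HDFS_CONFIG,
        'MapReduce': param.CLUSTER_MAPREDUCE_CONFIG,
    }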
class ClusterConfigTest(base.ITestCase):

    def setUp(self):
        super(ClusterConfigTest, self).setUp()
        telnetlib.Telnet(self.host, self.port)

    def assertConfigs(self, get_config, param_config):
        self.assertEqual(get_config, param_config,
                         msg='configs are not equal: %s != %s'
                             % (str(get_config), str(param_config)))

    def assertConfigOnNode(self, host, config, value):
        conf = config.replace(' ', '')
        com = self.execute_command(host, './script.sh %s -val %s -url %s' %
                                         (conf, value, param.OS_AUTH_URL))
        self.assertEqual(com[0], 0,
                         msg='host: %s, config %s is not equal: %s'
                             % (host, config, value))

    def _cluster_config_testing(self, cluster_body):
        cluster_id = None
        try:
            _add_config(cluster_body, param.GENERAL_CONFIG)
            _add_config(cluster_body, param.CLUSTER_HDFS_CONFIG)
            _add_config(cluster_body, param.CLUSTER_MAPREDUCE_CONFIG)
            cluster_id = self.create_cluster_and_get_id(cluster_body)
            get_data = self.get_object(self.url_cluster_with_slash,
                                       cluster_id, 200, True)
            get_data = get_data['cluster']
            self.assertConfigs(get_data['cluster_configs']['general'],
                               param.GENERAL_CONFIG)
            self.assertConfigs(get_data['cluster_configs']['HDFS'],
                               param.CLUSTER_HDFS_CONFIG)
            self.assertConfigs(get_data['cluster_configs']['MapReduce'],
                               param.CLUSTER_MAPREDUCE_CONFIG)
            node_groups = get_data['node_groups']
            ip_instances = {}
            process_map = {
                'namenode': {
                    'service': 'HDFS', 'param': param.NAMENODE_CONFIG},
                'jobtracker': {
                    'service': 'MapReduce', 'param': param.JOBTRACKER_CONFIG},
                'datanode': {
                    'service': 'HDFS', 'param': param.DATANODE_CONFIG},
                'tasktracker': {
                    'service': 'MapReduce', 'param': param.TASKTRACKER_CONFIG}
            }

            def get_node_configs(node_group, process):
                return \
                    node_group['node_configs'][process_map[process]['service']]

            def get_param(process):
                return process_map[process]['param']

            for node_group in node_groups:
                for process in node_group['node_processes']:
                    self.assertConfigs(
                        get_node_configs(node_group,
                                         process), get_param(process))
                instances = node_group['instances']
                for instance in instances:
                    management_ip = instance['management_ip']
                    self.transfer_script_to_node(
                        management_ip, 'test_config/config_test_script.sh')
                    ip_instances[management_ip] = node_group[
                        'node_processes']
            try:
                for key, processes in ip_instances.items():
                    telnetlib.Telnet(key, '22')
                    for conf, value in param.CLUSTER_MAPREDUCE_CONFIG.items():
                        self.assertConfigOnNode(key, conf, value)
                    for conf, value in param.CLUSTER_HDFS_CONFIG.items():
                        self.assertConfigOnNode(key, conf, value)
                    for process in processes:
                        for sec_key, sec_value in get_param(process).items():
                            self.assertConfigOnNode(key, sec_key, sec_value)
                    if 'namenode' in processes:
                        for sec_key, sec_value in param.GENERAL_CONFIG.items():
                            self.assertConfigOnNode(
                                key, sec_key, sec_value)
            except Exception as e:
                self.fail(e.message)
        except Exception as e:
            self.fail(e.message)
        finally:
            self.del_object(self.url_cluster_with_slash, cluster_id, 204)

    def test_cluster_config_nnjt_ttdn(self):
        id_master_ngt = None
        id_worker_ngt = None
        try:
            master_ngt_body = self.make_node_group_template(
                'master-ngt', 'qa probe', 'JT+NN')
            _add_config(master_ngt_body, param.NAMENODE_CONFIG)
            _add_config(master_ngt_body, param.JOBTRACKER_CONFIG)
            id_master_ngt = self.get_object_id(
                'node_group_template', self.post_object(self.url_ngt,
                                                        master_ngt_body, 202))

            worker_ngt_body = self.make_node_group_template(
                'worker-ngt', 'qa probe', 'TT+DN')
            _add_config(worker_ngt_body, param.DATANODE_CONFIG)
            _add_config(worker_ngt_body, param.TASKTRACKER_CONFIG)
            id_worker_ngt = self.get_object_id(
                'node_group_template', self.post_object(self.url_ngt,
                                                        worker_ngt_body, 202))

            ngt_id_list = {id_master_ngt: 1, id_worker_ngt: 2}
            cl_body = self.make_cl_body_node_group_templates(ngt_id_list)
            self._cluster_config_testing(cl_body)
        except Exception as e:
            self.fail(str(e))
        finally:
            self.del_object(self.url_ngt_with_slash, id_master_ngt, 204)
            self.del_object(self.url_ngt_with_slash, id_worker_ngt, 204)
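And the node_configs that the two _add_config pairs above leave on the node
group template bodies, following the mapping in _add_config (NAMENODE_CONFIG
and DATANODE_CONFIG go under HDFS; the other node-level configs fall through
to MapReduce):

    master_ngt_body['node_configs'] = {
        'HDFS': param.NAMENODE_CONFIG,
        'MapReduce': param.JOBTRACKER_CONFIG,
    }
    worker_ngt_body['node_configs'] = {
        'HDFS': param.DATANODE_CONFIG,
        'MapReduce': param.TASKTRACKER_CONFIG,
    }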