Merge "Added hadoop testing"

Jenkins 2013-06-26 21:17:28 +00:00 committed by Gerrit Code Review
commit 14af5e04a7
5 changed files with 378 additions and 30 deletions

View File

@@ -15,9 +15,9 @@
import json
import eventlet
import keystoneclient.v2_0
import requests
import time
import unittest2
import savanna.tests.integration.parameters as param
@@ -130,6 +130,7 @@ class ITestCase(unittest2.TestCase):
data = self.post_object(url, body, 202)
get_url = None
object_id = None
crud_object = None
try:
if url == self.url_cluster:
crud_object = 'cluster'
@@ -145,9 +146,11 @@ class ITestCase(unittest2.TestCase):
if crud_object == 'cluster':
self.await_cluster_active(get_url, object_id)
except Exception as e:
self.fail('failure: ' + e.message)
self.fail('failure: ' + str(e))
finally:
self.del_object(get_url, object_id, 204)
if crud_object == 'cluster':
time.sleep(30)
return object_id
def await_cluster_active(self, get_url, object_id):
@@ -157,12 +160,13 @@ class ITestCase(unittest2.TestCase):
while get_data['status'] != 'Active':
print('GET_STATUS: ' + get_data['status'])
if i > int(param.TIMEOUT) * 6:
print("json for cluster: \n" + get_data + "\n")
self.fail(
'cluster did not become Active within %s minutes'
% param.TIMEOUT)
get_data = self.get_object(get_url, object_id, 200)
get_data = get_data['cluster']
eventlet.sleep(10)
time.sleep(10)
i += 1
def get_object_id(self, obj, body):
@@ -316,41 +320,41 @@ class ITestCase(unittest2.TestCase):
self.id_tt = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'worker_tt', 'qa probe', 'TT'), 202))
'worker-tt', 'qa probe', 'TT'), 202))
self.id_jt = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'master_jt', 'qa probe', 'JT'), 202))
'master-jt', 'qa probe', 'JT'), 202))
self.id_nn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'master_nn', 'qa probe', 'NN'), 202))
'master-nn', 'qa probe', 'NN'), 202))
self.id_dn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'worker_dn', 'qa probe', 'DN'), 202))
'worker-dn', 'qa probe', 'DN'), 202))
self.id_tt_dn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'worker_tt_dn', 'qa probe', 'TT+DN'), 202))
'worker-tt-dn', 'qa probe', 'TT+DN'), 202))
self.id_jt_nn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'master_jt_nn', 'qa probe', 'JT+NN'), 202))
'master-jt-nn', 'qa probe', 'JT+NN'), 202))
self.id_nn_tt_dn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'nn_tt_dn', 'qa probe', 'NN+TT+DN'), 202))
'nn-tt-dn', 'qa probe', 'NN+TT+DN'), 202))
self.id_jt_tt_dn = self.get_object_id(
'node_group_template', self.post_object(
self.url_ngt, self.make_node_group_template(
'jt_tt_dn', 'qa probe', 'JT+TT+DN'), 202))
'jt-tt-dn', 'qa probe', 'JT+TT+DN'), 202))
#---------------------delete_node_group_template-------------------------------
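The waiting logic above amounts to polling the cluster status until it becomes Active, with a minute-based timeout. A standalone sketch of that pattern (the function and parameter names here are illustrative, not part of the change):

import time

def wait_for_status(get_status, timeout_minutes, poll_interval=10):
    # Mirrors await_cluster_active above: poll every 10 seconds and allow
    # TIMEOUT * 6 iterations before giving up.
    for _ in range(int(timeout_minutes) * 60 // poll_interval):
        if get_status() == 'Active':
            return
        time.sleep(poll_interval)
    raise AssertionError(
        'cluster did not become Active within %s minutes' % timeout_minutes)

In the test above this corresponds to polling self.get_object(get_url, object_id, 200)['cluster']['status'] with param.TIMEOUT as the limit.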

View File

@@ -1,20 +1,26 @@
OS_USERNAME = 'admin' # username for nova
OS_PASSWORD = 'password' # password for nova
OS_USERNAME = 'admin' # username for nova
OS_PASSWORD = 'password' # password for nova
OS_TENANT_NAME = 'admin'
OS_AUTH_URL = 'http://localhost:5000/v2.0/' # URL for keystone
OS_AUTH_URL = 'http://192.168.1.1:35357/v2.0/' # URL for keystone
SAVANNA_HOST = 'localhost' # IP for Savanna API
SAVANNA_PORT = '8080' # port for Savanna API
SAVANNA_HOST = '192.168.1.1' # IP for Savanna API
SAVANNA_PORT = '8080' # port for Savanna API
IMAGE_ID = '2' # ID for instance image
FLAVOR_ID = 'abc9de12-fdc42ea25'
IMAGE_ID = '42' # ID for instance image
FLAVOR_ID = '42'
CLUSTER_NAME_CRUD = 'cluster-crud' # cluster name for crud operations
NODE_USERNAME = 'username' # username for master node
TIMEOUT = 15 # time to wait for cluster provisioning (minutes)
CLUSTER_NAME_CRUD = 'cluster-name-crud' # cluster name for crud operations
CLUSTER_NAME_HADOOP = 'cluster-name-hadoop' # cluster name for hadoop testing
PLUGIN_NAME = 'vanilla' # name for plugin
TIMEOUT = 15 # cluster creation timeout (in minutes)
HADOOP_VERSION = '1.1.2' # version of Hadoop
HADOOP_VERSION = '1.1.2'
HADOOP_DIRECTORY = '/usr/share/hadoop'
HADOOP_LOG_DIRECTORY = '/mnt/log/hadoop/hadoop/userlogs'
SSH_KEY = 'vpupkin' # SSH public key that Savanna pushes to the cluster nodes so users can reach the VMs over SSH
SSH_KEY = 'ssh_key' # SSH public key that Savanna pushes to the cluster nodes so users can reach the VMs over SSH
PATH_TO_SSH = '/home/user/.ssh/id_rsa'
PLUGIN_NAME = 'vanilla'

View File

@@ -0,0 +1,153 @@
#!/bin/bash
#touch script.sh && chmod +x script.sh && vim script.sh
dir=/tmp/outputTestMapReduce
log=$dir/log.txt
case $1 in
mr)
FUNC="map_reduce"
;;
pi)
FUNC="run_pi_job"
;;
gn)
FUNC="get_job_name"
;;
lt)
FUNC="get_list_active_trackers"
;;
ld)
FUNC="get_list_active_datanodes"
;;
ed)
FUNC="check_exist_directory"
;;
esac
shift
until [ -z "$1" ]
do
if [ "$1" = "-nc" ]
then
NODE_COUNT="$2"
elif [ "$1" = "-jn" ]
then
JOB_NAME="$2"
elif [ "$1" = "-hv" ]
then
HADOOP_VERSION="$2"
elif [ "$1" = "-hd" ]
then
HADOOP_DIRECTORY="$2"
elif [ "$1" = "-hld" ]
then
HADOOP_LOG_DIRECTORY="$2"
fi
shift
done
f_var_check() {
case "$1" in
v_node_count)
if [ -z "$NODE_COUNT" ]
then
echo "count_of_node_not_specified"
exit 0
fi
;;
v_job_name)
if [ -z "$JOB_NAME" ]
then
echo "job_name_not_specified"
exit 0
fi
;;
v_hadoop_version)
if [ -z "$HADOOP_VERSION" ]
then
echo "hadoop_version_not_specified"
exit 0
fi
;;
v_hadoop_directory)
if [ -z "$HADOOP_DIRECTORY" ]
then
echo "hadoop_directory_not_specified"
exit 0
fi
;;
v_hadoop_log_directory)
if [ -z "$HADOOP_LOG_DIRECTORY" ]
then
echo "hadoop_log_directory_not_specified"
exit 0
fi
;;
esac
}
f_create_log_dir() {
rm -r $dir 2>/dev/null
mkdir $dir
chmod -R 777 $dir
touch $log
}
map_reduce() {
f_create_log_dir
f_var_check v_hadoop_version
f_var_check v_hadoop_directory
echo "
[------ dpkg------]
`dpkg --get-selections | grep hadoop`
[------jps------]
`jps | grep -v Jps`
[------netstat------]
`sudo netstat -plten | grep java`
[------test for hdfs------]">>$log
dmesg > $dir/input 2>>$log
sudo su -c "hadoop dfs -ls /" hadoop &&
sudo su -c "hadoop dfs -mkdir /test" hadoop &&
sudo su -c "hadoop dfs -copyFromLocal $dir/input /test/mydata" hadoop 2>>$log
echo "[------start job------]">>$log &&
sudo su -c "cd $HADOOP_DIRECTORY && hadoop jar hadoop-examples-$HADOOP_VERSION.jar wordcount /test/mydata /test/output" hadoop 2>>$log &&
sudo su -c "hadoop dfs -copyToLocal /test/output/ $dir/out/" hadoop 2>>$log &&
sudo su -c "hadoop dfs -rmr /test" hadoop 2>>$log
}
run_pi_job() {
f_var_check v_node_count
f_var_check v_hadoop_version
f_var_check v_hadoop_directory
f_create_log_dir
sudo su -c "cd $HADOOP_DIRECTORY && hadoop jar hadoop-examples-$HADOOP_VERSION.jar pi $[$NODE_COUNT*10] 1000" hadoop 2>>$log
}
get_job_name() {
f_var_check v_hadoop_directory
sudo su -c "cd $HADOOP_DIRECTORY && hadoop job -list all | tail -n1" hadoop | awk '{print $1}' 2>>$log
}
get_list_active_trackers() {
f_create_log_dir
f_var_check v_hadoop_directory
sudo su -c "cd $HADOOP_DIRECTORY && hadoop job -list-active-trackers" hadoop | wc -l 2>>$log
}
get_list_active_datanodes() {
f_create_log_dir
f_var_check v_hadoop_directory
sudo su -c "hadoop dfsadmin -report" hadoop | grep "Datanodes available:.*" | awk '{print $3}' 2>>$log
}
check_exist_directory() {
f_var_check v_job_name
f_var_check v_hadoop_log_directory
if ! [ -d "$HADOOP_LOG_DIRECTORY/$JOB_NAME" ]
then echo "directory_not_found" && exit 1
fi
}
$FUNC
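For context, the Hadoop integration test in the last file of this change drives this script over SSH. A sketch of the commands it issues, using the parameters module shown below (node_count is a placeholder here; the test derives it from the cluster):

import savanna.tests.integration.parameters as param

node_count = 2  # placeholder for illustration
commands = [
    './script.sh lt -hd %s' % param.HADOOP_DIRECTORY,   # count of active tasktrackers
    './script.sh ld -hd %s' % param.HADOOP_DIRECTORY,   # count of active datanodes
    './script.sh pi -nc %s -hv %s -hd %s'
    % (node_count, param.HADOOP_VERSION, param.HADOOP_DIRECTORY),  # run the pi example job
    './script.sh gn -hd %s' % param.HADOOP_DIRECTORY,   # name of the last completed job
    './script.sh mr -hv %s -hd %s'
    % (param.HADOOP_VERSION, param.HADOOP_DIRECTORY),   # HDFS check plus a wordcount job
]
# In addition, on every worker node the test runs:
# './script.sh ed -jn <job name> -hld %s' % param.HADOOP_LOG_DIRECTORY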

View File

@@ -9,20 +9,27 @@ def _get_conf(key, default):
OS_USERNAME = _get_conf('OS_USERNAME', 'admin')
OS_PASSWORD = _get_conf('OS_PASSWORD', 'password')
OS_TENANT_NAME = _get_conf('OS_TENANT_NAME', 'admin')
OS_AUTH_URL = _get_conf('OS_AUTH_URL', 'http://localhost:5000/v2.0/')
OS_AUTH_URL = _get_conf('OS_AUTH_URL', 'http://192.168.1.1:35357/v2.0/')
SAVANNA_HOST = _get_conf('SAVANNA_HOST', 'localhost')
SAVANNA_HOST = _get_conf('SAVANNA_HOST', '192.168.1.1')
SAVANNA_PORT = _get_conf('SAVANNA_PORT', '8080')
IMAGE_ID = _get_conf('IMAGE_ID', 'ab12cde4-fgh17')
FLAVOR_ID = _get_conf('FLAVOR_ID', '2')
IMAGE_ID = _get_conf('IMAGE_ID', '42')
FLAVOR_ID = _get_conf('FLAVOR_ID', '42')
NODE_USERNAME = _get_conf('NODE_USERNAME', 'username')
CLUSTER_NAME_CRUD = _get_conf('CLUSTER_NAME_CRUD', 'cluster-crud')
CLUSTER_NAME_HADOOP = _get_conf('CLUSTER_NAME_HADOOP', 'cluster-hadoop')
TIMEOUT = _get_conf('TIMEOUT', 15)
PLUGIN_NAME = _get_conf('PLUGIN_NAME', 'vanilla')
HADOOP_VERSION = _get_conf('HADOOP_VERSION', '1.1.2')
HADOOP_DIRECTORY = _get_conf('HADOOP_DIRECTORY', '/usr/share/hadoop')
HADOOP_LOG_DIRECTORY = _get_conf('HADOOP_LOG_DIRECTORY',
'/var/log/hadoop/hadoop/userlogs')
SSH_KEY = _get_conf('SSH_KEY', 'vpupkin')
SSH_KEY = _get_conf('SSH_KEY', 'jenkins')
PATH_TO_SSH = _get_conf('PATH_TO_SSH', '/home/user/.ssh/id_rsa')
PLUGIN_NAME = _get_conf('PLUGIN_NAME', 'vanilla')
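The body of _get_conf is outside this hunk; presumably it returns an environment override when one is set and the hard-coded default otherwise, along the lines of this sketch (an assumption, not part of the diff):

import os

def _get_conf(key, default):
    # assumed behaviour: an environment variable override wins,
    # otherwise fall back to the default listed above
    return os.environ.get(key, default)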

View File

@@ -0,0 +1,178 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import os
import telnetlib
import time
from savanna.tests.integration import base
import savanna.tests.integration.parameters as param
from savanna.utils import remote
def ssh_connection(host):
return remote.setup_ssh_connection(host, param.NODE_USERNAME,
open(param.PATH_TO_SSH).read())
def execute_command(host, cmd):
with contextlib.closing(ssh_connection(host)) as ssh:
return remote.execute_command(ssh, cmd)
def write_file_to(host, remote_file, data):
with contextlib.closing(ssh_connection(host)) as ssh:
return remote.write_file_to(ssh.open_sftp(), remote_file, data)
def read_file_from(host, remote_file):
with contextlib.closing(ssh_connection(host)) as ssh:
return remote.read_file_from(ssh.open_sftp(), remote_file)
def _transfer_script_to_node(host, directory):
write_file_to(str(host),
'script.sh',
open('%s/integration/hadoop_test_script.sh'
% directory).read())
execute_command(str(host), 'chmod 777 script.sh')
class TestHadoop(base.ITestCase):
def setUp(self):
super(TestHadoop, self).setUp()
telnetlib.Telnet(self.host, self.port)
self.create_node_group_templates()
def _hadoop_testing(self, node_list):
cl_tmpl_id = None
cluster_id = None
try:
cl_tmpl_body = self.make_cluster_template('cl-tmpl', node_list)
cl_tmpl_id = self.get_object_id(
'cluster_template', self.post_object(self.url_cl_tmpl,
cl_tmpl_body, 202))
clstr_body = self.make_cl_body_cluster_template(
param.PLUGIN_NAME, param.HADOOP_VERSION, cl_tmpl_id)
data = self.post_object(self.url_cluster, clstr_body, 202)
data = data['cluster']
cluster_id = data.pop('id')
self.await_cluster_active(self.url_cluster_with_slash, cluster_id)
time.sleep(30)
get_data = self.get_object(
self.url_cluster_with_slash, cluster_id, 200, True)
get_data = get_data['cluster']
node_groups = get_data['node_groups']
ip_instances = {}
for node_group in node_groups:
instances = node_group['instances']
for instance in instances:
management_ip = instance['management_ip']
ip_instances['%s' % management_ip] = node_group[
'node_processes']
namenode_ip = None
tasktracker_count = 0
datanode_count = 0
node_count = 0
try:
for key, value in ip_instances.items():
telnetlib.Telnet(key, '22')
if 'namenode' in value:
namenode_ip = key
telnetlib.Telnet(key, '50070')
if 'tasktracker' in value:
tasktracker_count += 1
telnetlib.Telnet(key, '50060')
if 'datanode' in value:
datanode_count += 1
telnetlib.Telnet(key, '50075')
if 'jobtracker' in value:
telnetlib.Telnet(key, '50030')
node_count += 1
except Exception as e:
self.fail('telnet check of cluster instances failed: ' + str(e))
this_dir = os.getcwd()
try:
for key in ip_instances:
_transfer_script_to_node(key, this_dir)
except Exception as e:
self.fail('failed to transfer the test script to the nodes: ' + str(e))
self.assertEqual(int(execute_command(
namenode_ip, './script.sh lt -hd %s'
% param.HADOOP_DIRECTORY)[1]), tasktracker_count,
msg='active tasktracker count does not match the expected value')
self.assertEqual(int(execute_command(
namenode_ip, './script.sh ld -hd %s' %
param.HADOOP_DIRECTORY)[1]), datanode_count,
msg='active datanode count does not match the expected value')
try:
execute_command(
namenode_ip, './script.sh pi -nc %s -hv %s -hd %s'
% (node_count, param.HADOOP_VERSION,
param.HADOOP_DIRECTORY))
except Exception as e:
print(read_file_from(namenode_ip,
'/tmp/outputTestMapReduce/log.txt'))
self.fail(
'pi job script failed: '
+ str(e))
try:
job_name = execute_command(
namenode_ip, './script.sh gn -hd %s'
% param.HADOOP_DIRECTORY)[1]
if job_name == "JobId":
self.fail('no completed job found (job list returned only its header)')
except Exception as e:
self.fail('failed to get the job name: ' + str(e))
for key, value in ip_instances.items():
if 'datanode' in value or 'tasktracker' in value:
self.assertEqual(
execute_command(
key, './script.sh ed -jn %s -hld %s'
% (job_name[:-1],
param.HADOOP_LOG_DIRECTORY))[0], 0,
msg='job run was not recorded on the worker nodes')
try:
self.assertEqual(
execute_command(
namenode_ip, './script.sh mr -hv %s -hd %s'
% (param.HADOOP_VERSION,
param.HADOOP_DIRECTORY))[0], 0)
except Exception as e:
print(read_file_from(namenode_ip,
'/tmp/outputTestMapReduce/log.txt'))
self.fail('MapReduce/HDFS check script failed: ' + str(e))
except Exception as e:
self.fail(str(e))
finally:
self.del_object(self.url_cluster_with_slash, cluster_id, 204)
time.sleep(20)
self.del_object(self.url_cl_tmpl_with_slash, cl_tmpl_id, 204)
def test_hadoop_single_master(self):
node_list = {self.id_jt_nn: 1, self.id_tt_dn: 1}
self._hadoop_testing(node_list)
def tearDown(self):
self.delete_node_group_templates()
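For quick reference, these are the per-process service ports that _hadoop_testing probes with telnet on each instance (port 22 is checked on every node first):

# ports probed per node process in _hadoop_testing above
HADOOP_PROCESS_PORTS = {
    'namenode': 50070,
    'jobtracker': 50030,
    'tasktracker': 50060,
    'datanode': 50075,
}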