Revert "Integration tests for hadoop were added."

This reverts commit ff07ad8d65
This commit is contained in:
Sergey Lukjanov 2013-05-15 13:36:42 +00:00 committed by Gerrit Code Review
parent ff07ad8d65
commit bf7a737c93
5 changed files with 221 additions and 330 deletions

View File

@@ -53,48 +53,222 @@ class ValidationTestCase(unittest.TestCase):
self.flavor_id = 'm1.medium'
self.image_id = SAVANNA_IMAGE_ID
self.url_nt = '/v0.2/%s/node-templates' % self.tenant
self.url_nt_slash = '/v0.2/%s/node-templates/' % self.tenant
self.url_cluster = '/v0.2/%s/clusters' % self.tenant
self.url_cl_slash = '/v0.2/%s/clusters/' % self.tenant
self.url_nt_not_json = '/v0.2/%s/node-templates/' % self.tenant
#----------------------CRUD_comands--------------------------------------------
#----------------------add_value_for_node_templates----------------------------
self.jtnn = dict(
node_template=dict(
name='test-template-1',
node_type='JT+NN',
flavor_id=self.flavor_id,
job_tracker={
'heap_size': '1234'
},
name_node={
'heap_size': '2345'
}
))
self.ttdn = dict(
node_template=dict(
name='test-template-2',
node_type='TT+DN',
flavor_id=self.flavor_id,
task_tracker={
'heap_size': '1234'
},
data_node={
'heap_size': '2345'
}
))
self.jt = dict(
node_template=dict(
name='test-template-3',
node_type='JT',
flavor_id=self.flavor_id,
job_tracker={
'heap_size': '1234'
}
))
self.nn = dict(
node_template=dict(
name='test-template-4',
node_type='NN',
flavor_id=self.flavor_id,
name_node={
'heap_size': '2345'
}
))
self.tt = dict(
node_template=dict(
name='test-template-5',
node_type='TT',
flavor_id=self.flavor_id,
task_tracker={
'heap_size': '2345'
}
))
self.dn = dict(
node_template=dict(
name='test-template-6',
node_type='DN',
flavor_id=self.flavor_id,
data_node={
'heap_size': '2345'
}
))
self.get_ttdn = {
u'name': u'test-template-2',
u'data_node': {u'heap_size': u'2345'},
u'task_tracker': {u'heap_size': u'1234'},
u'node_type': {
u'processes': [u'task_tracker',
u'data_node'],
u'name': u'TT+DN'},
u'flavor_id': u'm1.medium'
}
self.get_jtnn = {
u'name': u'test-template-1',
u'name_node': {u'heap_size': u'2345'},
u'job_tracker': {u'heap_size': u'1234'},
u'node_type': {
u'processes': [u'job_tracker',
u'name_node'],
u'name': u'JT+NN'},
u'flavor_id': u'm1.medium'
}
self.get_nn = {
u'name': u'test-template-4',
u'name_node': {u'heap_size': u'2345'},
u'node_type': {
u'processes': [u'name_node'],
u'name': u'NN'},
u'flavor_id': u'm1.medium'
}
self.get_jt = {
u'name': u'test-template-3',
u'job_tracker': {u'heap_size': u'1234'},
u'node_type': {
u'processes': [u'job_tracker'],
u'name': u'JT'},
u'flavor_id': u'm1.medium'
}
#----------------------add_value_for_clusters----------------------------------
self.url_cluster = '/v0.2/%s/clusters' % self.tenant
self.url_cluster_without_json = '/v0.2/%s/clusters/' % self.tenant
self.cluster_data_jtnn_ttdn = dict(
cluster=dict(
name='QA-test-cluster',
base_image_id=self.image_id,
node_templates={
'jt_nn.medium': 1,
'tt_dn.medium': 2
}
))
self.cluster_data_jtnn_ttdn_small = dict(
cluster=dict(
name='QA-test-cluster',
base_image_id=self.image_id,
node_templates={
'jt_nn.small': 1,
'tt_dn.small': 1
}
))
self.cluster_data_jtnn = dict(
cluster=dict(
name='test-cluster',
base_image_id=self.image_id,
node_templates={
'jt_nn.medium': 1
}
))
self.get_cluster_data_jtnn_ttdn = {
u'status': u'Starting',
u'service_urls': {},
u'name': u'QA-test-cluster',
u'base_image_id': u'%s' % self.image_id,
u'node_templates':
{
u'jt_nn.medium': 1,
u'tt_dn.medium': 2
},
u'nodes': []
}
self.get_cluster_data_jtnn_ttdn_small = {
u'status': u'Starting',
u'service_urls': {},
u'name': u'QA-test-cluster',
u'base_image_id': u'%s' % self.image_id,
u'node_templates':
{
u'jt_nn.small': 1,
u'tt_dn.small': 1
},
u'nodes': []
}
self.get_cluster_data_jtnn = {
u'status': u'Starting',
u'service_urls': {},
u'name': u'test-cluster',
u'base_image_id': u'%s' % self.image_id,
u'node_templates':
{
u'jt_nn.medium': 1
},
u'nodes': []
}
#---------------------close_setUp----------------------------------------------
def post(self, url, body):
    """POST *body* (a JSON string) to *url* under the API base URL.

    Prints the request and response for debugging and returns the raw
    ``requests`` response object.
    """
    # NOTE(review): the dump interleaved both sides of the diff here; this
    # is the reconstructed post-revert version of the method.
    URL = self.baseurl + url
    resp = requests.post(URL, data=body, headers={
        "x-auth-token": self.token, "Content-Type": "application/json"})
    # 202 (Accepted) responses carry a JSON body; anything else is kept raw.
    if resp.status_code == 202:
        data = json.loads(resp.content)
    else:
        data = resp.content
    print("URL = %s\ndata = %s\nresponse = %s\ndata = %s\n"
          % (URL, body, resp.status_code, data))
    return resp
def put(self, url, body):
    """PUT *body* (a JSON string) to *url* under the API base URL.

    Prints the request and the decoded JSON response for debugging and
    returns the raw ``requests`` response object.
    """
    # NOTE(review): reconstructed post-revert version; the dump interleaved
    # the single-quoted pre-revert lines with these.
    URL = self.baseurl + url
    resp = requests.put(URL, data=body, headers={
        "x-auth-token": self.token, "Content-Type": "application/json"})
    data = json.loads(resp.content)
    print("URL = %s\ndata = %s\nresponse = %s\ndata = %s\n"
          % (URL, body, resp.status_code, data))
    return resp
def get(self, url):
    """GET *url* under the API base URL and return the raw response.

    Always prints the status code; on a non-200 response the decoded JSON
    error body is printed as well.
    """
    URL = self.baseurl + url
    resp = requests.get(URL, headers={"x-auth-token": self.token})
    print("URL = %s\nresponse = %s\n" % (URL, resp.status_code))
    if resp.status_code != 200:
        data = json.loads(resp.content)
        # Fixed: the dump had `print("data= %s\n") % data`, which applies
        # '%' to print()'s return value (None) and raises TypeError.
        print("data= %s\n" % data)
    return resp
def delete(self, url):
    """DELETE *url* under the API base URL and return the raw response.

    Always prints the status code; on a non-204 response the decoded JSON
    error body is printed as well.
    """
    URL = self.baseurl + url
    resp = requests.delete(URL, headers={"x-auth-token": self.token})
    print("URL = %s\nresponse = %s\n" % (URL, resp.status_code))
    if resp.status_code != 204:
        data = json.loads(resp.content)
        # Fixed: the dump had `print("data= %s\n") % data`, which applies
        # '%' to print()'s return value (None) and raises TypeError.
        print("data= %s\n" % data)
    return resp
def _post_object(self, url, body, code):
@@ -103,8 +277,8 @@ class ValidationTestCase(unittest.TestCase):
data = json.loads(post.content)
return data
def _get_object(self, url, obj_id, code):
    """GET ``url + obj_id``, assert status == *code*, return decoded JSON.

    Reconstructed post-revert version (the dump interleaved the removed
    ``printing``-parameter variant with these lines); the *printing*
    keyword was dropped together with the revert.
    """
    rv = self.get(url + obj_id)
    self.assertEquals(rv.status_code, code)
    data = json.loads(rv.content)
    return data
@@ -121,127 +295,39 @@ class ValidationTestCase(unittest.TestCase):
eventlet.sleep(1)
code = self.delete(url + obj_id).status_code
#----------------------other_commands------------------------------------------
def _get_body_nt(self, name, nt_type, hs1, hs2):
    """Build the expected GET body for a node template of *nt_type*."""
    # JT+NN / NN templates carry a name_node; the rest carry a data_node.
    node = 'data'
    if nt_type in ['JT+NN', 'NN']:
        node = 'name'
    # JT+NN / JT templates carry a job_tracker; the rest a task_tracker.
    tracker = 'task'
    if nt_type in ['JT+NN', 'JT']:
        tracker = 'job'
    processes_name = nt_type
    node_key = u'%s_node' % node
    tracker_key = u'%s_tracker' % tracker
    nt = {
        u'name': u'%s' % name,
        node_key: {u'heap_size': u'%d' % hs1},
        tracker_key: {u'heap_size': u'%d' % hs2},
        u'node_type': {
            u'processes': [tracker_key, node_key],
            u'name': u'%s' % processes_name},
        u'flavor_id': u'%s' % self.flavor_id
    }
    # Single-process types keep only their own process entry.
    if nt_type == 'NN':
        del nt[tracker_key]
        nt[u'node_type'][u'processes'] = [node_key]
    elif nt_type == 'JT':
        del nt[node_key]
        nt[u'node_type'][u'processes'] = [tracker_key]
    print("GET_BODY!!!!!!!!!!!" + str(nt))
    return nt
def _get_body_cluster(self, name, master_name, worker_name, node_number):
    """Expected GET body for a freshly created (still starting) cluster."""
    templates = {
        u'%s' % master_name: 1,
        u'%s' % worker_name: node_number,
    }
    return {
        u'status': u'Starting',
        u'service_urls': {},
        u'name': u'%s' % name,
        u'base_image_id': u'%s' % self.image_id,
        u'node_templates': templates,
        u'nodes': []
    }
def change_field_nt(self, data, old_field, new_field):
    """Rename *old_field* to *new_field* inside data['node_template'].

    Mutates *data* in place and returns it for convenience.
    """
    template = data['node_template']
    template[new_field] = template.pop(old_field)
    return data
def make_nt(self, nt_name, node_type, node1_size, node2_size):
    """Build a node-template request body of the requested *node_type*.

    Starts from a full JT+NN template and strips/renames fields to match
    TT+DN, NN or JT variants.
    """
    nt = dict(
        node_template=dict(
            name=nt_name,
            node_type='JT+NN',
            flavor_id=self.flavor_id,
            job_tracker={'heap_size': '%d' % node1_size},
            name_node={'heap_size': '%d' % node2_size}
        ))
    template = nt['node_template']
    if node_type == 'TT+DN':
        template['node_type'] = 'TT+DN'
        nt = self.change_field_nt(nt, 'job_tracker', 'task_tracker')
        nt = self.change_field_nt(nt, 'name_node', 'data_node')
    elif node_type == 'NN':
        template['node_type'] = 'NN'
        del template['job_tracker']
    elif node_type == 'JT':
        template['node_type'] = 'JT'
        del template['name_node']
    # For 'JT+NN' the base template is already correct.
    return nt
def make_cluster_body(self, cluster_name, name_master_node,
                      name_worker_node, number_workers):
    """Request body for a cluster with one master template and N workers."""
    node_templates = {
        '%s' % name_master_node: 1,
        '%s' % name_worker_node: number_workers,
    }
    return dict(
        cluster=dict(
            name=cluster_name,
            base_image_id=self.image_id,
            node_templates=node_templates
        ))
def delete_node_template(self, data):
    """Delete the node template described by a POST response body.

    Pops the ``id`` out of ``data['node_template']`` (mutating *data*) and
    expects the DELETE to answer 204.
    """
    template = data['node_template']
    self._del_object(self.url_nt_slash, template.pop(u'id'), 204)
def _crud_object(self, body, get_body, url):
    """Create, read and delete one API object, asserting each step.

    Reconstructed post-revert version — the dump interleaved the removed
    ternary-style variant with these lines.  Returns the created object's
    id (the object itself is always deleted in ``finally``).
    """
    data = self._post_object(url, body, 202)
    get_url = None
    object_id = None
    try:
        obj = "cluster"
        get_url = self.url_cluster_without_json
        if url == self.url_nt:
            obj = "node_template"
            get_url = self.url_nt_not_json
        data = data["%s" % obj]
        object_id = data.pop(u'id')
        self.assertEquals(data, get_body)
        get_data = self._get_object(get_url, object_id, 200)
        get_data = get_data['%s' % obj]
        del get_data[u'id']
        if obj == "cluster":
            self._asrtCluster(get_body, get_data, get_url, object_id)
    except Exception as e:
        # NOTE(review): the post-revert code prints instead of calling
        # self.fail(), so errors here do not fail the test — confirm this
        # weakening is intentional.
        print("failure:" + str(e))
    finally:
        self._del_object(get_url, object_id, 204)
    return object_id
# NOTE(review): this region interleaves both sides of the diff (the method
# was renamed _response_cluster -> _asrtCluster by the revert) and the hunk
# is truncated before the loop's sleep/increment lines, so the bytes below
# are kept verbatim with annotations only.
def _response_cluster(self, get_body, get_data, get_url, object_id):
def _asrtCluster(self, get_body, get_data, get_url, object_id):
# Expect the cluster to transition Starting -> Active; drop fields that
# are not compared against the expected body.
get_body[u'status'] = u'Active'
del get_body[u'service_urls']
del get_body[u'nodes']
i = 1
# Poll until the cluster reports Active; i bounds the number of polls
# (presumably ~10 minutes — the increment is outside this hunk, confirm).
while get_data[u'status'] != u'Active':
if i > 60:
# Pre-revert line: fail the test outright on timeout.
self.fail(
"cluster not Starting -> Active, remaining 10 minutes")
# Post-revert line: print(self.fail(...)) — self.fail() raises before
# print ever runs, so the wrapping print is dead code.
print(self.fail(
"cluster not Starting -> Active, remaining 10 minutes"))
get_data = self._get_object(get_url, object_id, 200)
get_data = get_data['cluster']
del get_data[u'id']

View File

@@ -1,25 +0,0 @@
#!/bin/bash
# Hadoop smoke test: collects environment diagnostics into a log, then runs
# the wordcount example job over a dmesg dump copied into HDFS.
#cd / && touch script.sh && chmod +x script.sh && vim script.sh
dir=/outputTestMapReduce
directory=/usr/share/hadoop
# Recreate the scratch/output directory from scratch.
rm -r $dir
mkdir $dir
log=$dir/log.txt
# Generate the job input from the kernel ring buffer.
# NOTE(review): $log is appended to before `touch $log` — relies on the
# redirection creating the file; confirm the ordering is intentional.
echo `dmesg > $dir/input` 2>>$log
touch $log
chmod -R 777 $dir
# Record which hadoop packages, java processes and listening ports exist.
echo "[------ dpkg------]">>$log
echo `dpkg --get-selections | grep hadoop` >>$log
echo "[------jps------]">>$log
echo `jps | grep -v Jps` >>$log
echo "[------netstat------]">>$log
echo `sudo netstat -plten | grep java` &>>$log
# Exercise HDFS: list, mkdir, upload, run wordcount, download, clean up.
echo "[------test for hdfs------]">>$log
echo `dmesg > $dir/input` 2>>$log
su -c "hadoop dfs -ls /" hadoop
su -c "hadoop dfs -mkdir /test" hadoop &&
su -c "hadoop dfs -copyFromLocal $dir/input /test/mydata" hadoop 2>>$log &&
echo "[------start job------]">>$log &&
su -c "cd $directory && hadoop jar hadoop-examples-1.1.1.jar wordcount /test/mydata /test/output" hadoop 2>>$log &&
su -c "hadoop dfs -copyToLocal /test/output/ $dir/out/" hadoop 2>>$log &&
su -c "hadoop dfs -rmr /test" hadoop 2>>$log

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from savanna.tests.integration.db import ValidationTestCase
from telnetlib import Telnet
@@ -24,20 +25,10 @@ class TestValidationApiForClusters(ValidationTestCase):
Telnet(self.host, self.port)
def test_crud_operation_for_cluster(self):
    """CRUD round-trip for a two-template (JT+NN / TT+DN) cluster.

    Reconstructed post-revert version — the dump interleaved the removed
    make_nt-driven variant (which created its own node templates) with
    these fixture-based lines.
    """
    get_body = copy.deepcopy(self.get_cluster_data_jtnn_ttdn)
    self._crud_object(
        self.cluster_data_jtnn_ttdn, get_body, self.url_cluster)
def test_crud_operation_for_cluster_with_one_node(self):
    """CRUD round-trip for a single-template (JT+NN) cluster."""
    expected = copy.deepcopy(self.get_cluster_data_jtnn)
    self._crud_object(self.cluster_data_jtnn, expected, self.url_cluster)

View File

@@ -1,152 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import getcwd
import paramiko
from re import search
from savanna.tests.integration.db import ValidationTestCase
from telnetlib import Telnet
def _setup_ssh_connection(host, ssh):
    """Connect *ssh* to *host* as root, auto-accepting unknown host keys."""
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username='root', password='swordfish')
def _open_channel_and_execute(ssh, cmd):
    """Run *cmd* on a fresh session channel and return its exit status."""
    channel = ssh.get_transport().open_session()
    channel.exec_command(cmd)
    return channel.recv_exit_status()
def _execute_command_on_node(host, cmd):
    """Run *cmd* on *host* over SSH and return the command's exit status.

    The SSH client is always closed, even when connecting fails.
    """
    client = paramiko.SSHClient()
    try:
        _setup_ssh_connection(host, client)
        return _open_channel_and_execute(client, cmd)
    finally:
        client.close()
def _execute_transfer_on_node(host, locfile, nodefile):
    """Upload local file *locfile* to *nodefile* on *host* via SFTP (as root).

    Fixed: the original referenced ``sftp`` and ``transport`` in ``finally``
    before they were necessarily assigned, so a failure while opening the
    transport or SFTP session raised NameError/UnboundLocalError instead of
    the real error.  Cleanup is now scoped to what was actually created.
    """
    transport = paramiko.Transport(host)
    try:
        transport.connect(username='root', password='swordfish')
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(locfile, nodefile)
        finally:
            sftp.close()
    finally:
        transport.close()
def _execute_transfer_from_node(host, nodefile, localfile):
    """Download *nodefile* from *host* to local *localfile* via SFTP (as root).

    Fixed: the original referenced ``sftp`` and ``transport`` in ``finally``
    before they were necessarily assigned, so a failure while opening the
    transport or SFTP session raised NameError/UnboundLocalError instead of
    the real error.  Cleanup is now scoped to what was actually created.
    """
    transport = paramiko.Transport(host)
    try:
        transport.connect(username='root', password='swordfish')
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.get(nodefile, localfile)
        finally:
            sftp.close()
    finally:
        transport.close()
# End-to-end Hadoop test (the file deleted by this revert): boots a cluster,
# telnets to the namenode/jobtracker service URLs, uploads script.sh and runs
# it on the namenode, pulling back the log on failure.
# NOTE(review): indentation was stripped by the page extraction; bytes are
# kept verbatim with comments only.
class TestForHadoop(ValidationTestCase):
def setUp(self):
super(TestForHadoop, self).setUp()
# Fails fast if the API endpoint is unreachable.
Telnet(self.host, self.port)
def _hadoop_testing(self, cluster_name, nt_name_master,
nt_name_worker, number_workers):
object_id = None
# Create the cluster and verify its GET body matches expectations.
cluster_body = self.make_cluster_body(
cluster_name, nt_name_master, nt_name_worker, number_workers)
data = self._post_object(self.url_cluster, cluster_body, 202)
try:
data = data['cluster']
object_id = data.pop(u'id')
get_body = self._get_body_cluster(
cluster_name, nt_name_master, nt_name_worker, number_workers)
get_data = self._get_object(self.url_cl_slash, object_id, 200)
get_data = get_data['cluster']
del get_data[u'id']
# Waits for the cluster to reach Active (see _response_cluster).
self._response_cluster(
get_body, get_data, self.url_cl_slash, object_id)
get_data = self._get_object(
self.url_cl_slash, object_id, 200, True)
get_data = get_data['cluster']
# Extract host/port from the namenode and jobtracker service URLs
# and check both ports answer.
namenode = get_data[u'service_urls'][u'namenode']
jobtracker = get_data[u'service_urls'][u'jobtracker']
p = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
m = search(p, namenode)
t = search(p, jobtracker)
namenode_ip = m.group('host')
namenode_port = m.group('port')
jobtracker_ip = t.group('host')
jobtracker_port = t.group('port')
Telnet(str(namenode_ip), str(namenode_port))
Telnet(str(jobtracker_ip), str(jobtracker_port))
# Push the MapReduce smoke-test script to the namenode and run it;
# a non-zero exit status fails the test.
this_dir = getcwd()
_execute_transfer_on_node(
str(namenode_ip), '%s/integration/script.sh' % this_dir,
'/script.sh')
try:
self.assertEquals(
_execute_command_on_node(
namenode_ip,
"cd .. && chmod 777 script.sh && ./script.sh"), 0)
except Exception as e:
# On failure, retrieve the script's log for diagnosis.
_execute_transfer_from_node(
namenode_ip, '/outputTestMapReduce/log.txt',
'%s' % this_dir)
self.fail("run script is failure" + e.message)
except Exception as e:
self.fail("failure:" + e.message)
finally:
# Always delete the cluster, even on failure.
self._del_object(self.url_cl_slash, object_id, 204)
def test_hadoop_single_master(self):
# One JT+NN master template plus TT+DN workers; templates are cleaned
# up in finally regardless of test outcome.
data_nt_master = self._post_object(
self.url_nt, self.make_nt('master_node.medium', 'JT+NN',
1234, 2345), 202)
data_nt_worker = self._post_object(
self.url_nt, self.make_nt('worker_node.medium', 'TT+DN',
1234, 2345), 202)
try:
self._hadoop_testing(
'QA-hadoop', 'master_node.medium', 'worker_node.medium', 2)
except Exception as e:
self.fail("failure:" + str(e))
finally:
self.delete_node_template(data_nt_master)
self.delete_node_template(data_nt_worker)

View File

@@ -18,31 +18,22 @@ from telnetlib import Telnet
class TestValidationApiForNodetemplates(ValidationTestCase):
def setUp(self):
# Verify the API endpoint is reachable before each test.
super(TestValidationApiForNodetemplates, self).setUp()
Telnet(self.host, self.port)
def test_crud_nt_jtnn(self):
    """CRUD round-trip for a combined JT+NN node template.

    Reconstructed post-revert version — the dump interleaved the removed
    make_nt-based variant with these fixture-based lines.
    """
    self._crud_object(self.jtnn, self.get_jtnn.copy(),
                      self.url_nt)
def test_crud_nt_ttdn(self):
    """CRUD round-trip for a combined TT+DN node template.

    Reconstructed post-revert version — the dump interleaved the removed
    make_nt-based variant with these fixture-based lines.
    """
    self._crud_object(self.ttdn, self.get_ttdn.copy(),
                      self.url_nt)
def test_crud_nt_nn(self):
    """CRUD round-trip for a standalone NN node template.

    Reconstructed post-revert version — the dump interleaved the removed
    make_nt-based variant with these fixture-based lines.
    """
    self._crud_object(self.nn, self.get_nn.copy(),
                      self.url_nt)
def test_crud_nt_jt(self):
    """CRUD round-trip for a standalone JT node template.

    Reconstructed post-revert version — the dump interleaved the removed
    make_nt-based variant with these fixture-based lines.
    """
    self._crud_object(self.jt, self.get_jt.copy(),
                      self.url_nt)