Merge "Migrate integration tests to oslotest"
commit cdeafed735
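
Every hunk below applies the same two-part pattern, first in the integration-test base class and then in the EDP tests: ITestCase switches its parent from testtools.TestCase to oslotest's base.BaseTestCase, and each hand-rolled countdown loop (poll, sleep, decrement a timeout counter) is replaced with the fixtures.Timeout context manager, with fixtures.TimeoutException treated as the failure case. A minimal sketch of that pattern, assuming a hypothetical test class and status callback rather than code from this commit:

    import time

    import fixtures
    from oslotest import base


    class ExampleTestCase(base.BaseTestCase):
        def wait_until_active(self, get_status, timeout_minutes):
            # With gentle=True, fixtures.Timeout raises TimeoutException
            # inside the block once the deadline passes, so the loop no
            # longer has to track the remaining time itself.
            try:
                with fixtures.Timeout(timeout_minutes * 60, gentle=True):
                    while get_status() != 'Active':
                        time.sleep(10)
            except fixtures.TimeoutException:
                self.fail('Resource did not become \'Active\' within '
                          '%d minute(s).' % timeout_minutes)

fixtures.Timeout is driven by SIGALRM, so the gentle timeout only fires on platforms that provide it; the migrated tests below inherit that caveat.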
@@ -14,18 +14,19 @@
 # limitations under the License.

 import logging
-import socket
 import telnetlib
 import time
 import uuid

+import fixtures
 from keystoneclient.v2_0 import client as keystone_client
 from neutronclient.v2_0 import client as neutron_client
 from novaclient.v1_1 import client as nova_client
+from oslotest import base
 from saharaclient.api import base as client_base
 import saharaclient.client as sahara_client
+import six
 from swiftclient import client as swift_client
-import testtools
 from testtools import testcase

 from sahara.openstack.common import excutils
@@ -69,7 +70,7 @@ def skip_test(config_name, message=''):
     return handle


-class ITestCase(testcase.WithAttributes, testtools.TestCase):
+class ITestCase(testcase.WithAttributes, base.BaseTestCase):
     def setUp(self):
         super(ITestCase, self).setUp()
         self.common_config = cfg.ITConfig().common_config
@@ -222,19 +223,25 @@ class ITestCase(testcase.WithAttributes, testtools.TestCase):
     def poll_cluster_state(self, cluster_id):
         data = self.sahara.clusters.get(cluster_id)
         timeout = self.common_config.CLUSTER_CREATION_TIMEOUT * 60
-        while str(data.status) != 'Active':
-            if str(data.status) == 'Error':
-                self.fail('Cluster state == \'Error\'.')
-            if timeout <= 0:
-                self.fail(
-                    'Cluster did not return to \'Active\' state '
-                    'within %d minutes.'
-                    % self.common_config.CLUSTER_CREATION_TIMEOUT
-                )
-            data = self.sahara.clusters.get(cluster_id)
-            time.sleep(10)
-            timeout -= 10
-        return str(data.status)
+
+        try:
+            with fixtures.Timeout(timeout, gentle=True):
+                while True:
+                    status = str(data.status)
+                    if status == 'Active':
+                        break
+                    if status == 'Error':
+                        self.fail('Cluster state == \'Error\'.')
+
+                    time.sleep(10)
+                    data = self.sahara.clusters.get(cluster_id)
+
+        except fixtures.TimeoutException:
+            self.fail("Cluster did not return to 'Active' state "
+                      "within %d minutes." %
+                      self.common_config.CLUSTER_CREATION_TIMEOUT)
+
+        return status

     def get_cluster_node_ip_list_with_node_processes(self, cluster_id):
         data = self.sahara.clusters.get(cluster_id)
@@ -272,80 +279,83 @@ class ITestCase(testcase.WithAttributes, testtools.TestCase):
     def get_node_info(self, node_ip_list_with_node_processes, plugin_config):
         tasktracker_count = 0
         datanode_count = 0
-        node_count = 0
-        for node_ip, processes in node_ip_list_with_node_processes.items():
-            self.try_telnet(node_ip, '22')
-            node_count += 1
-            for process in processes:
-                if process in plugin_config.HADOOP_PROCESSES_WITH_PORTS:
-                    for i in range(self.common_config.TELNET_TIMEOUT * 60):
-                        try:
-                            time.sleep(1)
-                            telnetlib.Telnet(
-                                node_ip,
-                                plugin_config.HADOOP_PROCESSES_WITH_PORTS[
-                                    process]
-                            )
-                            break
+        timeout = self.common_config.TELNET_TIMEOUT * 60
+        with fixtures.Timeout(timeout, gentle=True):
+            accessible = False
+            proc_with_ports = plugin_config.HADOOP_PROCESSES_WITH_PORTS
+            while not accessible:
+                accessible = True
+                for node_ip, processes in six.iteritems(
+                        node_ip_list_with_node_processes):
+                    try:
+                        self.try_telnet(node_ip, '22')
+                    except Exception:
+                        accessible = False
+
-                        except socket.error:
-                            print(
-                                'Connection attempt. NODE PROCESS: %s, '
-                                'PORT: %s.'
-                                % (process,
-                                   plugin_config.HADOOP_PROCESSES_WITH_PORTS[
-                                       process])
-                            )
+                    for process in processes:
+                        if process in proc_with_ports:
+                            try:
+                                self.try_telnet(node_ip,
+                                                proc_with_ports[process])
+                            except Exception:
+                                print('Connection attempt. NODE PROCESS: %s, '
+                                      'PORT: %s.' % (
+                                          process, proc_with_ports[process]))
+                                accessible = False
+
-                    else:
-                        self.try_telnet(
-                            node_ip,
-                            plugin_config.HADOOP_PROCESSES_WITH_PORTS[process]
-                        )
+                if not accessible:
+                    time.sleep(1)
+
+        for node_ip, processes in six.iteritems(
+                node_ip_list_with_node_processes):
             if plugin_config.PROCESS_NAMES['tt'] in processes:
                 tasktracker_count += 1
             if plugin_config.PROCESS_NAMES['dn'] in processes:
                 datanode_count += 1
             if plugin_config.PROCESS_NAMES['nn'] in processes:
                 namenode_ip = node_ip

         return {
             'namenode_ip': namenode_ip,
             'tasktracker_count': tasktracker_count,
             'datanode_count': datanode_count,
-            'node_count': node_count
+            'node_count': len(node_ip_list_with_node_processes)
         }

     def await_active_workers_for_namenode(self, node_info, plugin_config):
         self.open_ssh_connection(
-            node_info['namenode_ip'], plugin_config.SSH_USERNAME
-        )
-        for i in range(self.common_config.HDFS_INITIALIZATION_TIMEOUT * 6):
-            time.sleep(10)
-            active_tasktracker_count = self.execute_command(
-                'sudo -u %s bash -lc "hadoop job -list-active-trackers" '
-                '| grep "^tracker_" | wc -l'
-                % plugin_config.HADOOP_USER)[1]
-            active_tasktracker_count = int(active_tasktracker_count)
-            active_datanode_count = int(
-                self.execute_command(
-                    'sudo -u %s bash -lc "hadoop dfsadmin -report" \
-                    | grep "Datanodes available:.*" | awk \'{print $3}\''
-                    % plugin_config.HADOOP_USER)[1])
+            node_info['namenode_ip'], plugin_config.SSH_USERNAME)
+        timeout = self.common_config.HDFS_INITIALIZATION_TIMEOUT * 60
+        try:
+            with fixtures.Timeout(timeout, gentle=True):
+                while True:
+                    active_tasktracker_count = self.execute_command(
+                        'sudo -u %s bash -lc "hadoop job -list-active-trackers'
+                        '" | grep "^tracker_" | wc -l'
+                        % plugin_config.HADOOP_USER)[1]
+                    active_tasktracker_count = int(active_tasktracker_count)
+                    active_datanode_count = self.execute_command(
+                        'sudo -u %s bash -lc "hadoop dfsadmin -report" | '
+                        'grep "Datanodes available:.*" | awk \'{print $3}\''
+                        % plugin_config.HADOOP_USER)[1]
+                    active_datanode_count = int(active_datanode_count)
+
-            if (
-                active_tasktracker_count == node_info['tasktracker_count']
-            ) and (
-                active_datanode_count == node_info['datanode_count']
-            ):
-                break
+                    if (active_tasktracker_count ==
+                            node_info['tasktracker_count'] and
+                            active_datanode_count ==
+                            node_info['datanode_count']):
+                        break
+
-        else:
+                    time.sleep(10)
+
+        except fixtures.TimeoutException:
             self.fail(
                 'Tasktracker or datanode cannot be started within '
                 '%s minute(s) for namenode.'
                 % self.common_config.HDFS_INITIALIZATION_TIMEOUT
             )
-        self.close_ssh_connection()
+        finally:
+            self.close_ssh_connection()

 # --------------------------------Remote---------------------------------------

@@ -540,19 +550,19 @@ class ITestCase(testcase.WithAttributes, testtools.TestCase):
         if cluster_id:
             self.sahara.clusters.delete(cluster_id)

-            # waiting roughly for 300 seconds for cluster to terminate
-            attempts = 60
-            while attempts > 0:
-                try:
-                    self.sahara.clusters.get(cluster_id)
-                except client_base.APIException:
-                    # Cluster is finally deleted
-                    break
+            try:
+                # waiting roughly for 300 seconds for cluster to terminate
+                with fixtures.Timeout(300, gentle=True):
+                    while True:
+                        try:
+                            self.sahara.clusters.get(cluster_id)
+                        except client_base.APIException:
+                            # Cluster is finally deleted
+                            break

-                attempts -= 1
-                time.sleep(5)
+                        time.sleep(5)

-            if attempts == 0:
+            except fixtures.TimeoutException:
                 self.fail('Cluster failed to terminate in 300 seconds: '
                           '%s' % cluster_id)

@@ -18,6 +18,8 @@ import string
 import time
 import uuid

+import fixtures
+
 from sahara.openstack.common import excutils
 from sahara.swift import swift_helper as sw
 from sahara.tests.integration.tests import base
@@ -45,17 +47,21 @@ class EDPTest(base.ITestCase):
     def _await_job_execution(self, job):
         timeout = self.common_config.JOB_LAUNCH_TIMEOUT * 60
         status = self.sahara.job_executions.get(job.id).info['status']
-        while status != 'SUCCEEDED':
-            if status == 'KILLED':
-                self.fail('Job status == \'KILLED\'.')
-            if timeout <= 0:
-                self.fail(
-                    'Job did not return to \'SUCCEEDED\' status within '
-                    '%d minute(s).' % self.common_config.JOB_LAUNCH_TIMEOUT
-                )
-            status = self.sahara.job_executions.get(job.id).info['status']
-            time.sleep(10)
-            timeout -= 10
+        try:
+            with fixtures.Timeout(timeout, gentle=True):
+                while status != 'SUCCEEDED':
+                    if status == 'KILLED':
+                        self.fail('Job status == \'KILLED\'.')
+
+                    time.sleep(10)
+                    status = self.sahara.job_executions.get(
+                        job.id).info['status']
+
+        except fixtures.TimeoutException:
+            self.fail(
+                'Job did not return to \'SUCCEEDED\' status within '
+                '%d minute(s).' % self.common_config.JOB_LAUNCH_TIMEOUT
+            )

     def _create_job_binaries(self, job_data_list, job_binary_internal_list,
                              job_binary_list, swift_connection=None,