Adding new test execution layer

Use multiprocessing.Pool instead of a self-made implementation.

Add a timeout to each test. If a timeout for a single test is not
defined, the tester sets a global timeout = 3600 sec / the number of
times the test has to be run.

Implements blueprint test-engine-utils
Change-Id: I03bf8ad087bbf1f34c46289d21c309437c72844f
ekonstantinov 2013-09-18 15:04:59 +03:00, committed by Boris Pavlovic
parent 8f75894bad
commit b4d0d04856
3 changed files with 38 additions and 36 deletions
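The heart of the change is the pattern sketched below: a module-level worker
function handed to multiprocessing.Pool.imap, one argument tuple per requested
run. This is a minimal, self-contained illustration of that pattern, not code
from the commit; the names (_worker, run_many) and the fake status value are
placeholders. Pool can only dispatch picklable, module-level callables, which
is why _run_test moves out of the Tester class in the first diff below.

import multiprocessing


def _worker(args):
    # Must be a module-level function: Pool pickles the callable and its
    # arguments when dispatching work to child processes.
    test_args, proc_n = args
    # Stand-in for the real pytest.main() call; pretend the run passed.
    status = 0
    return {'proc_name': proc_n, 'status': status}


def run_many(test_args, times, concurrent):
    # One argument tuple per requested run, built lazily.
    iterable_args = ((test_args, n) for n in xrange(times))
    pool = multiprocessing.Pool(concurrent)
    results = {}
    # imap yields results in submission order as the workers finish.
    for result in pool.imap(_worker, iterable_args):
        results[result['proc_name']] = result
    pool.close()
    pool.join()
    return results


if __name__ == '__main__':
    # With times=4 and concurrent=2 this yields a dict keyed 0..3,
    # mirroring the shape Tester.run() returns in the diff below.
    print(run_many(['-k', 'fake_test'], times=4, concurrent=2))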


@@ -19,7 +19,6 @@ import functools
 import multiprocessing
 import os
 import pytest
-import time
 
 import fuel_health.cleanup as fuel_cleanup
@@ -51,6 +50,19 @@ def parameterize_from_test_config(benchmark_name):
     return decorator
 
 
+def _run_test(args):
+    test_args = args[0]
+    proc_n = args[2]
+    os.environ['OSTF_CONFIG'] = args[1]
+    with utils.StdOutCapture() as out:
+        status = pytest.main(args=test_args)
+
+    return {'msg': [line for line in out.getvalue().split('\n')
+                    if '===' not in line or line],
+            'status': status, 'proc_name': proc_n}
+
+
 class Tester(object):
 
     def __init__(self, cloud_config_path, test_config_path=None):
@@ -106,46 +118,23 @@ class Tester(object):
         The keys in the top-level dictionary are the corresponding
         process names
         """
-        res = {}
-        processes = {}
-        proc_id = 0
+        if '--timeout' not in test_args:
+            timeout = str(60 * 60 * 60 / times)
+            test_args.extend(['--timeout', timeout])
 
-        for i in xrange(min(concurrent, times)):
-            proc_id = proc_id + 1
-            processes.update(self._start_test_process(proc_id, test_args))
+        iterable_test_args = ((test_args, self._cloud_config_path, n)
+                              for n in xrange(times))
+        pool = multiprocessing.Pool(concurrent)
+        result_generator = pool.imap(_run_test, iterable_test_args)
 
-        while 1:
-            for process in processes.keys():
-                if not processes[process].is_alive():
-                    del processes[process]
-                    item = self._q.get()
-                    res[item['proc_name']] = item
-                    if proc_id < times:
-                        proc_id = proc_id + 1
-                        processes.update(self._start_test_process(proc_id,
-                                                                  test_args))
-            if not processes and proc_id >= times:
+        results = {}
+        for result in result_generator:
+            results.update({result['proc_name']: result})
+            if 'Timeout' in result['msg'][-2]:
                 break
-            time.sleep(0.5)
+
         self._cleanup(self._cloud_config_path)
-        return res
-
-    def _start_test_process(self, id, test_args):
-        proc_name = 'test_%d' % id
-        args = (test_args, proc_name)
-        test = multiprocessing.Process(name=proc_name, args=args,
-                                       target=self._run_test)
-        test.start()
-        return {proc_name: test}
-
-    def _run_test(self, test_args, proc_name):
-        os.environ['OSTF_CONFIG'] = self._cloud_config_path
-        with utils.StdOutCapture() as out:
-            status = pytest.main(args=test_args)
-        msg = filter(lambda line: line and '===' not in line,
-                     out.getvalue().split('\n'))
-        self._q.put({'msg': msg, 'status': status, 'proc_name': proc_name})
+        return results
 
     def _cleanup(self, cloud_config_path):
         os.environ['OSTF_CONFIG'] = cloud_config_path
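To make the new control flow concrete, here is a hypothetical driver for the
reworked Tester (the cloud config path and test selection are invented, and the
import assumes the Tester lives in rally.benchmark.utils as the tests suggest);
the real call site is the test_tester_timeout test added at the end of this
commit:

from rally.benchmark import utils

# Illustrative values only: point at an OSTF cloud config and pick a test.
tester = utils.Tester('/etc/rally/cloud_config.ini')
test_args = ['./tests/benchmark/test_utils.py', '-k', 'test_dummy_1']

# Run the same pytest invocation 4 times, at most 2 in parallel.
# Each run is stored under its index n, the 'proc_name' set by _run_test.
results = tester.run(test_args, times=4, concurrent=2)
for proc_name, outcome in results.items():
    print('%s: %s' % (proc_name, outcome['status']))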


@@ -7,6 +7,7 @@ paramiko>=1.8.0
 pbr>=0.5.21,<1.0
 psutil
 pytest
+pytest-timeout
 SQLAlchemy>=0.7.8,<0.7.99
 sh
 six
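pytest-timeout is the plugin behind the --timeout option that run() now injects;
it aborts any single test that exceeds the limit. A minimal illustration of the
two ways the plugin is typically driven (the test below is invented, not part of
this commit):

import time

import pytest


@pytest.mark.timeout(2)        # per-test limit via the pytest-timeout marker
def test_sleeps_too_long():
    time.sleep(5)              # aborted after roughly 2 seconds

# The same limit can be set globally on the command line, which is how
# Tester.run() applies it when no --timeout is already present in test_args:
#     py.test --timeout=2 tests/benchmark/test_utils.py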


@@ -18,6 +18,7 @@
 """Tests for utils."""
 import mock
 import os
+import time
 
 from rally.benchmark import config
 from rally.benchmark import engine
@@ -34,6 +35,10 @@ def test_dummy_2():
     pass
 
 
+def test_dummy_timeout():
+    time.sleep(5)
+
+
 class UtilsTestCase(test.NoDBTestCase):
     def setUp(self):
         super(UtilsTestCase, self).setUp()
@@ -89,3 +94,10 @@ class UtilsTestCase(test.NoDBTestCase):
         res = test_engine.benchmark()
         self.assertEqual(res[0].values()[0]['status'], 0)
         tests.benchmark_tests = old_benchmark_tests
+
+    def test_tester_timeout(self):
+        tester = utils.Tester(self.cloud_config_path)
+        test = ['./tests/benchmark/test_utils.py', '-k',
+                'test_dummy_timeout', '--timeout', '2']
+        results = tester.run(test, times=10, concurrent=2)
+        self.assertFalse('Timeout' in results.values()[0]['msg'][-2])