33756ac899
Include: - util modules. such as table_parser, ssh/localhost clients, cli module, exception, logger, etc. Util modules are mostly used by keywords. - keywords modules. These are helper functions that are used directly by test functions. - platform (with platform or platform_sanity marker) and stx-openstack (with sanity, sx_sanity, cpe_sanity, or storage_sanity marker) sanity testcases - pytest config conftest, and test fixture modules - test config file template/example Required packages: - python3.4 or python3.5 - pytest >=3.10,<4.0 - pexpect - requests - pyyaml - selenium (firefox, ffmpeg, pyvirtualdisplay, Xvfb or Xephyr or Xvnc) Limitations: - Anything that requires copying from Test File Server will not work until a public share is configured to shared test files. Tests skipped for now. Co-Authored-By: Maria Yousaf <maria.yousaf@windriver.com> Co-Authored-By: Marvin Huang <marvin.huang@windriver.com> Co-Authored-By: Yosief Gebremariam <yosief.gebremariam@windriver.com> Co-Authored-By: Paul Warner <paul.warner@windriver.com> Co-Authored-By: Xueguang Ma <Xueguang.Ma@windriver.com> Co-Authored-By: Charles Chen <charles.chen@windriver.com> Co-Authored-By: Daniel Graziano <Daniel.Graziano@windriver.com> Co-Authored-By: Jordan Li <jordan.li@windriver.com> Co-Authored-By: Nimalini Rasa <nimalini.rasa@windriver.com> Co-Authored-By: Senthil Mukundakumar <senthil.mukundakumar@windriver.com> Co-Authored-By: Anuejyan Manokeran <anujeyan.manokeran@windriver.com> Co-Authored-By: Peng Peng <peng.peng@windriver.com> Co-Authored-By: Chris Winnicki <chris.winnicki@windriver.com> Co-Authored-By: Joe Vimar <Joe.Vimar@windriver.com> Co-Authored-By: Alex Kozyrev <alex.kozyrev@windriver.com> Co-Authored-By: Jack Ding <jack.ding@windriver.com> Co-Authored-By: Ming Lei <ming.lei@windriver.com> Co-Authored-By: Ankit Jain <ankit.jain@windriver.com> Co-Authored-By: Eric Barrett <eric.barrett@windriver.com> Co-Authored-By: William Jia <william.jia@windriver.com> Co-Authored-By: Joseph Richard <Joseph.Richard@windriver.com> Co-Authored-By: Aldo Mcfarlane <aldo.mcfarlane@windriver.com> Story: 2005892 Task: 33750 Signed-off-by: Yang Liu <yang.liu@windriver.com> Change-Id: I7a88a47e09733d39f024144530f5abb9aee8cad2
694 lines
24 KiB
Python
694 lines
24 KiB
Python
#
|
|
# Copyright (c) 2019 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
|
|
import logging
|
|
import os
|
|
from time import strftime, gmtime
|
|
# import threading # Used for formatting logger
|
|
|
|
|
|
import pytest # Don't remove. Used in eval
|
|
|
|
import setups
|
|
from consts.proj_vars import ProjVar
|
|
from utils.tis_log import LOG
|
|
from utils import parse_log
|
|
|
|
tc_start_time = None
|
|
tc_end_time = None
|
|
has_fail = False
|
|
repeat_count = -1
|
|
stress_count = -1
|
|
count = -1
|
|
no_teardown = False
|
|
tracebacks = []
|
|
region = None
|
|
test_count = 0
|
|
console_log = True
|
|
|
|
################################
|
|
# Process and log test results #
|
|
################################
|
|
|
|
|
|
class MakeReport:
|
|
nodeid = None
|
|
instances = {}
|
|
|
|
def __init__(self, item):
|
|
MakeReport.nodeid = item.nodeid
|
|
self.test_pass = None
|
|
self.test_results = {}
|
|
MakeReport.instances[item.nodeid] = self
|
|
|
|
def update_results(self, call, report):
|
|
if report.failed:
|
|
global has_fail
|
|
has_fail = True
|
|
msg = "***Failure at test {}: {}".format(call.when, call.excinfo)
|
|
print(msg)
|
|
LOG.debug(msg + "\n***Details: {}".format(report.longrepr))
|
|
global tracebacks
|
|
tracebacks.append(str(report.longrepr))
|
|
self.test_results[call.when] = ['Failed', call.excinfo]
|
|
elif report.skipped:
|
|
sep = 'Skipped: '
|
|
skipreason_list = str(call.excinfo).split(sep=sep)[1:]
|
|
skipreason_str = sep.join(skipreason_list)
|
|
self.test_results[call.when] = ['Skipped', skipreason_str]
|
|
elif report.passed:
|
|
self.test_results[call.when] = ['Passed', '']
|
|
|
|
def get_results(self):
|
|
return self.test_results
|
|
|
|
@classmethod
|
|
def get_report(cls, item):
|
|
if item.nodeid == cls.nodeid:
|
|
return cls.instances[cls.nodeid]
|
|
else:
|
|
return cls(item)
|
|
|
|
|
|
class TestRes:
|
|
PASSNUM = 0
|
|
FAILNUM = 0
|
|
SKIPNUM = 0
|
|
TOTALNUM = 0
|
|
|
|
|
|
def _write_results(res_in_tests, test_name):
|
|
global tc_start_time
|
|
with open(ProjVar.get_var("TCLIST_PATH"), mode='a') as f:
|
|
f.write('\n{}\t{}\t{}'.format(res_in_tests, tc_start_time, test_name))
|
|
global test_count
|
|
test_count += 1
|
|
# reset tc_start and end time for next test case
|
|
tc_start_time = None
|
|
|
|
|
|
def pytest_runtest_makereport(item, call, __multicall__):
|
|
report = __multicall__.execute()
|
|
my_rep = MakeReport.get_report(item)
|
|
my_rep.update_results(call, report)
|
|
|
|
test_name = item.nodeid.replace('::()::',
|
|
'::') # .replace('testcases/', '')
|
|
res_in_tests = ''
|
|
res = my_rep.get_results()
|
|
|
|
# Write final result to test_results.log
|
|
if report.when == 'teardown':
|
|
res_in_log = 'Test Passed'
|
|
fail_at = []
|
|
for key, val in res.items():
|
|
if val[0] == 'Failed':
|
|
fail_at.append('test ' + key)
|
|
elif val[0] == 'Skipped':
|
|
res_in_log = 'Test Skipped\nReason: {}'.format(val[1])
|
|
res_in_tests = 'SKIP'
|
|
break
|
|
if fail_at:
|
|
fail_at = ', '.join(fail_at)
|
|
res_in_log = 'Test Failed at {}'.format(fail_at)
|
|
|
|
# Log test result
|
|
testcase_log(msg=res_in_log, nodeid=test_name, log_type='tc_res')
|
|
|
|
if 'Test Passed' in res_in_log:
|
|
res_in_tests = 'PASS'
|
|
elif 'Test Failed' in res_in_log:
|
|
res_in_tests = 'FAIL'
|
|
if ProjVar.get_var('PING_FAILURE'):
|
|
setups.add_ping_failure(test_name=test_name)
|
|
|
|
if not res_in_tests:
|
|
res_in_tests = 'UNKNOWN'
|
|
|
|
# count testcases by status
|
|
TestRes.TOTALNUM += 1
|
|
if res_in_tests == 'PASS':
|
|
TestRes.PASSNUM += 1
|
|
elif res_in_tests == 'FAIL':
|
|
TestRes.FAILNUM += 1
|
|
elif res_in_tests == 'SKIP':
|
|
TestRes.SKIPNUM += 1
|
|
|
|
_write_results(res_in_tests=res_in_tests, test_name=test_name)
|
|
|
|
if repeat_count > 0:
|
|
for key, val in res.items():
|
|
if val[0] == 'Failed':
|
|
global tc_end_time
|
|
tc_end_time = strftime("%Y%m%d %H:%M:%S", gmtime())
|
|
_write_results(res_in_tests='FAIL', test_name=test_name)
|
|
TestRes.FAILNUM += 1
|
|
if ProjVar.get_var('PING_FAILURE'):
|
|
setups.add_ping_failure(test_name=test_name)
|
|
|
|
try:
|
|
parse_log.parse_test_steps(ProjVar.get_var('LOG_DIR'))
|
|
except Exception as e:
|
|
LOG.warning(
|
|
"Unable to parse test steps. \nDetails: {}".format(
|
|
e.__str__()))
|
|
|
|
pytest.exit(
|
|
"Skip rest of the iterations upon stress test failure")
|
|
|
|
if no_teardown and report.when == 'call':
|
|
for key, val in res.items():
|
|
if val[0] == 'Skipped':
|
|
break
|
|
else:
|
|
pytest.exit("No teardown and skip rest of the tests if any")
|
|
|
|
return report
|
|
|
|
|
|
def pytest_runtest_setup(item):
|
|
global tc_start_time
|
|
# tc_start_time = setups.get_tis_timestamp(con_ssh)
|
|
tc_start_time = strftime("%Y%m%d %H:%M:%S", gmtime())
|
|
print('')
|
|
message = "Setup started:"
|
|
testcase_log(message, item.nodeid, log_type='tc_setup')
|
|
# set test name for ping vm failure
|
|
test_name = 'test_{}'.format(
|
|
item.nodeid.rsplit('::test_', 1)[-1].replace('/', '_'))
|
|
ProjVar.set_var(TEST_NAME=test_name)
|
|
ProjVar.set_var(PING_FAILURE=False)
|
|
|
|
|
|
def pytest_runtest_call(item):
|
|
separator = \
|
|
'++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'
|
|
message = "Test steps started:"
|
|
testcase_log(message, item.nodeid, separator=separator, log_type='tc_start')
|
|
|
|
|
|
def pytest_runtest_teardown(item):
|
|
print('')
|
|
message = 'Teardown started:'
|
|
testcase_log(message, item.nodeid, log_type='tc_teardown')
|
|
|
|
|
|
def testcase_log(msg, nodeid, separator=None, log_type=None):
|
|
if separator is None:
|
|
separator = '-----------'
|
|
|
|
print_msg = separator + '\n' + msg
|
|
logging_msg = '\n{}{} {}'.format(separator, msg, nodeid)
|
|
if console_log:
|
|
print(print_msg)
|
|
if log_type == 'tc_res':
|
|
global tc_end_time
|
|
tc_end_time = strftime("%Y%m%d %H:%M:%S", gmtime())
|
|
LOG.tc_result(msg=msg, tc_name=nodeid)
|
|
elif log_type == 'tc_start':
|
|
LOG.tc_func_start(nodeid)
|
|
elif log_type == 'tc_setup':
|
|
LOG.tc_setup_start(nodeid)
|
|
elif log_type == 'tc_teardown':
|
|
LOG.tc_teardown_start(nodeid)
|
|
else:
|
|
LOG.debug(logging_msg)
|
|
|
|
|
|
########################
|
|
# Command line options #
|
|
########################
|
|
@pytest.mark.tryfirst
|
|
def pytest_configure(config):
|
|
config.addinivalue_line("markers",
|
|
"features(feature_name1, feature_name2, "
|
|
"...): mark impacted feature(s) for a test case.")
|
|
config.addinivalue_line("markers",
|
|
"priorities(, cpe_sanity, p2, ...): mark "
|
|
"priorities for a test case.")
|
|
config.addinivalue_line("markers",
|
|
"known_issue(LP-xxxx): mark known issue with "
|
|
"LP ID or description if no LP needed.")
|
|
|
|
if config.getoption('help'):
|
|
return
|
|
|
|
# Common reporting params
|
|
collect_all = config.getoption('collectall')
|
|
always_collect = config.getoption('alwayscollect')
|
|
session_log_dir = config.getoption('sessiondir')
|
|
resultlog = config.getoption('resultlog')
|
|
|
|
# Test case params on installed system
|
|
testcase_config = config.getoption('testcase_config')
|
|
lab_arg = config.getoption('lab')
|
|
natbox_arg = config.getoption('natbox')
|
|
tenant_arg = config.getoption('tenant')
|
|
horizon_visible = config.getoption('horizon_visible')
|
|
is_vbox = config.getoption('is_vbox')
|
|
|
|
global repeat_count
|
|
repeat_count = config.getoption('repeat')
|
|
global stress_count
|
|
stress_count = config.getoption('stress')
|
|
global count
|
|
if repeat_count > 0:
|
|
count = repeat_count
|
|
elif stress_count > 0:
|
|
count = stress_count
|
|
|
|
global no_teardown
|
|
no_teardown = config.getoption('noteardown')
|
|
if repeat_count > 0 or no_teardown:
|
|
ProjVar.set_var(NO_TEARDOWN=True)
|
|
|
|
collect_netinfo = config.getoption('netinfo')
|
|
|
|
# Determine lab value.
|
|
lab = natbox = None
|
|
if lab_arg:
|
|
lab = setups.get_lab_dict(lab_arg)
|
|
if natbox_arg:
|
|
natbox = setups.get_natbox_dict(natbox_arg)
|
|
|
|
lab, natbox = setups.setup_testcase_config(testcase_config, lab=lab,
|
|
natbox=natbox)
|
|
tenant = tenant_arg.upper() if tenant_arg else 'TENANT1'
|
|
|
|
# Log collection params
|
|
collect_all = True if collect_all else False
|
|
always_collect = True if always_collect else False
|
|
|
|
# If floating ip cannot be reached, whether to try to ping/ssh
|
|
# controller-0 unit IP, etc.
|
|
if collect_netinfo:
|
|
ProjVar.set_var(COLLECT_SYS_NET_INFO=True)
|
|
|
|
horizon_visible = True if horizon_visible else False
|
|
|
|
if session_log_dir:
|
|
log_dir = session_log_dir
|
|
else:
|
|
# compute directory for all logs based on resultlog arg, lab,
|
|
# and timestamp on local machine
|
|
resultlog = resultlog if resultlog else os.path.expanduser("~")
|
|
if '/AUTOMATION_LOGS' in resultlog:
|
|
resultlog = resultlog.split(sep='/AUTOMATION_LOGS')[0]
|
|
resultlog = os.path.join(resultlog, 'AUTOMATION_LOGS')
|
|
lab_name = lab['short_name']
|
|
time_stamp = strftime('%Y%m%d%H%M')
|
|
log_dir = '{}/{}/{}'.format(resultlog, lab_name, time_stamp)
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
# set global constants, which will be used for the entire test session, etc
|
|
ProjVar.init_vars(lab=lab, natbox=natbox, logdir=log_dir, tenant=tenant,
|
|
collect_all=collect_all,
|
|
always_collect=always_collect,
|
|
horizon_visible=horizon_visible)
|
|
|
|
if lab.get('central_region'):
|
|
ProjVar.set_var(IS_DC=True,
|
|
PRIMARY_SUBCLOUD=config.getoption('subcloud'))
|
|
|
|
if is_vbox:
|
|
ProjVar.set_var(IS_VBOX=True)
|
|
|
|
config_logger(log_dir, console=console_log)
|
|
|
|
# set resultlog save location
|
|
config.option.resultlog = ProjVar.get_var("PYTESTLOG_PATH")
|
|
|
|
# Repeat test params
|
|
file_or_dir = config.getoption('file_or_dir')
|
|
origin_file_dir = list(file_or_dir)
|
|
if count > 1:
|
|
print("Repeat following tests {} times: {}".format(count, file_or_dir))
|
|
del file_or_dir[:]
|
|
for f_or_d in origin_file_dir:
|
|
for i in range(count):
|
|
file_or_dir.append(f_or_d)
|
|
|
|
|
|
def pytest_addoption(parser):
|
|
testconf_help = "Absolute path for testcase config file. Template can be " \
|
|
"found at automated-pytest-suite/stx-test_template.conf"
|
|
lab_help = "STX system to connect to. Valid value: 1) short_name or name " \
|
|
"of an existing dict entry in consts.Labs; Or 2) OAM floating " \
|
|
"ip of the STX system under test"
|
|
tenant_help = "Default tenant to use when unspecified. Valid values: " \
|
|
"tenant1, tenant2, or admin"
|
|
natbox_help = "NatBox IP or name. If automated tests are executed from " \
|
|
"NatBox, --natbox=localhost can be used. " \
|
|
"If username/password are required to SSH to NatBox, " \
|
|
"please specify them in test config file."
|
|
vbox_help = "Specify if StarlingX system is installed in virtual " \
|
|
"environment."
|
|
collect_all_help = "Run collect all on STX system at the end of test " \
|
|
"session if any test fails."
|
|
logdir_help = "Directory to store test session logs. If this is " \
|
|
"specified, then --resultlog will be ignored."
|
|
stress_help = "Number of iterations to run specified testcase(s). Abort " \
|
|
"rest of the test session on first failure"
|
|
count_help = "Repeat tests x times - NO stop on failure"
|
|
horizon_visible_help = "Display horizon on screen"
|
|
no_console_log = 'Print minimal console logs'
|
|
|
|
# Test session options on installed and configured STX system:
|
|
parser.addoption('--testcase-config', action='store',
|
|
metavar='testcase_config', default=None,
|
|
help=testconf_help)
|
|
parser.addoption('--lab', action='store', metavar='lab', default=None,
|
|
help=lab_help)
|
|
parser.addoption('--tenant', action='store', metavar='tenantname',
|
|
default=None, help=tenant_help)
|
|
parser.addoption('--natbox', action='store', metavar='natbox', default=None,
|
|
help=natbox_help)
|
|
parser.addoption('--vm', '--vbox', action='store_true', dest='is_vbox',
|
|
help=vbox_help)
|
|
|
|
# Debugging/Log collection options:
|
|
parser.addoption('--sessiondir', '--session_dir', '--session-dir',
|
|
action='store', dest='sessiondir',
|
|
metavar='sessiondir', default=None, help=logdir_help)
|
|
parser.addoption('--collectall', '--collect_all', '--collect-all',
|
|
dest='collectall', action='store_true',
|
|
help=collect_all_help)
|
|
parser.addoption('--alwayscollect', '--always-collect', '--always_collect',
|
|
dest='alwayscollect',
|
|
action='store_true', help=collect_all_help)
|
|
parser.addoption('--repeat', action='store', metavar='repeat', type=int,
|
|
default=-1, help=stress_help)
|
|
parser.addoption('--stress', metavar='stress', action='store', type=int,
|
|
default=-1, help=count_help)
|
|
parser.addoption('--no-teardown', '--no_teardown', '--noteardown',
|
|
dest='noteardown', action='store_true')
|
|
parser.addoption('--netinfo', '--net-info', dest='netinfo',
|
|
action='store_true',
|
|
help="Collect system networking info if scp keyfile fails")
|
|
parser.addoption('--horizon-visible', '--horizon_visible',
|
|
action='store_true', dest='horizon_visible',
|
|
help=horizon_visible_help)
|
|
parser.addoption('--noconsolelog', '--noconsole', '--no-console-log',
|
|
'--no_console_log', '--no-console',
|
|
'--no_console', action='store_true', dest='noconsolelog',
|
|
help=no_console_log)
|
|
|
|
|
|
def config_logger(log_dir, console=True):
|
|
# logger for log saved in file
|
|
file_name = log_dir + '/TIS_AUTOMATION.log'
|
|
logging.Formatter.converter = gmtime
|
|
log_format = '[%(asctime)s] %(lineno)-5d%(levelname)-5s %(threadName)-8s ' \
|
|
'%(module)s.%(funcName)-8s:: %(message)s'
|
|
tis_formatter = logging.Formatter(log_format)
|
|
LOG.setLevel(logging.NOTSET)
|
|
|
|
tmp_path = os.path.join(os.path.expanduser('~'), '.tmp_log')
|
|
# clear the tmp log with best effort so it wont keep growing
|
|
try:
|
|
os.remove(tmp_path)
|
|
except:
|
|
pass
|
|
logging.basicConfig(level=logging.NOTSET, format=log_format,
|
|
filename=tmp_path, filemode='w')
|
|
|
|
# file handler:
|
|
file_handler = logging.FileHandler(file_name)
|
|
file_handler.setFormatter(tis_formatter)
|
|
file_handler.setLevel(logging.DEBUG)
|
|
LOG.addHandler(file_handler)
|
|
|
|
# logger for stream output
|
|
console_level = logging.INFO if console else logging.CRITICAL
|
|
stream_hdler = logging.StreamHandler()
|
|
stream_hdler.setFormatter(tis_formatter)
|
|
stream_hdler.setLevel(console_level)
|
|
LOG.addHandler(stream_hdler)
|
|
|
|
print("LOG DIR: {}".format(log_dir))
|
|
|
|
|
|
def pytest_unconfigure(config):
|
|
# collect all if needed
|
|
if config.getoption('help'):
|
|
return
|
|
|
|
try:
|
|
natbox_ssh = ProjVar.get_var('NATBOX_SSH')
|
|
natbox_ssh.close()
|
|
except:
|
|
pass
|
|
|
|
version_and_patch = ''
|
|
try:
|
|
version_and_patch = setups.get_version_and_patch_info()
|
|
except Exception as e:
|
|
LOG.debug(e)
|
|
pass
|
|
log_dir = ProjVar.get_var('LOG_DIR')
|
|
if not log_dir:
|
|
try:
|
|
from utils.clients.ssh import ControllerClient
|
|
ssh_list = ControllerClient.get_active_controllers(fail_ok=True)
|
|
for con_ssh_ in ssh_list:
|
|
con_ssh_.close()
|
|
except:
|
|
pass
|
|
return
|
|
|
|
log_dir = ProjVar.get_var('LOG_DIR')
|
|
if not log_dir:
|
|
try:
|
|
from utils.clients.ssh import ControllerClient
|
|
ssh_list = ControllerClient.get_active_controllers(fail_ok=True)
|
|
for con_ssh_ in ssh_list:
|
|
con_ssh_.close()
|
|
except:
|
|
pass
|
|
return
|
|
|
|
try:
|
|
tc_res_path = log_dir + '/test_results.log'
|
|
build_info = ProjVar.get_var('BUILD_INFO')
|
|
build_id = build_info.get('BUILD_ID', '')
|
|
build_job = build_info.get('JOB', '')
|
|
build_server = build_info.get('BUILD_HOST', '')
|
|
system_config = ProjVar.get_var('SYS_TYPE')
|
|
session_str = ''
|
|
total_exec = TestRes.PASSNUM + TestRes.FAILNUM
|
|
# pass_rate = fail_rate = '0'
|
|
if total_exec > 0:
|
|
pass_rate = "{}%".format(
|
|
round(TestRes.PASSNUM * 100 / total_exec, 2))
|
|
fail_rate = "{}%".format(
|
|
round(TestRes.FAILNUM * 100 / total_exec, 2))
|
|
with open(tc_res_path, mode='a') as f:
|
|
# Append general info to result log
|
|
f.write('\n\nLab: {}\n'
|
|
'Build ID: {}\n'
|
|
'Job: {}\n'
|
|
'Build Server: {}\n'
|
|
'System Type: {}\n'
|
|
'Automation LOGs DIR: {}\n'
|
|
'Ends at: {}\n'
|
|
'{}' # test session id and tag
|
|
'{}'.format(ProjVar.get_var('LAB_NAME'), build_id,
|
|
build_job, build_server, system_config,
|
|
ProjVar.get_var('LOG_DIR'), tc_end_time,
|
|
session_str, version_and_patch))
|
|
# Add result summary to beginning of the file
|
|
f.write(
|
|
'\nSummary:\nPassed: {} ({})\nFailed: {} ({})\nTotal '
|
|
'Executed: {}\n'.
|
|
format(TestRes.PASSNUM, pass_rate, TestRes.FAILNUM,
|
|
fail_rate, total_exec))
|
|
if TestRes.SKIPNUM > 0:
|
|
f.write('------------\nSkipped: {}'.format(TestRes.SKIPNUM))
|
|
|
|
LOG.info("Test Results saved to: {}".format(tc_res_path))
|
|
with open(tc_res_path, 'r') as fin:
|
|
print(fin.read())
|
|
except Exception as e:
|
|
LOG.exception(
|
|
"Failed to add session summary to test_results.py. "
|
|
"\nDetails: {}".format(e.__str__()))
|
|
# Below needs con_ssh to be initialized
|
|
try:
|
|
from utils.clients.ssh import ControllerClient
|
|
con_ssh = ControllerClient.get_active_controller()
|
|
except:
|
|
LOG.warning("No con_ssh found")
|
|
return
|
|
|
|
try:
|
|
parse_log.parse_test_steps(ProjVar.get_var('LOG_DIR'))
|
|
except Exception as e:
|
|
LOG.warning(
|
|
"Unable to parse test steps. \nDetails: {}".format(e.__str__()))
|
|
|
|
if test_count > 0 and (ProjVar.get_var('ALWAYS_COLLECT') or (
|
|
has_fail and ProjVar.get_var('COLLECT_ALL'))):
|
|
# Collect tis logs if collect all required upon test(s) failure
|
|
# Failure on collect all would not change the result of the last test
|
|
# case.
|
|
try:
|
|
setups.collect_tis_logs(con_ssh)
|
|
except Exception as e:
|
|
LOG.warning("'collect all' failed. {}".format(e.__str__()))
|
|
|
|
ssh_list = ControllerClient.get_active_controllers(fail_ok=True,
|
|
current_thread_only=True)
|
|
for con_ssh_ in ssh_list:
|
|
try:
|
|
con_ssh_.close()
|
|
except:
|
|
pass
|
|
|
|
|
|
def pytest_collection_modifyitems(items):
|
|
# print("Collection modify")
|
|
move_to_last = []
|
|
absolute_last = []
|
|
|
|
for item in items:
|
|
# re-order tests:
|
|
trylast_marker = item.get_closest_marker('trylast')
|
|
abslast_marker = item.get_closest_marker('abslast')
|
|
|
|
if abslast_marker:
|
|
absolute_last.append(item)
|
|
elif trylast_marker:
|
|
move_to_last.append(item)
|
|
|
|
priority_marker = item.get_closest_marker('priorities')
|
|
if priority_marker is not None:
|
|
priorities = priority_marker.args
|
|
for priority in priorities:
|
|
item.add_marker(eval("pytest.mark.{}".format(priority)))
|
|
|
|
feature_marker = item.get_closest_marker('features')
|
|
if feature_marker is not None:
|
|
features = feature_marker.args
|
|
for feature in features:
|
|
item.add_marker(eval("pytest.mark.{}".format(feature)))
|
|
|
|
# known issue marker
|
|
known_issue_mark = item.get_closest_marker('known_issue')
|
|
if known_issue_mark is not None:
|
|
issue = known_issue_mark.args[0]
|
|
msg = "{} has a workaround due to {}".format(item.nodeid, issue)
|
|
print(msg)
|
|
LOG.debug(msg=msg)
|
|
item.add_marker(eval("pytest.mark.known_issue"))
|
|
|
|
# add dc maker to all tests start with test_dc_xxx
|
|
dc_maker = item.get_marker('dc')
|
|
if not dc_maker and 'test_dc_' in item.nodeid:
|
|
item.add_marker(pytest.mark.dc)
|
|
|
|
# add trylast tests to the end
|
|
for item in move_to_last:
|
|
items.remove(item)
|
|
items.append(item)
|
|
|
|
for i in absolute_last:
|
|
items.remove(i)
|
|
items.append(i)
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
|
|
# Prefix 'remote_cli' to test names so they are reported as a different
|
|
# testcase
|
|
if ProjVar.get_var('REMOTE_CLI'):
|
|
metafunc.parametrize('prefix_remote_cli', ['remote_cli'])
|
|
|
|
|
|
##############################################################
|
|
# Manipulating fixture orders based on following pytest rules
|
|
# session > module > class > function
|
|
# autouse > non-autouse
|
|
# alphabetic after full-filling above criteria
|
|
#
|
|
# Orders we want on fixtures of same scope:
|
|
# check_alarms > delete_resources > config_host
|
|
#############################################################
|
|
|
|
@pytest.fixture(scope='session')
|
|
def check_alarms():
|
|
LOG.debug("Empty check alarms")
|
|
return
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def config_host_class():
|
|
LOG.debug("Empty config host class")
|
|
return
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def config_host_module():
|
|
LOG.debug("Empty config host module")
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def a1_fixture(check_alarms):
|
|
return
|
|
|
|
|
|
@pytest.fixture(scope='module', autouse=True)
|
|
def c1_fixture(config_host_module):
|
|
return
|
|
|
|
|
|
@pytest.fixture(scope='class', autouse=True)
|
|
def c2_fixture(config_host_class):
|
|
return
|
|
|
|
|
|
@pytest.fixture(scope='session', autouse=True)
|
|
def prefix_remote_cli():
|
|
return
|
|
|
|
|
|
def __params_gen(index):
|
|
return 'iter{}'.format(index)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def global_setup():
|
|
os.makedirs(ProjVar.get_var('TEMP_DIR'), exist_ok=True)
|
|
os.makedirs(ProjVar.get_var('PING_FAILURE_DIR'), exist_ok=True)
|
|
os.makedirs(ProjVar.get_var('GUEST_LOGS_DIR'), exist_ok=True)
|
|
|
|
if region:
|
|
setups.set_region(region=region)
|
|
|
|
|
|
#####################################
|
|
# End of fixture order manipulation #
|
|
#####################################
|
|
|
|
|
|
def pytest_sessionfinish():
|
|
if ProjVar.get_var('TELNET_THREADS'):
|
|
threads, end_event = ProjVar.get_var('TELNET_THREADS')
|
|
end_event.set()
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
if repeat_count > 0 and has_fail:
|
|
# _thread.interrupt_main()
|
|
print('Printing traceback: \n' + '\n'.join(tracebacks))
|
|
pytest.exit("\n========== Test failed - "
|
|
"Test session aborted without teardown to leave the "
|
|
"system in state ==========")
|
|
|
|
if no_teardown:
|
|
pytest.exit(
|
|
"\n========== Test session stopped without teardown after first "
|
|
"test executed ==========")
|