Upload tests cases from system tests to TestRail

Add script for uploading tests cases from system tests
to the given section (tests suite) in TestRail.

Change-Id: Ifdc4e453b3e77501b64cfc06c3b4e0ef9582e296
Partial-bug: #1415467
This commit is contained in:
Artem Panchenko 2015-02-09 09:38:52 +02:00
parent 28dc449f0f
commit 672cf552ed
4 changed files with 462 additions and 3 deletions

View File

@ -1,6 +1,4 @@
def run_tests():
from proboscis import TestProgram # noqa
def import_tests():
from tests import test_admin_node # noqa
from tests import test_ceph # noqa
from tests import test_environment_action # noqa
@ -25,6 +23,11 @@ def run_tests():
from tests.plugins.plugin_lbaas import test_plugin_lbaas # noqa
from tests import test_multiple_networks # noqa
def run_tests():
from proboscis import TestProgram # noqa
import_tests()
# Run Proboscis and exit.
TestProgram().run_and_exit()

View File

@ -0,0 +1,39 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
logger = logging.getLogger(__package__)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
JENKINS = {
'url': os.environ.get('JENKINS_URL', 'http://localhost/'),
}
class TestRailSettings():
url = os.environ.get('TESTRAIL_URL', 'https://mirantis.testrail.com')
user = os.environ.get('TESTRAIL_USER', 'user@example.com')
password = os.environ.get('TESTRAIL_PASSWORD', 'password')
project = os.environ.get('TESTRAIL_PROJECT', 'Mirantis OpenStack')
milestone = os.environ.get('TESTRAIL_MILESTONE', '6.1')
test_suite = os.environ.get('TESTRAIL_TEST_SUITE', 'Swarm')
test_section = os.environ.get('TESTRAIL_TEST_SECTION', 'all cases')
test_include = os.environ.get('TESTRAIL_TEST_INCLUDE', None)
test_exclude = os.environ.get('TESTRAIL_TEST_EXCLUDE', None)

View File

@ -0,0 +1,287 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testrail import APIClient
class TestRailProject():
def __init__(self, url, user, password, project):
self.client = APIClient(base_url=url)
self.client.user = user
self.client.password = password
self.project = self._get_project(project)
def _get_project(self, project_name):
projects_uri = 'get_projects'
projects = self.client.send_get(uri=projects_uri)
for project in projects:
if project['name'] == project_name:
return project
return None
def test_run_struct(self, name, suite, milestone, description, config_ids,
include_all=True, assignedto=None, case_ids=None):
struct = {
'name': name,
'suite_id': self.get_suite(suite)['id'],
'milestone_id': self.get_milestone(milestone)['id'],
'description': description,
'include_all': include_all,
'config_ids': config_ids
}
if case_ids:
struct['include_all'] = False
struct['case_ids'] = case_ids
if assignedto:
struct['assignedto_id'] = self.get_user(assignedto)['id']
return struct
def get_users(self):
users_uri = 'get_users'
return self.client.send_get(uri=users_uri)
def get_user(self, name):
for user in self.get_users():
if user['name'] == name:
return user
def get_configs(self):
configs_uri = 'get_configs/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_get(configs_uri)
def get_config_by_name(self, name):
for config in self.get_configs():
if config['name'] == name:
return config
def get_milestones(self):
milestones_uri = 'get_milestones/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_get(uri=milestones_uri)
def get_milestone(self, name):
for milestone in self.get_milestones():
if milestone['name'] == name:
return milestone
def get_suites(self):
suites_uri = 'get_suites/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_get(uri=suites_uri)
def get_suite(self, name):
for suite in self.get_suites():
if suite['name'] == name:
return suite
def get_sections(self, suite):
sections_uri = 'get_sections/{project_id}&suite_id={suite_id}'.format(
project_id=self.project['id'], suite_id=self.get_suite(suite)['id']
)
return self.client.send_get(sections_uri)
def get_section(self, section_id):
section_uri = 'get_section/{section_id}'.format(section_id=section_id)
return self.client.send_get(section_uri)
def get_section_by_name(self, suite, section_name):
for section in self.get_sections(suite=suite):
if section['name'] == section_name:
return section
def get_cases(self, suite, section_id=None):
cases_uri = 'get_cases/{project_id}&suite_id={suite_id}'.format(
project_id=self.project['id'], suite_id=self.get_suite(suite)['id']
)
if section_id:
cases_uri = '{0}&section_id={section_id}'.format(
cases_uri, section_id=section_id
)
return self.client.send_get(cases_uri)
def get_case_by_name(self, suite, name, cases=None):
for case in cases or self.get_cases(suite):
if case['title'] == name:
return case
def get_case_by_group(self, suite, group, cases=None):
for case in cases or self.get_cases(suite):
if case['custom_test_group'] == group:
return case
def add_case(self, section_id, case):
add_case_uri = 'add_case/{section_id}'.format(section_id=section_id)
return self.client.send_post(add_case_uri, case)
def get_plans(self):
plans_uri = 'get_plans/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_get(plans_uri)
def get_plan(self, plan_id):
plan_uri = 'get_plan/{plan_id}'.format(plan_id=plan_id)
return self.client.send_get(plan_uri)
def get_plan_by_name(self, name):
for plan in self.get_plans():
if plan['name'] == name:
return self.get_plan(plan['id'])
def add_plan(self, name, description, milestone, entires):
add_plan_uri = 'add_plan/{project_id}'.format(
project_id=self.project['id'])
new_plan = {
'name': name,
'description': description,
'milestone_id': self.get_milestone(milestone)['id'],
'entries': entires
}
return self.client.send_post(add_plan_uri, new_plan)
def add_plan_entry(self, plan_id, suite_id, runs):
add_plan_entry_uri = 'add_plan_entry/{plan_id}'.format(plan_id=plan_id)
new_entry = {
'suite_id': suite_id,
'runs': runs
}
self.client.send_post(add_plan_entry_uri, new_entry)
def delete_plan(self, plan_id):
delete_plan_uri = 'delete_plan/{plan_id}'.format(plan_id=plan_id)
self.client.send_post(delete_plan_uri, {})
def get_runs(self):
runs_uri = 'get_runs/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_get(uri=runs_uri)
def get_run(self, name):
for run in self.get_runs():
if run['name'] == name:
return run
def add_run(self, new_run):
add_run_uri = 'add_run/{project_id}'.format(
project_id=self.project['id'])
return self.client.send_post(add_run_uri, new_run)
def update_run(self, name, milestone=None, description=None,
config_ids=None, include_all=None, case_ids=None):
tests_run = self.get_run(name)
update_run_uri = 'update_run/{run_id}'.format(run_id=tests_run['id'])
update_run = {}
if milestone:
update_run['milestone_id'] = self.get_milestone(milestone)['id']
if description:
update_run['description'] = description
if include_all is not None:
update_run['include_all'] = include_all is True
if case_ids:
update_run['case_ids'] = case_ids
if config_ids:
update_run['config_ids'] = config_ids
return self.client.send_post(update_run_uri, update_run)
def create_or_update_run(self, name, suite, milestone, description,
config_ids, include_all=True, assignedto=None,
case_ids=None):
if self.get_run(name):
self.update_run(name=name,
milestone=milestone,
description=description,
config_ids=config_ids,
include_all=include_all,
case_ids=case_ids)
else:
self.add_run(self.test_run_struct(name, suite, milestone,
description, config_ids,
include_all=include_all,
assignedto=assignedto,
case_ids=case_ids))
def get_statuses(self):
statuses_uri = 'get_statuses'
return self.client.send_get(statuses_uri)
def get_status(self, name):
for status in self.get_statuses():
if status['name'] == name:
return status
def get_tests(self, run_id, status_id=None):
tests_uri = 'get_tests/{run_id}'.format(run_id=run_id)
if status_id:
tests_uri = '{0}&status_id={1}'.format(tests_uri,
','.join(status_id))
return self.client.send_get(tests_uri)
def get_test(self, test_id):
test_uri = 'get_test/{test_id}'.format(test_id=test_id)
return self.client.send_get(test_uri)
def get_test_by_name(self, run_id, name):
for test in self.get_tests(run_id):
if test['title'] == name:
return test
def get_test_by_group(self, run_id, group, tests=None):
for test in tests or self.get_tests(run_id):
if test['custom_test_group'] == group:
return test
def get_results_for_test(self, test_id, run_results=None):
if run_results:
for results in run_results:
if results['test_id'] == test_id:
return results
results_uri = 'get_results/{test_id}'.format(test_id=test_id)
return self.client.send_get(results_uri)
def get_results_for_run(self, run_id):
results_run_uri = 'get_results_for_run/{run_id}'.format(run_id=run_id)
return self.client.send_get(results_run_uri)
def get_results_for_case(self, run_id, case_id):
results_case_uri = 'get_results_for_case/{run_id}/{case_id}'.format(
run_id=run_id, case_id=case_id)
return self.client.send_get(results_case_uri)
def add_results_for_test(self, test_id, test_results):
add_results_test_uri = 'add_result/{test_id}'.format(test_id=test_id)
new_results = {
'status_id': self.get_status(test_results.status)['id'],
'comment': test_results.url,
'elapsed': test_results.duration,
'version': test_results.version
}
return self.client.send_post(add_results_test_uri, new_results)
def add_results_for_cases(self, run_id, tests_suite, tests_results):
add_results_test_uri = 'add_results_for_cases/{run_id}'.format(
run_id=run_id)
new_results = {'results': []}
tests_cases = self.get_cases(tests_suite)
for results in tests_results:
new_result = {
'case_id': self.get_case_by_group(suite=tests_suite,
group=results.group,
cases=tests_cases)['id'],
'status_id': self.get_status(results.status)['id'],
'comment': results.url,
'elapsed': results.duration,
'version': results.version
}
new_results['results'].append(new_result)
return self.client.send_post(add_results_test_uri, new_results)

View File

@ -0,0 +1,130 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from logging import DEBUG
from optparse import OptionParser
from proboscis import TestProgram
from fuelweb_test.run_tests import import_tests
from settings import logger
from settings import TestRailSettings
from testrail_client import TestRailProject
def get_tests_descriptions(milestone_id, tests_include, tests_exclude):
import_tests()
tests = []
for case in TestProgram().cases:
if tests_include:
if tests_include not in case.entry.home.func_name:
logger.debug("Skipping '{0}' test because it doesn't contain '"
"{1}' in method name".format(
case.entry.home.func_name,
tests_include))
continue
if tests_exclude:
if tests_exclude in case.entry.home.func_name:
logger.debug("Skipping '{0}' test because it contains '{1}' in"
"method name".format(case.entry.home.func_name,
tests_exclude))
continue
docstring = case.entry.home.func_doc or ''
docstring = '\n'.join([s.strip() for s in docstring.split('\n')])
steps = [{"content": s, "expected": "pass"} for s in
docstring.split('\n') if s and s[0].isdigit()]
test_duration = re.search(r'Duration\s+(\d+[s,m])\b', docstring)
test_case = {
"title": docstring.split('\n')[0] or case.entry.home.func_name,
"type_id": 1,
"milestone_id": milestone_id,
"priority_id": 5,
"estimate": test_duration.group(1) if test_duration else "3m",
"refs": "",
"custom_test_group": case.entry.home.func_name,
"custom_test_case_description": docstring or " ",
"custom_test_case_steps": steps
}
tests.append(test_case)
return tests
def upload_tests_descriptions(testrail_project, section_id, tests):
existing_cases = [case['custom_test_group'] for case in
testrail_project.get_cases(TestRailSettings.test_suite,
section_id=section_id)]
for test_case in tests:
if test_case['custom_test_group'] in existing_cases:
logger.debug('Skipping uploading "{0}" test case because it '
'already exists in "{1}" tests section.'.format(
test_case['custom_test_group'],
TestRailSettings.test_suite))
continue
logger.debug('Uploading test "{0}" to TestRail project "{1}", '
'suite "{2}", section "{3}"'.format(
test_case["custom_test_group"],
TestRailSettings.project,
TestRailSettings.test_suite,
TestRailSettings.test_section))
testrail_project.add_case(section_id=section_id, case=test_case)
def main():
testrail_project = TestRailProject(
url=TestRailSettings.url,
user=TestRailSettings.user,
password=TestRailSettings.password,
project=TestRailSettings.project
)
testrail_section = testrail_project.get_section_by_name(
suite=TestRailSettings.test_suite,
section_name=TestRailSettings.test_section
)
testrail_milestone = testrail_project.get_milestone(
name=TestRailSettings.milestone)['id']
tests_descriptions = get_tests_descriptions(
milestone_id=testrail_milestone,
tests_include=TestRailSettings.test_include,
tests_exclude=TestRailSettings.test_exclude
)
upload_tests_descriptions(testrail_project=testrail_project,
section_id=testrail_section['id'],
tests=tests_descriptions)
if __name__ == '__main__':
parser = OptionParser(
description="Upload tests cases to TestRail. "
"See settings.py for configuration."
)
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="Enable debug output")
(options, args) = parser.parse_args()
if options.verbose:
logger.setLevel(DEBUG)
main()