# subunit2sql/subunit2sql/shell.py

# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import datetime
import sys

from dateutil import parser as date_parser
from oslo_config import cfg
from oslo_db import options
from pbr import version
from stevedore import enabled

from subunit2sql.db import api
from subunit2sql import exceptions
from subunit2sql import read_subunit as subunit

CONF = cfg.CONF
CONF.import_opt('verbose', 'subunit2sql.db.api')

SHELL_OPTS = [
    cfg.MultiStrOpt('subunit_files', positional=True, required=False,
                    default=None,
                    help='list of subunit files to put into the database'),
    cfg.DictOpt('run_meta', short='r', default=None,
                help='Dict of metadata about the run(s)'),
    cfg.StrOpt('artifacts', short='a', default=None,
               help='Location of run artifacts'),
    cfg.BoolOpt('store_attachments', short='s', default=False,
                help='Store attachments from subunit streams in the DB'),
    cfg.StrOpt('run_id', short='i', default=None,
               help='Run id to use for the specified subunit stream; can '
                    'only be used if a single stream is provided'),
    cfg.StrOpt('attr_regex', default=r'\[(.*)\]',
               help='The regex to use to extract the comma separated list '
                    'of test attributes from the test_id'),
    cfg.StrOpt('test_attr_prefix', short='p', default=None,
               help='An optional prefix to identify global test attrs '
                    'and treat them as test metadata instead of test_run '
                    'metadata'),
    cfg.BoolOpt('remove_test_attr_prefix', short='x', default=False,
                help='When True, the prefix configured in '
                     '"test_attr_prefix", if any, is removed from the '
                     'metadata before it\'s added to the test metadata'),
    cfg.StrOpt('run_at', default=None,
               help="The optional datetime string for when the run was "
                    "started. If one isn't provided, the date and time "
                    "when subunit2sql is called will be used"),
    cfg.BoolOpt('use_run_wall_time', default=False, short='w',
                help="When True the wall time of a run will be used for "
                     "the run_time column in the runs table. By default "
                     "the sum of the individual test execution times is "
                     "used instead."),
    cfg.StrOpt('non_subunit_name', default=None,
               help='Allows non-subunit content in the stream and stores '
                    'it under this name'),
]
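
# All of the options above are registered as CLI flags (and config file
# options) by cli_opts() below. For example (illustrative invocation), run
# metadata is passed as comma separated key:value pairs via the DictOpt:
#
#     subunit2sql -r build_name:gate-foo,build_node:node-1 results.subunit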

_version_ = version.VersionInfo('subunit2sql').version_string_with_vcs()


def cli_opts():
    for opt in SHELL_OPTS:
        CONF.register_cli_opt(opt)


def list_opts():
    """Return a list of oslo.config options available.

    The purpose of this is to allow tools like the Oslo sample config file
    generator to discover the options exposed to users.
    """
    return [('DEFAULT', copy.deepcopy(SHELL_OPTS))]
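
# list_opts() is typically exposed through an 'oslo.config.opts' setuptools
# entry point so the sample config generator can discover these options; an
# illustrative setup.cfg stanza (not guaranteed to match this project's
# packaging) would look like:
#
#     [entry_points]
#     oslo.config.opts =
#         subunit2sql = subunit2sql.shell:list_opts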


def parse_args(argv, default_config_files=None):
    cfg.CONF.register_cli_opts(options.database_opts, group='database')
    cfg.CONF(argv[1:], project='subunit2sql', version=_version_,
             default_config_files=default_config_files)


def running_avg(test, values, result):
    count = test.success
    avg_prev = test.run_time
    curr_runtime = subunit.get_duration(result['start_time'],
                                        result['end_time'])
    if isinstance(avg_prev, float):
        # Using a smoothed moving avg to limit the effect of a single outlier
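        # (e.g. with 3 prior successes averaging 2.0s and a new 6.0s result,
        # the stored average becomes (3 * 2.0 + 6.0) / 4 = 3.0s)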
        new_avg = ((count * avg_prev) + curr_runtime) / (count + 1)
        values['run_time'] = new_avg
    else:
        values['run_time'] = curr_runtime
    return values


def increment_counts(test, results):
    test_values = {'run_count': test.run_count + 1}
    status = results.get('status')
    # xfail is counted as a success and uxsuccess as a failure
    if status in ['success', 'xfail']:
        test_values['success'] = test.success + 1
        test_values = running_avg(test, test_values, results)
    elif status in ['fail', 'uxsuccess']:
        test_values['failure'] = test.failure + 1
    elif status == 'skip':
        test_values = {}
    else:
        msg = "Unknown test status %s" % status
        raise exceptions.UnknownStatus(msg)
    return test_values


def get_run_totals(results):
    success = len(
        [x for x in results if results[x]['status'] in ['success', 'xfail']])
    fails = len(
        [x for x in results if results[x]['status'] in ['fail', 'uxsuccess']])
    skips = len([x for x in results if results[x]['status'] == 'skip'])
    totals = {
        'success': success,
        'fails': fails,
        'skips': skips,
    }
    return totals


def _get_test_attrs_list(attrs):
    if attrs:
        attr_list = attrs.split(',')
        test_attrs_list = [attr for attr in attr_list if attr.startswith(
            CONF.test_attr_prefix)]
        return test_attrs_list
    else:
        return None


def _override_conf(value, value_name):
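    """Return ``value`` if it is set, otherwise fall back to CONF.

    For example (illustrative), _override_conf(None, 'artifacts') returns
    CONF.artifacts, while a truthy value passed by the caller is returned
    unchanged.
    """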
    if (not value) and hasattr(CONF, value_name):
        return CONF[value_name]
    else:
        return value


def process_results(results, run_at=None, artifacts=None, run_id=None,
                    run_meta=None, test_attr_prefix=None):
    """Insert converted subunit data into the database.

    Run-specific information can be passed in via kwargs; CONF is checked
    for any run-specific information that is not supplied.

    :param results: dict of parsed subunit results to be inserted
    :param run_at: Optional time at which the run was started.
    :param artifacts: Link to any artifacts from the test run.
    :param run_id: The run id for the new run. Must be unique.
    :param run_meta: Metadata corresponding to the new run.
    :param test_attr_prefix: Optional test attribute prefix.
    """
    run_at = _override_conf(run_at, 'run_at')
    artifacts = _override_conf(artifacts, 'artifacts')
    run_id = _override_conf(run_id, 'run_id')
    run_meta = _override_conf(run_meta, 'run_meta')
    test_attr_prefix = _override_conf(test_attr_prefix, 'test_attr_prefix')
    if run_at:
        if not isinstance(run_at, datetime.datetime):
            run_at = date_parser.parse(run_at)
    else:
        run_at = None
    session = api.get_session()
    run_time = results.pop('run_time')
    totals = get_run_totals(results)
    db_run = api.create_run(totals['skips'], totals['fails'],
                            totals['success'], run_time, artifacts,
                            id=run_id, run_at=run_at, session=session)
    if run_meta:
        api.add_run_metadata(run_meta, db_run.id, session)
    for test in results:
        db_test = api.get_test_by_test_id(test, session)
        if not db_test:
            if results[test]['status'] in ['success', 'xfail']:
                success = 1
                fails = 0
            elif results[test]['status'] in ['fail', 'uxsuccess']:
                fails = 1
                success = 0
            else:
                fails = 0
                success = 0
            run_time = subunit.get_duration(results[test]['start_time'],
                                            results[test]['end_time'])
            db_test = api.create_test(test, (success + fails), success,
                                      fails, run_time,
                                      session)
        else:
            test_values = increment_counts(db_test, results[test])
            # If the test was skipped there is nothing to update
            if test_values:
                api.update_test(test_values, db_test.id, session)
        test_run = api.create_test_run(db_test.id, db_run.id,
                                       results[test]['status'],
                                       results[test]['start_time'],
                                       results[test]['end_time'],
                                       session)
        if results[test]['metadata']:
            if test_attr_prefix:
                attrs = results[test]['metadata'].get('attrs')
                test_attr_list = _get_test_attrs_list(attrs)
                test_metadata = api.get_test_metadata(db_test.id, session)
                test_metadata = [(meta.key, meta.value) for meta in
                                 test_metadata]
                if test_attr_list:
                    for attr in test_attr_list:
                        if CONF.remove_test_attr_prefix:
                            normalized_attr = attr[len(
                                CONF.test_attr_prefix):]
                        else:
                            normalized_attr = attr
                        if ('attr', normalized_attr) not in test_metadata:
                            test_meta_dict = {'attr': normalized_attr}
                            api.add_test_metadata(test_meta_dict, db_test.id,
                                                  session=session)
            api.add_test_run_metadata(results[test]['metadata'], test_run.id,
                                      session)
        if results[test]['attachments']:
            api.add_test_run_attachments(results[test]['attachments'],
                                         test_run.id, session)
    session.close()
    return db_run
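
# Illustrative programmatic use of process_results() (assumes parse_args()
# has already been called so the database connection is configured; the file
# name and metadata below are made up):
#
#     with open('results.subunit', 'r') as f:
#         results = subunit.ReadSubunit(f).get_results()
#         process_results(results, run_meta={'build_name': 'gate-foo'})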


def get_extensions():
    def check_enabled(ext):
        return ext.plugin.enabled()
    return enabled.EnabledExtensionManager('subunit2sql.target',
                                           check_func=check_enabled)
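
# Target plugins are discovered through the 'subunit2sql.target' setuptools
# entry point namespace and are only loaded when enabled() returns True on
# the plugin class. An illustrative registration in a plugin's setup.cfg
# (names are hypothetical):
#
#     [entry_points]
#     subunit2sql.target =
#         my_target = my_package.targets:MyTarget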


def get_targets(extensions):
    try:
        targets = list(extensions.map(lambda ext: ext.plugin()))
    except RuntimeError:
        targets = []
    return targets
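
# main() implements the subunit2sql CLI. A typical invocation (illustrative;
# the connection URL depends on your deployment) reads one or more subunit
# files, or stdin when none are given:
#
#     subunit2sql --database-connection mysql://subunit:pass@localhost/subunit \
#         results.subunit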


def main():
    cli_opts()
    extensions = get_extensions()
    parse_args(sys.argv)
    targets = get_targets(extensions)
    if CONF.subunit_files:
        if len(CONF.subunit_files) > 1 and CONF.run_id:
            print("You cannot specify a run id when adding more than one "
                  "stream")
            return 3
        streams = [subunit.ReadSubunit(open(s, 'r'),
                                       attachments=CONF.store_attachments,
                                       attr_regex=CONF.attr_regex,
                                       targets=targets,
                                       use_wall_time=CONF.use_run_wall_time,
                                       non_subunit_name=CONF.non_subunit_name)
                   for s in CONF.subunit_files]
    else:
        streams = [subunit.ReadSubunit(sys.stdin,
                                       attachments=CONF.store_attachments,
                                       attr_regex=CONF.attr_regex,
                                       targets=targets,
                                       use_wall_time=CONF.use_run_wall_time,
                                       non_subunit_name=CONF.non_subunit_name)]
    for stream in streams:
        process_results(stream.get_results())


if __name__ == "__main__":
    sys.exit(main())