
This commit adds a new CLI/config option for the subunit2sql CLI command to use the run's wall time when populating the run_time column of the runs table. This is often desirable because it gives a better sense of the actual duration of the run. However, since summing the durations of the individual tests executed during a run has been the default behavior for so long, we can't simply switch it, for backwards-compatibility reasons. This commit therefore adds an opt-in option for the wall-time behavior if that's what you need.

Change-Id: I08fe16604454f5888d56573e5e919e40ac82efd1
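For reference, here is a minimal sketch of how the new switch is consumed at the Python level (the flag name exposed by the CLI/config layer is not shown here, and the input file name is illustrative):

from subunit2sql.read_subunit import ReadSubunit

# Default behavior: run_time is the sum of the individual test durations.
with open('results.subunit', 'rb') as stream:
    results = ReadSubunit(stream).get_results()

# Opt-in behavior added by this commit: run_time is the wall time of the run.
with open('results.subunit', 'rb') as stream:
    results = ReadSubunit(stream, use_wall_time=True).get_results()

The parsing logic itself lives in read_subunit.py, shown below.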
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import functools
import re

import subunit
import testtools

DAY_SECONDS = 60 * 60 * 24

def get_duration(start, end):
    # Elapsed time between two datetimes as float seconds, or None if
    # either timestamp is missing.
    if not start or not end:
        duration = None
    else:
        delta = end - start
        duration = float('%d.%06d' % (
            delta.days * DAY_SECONDS + delta.seconds, delta.microseconds))
    return duration

class ReadSubunit(object):

    def __init__(self, stream_file, attachments=False, attr_regex=None,
                 targets=None, use_wall_time=False):
        if targets is None:
            targets = []
        else:
            targets = targets[:]
        self.use_wall_time = use_wall_time
        self.stream_file = stream_file
        self.stream = subunit.ByteStreamToStreamResult(self.stream_file)
        starts = testtools.StreamResult()
        summary = testtools.StreamSummary()
        outcomes = testtools.StreamToDict(functools.partial(
            self.parse_outcome))
        targets.extend([starts, outcomes, summary])
        self.result = testtools.CopyStreamResult(targets)
        self.results = {}
        self.attachments = attachments
        if attr_regex:
            self.attr_regex = re.compile(attr_regex)
        # NOTE(mtreinish): Default to the previous implicit regex if None is
        # specified for backwards compat
        else:
            self.attr_regex = re.compile(r'\[(.*)\]')

    def get_results(self):
        self.result.startTestRun()
        try:
            self.stream.run(self.result)
        finally:
            self.result.stopTestRun()
        self.results['run_time'] = self.run_time()
        return self.results

    def get_attachments(self, test):
        attach_dict = {}
        for name, detail in test['details'].items():
            name = name.split(':')[0]
            attach_dict[name] = detail
        return attach_dict

    def parse_outcome(self, test):
        metadata = {}
        status = test['status']
        if status == 'exists':
            return
        name = self.cleanup_test_name(test['id'])
        attrs = self.get_attrs(test['id'])
        if attrs:
            metadata['attrs'] = attrs
        if test['tags']:
            metadata['tags'] = ",".join(test['tags'])
        # process-returncode is not a real test result; don't process it
        if name == 'process-returncode':
            return
        timestamps = test['timestamps']
        attachment_dict = {}
        if self.attachments:
            attachment_dict = self.get_attachments(test)
        self.results[name] = {
            'status': status,
            'start_time': timestamps[0],
            'end_time': timestamps[1],
            'metadata': metadata,
            'attachments': attachment_dict
        }

    def get_attrs(self, name):
        matches = self.attr_regex.search(name)
        if matches:
            attrs = matches.group(1)
        else:
            attrs = None
        return attrs

    def cleanup_test_name(self, name, strip_tags=True, strip_scenarios=False):
        """Clean up the test name for display.

        By default we strip out the tags in the test because they don't help
        in mapping the test that was run to its result.

        It is also possible to strip out the testscenarios information (not
        to be confused with tempest scenarios); however, that is often needed
        to identify generated negative tests.
        """
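        # Illustrative example (not part of the original source): with the
        # default attr regex, 'test_foo[id-1234,smoke](negative)' becomes
        # 'test_foo(negative)' when only strip_tags is True, and 'test_foo'
        # when strip_scenarios is True as well.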
        if strip_tags:
            matches = self.attr_regex.search(name)
            if matches:
                tags_start = matches.start(0)
                tags_end = matches.end(0)
                if tags_start > 0 and tags_end > tags_start:
                    newname = name[:tags_start]
                    newname += name[tags_end:]
                    name = newname

        if strip_scenarios:
            tags_start = name.find('(')
            tags_end = name.find(')')
            if tags_start > 0 and tags_end > tags_start:
                newname = name[:tags_start]
                newname += name[tags_end + 1:]
                name = newname
        return name

    def run_time(self):
        runtime = 0.0
        if self.use_wall_time:
            # Wall time: earliest test start to latest test finish.
            start_time = None
            stop_time = None
            for name, data in self.results.items():
                if not start_time or data['start_time'] < start_time:
                    start_time = data['start_time']
                if not stop_time or data['end_time'] > stop_time:
                    stop_time = data['end_time']
            runtime = get_duration(start_time, stop_time)
        else:
            # Default: sum of the individual test durations.
            for name, data in self.results.items():
                runtime += get_duration(data['start_time'], data['end_time'])
        return runtime
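To make the difference between the two run_time() modes concrete, here is a small illustrative sketch; the test names and timestamps are invented, and it assumes this module is importable as subunit2sql.read_subunit:

import datetime

from subunit2sql.read_subunit import get_duration

base = datetime.datetime(2015, 1, 1, 10, 0, 0)
# Two hypothetical tests that overlap in time (e.g. parallel workers).
results = {
    'test_a': {'start_time': base,
               'end_time': base + datetime.timedelta(seconds=10)},
    'test_b': {'start_time': base + datetime.timedelta(seconds=5),
               'end_time': base + datetime.timedelta(seconds=20)},
}

# Default behavior: sum of the individual test durations -> 25.0
summed = sum(get_duration(d['start_time'], d['end_time'])
             for d in results.values())

# use_wall_time=True: earliest start to latest finish -> 20.0
wall_time = get_duration(min(d['start_time'] for d in results.values()),
                         max(d['end_time'] for d in results.values()))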