subunit2sql/subunit2sql/read_subunit.py

157 lines
5.3 KiB
Python

# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import re
import subunit
import testtools
DAY_SECONDS = 60 * 60 * 24
def get_duration(start, end):
if not start or not end:
duration = None
else:
delta = end - start
duration = '%d.%06d' % (
delta.days * DAY_SECONDS + delta.seconds, delta.microseconds)
return float(duration)
class ReadSubunit(object):
def __init__(self, stream_file, attachments=False, attr_regex=None,
targets=None, use_wall_time=False, non_subunit_name=None):
if targets is None:
targets = []
else:
targets = targets[:]
self.use_wall_time = use_wall_time
self.stream_file = stream_file
self.stream = subunit.ByteStreamToStreamResult(
self.stream_file,
non_subunit_name=non_subunit_name
)
starts = testtools.StreamResult()
summary = testtools.StreamSummary()
outcomes = testtools.StreamToDict(functools.partial(
self.parse_outcome))
targets.extend([starts, outcomes, summary])
self.result = testtools.CopyStreamResult(targets)
self.results = {}
self.attachments = attachments
if attr_regex:
self.attr_regex = re.compile(attr_regex)
# NOTE(mtreinish): Default to the previous implicit regex if None is
# specified for backwards compat
else:
self.attr_regex = re.compile('\[(.*)\]')
def get_results(self):
self.result.startTestRun()
try:
self.stream.run(self.result)
finally:
self.result.stopTestRun()
self.results['run_time'] = self.run_time()
return self.results
def get_attachments(self, test):
attach_dict = {}
for name, detail in test['details'].items():
name = name.split(':')[0]
attach_dict[name] = detail
return attach_dict
def parse_outcome(self, test):
metadata = {}
status = test['status']
if status == 'exists':
return
name = self.cleanup_test_name(test['id'])
attrs = self.get_attrs(test['id'])
if attrs:
metadata['attrs'] = attrs
if test['tags']:
metadata['tags'] = ",".join(test['tags'])
# Return code is a fail don't process it
if name == 'process-returncode':
return
timestamps = test['timestamps']
attachment_dict = {}
if self.attachments:
attachment_dict = self.get_attachments(test)
self.results[name] = {
'status': status,
'start_time': timestamps[0],
'end_time': timestamps[1],
'metadata': metadata,
'attachments': attachment_dict
}
def get_attrs(self, name):
matches = self.attr_regex.search(name)
if matches:
attrs = matches.group(1)
else:
attrs = None
return attrs
def cleanup_test_name(self, name, strip_tags=True, strip_scenarios=False):
"""Clean up the test name for display.
By default we strip out the tags in the test because they don't help us
in identifying the test that is run to it's result.
Make it possible to strip out the testscenarios information (not to
be confused with tempest scenarios) however that's often needed to
identify generated negative tests.
"""
if strip_tags:
matches = self.attr_regex.search(name)
if matches:
tags_start = matches.start(0)
tags_end = matches.end(0)
if tags_start > 0 and tags_end > tags_start:
newname = name[:tags_start]
newname += name[tags_end:]
name = newname
if strip_scenarios:
tags_start = name.find('(')
tags_end = name.find(')')
if tags_start > 0 and tags_end > tags_start:
newname = name[:tags_start]
newname += name[tags_end + 1:]
name = newname
return name
def run_time(self):
runtime = 0.0
if self.use_wall_time:
start_time = None
stop_time = None
for name, data in self.results.items():
if not start_time or data['start_time'] < start_time:
start_time = data['start_time']
if not stop_time or data['end_time'] > stop_time:
stop_time = data['end_time']
runtime = get_duration(start_time, stop_time)
else:
for name, data in self.results.items():
runtime += get_duration(data['start_time'], data['end_time'])
return runtime