# File: subunit2sql/subunit2sql/tests/test_read_subunit.py (Python; 247 lines, 10 KiB as originally published)

# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import io
import mock
import subunit as subunit_lib
from subunit2sql import read_subunit as subunit
from subunit2sql.tests import base
class TestReadSubunit(base.TestCase):
    """Unit tests for the subunit2sql.read_subunit module."""

    def test_get_duration(self):
        """Timestamps 50 seconds apart yield a duration of 50.0."""
        dur = subunit.get_duration(datetime.datetime(1914, 6, 28, 10, 45, 0),
                                   datetime.datetime(1914, 6, 28, 10, 45, 50))
        self.assertEqual(dur, 50.000000)

    def test_get_duration_no_start(self):
        """A missing start timestamp yields None."""
        dur = subunit.get_duration(None,
                                   datetime.datetime(1914, 6, 28, 10, 45, 50))
        self.assertIsNone(dur)

    def test_get_duration_no_end(self):
        """A missing end timestamp yields None."""
        dur = subunit.get_duration(datetime.datetime(1914, 6, 28, 10, 45, 50),
                                   None)
        self.assertIsNone(dur)

    def test_get_attrs(self):
        """The bracketed suffix of a test id is returned as the attrs."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_name = 'fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
        attrs = fake_subunit.get_attrs(fake_name)
        self.assertEqual(attrs, 'attr1,attr2')

    def test_cleanup_test_name_with_attrs(self):
        """By default the bracketed attrs are stripped from the name."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_name = 'fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
        name = fake_subunit.cleanup_test_name(fake_name)
        self.assertEqual(name, 'fake_dir.fake_module.fakeClass.test_fake')

    def test_cleanup_test_name_with_attrs_leave_scenarios(self):
        """By default attrs are stripped but the scenario suffix is kept."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_name = ('fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
                     '(scenario)')
        name = fake_subunit.cleanup_test_name(fake_name)
        self.assertEqual(name, 'fake_dir.fake_module.fakeClass.test_fake'
                               '(scenario)')

    def test_cleanup_test_name_with_strip_scenarios_and_attrs(self):
        """With strip_scenarios=True both attrs and scenario are removed."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_name = ('fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
                     '(scenario)')
        name = fake_subunit.cleanup_test_name(fake_name, strip_scenarios=True)
        self.assertEqual(name, 'fake_dir.fake_module.fakeClass.test_fake')

    def test_cleanup_test_name_strip_nothing(self):
        """With strip_tags=False the name is returned unchanged."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_name = ('fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
                     '(scenario)')
        name = fake_subunit.cleanup_test_name(fake_name, strip_tags=False)
        self.assertEqual(name, fake_name)

    def test_run_time(self):
        """100 results of 50 seconds each give a run time of 5000.0."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_results = {}
        # Every entry deliberately shares the same 50 second result dict.
        fifty_sec_run_result = {
            'start_time': datetime.datetime(1914, 6, 28, 10, 45, 0),
            'end_time': datetime.datetime(1914, 6, 28, 10, 45, 50),
        }
        for num in range(100):
            test_name = 'test_fake_' + str(num)
            fake_results[test_name] = fifty_sec_run_result
        fake_subunit.results = fake_results
        runtime = fake_subunit.run_time()
        self.assertEqual(runtime, 5000.0)

    def test_wall_run_time(self):
        """With use_wall_time=True run_time spans first start to last end."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock(),
                                           use_wall_time=True)
        fake_results = {}
        start_time = datetime.datetime(1914, 6, 28, 10, 45, 0)
        stop_time = datetime.datetime(1914, 6, 28, 10, 45, 50)
        fifty_sec_run_result = {
            'start_time': start_time,
            'end_time': stop_time,
        }
        fake_results['first'] = fifty_sec_run_result
        # Each further result is the same 50 second window shifted one
        # more minute into the future.
        for num in range(100):
            test_name = 'test_fake_' + str(num)
            start_time = start_time + datetime.timedelta(minutes=1)
            stop_time = stop_time + datetime.timedelta(minutes=1)
            fake_result = {
                'start_time': start_time,
                'end_time': stop_time,
            }
            fake_results[test_name] = fake_result
        fake_subunit.results = fake_results
        runtime = fake_subunit.run_time()
        # Wall time should be (60 * 100) + 50
        self.assertEqual(runtime, 6050.0)

    def test_parse_outcome(self):
        """parse_outcome stores status, times, attrs and tags by test name."""
        fake_subunit = subunit.ReadSubunit(mock.MagicMock())
        fake_id = 'fake_dir.fake_module.fakeClass.test_fake[attr1,attr2]'
        fake_timestamps = [datetime.datetime(1914, 8, 26, 20, 0, 0),
                           datetime.datetime(2014, 8, 26, 20, 0, 0)]
        fake_status = 'skip'
        tags = {'worker-0'}
        fake_results = {
            'status': fake_status,
            'details': {
                'reason': 'fake reason'
            },
            'id': fake_id,
            'timestamps': fake_timestamps,
            'tags': tags
        }
        fake_subunit.parse_outcome(fake_results)
        parsed_results = fake_subunit.results
        # assert that the dict root key is the test name - the fake_id stripped
        # of the tags
        fake_test_name = fake_id[:fake_id.find('[')]
        self.assertEqual(list(parsed_results.keys()), [fake_test_name])
        self.assertEqual(parsed_results[fake_test_name]['status'],
                         fake_status)
        self.assertEqual(parsed_results[fake_test_name]['start_time'],
                         fake_timestamps[0])
        self.assertEqual(parsed_results[fake_test_name]['end_time'],
                         fake_timestamps[1])
        # The attrs are the text between the square brackets of the id.
        fake_attrs = fake_id[fake_id.find('[') + 1:fake_id.find(']')]
        self.assertEqual(parsed_results[fake_test_name]['metadata']['attrs'],
                         fake_attrs)
        self.assertEqual(parsed_results[fake_test_name]['metadata']['tags'],
                         ','.join(tags))

    def test_attrs_default_regex(self):
        """The default attr regex extracts every bracketed attribute."""
        fake_id = 'test_fake.TestThing.testA[good_test,fun_test,legit]'
        read = subunit.ReadSubunit(io.BytesIO())
        attrs = read.get_attrs(fake_id)
        attrs_list = attrs.split(',')
        self.assertIn('legit', attrs_list)
        self.assertIn('fun_test', attrs_list)
        self.assertIn('good_test', attrs_list)

    def test_attrs_non_default_regex(self):
        """A custom attr_regex extracts backtick-delimited attributes."""
        fake_id = 'test_fake.TestThing.testB`good_test,fun_test,legit`'
        # Raw string: same pattern text as before, without relying on an
        # invalid (deprecated) "\`" escape sequence.
        regex = r'\`(.*)\`'
        read = subunit.ReadSubunit(io.BytesIO(), attr_regex=regex)
        attrs = read.get_attrs(fake_id)
        attrs_list = attrs.split(',')
        self.assertIn('legit', attrs_list)
        self.assertIn('fun_test', attrs_list)
        self.assertIn('good_test', attrs_list)

    def test_attrs_no_matches(self):
        """get_attrs returns None when the default regex does not match."""
        fake_id = 'test_fake.TestThing.testB`good_test,fun_test,legit`'
        read = subunit.ReadSubunit(io.BytesIO())
        attrs = read.get_attrs(fake_id)
        self.assertIsNone(attrs)

    def test_cleanup_test_name_default_attr_regex(self):
        """strip_tags=True removes attrs matched by the default regex."""
        fake_id = 'test_fake.TestThing.testA[good_test,fun_test,legit]'
        read = subunit.ReadSubunit(io.BytesIO())
        test_name = read.cleanup_test_name(fake_id, strip_tags=True,
                                           strip_scenarios=False)
        self.assertEqual('test_fake.TestThing.testA', test_name)

    def test_cleanup_test_name_non_default_attr_regex(self):
        """strip_tags=True removes attrs matched by a custom attr_regex."""
        fake_id = 'test_fake.TestThing.testB`good_test,fun_test,legit`'
        # Raw string: same pattern text as before, without relying on an
        # invalid (deprecated) "\`" escape sequence.
        regex = r'\`(.*)\`'
        read = subunit.ReadSubunit(io.BytesIO(), attr_regex=regex)
        test_name = read.cleanup_test_name(fake_id, strip_tags=True,
                                           strip_scenarios=False)
        self.assertEqual('test_fake.TestThing.testB', test_name)

    def test_cleanup_test_name_no_attr_matches(self):
        """The name is unchanged when the attr regex matches nothing."""
        fake_id = 'test_fake.TestThing.testB`good_test,fun_test,legit`'
        read = subunit.ReadSubunit(io.BytesIO())
        test_name = read.cleanup_test_name(fake_id, strip_tags=True,
                                           strip_scenarios=False)
        self.assertEqual(fake_id, test_name)

    @mock.patch('testtools.CopyStreamResult')
    def test_targets_added_to_result(self, ttc_mock):
        """A passed-in target is handed to testtools.CopyStreamResult."""
        subunit.ReadSubunit(mock.MagicMock(), targets=['foo'])
        # First positional argument of the most recent CopyStreamResult call.
        self.assertIn('foo', ttc_mock.call_args[0][0])

    @mock.patch('testtools.CopyStreamResult')
    def test_targets_not_modified(self, ttc_mock):
        """Constructing a reader must not mutate the caller's targets."""
        # A regression test that verifies that the subunit reader does
        # not modify the passed in value of targets.
        targets = ['foo']
        subunit.ReadSubunit(mock.MagicMock(), targets=targets)
        ntargets1 = len(ttc_mock.call_args[0][0])
        subunit.ReadSubunit(mock.MagicMock(), targets=targets)
        ntargets2 = len(ttc_mock.call_args[0][0])
        self.assertEqual(ntargets1, ntargets2)
        self.assertEqual(targets, ['foo'])

    def test_non_subunit_name(self):
        """non_subunit_name is set on the reader's stream attribute."""
        non_subunit_name = 'fake_non_subunit'
        fake_subunit = subunit.ReadSubunit(mock.MagicMock(),
                                           non_subunit_name=non_subunit_name)
        self.assertEqual(fake_subunit.stream.non_subunit_name,
                         non_subunit_name)

    def test_not_subunit_no_subunit_name_set(self):
        """Trailing non-subunit bytes make get_results raise an Exception."""
        stream_buf = io.BytesIO()
        stream = subunit_lib.StreamResultToBytes(stream_buf)
        stream.status(test_id='test_a', test_status='inprogress')
        stream.status(test_id='test_a', test_status='success')
        # Append raw bytes that are not subunit packets after the valid
        # status events, then rewind so the reader starts from the top.
        stream_buf.write(b'I AM NOT SUBUNIT')
        stream_buf.seek(0)
        result = subunit.ReadSubunit(stream_buf)
        try:
            result.get_results()
        # NOTE(mtreinish): Subunit raises the generic Exception class
        # so manually inspect the Exception object to check the error
        # message
        except Exception as e:
            self.assertEqual(e.args, ('Non subunit content', b'I'))
        else:
            self.fail('subunit exception not raised on invalid content')