Extract test run aggregation into separated class

Previously the test run aggregation was happening on a long method that was
hard to follow. This commit extracts this logic into its own class as
well includes unit tests around it.

Change-Id: I792dd0f0e57bc382c872210b84d31b70a95e4b44
This commit is contained in:
Glauco Oliveira 2015-11-06 15:02:28 -02:00
parent 0d38b13e11
commit 11dcb88deb
8 changed files with 495 additions and 138 deletions

View File

@ -27,9 +27,9 @@ from operator import itemgetter
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from subunit2sql.db import api
from subunit2sql import read_subunit
from run_aggregator import RunAggregator
from test_run_aggregator import TestRunAggregator
app = flask.Flask(__name__)
app.config['PROPAGATE_EXCEPTIONS'] = True
@ -111,18 +111,18 @@ def get_runs_grouped_by_metadata_per_datetime(key):
session = Session()
start_date = _parse_datetimes(flask.request.args.get('start_date', None))
stop_date = _parse_datetimes(flask.request.args.get('stop_date', None))
date_range = flask.request.args.get('datetime_resolution', None)
datetime_resolution = flask.request.args.get('datetime_resolution', None)
sec_runs = api.get_all_runs_time_series_by_key(key, start_date,
stop_date, session)
if not date_range:
if not datetime_resolution:
runs = sec_runs
else:
runs = {}
if date_range not in ['sec', 'min', 'hour', 'day']:
if datetime_resolution not in ['sec', 'min', 'hour', 'day']:
return ('Datetime resolution: %s, is not a valid'
' choice' % date_range), 400
' choice' % datetime_resolution), 400
else:
runs = RunAggregator(sec_runs, date_range).aggregate()
runs = RunAggregator(sec_runs).aggregate(datetime_resolution)
out_runs = {}
for run in runs:
out_runs[run.isoformat()] = runs[run]
@ -147,10 +147,10 @@ def _group_runs_by_key(runs_by_time, groupby_key):
def _get_runs_for_key_value_grouped_by(key, value, groupby_key,
start_date=None, stop_date=None,
date_range=None):
if date_range not in ['sec', 'min', 'hour', 'day']:
datetime_resolution=None):
if datetime_resolution not in ['sec', 'min', 'hour', 'day']:
return ('Datetime resolution: %s, is not a valid'
' choice' % date_range), 400
' choice' % datetime_resolution), 400
global Session
session = Session()
@ -165,8 +165,8 @@ def _get_runs_for_key_value_grouped_by(key, value, groupby_key,
# Group runs by the chosen data_range.
# That does not apply when you choose 'sec' since runs are already grouped
# by it.
runs_by_groupby_key = \
RunAggregator(runs_by_groupby_key, date_range).aggregate()
runs_by_groupby_key = (RunAggregator(runs_by_groupby_key)
.aggregate(datetime_resolution=datetime_resolution))
out_runs = {}
for run in runs_by_groupby_key:
out_runs[run.isoformat()] = runs_by_groupby_key[run]
@ -174,97 +174,6 @@ def _get_runs_for_key_value_grouped_by(key, value, groupby_key,
return out_runs, 200
def _moving_avg(curr_avg, count, value):
return ((count * curr_avg) + value) / (count + 1)
def _update_counters(status, pass_count, fail_count, skip_count):
if status == 'success' or status == 'xfail':
pass_count = pass_count + 1
elif status == 'fail' or status == 'unxsuccess':
fail_count = fail_count + 1
else:
skip_count = skip_count + 1
return pass_count, fail_count, skip_count
def _group_test_runs_by_date_res(res, tests):
test_runs = {}
for test_run in tests:
# Correct resolution
if res == 'sec':
corr_res = test_run['start_time'].replace(microsecond=0)
elif res == 'min':
corr_res = test_run['start_time'].replace(second=0, microsecond=0)
elif res == 'hour':
corr_res = test_run['start_time'].replace(minute=0, second=0,
microsecond=0)
elif res == 'day':
corr_res = test_run['start_time'].date()
corr_res = corr_res.isoformat()
# Bin test runs based on corrected timestamp
if corr_res in test_runs:
test_id = test_run['test_id']
if test_id in test_runs[corr_res]:
# Update moving average if the test was a success
if (test_run['status'] == 'success' or
test_run['status'] == 'xfail'):
durr = read_subunit.get_duration(test_run['start_time'],
test_run['stop_time'])
run_time = _moving_avg(
test_runs[corr_res][test_id]['run_time'],
test_runs[corr_res][test_id]['pass'],
durr)
else:
run_time = None
# Update Counters
pass_count, fail_count, skip_count = _update_counters(
test_run['status'],
test_runs[corr_res][test_id]['pass'],
test_runs[corr_res][test_id]['fail'],
test_runs[corr_res][test_run['test_id']]['skip'])
test_runs[corr_res][test_id]['pass'] = pass_count
test_runs[corr_res][test_id]['fail'] = fail_count
test_runs[corr_res][test_id]['skip'] = skip_count
if run_time:
test_runs[corr_res][test_id]['run_time'] = run_time
else:
pass_count, fail_count, skip_count = _update_counters(
test_run['status'], 0, 0, 0)
if (test_run['status'] == 'success' or
test_run['status'] == 'xfail'):
run_time = read_subunit.get_duration(
test_run['start_time'],
test_run['stop_time'])
else:
run_time = 0
test_runs[corr_res][test_id] = {
'pass': pass_count,
'fail': fail_count,
'skip': skip_count,
'run_time': run_time
}
else:
pass_count, fail_count, skip_count = _update_counters(
test_run['status'], 0, 0, 0)
if (test_run['status'] == 'success' or
test_run['status'] == 'xfail'):
run_time = read_subunit.get_duration(test_run['start_time'],
test_run['stop_time'])
else:
run_time = 0
test_runs[corr_res] = {
test_run['test_id']: {
'pass': pass_count,
'fail': fail_count,
'skip': skip_count,
'run_time': run_time
}
}
return test_runs
@app.route('/build_name/<string:build_name>/test_runs', methods=['GET'])
def get_test_runs_by_build_name(build_name):
global Session
@ -275,13 +184,14 @@ def get_test_runs_by_build_name(build_name):
return 'A key and value must be specified', 400
start_date = _parse_datetimes(flask.request.args.get('start_date', None))
stop_date = _parse_datetimes(flask.request.args.get('stop_date', None))
date_range = flask.request.args.get('datetime_resolution', 'sec')
if date_range not in ['sec', 'min', 'hour', 'day']:
datetime_resolution = flask.request.args.get('datetime_resolution', 'sec')
if datetime_resolution not in ['sec', 'min', 'hour', 'day']:
return ('Datetime resolution: %s, is not a valid'
' choice' % date_range), 400
' choice' % datetime_resolution), 400
tests = api.get_test_run_dict_by_run_meta_key_value(key, value, start_date,
stop_date, session)
tests = _group_test_runs_by_date_res(date_range, tests)
tests = (TestRunAggregator(tests)
.aggregate(datetime_resolution=datetime_resolution))
return jsonify({'tests': tests})
@ -338,7 +248,7 @@ def _aggregate_runs(runs_by_time_delta):
def get_runs_by_project(project):
start_date = _parse_datetimes(flask.request.args.get('start_date', None))
stop_date = _parse_datetimes(flask.request.args.get('stop_date', None))
date_range = flask.request.args.get('datetime_resolution', 'day')
datetime_resolution = flask.request.args.get('datetime_resolution', 'day')
filter_by_project = "project"
group_by_build_name = "build_name"
@ -347,7 +257,7 @@ def get_runs_by_project(project):
group_by_build_name,
start_date,
stop_date,
date_range)
datetime_resolution)
if err != 200:
return abort(make_response(runs_by_time, err))

View File

@ -0,0 +1,30 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class BaseAggregator(object):
def _update_datetime_to_fit_resolution(self,
execution_datetime,
datetime_resolution):
if datetime_resolution == 'sec':
return execution_datetime
elif datetime_resolution == 'min':
return execution_datetime.replace(second=0,
microsecond=0)
elif datetime_resolution == 'hour':
return execution_datetime.replace(minute=0,
second=0,
microsecond=0)
elif datetime_resolution == 'day':
return execution_datetime.date()

View File

@ -12,24 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from base_aggregator import BaseAggregator
class RunAggregator(object):
def __init__(self, runs, datetime_resolution):
class RunAggregator(BaseAggregator):
def __init__(self, runs):
self.runs = runs
self.datetime_resolution = datetime_resolution
def _update_datetime_to_fit_resolution(self, execution_datetime):
if self.datetime_resolution == 'sec':
return execution_datetime
elif self.datetime_resolution == 'min':
return execution_datetime.replace(second=0,
microsecond=0)
elif self.datetime_resolution == 'hour':
return execution_datetime.replace(minute=0,
second=0,
microsecond=0)
elif self.datetime_resolution == 'day':
return execution_datetime.date()
def _build_aggregated_runs(self, execution_datetime, updated_datetime,
aggregated_runs):
@ -42,17 +30,18 @@ class RunAggregator(object):
runs_at_given_datetime = self.runs[execution_datetime]
runs_by_given_metadata_key = runs_at_given_datetime[metadata_key]
if aggregated_runs[updated_datetime].get(metadata_key, None):
aggregated_runs[updated_datetime][metadata_key]\
.extend(runs_by_given_metadata_key)
(aggregated_runs[updated_datetime][metadata_key]
.extend(runs_by_given_metadata_key))
else:
aggregated_runs[updated_datetime][metadata_key] = \
runs_by_given_metadata_key
def aggregate(self):
def aggregate(self, datetime_resolution='sec'):
aggregated_runs = {}
for execution_datetime in self.runs:
updated_datetime = \
self._update_datetime_to_fit_resolution(execution_datetime)
self._update_datetime_to_fit_resolution(execution_datetime,
datetime_resolution)
self._build_aggregated_runs(execution_datetime,
updated_datetime,
aggregated_runs)

View File

@ -0,0 +1,144 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from subunit2sql import read_subunit
from base_aggregator import BaseAggregator
class Status(object):
def __init__(self, status):
self.status = status
@property
def is_success(self):
return self.status in ['success', 'xfail']
@property
def is_failure(self):
return self.status in ['fail', 'unxsuccess']
@property
def is_skip(self):
return (not self.is_success and
not self.is_failure)
class Counter(object):
def __init__(self, passes, failures, skips):
self.passes = passes
self.failures = failures
self.skips = skips
def _update_pass_counter(self, status):
if status.is_success:
self.passes = self.passes + 1
def _update_fail_counter(self, status):
if status.is_failure:
self.failures = self.failures + 1
def _update_skip_counter(self, status):
if status.is_skip:
self.skips = self.skips + 1
def update(self, _status):
status = Status(_status)
self._update_pass_counter(status),
self._update_fail_counter(status),
self._update_skip_counter(status)
return (self.passes, self.failures, self.skips)
class TestRunAggregator(BaseAggregator):
def __init__(self, test_runs):
self.test_runs = test_runs
def _get_run_time(self, test_run):
status = Status(test_run['status'])
if status.is_success:
return read_subunit.get_duration(test_run['start_time'],
test_run['stop_time'])
return 0
def _moving_avg(self, curr_avg, count, value):
return ((count * curr_avg) + value) / (count + 1)
def _get_average_run_time(self, test_run, aggregated_test_run):
run_time = self._get_run_time(test_run)
status = Status(test_run['status'])
if status.is_success:
return self._moving_avg(
aggregated_test_run['run_time'],
aggregated_test_run['pass'],
run_time)
return 0
def _build_aggregated_test_runs(self,
updated_datetime,
test_run,
aggregated_test_runs):
status = test_run['status']
test_id = test_run['test_id']
if updated_datetime not in aggregated_test_runs:
passes, failures, skips = \
Counter(passes=0, failures=0, skips=0).update(status)
run_time = self._get_run_time(test_run)
aggregated_test_runs[updated_datetime] = {
test_id: {
'pass': passes, 'fail': failures,
'skip': skips, 'run_time': run_time
}
}
return
if test_id not in aggregated_test_runs[updated_datetime]:
passes, failures, skips = \
Counter(passes=0, failures=0, skips=0).update(status)
run_time = self._get_run_time(test_run)
aggregated_test_runs[updated_datetime][test_id] = {
'pass': passes, 'fail': failures,
'skip': skips, 'run_time': run_time
}
return
aggregated_test_run = aggregated_test_runs[updated_datetime][test_id]
passes, failures, skips = \
Counter(
passes=aggregated_test_run['pass'],
failures=aggregated_test_run['fail'],
skips=aggregated_test_run['skip']).update(status)
average_runtime = self._get_average_run_time(test_run,
aggregated_test_run)
aggregated_test_runs[updated_datetime][test_id] = {
'pass': passes, 'fail': failures,
'skip': skips, 'run_time': average_runtime
}
def aggregate(self, datetime_resolution='sec'):
aggregated_test_runs = {}
for test_run in self.test_runs:
updated_datetime = \
self._update_datetime_to_fit_resolution(test_run['start_time'],
datetime_resolution)
updated_datetime = updated_datetime.isoformat()
self._build_aggregated_test_runs(updated_datetime,
test_run,
aggregated_test_runs)
return aggregated_test_runs

View File

@ -0,0 +1,39 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack_health.test_run_aggregator import Counter
from openstack_health.tests import base
class TestCounter(base.TestCase):
def test_that_pass_counter_will_be_updated_on_success(self):
counter = Counter(passes=0, failures=0, skips=0)
(passes, failures, skips) = counter.update('success')
self.assertEqual(1, passes)
self.assertEqual(0, failures)
self.assertEqual(0, skips)
def test_that_fail_counter_will_be_updated_on_failure(self):
counter = Counter(passes=0, failures=0, skips=0)
(passes, failures, skips) = counter.update('fail')
self.assertEqual(0, passes)
self.assertEqual(1, failures)
self.assertEqual(0, skips)
def test_that_skip_counter_will_be_updated_on_skip(self):
counter = Counter(passes=0, failures=0, skips=0)
(passes, failures, skips) = counter.update('skip')
self.assertEqual(0, passes)
self.assertEqual(0, failures)
self.assertEqual(1, skips)

View File

@ -14,9 +14,8 @@
import datetime
from openstack_health.tests import base
from openstack_health.run_aggregator import RunAggregator
from openstack_health.tests import base
class TestRunAggregator(base.TestCase):
@ -39,15 +38,15 @@ class TestRunAggregator(base.TestCase):
}
def test_that_runs_will_be_aggregated_by_seconds_and_project(self):
aggregator = RunAggregator(self.runs, 'sec')
aggregated_runs = aggregator.aggregate()
aggregator = RunAggregator(self.runs)
aggregated_runs = aggregator.aggregate(datetime_resolution='sec')
expected_response = self.runs
self.assertItemsEqual(expected_response, aggregated_runs)
def test_that_runs_will_be_aggregated_by_minute_and_project(self):
aggregator = RunAggregator(self.runs, 'min')
aggregated_runs = aggregator.aggregate()
aggregator = RunAggregator(self.runs)
aggregated_runs = aggregator.aggregate(datetime_resolution='min')
expected_response = {
datetime.datetime(2015, 1, 2, 12, 23): {
@ -63,8 +62,8 @@ class TestRunAggregator(base.TestCase):
self.assertItemsEqual(expected_response, aggregated_runs)
def test_that_runs_will_be_aggregated_by_hour_and_project(self):
aggregator = RunAggregator(self.runs, 'hour')
aggregated_runs = aggregator.aggregate()
aggregator = RunAggregator(self.runs)
aggregated_runs = aggregator.aggregate(datetime_resolution='hour')
expected_response = {
datetime.datetime(2015, 1, 2, 12): {
@ -80,8 +79,8 @@ class TestRunAggregator(base.TestCase):
self.assertItemsEqual(expected_response, aggregated_runs)
def test_that_runs_will_be_aggregated_by_day_and_project(self):
aggregator = RunAggregator(self.runs, 'day')
aggregated_runs = aggregator.aggregate()
aggregator = RunAggregator(self.runs)
aggregated_runs = aggregator.aggregate(datetime_resolution='day')
expected_response = {
datetime.date(2015, 1, 2): {

View File

@ -0,0 +1,60 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack_health.test_run_aggregator import Status
from openstack_health.tests import base
class TestStatus(base.TestCase):
def test_that_success_string_translates_to_success(self):
status = Status('success')
self.assertEqual(True, status.is_success)
self.assertEqual(False, status.is_failure)
self.assertEqual(False, status.is_skip)
def test_that_xfail_string_translates_to_success(self):
status = Status('xfail')
self.assertEqual(True, status.is_success)
self.assertEqual(False, status.is_failure)
self.assertEqual(False, status.is_skip)
def test_that_fail_string_translates_to_failure(self):
status = Status('fail')
self.assertEqual(False, status.is_success)
self.assertEqual(True, status.is_failure)
self.assertEqual(False, status.is_skip)
def test_that_unxsuccess_string_translates_to_failure(self):
status = Status('unxsuccess')
self.assertEqual(False, status.is_success)
self.assertEqual(True, status.is_failure)
self.assertEqual(False, status.is_skip)
def test_that_null_translates_to_skip(self):
status = Status(None)
self.assertEqual(False, status.is_success)
self.assertEqual(False, status.is_failure)
self.assertEqual(True, status.is_skip)
def test_that_an_empty_string_translates_to_skip(self):
status = Status('')
self.assertEqual(False, status.is_success)
self.assertEqual(False, status.is_failure)
self.assertEqual(True, status.is_skip)
def test_that_a_random_string_translates_to_skip(self):
status = Status('$random1234')
self.assertEqual(False, status.is_success)
self.assertEqual(False, status.is_failure)
self.assertEqual(True, status.is_skip)

View File

@ -0,0 +1,186 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from openstack_health.test_run_aggregator import TestRunAggregator
from openstack_health.tests import base
class TestTestRunAggregator(base.TestCase):
def setUp(self):
super(TestTestRunAggregator, self).setUp()
timestamp_a = datetime.datetime(2015, 1, 2, 12, 23, 45)
timestamp_b = datetime.datetime(2015, 1, 2, 12, 23, 56)
timestamp_c = datetime.datetime(2015, 1, 2, 12, 24, 23)
self.test_runs = [
{'test_id': 'nova_test', 'status': 'success',
'start_time': timestamp_a, 'stop_time': timestamp_b},
{'test_id': 'neutron_test', 'status': 'fail',
'start_time': timestamp_a, 'stop_time': timestamp_c},
{'test_id': 'sahara_test', 'status': 'fail',
'start_time': timestamp_b, 'stop_time': timestamp_c}
]
def test_that_test_runs_will_be_aggregated_by_seconds(self):
aggregator = TestRunAggregator(self.test_runs)
aggregated_test_runs = aggregator.aggregate(datetime_resolution='sec')
expected_response = {
'2015-01-02T12:23:56': {
'sahara_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
}
},
'2015-01-02T12:23:45': {
'nova_test': {
'run_time': 11.0, 'fail': 0, 'pass': 1, 'skip': 0
},
'neutron_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
}
}
}
self.assertItemsEqual(expected_response, aggregated_test_runs)
expected_sahara_test = (expected_response['2015-01-02T12:23:56']
['sahara_test'])
actual_sahara_test = (aggregated_test_runs['2015-01-02T12:23:56']
['sahara_test'])
self.assertEqual(expected_sahara_test, actual_sahara_test)
expected_nova_test = (expected_response['2015-01-02T12:23:45']
['nova_test'])
actual_nova_test = (aggregated_test_runs['2015-01-02T12:23:45']
['nova_test'])
self.assertEqual(expected_nova_test, actual_nova_test)
expected_neutron_test = (expected_response['2015-01-02T12:23:45']
['neutron_test'])
actual_neutron_test = (aggregated_test_runs['2015-01-02T12:23:45']
['neutron_test'])
self.assertEqual(expected_neutron_test, actual_neutron_test)
def test_that_test_runs_will_be_aggregated_by_minutes(self):
aggregator = TestRunAggregator(self.test_runs)
aggregated_test_runs = aggregator.aggregate(datetime_resolution='min')
expected_response = {
'2015-01-02T12:23:00': {
'sahara_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
},
'nova_test': {
'run_time': 11.0, 'fail': 0, 'pass': 1, 'skip': 0
},
'neutron_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
}
}
}
self.assertItemsEqual(expected_response, aggregated_test_runs)
expected_sahara_test = (expected_response['2015-01-02T12:23:00']
['sahara_test'])
actual_sahara_test = (aggregated_test_runs['2015-01-02T12:23:00']
['sahara_test'])
self.assertEqual(expected_sahara_test, actual_sahara_test)
expected_nova_test = (expected_response['2015-01-02T12:23:00']
['nova_test'])
actual_nova_test = (aggregated_test_runs['2015-01-02T12:23:00']
['nova_test'])
self.assertEqual(expected_nova_test, actual_nova_test)
expected_neutron_test = (expected_response['2015-01-02T12:23:00']
['neutron_test'])
actual_neutron_test = (aggregated_test_runs['2015-01-02T12:23:00']
['neutron_test'])
self.assertEqual(expected_neutron_test, actual_neutron_test)
def test_that_test_runs_will_be_aggregated_by_hour(self):
aggregator = TestRunAggregator(self.test_runs)
aggregated_test_runs = aggregator.aggregate(datetime_resolution='hour')
expected_response = {
'2015-01-02T12:00:00': {
'sahara_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
},
'nova_test': {
'run_time': 11.0, 'fail': 0, 'pass': 1, 'skip': 0
},
'neutron_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
}
}
}
self.assertItemsEqual(expected_response, aggregated_test_runs)
expected_sahara_test = (expected_response['2015-01-02T12:00:00']
['sahara_test'])
actual_sahara_test = (aggregated_test_runs['2015-01-02T12:00:00']
['sahara_test'])
self.assertEqual(expected_sahara_test, actual_sahara_test)
expected_nova_test = (expected_response['2015-01-02T12:00:00']
['nova_test'])
actual_nova_test = (aggregated_test_runs['2015-01-02T12:00:00']
['nova_test'])
self.assertEqual(expected_nova_test, actual_nova_test)
expected_neutron_test = (expected_response['2015-01-02T12:00:00']
['neutron_test'])
actual_neutron_test = (aggregated_test_runs['2015-01-02T12:00:00']
['neutron_test'])
self.assertEqual(expected_neutron_test, actual_neutron_test)
def test_that_test_runs_will_be_aggregated_by_day(self):
aggregator = TestRunAggregator(self.test_runs)
aggregated_test_runs = aggregator.aggregate(datetime_resolution='day')
expected_response = {
'2015-01-02': {
'sahara_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
},
'nova_test': {
'run_time': 11.0, 'fail': 0, 'pass': 1, 'skip': 0
},
'neutron_test': {
'run_time': 0, 'fail': 1, 'pass': 0, 'skip': 0
}
}
}
self.assertItemsEqual(expected_response, aggregated_test_runs)
expected_sahara_test = (expected_response['2015-01-02']
['sahara_test'])
actual_sahara_test = (aggregated_test_runs['2015-01-02']
['sahara_test'])
self.assertEqual(expected_sahara_test, actual_sahara_test)
expected_nova_test = (expected_response['2015-01-02']
['nova_test'])
actual_nova_test = (aggregated_test_runs['2015-01-02']
['nova_test'])
self.assertEqual(expected_nova_test, actual_nova_test)
expected_neutron_test = (expected_response['2015-01-02']
['neutron_test'])
actual_neutron_test = (aggregated_test_runs['2015-01-02']
['neutron_test'])
self.assertEqual(expected_neutron_test, actual_neutron_test)