Data processing: aggregation plots

This is the first contribution to the automated data processing and
visualization in Rally. Here we introduce the method for plotting
the relation between the values of some config parameter of a benchmark
scenario and its runing time. Minimal, maximal and average runtimes
are plotted, in assumption that the benchmark scenario has been run
several times during the task execution with different values of the
parameter the data is aggregated on (e.g. the "active_users" parameter).

The patch also extends the CLI with a new command, namely
"task plot aggregated <parameter_name> <task_uuid>"

Blueprint data-processing

Change-Id: Ieee6622f6cfdc737878a0217cf965307d739b2ce
This commit is contained in:
Mikhail Dubov 2013-10-19 01:22:58 +04:00
parent 3921b4faa5
commit 2a0558bb14
5 changed files with 225 additions and 0 deletions

View File

@ -27,6 +27,7 @@ from rally.cmd import cliutils
from rally import db from rally import db
from rally.openstack.common.gettextutils import _ # noqa from rally.openstack.common.gettextutils import _ # noqa
from rally.orchestrator import api from rally.orchestrator import api
from rally import processing
class TaskCommands(object): class TaskCommands(object):
@ -125,6 +126,17 @@ class TaskCommands(object):
"""Delete a specific task and related results.""" """Delete a specific task and related results."""
api.delete_task(task_id, force=force) api.delete_task(task_id, force=force)
@cliutils.args('--plot-type', type=str, help='plot type; available types: '
', '.join(processing.PLOTS.keys()))
@cliutils.args('--field-name', type=str, help='field from the task config '
'to aggregate the data on: concurrent/times/...')
@cliutils.args('--task-id', type=str, help='uuid of task')
def plot(self, plot_type, aggregated_field, task_id):
if plot_type in processing.PLOTS:
processing.PLOTS[plot_type](task_id, aggregated_field)
else:
print("Plot type '%s' not supported." % plot_type)
def main(): def main():
categories = {'task': TaskCommands} categories = {'task': TaskCommands}

View File

@ -118,6 +118,10 @@ class NoSuchScenario(NotFoundException):
msg_fmt = _("There is no benchmark scenario with name `%(name)s`.") msg_fmt = _("There is no benchmark scenario with name `%(name)s`.")
class NoSuchConfigField(NotFoundException):
msg_fmt = _("There is no field in the task config with name `%(name)s`.")
class TaskNotFound(NotFoundException): class TaskNotFound(NotFoundException):
msg_fmt = _("Task with uuid=%(uuid)s not found.") msg_fmt = _("Task with uuid=%(uuid)s not found.")

93
rally/processing.py Normal file
View File

@ -0,0 +1,93 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from rally import db
from rally import exceptions
from rally.openstack.common import importutils
plt = importutils.try_import("matplotlib.pyplot")
ticker = importutils.try_import("matplotlib.ticker")
def aggregated_plot(task_id, aggregated_field):
"""Draws an aggregated figure of benchmark runtimes in a separate window.
The resulting figure has the aggregated field values on the X axis and
the benchmark runtimes (in seconds) on the Y axis. For each benchmark run,
minimum, maximum and average runtime values will be drawn, thus resulting
in three plots on the figure.
:param task_id: ID of the task to draw the plot for
:param aggregated_field: Field from the test config to aggregate the data
on. This can be e.g. "active_users", "times" etc.
"""
task = db.task_get_detailed(task_id)
task["results"].sort(key=lambda res: res["key"]["name"])
results_by_benchmark = itertools.groupby(task["results"],
lambda res: res["key"]["name"])
for benchmark_name, data in results_by_benchmark:
data_dict = {}
for result in data:
if aggregated_field not in result["key"]["kw"]["config"]:
raise exceptions.NoSuchConfigField(name=aggregated_field)
raw = result["data"]["raw"]
times = map(lambda x: x["time"],
filter(lambda r: not r["error"], raw))
aggr_field_val = result["key"]["kw"]["config"][aggregated_field]
data_dict[aggr_field_val] = {"min": min(times),
"avg": sum(times) / len(times),
"max": max(times)}
aggr_field_vals = sorted(data_dict.keys())
mins = [data_dict[x]["min"] for x in aggr_field_vals]
avgs = [data_dict[x]["avg"] for x in aggr_field_vals]
maxes = [data_dict[x]["max"] for x in aggr_field_vals]
axes = plt.subplot(111)
plt.plot(aggr_field_vals, maxes, "r-", label="max", linewidth=2)
plt.plot(aggr_field_vals, avgs, "b-", label="avg", linewidth=2)
plt.plot(aggr_field_vals, mins, "g-", label="min", linewidth=2)
title = "Benchmark results: %s" % benchmark_name
plt.title(title)
fig = plt.gcf()
fig.canvas.set_window_title(title)
plt.xlabel(aggregated_field)
axes.set_xlim(0, max(aggr_field_vals) + 1)
x_axis = axes.get_xaxis()
x_axis.set_major_locator(ticker.MaxNLocator(integer=True))
plt.ylabel("Time (sec)")
axes.set_ylim(min(mins) - 2, max(maxes) + 2)
plt.legend(loc="upper right")
plt.show()
# NOTE(msdubov): A mapping from plot names to plotting functions is used in CLI
PLOTS = {
"aggregated": aggregated_plot
}

View File

@ -66,3 +66,11 @@ class TaskCommandsTestCase(test.BaseTestCase):
self.task.delete(task_uuid, force) self.task.delete(task_uuid, force)
mock_api.delete_task.assert_called_once_with(task_uuid, mock_api.delete_task.assert_called_once_with(task_uuid,
force=force) force=force)
def test_plot(self):
test_uuid = str(uuid.uuid4())
mock_plot = mock.Mock()
PLOTS = {"aggregated": mock_plot}
with mock.patch("rally.cmd.main.processing.PLOTS", new=PLOTS):
self.task.plot("aggregated", "concurrent", test_uuid)
mock_plot.assert_called_once_with(test_uuid, "concurrent")

108
tests/test_processing.py Normal file
View File

@ -0,0 +1,108 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from rally import exceptions
from rally import processing
from rally import test
class ProcessingTestCase(test.TestCase):
def setUp(self):
super(ProcessingTestCase, self).setUp()
self.fake_task = {
"results": [
{
"data": {"raw": [{"error": None, "time": 10.5},
{"error": None, "time": 12.5}]},
"key": {"name": "scenario_1",
"kw": {"config": {"active_users": 1}}}
},
{
"data": {"raw": [{"error": None, "time": 4.3}]},
"key": {"name": "scenario_2",
"kw": {"config": {"active_users": 1}}}
},
{
"data": {"raw": [{"error": None, "time": 1.2},
{"error": None, "time": 3.4},
{"error": None, "time": 5.6}]},
"key": {"name": "scenario_1",
"kw": {"config": {"active_users": 2}}}
}
],
}
self.fake_task_aggregated_by_concurrency = {
"scenario_1": {1: [10.5, 12.5], 2: [1.2, 3.4, 5.6]},
"scenario_2": {1: [4.3]}
}
self.fake_task_invalid_no_aggregated_field = {
"results": [
{
"data": {"raw": [{"error": None, "time": 10.5},
{"error": None, "time": 12.5}]},
"key": {"name": "scenario_1",
"kw": {"config": {"active_users": 1}}}
},
{
"data": {"raw": [{"error": None, "time": 4.3}]},
"key": {"name": "scenario_2",
"kw": {"config": {"times": 1}}}
}
],
}
def test_aggregated_plot(self):
with mock.patch("rally.processing.db.task_get_detailed") as mock_task:
mock_task.return_value = self.fake_task_invalid_no_aggregated_field
with mock.patch("rally.processing.plt") as mock_plot:
with mock.patch("rally.processing.ticker"):
self.assertRaises(exceptions.NoSuchConfigField,
processing.aggregated_plot,
"task", "active_users")
mock_task.return_value = self.fake_task
with mock.patch("rally.processing.plt") as mock_plot:
with mock.patch("rally.processing.ticker"):
processing.aggregated_plot("task", "active_users")
expected_plot_calls = []
expected_show_calls = []
for scenario in self.fake_task_aggregated_by_concurrency:
scenario_data = self.fake_task_aggregated_by_concurrency[scenario]
active_users_vals = sorted(scenario_data.keys())
mins = [min(scenario_data[c]) for c in active_users_vals]
avgs = [sum(scenario_data[c]) / len(scenario_data[c])
for c in active_users_vals]
maxes = [max(scenario_data[c]) for c in active_users_vals]
expected_plot_calls.append(mock.call(active_users_vals, maxes,
"r-", label="max",
linewidth=2))
expected_plot_calls.append(mock.call(active_users_vals, avgs,
"b-", label="avg",
linewidth=2))
expected_plot_calls.append(mock.call(active_users_vals, mins,
"g-", label="min",
linewidth=2))
expected_show_calls.append(mock.call.show())
self.assertEqual(mock_plot.plot.mock_calls, expected_plot_calls)
self.assertEqual(mock_plot.show.mock_calls, expected_show_calls)