From 2a0558bb14864050d0992085f444dc27c5a5dbc5 Mon Sep 17 00:00:00 2001 From: Mikhail Dubov Date: Sat, 19 Oct 2013 01:22:58 +0400 Subject: [PATCH] Data processing: aggregation plots This is the first contribution to the automated data processing and visualization in Rally. Here we introduce the method for plotting the relation between the values of some config parameter of a benchmark scenario and its runing time. Minimal, maximal and average runtimes are plotted, in assumption that the benchmark scenario has been run several times during the task execution with different values of the parameter the data is aggregated on (e.g. the "active_users" parameter). The patch also extends the CLI with a new command, namely "task plot aggregated " Blueprint data-processing Change-Id: Ieee6622f6cfdc737878a0217cf965307d739b2ce --- rally/cmd/main.py | 12 +++++ rally/exceptions.py | 4 ++ rally/processing.py | 93 +++++++++++++++++++++++++++++++++ tests/cmd/test_main.py | 8 +++ tests/test_processing.py | 108 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 225 insertions(+) create mode 100644 rally/processing.py create mode 100644 tests/test_processing.py diff --git a/rally/cmd/main.py b/rally/cmd/main.py index 46ab74f0f2..2ad2300ea2 100644 --- a/rally/cmd/main.py +++ b/rally/cmd/main.py @@ -27,6 +27,7 @@ from rally.cmd import cliutils from rally import db from rally.openstack.common.gettextutils import _ # noqa from rally.orchestrator import api +from rally import processing class TaskCommands(object): @@ -125,6 +126,17 @@ class TaskCommands(object): """Delete a specific task and related results.""" api.delete_task(task_id, force=force) + @cliutils.args('--plot-type', type=str, help='plot type; available types: ' + ', '.join(processing.PLOTS.keys())) + @cliutils.args('--field-name', type=str, help='field from the task config ' + 'to aggregate the data on: concurrent/times/...') + @cliutils.args('--task-id', type=str, help='uuid of task') + def plot(self, plot_type, aggregated_field, task_id): + if plot_type in processing.PLOTS: + processing.PLOTS[plot_type](task_id, aggregated_field) + else: + print("Plot type '%s' not supported." % plot_type) + def main(): categories = {'task': TaskCommands} diff --git a/rally/exceptions.py b/rally/exceptions.py index 49f09416d5..bf5a777459 100644 --- a/rally/exceptions.py +++ b/rally/exceptions.py @@ -118,6 +118,10 @@ class NoSuchScenario(NotFoundException): msg_fmt = _("There is no benchmark scenario with name `%(name)s`.") +class NoSuchConfigField(NotFoundException): + msg_fmt = _("There is no field in the task config with name `%(name)s`.") + + class TaskNotFound(NotFoundException): msg_fmt = _("Task with uuid=%(uuid)s not found.") diff --git a/rally/processing.py b/rally/processing.py new file mode 100644 index 0000000000..5e9101ad90 --- /dev/null +++ b/rally/processing.py @@ -0,0 +1,93 @@ +# Copyright 2013: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools + +from rally import db +from rally import exceptions +from rally.openstack.common import importutils + +plt = importutils.try_import("matplotlib.pyplot") +ticker = importutils.try_import("matplotlib.ticker") + + +def aggregated_plot(task_id, aggregated_field): + """Draws an aggregated figure of benchmark runtimes in a separate window. + + The resulting figure has the aggregated field values on the X axis and + the benchmark runtimes (in seconds) on the Y axis. For each benchmark run, + minimum, maximum and average runtime values will be drawn, thus resulting + in three plots on the figure. + + :param task_id: ID of the task to draw the plot for + :param aggregated_field: Field from the test config to aggregate the data + on. This can be e.g. "active_users", "times" etc. + """ + + task = db.task_get_detailed(task_id) + + task["results"].sort(key=lambda res: res["key"]["name"]) + results_by_benchmark = itertools.groupby(task["results"], + lambda res: res["key"]["name"]) + for benchmark_name, data in results_by_benchmark: + data_dict = {} + for result in data: + + if aggregated_field not in result["key"]["kw"]["config"]: + raise exceptions.NoSuchConfigField(name=aggregated_field) + + raw = result["data"]["raw"] + times = map(lambda x: x["time"], + filter(lambda r: not r["error"], raw)) + + aggr_field_val = result["key"]["kw"]["config"][aggregated_field] + + data_dict[aggr_field_val] = {"min": min(times), + "avg": sum(times) / len(times), + "max": max(times)} + + aggr_field_vals = sorted(data_dict.keys()) + mins = [data_dict[x]["min"] for x in aggr_field_vals] + avgs = [data_dict[x]["avg"] for x in aggr_field_vals] + maxes = [data_dict[x]["max"] for x in aggr_field_vals] + + axes = plt.subplot(111) + + plt.plot(aggr_field_vals, maxes, "r-", label="max", linewidth=2) + plt.plot(aggr_field_vals, avgs, "b-", label="avg", linewidth=2) + plt.plot(aggr_field_vals, mins, "g-", label="min", linewidth=2) + + title = "Benchmark results: %s" % benchmark_name + plt.title(title) + fig = plt.gcf() + fig.canvas.set_window_title(title) + + plt.xlabel(aggregated_field) + axes.set_xlim(0, max(aggr_field_vals) + 1) + x_axis = axes.get_xaxis() + x_axis.set_major_locator(ticker.MaxNLocator(integer=True)) + + plt.ylabel("Time (sec)") + axes.set_ylim(min(mins) - 2, max(maxes) + 2) + + plt.legend(loc="upper right") + + plt.show() + + +# NOTE(msdubov): A mapping from plot names to plotting functions is used in CLI +PLOTS = { + "aggregated": aggregated_plot +} diff --git a/tests/cmd/test_main.py b/tests/cmd/test_main.py index 092bdb6c39..5d9b194986 100644 --- a/tests/cmd/test_main.py +++ b/tests/cmd/test_main.py @@ -66,3 +66,11 @@ class TaskCommandsTestCase(test.BaseTestCase): self.task.delete(task_uuid, force) mock_api.delete_task.assert_called_once_with(task_uuid, force=force) + + def test_plot(self): + test_uuid = str(uuid.uuid4()) + mock_plot = mock.Mock() + PLOTS = {"aggregated": mock_plot} + with mock.patch("rally.cmd.main.processing.PLOTS", new=PLOTS): + self.task.plot("aggregated", "concurrent", test_uuid) + mock_plot.assert_called_once_with(test_uuid, "concurrent") diff --git a/tests/test_processing.py b/tests/test_processing.py new file mode 100644 index 0000000000..55d9a46c6f --- /dev/null +++ b/tests/test_processing.py @@ -0,0 +1,108 @@ +# Copyright 2013: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from rally import exceptions +from rally import processing +from rally import test + + +class ProcessingTestCase(test.TestCase): + + def setUp(self): + super(ProcessingTestCase, self).setUp() + self.fake_task = { + "results": [ + { + "data": {"raw": [{"error": None, "time": 10.5}, + {"error": None, "time": 12.5}]}, + "key": {"name": "scenario_1", + "kw": {"config": {"active_users": 1}}} + }, + { + "data": {"raw": [{"error": None, "time": 4.3}]}, + "key": {"name": "scenario_2", + "kw": {"config": {"active_users": 1}}} + }, + { + "data": {"raw": [{"error": None, "time": 1.2}, + {"error": None, "time": 3.4}, + {"error": None, "time": 5.6}]}, + "key": {"name": "scenario_1", + "kw": {"config": {"active_users": 2}}} + } + ], + } + self.fake_task_aggregated_by_concurrency = { + "scenario_1": {1: [10.5, 12.5], 2: [1.2, 3.4, 5.6]}, + "scenario_2": {1: [4.3]} + } + self.fake_task_invalid_no_aggregated_field = { + "results": [ + { + "data": {"raw": [{"error": None, "time": 10.5}, + {"error": None, "time": 12.5}]}, + "key": {"name": "scenario_1", + "kw": {"config": {"active_users": 1}}} + }, + { + "data": {"raw": [{"error": None, "time": 4.3}]}, + "key": {"name": "scenario_2", + "kw": {"config": {"times": 1}}} + } + ], + } + + def test_aggregated_plot(self): + with mock.patch("rally.processing.db.task_get_detailed") as mock_task: + mock_task.return_value = self.fake_task_invalid_no_aggregated_field + with mock.patch("rally.processing.plt") as mock_plot: + with mock.patch("rally.processing.ticker"): + self.assertRaises(exceptions.NoSuchConfigField, + processing.aggregated_plot, + "task", "active_users") + mock_task.return_value = self.fake_task + with mock.patch("rally.processing.plt") as mock_plot: + with mock.patch("rally.processing.ticker"): + processing.aggregated_plot("task", "active_users") + + expected_plot_calls = [] + expected_show_calls = [] + + for scenario in self.fake_task_aggregated_by_concurrency: + + scenario_data = self.fake_task_aggregated_by_concurrency[scenario] + + active_users_vals = sorted(scenario_data.keys()) + + mins = [min(scenario_data[c]) for c in active_users_vals] + avgs = [sum(scenario_data[c]) / len(scenario_data[c]) + for c in active_users_vals] + maxes = [max(scenario_data[c]) for c in active_users_vals] + + expected_plot_calls.append(mock.call(active_users_vals, maxes, + "r-", label="max", + linewidth=2)) + expected_plot_calls.append(mock.call(active_users_vals, avgs, + "b-", label="avg", + linewidth=2)) + expected_plot_calls.append(mock.call(active_users_vals, mins, + "g-", label="min", + linewidth=2)) + expected_show_calls.append(mock.call.show()) + + self.assertEqual(mock_plot.plot.mock_calls, expected_plot_calls) + self.assertEqual(mock_plot.show.mock_calls, expected_show_calls)