diff --git a/rally/benchmark/engine.py b/rally/benchmark/engine.py index 6f4b81abc2..991e8f9978 100644 --- a/rally/benchmark/engine.py +++ b/rally/benchmark/engine.py @@ -14,6 +14,8 @@ # under the License. import json +import threading +import time import traceback import jsonschema @@ -185,18 +187,20 @@ class BenchmarkEngine(object): corresponding benchmark test launches """ self.task.update_status(consts.TaskStatus.RUNNING) - results = {} for name in self.config: for n, kw in enumerate(self.config[name]): key = {'name': name, 'pos': n, 'kw': kw} LOG.info("Running benchmark with key: %s" % key) runner = self._get_runner(kw) - result = runner.run(name, kw.get("context", {}), - kw.get("args", {})) - self.task.append_results(key, {"raw": result}) - results[json.dumps(key)] = result + is_done = threading.Event() + consumer = threading.Thread( + target=self.consume_results, + args=(key, self.task, runner.result_queue, is_done)) + consumer.start() + runner.run(name, kw.get("context", {}), kw.get("args", {})) + is_done.set() + consumer.join() self.task.update_status(consts.TaskStatus.FINISHED) - return results @rutils.log_task_wrapper(LOG.info, _("Check cloud.")) def bind(self, endpoints): @@ -211,3 +215,27 @@ class BenchmarkEngine(object): clients = osclients.Clients(self.admin_endpoint) clients.verified_keystone() return self + + @staticmethod + def consume_results(key, task, result_queue, is_done): + """Consume scenario runner results from queue and send them to db. + + Has to be run from different thread simultaneously with the runner.run + method. + + :param key: Scenario identifier + :param task: Running task + :param result_queue: Deque with runner results + :param is_done: Event which is set from the runner thread after the + runner finishes it's work. + """ + results = [] + while True: + if result_queue: + result = result_queue.popleft() + results.append(result) + elif is_done.isSet(): + break + else: + time.sleep(0.1) + task.append_results(key, {"raw": results}) diff --git a/rally/benchmark/runners/base.py b/rally/benchmark/runners/base.py index 4828010c17..bbee52b3d3 100644 --- a/rally/benchmark/runners/base.py +++ b/rally/benchmark/runners/base.py @@ -14,6 +14,7 @@ # under the License. import abc +import collections import copy import random @@ -77,56 +78,53 @@ def _run_scenario_once(args): "atomic_actions": scenario.atomic_actions()} -class ScenarioRunnerResult(list): +class ScenarioRunnerResult(dict): """Class for all scenario runners' result.""" RESULT_SCHEMA = { - "type": "array", + "type": "object", "$schema": rutils.JSON_SCHEMA, - "items": { - "type": "object", - "properties": { - "duration": { - "type": "number" + "properties": { + "duration": { + "type": "number" + }, + "idle_duration": { + "type": "number" + }, + "scenario_output": { + "type": "object", + "properties": { + "data": { + "type": "object", + "patternProperties": { + ".*": {"type": "number"} + } + }, + "errors": { + "type": "string" + }, }, - "idle_duration": { - "type": "number" - }, - "scenario_output": { + "additionalProperties": False + }, + "atomic_actions": { + "type": "array", + "items": { "type": "object", "properties": { - "data": { - "type": "object", - "patternProperties": { - ".*": {"type": "number"} - } - }, - "errors": { - "type": "string" - }, + "action": {"type": "string"}, + "duration": {"type": "number"} }, "additionalProperties": False - }, - "atomic_actions": { - "type": "array", - "items": { - "type": "object", - "properties": { - "action": {"type": "string"}, - "duration": {"type": "number"} - }, - "additionalProperties": False - } - }, - "error": { - "type": "array", - "items": { - "type": "string" - } } }, - "additionalProperties": False - } + "error": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "additionalProperties": False } def __init__(self, result_list): @@ -154,6 +152,7 @@ class ScenarioRunner(object): # a single admin endpoint here. self.admin_user = endpoints[0] self.config = config + self.result_queue = collections.deque() @staticmethod def _get_cls(runner_type): @@ -208,13 +207,14 @@ class ScenarioRunner(object): } args = cls.preprocess(method_name, context_obj, args) - results = base_ctx.ContextManager.run(context_obj, self._run_scenario, - cls, method_name, args) + base_ctx.ContextManager.run(context_obj, self._run_scenario, + cls, method_name, args) - if not isinstance(results, ScenarioRunnerResult): - name = self.__execution_type__ - results_type = type(results) - raise exceptions.InvalidRunnerResult(name=name, - results_type=results_type) + def _send_result(self, result): + """Send partial result to consumer. - return results + :param result: Result dict to be sent. It should match the + ScenarioRunnerResult schema, otherwise + ValidationError is raised. + """ + self.result_queue.append(ScenarioRunnerResult(result)) diff --git a/rally/benchmark/runners/constant.py b/rally/benchmark/runners/constant.py index 7c596762f6..de9d1220a7 100644 --- a/rally/benchmark/runners/constant.py +++ b/rally/benchmark/runners/constant.py @@ -82,9 +82,6 @@ class ConstantScenarioRunner(base.ScenarioRunner): iter_result = pool.imap(base._run_scenario_once, self._iter_scenario_args(cls, method, context, args, times)) - - results = [] - for i in range(times): try: result = iter_result.next(timeout) @@ -93,13 +90,11 @@ class ConstantScenarioRunner(base.ScenarioRunner): "scenario_output": {}, "atomic_actions": [], "error": utils.format_exc(e)} - results.append(result) + self._send_result(result) pool.close() pool.join() - return base.ScenarioRunnerResult(results) - class ConstantForDurationScenarioRunner(base.ScenarioRunner): """Creates constant load executing a scenario for an interval of time. @@ -158,7 +153,6 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner): self._iter_scenario_args(cls, method, context, args)) iter_result = pool.imap(base._run_scenario_once, run_args) - results = [] start = time.time() while True: try: @@ -168,12 +162,10 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner): "scenario_output": {}, "atomic_actions": [], "error": utils.format_exc(e)} - results.append(result) + self._send_result(result) if time.time() - start > duration: break pool.terminate() pool.join() - - return base.ScenarioRunnerResult(results) diff --git a/rally/benchmark/runners/periodic.py b/rally/benchmark/runners/periodic.py index b0064d2907..a486ed5a84 100644 --- a/rally/benchmark/runners/periodic.py +++ b/rally/benchmark/runners/periodic.py @@ -85,7 +85,6 @@ class PeriodicScenarioRunner(base.ScenarioRunner): if i < times - 1: time.sleep(period) - results = [] for async_result in async_results: try: result = async_result.get(timeout=timeout) @@ -94,9 +93,7 @@ class PeriodicScenarioRunner(base.ScenarioRunner): "scenario_output": {}, "atomic_actions": [], "error": utils.format_exc(e)} - results.append(result) + self._send_result(result) for pool in pools: pool.join() - - return base.ScenarioRunnerResult(results) diff --git a/rally/benchmark/runners/serial.py b/rally/benchmark/runners/serial.py index c425123d97..5b7ce4c48b 100644 --- a/rally/benchmark/runners/serial.py +++ b/rally/benchmark/runners/serial.py @@ -50,12 +50,8 @@ class SerialScenarioRunner(base.ScenarioRunner): def _run_scenario(self, cls, method_name, context, args): times = self.config.get('times', 1) - results = [] - for i in range(times): run_args = (i, cls, method_name, base._get_scenario_context(context), args) result = base._run_scenario_once(run_args) - results.append(result) - - return base.ScenarioRunnerResult(results) + self._send_result(result) diff --git a/tests/benchmark/runners/test_base.py b/tests/benchmark/runners/test_base.py index 619350ed37..172f0de81f 100644 --- a/tests/benchmark/runners/test_base.py +++ b/tests/benchmark/runners/test_base.py @@ -147,10 +147,11 @@ class ScenarioRunnerResultTestCase(test.TestCase): } ] - self.assertEqual(config, base.ScenarioRunnerResult(config)) + self.assertEqual(config[0], base.ScenarioRunnerResult(config[0])) + self.assertEqual(config[1], base.ScenarioRunnerResult(config[1])) def test_validate_failed(self): - config = [{"a": 10}] + config = {"a": 10} self.assertRaises(jsonschema.ValidationError, base.ScenarioRunnerResult, config) @@ -207,15 +208,15 @@ class ScenarioRunnerTestCase(test.TestCase): @mock.patch("rally.benchmark.runners.base.osclients") @mock.patch("rally.benchmark.runners.base.base_ctx.ContextManager") def test_run(self, mock_ctx_manager, mock_osclients): - runner = constant.ConstantScenarioRunner(mock.MagicMock(), - self.fake_endpoints, - mock.MagicMock()) - mock_ctx_manager.run.return_value = base.ScenarioRunnerResult([]) + runner = constant.ConstantScenarioRunner( + mock.MagicMock(), + self.fake_endpoints, + mock.MagicMock()) scenario_name = "NovaServers.boot_server_from_volume_and_delete" config_kwargs = {"image": {"id": 1}, "flavor": {"id": 1}} - result = runner.run(scenario_name, {"some_ctx": 2}, config_kwargs) + runner.run(scenario_name, {"some_ctx": 2}, config_kwargs) - self.assertEqual(result, mock_ctx_manager.run.return_value) + self.assertEqual(list(runner.result_queue), []) cls_name, method_name = scenario_name.split(".", 1) cls = base_scenario.Scenario.get_by_name(cls_name) @@ -233,12 +234,11 @@ class ScenarioRunnerTestCase(test.TestCase): method_name, config_kwargs] mock_ctx_manager.run.assert_called_once_with(*expected) - @mock.patch("rally.benchmark.runners.base.base_ctx.ContextManager") - def test_run_scenario_runner_results_exception(self, mock_ctx_manager): - srunner_cls = constant.ConstantForDurationScenarioRunner - srunner = srunner_cls(mock.MagicMock(), self.fake_endpoints, - mock.MagicMock()) - self.assertRaises(exceptions.InvalidRunnerResult, - srunner.run, - "NovaServers.boot_server_from_volume_and_delete", - mock.MagicMock(), {}) + def test_runner_send_result_exception(self): + runner = constant.ConstantScenarioRunner( + mock.MagicMock(), + self.fake_endpoints, + mock.MagicMock()) + self.assertRaises( + jsonschema.ValidationError, + lambda: runner._send_result(mock.MagicMock())) diff --git a/tests/benchmark/runners/test_constant.py b/tests/benchmark/runners/test_constant.py index ec394a538f..21dee0f109 100644 --- a/tests/benchmark/runners/test_constant.py +++ b/tests/benchmark/runners/test_constant.py @@ -49,31 +49,33 @@ class ConstantScenarioRunnerTestCase(test.TestCase): runner = constant.ConstantScenarioRunner( None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, "do_it", - self.context, self.args) - self.assertEqual(len(result), self.config["times"]) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) + runner._run_scenario(fakes.FakeScenario, + "do_it", self.context, self.args) + self.assertEqual(len(runner.result_queue), self.config["times"]) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) def test_run_scenario_constantly_for_times_exception(self): runner = constant.ConstantScenarioRunner( None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, - "something_went_wrong", - self.context, self.args) - self.assertEqual(len(result), self.config["times"]) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) - self.assertIn('error', result[0]) + runner._run_scenario(fakes.FakeScenario, + "something_went_wrong", self.context, self.args) + self.assertEqual(len(runner.result_queue), self.config["times"]) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) + self.assertIn('error', runner.result_queue[0]) def test_run_scenario_constantly_for_times_timeout(self): runner = constant.ConstantScenarioRunner( None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, - "raise_timeout", self.context, self.args) - self.assertEqual(len(result), self.config["times"]) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) - self.assertIn('error', result[0]) + runner._run_scenario(fakes.FakeScenario, + "raise_timeout", self.context, self.args) + self.assertEqual(len(runner.result_queue), self.config["times"]) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) + self.assertIn('error', runner.result_queue[0]) class ConstantForDurationScenarioRunnerTeestCase(test.TestCase): @@ -103,34 +105,36 @@ class ConstantForDurationScenarioRunnerTeestCase(test.TestCase): runner = constant.ConstantForDurationScenarioRunner( None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, "do_it", - self.context, self.args) + runner._run_scenario(fakes.FakeScenario, "do_it", + self.context, self.args) # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 - self.assertEqual(len(result), expected_times) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) + self.assertEqual(len(runner.result_queue), expected_times) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) def test_run_scenario_constantly_for_duration_exception(self): runner = constant.ConstantForDurationScenarioRunner( None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, - "something_went_wrong", - self.context, self.args) + runner._run_scenario(fakes.FakeScenario, + "something_went_wrong", self.context, self.args) # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 - self.assertEqual(len(result), expected_times) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) - self.assertIn('error', result[0]) + self.assertEqual(len(runner.result_queue), expected_times) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) + self.assertIn('error', runner.result_queue[0]) def test_run_scenario_constantly_for_duration_timeout(self): runner = constant.ConstantForDurationScenarioRunner( - None, [self.context["admin"]["endpoint"]], self.config) + None, [self.context["admin"]["endpoint"]], self.config) - result = runner._run_scenario(fakes.FakeScenario, - "raise_timeout", self.context, self.args) + runner._run_scenario(fakes.FakeScenario, + "raise_timeout", self.context, self.args) # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 - self.assertEqual(len(result), expected_times) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) - self.assertIn('error', result[0]) + self.assertEqual(len(runner.result_queue), expected_times) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) + self.assertIn('error', runner.result_queue[0]) diff --git a/tests/benchmark/runners/test_periodic.py b/tests/benchmark/runners/test_periodic.py index b6a4816876..1c2f0fe261 100644 --- a/tests/benchmark/runners/test_periodic.py +++ b/tests/benchmark/runners/test_periodic.py @@ -54,9 +54,10 @@ class PeriodicScenarioRunnerTestCase(test.TestCase): runner = periodic.PeriodicScenarioRunner( None, [context["admin"]["endpoint"]], config) - result = runner._run_scenario(fakes.FakeScenario, "do_it", context, {}) - self.assertEqual(len(result), config["times"]) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) + runner._run_scenario(fakes.FakeScenario, "do_it", context, {}) + self.assertEqual(len(runner.result_queue), config["times"]) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) def test_run_scenario_exception(self): context = fakes.FakeUserContext({}).context @@ -66,10 +67,11 @@ class PeriodicScenarioRunnerTestCase(test.TestCase): runner = periodic.PeriodicScenarioRunner( None, [context["admin"]["endpoint"]], config) - result = runner._run_scenario(fakes.FakeScenario, - "something_went_wrong", context, {}) - self.assertEqual(len(result), config["times"]) - self.assertIsNotNone(base.ScenarioRunnerResult(result)) + runner._run_scenario(fakes.FakeScenario, + "something_went_wrong", context, {}) + self.assertEqual(len(runner.result_queue), config["times"]) + for result in runner.result_queue: + self.assertIsNotNone(base.ScenarioRunnerResult(result)) @mock.patch("rally.benchmark.runners.periodic.base.ScenarioRunnerResult") @mock.patch("rally.benchmark.runners.periodic.multiprocessing") diff --git a/tests/benchmark/runners/test_serial.py b/tests/benchmark/runners/test_serial.py index b802530fad..93cc733088 100644 --- a/tests/benchmark/runners/test_serial.py +++ b/tests/benchmark/runners/test_serial.py @@ -41,7 +41,8 @@ class SerialScenarioRunnerTestCase(test.TestCase): runner = serial.SerialScenarioRunner(mock.MagicMock(), self.fake_endpoints, {"times": times}) - results = runner._run_scenario(fakes.FakeScenario, "do_it", - fakes.FakeUserContext({}).context, {}) - self.assertEqual(mock_run_once.call_count, times) + runner._run_scenario(fakes.FakeScenario, "do_it", + fakes.FakeUserContext({}).context, {}) + self.assertEqual(len(runner.result_queue), times) + results = list(runner.result_queue) self.assertEqual(results, expected_results) diff --git a/tests/benchmark/test_engine.py b/tests/benchmark/test_engine.py index f75e67ef97..a275a1ae02 100644 --- a/tests/benchmark/test_engine.py +++ b/tests/benchmark/test_engine.py @@ -220,21 +220,22 @@ class BenchmarkEngineTestCase(test.TestCase): ] mock_helper.assert_has_calls(expected_calls) - def test_run__update_status(self): + @mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results") + def test_run__update_status(self, mock_consume): task = mock.MagicMock() eng = engine.BenchmarkEngine([], task) - results = eng.run() - self.assertEqual(results, {}) + eng.run() task.update_status.assert_has_calls([ mock.call(consts.TaskStatus.RUNNING), mock.call(consts.TaskStatus.FINISHED) ]) - @mock.patch("rally.benchmark.engine.endpoint.Endpoint") - @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results") @mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner") + @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.endpoint.Endpoint") def test_run__config_has_args(self, mock_endpoint, mock_osclients, - mock_runner): + mock_runner, mock_consume): config = { "a.args": [{"args": {"a": "a", "b": 1}}], "b.args": [{"args": {"a": 1}}] @@ -243,11 +244,12 @@ class BenchmarkEngineTestCase(test.TestCase): eng = engine.BenchmarkEngine(config, task).bind([{}]) eng.run() - @mock.patch("rally.benchmark.engine.endpoint.Endpoint") - @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results") @mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner") + @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.endpoint.Endpoint") def test_run__config_has_runner(self, mock_endpoint, mock_osclients, - mock_runner): + mock_runner, mock_consume): config = { "a.args": [{"runner": {"type": "a", "b": 1}}], "b.args": [{"runner": {"a": 1}}] @@ -256,11 +258,12 @@ class BenchmarkEngineTestCase(test.TestCase): eng = engine.BenchmarkEngine(config, task).bind([{}]) eng.run() - @mock.patch("rally.benchmark.engine.endpoint.Endpoint") - @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.BenchmarkEngine.consume_results") @mock.patch("rally.benchmark.engine.base_runner.ScenarioRunner") + @mock.patch("rally.benchmark.engine.osclients") + @mock.patch("rally.benchmark.engine.endpoint.Endpoint") def test_run__config_has_context(self, mock_endpoint, mock_osclients, - mock_runner): + mock_runner, mock_consume): config = { "a.args": [{"context": {"context_a": {"a": 1}}}], "b.args": [{"context": {"context_b": {"b": 2}}}] diff --git a/tests/orchestrator/test_api.py b/tests/orchestrator/test_api.py index 32f0db0d10..a52eda1141 100644 --- a/tests/orchestrator/test_api.py +++ b/tests/orchestrator/test_api.py @@ -15,6 +15,7 @@ """ Test for orchestrator. """ +import collections import uuid import mock @@ -123,7 +124,7 @@ class APITestCase(test.TestCase): mock_deploy_get.return_value = self.deployment mock_utils_runner.return_value = mock_runner = mock.Mock() - mock_runner.run.return_value = ['fake_result'] + mock_runner.result_queue = collections.deque(['fake_result']) mock_osclients.Clients.return_value = fakes.FakeClients()