diff --git a/rally/plugins/common/runners/constant.py b/rally/plugins/common/runners/constant.py index 592ef90c59..7febf3427d 100644 --- a/rally/plugins/common/runners/constant.py +++ b/rally/plugins/common/runners/constant.py @@ -295,3 +295,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): pool.terminate() pool.join() + self._flush_results() diff --git a/rally/plugins/common/runners/serial.py b/rally/plugins/common/runners/serial.py index 3334beadc5..92cd7d9905 100644 --- a/rally/plugins/common/runners/serial.py +++ b/rally/plugins/common/runners/serial.py @@ -72,3 +72,5 @@ class SerialScenarioRunner(runner.ScenarioRunner): runner._get_scenario_context(context), args) result = runner._run_scenario_once(run_args) self._send_result(result) + + self._flush_results() diff --git a/rally/task/engine.py b/rally/task/engine.py index 2404caa602..0db38e878a 100644 --- a/rally/task/engine.py +++ b/rally/task/engine.py @@ -75,12 +75,13 @@ class ResultConsumer(object): def _consume_results(self): while True: if self.runner.result_queue: - result = self.runner.result_queue.popleft() - self.results.append(result) - success = self.sla_checker.add_iteration(result) - if self.abort_on_sla_failure and not success: - self.sla_checker.set_aborted_on_sla() - self.runner.abort() + results = self.runner.result_queue.popleft() + self.results.extend(results) + for r in results: + success = self.sla_checker.add_iteration(r) + if self.abort_on_sla_failure and not success: + self.sla_checker.set_aborted_on_sla() + self.runner.abort() elif self.is_done.isSet(): break else: diff --git a/rally/task/runner.py b/rally/task/runner.py index bf5a7e5fb2..ba44622001 100644 --- a/rally/task/runner.py +++ b/rally/task/runner.py @@ -163,7 +163,7 @@ class ScenarioRunner(plugin.Plugin): CONFIG_SCHEMA = {} - def __init__(self, task, config): + def __init__(self, task, config, batch_size=0): """Runner constructor. It sets task and config to local variables. Also initialize @@ -177,6 +177,8 @@ class ScenarioRunner(plugin.Plugin): self.result_queue = collections.deque() self.aborted = multiprocessing.Event() self.run_duration = 0 + self.batch_size = batch_size + self.result_batch = [] @staticmethod def validate(config): @@ -254,16 +256,32 @@ class ScenarioRunner(plugin.Plugin): while not result_queue.empty(): self._send_result(result_queue.get()) + + self._flush_results() result_queue.close() + def _flush_results(self): + if self.result_batch: + sorted_batch = sorted(self.result_batch) + self.result_queue.append(sorted_batch) + self.result_batch = [] + def _send_result(self, result): - """Send partial result to consumer. + """Store partial result to send it to consumer later. :param result: Result dict to be sent. It should match the ScenarioRunnerResult schema, otherwise ValidationError is raised. """ - self.result_queue.append(ScenarioRunnerResult(result)) + + r = ScenarioRunnerResult(result) + self.result_batch.append(r) + + if len(self.result_batch) >= self.batch_size: + sorted_batch = sorted(self.result_batch, + key=lambda r: r["timestamp"]) + self.result_queue.append(sorted_batch) + self.result_batch = [] def _log_debug_info(self, **info): """Log runner parameters for debugging. diff --git a/tests/unit/plugins/common/runners/test_constant.py b/tests/unit/plugins/common/runners/test_constant.py index fa56c3ad12..bc42f54062 100644 --- a/tests/unit/plugins/common/runners/test_constant.py +++ b/tests/unit/plugins/common/runners/test_constant.py @@ -107,8 +107,9 @@ class ConstantScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario( fakes.FakeScenario, "do_it", self.context, self.args) self.assertEqual(len(runner_obj.result_queue), self.config["times"]) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) def test__run_scenario_exception(self): runner_obj = constant.ConstantScenarioRunner(self.task, self.config) @@ -116,9 +117,10 @@ class ConstantScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong", self.context, self.args) self.assertEqual(len(runner_obj.result_queue), self.config["times"]) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) - self.assertIn("error", runner_obj.result_queue[0]) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + self.assertIn("error", runner_obj.result_queue[0][0]) def test__run_scenario_aborted(self): runner_obj = constant.ConstantScenarioRunner(self.task, self.config) @@ -258,8 +260,9 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 self.assertEqual(len(runner_obj.result_queue), expected_times) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) def test_run_scenario_constantly_for_duration_exception(self): runner_obj = constant.ConstantForDurationScenarioRunner( @@ -270,9 +273,10 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 self.assertEqual(len(runner_obj.result_queue), expected_times) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) - self.assertIn("error", runner_obj.result_queue[0]) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + self.assertIn("error", runner_obj.result_queue[0][0]) def test_run_scenario_constantly_for_duration_timeout(self): runner_obj = constant.ConstantForDurationScenarioRunner( @@ -283,9 +287,10 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase): # NOTE(mmorais): when duration is 0, scenario executes exactly 1 time expected_times = 1 self.assertEqual(len(runner_obj.result_queue), expected_times) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) - self.assertIn("error", runner_obj.result_queue[0]) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + self.assertIn("error", runner_obj.result_queue[0][0]) def test__run_scenario_constantly_aborted(self): runner_obj = constant.ConstantForDurationScenarioRunner(None, diff --git a/tests/unit/plugins/common/runners/test_rps.py b/tests/unit/plugins/common/runners/test_rps.py index 917deacb1a..ffdb9f5a3b 100644 --- a/tests/unit/plugins/common/runners/test_rps.py +++ b/tests/unit/plugins/common/runners/test_rps.py @@ -145,8 +145,9 @@ class RPSScenarioRunnerTestCase(test.TestCase): self.assertEqual(len(runner_obj.result_queue), config["times"]) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) @mock.patch(RUNNERS + "rps.time.sleep") def test__run_scenario_exception(self, mock_sleep): @@ -156,8 +157,9 @@ class RPSScenarioRunnerTestCase(test.TestCase): runner_obj._run_scenario(fakes.FakeScenario, "something_went_wrong", fakes.FakeContext({}).context, {}) self.assertEqual(len(runner_obj.result_queue), config["times"]) - for result in runner_obj.result_queue: - self.assertIsNotNone(runner.ScenarioRunnerResult(result)) + for result_batch in runner_obj.result_queue: + for result in result_batch: + self.assertIsNotNone(runner.ScenarioRunnerResult(result)) @mock.patch(RUNNERS + "rps.time.sleep") def test__run_scenario_aborted(self, mock_sleep): diff --git a/tests/unit/plugins/common/runners/test_serial.py b/tests/unit/plugins/common/runners/test_serial.py index 262937f9a4..f7d1031469 100644 --- a/tests/unit/plugins/common/runners/test_serial.py +++ b/tests/unit/plugins/common/runners/test_serial.py @@ -26,9 +26,10 @@ class SerialScenarioRunnerTestCase(test.TestCase): def test__run_scenario(self, mock__run_scenario_once): times = 5 result = {"duration": 10, "idle_duration": 0, "error": [], - "scenario_output": {}, "atomic_actions": {}} + "scenario_output": {}, "atomic_actions": {}, + "timestamp": 1} mock__run_scenario_once.return_value = result - expected_results = [result for i in range(times)] + expected_results = [[result] for i in range(times)] runner = serial.SerialScenarioRunner(mock.MagicMock(), {"times": times}) diff --git a/tests/unit/task/test_engine.py b/tests/unit/task/test_engine.py index be1bd30af3..90e4e69241 100644 --- a/tests/unit/task/test_engine.py +++ b/tests/unit/task/test_engine.py @@ -488,8 +488,8 @@ class ResultConsumerTestCase(test.TestCase): runner = mock.MagicMock() results = [ - {"duration": 1, "timestamp": 3}, - {"duration": 2, "timestamp": 2} + [{"duration": 1, "timestamp": 3}], + [{"duration": 2, "timestamp": 2}] ] runner.result_queue = collections.deque(results) @@ -497,9 +497,12 @@ class ResultConsumerTestCase(test.TestCase): key, task, runner, False) as consumer_obj: pass - self.assertEqual(list(map(mock.call, results)), - mock_sla_instance.add_iteration.mock_calls) - self.assertEqual(sorted(results, key=lambda x: x["timestamp"]), + mock_sla_instance.add_iteration.assert_has_calls([ + mock.call({"duration": 1, "timestamp": 3}), + mock.call({"duration": 2, "timestamp": 2})]) + + self.assertEqual([{"duration": 2, "timestamp": 2}, + {"duration": 1, "timestamp": 3}], consumer_obj.results) @mock.patch("rally.common.objects.Task.get_status") @@ -517,7 +520,8 @@ class ResultConsumerTestCase(test.TestCase): runner = mock.MagicMock() runner.result_queue = collections.deque( - [{"duration": 1, "timestamp": 1}] * 4) + [[{"duration": 1, "timestamp": 1}, + {"duration": 2, "timestamp": 2}]] * 4) with engine.ResultConsumer(key, task, runner, True): pass @@ -565,7 +569,7 @@ class ResultConsumerTestCase(test.TestCase): task = mock.MagicMock() runner = mock.MagicMock() runner.result_queue = collections.deque( - [{"duration": 1, "timestamp": 4}] * 4) + [[{"duration": 1, "timestamp": 4}]] * 4) with engine.ResultConsumer(key, task, runner, False): pass