diff --git a/rally-jobs/rally.yaml b/rally-jobs/rally.yaml
index 84a74171f3..a832f73e02 100755
--- a/rally-jobs/rally.yaml
+++ b/rally-jobs/rally.yaml
@@ -230,6 +230,17 @@
         failure_rate:
           max: 0
 
+    -
+      args:
+        sleep: 0.1
+      runner:
+        type: "constant_for_duration"
+        duration: 5
+        concurrency: 5
+      sla:
+        failure_rate:
+          max: 0
+
     -
       args:
         sleep: 0.001
@@ -241,6 +252,16 @@
         failure_rate:
           max: 0
 
+    -
+      args:
+        sleep: 0.1
+      runner:
+        type: "serial"
+        times: 20
+      sla:
+        failure_rate:
+          max: 0
+
     -
       args:
         sleep: 0.01
diff --git a/rally/benchmark/runners/base.py b/rally/benchmark/runners/base.py
index 4029e2e969..a838f1ed98 100644
--- a/rally/benchmark/runners/base.py
+++ b/rally/benchmark/runners/base.py
@@ -15,6 +15,7 @@
 
 import abc
 import collections
+import multiprocessing
 import random
 
 import jsonschema
@@ -169,6 +170,7 @@ class ScenarioRunner(object):
         self.task = task
         self.config = config
         self.result_queue = collections.deque()
+        self.aborted = multiprocessing.Event()
 
     @staticmethod
     def _get_cls(runner_type):
@@ -212,6 +214,8 @@
         cls_name, method_name = name.split(".", 1)
         cls = scenario_base.Scenario.get_by_name(cls_name)
 
+        self.aborted.clear()
+
         # NOTE(boris-42): processing @types decorators
         args = types.preprocess(cls, method_name, context, args)
 
@@ -219,6 +223,10 @@
             self._run_scenario(cls, method_name, context, args)
         return timer.duration()
 
+    def abort(self):
+        """Abort the execution of further benchmark scenario iterations."""
+        self.aborted.set()
+
     def _send_result(self, result):
         """Send partial result to consumer.
 
diff --git a/rally/benchmark/runners/constant.py b/rally/benchmark/runners/constant.py
index 90114ed747..5d4847d551 100644
--- a/rally/benchmark/runners/constant.py
+++ b/rally/benchmark/runners/constant.py
@@ -68,12 +68,13 @@ class ConstantScenarioRunner(base.ScenarioRunner):
     }
 
     @staticmethod
-    def _iter_scenario_args(cls, method, ctx, args, times):
+    def _iter_scenario_args(cls, method, ctx, args, times, aborted):
         for i in moves.range(times):
+            if aborted.is_set():
+                break
             yield (i, cls, method, base._get_scenario_context(ctx), args)
 
     def _run_scenario(self, cls, method, context, args):
-        timeout = self.config.get("timeout", 600)
         concurrency = self.config.get("concurrency", 1)
         # NOTE(msdubov): If not specified, perform single scenario run.
@@ -82,12 +83,15 @@ class ConstantScenarioRunner(base.ScenarioRunner):
 
         pool = multiprocessing.Pool(concurrency)
         iter_result = pool.imap(base._run_scenario_once,
                                 self._iter_scenario_args(cls, method, context,
-                                                         args, times))
-        for i in range(times):
+                                                         args, times,
+                                                         self.aborted))
+        while True:
             try:
                 result = iter_result.next(timeout)
             except multiprocessing.TimeoutError as e:
                 result = base.format_result_on_timeout(e, timeout)
+            except StopIteration:
+                break
 
             self._send_result(result)
@@ -135,13 +139,14 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
     }
 
     @staticmethod
-    def _iter_scenario_args(cls, method, ctx, args):
+    def _iter_scenario_args(cls, method, ctx, args, aborted):
         def _scenario_args(i):
+            if aborted.is_set():
+                raise StopIteration()
             return (i, cls, method, base._get_scenario_context(ctx), args)
         return _scenario_args
 
     def _run_scenario(self, cls, method, context, args):
-        timeout = self.config.get("timeout", 600)
         concurrency = self.config.get("concurrency", 1)
         duration = self.config.get("duration")
 
@@ -149,7 +154,8 @@
 
         pool = multiprocessing.Pool(concurrency)
         run_args = utils.infinite_run_args_generator(
-            self._iter_scenario_args(cls, method, context, args))
+            self._iter_scenario_args(cls, method, context, args,
+                                     self.aborted))
         iter_result = pool.imap(base._run_scenario_once, run_args)
 
         start = time.time()
@@ -158,6 +164,8 @@
                 result = iter_result.next(timeout)
             except multiprocessing.TimeoutError as e:
                 result = base.format_result_on_timeout(e, timeout)
+            except StopIteration:
+                break
 
             self._send_result(result)
 
diff --git a/rally/benchmark/runners/rps.py b/rally/benchmark/runners/rps.py
index 724855a18f..578f02456e 100644
--- a/rally/benchmark/runners/rps.py
+++ b/rally/benchmark/runners/rps.py
@@ -28,11 +28,11 @@ SEND_RESULT_DELAY = 1
 
 
 def _worker_thread(queue, args):
-    queue.put(base._run_scenario_once(args))
+    queue.put(base._run_scenario_once(args))
 
 
 def _worker_process(rps, times, queue, context, timeout,
-                    worker_id, workers, cls, method_name, args):
+                    worker_id, workers, cls, method_name, args, aborted):
     """Start scenario within threads.
 
     Spawn N threads per second. Each thread runs scenario once, and appends
@@ -48,6 +48,7 @@ def _worker_process(rps, times, queue, context, timeout,
     :param cls: scenario class
     :param method_name: scenario method name
     :param args: scenario args
+    :param aborted: multiprocessing.Event that is an abort flag
     """
 
     pool = []
@@ -61,7 +62,7 @@
     randsleep_delay = random.randint(int(sleep / 2 * 100),
                                      int(sleep * 100))
     time.sleep(randsleep_delay / 100.0)
-    while times > i:
+    while times > i and not aborted.is_set():
         scenario_context = base._get_scenario_context(context)
         i += 1
         scenario_args = (queue, (worker_id + workers * (i - 1), cls,
@@ -145,7 +146,7 @@ class RPSScenarioRunner(base.ScenarioRunner):
                 rest -= 1
             worker_args = (rps_per_worker, times, queue, context, timeout,
                            i, processes_to_start, cls,
-                           method_name, args)
+                           method_name, args, self.aborted)
             process = multiprocessing.Process(target=_worker_process,
                                               args=worker_args)
             process.start()
diff --git a/rally/benchmark/runners/serial.py b/rally/benchmark/runners/serial.py
index 9b17a99738..4b9a93246e 100644
--- a/rally/benchmark/runners/serial.py
+++ b/rally/benchmark/runners/serial.py
@@ -50,6 +50,8 @@ class SerialScenarioRunner(base.ScenarioRunner):
         times = self.config.get("times", 1)
 
         for i in range(times):
+            if self.aborted.is_set():
+                break
             run_args = (i, cls, method_name,
                         base._get_scenario_context(context), args)
             result = base._run_scenario_once(run_args)
diff --git a/tests/unit/benchmark/runners/test_base.py b/tests/unit/benchmark/runners/test_base.py
index 40516ce5eb..e0fcfeae58 100644
--- a/tests/unit/benchmark/runners/test_base.py
+++ b/tests/unit/benchmark/runners/test_base.py
@@ -267,3 +267,11 @@ class ScenarioRunnerTestCase(test.TestCase):
         self.assertRaises(
             jsonschema.ValidationError,
             lambda: runner._send_result(mock.MagicMock()))
+
+    def test_abort(self):
+        runner = serial.SerialScenarioRunner(
+            mock.MagicMock(),
+            mock.MagicMock())
+        self.assertFalse(runner.aborted.is_set())
+        runner.abort()
+        self.assertTrue(runner.aborted.is_set())
diff --git a/tests/unit/benchmark/runners/test_constant.py b/tests/unit/benchmark/runners/test_constant.py
index 7a6ffee209..a43f169466 100644
--- a/tests/unit/benchmark/runners/test_constant.py
+++ b/tests/unit/benchmark/runners/test_constant.py
@@ -45,9 +45,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
                           constant.ConstantScenarioRunner.validate,
                           self.config)
 
-    def test_run_scenario_constantly_for_times(self):
-        runner = constant.ConstantScenarioRunner(
-            None, self.config)
+    def test__run_scenario_constantly_for_times(self):
+        runner = constant.ConstantScenarioRunner(None, self.config)
 
         runner._run_scenario(fakes.FakeScenario,
                              "do_it", self.context, self.args)
@@ -55,9 +54,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
         for result in runner.result_queue:
             self.assertIsNotNone(base.ScenarioRunnerResult(result))
 
-    def test_run_scenario_constantly_for_times_exception(self):
-        runner = constant.ConstantScenarioRunner(
-            None, self.config)
+    def test__run_scenario_constantly_for_times_exception(self):
+        runner = constant.ConstantScenarioRunner(None, self.config)
 
         runner._run_scenario(fakes.FakeScenario,
                              "something_went_wrong", self.context, self.args)
@@ -66,9 +64,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
             self.assertIsNotNone(base.ScenarioRunnerResult(result))
         self.assertIn("error", runner.result_queue[0])
 
-    def test_run_scenario_constantly_for_times_timeout(self):
-        runner = constant.ConstantScenarioRunner(
-            None, self.config)
+    def test__run_scenario_constantly_for_times_timeout(self):
+        runner = constant.ConstantScenarioRunner(None, self.config)
 
         runner._run_scenario(fakes.FakeScenario,
                              "raise_timeout", self.context, self.args)
@@ -77,11 +74,25 @@
             self.assertIsNotNone(base.ScenarioRunnerResult(result))
         self.assertIn("error", runner.result_queue[0])
 
+    def test__run_scenario_constantly_aborted(self):
+        runner = constant.ConstantScenarioRunner(None, self.config)
 
-class ConstantForDurationScenarioRunnerTeestCase(test.TestCase):
+        runner.abort()
+        runner._run_scenario(fakes.FakeScenario,
+                             "do_it", self.context, self.args)
+        self.assertEqual(len(runner.result_queue), 0)
+
+    def test_abort(self):
+        runner = constant.ConstantScenarioRunner(None, self.config)
+        self.assertFalse(runner.aborted.is_set())
+        runner.abort()
+        self.assertTrue(runner.aborted.is_set())
+
+
+class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
 
     def setUp(self):
-        super(ConstantForDurationScenarioRunnerTeestCase, self).setUp()
+        super(ConstantForDurationScenarioRunnerTestCase, self).setUp()
         duration = 0
         concurrency = 2
         timeout = 2
@@ -138,3 +149,17 @@ class ConstantForDurationScenarioRunnerTeestCase(test.TestCase):
         for result in runner.result_queue:
             self.assertIsNotNone(base.ScenarioRunnerResult(result))
         self.assertIn("error", runner.result_queue[0])
+
+    def test__run_scenario_constantly_aborted(self):
+        runner = constant.ConstantForDurationScenarioRunner(None, self.config)
+
+        runner.abort()
+        runner._run_scenario(fakes.FakeScenario,
+                             "do_it", self.context, self.args)
+        self.assertEqual(len(runner.result_queue), 0)
+
+    def test_abort(self):
+        runner = constant.ConstantForDurationScenarioRunner(None, self.config)
+        self.assertFalse(runner.aborted.is_set())
+        runner.abort()
+        self.assertTrue(runner.aborted.is_set())
diff --git a/tests/unit/benchmark/runners/test_rps.py b/tests/unit/benchmark/runners/test_rps.py
index 16ba8f5290..15c9c08e2d 100644
--- a/tests/unit/benchmark/runners/test_rps.py
+++ b/tests/unit/benchmark/runners/test_rps.py
@@ -75,13 +75,16 @@
             isAlive=mock.MagicMock(return_value=False))
         mock_thread.return_value = mock_thread_instance
 
+        mock_event = mock.MagicMock(
+            is_set=mock.MagicMock(return_value=False))
+
         times = 4
 
         context = {"users": [{"tenant_id": "t1", "endpoint": "e1",
                               "id": "uuid1"}]}
 
         rps._worker_process(10, times, mock_queue, context, 600, 1, 1,
-                            "Dummy", "dummy", ())
+                            "Dummy", "dummy", (), mock_event)
 
         self.assertEqual(times, mock_log.debug.call_count)
         self.assertEqual(times, mock_thread.call_count)
@@ -120,9 +123,9 @@
     def test__run_scenario(self, mock_sleep):
         context = fakes.FakeUserContext({}).context
         context["task"] = {"uuid": "fake_uuid"}
+
         config = {"times": 20, "rps": 20, "timeout": 5}
-        runner = rps.RPSScenarioRunner(
-            None, config)
+        runner = rps.RPSScenarioRunner(None, config)
 
         runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
 
@@ -137,11 +140,37 @@
         context["task"] = {"uuid": "fake_uuid"}
         config = {"times": 4, "rps": 10}
-        runner = rps.RPSScenarioRunner(
-            None, config)
+        runner = rps.RPSScenarioRunner(None, config)
 
         runner._run_scenario(fakes.FakeScenario,
                              "something_went_wrong", context, {})
 
         self.assertEqual(len(runner.result_queue), config["times"])
         for result in runner.result_queue:
             self.assertIsNotNone(base.ScenarioRunnerResult(result))
+
+    @mock.patch("rally.benchmark.runners.rps.time.sleep")
+    def test__run_scenario_aborted(self, mock_sleep):
+        context = fakes.FakeUserContext({}).context
+        context["task"] = {"uuid": "fake_uuid"}
+
+        config = {"times": 20, "rps": 20, "timeout": 5}
+        runner = rps.RPSScenarioRunner(None, config)
+
+        runner.abort()
+        runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
+
+        self.assertEqual(len(runner.result_queue), 0)
+
+        for result in runner.result_queue:
+            self.assertIsNotNone(base.ScenarioRunnerResult(result))
+
+    def test_abort(self):
+        context = fakes.FakeUserContext({}).context
+        context["task"] = {"uuid": "fake_uuid"}
+
+        config = {"times": 4, "rps": 10}
+        runner = rps.RPSScenarioRunner(None, config)
+
+        self.assertFalse(runner.aborted.is_set())
+        runner.abort()
+        self.assertTrue(runner.aborted.is_set())
diff --git a/tests/unit/benchmark/runners/test_serial.py b/tests/unit/benchmark/runners/test_serial.py
index 3c4db6cf62..930de59a3a 100644
--- a/tests/unit/benchmark/runners/test_serial.py
+++ b/tests/unit/benchmark/runners/test_serial.py
@@ -26,7 +26,7 @@ class SerialScenarioRunnerTestCase(test.TestCase):
         super(SerialScenarioRunnerTestCase, self).setUp()
 
     @mock.patch("rally.benchmark.runners.base._run_scenario_once")
-    def test_run_scenario(self, mock_run_once):
+    def test__run_scenario(self, mock_run_once):
         times = 5
         result = {"duration": 10, "idle_duration": 0, "error": [],
                   "scenario_output": {}, "atomic_actions": {}}
@@ -40,3 +40,18 @@
         self.assertEqual(len(runner.result_queue), times)
         results = list(runner.result_queue)
         self.assertEqual(results, expected_results)
+
+    def test__run_scenario_aborted(self):
+        runner = serial.SerialScenarioRunner(mock.MagicMock(),
+                                             {"times": 5})
+        runner.abort()
+        runner._run_scenario(fakes.FakeScenario, "do_it",
+                             fakes.FakeUserContext({}).context, {})
+        self.assertEqual(len(runner.result_queue), 0)
+
+    def test_abort(self):
+        runner = serial.SerialScenarioRunner(mock.MagicMock(),
+                                             {"times": 5})
+        self.assertFalse(runner.aborted.is_set())
+        runner.abort()
+        self.assertTrue(runner.aborted.is_set())
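
The abort support added by this patch comes down to one pattern: every runner owns a shared `multiprocessing.Event`, `abort()` sets it, and each iteration loop or argument generator checks `is_set()` before starting more work. The snippet below is a minimal standalone sketch of that pattern, not code from the patch; the `MiniSerialRunner` class, its `run()` method, and the toy `scenario` callable are hypothetical names used only for illustration.

```python
import multiprocessing
import time


class MiniSerialRunner(object):
    """Hypothetical, simplified stand-in for a serial scenario runner."""

    def __init__(self, times):
        self.times = times
        # Same primitive the patch adds to ScenarioRunner.__init__.
        self.aborted = multiprocessing.Event()
        self.results = []

    def abort(self):
        """Ask the runner to stop before starting the next iteration."""
        self.aborted.set()

    def run(self, scenario):
        for i in range(self.times):
            # Mirrors the check added to SerialScenarioRunner._run_scenario.
            if self.aborted.is_set():
                break
            self.results.append(scenario(i))


if __name__ == "__main__":
    runner = MiniSerialRunner(times=5)

    def scenario(i):
        time.sleep(0.01)
        return {"iteration": i}

    runner.abort()                 # abort before running: no iterations start
    runner.run(scenario)
    assert runner.results == []    # matches the test__run_scenario_aborted tests
```

Because `multiprocessing.Event` is process-safe, the same flag works both for the serial runner's in-process loop and for the constant/rps runners, which hand it to their pool argument generators and worker processes.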