From 51a605fde5693b6e5a653fd55170f7beabd49a95 Mon Sep 17 00:00:00 2001 From: Jesse Keating Date: Mon, 8 Dec 2014 13:40:20 -0800 Subject: [PATCH] Add a maximum concurrency option to rps runner The rps benchmark runner has no throttle for how many total concurrent test threads it will run. For some tests, like nova boot tests, cloud capacity for instances could be reached before a suitable amount of tests. For that reason this introduces a throttle of maximum concurrent scenario threads that are allowed to be live. Much like how tasks are divvied up across workers, concurrency slots are divvied up across workers as well. Each worker gets a max and will wait for a thread to finish before adding a new one once the max has been reached. If no max concurrency is provided, the concurrency is pinned to the times to execute. Co-Authored-By: Jesse Keating Co-Authored-By: Marian Krcmarik Implements: blueprint rps-benchmark-with-max-concurrency Change-Id: I488322eb9b50afe58801d28be8ad37f0cc5aeb07 --- rally-jobs/rally.yaml | 12 ++++++ rally/benchmark/runners/rps.py | 47 +++++++++++++++++++----- tests/unit/benchmark/runners/test_rps.py | 7 +++- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/rally-jobs/rally.yaml b/rally-jobs/rally.yaml index 7d6a697840..fdd50e03b8 100755 --- a/rally-jobs/rally.yaml +++ b/rally-jobs/rally.yaml @@ -252,6 +252,18 @@ failure_rate: max: 0 + - + args: + sleep: 0.001 + runner: + type: "rps" + times: 200 + rps: 20 + max_concurrency: 10 + sla: + failure_rate: + max: 0 + - args: sleep: 0.1 diff --git a/rally/benchmark/runners/rps.py b/rally/benchmark/runners/rps.py index af740e0229..dda769fe19 100644 --- a/rally/benchmark/runners/rps.py +++ b/rally/benchmark/runners/rps.py @@ -24,22 +24,24 @@ from rally.common import log as logging from rally.common import utils from rally import consts - LOG = logging.getLogger(__name__) def _worker_process(queue, iteration_gen, timeout, rps, times, - context, cls, method_name, args, aborted): + max_concurrent, context, cls, method_name, + args, aborted): """Start scenario within threads. Spawn N threads per second. Each thread runs the scenario once, and appends - the result to the queue. + result to queue. A maximum of max_concurrent threads will be ran + concurrently. :param queue: queue object to append results :param iteration_gen: next iteration number generator :param timeout: operation's timeout :param rps: number of scenario iterations to be run per one second :param times: total number of scenario iterations to be run + :param max_concurrent: maximum worker concurrency :param context: scenario context object :param cls: scenario class :param method_name: scenario method name @@ -80,8 +82,8 @@ def _worker_process(queue, iteration_gen, timeout, rps, times, (i, real_rps, rps)) # try to join latest thread(s) until it finished, or until time to - # start new thread - while i / (time.time() - start) > rps: + # start new thread (if we have concurrent slots available) + while i / (time.time() - start) > rps or len(pool) >= max_concurrent: if pool: pool[0].join(sleep) if not pool[0].isAlive(): @@ -126,6 +128,10 @@ class RPSScenarioRunner(base.ScenarioRunner): "timeout": { "type": "number", }, + "max_concurrency": { + "type": "integer", + "minimum": 1 + }, }, "additionalProperties": False } @@ -151,27 +157,50 @@ class RPSScenarioRunner(base.ScenarioRunner): timeout = self.config.get("timeout", 0) # 0 means no timeout iteration_gen = utils.RAMInt() cpu_count = multiprocessing.cpu_count() - processes_to_start = min(cpu_count, times) + processes_to_start = min(cpu_count, times, + self.config.get("max_concurrency", times)) rps_per_worker = float(self.config["rps"]) / processes_to_start times_per_worker, times_overhead = divmod(times, processes_to_start) + # Determine concurrency per worker + concurrency_per_worker, concurrency_overhead = divmod( + self.config.get("max_concurrency", times), + processes_to_start) + self._log_debug_info(times=times, timeout=timeout, cpu_count=cpu_count, processes_to_start=processes_to_start, rps_per_worker=rps_per_worker, times_per_worker=times_per_worker, - times_overhead=times_overhead) + times_overhead=times_overhead, + concurrency_per_worker=concurrency_per_worker, + concurrency_overhead=concurrency_overhead) result_queue = multiprocessing.Queue() - def worker_args_gen(times_overhead): + def worker_args_gen(times_overhead, concurrency_overhead): + """Generate arguments for process worker. + + Remainder of threads per process division is distributed to + process workers equally - one thread per each process worker + until the remainder equals zero. The same logic is applied + to concurrency overhead. + :param times_overhead: remaining number of threads to be + distributed to workers + :param concurrency_overhead: remaining number of maximum + concurrent threads to be distributed + to workers + """ while True: yield (result_queue, iteration_gen, timeout, rps_per_worker, times_per_worker + (times_overhead and 1), + concurrency_per_worker + (concurrency_overhead and 1), context, cls, method_name, args, self.aborted) if times_overhead: times_overhead -= 1 + if concurrency_overhead: + concurrency_overhead -= 1 process_pool = self._create_process_pool( processes_to_start, _worker_process, - worker_args_gen(times_overhead)) + worker_args_gen(times_overhead, concurrency_overhead)) self._join_processes(process_pool, result_queue) diff --git a/tests/unit/benchmark/runners/test_rps.py b/tests/unit/benchmark/runners/test_rps.py index 4747eb5a3f..d115433866 100644 --- a/tests/unit/benchmark/runners/test_rps.py +++ b/tests/unit/benchmark/runners/test_rps.py @@ -37,6 +37,7 @@ class RPSScenarioRunnerTestCase(test.TestCase): "type": consts.RunnerType.RPS, "times": 1, "rps": 100, + "max_concurrency": 50, "timeout": 1 } rps.RPSScenarioRunner.validate(config) @@ -83,6 +84,7 @@ class RPSScenarioRunnerTestCase(test.TestCase): is_set=mock.MagicMock(return_value=False)) times = 4 + max_concurrent = 3 fake_ram_int = iter(range(10)) @@ -90,7 +92,8 @@ class RPSScenarioRunnerTestCase(test.TestCase): "id": "uuid1"}]} rps._worker_process(mock_queue, fake_ram_int, 1, 10, times, - context, "Dummy", "dummy", (), mock_event) + max_concurrent, context, "Dummy", "dummy", + (), mock_event) self.assertEqual(times, mock_log.debug.call_count) self.assertEqual(times, mock_thread.call_count) @@ -127,7 +130,7 @@ class RPSScenarioRunnerTestCase(test.TestCase): context = fakes.FakeUserContext({}).context context["task"] = {"uuid": "fake_uuid"} - config = {"times": 20, "rps": 20, "timeout": 5} + config = {"times": 20, "rps": 20, "timeout": 5, "max_concurrency": 15} runner = rps.RPSScenarioRunner(self.task, config) runner._run_scenario(fakes.FakeScenario, "do_it", context, {})