Add a maximum concurrency option to rps runner

The rps benchmark runner has no throttle on how many concurrent test
threads it will run in total. For some tests, such as nova boot tests,
cloud capacity for instances could be exhausted before a suitable
number of iterations has completed. This change therefore introduces a
throttle on the maximum number of concurrent scenario threads allowed
to be live.

Much like how tasks are divvied up across workers, concurrency slots
are divvied up across workers as well. Each worker gets a maximum, and
once that maximum is reached it waits for a thread to finish before
starting a new one.

If no max concurrency is provided, the concurrency defaults to the
number of times to execute, leaving the runner effectively
unthrottled.
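
A minimal standalone sketch of this divmod-based split (the function
name and shape here are illustrative, not the runner's API):

    # Split `total_slots` concurrency slots across `workers` processes:
    # each worker gets the quotient, and the remainder is handed out as
    # one extra slot per worker until it is used up.
    def distribute(total_slots, workers):
        per_worker, overhead = divmod(total_slots, workers)
        return [per_worker + (1 if i < overhead else 0)
                for i in range(workers)]

    print(distribute(10, 4))  # [3, 3, 2, 2] -- slots sum back to 10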

Co-Authored-By: Jesse Keating <jlk@bluebox.net>
Co-Authored-By: Marian Krcmarik <mkrcmari@redhat.com>

Implements: blueprint rps-benchmark-with-max-concurrency

Change-Id: I488322eb9b50afe58801d28be8ad37f0cc5aeb07

@@ -252,6 +252,18 @@
           failure_rate:
             max: 0
+    -
+      args:
+        sleep: 0.001
+      runner:
+        type: "rps"
+        times: 200
+        rps: 20
+        max_concurrency: 10
+      sla:
+        failure_rate:
+          max: 0
     -
       args:
         sleep: 0.1

@@ -24,22 +24,24 @@ from rally.common import log as logging
 from rally.common import utils
 from rally import consts

 LOG = logging.getLogger(__name__)


 def _worker_process(queue, iteration_gen, timeout, rps, times,
-                    context, cls, method_name, args, aborted):
+                    max_concurrent, context, cls, method_name,
+                    args, aborted):
     """Start scenario within threads.

     Spawn N threads per second. Each thread runs the scenario once, and appends
-    the result to the queue.
+    the result to queue. A maximum of max_concurrent threads will be run
+    concurrently.

     :param queue: queue object to append results
     :param iteration_gen: next iteration number generator
     :param timeout: operation's timeout
     :param rps: number of scenario iterations to be run per one second
     :param times: total number of scenario iterations to be run
+    :param max_concurrent: maximum worker concurrency
     :param context: scenario context object
     :param cls: scenario class
     :param method_name: scenario method name
@@ -80,8 +82,8 @@ def _worker_process(queue, iteration_gen, timeout, rps, times,
                   (i, real_rps, rps))

         # try to join latest thread(s) until it finished, or until time to
-        # start new thread
-        while i / (time.time() - start) > rps:
+        # start new thread (if we have concurrent slots available)
+        while i / (time.time() - start) > rps or len(pool) >= max_concurrent:
             if pool:
                 pool[0].join(sleep)
                 if not pool[0].isAlive():
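
For context, a minimal standalone sketch of this throttled pacing loop
(illustrative names; it uses Python 3's is_alive() rather than the
isAlive() seen above, and guards the elapsed-time division against
zero, which the excerpt does not show):

    import collections
    import threading
    import time

    def run_throttled(fn, times, rps, max_concurrent, sleep=0.001):
        # Keep a deque of live threads; start `times` of them at roughly
        # `rps` per second, never exceeding `max_concurrent` alive at once.
        pool = collections.deque()
        start = time.time()
        for i in range(1, times + 1):
            # Block while we are ahead of the requested rate or out of slots.
            while (i / max(time.time() - start, 1e-9) > rps
                   or len(pool) >= max_concurrent):
                if pool:
                    pool[0].join(sleep)
                    if not pool[0].is_alive():
                        pool.popleft()
                else:
                    time.sleep(sleep)
            thread = threading.Thread(target=fn)
            thread.start()
            pool.append(thread)
        while pool:  # drain whatever is still running
            pool.popleft().join()

    run_throttled(lambda: time.sleep(0.05), times=20, rps=10, max_concurrent=3)
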
@@ -126,6 +128,10 @@ class RPSScenarioRunner(base.ScenarioRunner):
             "timeout": {
                 "type": "number",
             },
+            "max_concurrency": {
+                "type": "integer",
+                "minimum": 1
+            },
         },
         "additionalProperties": False
     }
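
As a quick illustration of what the new schema fragment accepts (Rally
validates runner configs with the jsonschema library; this snippet
checks the fragment in isolation, outside the full runner schema):

    import jsonschema

    fragment = {
        "type": "object",
        "properties": {
            "max_concurrency": {"type": "integer", "minimum": 1},
        },
    }

    jsonschema.validate({"max_concurrency": 10}, fragment)  # passes
    try:
        jsonschema.validate({"max_concurrency": 0}, fragment)
    except jsonschema.ValidationError as exc:
        print(exc.message)  # "0 is less than the minimum of 1"
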
@@ -151,27 +157,50 @@ class RPSScenarioRunner(base.ScenarioRunner):
         timeout = self.config.get("timeout", 0)  # 0 means no timeout
         iteration_gen = utils.RAMInt()

         cpu_count = multiprocessing.cpu_count()
-        processes_to_start = min(cpu_count, times)
+        processes_to_start = min(cpu_count, times,
+                                 self.config.get("max_concurrency", times))
         rps_per_worker = float(self.config["rps"]) / processes_to_start
         times_per_worker, times_overhead = divmod(times, processes_to_start)

+        # Determine concurrency per worker
+        concurrency_per_worker, concurrency_overhead = divmod(
+            self.config.get("max_concurrency", times),
+            processes_to_start)
+
         self._log_debug_info(times=times, timeout=timeout, cpu_count=cpu_count,
                              processes_to_start=processes_to_start,
                              rps_per_worker=rps_per_worker,
                              times_per_worker=times_per_worker,
-                             times_overhead=times_overhead)
+                             times_overhead=times_overhead,
+                             concurrency_per_worker=concurrency_per_worker,
+                             concurrency_overhead=concurrency_overhead)

         result_queue = multiprocessing.Queue()

-        def worker_args_gen(times_overhead):
+        def worker_args_gen(times_overhead, concurrency_overhead):
+            """Generate arguments for process worker.
+
+            Remainder of threads per process division is distributed to
+            process workers equally - one thread per each process worker
+            until the remainder equals zero. The same logic is applied
+            to concurrency overhead.
+
+            :param times_overhead: remaining number of threads to be
+                                   distributed to workers
+            :param concurrency_overhead: remaining number of maximum
+                                         concurrent threads to be distributed
+                                         to workers
+            """
             while True:
                 yield (result_queue, iteration_gen, timeout, rps_per_worker,
                        times_per_worker + (times_overhead and 1),
+                       concurrency_per_worker + (concurrency_overhead and 1),
                        context, cls, method_name, args, self.aborted)
                 if times_overhead:
                     times_overhead -= 1
+                if concurrency_overhead:
+                    concurrency_overhead -= 1

         process_pool = self._create_process_pool(
             processes_to_start, _worker_process,
-            worker_args_gen(times_overhead))
+            worker_args_gen(times_overhead, concurrency_overhead))
         self._join_processes(process_pool, result_queue)
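
To make the remainder distribution concrete, a standalone sketch of the
generator's overhead handling (illustrative; the yield is reduced to
just the per-worker slot count):

    def slots_gen(per_worker, overhead):
        # (overhead and 1) yields 1 while overhead is non-zero, so the
        # first `overhead` workers each receive one extra slot.
        while True:
            yield per_worker + (overhead and 1)
            if overhead:
                overhead -= 1

    gen = slots_gen(*divmod(15, 4))       # max_concurrency=15, 4 workers
    print([next(gen) for _ in range(4)])  # [4, 4, 4, 3]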

@@ -37,6 +37,7 @@ class RPSScenarioRunnerTestCase(test.TestCase):
             "type": consts.RunnerType.RPS,
             "times": 1,
             "rps": 100,
+            "max_concurrency": 50,
             "timeout": 1
         }
         rps.RPSScenarioRunner.validate(config)
@ -83,6 +84,7 @@ class RPSScenarioRunnerTestCase(test.TestCase):
is_set=mock.MagicMock(return_value=False))
times = 4
max_concurrent = 3
fake_ram_int = iter(range(10))
@@ -90,7 +92,8 @@ class RPSScenarioRunnerTestCase(test.TestCase):
                                          "id": "uuid1"}]}

         rps._worker_process(mock_queue, fake_ram_int, 1, 10, times,
-                            context, "Dummy", "dummy", (), mock_event)
+                            max_concurrent, context, "Dummy", "dummy",
+                            (), mock_event)

         self.assertEqual(times, mock_log.debug.call_count)
         self.assertEqual(times, mock_thread.call_count)
@@ -127,7 +130,7 @@ class RPSScenarioRunnerTestCase(test.TestCase):
         context = fakes.FakeUserContext({}).context
         context["task"] = {"uuid": "fake_uuid"}

-        config = {"times": 20, "rps": 20, "timeout": 5}
+        config = {"times": 20, "rps": 20, "timeout": 5, "max_concurrency": 15}
         runner = rps.RPSScenarioRunner(self.task, config)

         runner._run_scenario(fakes.FakeScenario, "do_it", context, {})