Unify RPS and constant runners and tune constant runner

The reasons why we need to refactor the constant runner are as follows:

1) Using processes to generate load is too expensive.

The optimal approach is to create N processes
(where N is the number of CPUs) and generate the load with
threads inside each process (see the sketch after this list).

2) Using multiprocessing.Pool() doesn't allow us to
implement proper timeouts.

3) Using multiprocessing.Pool() doesn't allow us to stop
the load immediately (when we run runner.abort()).
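
A minimal sketch of the layout described in 1): N worker processes,
each generating load with plain threads and reporting through a
shared queue. Names and numbers are illustrative, not the exact
Rally internals.

    import collections
    import multiprocessing
    import threading

    def _worker_process(queue, iterations):
        # Threads are cheap, so the per-process load is thread-based.
        threads = collections.deque()
        for i in range(iterations):
            thread = threading.Thread(target=queue.put, args=(i,))
            thread.start()
            threads.append(thread)
        while threads:
            threads.popleft().join()

    if __name__ == "__main__":
        times = 100
        queue = multiprocessing.Queue()
        # N processes, where N is bounded by the number of CPUs.
        n = min(multiprocessing.cpu_count(), times)
        per_worker, overhead = divmod(times, n)
        processes = []
        for i in range(n):
            # The first `overhead` workers take one extra iteration.
            share = per_worker + (1 if i < overhead else 0)
            process = multiprocessing.Process(target=_worker_process,
                                              args=(queue, share))
            process.start()
            processes.append(process)
        results = [queue.get() for _ in range(times)]  # drain first
        for process in processes:
            process.join()
        print(len(results))  # 100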

What has also been done in this patch:

1) Unification of the common parts between the RPS and the new
   constant runners

2) Using RAMInt for iteration counting in the RPS runner as well

3) Added a "sleep" parameter to Dummy.dummy_exception. This allows
   better functional testing of the "--abort-on-sla-failure" feature.

4) Minor fix in SLA

ToDo in next patches:

* Implement timeouts
* Rework the constant-for-duration runner

Co-Authored-By: Boris Pavlovic <boris@pavlovic.me>
Co-Authored-By: Mikhail Dubov <mdubov@mirantis.com>

Change-Id: I49bcc8694e10b3bbb8dcf5c9fe52e29e48a985f3
Boris Pavlovic 2015-02-09 15:54:19 +03:00 committed by Mikhail Dubov
parent fce275de19
commit 59766a57ec
11 changed files with 460 additions and 155 deletions

View File

@ -17,6 +17,7 @@ import abc
import collections
import multiprocessing
import random
import time
import jsonschema
import six
@ -94,6 +95,21 @@ def _run_scenario_once(args):
"atomic_actions": scenario.atomic_actions()}
def _worker_thread(queue, args):
queue.put(_run_scenario_once(args))
def _log_worker_info(**info):
"""Log worker parameters for debugging.
:param info: key-value pairs to be logged
"""
info_message = "\n\t".join(["%s: %s" % (k, v)
for k, v in info.items()])
LOG.debug("Starting a worker."
"\n\t%(info)s" % {"info": info_message})
class ScenarioRunnerResult(dict):
"""Class for all scenario runners' result."""
@ -227,6 +243,43 @@ class ScenarioRunner(object):
"""Abort the execution of further benchmark scenario iterations."""
self.aborted.set()
def _create_process_pool(self, processes_to_start, worker_process,
worker_args_gen):
"""Create a pool of processes with some defined target function.
:param processes_to_start: number of processes to create in the pool
:param worker_process: target function for all processes in the pool
:param worker_args_gen: generator of arguments for the target function
:returns: the process pool as a deque
"""
process_pool = collections.deque()
for i in range(processes_to_start):
process = multiprocessing.Process(target=worker_process,
args=next(worker_args_gen))
process.start()
process_pool.append(process)
return process_pool
def _join_processes(self, process_pool, result_queue):
"""Join the processes in the pool and send their results to the queue.
:param process_pool: pool of processes to join
:param result_queue: multiprocessing.Queue that receives the results
"""
while process_pool:
while process_pool and not process_pool[0].is_alive():
process_pool.popleft().join()
if result_queue.empty():
# sleep a bit to avoid 100% usage of CPU by this method
time.sleep(0.001)
while not result_queue.empty():
self._send_result(result_queue.get())
result_queue.close()
def _send_result(self, result):
"""Send partial result to consumer.
@ -235,3 +288,19 @@ class ScenarioRunner(object):
ValidationError is raised.
"""
self.result_queue.append(ScenarioRunnerResult(result))
def _log_debug_info(self, **info):
"""Log runner parameters for debugging.
The method logs the runner name and the task id, as well as the
values passed as arguments.
:param info: key-value pairs to be logged
"""
info_message = "\n\t".join(["%s: %s" % (k, v)
for k, v in info.items()])
LOG.debug("Starting the %(runner_type)s runner (task UUID: %(task)s)."
"\n\t%(info)s" %
{"runner_type": self.__execution_type__,
"task": self.task["uuid"],
"info": info_message})

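A runnable toy version of the _join_processes loop above (the worker
is a stand-in, not Rally code). Draining the queue while joining is
what lets partial results reach the consumer early and keeps a child
from blocking on a full queue buffer before it can exit:

    import collections
    import multiprocessing
    import time

    def worker(queue):
        # Illustrative stand-in for a scenario worker process.
        for i in range(10000):
            queue.put({"iteration": i})

    if __name__ == "__main__":
        queue = multiprocessing.Queue()
        pool = collections.deque()
        for _ in range(2):
            process = multiprocessing.Process(target=worker,
                                              args=(queue,))
            process.start()
            pool.append(process)
        received = 0
        while pool:
            while pool and not pool[0].is_alive():
                pool.popleft().join()
            if queue.empty():
                time.sleep(0.001)  # avoid spinning at 100% CPU
            while not queue.empty():
                queue.get()
                received += 1
        queue.close()
        print(received)  # 20000
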
View File

@ -13,21 +13,92 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import multiprocessing
import threading
import time
from six import moves
from rally.benchmark.runners import base
from rally.benchmark import utils
from rally.benchmark import utils as butils
from rally.common import log as logging
from rally.common import utils
from rally import consts
LOG = logging.getLogger(__name__)
def _worker_process(queue, iteration_gen, timeout, concurrency, times, context,
cls, method_name, args, aborted):
"""Start the scenario within threads.
Spawn threads to support scenario execution for a fixed number of times.
This generates a constant load on the cloud under test by executing each
scenario iteration without pausing between iterations. Each thread runs
the scenario method once with passed scenario arguments and context.
After execution the result is appended to the queue.
:param queue: queue object to append results
:param iteration_gen: next iteration number generator
:param timeout: operation's timeout
:param concurrency: number of concurrently running scenario iterations
:param times: total number of scenario iterations to be run
:param context: scenario context object
:param cls: scenario class
:param method_name: scenario method name
:param args: scenario args
:param aborted: multiprocessing.Event that aborts load generation if
the flag is set
"""
pool = collections.deque()
alive_threads_in_pool = 0
finished_threads_in_pool = 0
base._log_worker_info(times=times, concurrency=concurrency,
timeout=timeout, cls=cls, method_name=method_name,
args=args)
iteration = next(iteration_gen)
while iteration < times and not aborted.is_set():
scenario_context = base._get_scenario_context(context)
scenario_args = (iteration, cls, method_name, scenario_context, args)
worker_args = (queue, scenario_args)
thread = threading.Thread(target=base._worker_thread,
args=worker_args)
thread.start()
pool.append((thread, time.time()))
alive_threads_in_pool += 1
while alive_threads_in_pool == concurrency:
prev_finished_threads_in_pool = finished_threads_in_pool
finished_threads_in_pool = 0
for t in pool:
if not t[0].isAlive():
finished_threads_in_pool += 1
alive_threads_in_pool -= finished_threads_in_pool
alive_threads_in_pool += prev_finished_threads_in_pool
if alive_threads_in_pool < concurrency:
# NOTE(boris-42): clean up the pool deque. This is required
# because otherwise its length would grow up to `times`, which
# can be arbitrarily large
while pool and not pool[0][0].isAlive():
pool.popleft()[0].join()
finished_threads_in_pool -= 1
break
# wait a bit so these checks do not burn CPU
time.sleep(0.001)
iteration = next(iteration_gen)
# Wait until all threads are done
while pool:
pool.popleft()[0].join()
class ConstantScenarioRunner(base.ScenarioRunner):
"""Creates constant load executing a scenario a specified number of times.
@ -67,36 +138,52 @@ class ConstantScenarioRunner(base.ScenarioRunner):
"additionalProperties": False
}
@staticmethod
def _iter_scenario_args(cls, method, ctx, args, times, aborted):
for i in moves.range(times):
if aborted.is_set():
break
yield (i, cls, method, base._get_scenario_context(ctx), args)
def _run_scenario(self, cls, method_name, context, args):
"""Runs the specified benchmark scenario with given arguments.
def _run_scenario(self, cls, method, context, args):
timeout = self.config.get("timeout", 600)
concurrency = self.config.get("concurrency", 1)
# NOTE(msdubov): If not specified, perform single scenario run.
This method generates a constant load on the cloud under test by
executing scenario iterations in a pool of processes without
pausing between iterations, up to the number of times specified
in the scenario config.
:param cls: The Scenario class where the scenario is implemented
:param method_name: Name of the method that implements the scenario
:param context: Benchmark context that contains users, admin & other
information that was created before the benchmark started.
:param args: Arguments to call the scenario method with
:returns: List of results for each single scenario iteration,
where each result is a dictionary
"""
timeout = self.config.get("timeout", 0) # 0 means no timeout
times = self.config.get("times", 1)
concurrency = self.config.get("concurrency", 1)
iteration_gen = utils.RAMInt()
cpu_count = multiprocessing.cpu_count()
processes_to_start = min(cpu_count, times, concurrency)
concurrency_per_worker, concurrency_overhead = divmod(
concurrency, processes_to_start)
pool = multiprocessing.Pool(concurrency)
iter_result = pool.imap(base._run_scenario_once,
self._iter_scenario_args(cls, method, context,
args, times,
self.aborted))
while True:
try:
result = iter_result.next(timeout)
except multiprocessing.TimeoutError as e:
result = base.format_result_on_timeout(e, timeout)
except StopIteration:
break
self._log_debug_info(times=times, concurrency=concurrency,
timeout=timeout, cpu_count=cpu_count,
processes_to_start=processes_to_start,
concurrency_per_worker=concurrency_per_worker,
concurrency_overhead=concurrency_overhead)
self._send_result(result)
result_queue = multiprocessing.Queue()
pool.close()
pool.join()
def worker_args_gen(concurrency_overhead):
while True:
yield (result_queue, iteration_gen, timeout,
concurrency_per_worker + (concurrency_overhead and 1),
times, context, cls, method_name, args, self.aborted)
if concurrency_overhead:
concurrency_overhead -= 1
process_pool = self._create_process_pool(
processes_to_start, _worker_process,
worker_args_gen(concurrency_overhead))
self._join_processes(process_pool, result_queue)
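
For concreteness, the divmod arithmetic above spreads `concurrency`
across the worker processes as evenly as possible; `overhead and 1`
adds one extra thread to each of the first `concurrency_overhead`
workers. A self-contained example with assumed numbers (4 CPUs,
concurrency of 10):

    # Worked example of the concurrency split (assumed values).
    times, concurrency = 100, 10
    cpu_count = 4  # pretend multiprocessing.cpu_count() returned 4
    processes_to_start = min(cpu_count, times, concurrency)
    per_worker, overhead = divmod(concurrency, processes_to_start)

    shares = []
    for _ in range(processes_to_start):
        # `overhead and 1` is 1 while overhead > 0, else 0.
        shares.append(per_worker + (overhead and 1))
        if overhead:
            overhead -= 1
    print(shares)  # [3, 3, 2, 2] -> sums to concurrency == 10
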
class ConstantForDurationScenarioRunner(base.ScenarioRunner):
@ -147,13 +234,24 @@ class ConstantForDurationScenarioRunner(base.ScenarioRunner):
return _scenario_args
def _run_scenario(self, cls, method, context, args):
"""Runs the specified benchmark scenario with given arguments.
:param cls: The Scenario class where the scenario is implemented
:param method_name: Name of the method that implements the scenario
:param context: Benchmark context that contains users, admin & other
information that was created before the benchmark started.
:param args: Arguments to call the scenario method with
:returns: List of results for each single scenario iteration,
where each result is a dictionary
"""
timeout = self.config.get("timeout", 600)
concurrency = self.config.get("concurrency", 1)
duration = self.config.get("duration")
pool = multiprocessing.Pool(concurrency)
run_args = utils.infinite_run_args_generator(
run_args = butils.infinite_run_args_generator(
self._iter_scenario_args(cls, method, context, args,
self.aborted))
iter_result = pool.imap(base._run_scenario_once, run_args)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import multiprocessing
import random
import threading
@ -20,63 +21,63 @@ import time
from rally.benchmark.runners import base
from rally.common import log as logging
from rally.common import utils
from rally import consts
LOG = logging.getLogger(__name__)
SEND_RESULT_DELAY = 1
def _worker_thread(queue, args):
queue.put(base._run_scenario_once(args))
def _worker_process(rps, times, queue, context, timeout,
worker_id, workers, cls, method_name, args, aborted):
def _worker_process(queue, iteration_gen, timeout, rps, times,
context, cls, method_name, args, aborted):
"""Start scenario within threads.
Spawn N threads per second. Each thread runs scenario once, and appends
result to queue.
Spawn N threads per second. Each thread runs the scenario once, and appends
the result to the queue.
:param rps: runs per second
:param times: number of threads to be run
:param queue: queue object to append results
:param iteration_gen: next iteration number generator
:param timeout: operation's timeout
:param rps: number of scenario iterations to be run per second
:param times: total number of scenario iterations to be run
:param context: scenario context object
:param timeout: timeout operation
:param worker_id: id of worker process
:param workers: number of total workers
:param cls: scenario class
:param method_name: scenario method name
:param args: scenario args
:param aborted: multiprocessing.Event that is an abort flag
:param aborted: multiprocessing.Event that aborts load generation if
the flag is set
"""
pool = []
i = 0
pool = collections.deque()
start = time.time()
sleep = 1.0 / rps
base._log_worker_info(times=times, rps=rps, timeout=timeout,
cls=cls, method_name=method_name, args=args)
# Inject a random delay to exclude situations where the start
# time and the actual time are negligibly close
randsleep_delay = random.randint(int(sleep / 2 * 100), int(sleep * 100))
time.sleep(randsleep_delay / 100.0)
while times > i and not aborted.is_set():
i = 0
while i < times and not aborted.is_set():
scenario_context = base._get_scenario_context(context)
scenario_args = (next(iteration_gen), cls, method_name,
scenario_context, args)
worker_args = (queue, scenario_args)
thread = threading.Thread(target=base._worker_thread,
args=worker_args)
i += 1
scenario_args = (queue, (worker_id + workers * (i - 1), cls,
method_name, scenario_context, args),)
thread = threading.Thread(target=_worker_thread,
args=scenario_args)
thread.start()
pool.append(thread)
time_gap = time.time() - start
real_rps = i / time_gap if time_gap else "Infinity"
LOG.debug("Worker: %s rps: %s (requested rps: %s)" % (
worker_id, real_rps, rps))
LOG.debug("Worker: %s rps: %s (requested rps: %s)" %
(i, real_rps, rps))
# try to join latest thread(s) until it finished, or until time to
# start new thread
@ -84,20 +85,20 @@ def _worker_process(rps, times, queue, context, timeout,
if pool:
pool[0].join(sleep)
if not pool[0].isAlive():
pool.pop(0)
pool.popleft()
else:
time.sleep(sleep)
while pool:
thr = pool.pop(0)
thr = pool.popleft()
thr.join()
class RPSScenarioRunner(base.ScenarioRunner):
"""Scenario runner that does the job with with specified frequency.
"""Scenario runner that does the job with specified frequency.
Each execution is a single benchmark scenario iteration (i.e. no parallel
execution of multiple iterations is performed). The scenario will be
Every single benchmark scenario iteration is executed with specified
frequency (runs per second) in a pool of processes. The scenario will be
launched for a fixed number of times in total (specified in the config).
An example of an rps scenario is booting 1 VM once per second. This
@ -120,6 +121,7 @@ class RPSScenarioRunner(base.ScenarioRunner):
},
"rps": {
"type": "number",
"minimum": 1
},
"timeout": {
"type": "number",
@ -129,37 +131,47 @@ class RPSScenarioRunner(base.ScenarioRunner):
}
def _run_scenario(self, cls, method_name, context, args):
"""Runs the specified benchmark scenario with given arguments.
Every single benchmark scenario iteration is executed with specified
frequency (runs per second) in a pool of processes. The scenario will
be launched for a fixed number of times in total (specified in the
config).
:param cls: The Scenario class where the scenario is implemented
:param method_name: Name of the method that implements the scenario
:param context: Benchmark context that contains users, admin & other
information that was created before the benchmark started.
:param args: Arguments to call the scenario method with
:returns: List of results for each single scenario iteration,
where each result is a dictionary
"""
times = self.config["times"]
timeout = self.config.get("timeout", 600)
timeout = self.config.get("timeout", 0) # 0 means no timeout
iteration_gen = utils.RAMInt()
cpu_count = multiprocessing.cpu_count()
processes_to_start = min(cpu_count, times)
rps_per_worker = float(self.config["rps"]) / processes_to_start
times_per_worker, times_overhead = divmod(times, processes_to_start)
queue = multiprocessing.Queue()
self._log_debug_info(times=times, timeout=timeout, cpu_count=cpu_count,
processes_to_start=processes_to_start,
rps_per_worker=rps_per_worker,
times_per_worker=times_per_worker,
times_overhead=times_overhead)
process_pool = []
result_queue = multiprocessing.Queue()
times_per_worker, rest = divmod(times, processes_to_start)
def worker_args_gen(times_overhead):
while True:
yield (result_queue, iteration_gen, timeout, rps_per_worker,
times_per_worker + (times_overhead and 1),
context, cls, method_name, args, self.aborted)
if times_overhead:
times_overhead -= 1
for i in range(processes_to_start):
times = times_per_worker + int(rest > 0)
rest -= 1
worker_args = (rps_per_worker, times, queue, context,
timeout, i, processes_to_start, cls,
method_name, args, self.aborted)
process = multiprocessing.Process(target=_worker_process,
args=worker_args)
process.start()
process_pool.append(process)
while process_pool:
for process in process_pool:
process.join(SEND_RESULT_DELAY)
if not process.is_alive():
process.join()
process_pool.remove(process)
while not queue.empty():
self._send_result(queue.get())
queue.close()
process_pool = self._create_process_pool(
processes_to_start, _worker_process,
worker_args_gen(times_overhead))
self._join_processes(process_pool, result_queue)
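
The pacing itself happens in _worker_process above: each worker aims
to start one thread every 1.0 / rps seconds. A trimmed, illustrative
sketch of that loop (the real code joins finished threads during the
gap instead of sleeping blindly):

    import threading
    import time

    rps = 5.0   # assumed per-worker rate
    times = 10  # assumed per-worker iteration count
    sleep = 1.0 / rps
    start = time.time()
    for i in range(times):
        threading.Thread(target=print, args=(i,)).start()
        # Sleep until the next 1/rps slot, compensating for drift.
        next_slot = start + (i + 1) * sleep
        time.sleep(max(0.0, next_slot - time.time()))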

View File

@ -18,11 +18,11 @@ from rally import consts
class SerialScenarioRunner(base.ScenarioRunner):
"""Scenario runner that executes benchmark scenarios in serial.
"""Scenario runner that executes benchmark scenarios serially.
Unlike scenario runners that execute in parallel, the SerialScenarioRunner
executes scenarios one-by-one in the same python interpreter process as
Rally. This allows you to benchmark your scenario without introducing
Unlike scenario runners that execute in parallel, the serial scenario
runner executes scenarios one-by-one in the same python interpreter process
as Rally. This allows you to benchmark your scenario without introducing
any concurrent operations as well as interactively debug the scenario
from the same command that you use to start Rally.
"""
@ -47,6 +47,23 @@ class SerialScenarioRunner(base.ScenarioRunner):
}
def _run_scenario(self, cls, method_name, context, args):
"""Runs the specified benchmark scenario with given arguments.
The scenario iterations are executed one-by-one in the same python
interpreter process as Rally. This allows you to benchmark your
scenario without introducing any concurrent operations as well as
interactively debug the scenario from the same command that you use
to start Rally.
:param cls: The Scenario class where the scenario is implemented
:param method_name: Name of the method that implements the scenario
:param context: Benchmark context that contains users, admin & other
information that was created before the benchmark started.
:param args: Arguments to call the scenario method with
:returns: List of results for each single scenario iteration,
where each result is a dictionary
"""
times = self.config.get("times", 1)
for i in range(times):

View File

@ -41,7 +41,7 @@ class Dummy(base.Scenario):
@validation.number("size_of_message",
minval=1, integer_only=True, nullable=True)
@base.scenario()
def dummy_exception(self, size_of_message=1):
def dummy_exception(self, size_of_message=1, sleep=0):
"""Throw an exception.
Dummy.dummy_exception can be used to test if exceptions are processed
@ -49,8 +49,11 @@ class Dummy(base.Scenario):
results storing process.
:param size_of_message: int size of the exception message
:param sleep: idle time of the method (in seconds).
:raises: DummyScenarioException
"""
if sleep:
time.sleep(sleep)
raise DummyScenarioException("M" * size_of_message)

View File

@ -183,7 +183,7 @@ class IterationTime(SLA):
return self.success
def details(self):
return (_("Maximum seconds per iteration %.2fs<= %.2fs - %s") %
return (_("Maximum seconds per iteration %.2fs <= %.2fs - %s") %
(self.max_iteration_time, self.criterion_value, self.status()))

View File

@ -351,8 +351,7 @@ class TaskTestCase(unittest.TestCase):
"deployment_id": deployment_id})
results = json.loads(rally("task results"))
iterations_completed = len(results[0]["result"])
# NOTE(msdubov): Change '<=' to '<' as soon as we fix the runners.
self.assertTrue(iterations_completed <= times)
self.assertTrue(iterations_completed < times)
def test_start_abort_on_sla_failure_max_seconds_constant(self):
times = 100
@ -421,6 +420,9 @@ class TaskTestCase(unittest.TestCase):
cfg = {
"Dummy.dummy_exception": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": times,
@ -439,6 +441,9 @@ class TaskTestCase(unittest.TestCase):
cfg = {
"Dummy.dummy_exception": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "serial",
"times": times
@ -456,6 +461,9 @@ class TaskTestCase(unittest.TestCase):
cfg = {
"Dummy.dummy_exception": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "rps",
"times": times,

View File

@ -13,6 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import multiprocessing
import jsonschema
import mock
@ -24,9 +27,12 @@ from tests.unit import fakes
from tests.unit import test
BASE = "rally.benchmark.runners.base."
class ScenarioHelpersTestCase(test.TestCase):
@mock.patch("rally.benchmark.runners.base.utils.format_exc")
@mock.patch(BASE + "utils.format_exc")
def test_format_result_on_timeout(self, mock_format_exc):
mock_exc = mock.MagicMock()
@ -42,8 +48,7 @@ class ScenarioHelpersTestCase(test.TestCase):
expected)
mock_format_exc.assert_called_once_with(mock_exc)
@mock.patch("rally.benchmark.runners.base.random.choice",
side_effect=lambda x: x[1])
@mock.patch(BASE + "random.choice", side_effect=lambda x: x[1])
def test_get_scenario_context(self, mock_random):
users = list()
@ -74,7 +79,7 @@ class ScenarioHelpersTestCase(test.TestCase):
self.assertEqual(expected_context, base._get_scenario_context(context))
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch(BASE + "osclients")
def test_run_scenario_once_internal_logic(self, mock_clients):
mock_clients.Clients.return_value = "cl"
@ -92,9 +97,8 @@ class ScenarioHelpersTestCase(test.TestCase):
]
scenario_cls.assert_has_calls(expected_calls, any_order=True)
@mock.patch("rally.benchmark.runners.base.rutils.Timer",
side_effect=fakes.FakeTimer)
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
@mock.patch(BASE + "osclients")
def test_run_scenario_once_without_scenario_output(self, mock_clients,
mock_rtimer):
context = base._get_scenario_context(fakes.FakeUserContext({}).context)
@ -111,9 +115,8 @@ class ScenarioHelpersTestCase(test.TestCase):
}
self.assertEqual(expected_result, result)
@mock.patch("rally.benchmark.runners.base.rutils.Timer",
side_effect=fakes.FakeTimer)
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
@mock.patch(BASE + "osclients")
def test_run_scenario_once_with_scenario_output(self, mock_clients,
mock_rtimer):
context = base._get_scenario_context(fakes.FakeUserContext({}).context)
@ -130,9 +133,8 @@ class ScenarioHelpersTestCase(test.TestCase):
}
self.assertEqual(expected_result, result)
@mock.patch("rally.benchmark.runners.base.rutils.Timer",
side_effect=fakes.FakeTimer)
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
@mock.patch(BASE + "osclients")
def test_run_scenario_once_exception(self, mock_clients, mock_rtimer):
context = base._get_scenario_context(fakes.FakeUserContext({}).context)
args = (1, fakes.FakeScenario, "something_went_wrong", context, {})
@ -190,8 +192,8 @@ class ScenarioRunnerTestCase(test.TestCase):
def setUp(self):
super(ScenarioRunnerTestCase, self).setUp()
@mock.patch("rally.benchmark.runners.base.jsonschema.validate")
@mock.patch("rally.benchmark.runners.base.ScenarioRunner._get_cls")
@mock.patch(BASE + "jsonschema.validate")
@mock.patch(BASE + "ScenarioRunner._get_cls")
def test_validate(self, mock_get_cls, mock_validate):
mock_get_cls.return_value = fakes.FakeRunner
@ -219,7 +221,7 @@ class ScenarioRunnerTestCase(test.TestCase):
base.ScenarioRunner.get_runner,
None, {"type": "NoSuchRunner"})
@mock.patch("rally.benchmark.runners.base.jsonschema.validate")
@mock.patch(BASE + "jsonschema.validate")
def test_validate_default_runner(self, mock_validate):
config = {"a": 10}
base.ScenarioRunner.validate(config)
@ -227,8 +229,7 @@ class ScenarioRunnerTestCase(test.TestCase):
config,
serial.SerialScenarioRunner.CONFIG_SCHEMA)
@mock.patch("rally.benchmark.runners.base.rutils.Timer.duration",
return_value=10)
@mock.patch(BASE + "rutils.Timer.duration", return_value=10)
def test_run(self, mock_duration):
runner = serial.SerialScenarioRunner(
mock.MagicMock(),
@ -275,3 +276,39 @@ class ScenarioRunnerTestCase(test.TestCase):
self.assertFalse(runner.aborted.is_set())
runner.abort()
self.assertTrue(runner.aborted.is_set())
def test__create_process_pool(self):
runner = serial.SerialScenarioRunner(
mock.MagicMock(),
mock.MagicMock())
processes_to_start = 10
def worker_process(i):
pass
counter = ((i,) for i in range(100))
process_pool = runner._create_process_pool(processes_to_start,
worker_process,
counter)
self.assertEqual(processes_to_start, len(process_pool))
for process in process_pool:
self.assertIsInstance(process, multiprocessing.Process)
@mock.patch(BASE + "ScenarioRunner._send_result")
def test__join_processes(self, mock_send_result):
process = mock.MagicMock(is_alive=mock.MagicMock(return_value=False))
processes = 10
process_pool = collections.deque([process] * processes)
mock_result_queue = mock.MagicMock(
empty=mock.MagicMock(return_value=True))
runner = serial.SerialScenarioRunner(
mock.MagicMock(),
mock.MagicMock())
runner._join_processes(process_pool, mock_result_queue)
self.assertEqual(processes, process.join.call_count)
mock_result_queue.close.assert_called_once_with()

View File

@ -14,6 +14,7 @@
# under the License.
import jsonschema
import mock
from rally.benchmark.runners import base
from rally.benchmark.runners import constant
@ -22,6 +23,9 @@ from tests.unit import fakes
from tests.unit import test
RUNNERS = "rally.benchmark.runners."
class ConstantScenarioRunnerTestCase(test.TestCase):
def setUp(self):
@ -35,6 +39,7 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
self.context = fakes.FakeUserContext({"task":
{"uuid": "uuid"}}).context
self.args = {"a": 1}
self.task = mock.MagicMock()
def test_validate(self):
constant.ConstantScenarioRunner.validate(self.config)
@ -45,8 +50,69 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
constant.ConstantScenarioRunner.validate,
self.config)
def test__run_scenario_constantly_for_times(self):
runner = constant.ConstantScenarioRunner(None, self.config)
@mock.patch(RUNNERS + "base.scenario_base")
@mock.patch(RUNNERS + "base.osclients")
def test_get_constant_runner(self, mock_osclients, mock_base):
mock_osclients.Clients.return_value = fakes.FakeClients()
runner = base.ScenarioRunner.get_runner(mock.MagicMock(),
{"type":
consts.RunnerType.CONSTANT})
self.assertIsNotNone(runner)
@mock.patch(RUNNERS + "constant.time")
@mock.patch(RUNNERS + "constant.threading.Thread")
@mock.patch(RUNNERS + "constant.multiprocessing.Queue")
@mock.patch(RUNNERS + "constant.base")
def test__worker_process(self, mock_base, mock_queue, mock_thread,
mock_time):
mock_thread_instance = mock.MagicMock(
isAlive=mock.MagicMock(return_value=False))
mock_thread.return_value = mock_thread_instance
mock_event = mock.MagicMock(
is_set=mock.MagicMock(return_value=False))
times = 4
fake_ram_int = iter(range(10))
context = {"users": [{"tenant_id": "t1", "endpoint": "e1",
"id": "uuid1"}]}
constant._worker_process(mock_queue, fake_ram_int, 1, 2, times,
context, "Dummy", "dummy", (), mock_event)
self.assertEqual(times, mock_thread.call_count)
self.assertEqual(times, mock_thread_instance.start.call_count)
self.assertEqual(times, mock_thread_instance.join.call_count)
self.assertEqual(times, mock_base._get_scenario_context.call_count)
for i in range(times):
scenario_context = mock_base._get_scenario_context(context)
call = mock.call(args=(mock_queue,
(i, "Dummy", "dummy",
scenario_context, ())),
target=mock_base._worker_thread)
self.assertIn(call, mock_thread.mock_calls)
@mock.patch(RUNNERS + "constant.base._run_scenario_once")
def test__worker_thread(self, mock_run_scenario_once):
mock_queue = mock.MagicMock()
args = ("some_args",)
base._worker_thread(mock_queue, args)
self.assertEqual(1, mock_queue.put.call_count)
expected_calls = [mock.call(("some_args",))]
self.assertEqual(expected_calls, mock_run_scenario_once.mock_calls)
def test__run_scenario(self):
runner = constant.ConstantScenarioRunner(self.task, self.config)
runner._run_scenario(fakes.FakeScenario,
"do_it", self.context, self.args)
@ -54,8 +120,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
def test__run_scenario_constantly_for_times_exception(self):
runner = constant.ConstantScenarioRunner(None, self.config)
def test__run_scenario_exception(self):
runner = constant.ConstantScenarioRunner(self.task, self.config)
runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", self.context, self.args)
@ -64,18 +130,8 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn("error", runner.result_queue[0])
def test__run_scenario_constantly_for_times_timeout(self):
runner = constant.ConstantScenarioRunner(None, self.config)
runner._run_scenario(fakes.FakeScenario,
"raise_timeout", self.context, self.args)
self.assertEqual(len(runner.result_queue), self.config["times"])
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
self.assertIn("error", runner.result_queue[0])
def test__run_scenario_constantly_aborted(self):
runner = constant.ConstantScenarioRunner(None, self.config)
def test__run_scenario_aborted(self):
runner = constant.ConstantScenarioRunner(self.task, self.config)
runner.abort()
runner._run_scenario(fakes.FakeScenario,
@ -83,7 +139,7 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
self.assertEqual(len(runner.result_queue), 0)
def test_abort(self):
runner = constant.ConstantScenarioRunner(None, self.config)
runner = constant.ConstantScenarioRunner(self.task, self.config)
self.assertFalse(runner.aborted.is_set())
runner.abort()
self.assertTrue(runner.aborted.is_set())

View File

@ -23,10 +23,14 @@ from tests.unit import fakes
from tests.unit import test
RUNNERS = "rally.benchmark.runners."
class RPSScenarioRunnerTestCase(test.TestCase):
def setUp(self):
super(RPSScenarioRunnerTestCase, self).setUp()
self.task = mock.MagicMock()
def test_validate(self):
config = {
@ -43,8 +47,8 @@ class RPSScenarioRunnerTestCase(test.TestCase):
self.assertRaises(jsonschema.ValidationError,
rps.RPSScenarioRunner.validate, config)
@mock.patch("rally.benchmark.runners.base.scenario_base")
@mock.patch("rally.benchmark.runners.base.osclients")
@mock.patch(RUNNERS + "base.scenario_base")
@mock.patch(RUNNERS + "base.osclients")
def test_get_rps_runner(self, mock_osclients, mock_base):
mock_osclients.Clients.return_value = fakes.FakeClients()
@ -54,11 +58,11 @@ class RPSScenarioRunnerTestCase(test.TestCase):
consts.RunnerType.RPS})
self.assertIsNotNone(runner)
@mock.patch("rally.benchmark.runners.rps.LOG")
@mock.patch("rally.benchmark.runners.rps.time")
@mock.patch("rally.benchmark.runners.rps.threading.Thread")
@mock.patch("rally.benchmark.runners.rps.multiprocessing.Queue")
@mock.patch("rally.benchmark.runners.rps.base")
@mock.patch(RUNNERS + "rps.LOG")
@mock.patch(RUNNERS + "rps.time")
@mock.patch(RUNNERS + "rps.threading.Thread")
@mock.patch(RUNNERS + "rps.multiprocessing.Queue")
@mock.patch(RUNNERS + "rps.base")
def test__worker_process(self, mock_base, mock_queue, mock_thread,
mock_time, mock_log):
@ -80,15 +84,16 @@ class RPSScenarioRunnerTestCase(test.TestCase):
times = 4
fake_ram_int = iter(range(10))
context = {"users": [{"tenant_id": "t1", "endpoint": "e1",
"id": "uuid1"}]}
rps._worker_process(10, times, mock_queue, context, 600, 1, 1,
"Dummy", "dummy", (), mock_event)
rps._worker_process(mock_queue, fake_ram_int, 1, 10, times,
context, "Dummy", "dummy", (), mock_event)
self.assertEqual(times, mock_log.debug.call_count)
self.assertEqual(times, mock_thread.call_count)
self.assertEqual(times, mock_thread_instance.start.call_count)
self.assertEqual(times, mock_thread_instance.join.call_count)
self.assertEqual(times - 1, mock_time.sleep.call_count)
@ -96,36 +101,34 @@ class RPSScenarioRunnerTestCase(test.TestCase):
self.assertEqual(times * 4 - 1, mock_time.time.count)
self.assertEqual(times, mock_base._get_scenario_context.call_count)
for i in range(1, times + 1):
for i in range(times):
scenario_context = mock_base._get_scenario_context(context)
call = mock.call(args=(mock_queue,
(i, "Dummy", "dummy",
scenario_context, ())),
target=rps._worker_thread)
target=mock_base._worker_thread)
self.assertIn(call, mock_thread.mock_calls)
@mock.patch("rally.benchmark.runners.rps.base",
_run_scenario_once=mock.MagicMock())
def test__worker_thread(self, mock_base):
@mock.patch(RUNNERS + "rps.base._run_scenario_once")
def test__worker_thread(self, mock_run_scenario_once):
mock_queue = mock.MagicMock()
args = ("some_args",)
rps._worker_thread(mock_queue, args)
base._worker_thread(mock_queue, args)
self.assertEqual(1, mock_queue.put.call_count)
expected_calls = [mock.call(("some_args",))]
self.assertEqual(expected_calls,
mock_base._run_scenario_once.mock_calls)
self.assertEqual(expected_calls, mock_run_scenario_once.mock_calls)
@mock.patch("rally.benchmark.runners.rps.time.sleep")
@mock.patch(RUNNERS + "rps.time.sleep")
def test__run_scenario(self, mock_sleep):
context = fakes.FakeUserContext({}).context
context["task"] = {"uuid": "fake_uuid"}
config = {"times": 20, "rps": 20, "timeout": 5}
runner = rps.RPSScenarioRunner(None, config)
runner = rps.RPSScenarioRunner(self.task, config)
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
@ -134,13 +137,13 @@ class RPSScenarioRunnerTestCase(test.TestCase):
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
@mock.patch("rally.benchmark.runners.rps.time.sleep")
@mock.patch(RUNNERS + "rps.time.sleep")
def test__run_scenario_exception(self, mock_sleep):
context = fakes.FakeUserContext({}).context
context["task"] = {"uuid": "fake_uuid"}
config = {"times": 4, "rps": 10}
runner = rps.RPSScenarioRunner(None, config)
runner = rps.RPSScenarioRunner(self.task, config)
runner._run_scenario(fakes.FakeScenario,
"something_went_wrong", context, {})
@ -148,13 +151,13 @@ class RPSScenarioRunnerTestCase(test.TestCase):
for result in runner.result_queue:
self.assertIsNotNone(base.ScenarioRunnerResult(result))
@mock.patch("rally.benchmark.runners.rps.time.sleep")
@mock.patch(RUNNERS + "rps.time.sleep")
def test__run_scenario_aborted(self, mock_sleep):
context = fakes.FakeUserContext({}).context
context["task"] = {"uuid": "fake_uuid"}
config = {"times": 20, "rps": 20, "timeout": 5}
runner = rps.RPSScenarioRunner(None, config)
runner = rps.RPSScenarioRunner(self.task, config)
runner.abort()
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
@ -169,7 +172,7 @@ class RPSScenarioRunnerTestCase(test.TestCase):
context["task"] = {"uuid": "fake_uuid"}
config = {"times": 4, "rps": 10}
runner = rps.RPSScenarioRunner(None, config)
runner = rps.RPSScenarioRunner(self.task, config)
self.assertFalse(runner.aborted.is_set())
runner.abort()

View File

@ -29,12 +29,14 @@ class DummyTestCase(test.TestCase):
scenario.dummy(sleep=10)
mock_sleep.sleep.assert_called_once_with(10)
def test_dummy_exception(self):
@mock.patch("rally.benchmark.scenarios.dummy.dummy.time")
def test_dummy_exception(self, mock_sleep):
scenario = dummy.Dummy()
size_of_message = 5
self.assertRaises(dummy.DummyScenarioException,
scenario.dummy_exception, size_of_message)
scenario.dummy_exception, size_of_message, sleep=10)
mock_sleep.sleep.assert_called_once_with(10)
def test_dummy_exception_probability(self):
scenario = dummy.Dummy()