Store iteration number in context obj

Number of iteration can be useful while preparing arguments for run.
For example, we can choose users and tenants with round-robin approach
instead of random choice.

Before this patch, runners extended context object with iteration number
just before running scenario when all preparation stuff is done.

Change-Id: I0862e08ef07248b24681e249fd6310287ea59e47
This commit is contained in:
Andrey Kurilin 2016-04-22 18:53:43 +03:00 committed by Valeriy Ponomaryov
parent e811456c7b
commit da7d455188
8 changed files with 90 additions and 54 deletions

View File

@ -71,9 +71,8 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
iteration = next(iteration_gen)
while iteration < times and not aborted.is_set():
scenario_context = runner._get_scenario_context(context)
scenario_args = (iteration, cls, method_name, scenario_context, args)
worker_args = (queue, scenario_args)
scenario_context = runner._get_scenario_context(iteration, context)
worker_args = (queue, cls, method_name, scenario_context, args)
thread = threading.Thread(target=runner._worker_thread,
args=worker_args)
@ -210,6 +209,18 @@ class ConstantScenarioRunner(runner.ScenarioRunner):
self._join_processes(process_pool, result_queue)
def _run_scenario_once_with_unpack_args(args):
# NOTE(andreykurilin): `pool.imap` is used in
# ConstantForDurationScenarioRunner. It does not want to work with
# instance-methods, class-methods and static-methods. Also, it can't
# transmit positional or keyword arguments to destination function.
# While original `rally.task.runner._run_scenario_once` accepts
# multiple arguments instead of one big tuple with all arguments, we
# need to hardcode unpacking here(all other runners are able to
# transmit arguments in proper way).
return runner._run_scenario_once(*args)
@runner.configure(name="constant_for_duration")
class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
"""Creates constant load executing a scenario for an interval of time.
@ -253,7 +264,7 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
def _scenario_args(i):
if aborted.is_set():
raise StopIteration()
return (i, cls, method, runner._get_scenario_context(ctx), args)
return (cls, method, runner._get_scenario_context(i, ctx), args)
return _scenario_args
def _run_scenario(self, cls, method, context, args):
@ -272,12 +283,14 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
concurrency = self.config.get("concurrency", 1)
duration = self.config.get("duration")
# FIXME(andreykurilin): unify `_worker_process`, use it here and remove
# usage of `multiprocessing.Pool`(usage of separate process for
# each concurrent iteration is redundant).
pool = multiprocessing.Pool(concurrency)
run_args = butils.infinite_run_args_generator(
self._iter_scenario_args(cls, method, context, args,
self.aborted))
iter_result = pool.imap(runner._run_scenario_once, run_args)
self._iter_scenario_args(cls, method, context, args, self.aborted))
iter_result = pool.imap(_run_scenario_once_with_unpack_args, run_args)
start = time.time()
while True:

View File

@ -73,10 +73,9 @@ def _worker_process(queue, iteration_gen, timeout, rps, times,
i = 0
while i < times and not aborted.is_set():
scenario_context = runner._get_scenario_context(context)
scenario_args = (next(iteration_gen), cls, method_name,
scenario_context, args)
worker_args = (queue, scenario_args)
scenario_context = runner._get_scenario_context(next(iteration_gen),
context)
worker_args = (queue, cls, method_name, scenario_context, args)
thread = threading.Thread(target=runner._worker_thread,
args=worker_args)

View File

@ -68,9 +68,9 @@ class SerialScenarioRunner(runner.ScenarioRunner):
for i in range(times):
if self.aborted.is_set():
break
run_args = (i, cls, method_name,
runner._get_scenario_context(context), args)
result = runner._run_scenario_once(run_args)
result = runner._run_scenario_once(
cls, method_name, runner._get_scenario_context(i, context),
args)
self._send_result(result)
self._flush_results()

View File

@ -44,24 +44,26 @@ def format_result_on_timeout(exc, timeout):
}
def _get_scenario_context(context_obj):
def _get_scenario_context(iteration, context_obj):
context_obj = copy.deepcopy(context_obj)
context_obj["iteration"] = iteration
return context.ContextManager(context_obj).map_for_scenario()
def _run_scenario_once(args):
iteration, cls, method_name, context_obj, kwargs = args
def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs):
iteration = context_obj["iteration"]
kwargs = copy.deepcopy(kwargs)
# provide arguments isolation between iterations
scenario_kwargs = copy.deepcopy(scenario_kwargs)
LOG.info("Task %(task)s | ITER: %(iteration)s START" %
{"task": context_obj["task"]["uuid"], "iteration": iteration})
context_obj["iteration"] = iteration
scenario_inst = cls(context_obj)
error = []
try:
with rutils.Timer() as timer:
getattr(scenario_inst, method_name)(**kwargs)
getattr(scenario_inst, method_name)(**scenario_kwargs)
except Exception as e:
error = utils.format_exc(e)
if logging.is_debug():
@ -80,8 +82,9 @@ def _run_scenario_once(args):
"atomic_actions": scenario_inst.atomic_actions()}
def _worker_thread(queue, args):
queue.put(_run_scenario_once(args))
def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs):
queue.put(_run_scenario_once(cls, method_name, context_obj,
scenario_kwargs))
def _log_worker_info(**info):

View File

@ -46,6 +46,15 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
runner.ScenarioRunner.validate,
self.config)
@mock.patch(RUNNERS + "constant.runner")
def test__run_scenario_once_with_unpack_args(self, mock_runner):
result = constant._run_scenario_once_with_unpack_args(
("FOO", ("BAR", "QUUZ")))
self.assertEqual(mock_runner._run_scenario_once.return_value, result)
mock_runner._run_scenario_once.assert_called_once_with(
"FOO", ("BAR", "QUUZ"))
@mock.patch(RUNNERS + "constant.time")
@mock.patch(RUNNERS + "constant.threading.Thread")
@mock.patch(RUNNERS + "constant.multiprocessing.Queue")
@ -81,24 +90,24 @@ class ConstantScenarioRunnerTestCase(test.TestCase):
self.assertEqual(times, mock_runner._get_scenario_context.call_count)
for i in range(times):
scenario_context = mock_runner._get_scenario_context(context)
call = mock.call(args=(mock_queue,
(i, "Dummy", "dummy",
scenario_context, ())),
target=mock_runner._worker_thread)
scenario_context = mock_runner._get_scenario_context(i, context)
call = mock.call(
args=(mock_queue, "Dummy", "dummy", scenario_context, ()),
target=mock_runner._worker_thread,
)
self.assertIn(call, mock_thread.mock_calls)
@mock.patch(RUNNERS_BASE + "_run_scenario_once")
def test__worker_thread(self, mock__run_scenario_once):
mock_queue = mock.MagicMock()
args = ("some_args",)
args = ("fake_cls", "fake_method_name", "fake_context_obj", {})
runner._worker_thread(mock_queue, args)
runner._worker_thread(mock_queue, *args)
self.assertEqual(1, mock_queue.put.call_count)
expected_calls = [mock.call(("some_args",))]
expected_calls = [mock.call(*args)]
self.assertEqual(expected_calls, mock__run_scenario_once.mock_calls)
def test__run_scenario(self):
@ -240,6 +249,7 @@ class ConstantForDurationScenarioRunnerTestCase(test.TestCase):
self.config = {"duration": 0, "concurrency": 2,
"timeout": 2, "type": "constant_for_duration"}
self.context = fakes.FakeContext({"task": {"uuid": "uuid"}}).context
self.context["iteration"] = 14
self.args = {"a": 1}
def test_validate(self):

View File

@ -115,24 +115,23 @@ class RPSScenarioRunnerTestCase(test.TestCase):
self.assertEqual(times, mock_runner._get_scenario_context.call_count)
for i in range(times):
scenario_context = mock_runner._get_scenario_context(context)
call = mock.call(args=(mock_queue,
(i, "Dummy", "dummy",
scenario_context, ())),
target=mock_runner._worker_thread)
scenario_context = mock_runner._get_scenario_context(i, context)
call = mock.call(
args=(mock_queue, "Dummy", "dummy", scenario_context, ()),
target=mock_runner._worker_thread,
)
self.assertIn(call, mock_thread.mock_calls)
@mock.patch(RUNNERS + "rps.runner._run_scenario_once")
def test__worker_thread(self, mock__run_scenario_once):
mock_queue = mock.MagicMock()
args = ("fake_cls", "fake_method_name", "fake_context_obj", {})
args = ("some_args",)
runner._worker_thread(mock_queue, args)
runner._worker_thread(mock_queue, *args)
self.assertEqual(1, mock_queue.put.call_count)
expected_calls = [mock.call(("some_args",))]
expected_calls = [mock.call(*args)]
self.assertEqual(expected_calls, mock__run_scenario_once.mock_calls)
@mock.patch(RUNNERS + "rps.time.sleep")

View File

@ -31,14 +31,24 @@ class SerialScenarioRunnerTestCase(test.TestCase):
"timestamp": 1.}
mock__run_scenario_once.return_value = result
expected_results = [[result] for i in range(times)]
runner = serial.SerialScenarioRunner(mock.MagicMock(),
{"times": times})
runner._run_scenario(fakes.FakeScenario, "do_it",
fakes.FakeContext().context, {})
self.assertEqual(len(runner.result_queue), times)
results = list(runner.result_queue)
self.assertEqual(results, expected_results)
expected_calls = []
for i in range(times):
ctxt = fakes.FakeContext().context
ctxt["iteration"] = i
ctxt["task"] = mock.ANY
expected_calls.append(
mock.call(fakes.FakeScenario, "do_it", ctxt, {})
)
mock__run_scenario_once.assert_has_calls(expected_calls)
def test__run_scenario_aborted(self):
runner = serial.SerialScenarioRunner(mock.MagicMock(),

View File

@ -49,22 +49,25 @@ class ScenarioRunnerHelpersTestCase(test.TestCase):
@mock.patch(BASE + "context.ContextManager")
def test_get_scenario_context(self, mock_context_manager):
mock_context_obj = mock.MagicMock()
mock_context_obj = {}
mock_map_for_scenario = (
mock_context_manager.return_value.map_for_scenario)
self.assertEqual(mock_map_for_scenario.return_value,
runner._get_scenario_context(mock_context_obj))
result = runner._get_scenario_context(13, mock_context_obj)
self.assertEqual(
mock_map_for_scenario.return_value,
result
)
mock_context_manager.assert_called_once_with(mock_context_obj)
mock_context_manager.assert_called_once_with({"iteration": 13})
mock_map_for_scenario.assert_called_once_with()
def test_run_scenario_once_internal_logic(self):
context = runner._get_scenario_context(
fakes.FakeContext({}).context)
12, fakes.FakeContext({}).context)
scenario_cls = mock.MagicMock()
args = (2, scenario_cls, "test", context, {})
runner._run_scenario_once(args)
runner._run_scenario_once(scenario_cls, "test", context, {})
expected_calls = [
mock.call(context),
@ -77,8 +80,8 @@ class ScenarioRunnerHelpersTestCase(test.TestCase):
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
def test_run_scenario_once_without_scenario_output(self, mock_timer):
args = (1, fakes.FakeScenario, "do_it", mock.MagicMock(), {})
result = runner._run_scenario_once(args)
result = runner._run_scenario_once(
fakes.FakeScenario, "do_it", mock.MagicMock(), {})
expected_result = {
"duration": fakes.FakeTimer().duration(),
@ -92,8 +95,8 @@ class ScenarioRunnerHelpersTestCase(test.TestCase):
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
def test_run_scenario_once_with_added_scenario_output(self, mock_timer):
args = (1, fakes.FakeScenario, "with_add_output", mock.MagicMock(), {})
result = runner._run_scenario_once(args)
result = runner._run_scenario_once(
fakes.FakeScenario, "with_add_output", mock.MagicMock(), {})
expected_result = {
"duration": fakes.FakeTimer().duration(),
@ -114,9 +117,8 @@ class ScenarioRunnerHelpersTestCase(test.TestCase):
@mock.patch(BASE + "rutils.Timer", side_effect=fakes.FakeTimer)
def test_run_scenario_once_exception(self, mock_timer):
args = (1, fakes.FakeScenario, "something_went_wrong",
mock.MagicMock(), {})
result = runner._run_scenario_once(args)
result = runner._run_scenario_once(
fakes.FakeScenario, "something_went_wrong", mock.MagicMock(), {})
expected_error = result.pop("error")
expected_result = {
"duration": fakes.FakeTimer().duration(),