diff --git a/rally/benchmark/runners/rps.py b/rally/benchmark/runners/rps.py index 8130c877f4..9ad989dddc 100644 --- a/rally/benchmark/runners/rps.py +++ b/rally/benchmark/runners/rps.py @@ -32,7 +32,7 @@ def _worker_thread(queue, args): queue.put(base._run_scenario_once(args)) -def _worker_process(rps, times, queue, scenario_context, timeout, +def _worker_process(rps, times, queue, context, timeout, worker_id, workers, cls, method_name, args): """Start scenario within threads. @@ -42,7 +42,7 @@ def _worker_process(rps, times, queue, scenario_context, timeout, :param rps: runs per second :param times: number of threads to be run :param queue: queue object to append results - :param scenario_context: scenario context object + :param context: scenario context object :param timeout: timeout operation :param worker_id: id of worker process :param workers: number of total workers @@ -63,6 +63,7 @@ def _worker_process(rps, times, queue, scenario_context, timeout, time.sleep(randsleep_delay / 100.0) while times > i: + scenario_context = base._get_scenario_context(context) i += 1 scenario_args = (queue, (worker_id + workers * (i - 1), cls, method_name, scenario_context, args),) @@ -137,14 +138,13 @@ class RPSScenarioRunner(base.ScenarioRunner): queue = multiprocessing.Queue() process_pool = [] - scenario_context = base._get_scenario_context(context) times_per_worker, rest = divmod(times, processes_to_start) for i in range(processes_to_start): times = times_per_worker + int(rest > 0) rest -= 1 - worker_args = (rps_per_worker, times, queue, scenario_context, + worker_args = (rps_per_worker, times, queue, context, timeout, i, processes_to_start, cls, method_name, args) process = multiprocessing.Process(target=_worker_process, diff --git a/tests/unit/benchmark/runners/test_rps.py b/tests/unit/benchmark/runners/test_rps.py index 7eeb1929b3..c0e74a5c36 100644 --- a/tests/unit/benchmark/runners/test_rps.py +++ b/tests/unit/benchmark/runners/test_rps.py @@ -58,8 +58,9 @@ class RPSScenarioRunnerTestCase(test.TestCase): @mock.patch("rally.benchmark.runners.rps.time") @mock.patch("rally.benchmark.runners.rps.threading.Thread") @mock.patch("rally.benchmark.runners.rps.multiprocessing.Queue") - def test__worker_process(self, mock_queue, mock_thread, mock_time, - mock_log): + @mock.patch("rally.benchmark.runners.rps.base") + def test__worker_process(self, mock_base, mock_queue, mock_thread, + mock_time, mock_log): def time_side(): time_side.last += 0.03 @@ -76,7 +77,10 @@ class RPSScenarioRunnerTestCase(test.TestCase): times = 4 - rps._worker_process(10, times, mock_queue, None, 600, 1, 1, + context = {'users': [{'tenant_id': 't1', 'endpoint': 'e1', + 'id': 'uuid1'}]} + + rps._worker_process(10, times, mock_queue, context, 600, 1, 1, "Dummy", "dummy", ()) self.assertEqual(times, mock_log.debug.call_count) @@ -84,14 +88,16 @@ class RPSScenarioRunnerTestCase(test.TestCase): self.assertEqual(times, mock_thread_instance.start.call_count) self.assertEqual(times, mock_thread_instance.join.call_count) - self.assertEqual(3, mock_time.sleep.call_count) + self.assertEqual(times - 1, mock_time.sleep.call_count) self.assertEqual(times, mock_thread_instance.isAlive.call_count) - self.assertEqual(15, mock_time.time.count) + self.assertEqual(times * 4 - 1, mock_time.time.count) + self.assertEqual(times, mock_base._get_scenario_context.call_count) for i in range(1, times + 1): + scenario_context = mock_base._get_scenario_context(context) call = mock.call(args=(mock_queue, (i, "Dummy", "dummy", - None, ())), + scenario_context, ())), target=rps._worker_thread) self.assertIn(call, mock_thread.mock_calls)