Merge "Fix rps runner to use random users from context"
This commit is contained in:
commit
2015bf92a2
@ -32,7 +32,7 @@ def _worker_thread(queue, args):
|
||||
queue.put(base._run_scenario_once(args))
|
||||
|
||||
|
||||
def _worker_process(rps, times, queue, scenario_context, timeout,
|
||||
def _worker_process(rps, times, queue, context, timeout,
|
||||
worker_id, workers, cls, method_name, args):
|
||||
"""Start scenario within threads.
|
||||
|
||||
@ -42,7 +42,7 @@ def _worker_process(rps, times, queue, scenario_context, timeout,
|
||||
:param rps: runs per second
|
||||
:param times: number of threads to be run
|
||||
:param queue: queue object to append results
|
||||
:param scenario_context: scenario context object
|
||||
:param context: scenario context object
|
||||
:param timeout: timeout operation
|
||||
:param worker_id: id of worker process
|
||||
:param workers: number of total workers
|
||||
@ -63,6 +63,7 @@ def _worker_process(rps, times, queue, scenario_context, timeout,
|
||||
time.sleep(randsleep_delay / 100.0)
|
||||
|
||||
while times > i:
|
||||
scenario_context = base._get_scenario_context(context)
|
||||
i += 1
|
||||
scenario_args = (queue, (worker_id + workers * (i - 1), cls,
|
||||
method_name, scenario_context, args),)
|
||||
@ -137,14 +138,13 @@ class RPSScenarioRunner(base.ScenarioRunner):
|
||||
queue = multiprocessing.Queue()
|
||||
|
||||
process_pool = []
|
||||
scenario_context = base._get_scenario_context(context)
|
||||
|
||||
times_per_worker, rest = divmod(times, processes_to_start)
|
||||
|
||||
for i in range(processes_to_start):
|
||||
times = times_per_worker + int(rest > 0)
|
||||
rest -= 1
|
||||
worker_args = (rps_per_worker, times, queue, scenario_context,
|
||||
worker_args = (rps_per_worker, times, queue, context,
|
||||
timeout, i, processes_to_start, cls,
|
||||
method_name, args)
|
||||
process = multiprocessing.Process(target=_worker_process,
|
||||
|
@ -58,8 +58,9 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
@mock.patch("rally.benchmark.runners.rps.time")
|
||||
@mock.patch("rally.benchmark.runners.rps.threading.Thread")
|
||||
@mock.patch("rally.benchmark.runners.rps.multiprocessing.Queue")
|
||||
def test__worker_process(self, mock_queue, mock_thread, mock_time,
|
||||
mock_log):
|
||||
@mock.patch("rally.benchmark.runners.rps.base")
|
||||
def test__worker_process(self, mock_base, mock_queue, mock_thread,
|
||||
mock_time, mock_log):
|
||||
|
||||
def time_side():
|
||||
time_side.last += 0.03
|
||||
@ -76,7 +77,10 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
|
||||
times = 4
|
||||
|
||||
rps._worker_process(10, times, mock_queue, None, 600, 1, 1,
|
||||
context = {'users': [{'tenant_id': 't1', 'endpoint': 'e1',
|
||||
'id': 'uuid1'}]}
|
||||
|
||||
rps._worker_process(10, times, mock_queue, context, 600, 1, 1,
|
||||
"Dummy", "dummy", ())
|
||||
|
||||
self.assertEqual(times, mock_log.debug.call_count)
|
||||
@ -84,14 +88,16 @@ class RPSScenarioRunnerTestCase(test.TestCase):
|
||||
|
||||
self.assertEqual(times, mock_thread_instance.start.call_count)
|
||||
self.assertEqual(times, mock_thread_instance.join.call_count)
|
||||
self.assertEqual(3, mock_time.sleep.call_count)
|
||||
self.assertEqual(times - 1, mock_time.sleep.call_count)
|
||||
self.assertEqual(times, mock_thread_instance.isAlive.call_count)
|
||||
self.assertEqual(15, mock_time.time.count)
|
||||
self.assertEqual(times * 4 - 1, mock_time.time.count)
|
||||
self.assertEqual(times, mock_base._get_scenario_context.call_count)
|
||||
|
||||
for i in range(1, times + 1):
|
||||
scenario_context = mock_base._get_scenario_context(context)
|
||||
call = mock.call(args=(mock_queue,
|
||||
(i, "Dummy", "dummy",
|
||||
None, ())),
|
||||
scenario_context, ())),
|
||||
target=rps._worker_thread)
|
||||
self.assertIn(call, mock_thread.mock_calls)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user