Replace non-documented ThreadPool with multiprocessing.Pool

Replace ThreadPool in rally/benchmark/runners/periodic.py

Change-Id: I585f60ba8f8879a585cde5bc377aea489fe0b5ba
This commit is contained in:
liyingjun 2014-04-25 07:48:51 +08:00
parent 155c01cba9
commit 6ff6869201
2 changed files with 16 additions and 6 deletions

View File

@ -14,7 +14,6 @@
# under the License.
import multiprocessing
from multiprocessing import pool as multiprocessing_pool
import time
from rally.benchmark.runners import base
@ -71,14 +70,18 @@ class PeriodicScenarioRunner(base.ScenarioRunner):
async_results = []
pools = []
for i in range(times):
pool = multiprocessing_pool.ThreadPool(processes=1)
pool = multiprocessing.Pool(1)
scenario_args = ((i, cls, method_name,
base._get_scenario_context(context), args),)
async_result = pool.apply_async(base._run_scenario_once,
scenario_args)
async_results.append(async_result)
pool.close()
pools.append(pool)
if i < times - 1:
time.sleep(period)
@ -91,4 +94,7 @@ class PeriodicScenarioRunner(base.ScenarioRunner):
"error": utils.format_exc(e)}
results.append(result)
for pool in pools:
pool.join()
return base.ScenarioRunnerResult(results)

View File

@ -49,6 +49,7 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
def test_run_scenario(self):
context = fakes.FakeUserContext({}).context
context['task'] = {'uuid': 'fake_uuid'}
config = {"times": 3, "period": 0, "timeout": 5}
runner = periodic.PeriodicScenarioRunner(
None, [context["admin"]["endpoint"]], config)
@ -59,6 +60,7 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
def test_run_scenario_exception(self):
context = fakes.FakeUserContext({}).context
context['task'] = {'uuid': 'fake_uuid'}
config = {"times": 4, "period": 0}
runner = periodic.PeriodicScenarioRunner(
@ -70,9 +72,9 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
self.assertIsNotNone(base.ScenarioRunnerResult(result))
@mock.patch("rally.benchmark.runners.periodic.base.ScenarioRunnerResult")
@mock.patch("rally.benchmark.runners.periodic.multiprocessing_pool")
@mock.patch("rally.benchmark.runners.periodic.multiprocessing")
@mock.patch("rally.benchmark.runners.periodic.time.sleep")
def test_run_scenario_internal_logic(self, mock_time, mock_pool,
def test_run_scenario_internal_logic(self, mock_time, mock_mp,
mock_result):
context = fakes.FakeUserContext({}).context
config = {"times": 4, "period": 0, "timeout": 5}
@ -80,7 +82,7 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
None, [context["admin"]["endpoint"]], config)
mock_pool_inst = mock.MagicMock()
mock_pool.ThreadPool.return_value = mock_pool_inst
mock_mp.Pool.return_value = mock_pool_inst
runner._run_scenario(fakes.FakeScenario, "do_it", context, {})
@ -92,12 +94,14 @@ class PeriodicScenarioRunnerTestCase(test.TestCase):
base._get_scenario_context(context), {}),)
)
exptected_pool_inst_call.append(mock.call.apply_async(*args))
call = mock.call.close()
exptected_pool_inst_call.append(call)
for i in range(config["times"]):
call = mock.call.apply_async().get(timeout=5)
exptected_pool_inst_call.append(call)
mock_pool.assert_has_calls([mock.call.ThreadPool(processes=1)])
mock_mp.assert_has_calls([mock.call.Pool(1)])
mock_pool_inst.assert_has_calls(exptected_pool_inst_call)
mock_time.assert_has_calls([])