Replace non-documented ThreadPool with multiprocessing.Pool

Change-Id: I18b155565d12108a8ed8c8b8249f531c294f0e17
This commit is contained in:
liyingjun 2014-04-24 16:24:24 +08:00
parent 8c7e0bd346
commit 08c2cfe8d4
4 changed files with 25 additions and 8 deletions

View File

@ -164,7 +164,8 @@ class UserGenerator(base.Context):
for tenant, users in utils.run_concurrent(
self.config["concurrent"],
self._create_tenant_users,
UserGenerator,
"_create_tenant_users",
args):
self.context["tenants"].append(tenant)
self.context["users"] += users
@ -179,12 +180,14 @@ class UserGenerator(base.Context):
users_chunks = utils.chunks(self.context["users"], concurrent)
utils.run_concurrent(
concurrent,
self._delete_users,
UserGenerator,
"_delete_users",
[(self.endpoint, users) for users in users_chunks])
# Delete tenants
tenants_chunks = utils.chunks(self.context["tenants"], concurrent)
utils.run_concurrent(
concurrent,
self._delete_tenants,
UserGenerator,
"_delete_tenants",
[(self.endpoint, tenants) for tenants in tenants_chunks])

View File

@ -146,17 +146,24 @@ def infinite_run_args_generator(args_func):
yield args_func(i)
def run_concurrent(concurrent, fn, fn_args):
def run_concurrent_helper(args):
cls, method, fn_args = args
return getattr(cls, method)(fn_args)
def run_concurrent(concurrent, cls, fn, fn_args):
"""Run given function using pool of threads.
:param concurrent: number of threads in the pool
:param fn: function to be called in the pool
:param cls: class to be called in the pool
:param fn: class method to be called in the pool
:param fn_args: list of arguments for function fn() in the pool
:returns: iterator from Pool.imap()
"""
pool = multiprocessing.pool.ThreadPool(concurrent)
iterator = pool.imap(fn, fn_args)
pool = multiprocessing.Pool(concurrent)
iterator = pool.imap(run_concurrent_helper,
[(cls, fn, args) for args in fn_args])
pool.close()
pool.join()

View File

@ -23,7 +23,8 @@ from tests import fakes
from tests import test
run_concurrent = lambda dummy, f, args: list(itertools.imap(f, args))
run_concurrent = lambda dummy, cls, f, args: \
list(itertools.imap(getattr(cls, f), args))
@mock.patch.object(utils, "run_concurrent", run_concurrent)

View File

@ -106,6 +106,12 @@ class BenchmarkUtilsTestCase(test.TestCase):
self.assertRaises(exceptions.GetResourceFailure,
get_from_manager, resource)
def test_run_concurrent_helper(self):
cls = mock.MagicMock()
args = (cls, "test", {})
result = utils.run_concurrent_helper(args)
self.assertEqual(cls.test(), result)
class WaitForTestCase(test.TestCase):