Rename "parallelize" decorator to "concurrent"
It came to my attention, while deciding whether to use the jenkins_jobs.parallel.parallelize decorator to parallelize work in jenkins_jobs.parser, that because it uses the Python threading library nothing is actually parallelized, only made concurrent (at least under CPython). Concurrency is fine for the original use case, since updating Jenkins jobs is primarily I/O bound on network connections to a Jenkins instance. However, the "parallel" name is misleading and could be harmful to users of this API who may mistakenly assume it can speed up CPU-bound tasks.

This change also removes seemingly unnecessary usages of the decorator, e.g. jenkins_jobs.builder.Jenkins.changed, which is never actually called with a list of arguments.

Change-Id: I996f9dea440e2d6b67ea70870d22942d6eef3ec7
parent 8c9c50b1f6
commit dfc0efb79c
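For context, here is a minimal sketch of how the renamed decorator is meant to be called, adapted from the docstring example further down in this diff. The `sample` function, its parameters, and the string values are purely illustrative, not part of the change:

```python
from jenkins_jobs.parallel import concurrent


@concurrent
def sample(param1, param2, param3):
    # Runs once per entry of the `concurrent` list, each on a worker thread.
    return param1 + param2 + param3


# Three runs share param1/param2 and differ in param3; the results come back
# in the same order the argument dicts were passed.
results = sample('param1', param2='val2',
                 concurrent=[
                     {'param3': 'val3'},
                     {'param3': 'val4'},
                     {'param3': 'val5'},
                 ],
                 n_workers=0)  # 0 = autodetect the number of cores
print(results)  # ['param1val2val3', 'param1val2val4', 'param1val2val5']
```

Since the real workload here is I/O bound (HTTP calls to a Jenkins instance), threads remain a reasonable fit even under the GIL; only the naming changes.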
@@ -31,7 +31,7 @@ import yaml
 import jenkins
 
 from jenkins_jobs.constants import MAGIC_MANAGE_STRING
-from jenkins_jobs.parallel import parallelize
+from jenkins_jobs.parallel import concurrent
 from jenkins_jobs import utils
 
 
@@ -153,7 +153,6 @@ class Jenkins(object):
             self._job_list = set(job['name'] for job in self.jobs)
         return self._job_list
 
-    @parallelize
     def update_job(self, job_name, xml):
         if self.is_job(job_name):
             logger.info("Reconfiguring jenkins job {0}".format(job_name))
@@ -274,7 +273,6 @@ class Builder(object):
         # Need to clear the JJB cache after deletion
         self.cache.clear()
 
-    @parallelize
     def changed(self, job):
         md5 = job.md5()
 
@@ -344,9 +342,9 @@ class Builder(object):
         p_params = [{'job': job} for job in jobs]
         results = self.parallel_update_job(
             n_workers=n_workers,
-            parallelize=p_params)
+            concurrent=p_params)
         logging.debug("Parsing results")
-        # generalize the result parsing, as a parallelized job always returns a
+        # generalize the result parsing, as a concurrent job always returns a
         # list
         if len(p_params) in (1, 0):
             results = [results]
@@ -365,7 +363,7 @@ class Builder(object):
         logging.debug("Total run took %ss", (time.time() - orig))
         return jobs, len(jobs)
 
-    @parallelize
+    @concurrent
     def parallel_update_job(self, job):
         self.jenkins.update_job(job.name, job.output().decode('utf-8'))
         return (job.name, job.md5())

@@ -13,7 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-# Parallel execution helper functions and classes
+# Concurrent execution helper functions and classes
 
 from functools import wraps
 import logging
@@ -66,30 +66,30 @@ class Worker(threading.Thread):
             self.out_queue.put((task['ord'], res))
 
 
-def parallelize(func):
+def concurrent(func):
     @wraps(func)
-    def parallelized(*args, **kwargs):
+    def concurrentized(*args, **kwargs):
         """
-        This function will spawn workers and run the decorated function in
-        parallel on the workers. It will not ensure the thread safety of the
-        decorated function (the decorated function should be thread safe by
+        This function will spawn workers and run the decorated function
+        concurrently on the workers. It will not ensure the thread safety of
+        the decorated function (the decorated function should be thread safe by
         itself). It accepts two special parameters:
 
-        :arg list parallelize: list of the arguments to pass to each of the
+        :arg list concurrentize: list of the arguments to pass to each of the
            runs, the results of each run will be returned in the same order.
         :arg int n_workers: number of workers to use, by default and if '0'
            passed will autodetect the number of cores and use that, if '1'
            passed, it will not use any workers and just run as if were not
-           parallelized everything.
+           concurrentized everything.
 
         Example:
 
-        > @parallelize
+        > @concurrent
         > def sample(param1, param2, param3):
         >     return param1 + param2 + param3
         >
         > sample('param1', param2='val2',
-        >        parallelize=[
+        >        concurrent=[
         >            {'param3': 'val3'},
         >            {'param3': 'val4'},
         >            {'param3': 'val5'},
@@ -97,14 +97,14 @@ def parallelize(func):
         >
         ['param1val2val3', 'param1val2val4', 'param1val2val5']
 
-        This will run the function `parallelized_function` 3 times, in
-        parallel (depending on the number of detected cores) and return an
+        This will run the function `concurrentized_function` 3 times, in
+        concurrent (depending on the number of detected cores) and return an
         array with the results of the executions in the same order the
         parameters were passed.
         """
         n_workers = kwargs.pop('n_workers', 0)
-        p_kwargs = kwargs.pop('parallelize', [])
-        # if only one parameter is passed inside the parallelize dict, run the
+        p_kwargs = kwargs.pop('concurrent', [])
+        # if only one parameter is passed inside the concurrent dict, run the
         # original function as is, no need for pools
         if len(p_kwargs) == 1:
             kwargs.update(p_kwargs[0])
@@ -115,7 +115,7 @@ def parallelize(func):
         # If no number of workers passed or passed 0
         if not n_workers:
             n_workers = cpu_count()
-        logging.debug("Running parallel %d workers", n_workers)
+        logging.debug("Running concurrent %d workers", n_workers)
         worker_pool = []
         in_queue = queue.Queue()
         out_queue = queue.Queue()
@@ -146,6 +146,6 @@ def parallelize(func):
             worker.join()
         # Reorder the results
         results = [r[1] for r in sorted(results)]
-        logging.debug("Parallel task finished")
+        logging.debug("Concurrent task finished")
         return results
-    return parallelized
+    return concurrentized

@@ -18,7 +18,7 @@ from multiprocessing import cpu_count
 from testtools import matchers
 from testtools import TestCase
 
-from jenkins_jobs.parallel import parallelize
+from jenkins_jobs.parallel import concurrent
 from tests.base import mock
 
 
@@ -26,45 +26,45 @@ class TestCaseParallel(TestCase):
     def test_parallel_correct_order(self):
         expected = list(range(10, 20))
 
-        @parallelize
+        @concurrent
         def parallel_test(num_base, num_extra):
             return num_base + num_extra
 
         parallel_args = [{'num_extra': num} for num in range(10)]
-        result = parallel_test(10, parallelize=parallel_args)
+        result = parallel_test(10, concurrent=parallel_args)
         self.assertThat(result, matchers.Equals(expected))
 
     def test_parallel_time_less_than_serial(self):
 
-        @parallelize
+        @concurrent
         def wait(secs):
             time.sleep(secs)
 
         before = time.time()
         # ten threads to make it as fast as possible
-        wait(parallelize=[{'secs': 1} for _ in range(10)], n_workers=10)
+        wait(concurrent=[{'secs': 1} for _ in range(10)], n_workers=10)
         after = time.time()
         self.assertThat(after - before, matchers.LessThan(5))
 
     def test_parallel_single_thread(self):
         expected = list(range(10, 20))
 
-        @parallelize
+        @concurrent
         def parallel_test(num_base, num_extra):
             return num_base + num_extra
 
         parallel_args = [{'num_extra': num} for num in range(10)]
-        result = parallel_test(10, parallelize=parallel_args, n_workers=1)
+        result = parallel_test(10, concurrent=parallel_args, n_workers=1)
         self.assertThat(result, matchers.Equals(expected))
 
     @mock.patch('jenkins_jobs.parallel.cpu_count', wraps=cpu_count)
     def test_use_auto_detect_cores(self, mockCpu_count):
 
-        @parallelize
+        @concurrent
        def parallel_test():
            return True
 
-        result = parallel_test(parallelize=[{} for _ in range(10)],
+        result = parallel_test(concurrent=[{} for _ in range(10)],
                                n_workers=0)
         self.assertThat(result, matchers.Equals([True for _ in range(10)]))
         mockCpu_count.assert_called_once_with()
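For readers unfamiliar with the module, the hunks above hint at how the decorator distributes work: each task carries an 'ord' index, workers are plain threading.Thread instances pulling from an input queue and pushing (ord, result) pairs to an output queue, and the results are re-sorted by that index at the end. The following is only a rough, self-contained sketch of that pattern; the helper name `run_concurrently` and anything not visible in the context lines above are assumptions rather than the actual jenkins_jobs.parallel source, and error handling is omitted:

```python
import queue
import threading
from multiprocessing import cpu_count


def run_concurrently(func, arg_dicts, n_workers=0):
    """Hypothetical helper mirroring the queue/worker pattern in the diff."""
    if not n_workers:                  # 0 means "autodetect", as in the diff
        n_workers = cpu_count()

    in_queue = queue.Queue()
    out_queue = queue.Queue()

    def worker():
        while True:
            task = in_queue.get()
            if task is None:           # sentinel: no more work for this thread
                break
            ord_, kwargs = task
            out_queue.put((ord_, func(**kwargs)))

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for ord_, kwargs in enumerate(arg_dicts):
        in_queue.put((ord_, kwargs))
    for _ in threads:                  # one sentinel per worker
        in_queue.put(None)
    for t in threads:
        t.join()

    # Collect and reorder by original position, like `sorted(results)` above.
    results = [out_queue.get() for _ in arg_dicts]
    return [r[1] for r in sorted(results)]


# e.g. run_concurrently(lambda x: x * 2, [{'x': i} for i in range(5)])
# returns [0, 2, 4, 6, 8] regardless of which thread finished first.
```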