Update container build to better rein in threads

This restructures the buildah build process to ensure we're
managing our thread pool correctly and not spawning more
threads than needed or expected. By updating the thread
process we'll ensure we're building each container only
once and that we're maintaining build order.

Tests have been added to ensure the new methods are called,
and that the data processing is consistent.

Closes-Bug: #1894856
Change-Id: I647212442c378aabbd1887a0b499c19bbc74b5f7
Signed-off-by: Kevin Carter <kecarter@redhat.com>
This commit is contained in:
Kevin Carter 2020-09-08 08:57:05 -05:00 committed by Emilien Macchi
parent 9848dadc0f
commit 16ff908a2d
2 changed files with 220 additions and 60 deletions

View File

@ -185,76 +185,157 @@ class BuildahBuilder(base.BaseBuilder):
process.execute(*args, run_as_root=False, use_standard_locale=True) process.execute(*args, run_as_root=False, use_standard_locale=True)
def build_all(self, deps=None):
    """Build all containers.

    This function will thread the build process allowing it to complete
    in the shortest possible time.

    :params deps: Dictionary defining the container images
                  dependencies.
    """
    if deps is None:
        deps = self.deps
    # Flatten the dependency tree into ordered groups; each group can be
    # built concurrently, and groups are processed in dependency order so
    # no container is built before its parent or built twice.
    container_deps = self._generate_deps(deps=deps, containers=list())
    LOG.debug("All container deps: {}".format(container_deps))
    for containers in container_deps:
        LOG.info("Processing containers: {}".format(containers))
        if isinstance(deps, (list,)):
            self._multi_build(containers=containers)
        else:
            # A dict/str deps entry yields a single unit of work per pass.
            self._multi_build(containers=[containers])
def _generate_deps(self, deps, containers, prio_list=None):
    """Browse container dependencies and return an array.

    When the dependencies are generated they're captured in an array,
    which contains additional arrays. This data structure is later
    used in a futures queue.

    :params deps: Dictionary defining the container images
                  dependencies.
    :params containers: List used to keep track of dependent containers.
    :params prio_list: List used to keep track of nested dependencies.
    :returns: list
    """
    LOG.debug("Process deps: {}".format(deps))
    if isinstance(deps, (six.string_types,)):
        # A bare string is a single container name.
        if prio_list:
            prio_list.append(deps)
        else:
            containers.append([deps])
    elif isinstance(deps, (dict,)):
        # Dict keys are parents; they must be built before their values.
        parents = list(deps.keys())
        if prio_list:
            prio_list.extend(parents)
        else:
            containers.append(parents)
        for value in deps.values():
            LOG.debug("Recursing with: {}".format(value))
            self._generate_deps(
                deps=value,
                containers=containers
            )
    elif isinstance(deps, (list,)):
        # Split plain names (buildable now) from nested structures
        # (which must recurse, prioritized against this level's names).
        dep_list = list()
        dep_rehash_list = list()
        for item in deps:
            if isinstance(item, (six.string_types,)):
                dep_list.append(item)
            else:
                dep_rehash_list.append(item)
        if dep_list:
            containers.append(dep_list)
        for item in dep_rehash_list:
            LOG.debug("Recursing with: {}".format(item))
            self._generate_deps(
                deps=item,
                containers=containers,
                prio_list=dep_list
            )
    LOG.debug("Constructed containers: {}".format(containers))
    return containers
def _multi_build(self, containers):
    """Build multiple containers.

    Multi-thread the build process for all containers defined within
    the containers list.

    :params containers: List defining the container images.
    """
    if not containers:
        # Nothing to build; also avoids ThreadPoolExecutor raising on
        # max_workers=0 below.
        return
    # Workers will use the processor core count (min 2, max 8). If the
    # containers array has a length less-than the expected processor
    # count, the workers will be adjusted to meet the expectations of
    # the work being processed.
    workers = min(
        8,
        max(2, processutils.get_worker_count()),
        len(containers)
    )
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_build = {
            executor.submit(
                self._generate_container, container_name
            ): container_name for container_name in containers
        }
        done, not_done = futures.wait(
            future_to_build,
            timeout=self.build_timeout,
            return_when=futures.FIRST_EXCEPTION
        )
    # NOTE(cloudnull): Once the job has been completed all completed
    #                  jobs are checked for exceptions. If any jobs
    #                  failed a SystemError will be raised using the
    #                  exception information. If any job was loaded
    #                  but not executed a SystemError will be raised.
    exceptions = [
        "\nException information: {exception}".format(
            exception=job._exception
        )
        for job in done if job._exception
    ]
    if exceptions:
        raise RuntimeError(
            '\nThe following errors were detected during '
            'container build(s):\n{exceptions}'.format(
                exceptions='\n'.join(exceptions)
            )
        )
    if not_done:
        error_msg = (
            'The following jobs were incomplete: {}'.format(
                [future_to_build[job] for job in not_done]
            )
        )
        jobs_with_exceptions = [{
            'container': future_to_build[job],
            'exception': job._exception}
            for job in not_done if job._exception]
        for job_with_exception in jobs_with_exceptions:
            error_msg = error_msg + os.linesep + (
                "%(container)s raised the following "
                "exception: %(exception)s" %
                job_with_exception)
        raise SystemError(error_msg)

View File

@ -37,6 +37,39 @@ BUILD_ALL_DICT_CONTAINERS = {
} }
BUILD_ALL_STR_CONTAINER = 'container1' BUILD_ALL_STR_CONTAINER = 'container1'
# Fixture: a nested container dependency tree (dicts map a parent image to
# the children that depend on it) used to exercise the builder's dependency
# flattening. Expected flattened ordering is asserted in test_dep_processing.
PREPROCESSED_CONTAINER_DEPS = [
    {
        "image0": [
            "image1",
            {
                "image2": [
                    {
                        "image3": [
                            "image4",
                            "image5"
                        ]
                    },
                    "image8",
                    {
                        "image6": [
                            "image7"
                        ]
                    },
                    "image9"
                ]
            },
            {
                "image10": [
                    "image11",
                    "image12"
                ]
            },
            "image13",
            "image14"
        ]
    }
]
class ThreadPoolExecutorReturn(object):
    """Minimal stand-in for a completed future in tests.

    Only the ``_exception`` attribute is provided, mirroring the attribute
    the builder inspects on ``concurrent.futures`` job objects.
    """
    _exception = None
@ -223,3 +256,49 @@ class TestBuildahBuilder(base.TestCase):
def test_build_all_str_ok(self, mock_touch, def test_build_all_str_ok(self, mock_touch,
mock_build, mock_wait, mock_submit): mock_build, mock_wait, mock_submit):
bb(WORK_DIR, DEPS).build_all(deps=BUILD_ALL_STR_CONTAINER) bb(WORK_DIR, DEPS).build_all(deps=BUILD_ALL_STR_CONTAINER)
def test_dep_processing(self):
    """Verify nested deps are flattened into ordered build groups."""
    expected = [
        ['image0'],
        ['image1', 'image13', 'image14', 'image2', 'image10'],
        ['image8', 'image9', 'image3', 'image6'],
        ['image4', 'image5'],
        ['image7'],
        ['image11', 'image12']
    ]
    builder = bb(WORK_DIR, DEPS)
    actual = builder._generate_deps(
        deps=PREPROCESSED_CONTAINER_DEPS,
        containers=list()
    )
    self.assertEqual(actual, expected)
@mock.patch(
    'tripleo_common.image.builder.buildah.BuildahBuilder._multi_build',
    autospec=True
)
def test_build_all_multi_build(self, mock_multi_build):
    """Ensure build_all delegates work to _multi_build."""
    builder = bb(WORK_DIR, DEPS)
    builder.build_all(deps=BUILD_ALL_LIST_CONTAINERS)
    self.assertTrue(mock_multi_build.called)