Parallelize config cache loading

Loading config involves significant network operations for each project:

* Loading project keys
* Asking the source for the list of branches for each project
* Retrieving the config file contents from the ZK cache (if present)
* Retrieving the config file contents from git (otherwise)

Only the third item in that list is currently parallelized; the others
are serialized.  To parallelize the remainder, use a thread pool
executor, as sketched below.
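
In outline, each remaining network-bound step becomes a task submitted
to the executor.  A minimal sketch of the pattern, where load_branches
and the projects list are hypothetical stand-ins for the real Zuul
calls:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def load_branches(project):
        # Stand-in for a network round trip to the code review system.
        return [project + "-branch"]

    projects = ["project-a", "project-b", "project-c"]

    with ThreadPoolExecutor(max_workers=4) as executor:
        # Fan out one future per project, remembering which future
        # belongs to which project.
        futures = {executor.submit(load_branches, p): p
                   for p in projects}
        # Collect results as they complete; result() re-raises any
        # exception raised in the worker thread.
        for future in as_completed(futures):
            print(futures[future], future.result())

Where completion order does not matter (e.g. project key loading), a
plain list of futures with f.result() called on each at the end gives
the same wait-and-propagate-errors behavior.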

The value of max_workers=4 was chosen because, in practice on OpenDev,
it yields the most significant reduction in startup time, while higher
values make little further difference (and could potentially contribute
to DoS scenarios or local thread contention).  Observed config priming
times for various worker counts:

1: 282s
2: 181s
4: 144s
8: 146s

Change-Id: I65472a8af96ed95eb28b88cc623ef103be76a46f
James E. Blair 2022-06-18 11:51:08 -07:00
parent c4476d1b6a
commit 42e1e1e324
3 changed files with 121 additions and 81 deletions

@@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
     def assertFinalState(self):
         pass
 
+    def assertCleanShutdown(self):
+        pass
+
     def test_delete_state(self):
         # Shut everything down (as much as possible) to reduce
         # logspam and errors.

@@ -12,6 +12,7 @@
 import collections
 from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import copy
 import itertools
 import os
@@ -1576,7 +1577,7 @@ class TenantParser(object):
         }
         return vs.Schema(tenant)
 
-    def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None,
+    def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
                  layout_uuid=None, branch_cache_min_ltimes=None,
                  ignore_cat_exception=True):
         # Note: This vs schema validation is not necessary in most cases as we
@@ -1622,7 +1623,15 @@ class TenantParser(object):
         # We prepare a stack to store config loading issues
         loading_errors = model.LoadingErrors()
 
+        # Get branches in parallel
+        branch_futures = {}
         for tpc in config_tpcs + untrusted_tpcs:
+            future = executor.submit(self._getProjectBranches,
+                                     tenant, tpc, branch_cache_min_ltimes)
+            branch_futures[future] = tpc
+
+        for branch_future in as_completed(branch_futures.keys()):
+            tpc = branch_futures[branch_future]
             source_context = model.ProjectContext(
                 tpc.project.canonical_name, tpc.project.name)
             with project_configuration_exceptions(source_context,
@@ -1645,7 +1654,7 @@ class TenantParser(object):
         # already cached. Full reconfigurations start with an empty
         # cache.
         self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
-                              ignore_cat_exception)
+                              executor, ignore_cat_exception)
 
         # Then collect the appropriate YAML based on this tenant
         # config.
@@ -1816,7 +1825,7 @@ class TenantParser(object):
            raise Exception("Unable to parse project %s", conf)
 
        return projects
 
-    def loadTenantProjects(self, conf_tenant):
+    def loadTenantProjects(self, conf_tenant, executor):
         config_projects = []
         untrusted_projects = []
@@ -1824,6 +1833,7 @@ class TenantParser(object):
                                      'secret', 'project-template', 'nodeset',
                                      'queue'])
 
+        futures = []
         for source_name, conf_source in conf_tenant.get('source', {}).items():
             source = self.connections.getSource(source_name)
 
@@ -1832,7 +1842,8 @@ class TenantParser(object):
                 # tpcs = TenantProjectConfigs
                 tpcs = self._getProjects(source, conf_repo, current_include)
                 for tpc in tpcs:
-                    self._loadProjectKeys(source_name, tpc.project)
+                    futures.append(executor.submit(
+                        self._loadProjectKeys, source_name, tpc.project))
                     config_projects.append(tpc)
 
         current_include = frozenset(default_include - set(['pipeline']))
@@ -1840,13 +1851,16 @@ class TenantParser(object):
                 tpcs = self._getProjects(source, conf_repo,
                                          current_include)
                 for tpc in tpcs:
-                    self._loadProjectKeys(source_name, tpc.project)
+                    futures.append(executor.submit(
+                        self._loadProjectKeys, source_name, tpc.project))
                     untrusted_projects.append(tpc)
 
+        for f in futures:
+            f.result()
         return config_projects, untrusted_projects
 
     def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
-                         ignore_cat_exception=True):
+                         executor, ignore_cat_exception=True):
         # min_ltimes can be the following: None (that means that we
         # should not use the file cache at all) or a nested dict of
         # project and branch to ltime. A value of None usually means
@@ -1910,6 +1924,7 @@ class TenantParser(object):
 
         jobs = []
 
+        futures = []
         for project in itertools.chain(
                 tenant.config_projects, tenant.untrusted_projects):
             tpc = tenant.project_configs[project.canonical_name]
@@ -1923,67 +1938,13 @@ class TenantParser(object):
                     # If all config classes are excluded then do not
                     # request any getFiles jobs.
                     continue
-                source_context = model.SourceContext(
-                    project.canonical_name, project.name,
-                    project.connection_name, branch, '', False)
-                if min_ltimes is not None:
-                    files_cache = self.unparsed_config_cache.getFilesCache(
-                        project.canonical_name, branch)
-                    branch_cache = abide.getUnparsedBranchCache(
-                        project.canonical_name, branch)
-                    try:
-                        pb_ltime = min_ltimes[project.canonical_name][branch]
-                    except KeyError:
-                        self.log.exception(
-                            "Min. ltime missing for project/branch")
-                        pb_ltime = -1
-
-                    # If our unparsed branch cache is valid for the
-                    # time, then we don't need to do anything else.
-                    if branch_cache.isValidFor(tpc, pb_ltime):
-                        min_ltimes[project.canonical_name][branch] =\
-                            branch_cache.ltime
-                        continue
-                    with self.unparsed_config_cache.readLock(
-                            project.canonical_name):
-                        if files_cache.isValidFor(tpc, pb_ltime):
-                            self.log.debug(
-                                "Using files from cache for project "
-                                "%s @%s: %s",
-                                project.canonical_name, branch,
-                                list(files_cache.keys()))
-                            self._updateUnparsedBranchCache(
-                                abide, tenant, source_context, files_cache,
-                                loading_errors, files_cache.ltime,
-                                min_ltimes)
-                            continue
-
-                extra_config_files = abide.getExtraConfigFiles(project.name)
-                extra_config_dirs = abide.getExtraConfigDirs(project.name)
-                if not self.merger:
-                    with project_configuration_exceptions(source_context,
-                                                          loading_errors):
-                        raise Exception(
-                            "Configuration files missing from cache. "
-                            "Check Zuul scheduler logs for more information.")
-                    continue
-
-                ltime = self.zk_client.getCurrentLtime()
-                job = self.merger.getFiles(
-                    project.source.connection.connection_name,
-                    project.name, branch,
-                    files=(['zuul.yaml', '.zuul.yaml'] +
-                           list(extra_config_files)),
-                    dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
-                self.log.debug("Submitting cat job %s for %s %s %s" % (
-                    job, project.source.connection.connection_name,
-                    project.name, branch))
-                job.extra_config_files = extra_config_files
-                job.extra_config_dirs = extra_config_dirs
-                job.ltime = ltime
-                job.source_context = source_context
-                jobs.append(job)
-
+                futures.append(executor.submit(self._cacheTenantYAMLBranch,
+                                               abide, tenant, loading_errors,
+                                               min_ltimes, tpc, project,
+                                               branch, jobs))
+
+        for future in futures:
+            future.result()
         try:
             self._processCatJobs(abide, tenant, loading_errors, jobs,
                                  min_ltimes)
@@ -2000,7 +1961,76 @@ class TenantParser(object):
             if not ignore_cat_exception:
                 raise
 
+    def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
+                               tpc, project, branch, jobs):
+        # This is the middle section of _cacheTenantYAML, called for
+        # each project-branch. It's a separate method so we can
+        # execute it in parallel. The "jobs" argument is mutated and
+        # accumulates a list of all merger jobs submitted.
+        source_context = model.SourceContext(
+            project.canonical_name, project.name,
+            project.connection_name, branch, '', False)
+        if min_ltimes is not None:
+            files_cache = self.unparsed_config_cache.getFilesCache(
+                project.canonical_name, branch)
+            branch_cache = abide.getUnparsedBranchCache(
+                project.canonical_name, branch)
+            try:
+                pb_ltime = min_ltimes[project.canonical_name][branch]
+            except KeyError:
+                self.log.exception(
+                    "Min. ltime missing for project/branch")
+                pb_ltime = -1
+
+            # If our unparsed branch cache is valid for the
+            # time, then we don't need to do anything else.
+            if branch_cache.isValidFor(tpc, pb_ltime):
+                min_ltimes[project.canonical_name][branch] =\
+                    branch_cache.ltime
+                return
+            with self.unparsed_config_cache.readLock(
+                    project.canonical_name):
+                if files_cache.isValidFor(tpc, pb_ltime):
+                    self.log.debug(
+                        "Using files from cache for project "
+                        "%s @%s: %s",
+                        project.canonical_name, branch,
+                        list(files_cache.keys()))
+                    self._updateUnparsedBranchCache(
+                        abide, tenant, source_context, files_cache,
+                        loading_errors, files_cache.ltime,
+                        min_ltimes)
+                    return
+
+        extra_config_files = abide.getExtraConfigFiles(project.name)
+        extra_config_dirs = abide.getExtraConfigDirs(project.name)
+        if not self.merger:
+            with project_configuration_exceptions(source_context,
+                                                  loading_errors):
+                raise Exception(
+                    "Configuration files missing from cache. "
+                    "Check Zuul scheduler logs for more information.")
+            return
+
+        ltime = self.zk_client.getCurrentLtime()
+        job = self.merger.getFiles(
+            project.source.connection.connection_name,
+            project.name, branch,
+            files=(['zuul.yaml', '.zuul.yaml'] +
+                   list(extra_config_files)),
+            dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
+        self.log.debug("Submitting cat job %s for %s %s %s" % (
+            job, project.source.connection.connection_name,
+            project.name, branch))
+        job.extra_config_files = extra_config_files
+        job.extra_config_dirs = extra_config_dirs
+        job.ltime = ltime
+        job.source_context = source_context
+        jobs.append(job)
+
     def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
+        # Called at the end of _cacheTenantYAML after all cat jobs
+        # have been submitted
         for job in jobs:
             self.log.debug("Waiting for cat job %s" % (job,))
             res = job.wait(self.merger.git_timeout)
@@ -2475,15 +2505,17 @@ class ConfigLoader(object):
 
         # Pre-load TenantProjectConfigs so we can get and cache all of a
         # project's config files (incl. tenant specific extra config) at once.
-        for tenant_name, unparsed_config in tenants_to_load.items():
-            config_tpcs, untrusted_tpcs = (
-                self.tenant_parser.loadTenantProjects(unparsed_config)
-            )
-            abide.clearTPCs(tenant_name)
-            for tpc in config_tpcs:
-                abide.addConfigTPC(tenant_name, tpc)
-            for tpc in untrusted_tpcs:
-                abide.addUntrustedTPC(tenant_name, tpc)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            for tenant_name, unparsed_config in tenants_to_load.items():
+                config_tpcs, untrusted_tpcs = (
+                    self.tenant_parser.loadTenantProjects(unparsed_config,
+                                                          executor)
+                )
+                abide.clearTPCs(tenant_name)
+                for tpc in config_tpcs:
+                    abide.addConfigTPC(tenant_name, tpc)
+                for tpc in untrusted_tpcs:
+                    abide.addUntrustedTPC(tenant_name, tpc)
 
     def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
                    min_ltimes=None, layout_uuid=None,
@@ -2562,9 +2594,11 @@ class ConfigLoader(object):
             return None
 
         unparsed_config = unparsed_abide.tenants[tenant_name]
-        new_tenant = self.tenant_parser.fromYaml(
-            abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid,
-            branch_cache_min_ltimes, ignore_cat_exception)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            new_tenant = self.tenant_parser.fromYaml(
+                abide, unparsed_config, ansible_manager, executor,
+                min_ltimes, layout_uuid, branch_cache_min_ltimes,
+                ignore_cat_exception)
         # Copy tenants dictionary to not break concurrent iterations.
         tenants = abide.tenants.copy()
         tenants[tenant_name] = new_tenant


@@ -17,7 +17,7 @@ from threading import Thread
 from typing import List, Callable
 
 from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, NodeExistsError
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.protocol.states import KazooState
 
@@ -211,8 +211,11 @@ class ZooKeeperClient(object):
 
         try:
             zstat = self.client.set("/zuul/ltime", b"")
         except NoNodeError:
-            self.client.create("/zuul/ltime", b"", makepath=True)
-            zstat = self.client.set("/zuul/ltime", b"")
+            try:
+                self.client.create("/zuul/ltime", b"", makepath=True)
+                zstat = self.client.set("/zuul/ltime", b"")
+            except NodeExistsError:
+                zstat = self.client.set("/zuul/ltime", b"")
         return zstat.last_modified_transaction_id
 
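
A note on the ZooKeeper change above: with config priming now running in
parallel (and with multiple schedulers), two callers can both see
NoNodeError for /zuul/ltime, but only one create() can win; the loser
now catches NodeExistsError and simply retries the set.  A minimal
standalone sketch of the same create-or-set pattern, assuming a
ZooKeeper server at localhost:2181 and a hypothetical /example/ltime
path:

    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError, NodeExistsError

    client = KazooClient(hosts="localhost:2181")  # assumed local server
    client.start()
    try:
        zstat = client.set("/example/ltime", b"")
    except NoNodeError:
        try:
            client.create("/example/ltime", b"", makepath=True)
            zstat = client.set("/example/ltime", b"")
        except NodeExistsError:
            # Another client won the create race; the set now succeeds.
            zstat = client.set("/example/ltime", b"")
    print(zstat.last_modified_transaction_id)
    client.stop()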