Merge "Parallelize config cache loading"

This commit is contained in:
Zuul 2022-06-23 15:22:06 +00:00 committed by Gerrit Code Review
commit f2d4ff276b
3 changed files with 121 additions and 81 deletions

View File

@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
def assertFinalState(self): def assertFinalState(self):
pass pass
def assertCleanShutdown(self):
pass
def test_delete_state(self): def test_delete_state(self):
# Shut everything down (as much as possible) to reduce # Shut everything down (as much as possible) to reduce
# logspam and errors. # logspam and errors.

View File

@ -12,6 +12,7 @@
import collections import collections
from contextlib import contextmanager from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
import copy import copy
import itertools import itertools
import os import os
@ -1608,7 +1609,7 @@ class TenantParser(object):
} }
return vs.Schema(tenant) return vs.Schema(tenant)
def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None, def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
layout_uuid=None, branch_cache_min_ltimes=None, layout_uuid=None, branch_cache_min_ltimes=None,
ignore_cat_exception=True): ignore_cat_exception=True):
# Note: This vs schema validation is not necessary in most cases as we # Note: This vs schema validation is not necessary in most cases as we
@ -1659,7 +1660,15 @@ class TenantParser(object):
# We prepare a stack to store config loading issues # We prepare a stack to store config loading issues
loading_errors = model.LoadingErrors() loading_errors = model.LoadingErrors()
# Get branches in parallel
branch_futures = {}
for tpc in config_tpcs + untrusted_tpcs: for tpc in config_tpcs + untrusted_tpcs:
future = executor.submit(self._getProjectBranches,
tenant, tpc, branch_cache_min_ltimes)
branch_futures[future] = tpc
for branch_future in as_completed(branch_futures.keys()):
tpc = branch_futures[branch_future]
source_context = model.ProjectContext( source_context = model.ProjectContext(
tpc.project.canonical_name, tpc.project.name) tpc.project.canonical_name, tpc.project.name)
with project_configuration_exceptions(source_context, with project_configuration_exceptions(source_context,
@ -1682,7 +1691,7 @@ class TenantParser(object):
# already cached. Full reconfigurations start with an empty # already cached. Full reconfigurations start with an empty
# cache. # cache.
self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes, self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
ignore_cat_exception) executor, ignore_cat_exception)
# Then collect the appropriate YAML based on this tenant # Then collect the appropriate YAML based on this tenant
# config. # config.
@ -1879,7 +1888,7 @@ class TenantParser(object):
raise Exception("Unable to parse project %s", conf) raise Exception("Unable to parse project %s", conf)
return projects return projects
def loadTenantProjects(self, conf_tenant): def loadTenantProjects(self, conf_tenant, executor):
config_projects = [] config_projects = []
untrusted_projects = [] untrusted_projects = []
@ -1887,6 +1896,7 @@ class TenantParser(object):
'secret', 'project-template', 'nodeset', 'secret', 'project-template', 'nodeset',
'queue']) 'queue'])
futures = []
for source_name, conf_source in conf_tenant.get('source', {}).items(): for source_name, conf_source in conf_tenant.get('source', {}).items():
source = self.connections.getSource(source_name) source = self.connections.getSource(source_name)
@ -1895,7 +1905,8 @@ class TenantParser(object):
# tpcs = TenantProjectConfigs # tpcs = TenantProjectConfigs
tpcs = self._getProjects(source, conf_repo, current_include) tpcs = self._getProjects(source, conf_repo, current_include)
for tpc in tpcs: for tpc in tpcs:
self._loadProjectKeys(source_name, tpc.project) futures.append(executor.submit(
self._loadProjectKeys, source_name, tpc.project))
config_projects.append(tpc) config_projects.append(tpc)
current_include = frozenset(default_include - set(['pipeline'])) current_include = frozenset(default_include - set(['pipeline']))
@ -1903,13 +1914,16 @@ class TenantParser(object):
tpcs = self._getProjects(source, conf_repo, tpcs = self._getProjects(source, conf_repo,
current_include) current_include)
for tpc in tpcs: for tpc in tpcs:
self._loadProjectKeys(source_name, tpc.project) futures.append(executor.submit(
self._loadProjectKeys, source_name, tpc.project))
untrusted_projects.append(tpc) untrusted_projects.append(tpc)
for f in futures:
f.result()
return config_projects, untrusted_projects return config_projects, untrusted_projects
def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes, def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
ignore_cat_exception=True): executor, ignore_cat_exception=True):
# min_ltimes can be the following: None (that means that we # min_ltimes can be the following: None (that means that we
# should not use the file cache at all) or a nested dict of # should not use the file cache at all) or a nested dict of
# project and branch to ltime. A value of None usually means # project and branch to ltime. A value of None usually means
@ -1973,6 +1987,7 @@ class TenantParser(object):
jobs = [] jobs = []
futures = []
for project in itertools.chain( for project in itertools.chain(
tenant.config_projects, tenant.untrusted_projects): tenant.config_projects, tenant.untrusted_projects):
tpc = tenant.project_configs[project.canonical_name] tpc = tenant.project_configs[project.canonical_name]
@ -1986,67 +2001,13 @@ class TenantParser(object):
# If all config classes are excluded then do not # If all config classes are excluded then do not
# request any getFiles jobs. # request any getFiles jobs.
continue continue
futures.append(executor.submit(self._cacheTenantYAMLBranch,
abide, tenant, loading_errors,
min_ltimes, tpc, project,
branch, jobs))
for future in futures:
future.result()
source_context = model.SourceContext(
project.canonical_name, project.name,
project.connection_name, branch, '', False)
if min_ltimes is not None:
files_cache = self.unparsed_config_cache.getFilesCache(
project.canonical_name, branch)
branch_cache = abide.getUnparsedBranchCache(
project.canonical_name, branch)
try:
pb_ltime = min_ltimes[project.canonical_name][branch]
except KeyError:
self.log.exception(
"Min. ltime missing for project/branch")
pb_ltime = -1
# If our unparsed branch cache is valid for the
# time, then we don't need to do anything else.
if branch_cache.isValidFor(tpc, pb_ltime):
min_ltimes[project.canonical_name][branch] =\
branch_cache.ltime
continue
with self.unparsed_config_cache.readLock(
project.canonical_name):
if files_cache.isValidFor(tpc, pb_ltime):
self.log.debug(
"Using files from cache for project "
"%s @%s: %s",
project.canonical_name, branch,
list(files_cache.keys()))
self._updateUnparsedBranchCache(
abide, tenant, source_context, files_cache,
loading_errors, files_cache.ltime,
min_ltimes)
continue
extra_config_files = abide.getExtraConfigFiles(project.name)
extra_config_dirs = abide.getExtraConfigDirs(project.name)
if not self.merger:
with project_configuration_exceptions(source_context,
loading_errors):
raise Exception(
"Configuration files missing from cache. "
"Check Zuul scheduler logs for more information.")
continue
ltime = self.zk_client.getCurrentLtime()
job = self.merger.getFiles(
project.source.connection.connection_name,
project.name, branch,
files=(['zuul.yaml', '.zuul.yaml'] +
list(extra_config_files)),
dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
self.log.debug("Submitting cat job %s for %s %s %s" % (
job, project.source.connection.connection_name,
project.name, branch))
job.extra_config_files = extra_config_files
job.extra_config_dirs = extra_config_dirs
job.ltime = ltime
job.source_context = source_context
jobs.append(job)
try: try:
self._processCatJobs(abide, tenant, loading_errors, jobs, self._processCatJobs(abide, tenant, loading_errors, jobs,
min_ltimes) min_ltimes)
@ -2063,7 +2024,76 @@ class TenantParser(object):
if not ignore_cat_exception: if not ignore_cat_exception:
raise raise
def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
tpc, project, branch, jobs):
# This is the middle section of _cacheTenantYAML, called for
# each project-branch. It's a separate method so we can
# execute it in parallel. The "jobs" argument is mutated and
# accumulates a list of all merger jobs submitted.
source_context = model.SourceContext(
project.canonical_name, project.name,
project.connection_name, branch, '', False)
if min_ltimes is not None:
files_cache = self.unparsed_config_cache.getFilesCache(
project.canonical_name, branch)
branch_cache = abide.getUnparsedBranchCache(
project.canonical_name, branch)
try:
pb_ltime = min_ltimes[project.canonical_name][branch]
except KeyError:
self.log.exception(
"Min. ltime missing for project/branch")
pb_ltime = -1
# If our unparsed branch cache is valid for the
# time, then we don't need to do anything else.
if branch_cache.isValidFor(tpc, pb_ltime):
min_ltimes[project.canonical_name][branch] =\
branch_cache.ltime
return
with self.unparsed_config_cache.readLock(
project.canonical_name):
if files_cache.isValidFor(tpc, pb_ltime):
self.log.debug(
"Using files from cache for project "
"%s @%s: %s",
project.canonical_name, branch,
list(files_cache.keys()))
self._updateUnparsedBranchCache(
abide, tenant, source_context, files_cache,
loading_errors, files_cache.ltime,
min_ltimes)
return
extra_config_files = abide.getExtraConfigFiles(project.name)
extra_config_dirs = abide.getExtraConfigDirs(project.name)
if not self.merger:
with project_configuration_exceptions(source_context,
loading_errors):
raise Exception(
"Configuration files missing from cache. "
"Check Zuul scheduler logs for more information.")
return
ltime = self.zk_client.getCurrentLtime()
job = self.merger.getFiles(
project.source.connection.connection_name,
project.name, branch,
files=(['zuul.yaml', '.zuul.yaml'] +
list(extra_config_files)),
dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
self.log.debug("Submitting cat job %s for %s %s %s" % (
job, project.source.connection.connection_name,
project.name, branch))
job.extra_config_files = extra_config_files
job.extra_config_dirs = extra_config_dirs
job.ltime = ltime
job.source_context = source_context
jobs.append(job)
def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes): def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
# Called at the end of _cacheTenantYAML after all cat jobs
# have been submitted
for job in jobs: for job in jobs:
self.log.debug("Waiting for cat job %s" % (job,)) self.log.debug("Waiting for cat job %s" % (job,))
res = job.wait(self.merger.git_timeout) res = job.wait(self.merger.git_timeout)
@ -2545,15 +2575,17 @@ class ConfigLoader(object):
# Pre-load TenantProjectConfigs so we can get and cache all of a # Pre-load TenantProjectConfigs so we can get and cache all of a
# project's config files (incl. tenant specific extra config) at once. # project's config files (incl. tenant specific extra config) at once.
for tenant_name, unparsed_config in tenants_to_load.items(): with ThreadPoolExecutor(max_workers=4) as executor:
config_tpcs, untrusted_tpcs = ( for tenant_name, unparsed_config in tenants_to_load.items():
self.tenant_parser.loadTenantProjects(unparsed_config) config_tpcs, untrusted_tpcs = (
) self.tenant_parser.loadTenantProjects(unparsed_config,
abide.clearTPCs(tenant_name) executor)
for tpc in config_tpcs: )
abide.addConfigTPC(tenant_name, tpc) abide.clearTPCs(tenant_name)
for tpc in untrusted_tpcs: for tpc in config_tpcs:
abide.addUntrustedTPC(tenant_name, tpc) abide.addConfigTPC(tenant_name, tpc)
for tpc in untrusted_tpcs:
abide.addUntrustedTPC(tenant_name, tpc)
def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide, def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
min_ltimes=None, layout_uuid=None, min_ltimes=None, layout_uuid=None,
@ -2632,9 +2664,11 @@ class ConfigLoader(object):
return None return None
unparsed_config = unparsed_abide.tenants[tenant_name] unparsed_config = unparsed_abide.tenants[tenant_name]
new_tenant = self.tenant_parser.fromYaml( with ThreadPoolExecutor(max_workers=4) as executor:
abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid, new_tenant = self.tenant_parser.fromYaml(
branch_cache_min_ltimes, ignore_cat_exception) abide, unparsed_config, ansible_manager, executor,
min_ltimes, layout_uuid, branch_cache_min_ltimes,
ignore_cat_exception)
# Copy tenants dictionary to not break concurrent iterations. # Copy tenants dictionary to not break concurrent iterations.
tenants = abide.tenants.copy() tenants = abide.tenants.copy()
tenants[tenant_name] = new_tenant tenants[tenant_name] = new_tenant

View File

@ -17,7 +17,7 @@ from threading import Thread
from typing import List, Callable from typing import List, Callable
from kazoo.client import KazooClient from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.handlers.threading import KazooTimeoutError from kazoo.handlers.threading import KazooTimeoutError
from kazoo.protocol.states import KazooState from kazoo.protocol.states import KazooState
@ -211,8 +211,11 @@ class ZooKeeperClient(object):
try: try:
zstat = self.client.set("/zuul/ltime", b"") zstat = self.client.set("/zuul/ltime", b"")
except NoNodeError: except NoNodeError:
self.client.create("/zuul/ltime", b"", makepath=True) try:
zstat = self.client.set("/zuul/ltime", b"") self.client.create("/zuul/ltime", b"", makepath=True)
zstat = self.client.set("/zuul/ltime", b"")
except NodeExistsError:
zstat = self.client.set("/zuul/ltime", b"")
return zstat.last_modified_transaction_id return zstat.last_modified_transaction_id