From 42e1e1e324ac9d29b15dc4c8ba2ca58d9b219e88 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Sat, 18 Jun 2022 11:51:08 -0700
Subject: [PATCH] Parallelize config cache loading

Loading config involves significant network operations for each
project:

* Loading project keys
* Asking the source for the list of branches for each project
* Retrieving the config file contents from the ZK cache (if present)
* Retrieving the config file contents from git (otherwise)

Only the third item in that list is parallelized currently; the others
are serialized.  To parallelize the remainder, use a thread pool
executor.

The value of max_workers=4 is chosen as it appears in practice on
OpenDev to make the most significant reduction in startup time while
higher values make little difference (and could potentially contribute
to DoS scenarios or local thread contention).

Observed config priming times for various worker counts:

1: 282s
2: 181s
4: 144s
8: 146s

Change-Id: I65472a8af96ed95eb28b88cc623ef103be76a46f
---
 tests/unit/test_client.py |   3 +
 zuul/configloader.py      | 190 ++++++++++++++++++++++----------------
 zuul/zk/__init__.py       |   9 +-
 3 files changed, 121 insertions(+), 81 deletions(-)

diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 3070514193..b51639952b 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -315,6 +315,9 @@ class TestOfflineZKOperations(ZuulTestCase):
     def assertFinalState(self):
         pass
 
+    def assertCleanShutdown(self):
+        pass
+
     def test_delete_state(self):
         # Shut everything down (as much as possible) to reduce
         # logspam and errors.
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 8f453756b9..3356d67e41 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -12,6 +12,7 @@
 
 import collections
 from contextlib import contextmanager
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import copy
 import itertools
 import os
@@ -1576,7 +1577,7 @@ class TenantParser(object):
         }
         return vs.Schema(tenant)
 
-    def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None,
+    def fromYaml(self, abide, conf, ansible_manager, executor, min_ltimes=None,
                  layout_uuid=None, branch_cache_min_ltimes=None,
                  ignore_cat_exception=True):
         # Note: This vs schema validation is not necessary in most cases as we
@@ -1622,7 +1623,15 @@
         # We prepare a stack to store config loading issues
         loading_errors = model.LoadingErrors()
 
+        # Get branches in parallel
+        branch_futures = {}
         for tpc in config_tpcs + untrusted_tpcs:
+            future = executor.submit(self._getProjectBranches,
+                                     tenant, tpc, branch_cache_min_ltimes)
+            branch_futures[future] = tpc
+
+        for branch_future in as_completed(branch_futures.keys()):
+            tpc = branch_futures[branch_future]
             source_context = model.ProjectContext(
                 tpc.project.canonical_name, tpc.project.name)
             with project_configuration_exceptions(source_context,
@@ -1645,7 +1654,7 @@
         # already cached. Full reconfigurations start with an empty
         # cache.
         self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes,
-                              ignore_cat_exception)
+                              executor, ignore_cat_exception)
 
         # Then collect the appropriate YAML based on this tenant
         # config.
@@ -1816,7 +1825,7 @@ class TenantParser(object):
                 raise Exception("Unable to parse project %s", conf)
         return projects
 
-    def loadTenantProjects(self, conf_tenant):
+    def loadTenantProjects(self, conf_tenant, executor):
         config_projects = []
         untrusted_projects = []
 
@@ -1824,6 +1833,7 @@
                                      'secret', 'project-template',
                                      'nodeset', 'queue'])
 
+        futures = []
         for source_name, conf_source in conf_tenant.get('source', {}).items():
             source = self.connections.getSource(source_name)
 
@@ -1832,7 +1842,8 @@
             # tpcs = TenantProjectConfigs
             tpcs = self._getProjects(source, conf_repo, current_include)
             for tpc in tpcs:
-                self._loadProjectKeys(source_name, tpc.project)
+                futures.append(executor.submit(
+                    self._loadProjectKeys, source_name, tpc.project))
                 config_projects.append(tpc)
 
             current_include = frozenset(default_include - set(['pipeline']))
@@ -1840,13 +1851,16 @@
                 tpcs = self._getProjects(source, conf_repo,
                                          current_include)
                 for tpc in tpcs:
-                    self._loadProjectKeys(source_name, tpc.project)
+                    futures.append(executor.submit(
+                        self._loadProjectKeys, source_name, tpc.project))
                     untrusted_projects.append(tpc)
 
+        for f in futures:
+            f.result()
         return config_projects, untrusted_projects
 
     def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes,
-                         ignore_cat_exception=True):
+                         executor, ignore_cat_exception=True):
         # min_ltimes can be the following: None (that means that we
         # should not use the file cache at all) or a nested dict of
         # project and branch to ltime. A value of None usually means
@@ -1910,6 +1924,7 @@
 
         jobs = []
 
+        futures = []
         for project in itertools.chain(
                 tenant.config_projects, tenant.untrusted_projects):
             tpc = tenant.project_configs[project.canonical_name]
@@ -1923,67 +1938,13 @@
                     # If all config classes are excluded then do not
                     # request any getFiles jobs.
                     continue
+                futures.append(executor.submit(self._cacheTenantYAMLBranch,
+                                               abide, tenant, loading_errors,
+                                               min_ltimes, tpc, project,
+                                               branch, jobs))
+        for future in futures:
+            future.result()
 
-                source_context = model.SourceContext(
-                    project.canonical_name, project.name,
-                    project.connection_name, branch, '', False)
-                if min_ltimes is not None:
-                    files_cache = self.unparsed_config_cache.getFilesCache(
-                        project.canonical_name, branch)
-                    branch_cache = abide.getUnparsedBranchCache(
-                        project.canonical_name, branch)
-                    try:
-                        pb_ltime = min_ltimes[project.canonical_name][branch]
-                    except KeyError:
-                        self.log.exception(
-                            "Min. ltime missing for project/branch")
-                        pb_ltime = -1
-
-                    # If our unparsed branch cache is valid for the
-                    # time, then we don't need to do anything else.
-                    if branch_cache.isValidFor(tpc, pb_ltime):
-                        min_ltimes[project.canonical_name][branch] =\
-                            branch_cache.ltime
-                        continue
-
-                    with self.unparsed_config_cache.readLock(
-                            project.canonical_name):
-                        if files_cache.isValidFor(tpc, pb_ltime):
-                            self.log.debug(
-                                "Using files from cache for project "
-                                "%s @%s: %s",
-                                project.canonical_name, branch,
-                                list(files_cache.keys()))
-                            self._updateUnparsedBranchCache(
-                                abide, tenant, source_context, files_cache,
-                                loading_errors, files_cache.ltime,
-                                min_ltimes)
-                            continue
-
-                extra_config_files = abide.getExtraConfigFiles(project.name)
-                extra_config_dirs = abide.getExtraConfigDirs(project.name)
-                if not self.merger:
-                    with project_configuration_exceptions(source_context,
-                                                          loading_errors):
-                        raise Exception(
-                            "Configuration files missing from cache. "
-                            "Check Zuul scheduler logs for more information.")
-                    continue
-                ltime = self.zk_client.getCurrentLtime()
-                job = self.merger.getFiles(
-                    project.source.connection.connection_name,
-                    project.name, branch,
-                    files=(['zuul.yaml', '.zuul.yaml'] +
-                           list(extra_config_files)),
-                    dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
-                self.log.debug("Submitting cat job %s for %s %s %s" % (
-                    job, project.source.connection.connection_name,
-                    project.name, branch))
-                job.extra_config_files = extra_config_files
-                job.extra_config_dirs = extra_config_dirs
-                job.ltime = ltime
-                job.source_context = source_context
-                jobs.append(job)
         try:
             self._processCatJobs(abide, tenant, loading_errors, jobs,
                                  min_ltimes)
@@ -2000,7 +1961,76 @@
             if not ignore_cat_exception:
                 raise
 
+    def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
+                               tpc, project, branch, jobs):
+        # This is the middle section of _cacheTenantYAML, called for
+        # each project-branch. It's a separate method so we can
+        # execute it in parallel. The "jobs" argument is mutated and
+        # accumulates a list of all merger jobs submitted.
+        source_context = model.SourceContext(
+            project.canonical_name, project.name,
+            project.connection_name, branch, '', False)
+        if min_ltimes is not None:
+            files_cache = self.unparsed_config_cache.getFilesCache(
+                project.canonical_name, branch)
+            branch_cache = abide.getUnparsedBranchCache(
+                project.canonical_name, branch)
+            try:
+                pb_ltime = min_ltimes[project.canonical_name][branch]
+            except KeyError:
+                self.log.exception(
+                    "Min. ltime missing for project/branch")
+                pb_ltime = -1
+
+            # If our unparsed branch cache is valid for the
+            # time, then we don't need to do anything else.
+            if branch_cache.isValidFor(tpc, pb_ltime):
+                min_ltimes[project.canonical_name][branch] =\
+                    branch_cache.ltime
+                return
+
+            with self.unparsed_config_cache.readLock(
+                    project.canonical_name):
+                if files_cache.isValidFor(tpc, pb_ltime):
+                    self.log.debug(
+                        "Using files from cache for project "
+                        "%s @%s: %s",
+                        project.canonical_name, branch,
+                        list(files_cache.keys()))
+                    self._updateUnparsedBranchCache(
+                        abide, tenant, source_context, files_cache,
+                        loading_errors, files_cache.ltime,
+                        min_ltimes)
+                    return
+
+        extra_config_files = abide.getExtraConfigFiles(project.name)
+        extra_config_dirs = abide.getExtraConfigDirs(project.name)
+        if not self.merger:
+            with project_configuration_exceptions(source_context,
+                                                  loading_errors):
+                raise Exception(
+                    "Configuration files missing from cache. "
+                    "Check Zuul scheduler logs for more information.")
+            return
+        ltime = self.zk_client.getCurrentLtime()
+        job = self.merger.getFiles(
+            project.source.connection.connection_name,
+            project.name, branch,
+            files=(['zuul.yaml', '.zuul.yaml'] +
+                   list(extra_config_files)),
+            dirs=['zuul.d', '.zuul.d'] + list(extra_config_dirs))
+        self.log.debug("Submitting cat job %s for %s %s %s" % (
+            job, project.source.connection.connection_name,
+            project.name, branch))
+        job.extra_config_files = extra_config_files
+        job.extra_config_dirs = extra_config_dirs
+        job.ltime = ltime
+        job.source_context = source_context
+        jobs.append(job)
+
     def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
+        # Called at the end of _cacheTenantYAML after all cat jobs
+        # have been submitted
         for job in jobs:
             self.log.debug("Waiting for cat job %s" % (job,))
             res = job.wait(self.merger.git_timeout)
@@ -2475,15 +2505,17 @@
 
         # Pre-load TenantProjectConfigs so we can get and cache all of a
         # project's config files (incl. tenant specific extra config) at once.
-        for tenant_name, unparsed_config in tenants_to_load.items():
-            config_tpcs, untrusted_tpcs = (
-                self.tenant_parser.loadTenantProjects(unparsed_config)
-            )
-            abide.clearTPCs(tenant_name)
-            for tpc in config_tpcs:
-                abide.addConfigTPC(tenant_name, tpc)
-            for tpc in untrusted_tpcs:
-                abide.addUntrustedTPC(tenant_name, tpc)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            for tenant_name, unparsed_config in tenants_to_load.items():
+                config_tpcs, untrusted_tpcs = (
+                    self.tenant_parser.loadTenantProjects(unparsed_config,
+                                                          executor)
+                )
+                abide.clearTPCs(tenant_name)
+                for tpc in config_tpcs:
+                    abide.addConfigTPC(tenant_name, tpc)
+                for tpc in untrusted_tpcs:
+                    abide.addUntrustedTPC(tenant_name, tpc)
 
     def loadTenant(self, abide, tenant_name, ansible_manager,
                    unparsed_abide, min_ltimes=None, layout_uuid=None,
@@ -2562,9 +2594,11 @@
             return None
 
         unparsed_config = unparsed_abide.tenants[tenant_name]
-        new_tenant = self.tenant_parser.fromYaml(
-            abide, unparsed_config, ansible_manager, min_ltimes, layout_uuid,
-            branch_cache_min_ltimes, ignore_cat_exception)
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            new_tenant = self.tenant_parser.fromYaml(
+                abide, unparsed_config, ansible_manager, executor,
+                min_ltimes, layout_uuid, branch_cache_min_ltimes,
+                ignore_cat_exception)
         # Copy tenants dictionary to not break concurrent iterations.
         tenants = abide.tenants.copy()
         tenants[tenant_name] = new_tenant
diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py
index 195197c5ff..8ddc7511d1 100644
--- a/zuul/zk/__init__.py
+++ b/zuul/zk/__init__.py
@@ -17,7 +17,7 @@ from threading import Thread
 from typing import List, Callable
 
 from kazoo.client import KazooClient
-from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NoNodeError, NodeExistsError
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.protocol.states import KazooState
 
@@ -211,8 +211,11 @@ class ZooKeeperClient(object):
         try:
             zstat = self.client.set("/zuul/ltime", b"")
         except NoNodeError:
-            self.client.create("/zuul/ltime", b"", makepath=True)
-            zstat = self.client.set("/zuul/ltime", b"")
+            try:
+                self.client.create("/zuul/ltime", b"", makepath=True)
+                zstat = self.client.set("/zuul/ltime", b"")
+            except NodeExistsError:
+                zstat = self.client.set("/zuul/ltime", b"")
         return zstat.last_modified_transaction_id