diff --git a/tests/base.py b/tests/base.py index ced25e6b17..1500bbc68a 100644 --- a/tests/base.py +++ b/tests/base.py @@ -4576,6 +4576,15 @@ class ZuulTestCase(BaseTestCase): with open(private_key_file, 'w') as o: o.write(i.read()) + def getCurrentLtime(self): + """Get the logical timestamp as seen by the Zookeeper cluster.""" + result = self.zk_client.client.command(b"srvr") + for line in result.splitlines(): + match = re.match(r"zxid:\s+0x(?P<zxid>[a-f0-9]+)", line, re.I) + if match: + return int(match.group("zxid"), 16) + raise RuntimeError("Could not find zxid in Zookeeper srvr output") + def copyDirToRepo(self, project, source_path): self.init_repo(project) diff --git a/tests/unit/test_configloader.py b/tests/unit/test_configloader.py index 2756923815..fd79e29683 100644 --- a/tests/unit/test_configloader.py +++ b/tests/unit/test_configloader.py @@ -18,6 +18,7 @@ import logging import textwrap import testtools +from zuul import model from zuul.configloader import AuthorizationRuleParser, safe_load_yaml from tests.base import ZuulTestCase @@ -521,6 +522,62 @@ class TestConfigConflict(ZuulTestCase): jobs) +class TestUnparsedConfigCache(ZuulTestCase): + tenant_config_file = 'config/single-tenant/main.yaml' + + def test_config_caching(self): + cache = self.scheds.first.sched.unparsed_config_cache
+ tenant = self.scheds.first.sched.abide.tenants["tenant-one"] + + common_cache = cache.getFilesCache("review.example.com/common-config", + "master") + tpc = tenant.project_configs["review.example.com/common-config"] + self.assertTrue(common_cache.isValidFor(tpc, cache_ltime=-1)) + self.assertEqual(len(common_cache), 1) + self.assertIn("zuul.yaml", common_cache) + self.assertTrue(len(common_cache["zuul.yaml"]) > 0) + + project_cache = cache.getFilesCache("review.example.com/org/project", + "master") + # Cache of org/project should be valid but empty (no in-repo config) + tpc = tenant.project_configs["review.example.com/org/project"] + self.assertTrue(project_cache.isValidFor(tpc, cache_ltime=-1)) + self.assertEqual(len(project_cache), 0) + + def test_cache_use(self): + sched = self.scheds.first.sched + # Stop cleanup thread so it's not removing projects from the cache + # during the test. + sched.cleanup_stop.set() + sched.cleanup_thread.join() + tenant = sched.abide.tenants['tenant-one'] + _, project = tenant.getProject('org/project2') + + # Get the current ltime from Zookeeper and run a full reconfiguration, + # so that we know all items in the cache have a larger ltime. + ltime = self.getCurrentLtime() + self.scheds.first.fullReconfigure() + + # Clear the unparsed branch cache so all projects (except for + # org/project2) are retrieved from the cache in Zookeeper. + sched.abide.unparsed_project_branch_cache.clear() + self.gearman_server.jobs_history.clear() + + # Create a tenant reconfiguration event with a known ltime that is + # smaller than the ltime of the items in the cache. 
+ event = model.TenantReconfigureEvent( + tenant.name, project.canonical_name, branch_name=None) + event.zuul_event_ltime = ltime + sched.management_events.put(event, needs_result=False) + self.waitUntilSettled() + + # As the cache should be valid, we only expect a cat job for + # org/project2 + cat_jobs = [job for job in self.gearman_server.jobs_history + if job.name == b"merger:cat"] + self.assertEqual(len(cat_jobs), 1) + + class TestAuthorizationRuleParser(ZuulTestCase): tenant_config_file = 'config/tenant-parser/authorizations.yaml' diff --git a/zuul/configloader.py b/zuul/configloader.py index 374ffe3a76..8f4cce83b6 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -1485,6 +1485,7 @@ class TenantParser(object): self.scheduler = scheduler self.merger = merger self.keystorage = keystorage + self.unparsed_config_cache = self.scheduler.unparsed_config_cache classes = vs.Any('pipeline', 'job', 'semaphore', 'project', 'project-template', 'nodeset', 'secret', 'queue') @@ -1546,7 +1547,7 @@ class TenantParser(object): } return vs.Schema(tenant) - def fromYaml(self, abide, conf, ansible_manager): + def fromYaml(self, abide, conf, ansible_manager, cache_ltime=None): self.getSchema()(conf) tenant = model.Tenant(conf['name']) pcontext = ParseContext(self.connections, self.scheduler, @@ -1604,7 +1605,7 @@ class TenantParser(object): # Start by fetching any YAML needed by this tenant which isn't # already cached. Full reconfigurations start with an empty # cache. - self._cacheTenantYAML(abide, tenant, loading_errors) + self._cacheTenantYAML(abide, tenant, loading_errors, cache_ltime) # Then collect the appropriate YAML based on this tenant # config. 
@@ -1770,7 +1771,7 @@ class TenantParser(object): return config_projects, untrusted_projects - def _cacheTenantYAML(self, abide, tenant, loading_errors): + def _cacheTenantYAML(self, abide, tenant, loading_errors, cache_ltime): jobs = [] for project in itertools.chain( tenant.config_projects, tenant.untrusted_projects): @@ -1790,6 +1791,24 @@ class TenantParser(object): # If all config classes are excluded then do not # request any getFiles jobs. continue + + source_context = model.SourceContext( + project, branch, '', False) + if cache_ltime is not None: + files_cache = self.unparsed_config_cache.getFilesCache( + project.canonical_name, branch) + with self.unparsed_config_cache.readLock( + project.canonical_name): + if files_cache.isValidFor(tpc, cache_ltime): + self.log.debug( + "Using files from cache for project %s @%s", + project.canonical_name, branch) + self._updateUnparsedBranchCache( + abide, tenant, source_context, files_cache, + loading_errors) + branch_cache.setValidFor(tpc) + continue + extra_config_files = abide.getExtraConfigFiles(project.name) extra_config_dirs = abide.getExtraConfigDirs(project.name) job = self.merger.getFiles( @@ -1801,8 +1820,9 @@ class TenantParser(object): self.log.debug("Submitting cat job %s for %s %s %s" % ( job, project.source.connection.connection_name, project.name, branch)) - job.source_context = model.SourceContext( - project, branch, '', False) + job.extra_config_files = extra_config_files + job.extra_config_dirs = extra_config_dirs + job.source_context = source_context jobs.append(job) branch_cache.setValidFor(tpc) @@ -1817,41 +1837,54 @@ class TenantParser(object): raise Exception("Cat job %s failed" % (job,)) self.log.debug("Cat job %s got files %s" % (job, job.files.keys())) - loaded = False - files = sorted(job.files.keys()) - unparsed_config = model.UnparsedConfig() - tpc = tenant.project_configs[ - job.source_context.project.canonical_name] - for conf_root in ( - ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') 
+ - tpc.extra_config_files + tpc.extra_config_dirs): - for fn in files: - fn_root = fn.split('/')[0] - if fn_root != conf_root or not job.files.get(fn): + + self._updateUnparsedBranchCache(abide, tenant, job.source_context, + job.files, loading_errors) + + # Save all config files in Zookeeper (not just for the current tpc) + files_cache = self.unparsed_config_cache.getFilesCache( + job.source_context.project.canonical_name, + job.source_context.branch) + with self.unparsed_config_cache.writeLock(project.canonical_name): + for fn, content in job.files.items(): + # Cache file in Zookeeper + if content is not None: + files_cache[fn] = content + files_cache.setValidFor(job.extra_config_files, + job.extra_config_dirs) + + def _updateUnparsedBranchCache(self, abide, tenant, source_context, files, + loading_errors): + loaded = False + tpc = tenant.project_configs[source_context.project.canonical_name] + for conf_root in ( + ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') + + tpc.extra_config_files + tpc.extra_config_dirs): + for fn in sorted(files.keys()): + fn_root = fn.split('/')[0] + if fn_root != conf_root or not files.get(fn): + continue + # Don't load from more than one configuration in a + # project-branch (unless an "extra" file/dir). + if (conf_root not in tpc.extra_config_files and + conf_root not in tpc.extra_config_dirs): + if (loaded and loaded != conf_root): + self.log.warning("Multiple configuration files in %s", + source_context) continue - # Don't load from more than one configuration in a - # project-branch (unless an "extra" file/dir). - if (conf_root not in tpc.extra_config_files and - conf_root not in tpc.extra_config_dirs): - if (loaded and loaded != conf_root): - self.log.warning( - "Multiple configuration files in %s" % - (job.source_context,)) - continue - loaded = conf_root - # Create a new source_context so we have unique filenames. 
- source_context = job.source_context.copy() - source_context.path = fn - self.log.info( - "Loading configuration from %s" % - (source_context,)) - incdata = self.loadProjectYAML( - job.files[fn], source_context, loading_errors) - branch_cache = abide.getUnparsedBranchCache( - source_context.project.canonical_name, - source_context.branch) - branch_cache.put(source_context.path, incdata) - unparsed_config.extend(incdata) + loaded = conf_root + # Create a new source_context so we have unique filenames. + source_context = source_context.copy() + source_context.path = fn + self.log.info( + "Loading configuration from %s" % + (source_context,)) + incdata = self.loadProjectYAML( + files[fn], source_context, loading_errors) + branch_cache = abide.getUnparsedBranchCache( + source_context.project.canonical_name, + source_context.branch) + branch_cache.put(source_context.path, incdata) def _loadTenantYAML(self, abide, tenant, loading_errors): config_projects_config = model.UnparsedConfig() @@ -2262,7 +2295,7 @@ class ConfigLoader(object): return abide def reloadTenant(self, abide, tenant, ansible_manager, - unparsed_abide=None): + unparsed_abide=None, cache_ltime=None): new_abide = model.Abide() new_abide.tenants = abide.tenants.copy() new_abide.admin_rules = abide.admin_rules.copy() @@ -2296,7 +2329,7 @@ class ConfigLoader(object): # When reloading a tenant only, use cached data if available. 
new_tenant = self.tenant_parser.fromYaml( - new_abide, unparsed_config, ansible_manager) + new_abide, unparsed_config, ansible_manager, cache_ltime) new_abide.tenants[tenant.name] = new_tenant if len(new_tenant.layout.loading_errors): self.log.warning( diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 5a38759ad7..63d787b7cf 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -71,6 +71,7 @@ from zuul.zk import ZooKeeperClient from zuul.zk.components import ( BaseComponent, ComponentRegistry, SchedulerComponent ) +from zuul.zk.config_cache import UnparsedConfigCache from zuul.zk.event_queues import ( GlobalEventWatcher, GlobalManagementEventQueue, @@ -158,6 +159,7 @@ class Scheduler(threading.Thread): self.component_info = SchedulerComponent(self.zk_client, self.hostname) self.component_info.register() self.component_registry = ComponentRegistry(self.zk_client) + self.unparsed_config_cache = UnparsedConfigCache(self.zk_client) self.result_event_queue = NamedQueue("ResultEventQueue") self.global_watcher = GlobalEventWatcher( @@ -434,6 +436,14 @@ class Scheduler(threading.Thread): except Exception: self.log.exception("Error in semaphore cleanup:") + cached_projects = set( + self.unparsed_config_cache.listCachedProjects()) + active_projects = set( + self.abide.unparsed_project_branch_cache.keys()) + unused_projects = cached_projects - active_projects + for project_cname in unused_projects: + self.unparsed_config_cache.clearCache(project_cname) + def addTriggerEvent(self, driver_name, event): event.arrived_at_scheduler_timestamp = time.time() self.trigger_events.put(driver_name, event) @@ -865,11 +875,16 @@ class Scheduler(threading.Thread): project_name, branch_name) self.abide.clearUnparsedBranchCache(project_name, branch_name) + with self.unparsed_config_cache.writeLock(project_name): + self.unparsed_config_cache.clearCache(project_name, + branch_name) + old_tenant = self.abide.tenants[event.tenant_name] loader = configloader.ConfigLoader( 
self.connections, self, self.merger, self.keystore) - abide = loader.reloadTenant( - self.abide, old_tenant, self.ansible_manager) + abide = loader.reloadTenant(self.abide, old_tenant, + self.ansible_manager, + cache_ltime=event.zuul_event_ltime) tenant = abide.tenants[event.tenant_name] self._reconfigureTenant(tenant) self.abide = abide