Cache unparsed config files in Zookeeper
Cache raw config files in Zookeeper so that the config can be loaded on multiple schedulers with only one round trip to the mergers.

If no cache ltime is given, the configloader fetches the content of the config files via a cat job and stores the returned config in Zookeeper. For tenant reconfigurations, the 'zuul_event_ltime' of the corresponding event is passed as the cache ltime to 'reloadTenant()'. This has the advantage that when a project is part of multiple tenants, only the first tenant requests the config from the mergers; all following tenants can get the config from Zookeeper.

In a later step we can store the logical timestamp of a tenant reconfiguration in the 'global layout state'. Other schedulers can then use this timestamp as the cache ltime when updating their local layout.

Change-Id: Ia3d2a124eaf6ee9d82c94a0375ee7ecba61caadf
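In outline, the new loading path checks the Zookeeper-backed cache before submitting a cat job. Below is a minimal sketch of that flow using the names introduced in this change (getFilesCache, isValidFor, readLock/writeLock, setValidFor); the fetch_via_cat_job parameter is a hypothetical stand-in for the merger round trip, not part of the change:

    def load_project_config(cache, tpc, project, branch, cache_ltime,
                            fetch_via_cat_job):
        files_cache = cache.getFilesCache(project.canonical_name, branch)
        if cache_ltime is not None:
            with cache.readLock(project.canonical_name):
                # Valid when the cached entries are newer than the event's
                # logical timestamp and cover the same extra config
                # files/dirs as the tenant project config (tpc).
                if files_cache.isValidFor(tpc, cache_ltime):
                    return dict(files_cache)
        # Cache miss: one round trip to the mergers, then publish the
        # result so following tenants/schedulers can read it from
        # Zookeeper instead.
        files = fetch_via_cat_job(project, branch)
        with cache.writeLock(project.canonical_name):
            for fn, content in files.items():
                if content is not None:
                    files_cache[fn] = content
            files_cache.setValidFor(tpc.extra_config_files,
                                    tpc.extra_config_dirs)
        return files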
parent 997f88185a
commit 62526da69f
--- a/tests/base.py
+++ b/tests/base.py
@@ -4576,6 +4576,15 @@ class ZuulTestCase(BaseTestCase):
         with open(private_key_file, 'w') as o:
             o.write(i.read())
 
+    def getCurrentLtime(self):
+        """Get the logical timestamp as seen by the Zookeeper cluster."""
+        result = self.zk_client.client.command(b"srvr")
+        for line in result.splitlines():
+            match = re.match(r"zxid:\s+0x(?P<zxid>[a-f0-9]+)", line, re.I)
+            if match:
+                return int(match.group("zxid"), 16)
+        raise RuntimeError("Could not find zxid in Zookeeper srvr output")
+
     def copyDirToRepo(self, project, source_path):
         self.init_repo(project)
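An aside on getCurrentLtime: ZooKeeper's srvr four-letter command reports the server's last-processed transaction id as a line of the form "Zxid: 0x...", and zxids increase monotonically, which is what makes the parsed value usable as a cluster-wide logical timestamp. A sketch of how a test might combine it with the cache added in this change (assuming the single-tenant layout used by the new tests below):

    # Take the logical time before reconfiguring; everything the
    # reconfiguration writes to the cache will have a larger ltime.
    ltime = self.getCurrentLtime()
    self.scheds.first.fullReconfigure()

    cache = self.scheds.first.sched.unparsed_config_cache
    files_cache = cache.getFilesCache("review.example.com/common-config",
                                      "master")
    tenant = self.scheds.first.sched.abide.tenants["tenant-one"]
    tpc = tenant.project_configs["review.example.com/common-config"]
    # The cache is valid for any cache_ltime taken before its entries
    # were written.
    self.assertTrue(files_cache.isValidFor(tpc, cache_ltime=ltime))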
--- a/tests/unit/test_configloader.py
+++ b/tests/unit/test_configloader.py
@@ -18,6 +18,7 @@ import logging
 import textwrap
 import testtools
 
+from zuul import model
 from zuul.configloader import AuthorizationRuleParser, safe_load_yaml
 
 from tests.base import ZuulTestCase
@@ -521,6 +522,62 @@ class TestConfigConflict(ZuulTestCase):
             jobs)
 
 
+class TestUnparsedConfigCache(ZuulTestCase):
+    tenant_config_file = 'config/single-tenant/main.yaml'
+
+    def test_config_caching(self):
+        cache = self.scheds.first.sched.unparsed_config_cache
+        tenant = self.scheds.first.sched.abide.tenants["tenant-one"]
+
+        common_cache = cache.getFilesCache("review.example.com/common-config",
+                                           "master")
+        tpc = tenant.project_configs["review.example.com/common-config"]
+        self.assertTrue(common_cache.isValidFor(tpc, cache_ltime=-1))
+        self.assertEqual(len(common_cache), 1)
+        self.assertIn("zuul.yaml", common_cache)
+        self.assertTrue(len(common_cache["zuul.yaml"]) > 0)
+
+        project_cache = cache.getFilesCache("review.example.com/org/project",
+                                            "master")
+        # Cache of org/project should be valid but empty (no in-repo config)
+        tpc = tenant.project_configs["review.example.com/org/project"]
+        self.assertTrue(project_cache.isValidFor(tpc, cache_ltime=-1))
+        self.assertEqual(len(project_cache), 0)
+
+    def test_cache_use(self):
+        sched = self.scheds.first.sched
+        # Stop cleanup thread so it's not removing projects from the cache
+        # during the test.
+        sched.cleanup_stop.set()
+        sched.cleanup_thread.join()
+        tenant = sched.abide.tenants['tenant-one']
+        _, project = tenant.getProject('org/project2')
+
+        # Get the current ltime from Zookeeper and run a full reconfiguration,
+        # so that we know all items in the cache have a larger ltime.
+        ltime = self.getCurrentLtime()
+        self.scheds.first.fullReconfigure()
+
+        # Clear the unparsed branch cache so all projects (except for
+        # org/project2) are retrieved from the cache in Zookeeper.
+        sched.abide.unparsed_project_branch_cache.clear()
+        self.gearman_server.jobs_history.clear()
+
+        # Create a tenant reconfiguration event with a known ltime that is
+        # smaller than the ltime of the items in the cache.
+        event = model.TenantReconfigureEvent(
+            tenant.name, project.canonical_name, branch_name=None)
+        event.zuul_event_ltime = ltime
+        sched.management_events.put(event, needs_result=False)
+        self.waitUntilSettled()
+
+        # As the cache should be valid, we only expect a cat job for
+        # org/project2
+        cat_jobs = [job for job in self.gearman_server.jobs_history
+                    if job.name == b"merger:cat"]
+        self.assertEqual(len(cat_jobs), 1)
+
+
 class TestAuthorizationRuleParser(ZuulTestCase):
     tenant_config_file = 'config/tenant-parser/authorizations.yaml'
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -1485,6 +1485,7 @@ class TenantParser(object):
         self.scheduler = scheduler
         self.merger = merger
         self.keystorage = keystorage
+        self.unparsed_config_cache = self.scheduler.unparsed_config_cache
 
     classes = vs.Any('pipeline', 'job', 'semaphore', 'project',
                      'project-template', 'nodeset', 'secret', 'queue')
@@ -1546,7 +1547,7 @@ class TenantParser(object):
         }
         return vs.Schema(tenant)
 
-    def fromYaml(self, abide, conf, ansible_manager):
+    def fromYaml(self, abide, conf, ansible_manager, cache_ltime=None):
         self.getSchema()(conf)
         tenant = model.Tenant(conf['name'])
         pcontext = ParseContext(self.connections, self.scheduler,
@@ -1604,7 +1605,7 @@ class TenantParser(object):
         # Start by fetching any YAML needed by this tenant which isn't
         # already cached. Full reconfigurations start with an empty
         # cache.
-        self._cacheTenantYAML(abide, tenant, loading_errors)
+        self._cacheTenantYAML(abide, tenant, loading_errors, cache_ltime)
 
         # Then collect the appropriate YAML based on this tenant
         # config.
@@ -1770,7 +1771,7 @@ class TenantParser(object):
 
         return config_projects, untrusted_projects
 
-    def _cacheTenantYAML(self, abide, tenant, loading_errors):
+    def _cacheTenantYAML(self, abide, tenant, loading_errors, cache_ltime):
        jobs = []
        for project in itertools.chain(
                tenant.config_projects, tenant.untrusted_projects):
@@ -1790,6 +1791,24 @@ class TenantParser(object):
                     # If all config classes are excluded then do not
                     # request any getFiles jobs.
                     continue
+
+                source_context = model.SourceContext(
+                    project, branch, '', False)
+                if cache_ltime is not None:
+                    files_cache = self.unparsed_config_cache.getFilesCache(
+                        project.canonical_name, branch)
+                    with self.unparsed_config_cache.readLock(
+                            project.canonical_name):
+                        if files_cache.isValidFor(tpc, cache_ltime):
+                            self.log.debug(
+                                "Using files from cache for project %s @%s",
+                                project.canonical_name, branch)
+                            self._updateUnparsedBranchCache(
+                                abide, tenant, source_context, files_cache,
+                                loading_errors)
+                            branch_cache.setValidFor(tpc)
+                            continue
+
                 extra_config_files = abide.getExtraConfigFiles(project.name)
                 extra_config_dirs = abide.getExtraConfigDirs(project.name)
                 job = self.merger.getFiles(
@@ -1801,8 +1820,9 @@ class TenantParser(object):
                 self.log.debug("Submitting cat job %s for %s %s %s" % (
                     job, project.source.connection.connection_name,
                     project.name, branch))
-                job.source_context = model.SourceContext(
-                    project, branch, '', False)
+                job.extra_config_files = extra_config_files
+                job.extra_config_dirs = extra_config_dirs
+                job.source_context = source_context
                 jobs.append(job)
                 branch_cache.setValidFor(tpc)
 
@@ -1817,41 +1837,54 @@ class TenantParser(object):
                 raise Exception("Cat job %s failed" % (job,))
             self.log.debug("Cat job %s got files %s" %
                            (job, job.files.keys()))
-            loaded = False
-            files = sorted(job.files.keys())
-            unparsed_config = model.UnparsedConfig()
-            tpc = tenant.project_configs[
-                job.source_context.project.canonical_name]
-            for conf_root in (
-                    ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') +
-                    tpc.extra_config_files + tpc.extra_config_dirs):
-                for fn in files:
-                    fn_root = fn.split('/')[0]
-                    if fn_root != conf_root or not job.files.get(fn):
-                        continue
-                    # Don't load from more than one configuration in a
-                    # project-branch (unless an "extra" file/dir).
-                    if (conf_root not in tpc.extra_config_files and
-                        conf_root not in tpc.extra_config_dirs):
-                        if (loaded and loaded != conf_root):
-                            self.log.warning(
-                                "Multiple configuration files in %s" %
-                                (job.source_context,))
-                            continue
-                        loaded = conf_root
-                    # Create a new source_context so we have unique filenames.
-                    source_context = job.source_context.copy()
-                    source_context.path = fn
-                    self.log.info(
-                        "Loading configuration from %s" %
-                        (source_context,))
-                    incdata = self.loadProjectYAML(
-                        job.files[fn], source_context, loading_errors)
-                    branch_cache = abide.getUnparsedBranchCache(
-                        source_context.project.canonical_name,
-                        source_context.branch)
-                    branch_cache.put(source_context.path, incdata)
-                    unparsed_config.extend(incdata)
+
+            self._updateUnparsedBranchCache(abide, tenant, job.source_context,
+                                            job.files, loading_errors)
+
+            # Save all config files in Zookeeper (not just for the current tpc)
+            files_cache = self.unparsed_config_cache.getFilesCache(
+                job.source_context.project.canonical_name,
+                job.source_context.branch)
+            with self.unparsed_config_cache.writeLock(project.canonical_name):
+                for fn, content in job.files.items():
+                    # Cache file in Zookeeper
+                    if content is not None:
+                        files_cache[fn] = content
+                files_cache.setValidFor(job.extra_config_files,
+                                        job.extra_config_dirs)
+
+    def _updateUnparsedBranchCache(self, abide, tenant, source_context, files,
+                                   loading_errors):
+        loaded = False
+        tpc = tenant.project_configs[source_context.project.canonical_name]
+        for conf_root in (
+                ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') +
+                tpc.extra_config_files + tpc.extra_config_dirs):
+            for fn in sorted(files.keys()):
+                fn_root = fn.split('/')[0]
+                if fn_root != conf_root or not files.get(fn):
+                    continue
+                # Don't load from more than one configuration in a
+                # project-branch (unless an "extra" file/dir).
+                if (conf_root not in tpc.extra_config_files and
+                    conf_root not in tpc.extra_config_dirs):
+                    if (loaded and loaded != conf_root):
+                        self.log.warning("Multiple configuration files in %s",
+                                         source_context)
+                        continue
+                    loaded = conf_root
+                # Create a new source_context so we have unique filenames.
+                source_context = source_context.copy()
+                source_context.path = fn
+                self.log.info(
+                    "Loading configuration from %s" %
+                    (source_context,))
+                incdata = self.loadProjectYAML(
+                    files[fn], source_context, loading_errors)
+                branch_cache = abide.getUnparsedBranchCache(
+                    source_context.project.canonical_name,
+                    source_context.branch)
+                branch_cache.put(source_context.path, incdata)
 
     def _loadTenantYAML(self, abide, tenant, loading_errors):
         config_projects_config = model.UnparsedConfig()
@@ -2262,7 +2295,7 @@ class ConfigLoader(object):
         return abide
 
     def reloadTenant(self, abide, tenant, ansible_manager,
-                     unparsed_abide=None):
+                     unparsed_abide=None, cache_ltime=None):
         new_abide = model.Abide()
         new_abide.tenants = abide.tenants.copy()
         new_abide.admin_rules = abide.admin_rules.copy()
@@ -2296,7 +2329,7 @@ class ConfigLoader(object):
 
         # When reloading a tenant only, use cached data if available.
         new_tenant = self.tenant_parser.fromYaml(
-            new_abide, unparsed_config, ansible_manager)
+            new_abide, unparsed_config, ansible_manager, cache_ltime)
         new_abide.tenants[tenant.name] = new_tenant
         if len(new_tenant.layout.loading_errors):
             self.log.warning(
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -71,6 +71,7 @@ from zuul.zk import ZooKeeperClient
 from zuul.zk.components import (
     BaseComponent, ComponentRegistry, SchedulerComponent
 )
+from zuul.zk.config_cache import UnparsedConfigCache
 from zuul.zk.event_queues import (
     GlobalEventWatcher,
     GlobalManagementEventQueue,
@@ -158,6 +159,7 @@ class Scheduler(threading.Thread):
         self.component_info = SchedulerComponent(self.zk_client, self.hostname)
         self.component_info.register()
         self.component_registry = ComponentRegistry(self.zk_client)
+        self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
 
         self.result_event_queue = NamedQueue("ResultEventQueue")
         self.global_watcher = GlobalEventWatcher(
@@ -434,6 +436,14 @@ class Scheduler(threading.Thread):
             except Exception:
                 self.log.exception("Error in semaphore cleanup:")
 
+            cached_projects = set(
+                self.unparsed_config_cache.listCachedProjects())
+            active_projects = set(
+                self.abide.unparsed_project_branch_cache.keys())
+            unused_projects = cached_projects - active_projects
+            for project_cname in unused_projects:
+                self.unparsed_config_cache.clearCache(project_cname)
+
     def addTriggerEvent(self, driver_name, event):
         event.arrived_at_scheduler_timestamp = time.time()
         self.trigger_events.put(driver_name, event)
@@ -865,11 +875,16 @@ class Scheduler(threading.Thread):
                                    project_name, branch_name)
                     self.abide.clearUnparsedBranchCache(project_name,
                                                         branch_name)
+                    with self.unparsed_config_cache.writeLock(project_name):
+                        self.unparsed_config_cache.clearCache(project_name,
+                                                              branch_name)
+
             old_tenant = self.abide.tenants[event.tenant_name]
             loader = configloader.ConfigLoader(
                 self.connections, self, self.merger, self.keystore)
-            abide = loader.reloadTenant(
-                self.abide, old_tenant, self.ansible_manager)
+            abide = loader.reloadTenant(self.abide, old_tenant,
+                                        self.ansible_manager,
+                                        cache_ltime=event.zuul_event_ltime)
             tenant = abide.tenants[event.tenant_name]
             self._reconfigureTenant(tenant)
             self.abide = abide