Merge "Ensure config cache stages are used correctly"
@@ -575,23 +575,30 @@ class TestUnparsedConfigCache(ZuulTestCase):
    tenant_config_file = 'config/single-tenant/main.yaml'

    def test_config_caching(self):
        cache = self.scheds.first.sched.unparsed_config_cache
        tenant = self.scheds.first.sched.abide.tenants["tenant-one"]
        sched = self.scheds.first.sched
        cache = sched.unparsed_config_cache
        tenant = sched.abide.tenants["tenant-one"]

        common_cache = cache.getFilesCache("review.example.com/common-config",
                                           "master")
        upb_common_cache = sched.abide.getUnparsedBranchCache(
            "review.example.com/common-config", "master")
        tpc = tenant.project_configs["review.example.com/common-config"]
        self.assertTrue(common_cache.isValidFor(tpc, min_ltime=-1))
        self.assertEqual(len(common_cache), 1)
        self.assertIn("zuul.yaml", common_cache)
        self.assertTrue(len(common_cache["zuul.yaml"]) > 0)
        self.assertEqual(upb_common_cache.ltime, common_cache.ltime)

        project_cache = cache.getFilesCache("review.example.com/org/project",
                                            "master")
        upb_project_cache = sched.abide.getUnparsedBranchCache(
            "review.example.com/org/project", "master")
        # Cache of org/project should be valid but empty (no in-repo config)
        tpc = tenant.project_configs["review.example.com/org/project"]
        self.assertTrue(project_cache.isValidFor(tpc, min_ltime=-1))
        self.assertEqual(len(project_cache), 0)
        self.assertEqual(upb_project_cache.ltime, project_cache.ltime)

    def test_cache_use(self):
        sched = self.scheds.first.sched
@@ -601,10 +608,22 @@ class TestUnparsedConfigCache(ZuulTestCase):
        tenant = sched.abide.tenants['tenant-one']
        _, project = tenant.getProject('org/project2')

        cache = self.scheds.first.sched.unparsed_config_cache
        files_cache = cache.getFilesCache(
            "review.example.com/org/project2", "master")
        zk_initial_ltime = files_cache.ltime
        upb_cache = sched.abide.getUnparsedBranchCache(
            "review.example.com/org/project2", "master")
        self.assertEqual(zk_initial_ltime, upb_cache.ltime)

        # Get the current ltime from Zookeeper and run a full reconfiguration,
        # so that we know all items in the cache have a larger ltime.
        ltime = self.zk_client.getCurrentLtime()
        self.scheds.first.fullReconfigure()
        self.assertGreater(files_cache.ltime, zk_initial_ltime)
        upb_cache = sched.abide.getUnparsedBranchCache(
            "review.example.com/org/project2", "master")
        self.assertEqual(files_cache.ltime, upb_cache.ltime)

        # Clear the unparsed branch cache so all projects (except for
        # org/project2) are retrieved from the cache in Zookeeper.
@@ -619,11 +638,37 @@ class TestUnparsedConfigCache(ZuulTestCase):
        sched.management_events[tenant.name].put(event, needs_result=False)
        self.waitUntilSettled()

        # As the cache should be valid, we only expect a cat job for
        # org/project2
        # As the cache should be valid (cache ltime of org/project2 newer than
        # event ltime) we don't expect any cat jobs.
        cat_jobs = [job for job in self.gearman_server.jobs_history
                    if job.name == b"merger:cat"]
        self.assertEqual(len(cat_jobs), 1)
        self.assertEqual(len(cat_jobs), 0)

        # Set canary value so we can detect if the configloader used
        # the cache in Zookeeper (it shouldn't).
        common_cache = cache.getFilesCache("review.example.com/common-config",
                                           "master")
        common_cache.setValidFor({"CANARY"}, set(), common_cache.ltime)
        self.gearman_server.jobs_history.clear()

        # Create a tenant reconfiguration event with a known ltime that is
        # smaller than the ltime of the items in the cache.
        event = model.TenantReconfigureEvent(
            tenant.name, project.canonical_name, branch_name=None)
        event.zuul_event_ltime = ltime
        sched.management_events[tenant.name].put(event, needs_result=False)
        self.waitUntilSettled()

        upb_cache = sched.abide.getUnparsedBranchCache(
            "review.example.com/common-config", "master")
        self.assertEqual(common_cache.ltime, upb_cache.ltime)
        self.assertNotIn("CANARY", upb_cache.extra_files_searched)

        # As the cache should be valid (cache ltime of org/project2 newer than
        # event ltime) we don't expect any cat jobs.
        cat_jobs = [job for job in self.gearman_server.jobs_history
                    if job.name == b"merger:cat"]
        self.assertEqual(len(cat_jobs), 0)
        sched.apsched.start()

@@ -8805,6 +8805,7 @@ class TestSchedulerSmartReconfiguration(ZuulTestCase):

        self.newTenantConfig('config/multi-tenant/main-reconfig.yaml')

        self.gearman_server.jobs_history.clear()
        self.scheds.execute(
            lambda app: app.smartReconfigure(command_socket=command_socket))

@@ -8823,8 +8824,15 @@ class TestSchedulerSmartReconfiguration(ZuulTestCase):
            else:
                time.sleep(0.1)

        # Ensure that tenant-one has not been reconfigured
        self.waitUntilSettled()

        # We're only adding two new repos, so we should only need to
        # issue 2 cat jobs.
        cat_jobs = [job for job in self.gearman_server.jobs_history
                    if job.name == b"merger:cat"]
        self.assertEqual(len(cat_jobs), 2)

        # Ensure that tenant-one has not been reconfigured
        new_one = self.scheds.first.sched.tenant_layout_state.get(
            'tenant-one', EMPTY_LAYOUT_STATE)
        self.assertEqual(old_one, new_one)

@@ -1538,7 +1538,7 @@ class TenantParser(object):
        }
        return vs.Schema(tenant)

    def fromYaml(self, abide, conf, ansible_manager, cache_ltime=None):
    def fromYaml(self, abide, conf, ansible_manager, min_ltimes=None):
        self.getSchema()(conf)
        tenant = model.Tenant(conf['name'])
        pcontext = ParseContext(self.connections, self.scheduler,
@@ -1596,7 +1596,7 @@ class TenantParser(object):
        # Start by fetching any YAML needed by this tenant which isn't
        # already cached. Full reconfigurations start with an empty
        # cache.
        self._cacheTenantYAML(abide, tenant, loading_errors, cache_ltime)
        self._cacheTenantYAML(abide, tenant, loading_errors, min_ltimes)

        # Then collect the appropriate YAML based on this tenant
        # config.
@@ -1762,7 +1762,7 @@ class TenantParser(object):

        return config_projects, untrusted_projects

    def _cacheTenantYAML(self, abide, tenant, loading_errors, cache_ltime):
    def _cacheTenantYAML(self, abide, tenant, loading_errors, min_ltimes):
        jobs = []
        for project in itertools.chain(
                tenant.config_projects, tenant.untrusted_projects):
@@ -1773,11 +1773,6 @@ class TenantParser(object):
            # in-repo configuration apply only to that branch.
            branches = tenant.getProjectBranches(project)
            for branch in branches:
                branch_cache = abide.getUnparsedBranchCache(
                    project.canonical_name, branch)
                if branch_cache.isValidFor(tpc):
                    # We already have this branch cached.
                    continue
                if not tpc.load_classes:
                    # If all config classes are excluded then do not
                    # request any getFiles jobs.
@@ -1785,19 +1780,24 @@ class TenantParser(object):

                source_context = model.SourceContext(
                    project, branch, '', False)
                if cache_ltime is not None:
                if min_ltimes is not None:
                    files_cache = self.unparsed_config_cache.getFilesCache(
                        project.canonical_name, branch)
                    with self.unparsed_config_cache.readLock(
                            project.canonical_name):
                        if files_cache.isValidFor(tpc, cache_ltime):
                        pb_ltime = min_ltimes[project.canonical_name][branch]
                        if files_cache.isValidFor(tpc, pb_ltime):
                            self.log.debug(
                                "Using files from cache for project %s @%s",
                                project.canonical_name, branch)
                            branch_cache = abide.getUnparsedBranchCache(
                                project.canonical_name, branch)
                            if branch_cache.isValidFor(tpc, files_cache.ltime):
                                # Unparsed branch cache is already up-to-date
                                continue
                            self._updateUnparsedBranchCache(
                                abide, tenant, source_context, files_cache,
                                loading_errors)
                            branch_cache.setValidFor(tpc)
                                loading_errors, files_cache.ltime)
                            continue

                extra_config_files = abide.getExtraConfigFiles(project.name)
@@ -1817,7 +1817,6 @@ class TenantParser(object):
                job.ltime = ltime
                job.source_context = source_context
                jobs.append(job)
                branch_cache.setValidFor(tpc)

        for job in jobs:
            self.log.debug("Waiting for cat job %s" % (job,))
@@ -1832,7 +1831,8 @@ class TenantParser(object):
                                (job, job.files.keys()))

            self._updateUnparsedBranchCache(abide, tenant, job.source_context,
                                            job.files, loading_errors)
                                            job.files, loading_errors,
                                            job.ltime)

            # Save all config files in Zookeeper (not just for the current tpc)
            files_cache = self.unparsed_config_cache.getFilesCache(
@@ -1848,9 +1848,15 @@ class TenantParser(object):
                job.ltime)

    def _updateUnparsedBranchCache(self, abide, tenant, source_context, files,
                                   loading_errors):
                                   loading_errors, ltime):
        loaded = False
        tpc = tenant.project_configs[source_context.project.canonical_name]
        # Make sure we are clearing the local cache before updating it.
        abide.clearUnparsedBranchCache(source_context.project.canonical_name,
                                       source_context.branch)
        branch_cache = abide.getUnparsedBranchCache(
            source_context.project.canonical_name,
            source_context.branch)
        for conf_root in (
                ('zuul.yaml', 'zuul.d', '.zuul.yaml', '.zuul.d') +
                tpc.extra_config_files + tpc.extra_config_dirs):
@@ -1875,10 +1881,8 @@ class TenantParser(object):
                    (source_context,))
                incdata = self.loadProjectYAML(
                    files[fn], source_context, loading_errors)
                branch_cache = abide.getUnparsedBranchCache(
                    source_context.project.canonical_name,
                    source_context.branch)
                branch_cache.put(source_context.path, incdata)
        branch_cache.setValidFor(tpc, ltime)

    def _loadTenantYAML(self, abide, tenant, loading_errors):
        config_projects_config = model.UnparsedConfig()
@@ -2276,14 +2280,80 @@ class ConfigLoader(object):
        abide.addUntrustedTPC(tenant_name, tpc)

    def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
                   cache_ltime=None):
                   min_ltimes=None):
        """(Re-)load a single tenant.

        Description of cache stages:

        We have a local unparsed branch cache on each scheduler and the
        global config cache in Zookeeper. Depending on the event that
        triggers (re-)loading of a tenant we must make sure that those
        caches are considered valid or invalid correctly.

        If provided, the ``min_ltimes`` argument is expected to be a
        nested dictionary with the project-branches. The value defines
        the minimum logical time that is required for a cached config to
        be considered valid::

            {
                "example.com/org/project": {
                    "master": 12234,
                    "stable": -1,
                },
                "example.com/common-config": {
                    "master": -1,
                },
                ...
            }

        There are four scenarios to consider when loading a tenant.

        1. Processing a tenant reconfig event:
           - The min. ltime for the changed project(-branches) will be
             set to the event's ``zuul_event_ltime`` (to establish a
             happened-before relation in respect to the config change).
             The min. ltime for all other project-branches will be -1.
           - Config for needed project-branch(es) is updated via cat job
             if the cache is not valid (cache ltime < min. ltime).
           - Cache in Zookeeper and local unparsed branch cache is
             updated. The ltime of the cache will be the timestamp
             created shortly before requesting the config via the
             mergers (only for outdated items).
        2. Processing a FULL reconfiguration event:
           - The min. ltime for all project-branches is given as the
             ``zuul_event_ltime`` of the reconfiguration event.
           - Config for needed project-branch(es) is updated via cat job
             if the cache is not valid (cache ltime < min. ltime).
             Otherwise the local unparsed branch cache or the global
             config cache in Zookeeper is used.
           - Cache in Zookeeper and local unparsed branch cache is
             updated, with the ltime shortly before requesting the
             config via the mergers (only for outdated items).
        3. Processing a SMART reconfiguration event:
           - The min. ltime for all project-branches is given as -1 in
             order to use cached data wherever possible.
           - Config for new project-branch(es) is updated via cat job if
             the project is not yet cached. Otherwise the local unparsed
             branch cache or the global config cache in Zookeeper is
             used.
           - Cache in Zookeeper and local unparsed branch cache is
             updated, with the ltime shortly before requesting the
             config via the mergers (only for new items).
        4. (Re-)loading a tenant due to a changed layout (happens after
           an event according to one of the other scenarios was
           processed on another scheduler):
           - The min. ltime for all project-branches is given as -1 in
             order to only use cached config.
           - Local unparsed branch cache is updated if needed.

        """
        if tenant_name not in unparsed_abide.tenants:
            del abide.tenants[tenant_name]
            return None

        unparsed_config = unparsed_abide.tenants[tenant_name]
        new_tenant = self.tenant_parser.fromYaml(
            abide, unparsed_config, ansible_manager, cache_ltime)
            abide, unparsed_config, ansible_manager, min_ltimes)
        abide.tenants[tenant_name] = new_tenant
        if len(new_tenant.layout.loading_errors):
            self.log.warning(

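To make the ``min_ltimes`` contract described in the docstring above concrete, here is a minimal, self-contained sketch (illustrative only, not Zuul code; the project names and ltime values are made up) of how a consumer of such a mapping decides whether a cached config may be used:

    # Illustrative values, mirroring the docstring example above.
    min_ltimes = {
        "example.com/org/project": {
            "master": 12234,   # refresh anything cached before ltime 12234
            "stable": -1,      # any cached config is acceptable
        },
        "example.com/common-config": {
            "master": -1,
        },
    }

    def cache_is_usable(cache_ltime, project, branch):
        # A cached entry may be used only if it is at least as new as the
        # minimum ltime recorded for its project-branch (-1 accepts anything).
        return cache_ltime >= min_ltimes.get(project, {}).get(branch, -1)

    print(cache_is_usable(12000, "example.com/org/project", "master"))   # False
    print(cache_is_usable(12500, "example.com/org/project", "master"))   # True
    print(cache_is_usable(3, "example.com/common-config", "master"))     # True
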
@@ -3787,6 +3787,8 @@ class TenantReconfigureEvent(ManagementEvent):
        if self.tenant_name != other.tenant_name:
            raise Exception("Can not merge events from different tenants")
        self.project_branches |= other.project_branches
        self.zuul_event_ltime = max(self.zuul_event_ltime,
                                    other.zuul_event_ltime)
        self.merged_events.append(other)

    def toDict(self):
@@ -5428,19 +5430,23 @@ class UnparsedBranchCache(object):
        self.extra_files_searched = set()
        self.extra_dirs_searched = set()
        self.files = {}
        self.ltime = -1

    def isValidFor(self, tpc):
    def isValidFor(self, tpc, min_ltime):
        """Return True if this has valid cache results for the extra
        files/dirs in the tpc.
        """
        if self.load_skipped:
            return False
        if self.ltime < min_ltime:
            return False
        if (set(tpc.extra_config_files) <= self.extra_files_searched and
                set(tpc.extra_config_dirs) <= self.extra_dirs_searched):
            return True
        return False

    def setValidFor(self, tpc):
    def setValidFor(self, tpc, ltime):
        self.ltime = ltime
        self.load_skipped = False
        self.extra_files_searched |= set(tpc.extra_config_files)
        self.extra_dirs_searched |= set(tpc.extra_config_dirs)

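The check added to ``isValidFor`` above combines an ltime comparison with the existing "did we already search all extra files/dirs" test. A stripped-down stand-in (illustrative only; this is not the real UnparsedBranchCache class) shows the resulting behaviour:

    class SimpleBranchCache:
        """Toy model of the validity rules: new enough AND covers the extras."""

        def __init__(self):
            self.extra_files_searched = set()
            self.extra_dirs_searched = set()
            self.ltime = -1

        def is_valid_for(self, extra_files, extra_dirs, min_ltime):
            if self.ltime < min_ltime:
                return False  # cached before the event that triggered the reload
            return (set(extra_files) <= self.extra_files_searched and
                    set(extra_dirs) <= self.extra_dirs_searched)

        def set_valid_for(self, extra_files, extra_dirs, ltime):
            self.ltime = ltime
            self.extra_files_searched |= set(extra_files)
            self.extra_dirs_searched |= set(extra_dirs)

    cache = SimpleBranchCache()
    cache.set_valid_for({"extra.yaml"}, set(), ltime=100)
    print(cache.is_valid_for({"extra.yaml"}, set(), min_ltime=-1))   # True
    print(cache.is_valid_for({"extra.yaml"}, set(), min_ltime=200))  # False: too old
    print(cache.is_valid_for({"other.yaml"}, set(), min_ltime=-1))   # False: not searched
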
@@ -24,6 +24,7 @@ import threading
import time
import traceback
import urllib
from collections import defaultdict

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
@@ -628,7 +629,9 @@ class Scheduler(threading.Thread):

    def prime(self, config):
        self.log.debug("Priming scheduler config")
        self._doReconfigureEvent(ReconfigureEvent())
        event = ReconfigureEvent()
        event.zuul_event_ltime = self.zk_client.getCurrentLtime()
        self._doReconfigureEvent(event)
        self.log.debug("Config priming complete")
        self.last_reconfigured = int(time.time())

@@ -636,6 +639,7 @@ class Scheduler(threading.Thread):
        self.log.debug("Submitting reconfiguration event")

        event = ReconfigureEvent(smart=smart)
        event.zuul_event_ltime = self.zk_client.getCurrentLtime()
        event.ack_ref = threading.Event()
        self.reconfigure_event_queue.put(event)
        self.wake_event.set()
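
Both call sites above stamp the reconfiguration event with ``zk_client.getCurrentLtime()`` before it is processed. The diff does not show that helper's implementation; one plausible approach (an assumption, sketched here with the kazoo client and a made-up ``/ltime`` znode) is to touch a well-known znode and use the transaction id of that write, since ZooKeeper zxids increase monotonically across the cluster:

    from kazoo.client import KazooClient

    def get_current_ltime(client, path="/ltime"):
        # set() returns a ZnodeStat; its last_modified_transaction_id (mzxid)
        # grows monotonically and can serve as a cluster-wide logical clock.
        client.ensure_path(path)
        stat = client.set(path, b"")
        return stat.last_modified_transaction_id

    client = KazooClient(hosts="127.0.0.1:2181")
    client.start()
    print(get_current_ltime(client))
    client.stop()
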
@@ -856,7 +860,7 @@ class Scheduler(threading.Thread):
        loader.loadTPCs(abide, unparsed_abide)
        for tenant_name in tenants_to_load:
            loader.loadTenant(abide, tenant_name, self.ansible_manager,
                              unparsed_abide, cache_ltime=None)
                              unparsed_abide, min_ltimes=None)

        loading_errors = []
        for tenant in abide.tenants.values():
@@ -908,12 +912,6 @@ class Scheduler(threading.Thread):
        loader.loadTPCs(self.abide, self.unparsed_abide)
        loader.loadAdminRules(self.abide, self.unparsed_abide)

        cache_ltime = event.zuul_event_ltime
        if not event.smart:
            self.abide.unparsed_project_branch_cache.clear()
            # Force a reload of the config via the mergers
            cache_ltime = None

        for tenant_name in tenant_names:
            if event.smart:
                old_tenant = old_unparsed_abide.tenants.get(tenant_name)
@@ -922,11 +920,19 @@ class Scheduler(threading.Thread):
                    continue

            old_tenant = self.abide.tenants.get(tenant_name)
            if event.smart:
                # Consider caches always valid
                min_ltimes = defaultdict(
                    lambda: defaultdict(lambda: -1))
            else:
                # Consider caches valid if the cache ltime >= event ltime
                min_ltimes = defaultdict(
                    lambda: defaultdict(lambda: event.zuul_event_ltime))
            with tenant_write_lock(self.zk_client, tenant_name):
                tenant = loader.loadTenant(self.abide, tenant_name,
                                           self.ansible_manager,
                                           self.unparsed_abide,
                                           cache_ltime=cache_ltime)
                                           min_ltimes=min_ltimes)
                reconfigured_tenants.append(tenant_name)
                if tenant is not None:
                    self._reconfigureTenant(tenant, old_tenant)
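
The two ``defaultdict`` constructions above encode the per-event cache policy: a smart reconfiguration resolves every project-branch to -1 (always trust the caches), while a full reconfiguration resolves everything to the event's ltime (refresh anything cached before the event). A small standalone illustration (the ltime value is made up):

    from collections import defaultdict

    event_ltime = 4711  # hypothetical zuul_event_ltime of the reconfiguration

    smart_min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
    full_min_ltimes = defaultdict(lambda: defaultdict(lambda: event_ltime))

    # Any project/branch, even one never explicitly inserted, resolves to the
    # policy default.
    print(smart_min_ltimes["example.com/org/project"]["master"])  # -1
    print(full_min_ltimes["example.com/org/project"]["master"])   # 4711
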
@@ -947,26 +953,28 @@ class Scheduler(threading.Thread):
                      "projects %s",
                      event.tenant_name, event.project_branches)
        start = time.monotonic()
        # If a change landed to a project, clear out the cached
        # config of the changed branch before reconfiguring.
        # Consider all caches valid (min. ltime -1) except for the
        # changed project-branches.
        min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
        for project_name, branch_name in event.project_branches:
            self.log.debug("Clearing unparsed config: %s @%s",
                           project_name, branch_name)
            self.abide.clearUnparsedBranchCache(project_name,
                                                branch_name)
            with self.unparsed_config_cache.writeLock(project_name):
                self.unparsed_config_cache.clearCache(project_name,
                                                      branch_name)
            if branch_name is None:
                min_ltimes[project_name] = defaultdict(
                    lambda: event.zuul_event_ltime)
            else:
                min_ltimes[project_name][
                    branch_name
                ] = event.zuul_event_ltime

        loader = configloader.ConfigLoader(
            self.connections, self, self.merger, self.keystore)
        old_tenant = self.abide.tenants.get(event.tenant_name)
        loader.loadTPCs(self.abide, self.unparsed_abide,
                        [event.tenant_name])

        with tenant_write_lock(self.zk_client, event.tenant_name):
            loader.loadTenant(self.abide, event.tenant_name,
                              self.ansible_manager, self.unparsed_abide,
                              cache_ltime=event.zuul_event_ltime)
                              min_ltimes=min_ltimes)
            tenant = self.abide.tenants[event.tenant_name]
            self._reconfigureTenant(tenant, old_tenant)
        duration = round(time.monotonic() - start, 3)