Use tenant-level layout locks

The current "layout_lock" in the scheduler is really an "abide" lock.
We lock it every time we change something in the abide (including
tenant layouts).  The name is inherited from pre-multi-tenant Zuul.

This can cause suboptimal behavior: a tenant reconfiguration event in
one thread must wait to acquire the "layout_lock" while another
thread holds the same lock because it is reloading the configuration
for a different tenant.  Ideally we should have finer-grained,
tenant-level locking instead, so that less time is spent waiting to
reconfigure.

The following sections describe the layout lock use prior to this
commit and how this commit adjusts the code to make it safe for
finer-grained locking.

1) Tenant iteration

The layout lock is used in some places (notably some cleanup methods)
to avoid having the tenant list change during the method.  However,
the configloader already performs an atomic replacement of the tenant
list, making it safe for iteration.  This change adds a lock around
updates to the tenant list to prevent corruption if two threads update
it at the same time.
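
For illustration, here is a condensed sketch of that copy-and-swap
pattern; the replaceTenant helper is illustrative, and the real code
performs the same steps inline in ConfigLoader.loadTenant (see the
diff below):

  import threading

  class Abide:
      def __init__(self):
          # Readers iterate self.tenants without a lock: the dict
          # object is only ever replaced wholesale, never mutated in
          # place.
          self.tenants = {}
          # Writers serialize on this lock so that two concurrent
          # updates cannot clobber each other's copies.
          self.tenant_lock = threading.Lock()

      def replaceTenant(self, tenant_name, new_tenant):
          with self.tenant_lock:
              tenants = self.tenants.copy()
              tenants[tenant_name] = new_tenant
              self.tenants = tenants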

The semaphore cleanup method indirectly references the abide and
layout for use in global and local semaphores.  This is just for path
construction; the semaphores exist apart from the abide and layout
configurations, so they should not be affected by either changing
while the cleanup method is running.

The node request cleanup method could end up running with outdated
layout objects, including pipelines; however, this should not be a
problem if these orphaned objects end up refreshing data from ZK right
before they are removed.

In these cases, we can simply remove the layout lock.

2) Protecting the unparsed project branch cache

The config cache cleanup method uses the unparsed project branch cache
(that is, the in-memory cache of the contents of zuul config files) to
determine what the active projects are.
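
The cleanup itself reduces to a set difference; below is a minimal
sketch under that reading, with a hypothetical helper name and example
project names:

  def find_unused_projects(cached_projects, active_projects):
      """Return canonical project names whose cached config in ZK can
      be dropped: cached, but no longer present in the in-memory
      unparsed project branch cache of any tenant."""
      return set(cached_projects) - set(active_projects)

  # Usage example:
  assert find_unused_projects(
      ["review.example.com/org/project1",
       "review.example.com/org/project2"],
      ["review.example.com/org/project1"],
  ) == {"review.example.com/org/project2"}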

Within the configloader, the cache is updated and then used while
loading tenant configuration.  The layout lock previously ensured that
all of these actions were mutually exclusive.  In order to remove the
layout lock here, we need to make sure the Abide's
unparsed_project_branch_cache is safe for concurrent updates.

The unparsed_project_branch_cache attribute is a dictionary that
contains references to UnparsedBranchCache objects.  Previously, the
configloader would delete each UnparsedBranchCache object from the
dictionary, reinitialize it, then incrementally add to it.

This process has a benign flaw.  The branch cache is cleared,
and then loaded with data based on the tenant project config (TPC)
currently being processed.  Because the cache is loaded based on data
from the TPC, it is really only valid for one tenant at a time despite
our intention that it be valid for the entire abide.  However, since
we do check whether it is valid for a given TPC, and then clear and
reload it if it is not, there is no error in data, merely an
incomplete utilization of the cache.

In order to make the cache safe for use by different tenants at the
same time, we address this problem (which also makes the cache
*effective* across tenants, even at different times).  The cache is
updated to store the ltime for each entry, and also to store null
entries (with ltimes) for files and paths that have been checked but
are not present in the repo.  This means that at any given time we can
determine whether the cache is valid for a given TPC, and support
multiple TPCs (i.e., multiple tenants).

It's okay for the cache to be updated simultaneously by two tenants
since we don't allow the cache contents to go backwards in ltime.  The
cache will either have the data with at least the ltime required, or
if not, that particular tenant load will spawn cat jobs and update it.
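
Below is a reduced sketch of the new entry handling; the class name
and the getValidFor signature are simplified here, and the complete
UnparsedBranchCache implementation is in the model diff below:

  from collections import namedtuple

  # data=None is a negative entry: the path was checked as of ltime
  # and the repo had nothing there.
  UnparsedBranchCacheEntry = namedtuple("UnparsedBranchCacheEntry",
                                        ["ltime", "data"])

  class BranchCacheSketch:
      def __init__(self):
          # path -> UnparsedBranchCacheEntry; no entry means "never
          # checked".
          self.entries = {}

      def put(self, path, data, ltime):
          entry = self.entries.get(path)
          if entry is not None and ltime < entry.ltime:
              # Never move an entry backwards in ltime, so two tenants
              # loading concurrently cannot corrupt the cache.
              return
          self.entries[path] = UnparsedBranchCacheEntry(ltime, data)

      def getValidFor(self, paths, min_ltime):
          # The cache is valid for a TPC only if every relevant path
          # has been checked at or after min_ltime; return the oldest
          # such ltime, or None if anything is missing or stale.
          oldest = None
          for path in paths:
              entry = self.entries.get(path)
              if entry is None or entry.ltime < min_ltime:
                  return None
              if oldest is None or entry.ltime < oldest:
                  oldest = entry.ltime
          return oldest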

3) Protecting Tenant Project Configs (TPCs)

The loadTPC method on the ConfigLoader would similarly clear the TPCs
for a tenant, then add them back.  This could be problematic for any
other thread which might be referencing or iterating over TPCs.  To
correct this, we take a similar approach of atomic replacement.

Because there are two flavors of TPCs (config and untrusted) and they
are stored in two separate dictionaries, in order to update a complete
tenant atomically, the storage hierarchy is restructured as
"tenant -> {config/untrusted} -> project" rather than
"{config/untrusted} -> tenant -> project".  A new class named
TenantTPCRegistry holds both flavors of TPCs for a given tenant, and
it is this object that is atomically replaced.
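
Below is a condensed sketch of the registry and the swap; the
load_tenant_tpcs helper is illustrative, and the real logic lives in
ConfigLoader.loadTPCs (see the diff below):

  from collections import defaultdict

  class TenantTPCRegistry:
      """Holds both flavors of TPCs for a single tenant."""
      def __init__(self):
          # Stored as lists; duplicate projects are not checked here.
          self.config_tpcs = defaultdict(list)
          self.untrusted_tpcs = defaultdict(list)

      def addConfigTPC(self, tpc):
          self.config_tpcs[tpc.project.name].append(tpc)

      def addUntrustedTPC(self, tpc):
          self.untrusted_tpcs[tpc.project.name].append(tpc)

  def load_tenant_tpcs(abide, tenant_name, config_tpcs, untrusted_tpcs):
      registry = TenantTPCRegistry()
      for tpc in config_tpcs:
          registry.addConfigTPC(tpc)
      for tpc in untrusted_tpcs:
          registry.addUntrustedTPC(tpc)
      # Atomic replacement of the whole registry: readers never observe
      # a half-populated tenant, so no abide-wide lock is needed.
      # (setTPCRegistry simply assigns tpc_registry[tenant_name].)
      abide.setTPCRegistry(tenant_name, registry)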

Now that these issues are dealt with, we can implement a tenant-level
thread lock that is used simply to ensure that two threads don't
update the configuration for the same tenant at the same time.
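
Sketched below with a hypothetical wrapper function; in the scheduler
the lock is self.layout_lock, a defaultdict of threading.Lock objects
keyed by tenant name:

  import threading
  from collections import defaultdict

  # One lock per tenant instead of a single scheduler-wide layout_lock.
  layout_lock = defaultdict(threading.Lock)

  def update_tenant_layout(tenant_name):
      # Reconfiguring tenant A no longer waits on a thread that is
      # reloading tenant B; only updates to the same tenant serialize.
      with layout_lock[tenant_name]:
          pass  # load/refresh this tenant's layout here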

The scheduler's unparsed abide is updated in two places: upon full
reconfiguration, or when another scheduler has performed a full
reconfiguration and updated the copy in ZK.  To prevent these two
methods from performing the same update simultaneously, we add an
"unparsed_abide_lock" and mutually exclude them.

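Below is a minimal sketch of that mutual exclusion using a
hypothetical stand-in class for the scheduler attributes involved; the
real lock is self.unparsed_abide_lock on the Scheduler:

  import threading

  class SystemConfigSketch:
      """Hypothetical stand-in for the scheduler attributes involved."""
      def __init__(self):
          self.unparsed_abide_ltime = -1
          self.unparsed_abide_lock = threading.Lock()

      def reconfigure(self, ltime):
          # Full reconfiguration: replace the unparsed abide under the
          # lock.
          with self.unparsed_abide_lock:
              self.unparsed_abide_ltime = ltime

      def updateSystemConfig(self, cache_ltime):
          # Cache-driven update: skip the work if another thread
          # already installed an unparsed abide at least as new while
          # we waited on the lock.
          with self.unparsed_abide_lock:
              if self.unparsed_abide_ltime >= cache_ltime:
                  return
              self.unparsed_abide_ltime = cache_ltime
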
Change-Id: Ifba261b206db85611c16bab6157f8d1f4349535d
James E. Blair 2023-08-07 15:19:34 -07:00
parent 210ca5d235
commit eb803984a0
5 changed files with 368 additions and 284 deletions


@ -808,7 +808,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
self.assertEqual(len(common_cache), 1)
self.assertIn("zuul.yaml", common_cache)
self.assertTrue(len(common_cache["zuul.yaml"]) > 0)
self.assertEqual(upb_common_cache.ltime, common_cache.ltime)
self.assertEqual(upb_common_cache.entries['zuul.yaml'].ltime,
common_cache.ltime)
project_cache = cache.getFilesCache("review.example.com/org/project",
"master")
@ -818,7 +819,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
tpc = tenant.project_configs["review.example.com/org/project"]
self.assertTrue(project_cache.isValidFor(tpc, min_ltime=-1))
self.assertEqual(len(project_cache), 0)
self.assertEqual(upb_project_cache.ltime, project_cache.ltime)
self.assertEqual(upb_project_cache.entries['zuul.yaml'].ltime,
project_cache.ltime)
def test_cache_use(self):
sched = self.scheds.first.sched
@ -834,7 +836,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
zk_initial_ltime = files_cache.ltime
upb_cache = sched.abide.getUnparsedBranchCache(
"review.example.com/org/project2", "master")
self.assertEqual(zk_initial_ltime, upb_cache.ltime)
self.assertEqual(zk_initial_ltime,
upb_cache.entries['zuul.yaml'].ltime)
# Get the current ltime from Zookeeper and run a full reconfiguration,
# so that we know all items in the cache have a larger ltime.
@ -843,7 +846,8 @@ class TestUnparsedConfigCache(ZuulTestCase):
self.assertGreater(files_cache.ltime, zk_initial_ltime)
upb_cache = sched.abide.getUnparsedBranchCache(
"review.example.com/org/project2", "master")
self.assertEqual(files_cache.ltime, upb_cache.ltime)
self.assertEqual(files_cache.ltime,
upb_cache.entries['zuul.yaml'].ltime)
# Clear the unparsed branch cache so all projects (except for
# org/project2) are retrieved from the cache in Zookeeper.
@ -879,8 +883,9 @@ class TestUnparsedConfigCache(ZuulTestCase):
upb_cache = sched.abide.getUnparsedBranchCache(
"review.example.com/common-config", "master")
self.assertEqual(common_cache.ltime, upb_cache.ltime)
self.assertNotIn("CANARY", upb_cache.extra_files_searched)
self.assertEqual(common_cache.ltime,
upb_cache.entries['zuul.yaml'].ltime)
self.assertNotIn("CANARY", upb_cache.entries)
# As the cache should be valid (cache ltime of org/project2 newer than
# event ltime) we don't expect any cat jobs.


@ -1932,10 +1932,11 @@ class TenantParser(object):
tenant.unparsed_config = conf
# tpcs is TenantProjectConfigs
config_tpcs = abide.getConfigTPCs(tenant.name)
tpc_registry = abide.getTPCRegistry(tenant.name)
config_tpcs = tpc_registry.getConfigTPCs()
for tpc in config_tpcs:
tenant.addConfigProject(tpc)
untrusted_tpcs = abide.getUntrustedTPCs(tenant.name)
untrusted_tpcs = tpc_registry.getUntrustedTPCs()
for tpc in untrusted_tpcs:
tenant.addUntrustedProject(tpc)
@ -2338,9 +2339,10 @@ class TenantParser(object):
pb_ltime = min_ltimes[project.canonical_name][branch]
# If our unparsed branch cache is valid for the
# time, then we don't need to do anything else.
if branch_cache.isValidFor(tpc, pb_ltime):
min_ltimes[project.canonical_name][branch] =\
branch_cache.ltime
bc_ltime = branch_cache.getValidFor(tpc, ZUUL_CONF_ROOT,
pb_ltime)
if bc_ltime is not None:
min_ltimes[project.canonical_name][branch] = bc_ltime
return
except KeyError:
self.log.exception(
@ -2433,9 +2435,6 @@ class TenantParser(object):
loading_errors, ltime, min_ltimes):
loaded = False
tpc = tenant.project_configs[source_context.project_canonical_name]
# Make sure we are clearing the local cache before updating it.
abide.clearUnparsedBranchCache(source_context.project_canonical_name,
source_context.branch)
branch_cache = abide.getUnparsedBranchCache(
source_context.project_canonical_name,
source_context.branch)
@ -2449,14 +2448,16 @@ class TenantParser(object):
or (conf_root in valid_dirs
and fn.startswith(f"{conf_root}/"))):
continue
# Don't load from more than one configuration in a
# project-branch (unless an "extra" file/dir).
# Warn if there is more than one configuration in a
# project-branch (unless an "extra" file/dir). We
# continue to add the data to the cache for use by
# other tenants, but we will filter it out when we
# retrieve it later.
fn_root = fn.split('/')[0]
if (fn_root in ZUUL_CONF_ROOT):
if (loaded and loaded != conf_root):
self.log.warning("Multiple configuration files in %s",
source_context)
continue
loaded = conf_root
# Create a new source_context so we have unique filenames.
source_context = source_context.copy()
@ -2466,11 +2467,11 @@ class TenantParser(object):
(source_context,))
incdata = self.loadProjectYAML(
files[fn], source_context, loading_errors)
branch_cache.put(source_context.path, incdata)
branch_cache.setValidFor(tpc, ltime)
branch_cache.put(source_context.path, incdata, ltime)
branch_cache.setValidFor(tpc, ZUUL_CONF_ROOT, ltime)
if min_ltimes is not None:
min_ltimes[source_context.project_canonical_name][
source_context.branch] = branch_cache.ltime
source_context.branch] = ltime
def _loadTenantYAML(self, abide, tenant, loading_errors):
config_projects_config = model.UnparsedConfig()
@ -2482,7 +2483,7 @@ class TenantParser(object):
branch_cache = abide.getUnparsedBranchCache(
project.canonical_name, branch)
tpc = tenant.project_configs[project.canonical_name]
unparsed_branch_config = branch_cache.get(tpc)
unparsed_branch_config = branch_cache.get(tpc, ZUUL_CONF_ROOT)
if unparsed_branch_config:
unparsed_branch_config = self.filterConfigProjectYAML(
@ -2496,7 +2497,7 @@ class TenantParser(object):
branch_cache = abide.getUnparsedBranchCache(
project.canonical_name, branch)
tpc = tenant.project_configs[project.canonical_name]
unparsed_branch_config = branch_cache.get(tpc)
unparsed_branch_config = branch_cache.get(tpc, ZUUL_CONF_ROOT)
if unparsed_branch_config:
unparsed_branch_config = self.filterUntrustedProjectYAML(
unparsed_branch_config, loading_errors)
@ -2907,15 +2908,18 @@ class ConfigLoader(object):
# project's config files (incl. tenant specific extra config) at once.
with ThreadPoolExecutor(max_workers=4) as executor:
for tenant_name, unparsed_config in tenants_to_load.items():
tpc_registry = model.TenantTPCRegistry()
config_tpcs, untrusted_tpcs = (
self.tenant_parser.loadTenantProjects(unparsed_config,
executor)
)
abide.clearTPCs(tenant_name)
for tpc in config_tpcs:
abide.addConfigTPC(tenant_name, tpc)
tpc_registry.addConfigTPC(tpc)
for tpc in untrusted_tpcs:
abide.addUntrustedTPC(tenant_name, tpc)
tpc_registry.addUntrustedTPC(tpc)
# This atomic replacement of TPCs means that we don't need to
# lock the abide.
abide.setTPCRegistry(tenant_name, tpc_registry)
def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
min_ltimes=None, layout_uuid=None,
@ -2988,9 +2992,10 @@ class ConfigLoader(object):
"""
if tenant_name not in unparsed_abide.tenants:
# Copy tenants dictionary to not break concurrent iterations.
tenants = abide.tenants.copy()
del tenants[tenant_name]
abide.tenants = tenants
with abide.tenant_lock:
tenants = abide.tenants.copy()
del tenants[tenant_name]
abide.tenants = tenants
return None
unparsed_config = unparsed_abide.tenants[tenant_name]
@ -3000,9 +3005,10 @@ class ConfigLoader(object):
min_ltimes, layout_uuid, branch_cache_min_ltimes,
ignore_cat_exception)
# Copy tenants dictionary to not break concurrent iterations.
tenants = abide.tenants.copy()
tenants[tenant_name] = new_tenant
abide.tenants = tenants
with abide.tenant_lock:
tenants = abide.tenants.copy()
tenants[tenant_name] = new_tenant
abide.tenants = tenants
if len(new_tenant.layout.loading_errors):
self.log.warning(
"%s errors detected during %s tenant configuration loading",


@ -21,6 +21,7 @@ import hashlib
import logging
import os
from functools import partial, total_ordering
import threading
import re2
import time
@ -8738,40 +8739,27 @@ class Tenant(object):
return Attributes(name=self.name)
class UnparsedBranchCache(object):
# data can be None here to mean "this path has been checked and as of
# ltime there was nothing in the repo at that path".
UnparsedBranchCacheEntry = namedtuple("UnparsedBranchCacheEntry",
["ltime", "data"])
class UnparsedBranchCache:
"""Cache information about a single branch"""
def __init__(self):
self.load_skipped = True
self.extra_files_searched = set()
self.extra_dirs_searched = set()
self.files = {}
self.ltime = -1
# This is a dict of path -> UnparsedBranchCacheEntry items.
# If a path exists here, it means it has been checked as of
# UnparsedBranchCacheEntry.ltime.  If anything was found, then
# UnparsedBranchCacheEntry.data will have the contents.  If it
# was checked and no data was found, then
# UnparsedBranchCacheEntry.data will be None.
# If it has not been checked, then there will be no entry.
self.entries = {}
def isValidFor(self, tpc, min_ltime):
"""Return True if this has valid cache results for the extra
files/dirs in the tpc.
"""
if self.load_skipped:
return False
if self.ltime < min_ltime:
return False
if (set(tpc.extra_config_files) <= self.extra_files_searched and
set(tpc.extra_config_dirs) <= self.extra_dirs_searched):
return True
return False
def setValidFor(self, tpc, ltime):
self.ltime = ltime
self.load_skipped = False
self.extra_files_searched |= set(tpc.extra_config_files)
self.extra_dirs_searched |= set(tpc.extra_config_dirs)
def put(self, path, config):
self.files[path] = config
def get(self, tpc):
ret = UnparsedConfig()
files_list = self.files.keys()
def _getPaths(self, tpc):
# Return a list of paths we have entries for that match the TPC.
files_list = self.entries.keys()
fns1 = []
fns2 = []
fns3 = []
@ -8789,51 +8777,152 @@ class UnparsedBranchCache(object):
fns4.append(fn)
fns = (["zuul.yaml"] + sorted(fns1) + [".zuul.yaml"] +
sorted(fns2) + fns3 + sorted(fns4))
for fn in fns:
data = self.files.get(fn)
if data is not None:
ret.extend(data)
return fns
def getValidFor(self, tpc, conf_root, min_ltime):
"""Return the oldest ltime if this has valid cache results for the
extra files/dirs in the tpc. Otherwise, return None.
"""
oldest_ltime = None
for path in conf_root + tpc.extra_config_files + tpc.extra_config_dirs:
entry = self.entries.get(path)
if entry is None:
return None
if entry.ltime < min_ltime:
return None
if oldest_ltime is None or entry.ltime < oldest_ltime:
oldest_ltime = entry.ltime
return oldest_ltime
def setValidFor(self, tpc, conf_root, ltime):
"""Indicate that the cache has just been made current for the given
TPC as of ltime"""
seen = set()
# Identify any entries we have that match the TPC, and remove
# them if they are not up to date.
for path in self._getPaths(tpc):
entry = self.entries.get(path)
if entry is None:
# Probably "zuul.yaml" or similar hardcoded path that
# is unused.
continue
else:
# If this is a real entry (i.e., actually has data
# from the file cache), then mark it as seen so we
# don't create a dummy entry for it later, and also
# check to see if it can be pruned.
if entry.data is not None:
seen.add(path)
if entry.ltime < ltime:
# This is an old entry which is not in the present
# update but should have been if it existed in the
# repo. That means it was deleted and we can
# remove it.
del self.entries[path]
# Set the ltime for any paths that did not appear in our list
# (so that we know they have been checked and the cache is
# valid for that path+ltime).
for path in conf_root + tpc.extra_config_files + tpc.extra_config_dirs:
if path in seen:
continue
self.entries[path] = UnparsedBranchCacheEntry(ltime, None)
def put(self, path, data, ltime):
entry = self.entries.get(path)
if entry is not None:
if ltime < entry.ltime:
# We don't want the entry to go backward
return
self.entries[path] = UnparsedBranchCacheEntry(ltime, data)
def get(self, tpc, conf_root):
ret = UnparsedConfig()
loaded = False
for fn in self._getPaths(tpc):
entry = self.entries.get(fn)
if entry is not None and entry.data is not None:
# Don't load from more than one configuration in a
# project-branch (unless an "extra" file/dir).
fn_root = fn.split('/')[0]
if (fn_root in conf_root):
if (loaded and loaded != fn_root):
# "Multiple configuration files in source_context"
continue
loaded = fn_root
ret.extend(entry.data)
return ret
class TenantTPCRegistry:
def __init__(self):
# The project TPCs are stored as a list as we don't check for
# duplicate projects here.
self.config_tpcs = defaultdict(list)
self.untrusted_tpcs = defaultdict(list)
def addConfigTPC(self, tpc):
self.config_tpcs[tpc.project.name].append(tpc)
def addUntrustedTPC(self, tpc):
self.untrusted_tpcs[tpc.project.name].append(tpc)
def getConfigTPCs(self):
return list(itertools.chain.from_iterable(
self.config_tpcs.values()))
def getUntrustedTPCs(self):
return list(itertools.chain.from_iterable(
self.untrusted_tpcs.values()))
class Abide(object):
def __init__(self):
self.authz_rules = {}
self.semaphores = {}
self.tenants = {}
# tenant -> project -> list(tpcs)
# The project TPCs are stored as a list as we don't check for
# duplicate projects here.
self.config_tpcs = defaultdict(lambda: defaultdict(list))
self.untrusted_tpcs = defaultdict(lambda: defaultdict(list))
self.tenant_lock = threading.Lock()
# tenant -> TenantTPCRegistry
self.tpc_registry = defaultdict(TenantTPCRegistry)
# project -> branch -> UnparsedBranchCache
self.unparsed_project_branch_cache = {}
self.api_root = None
def addConfigTPC(self, tenant_name, tpc):
self.config_tpcs[tenant_name][tpc.project.name].append(tpc)
def clearTPCRegistry(self, tenant_name):
try:
del self.tpc_registry[tenant_name]
except KeyError:
pass
def getConfigTPCs(self, tenant_name):
return list(itertools.chain.from_iterable(
self.config_tpcs[tenant_name].values()))
def getTPCRegistry(self, tenant_name):
return self.tpc_registry[tenant_name]
def addUntrustedTPC(self, tenant_name, tpc):
self.untrusted_tpcs[tenant_name][tpc.project.name].append(tpc)
def setTPCRegistry(self, tenant_name, tpc_registry):
self.tpc_registry[tenant_name] = tpc_registry
def getUntrustedTPCs(self, tenant_name):
return list(itertools.chain.from_iterable(
self.untrusted_tpcs[tenant_name].values()))
def clearTPCs(self, tenant_name):
self.config_tpcs[tenant_name].clear()
self.untrusted_tpcs[tenant_name].clear()
def getAllTPCs(self, tenant_name):
# Hold a reference to the registry to make sure it doesn't
# change between the two calls below.
registry = self.tpc_registry[tenant_name]
return list(itertools.chain(
itertools.chain.from_iterable(
registry.config_tpcs.values()),
itertools.chain.from_iterable(
registry.untrusted_tpcs.values()),
))
def _allProjectTPCs(self, project_name):
# Flatten the lists of a project TPCs from all tenants
return itertools.chain.from_iterable(
tenant_tpcs.get(project_name, [])
for tenant_tpcs in itertools.chain(self.config_tpcs.values(),
self.untrusted_tpcs.values()))
# Force to a list to avoid iteration errors since the clear
# method can mutate the dictionary.
for tpc_registry in list(self.tpc_registry.values()):
for config_tpc in tpc_registry.config_tpcs.get(
project_name, []):
yield config_tpc
for untrusted_tpc in tpc_registry.untrusted_tpcs.get(
project_name, []):
yield untrusted_tpc
def getExtraConfigFiles(self, project_name):
"""Get all extra config files for a project across tenants."""
@ -8858,22 +8947,7 @@ class Abide(object):
def getUnparsedBranchCache(self, canonical_project_name, branch):
project_branch_cache = self.unparsed_project_branch_cache.setdefault(
canonical_project_name, {})
cache = project_branch_cache.get(branch)
if cache is not None:
return cache
project_branch_cache[branch] = UnparsedBranchCache()
return project_branch_cache[branch]
def clearUnparsedBranchCache(self, canonical_project_name, branch=None):
if canonical_project_name in self.unparsed_project_branch_cache:
project_branch_cache = \
self.unparsed_project_branch_cache[canonical_project_name]
if branch in project_branch_cache:
del project_branch_cache[branch]
if len(project_branch_cache) == 0 or branch is None:
del self.unparsed_project_branch_cache[canonical_project_name]
return project_branch_cache.setdefault(branch, UnparsedBranchCache())
class Capabilities(object):


@ -16,7 +16,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import logging
import socket
import sys
@ -221,8 +220,10 @@ class Scheduler(threading.Thread):
self.layout_update_event = threading.Event()
# Only used by tests in order to quiesce the layout update loop
self.layout_update_lock = threading.Lock()
# Don't change the abide without holding this lock
self.layout_lock = threading.Lock()
# Hold this lock when updating the local unparsed abide
self.unparsed_abide_lock = threading.Lock()
# Hold this lock when updating the local layout for a tenant
self.layout_lock = defaultdict(threading.Lock)
# Only used by tests in order to quiesce the main run loop
self.run_handler_lock = threading.Lock()
self.command_map = {
@ -645,33 +646,27 @@ class Scheduler(threading.Thread):
return
def _runSemaphoreCleanup(self):
# Get the layout lock to make sure the abide doesn't change
# under us.
with self.layout_lock:
if self.semaphore_cleanup_lock.acquire(blocking=False):
try:
self.log.debug("Starting semaphore cleanup")
for tenant in self.abide.tenants.values():
try:
tenant.semaphore_handler.cleanupLeaks()
except Exception:
self.log.exception("Error in semaphore cleanup:")
finally:
self.semaphore_cleanup_lock.release()
if self.semaphore_cleanup_lock.acquire(blocking=False):
try:
self.log.debug("Starting semaphore cleanup")
for tenant in self.abide.tenants.values():
try:
tenant.semaphore_handler.cleanupLeaks()
except Exception:
self.log.exception("Error in semaphore cleanup:")
finally:
self.semaphore_cleanup_lock.release()
def _runNodeRequestCleanup(self):
# Get the layout lock to make sure the abide doesn't change
# under us.
with self.layout_lock:
if self.node_request_cleanup_lock.acquire(blocking=False):
if self.node_request_cleanup_lock.acquire(blocking=False):
try:
self.log.debug("Starting node request cleanup")
try:
self.log.debug("Starting node request cleanup")
try:
self._cleanupNodeRequests()
except Exception:
self.log.exception("Error in node request cleanup:")
finally:
self.node_request_cleanup_lock.release()
self._cleanupNodeRequests()
except Exception:
self.log.exception("Error in node request cleanup:")
finally:
self.node_request_cleanup_lock.release()
def _cleanupNodeRequests(self):
# Get all the requests in ZK that belong to us
@ -726,19 +721,18 @@ class Scheduler(threading.Thread):
self.log.debug("Finished general cleanup")
def _runConfigCacheCleanup(self):
with self.layout_lock:
try:
self.log.debug("Starting config cache cleanup")
cached_projects = set(
self.unparsed_config_cache.listCachedProjects())
active_projects = set(
self.abide.unparsed_project_branch_cache.keys())
unused_projects = cached_projects - active_projects
for project_cname in unused_projects:
self.unparsed_config_cache.clearCache(project_cname)
self.log.debug("Finished config cache cleanup")
except Exception:
self.log.exception("Error in config cache cleanup:")
try:
self.log.debug("Starting config cache cleanup")
cached_projects = set(
self.unparsed_config_cache.listCachedProjects())
active_projects = set(
self.abide.unparsed_project_branch_cache.keys())
unused_projects = cached_projects - active_projects
for project_cname in unused_projects:
self.unparsed_config_cache.clearCache(project_cname)
self.log.debug("Finished config cache cleanup")
except Exception:
self.log.exception("Error in config cache cleanup:")
def _runExecutorApiCleanup(self):
try:
@ -810,21 +804,20 @@ class Scheduler(threading.Thread):
self.log.debug("Starting blob store cleanup")
try:
live_blobs = set()
with self.layout_lock:
# get the start ltime so that we can filter out any
# blobs used since this point
start_ltime = self.zk_client.getCurrentLtime()
# lock and refresh the pipeline
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
with (pipeline_lock(
self.zk_client, tenant.name,
pipeline.name) as lock,
self.createZKContext(lock, self.log) as ctx):
pipeline.state.refresh(ctx, read_only=True)
# add any blobstore references
for item in pipeline.getAllItems(include_old=True):
live_blobs.update(item.getBlobKeys())
# get the start ltime so that we can filter out any
# blobs used since this point
start_ltime = self.zk_client.getCurrentLtime()
# lock and refresh the pipeline
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
with (pipeline_lock(
self.zk_client, tenant.name,
pipeline.name) as lock,
self.createZKContext(lock, self.log) as ctx):
pipeline.state.refresh(ctx, read_only=True)
# add any blobstore references
for item in pipeline.getAllItems(include_old=True):
live_blobs.update(item.getBlobKeys())
with self.createZKContext(None, self.log) as ctx:
blobstore = BlobStore(ctx)
# get the set of blob keys unused since the start time
@ -1013,8 +1006,8 @@ class Scheduler(threading.Thread):
new_tenants = (set(self.unparsed_abide.tenants)
- self.abide.tenants.keys())
with self.layout_lock:
for tenant_name in new_tenants:
for tenant_name in new_tenants:
with self.layout_lock[tenant_name]:
stats_key = f'zuul.tenant.{tenant_name}'
layout_state = self.tenant_layout_state.get(tenant_name)
# In case we don't have a cached layout state we need to
@ -1312,10 +1305,11 @@ class Scheduler(threading.Thread):
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd, self,
self.merger, self.keystore)
# Since we are using the ZK 'locked' context manager with a threading
# lock, we need to pass -1 as the timeout value here as the default
# value for ZK locks is None.
with locked(self.layout_lock, blocking=False, timeout=-1):
# Since we are using the ZK 'locked' context manager (in order
# to have a non-blocking lock) with a threading lock, we need
# to pass -1 as the timeout value here as the default value
# for ZK locks is None.
with locked(self.layout_lock[tenant_name], blocking=False, timeout=-1):
start = time.monotonic()
log.debug("Updating local layout of tenant %s ", tenant_name)
layout_state = self.tenant_layout_state.get(tenant_name)
@ -1390,49 +1384,48 @@ class Scheduler(threading.Thread):
def validateTenants(self, config, tenants_to_validate):
self.config = config
with self.layout_lock:
self.log.info("Config validation beginning")
start = time.monotonic()
self.log.info("Config validation beginning")
start = time.monotonic()
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
tenant_config, script = self._checkTenantSourceConf(self.config)
unparsed_abide = loader.readConfig(
tenant_config,
from_script=script,
tenants_to_validate=tenants_to_validate)
available_tenants = list(unparsed_abide.tenants)
tenants_to_load = tenants_to_validate or available_tenants
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
tenant_config, script = self._checkTenantSourceConf(self.config)
unparsed_abide = loader.readConfig(
tenant_config,
from_script=script,
tenants_to_validate=tenants_to_validate)
available_tenants = list(unparsed_abide.tenants)
tenants_to_load = tenants_to_validate or available_tenants
# Use a temporary config cache for the validation
validate_root = f"/zuul/validate/{uuid.uuid4().hex}"
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client,
validate_root)
# Use a temporary config cache for the validation
validate_root = f"/zuul/validate/{uuid.uuid4().hex}"
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client,
validate_root)
try:
abide = Abide()
loader.loadAuthzRules(abide, unparsed_abide)
loader.loadSemaphores(abide, unparsed_abide)
loader.loadTPCs(abide, unparsed_abide)
for tenant_name in tenants_to_load:
loader.loadTenant(abide, tenant_name, self.ansible_manager,
unparsed_abide, min_ltimes=None,
ignore_cat_exception=False)
finally:
self.zk_client.client.delete(validate_root, recursive=True)
try:
abide = Abide()
loader.loadAuthzRules(abide, unparsed_abide)
loader.loadSemaphores(abide, unparsed_abide)
loader.loadTPCs(abide, unparsed_abide)
for tenant_name in tenants_to_load:
loader.loadTenant(abide, tenant_name, self.ansible_manager,
unparsed_abide, min_ltimes=None,
ignore_cat_exception=False)
finally:
self.zk_client.client.delete(validate_root, recursive=True)
loading_errors = []
for tenant in abide.tenants.values():
for error in tenant.layout.loading_errors:
if error.severity == SEVERITY_WARNING:
self.log.warning(repr(error))
else:
loading_errors.append(repr(error))
if loading_errors:
summary = '\n\n\n'.join(loading_errors)
raise configloader.ConfigurationSyntaxError(
f"Configuration errors: {summary}")
loading_errors = []
for tenant in abide.tenants.values():
for error in tenant.layout.loading_errors:
if error.severity == SEVERITY_WARNING:
self.log.warning(repr(error))
else:
loading_errors.append(repr(error))
if loading_errors:
summary = '\n\n\n'.join(loading_errors)
raise configloader.ConfigurationSyntaxError(
f"Configuration errors: {summary}")
duration = round(time.monotonic() - start, 3)
self.log.info("Config validation complete (duration: %s seconds)",
@ -1442,17 +1435,18 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
reconfigured_tenants = []
with self.layout_lock:
self.log.info("Reconfiguration beginning (smart=%s, tenants=%s)",
event.smart, event.tenants)
start = time.monotonic()
# Update runtime related system attributes from config
self.config = self._zuul_app.config
self.globals = SystemAttributes.fromConfig(self.config)
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
self.log.info("Reconfiguration beginning (smart=%s, tenants=%s)",
event.smart, event.tenants)
start = time.monotonic()
# Update runtime related system attributes from config
self.config = self._zuul_app.config
self.globals = SystemAttributes.fromConfig(self.config)
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
with self.unparsed_abide_lock:
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
@ -1472,49 +1466,48 @@ class Scheduler(threading.Thread):
deleted_tenants = tenant_names.difference(
self.unparsed_abide.tenants.keys())
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
self.abide.clearTPCRegistry(tenant_name)
loader.loadAuthzRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
loader.loadTPCs(self.abide, self.unparsed_abide)
# Note end of unparsed abide lock here.
if event.smart:
# Consider caches always valid
min_ltimes = defaultdict(
lambda: defaultdict(lambda: -1))
# Consider all project branch caches valid.
branch_cache_min_ltimes = defaultdict(lambda: -1)
else:
# Consider caches valid if the cache ltime >= event ltime
min_ltimes = defaultdict(
lambda: defaultdict(lambda: event.zuul_event_ltime))
# Invalidate the branch cache
for connection in self.connections.connections.values():
if hasattr(connection, 'clearBranchCache'):
if event.tenants:
# Only clear the projects used by this
# tenant (zuul-web won't be able to load
# any tenants that we don't immediately
# reconfigure after clearing)
for tenant_name in event.tenants:
projects = [
tpc.project.name for tpc in
itertools.chain(
self.abide.getConfigTPCs(tenant_name),
self.abide.getUntrustedTPCs(
tenant_name))
]
connection.clearBranchCache(projects)
else:
# Clear all projects since we're reloading
# all tenants.
connection.clearBranchCache()
ltime = self.zk_client.getCurrentLtime()
# Consider the branch cache valid only after we
# cleared it
branch_cache_min_ltimes = defaultdict(lambda: ltime)
if event.smart:
# Consider caches always valid
min_ltimes = defaultdict(
lambda: defaultdict(lambda: -1))
# Consider all project branch caches valid.
branch_cache_min_ltimes = defaultdict(lambda: -1)
else:
# Consider caches valid if the cache ltime >= event ltime
min_ltimes = defaultdict(
lambda: defaultdict(lambda: event.zuul_event_ltime))
# Invalidate the branch cache
for connection in self.connections.connections.values():
if hasattr(connection, 'clearBranchCache'):
if event.tenants:
# Only clear the projects used by this
# tenant (zuul-web won't be able to load
# any tenants that we don't immediately
# reconfigure after clearing)
for tenant_name in event.tenants:
projects = [
tpc.project.name for tpc in
self.abide.getAllTPCs(tenant_name)
]
connection.clearBranchCache(projects)
else:
# Clear all projects since we're reloading
# all tenants.
connection.clearBranchCache()
ltime = self.zk_client.getCurrentLtime()
# Consider the branch cache valid only after we
# cleared it
branch_cache_min_ltimes = defaultdict(lambda: ltime)
for tenant_name in tenant_names:
for tenant_name in tenant_names:
with self.layout_lock[tenant_name]:
if event.smart:
old_tenant = old_unparsed_abide.tenants.get(tenant_name)
new_tenant = self.unparsed_abide.tenants.get(tenant_name)
@ -1561,36 +1554,36 @@ class Scheduler(threading.Thread):
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
self.updateSystemConfig()
with self.layout_lock:
log.info("Tenant reconfiguration beginning for %s due to "
"projects %s", event.tenant_name, event.project_branches)
start = time.monotonic()
# Consider all caches valid (min. ltime -1) except for the
# changed project-branches.
min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
for project_name, branch_name in event.project_branches:
if branch_name is None:
min_ltimes[project_name] = defaultdict(
lambda: event.zuul_event_ltime)
else:
min_ltimes[project_name][
branch_name
] = event.zuul_event_ltime
log.info("Tenant reconfiguration beginning for %s due to "
"projects %s", event.tenant_name, event.project_branches)
start = time.monotonic()
# Consider all caches valid (min. ltime -1) except for the
# changed project-branches.
min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
for project_name, branch_name in event.project_branches:
if branch_name is None:
min_ltimes[project_name] = defaultdict(
lambda: event.zuul_event_ltime)
else:
min_ltimes[project_name][
branch_name
] = event.zuul_event_ltime
# Consider all branch caches valid except for the ones where
# the events provides a minimum ltime.
branch_cache_min_ltimes = defaultdict(lambda: -1)
for connection_name, ltime in event.branch_cache_ltimes.items():
branch_cache_min_ltimes[connection_name] = ltime
# Consider all branch caches valid except for the ones where
# the events provides a minimum ltime.
branch_cache_min_ltimes = defaultdict(lambda: -1)
for connection_name, ltime in event.branch_cache_ltimes.items():
branch_cache_min_ltimes[connection_name] = ltime
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
loader.loadTPCs(self.abide, self.unparsed_abide,
[event.tenant_name])
stats_key = f'zuul.tenant.{event.tenant_name}'
with self.layout_lock[event.tenant_name]:
old_tenant = self.abide.tenants.get(event.tenant_name)
loader.loadTPCs(self.abide, self.unparsed_abide,
[event.tenant_name])
stats_key = f'zuul.tenant.{event.tenant_name}'
with (tenant_write_lock(
self.zk_client, event.tenant_name,
identifier=RECONFIG_LOCK_ID) as lock,
@ -2206,7 +2199,10 @@ class Scheduler(threading.Thread):
self.wake_event.set()
def primeSystemConfig(self):
with self.layout_lock:
# Strictly speaking, we don't need this lock since it's
# guaranteed to be single-threaded, but it's used here for
# consistency and future-proofing.
with self.unparsed_abide_lock:
loader = configloader.ConfigLoader(
self.connections, self.zk_client, self.globals, self.statsd,
self, self.merger, self.keystore)
@ -2220,7 +2216,11 @@ class Scheduler(threading.Thread):
loader.loadTPCs(self.abide, self.unparsed_abide)
def updateSystemConfig(self):
with self.layout_lock:
with self.unparsed_abide_lock:
if self.unparsed_abide.ltime >= self.system_config_cache.ltime:
# We have updated our local unparsed abide since the
# caller last checked.
return
self.log.debug("Updating system config")
self.unparsed_abide, self.globals = self.system_config_cache.get()
self.ansible_manager = AnsibleManager(
@ -2235,7 +2235,7 @@ class Scheduler(threading.Thread):
# Remove TPCs of deleted tenants
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
self.abide.clearTPCRegistry(tenant_name)
loader.loadAuthzRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
@ -2364,8 +2364,7 @@ class Scheduler(threading.Thread):
def _gatherConnectionCacheKeys(self):
relevant = set()
with (self.layout_lock,
self.createZKContext(None, self.log) as ctx):
with self.createZKContext(None, self.log) as ctx:
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",


@ -2252,7 +2252,7 @@ class ZuulWeb(object):
# Remove TPCs of deleted tenants
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
self.abide.clearTPCRegistry(tenant_name)
loader.loadAuthzRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)