Refactor config/tenant (re-)loading
In a multi-scheduler deployment we don't want a global layout lock in ZooKeeper. Instead, we will use a read/write lock per tenant, which allows us to continue processing pipelines for other tenants in the meantime. To support that, this change refactors config loading so that a single interface performs config (re-)loading on a per-tenant basis. This also matters because, later on, a scheduler startup will no longer require a full reconfiguration.

With this refactoring, the configloader always (re-)loads the config of a specific tenant as requested by the scheduler. For a full reconfiguration, the scheduler creates a new Abide instance before loading any tenant. For smart or tenant reconfigurations, we re-use the existing Abide instance instead of copying everything over from the old instance to a new one.

Change-Id: I6b7afd34203b74d5a6524e5277055a813147ff9e
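The per-tenant read/write lock itself is not part of this change; the sketch below only illustrates the locking pattern the commit message describes. `tenant_write_lock()` is a hypothetical helper (backed here by a plain in-process lock) standing in for whatever ZooKeeper-based read/write lock recipe is adopted later; only the loader calls correspond to the new interface added in this change.

from contextlib import contextmanager
import threading

# Hypothetical per-tenant lock registry; a real multi-scheduler deployment
# would use a ZooKeeper-backed read/write lock instead of a process-local one.
_tenant_locks = {}

@contextmanager
def tenant_write_lock(tenant_name):
    lock = _tenant_locks.setdefault(tenant_name, threading.RLock())
    with lock:
        yield

def reload_one_tenant(loader, abide, unparsed_abide, tenant_name,
                      ansible_manager, cache_ltime=None):
    # Only this tenant is locked; pipelines of other tenants keep running.
    with tenant_write_lock(tenant_name):
        loader.loadTPCs(abide, unparsed_abide, [tenant_name])
        return loader.loadTenant(abide, tenant_name, ansible_manager,
                                 unparsed_abide, cache_ltime=cache_ltime)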
parent 3a0bbd205c
commit ca0d379973
@@ -2266,12 +2266,12 @@ class ConfigLoader(object):
             unparsed_abide.extend(data)
         return unparsed_abide
 
-    def loadConfig(self, unparsed_abide, ansible_manager, tenants=None):
-        abide = model.Abide()
+    def loadAdminRules(self, abide, unparsed_abide):
         for conf_admin_rule in unparsed_abide.admin_rules:
             admin_rule = self.admin_rule_parser.fromYaml(conf_admin_rule)
             abide.admin_rules[admin_rule.name] = admin_rule
 
+    def loadTPCs(self, abide, unparsed_abide, tenants=None):
         if tenants:
             tenants_to_load = {t: unparsed_abide.tenants[t] for t in tenants}
         else:
@@ -2279,76 +2279,34 @@ class ConfigLoader(object):
 
         # Pre-load TenantProjectConfigs so we can get and cache all of a
         # project's config files (incl. tenant specific extra config) at once.
-        for tenant_name, conf_tenant in tenants_to_load.items():
+        for tenant_name, unparsed_config in tenants_to_load.items():
             config_tpcs, untrusted_tpcs = (
-                self.tenant_parser.loadTenantProjects(conf_tenant)
+                self.tenant_parser.loadTenantProjects(unparsed_config)
             )
             abide.clearTPCs(tenant_name)
             for tpc in config_tpcs:
                 abide.addConfigTPC(tenant_name, tpc)
             for tpc in untrusted_tpcs:
                 abide.addUntrustedTPC(tenant_name, tpc)
 
-        for conf_tenant in tenants_to_load.values():
-            # When performing a full reload, do not use cached data.
-            tenant = self.tenant_parser.fromYaml(
-                abide, conf_tenant, ansible_manager)
-            abide.tenants[tenant.name] = tenant
-            if len(tenant.layout.loading_errors):
-                self.log.warning(
-                    "%s errors detected during %s tenant "
-                    "configuration loading" % (
-                        len(tenant.layout.loading_errors), tenant.name))
-                # Log accumulated errors
-                for err in tenant.layout.loading_errors.errors[:10]:
-                    self.log.warning(err.error)
-        return abide
-
-    def reloadTenant(self, abide, tenant, ansible_manager,
-                     unparsed_abide=None, cache_ltime=None):
-        new_abide = model.Abide()
-        new_abide.tenants = abide.tenants.copy()
-        new_abide.admin_rules = abide.admin_rules.copy()
-        new_abide.unparsed_project_branch_cache = \
-            abide.unparsed_project_branch_cache
-        new_abide.config_tpcs = abide.config_tpcs
-        new_abide.untrusted_tpcs = abide.untrusted_tpcs
-
-        if unparsed_abide:
-            # We got a new unparsed abide so re-load the tenant completely.
-            # First check if the tenant is still existing and if not remove
-            # from the abide.
-            if tenant.name not in unparsed_abide.tenants:
-                del new_abide.tenants[tenant.name]
-                return new_abide
-
-            unparsed_config = unparsed_abide.tenants[tenant.name]
-        else:
-            unparsed_config = tenant.unparsed_config
-
-        # Pre-load TenantProjectConfig so we can get and cache all of a
-        # project's config files (incl. tenant specific extra config) at once.
-        config_tpcs, untrusted_tpcs = (
-            self.tenant_parser.loadTenantProjects(unparsed_config)
-        )
-        new_abide.clearTPCs(tenant.name)
-        for tpc in config_tpcs:
-            new_abide.addConfigTPC(tenant.name, tpc)
-        for tpc in untrusted_tpcs:
-            new_abide.addUntrustedTPC(tenant.name, tpc)
-
-        # When reloading a tenant only, use cached data if available.
+    def loadTenant(self, abide, tenant_name, ansible_manager, unparsed_abide,
+                   cache_ltime=None):
+        if tenant_name not in unparsed_abide.tenants:
+            del abide.tenants[tenant_name]
+            return None
+
+        unparsed_config = unparsed_abide.tenants[tenant_name]
         new_tenant = self.tenant_parser.fromYaml(
-            new_abide, unparsed_config, ansible_manager, cache_ltime)
-        new_abide.tenants[tenant.name] = new_tenant
+            abide, unparsed_config, ansible_manager, cache_ltime)
+        abide.tenants[tenant_name] = new_tenant
         if len(new_tenant.layout.loading_errors):
             self.log.warning(
-                "%s errors detected during %s tenant "
-                "configuration re-loading" % (
-                    len(new_tenant.layout.loading_errors), tenant.name))
+                "%s errors detected during %s tenant configuration loading",
+                len(new_tenant.layout.loading_errors), tenant_name)
             # Log accumulated errors
             for err in new_tenant.layout.loading_errors.errors[:10]:
                 self.log.warning(err.error)
-        return new_abide
+        return new_tenant
 
     def _loadDynamicProjectData(self, config, project,
                                 files, trusted, item, loading_errors,
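Taken together, the configloader hunks above replace the monolithic loadConfig()/reloadTenant() pair with loadAdminRules(), loadTPCs() and loadTenant(). As a rough illustration of how a caller drives the new interface (the scheduler hunks below show the real call sites; the helper functions here are only illustrative and omit validation, error handling and locking):

from zuul.model import Abide

def full_reconfiguration(sched, loader):
    # Full reconfiguration: start from a fresh Abide, then load admin rules,
    # tenant project configs and every tenant without using cached data.
    abide = Abide()
    loader.loadAdminRules(abide, sched.unparsed_abide)
    loader.loadTPCs(abide, sched.unparsed_abide)
    for tenant_name in sched.unparsed_abide.tenants:
        loader.loadTenant(abide, tenant_name, sched.ansible_manager,
                          sched.unparsed_abide, cache_ltime=None)
    return abide

def tenant_reconfiguration(sched, loader, tenant_name, cache_ltime):
    # Smart/tenant reconfiguration: re-use the existing Abide and refresh
    # only the affected tenant, using cached data where possible.
    loader.loadTPCs(sched.abide, sched.unparsed_abide, [tenant_name])
    tenant = loader.loadTenant(sched.abide, tenant_name, sched.ansible_manager,
                               sched.unparsed_abide, cache_ltime=cache_ltime)
    # loadTenant returns None (and drops the tenant from the abide) when the
    # tenant no longer exists in the tenant config.
    return tenant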
@@ -63,7 +63,6 @@ from zuul.model import (
     PromoteEvent,
     ReconfigureEvent,
     SmartReconfigureEvent,
-    Tenant,
     TenantReconfigureEvent,
     UnparsedAbideConfig,
 )
@@ -779,26 +778,48 @@ class Scheduler(threading.Thread):
             tenant_config, script = self._checkTenantSourceConf(self.config)
             self.unparsed_abide = loader.readConfig(
                 tenant_config, from_script=script)
-            abide = loader.loadConfig(
-                self.unparsed_abide, self.ansible_manager,
-                event.validate_tenants)
-            if event.validate_tenants is None:
-                for tenant in abide.tenants.values():
-                    self._reconfigureTenant(tenant)
 
+            tenants_to_load = list(self.unparsed_abide.tenants)
+            if event.validate_tenants is not None:
+                validate_tenants = set(event.validate_tenants)
+                if not validate_tenants.issubset(tenants_to_load):
+                    invalid = validate_tenants.difference(tenants_to_load)
+                    raise RuntimeError(f"Invalid tenant(s) found: {invalid}")
+                # In case we have an empty list, we validate all tenants.
+                tenants_to_load = event.validate_tenants or tenants_to_load
+
+            abide = Abide()
+            loader.loadAdminRules(abide, self.unparsed_abide)
+            loader.loadTPCs(abide, self.unparsed_abide)
+            for tenant_name in tenants_to_load:
+                tenant = loader.loadTenant(abide, tenant_name,
+                                           self.ansible_manager,
+                                           self.unparsed_abide,
+                                           cache_ltime=None)
+                if event.validate_tenants:
+                    # We are only validating the tenant config; skip reconfig
+                    continue
+
+                if tenant is not None:
+                    old_tenant = self.abide.tenants.get(tenant_name)
+                    self._reconfigureTenant(tenant, old_tenant)
+
             for old_tenant in self.abide.tenants.values():
-                if not abide.tenants.get(old_tenant.name):
+                if old_tenant.name not in abide.tenants:
                     # We deleted a tenant
                     self._reconfigureDeleteTenant(old_tenant)
-                self.abide = abide
-            else:
 
+            if event.validate_tenants is not None:
                 loading_errors = []
                 for tenant in abide.tenants.values():
                     for error in tenant.layout.loading_errors:
-                        loading_errors.append(error.__repr__())
+                        loading_errors.append(repr(error))
                 if loading_errors:
                     summary = '\n\n\n'.join(loading_errors)
                     raise configloader.ConfigurationSyntaxError(
-                        'Configuration errors: {}'.format(summary))
+                        f"Configuration errors: {summary}")
+
+            self.abide = abide
         finally:
             self.layout_lock.release()
@@ -833,6 +854,14 @@
             # all tenants from the currently known and the new ones.
             tenant_names = {t for t in self.abide.tenants}
             tenant_names.update(self.unparsed_abide.tenants.keys())
+
+            # Remove TPCs of deleted tenants
+            deleted_tenants = tenant_names.difference(
+                self.unparsed_abide.tenants.keys())
+            for tenant_name in deleted_tenants:
+                self.abide.clearTPCs(tenant_name)
+            loader.loadTPCs(self.abide, self.unparsed_abide)
+
             for tenant_name in tenant_names:
                 old_tenant = old_unparsed_abide.tenants.get(tenant_name)
                 new_tenant = self.unparsed_abide.tenants.get(tenant_name)
@@ -841,20 +870,16 @@
 
                 reconfigured_tenants.append(tenant_name)
                 old_tenant = self.abide.tenants.get(tenant_name)
-                if old_tenant is None:
-                    # If there is no old tenant, use a fake tenant with the
-                    # correct name
-                    old_tenant = Tenant(tenant_name)
-                abide = loader.reloadTenant(
-                    self.abide, old_tenant, self.ansible_manager,
-                    self.unparsed_abide)
+                tenant = loader.loadTenant(self.abide, tenant_name,
+                                           self.ansible_manager,
+                                           self.unparsed_abide,
+                                           cache_ltime=event.zuul_event_ltime)
 
-                tenant = abide.tenants.get(tenant_name)
+                tenant = self.abide.tenants.get(tenant_name)
                 if tenant is not None:
-                    self._reconfigureTenant(tenant)
+                    self._reconfigureTenant(tenant, old_tenant)
                 else:
                     self._reconfigureDeleteTenant(old_tenant)
-                self.abide = abide
             duration = round(time.monotonic() - start, 3)
             self.log.info("Smart reconfiguration of tenants %s complete "
                           "(duration: %s seconds)", reconfigured_tenants, duration)
@@ -879,15 +904,16 @@
                     self.unparsed_config_cache.clearCache(project_name,
                                                           branch_name)
 
-            old_tenant = self.abide.tenants[event.tenant_name]
             loader = configloader.ConfigLoader(
                 self.connections, self, self.merger, self.keystore)
-            abide = loader.reloadTenant(self.abide, old_tenant,
-                                        self.ansible_manager,
-                                        cache_ltime=event.zuul_event_ltime)
-            tenant = abide.tenants[event.tenant_name]
-            self._reconfigureTenant(tenant)
-            self.abide = abide
+            old_tenant = self.abide.tenants.get(event.tenant_name)
+            loader.loadTPCs(self.abide, self.unparsed_abide,
+                            [event.tenant_name])
+            loader.loadTenant(self.abide, event.tenant_name,
+                              self.ansible_manager, self.unparsed_abide,
+                              cache_ltime=event.zuul_event_ltime)
+            tenant = self.abide.tenants[event.tenant_name]
+            self._reconfigureTenant(tenant, old_tenant)
         finally:
             self.layout_lock.release()
         duration = round(time.monotonic() - start, 3)
@@ -1038,11 +1064,9 @@
             if not new_pipeline:
                 self._reconfigureDeletePipeline(old_pipeline)
 
-    def _reconfigureTenant(self, tenant):
+    def _reconfigureTenant(self, tenant, old_tenant=None):
         # This is called from _doReconfigureEvent while holding the
         # layout lock
-        old_tenant = self.abide.tenants.get(tenant.name)
-
         if old_tenant:
             self._reenqueueTenant(old_tenant, tenant)
 