Add global semaphore support

This adds support for global semaphores which can be used by multiple tenants.
This supports the use case where they represent real-world resources which
operate independently of Zuul tenants.

This implements and removes the spec describing the feature.  One change from
the spec is that the configuration object in the tenant config file is
"global-semaphore" rather than "semaphore".  This makes it easier to distinguish
them in documentation (facilitating easier cross-references and deep links),
and may also make it easier for users to understand that they have distinct
behaviors.

Change-Id: I5f2225a700d8f9bef0399189017f23b3f4caad17
James E. Blair 2022-05-16 16:02:05 -07:00
parent f2297cadb0
commit c3fc65dcd9
24 changed files with 481 additions and 165 deletions


@ -219,14 +219,14 @@ Here is an example of two job definitions:
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) which should be
acquired and released when the job begins and ends. If the
semaphore is at maximum capacity, then Zuul will wait until it
can be acquired before starting the job. The format is either a
string, a dictionary, or a list of either of those in the case
of multiple semaphores. If it's a string it references a
semaphore using the default value for
:attr:`job.semaphores.resources-first`.
The name of a :ref:`semaphore` (or list of them) or
:ref:`global_semaphore` which should be acquired and released
when the job begins and ends. If the semaphore is at maximum
capacity, then Zuul will wait until it can be acquired before
starting the job. The format is either a string, a dictionary,
or a list of either of those in the case of multiple
semaphores. If it's a string it references a semaphore using the
default value for :attr:`job.semaphores.resources-first`.
If multiple semaphores are requested, the job will not start
until all have been acquired, and Zuul will wait until all are


@ -15,6 +15,10 @@ project as long as the value is the same. This is to aid in branch
maintenance, so that creating a new branch based on an existing branch
will not immediately produce a configuration error.
Zuul also supports global semaphores (see :ref:`global_semaphore`)
which may only be created by the Zuul administrator, but can be used
to coordinate resources across multiple tenants.
Semaphores are never subject to dynamic reconfiguration. If the value
of a semaphore is changed, the new value will take effect only when the
change updating it is merged. However, Zuul will attempt to validate


@ -1,127 +0,0 @@
Global Semaphores
=================
.. warning:: This is not authoritative documentation. These features
are not currently available in Zuul. They may change significantly
before final implementation, or may never be fully completed.
Semaphores are useful for limiting access to resources, but their
implementation as a per-tenant configuration construct may be limiting
if they are used for real-world resources that span tenants.
This is a proposal to address that by adding global semaphores.
Background
----------
Semaphores may be used for a variety of purposes. One of these is to
limit access to constrained resources. Doing so allows Zuul to avoid
requesting nodes and scheduling jobs until these resources are
available. This makes the overall system more efficient as jobs don't
need to wait for resources during their run phase (where they may be
idling test nodes which could otherwise be put to better use).
A concrete example of this is software licenses. If a job requires
software which uses a license server to ensure that the number of
in-use seats does not exceed the available seats, a semaphore with a
max value equal to the number of available seats can be used to help
Zuul avoid starting jobs which would otherwise need to wait for a
license.
If only one Zuul tenant uses this piece of software, the existing
implementation of semaphores in Zuul is satisfactory. But if the
software licenses are shared across Zuul tenants, then a Zuul
semaphore can't be used in this way since semaphores are per-tenant
constructs.
The general solution to sharing Zuul configuration objects across
tenants is to define them in a single git repo and include that git
repo in multiple tenants. That works as expected for Jobs, Project
Templates, etc. But semaphores have a definition as well as a
run-time state (whether they are acquired and by whom). Including a
semaphore in multiple tenants essentially makes copies of that
semaphore, each with its own distinct set of holders.
Proposed Change
---------------
A new global semaphore configuration would be added to the tenant
configuration file. Note this is the global configuration file (where
tenants are defined); not in-repo configuration where semaphores are
currently defined.
The definition would be identical to the current in-repo semaphore
configuration. In order to grant access to only certain tenants, each
tenant will also need to specify whether that semaphore should be
available to the tenant. This scheme is similar to the way that
authorization rules are defined in this file and then attached to
tenants.
For example:
.. code-block:: yaml
- semaphore:
name: licensed-software
max: 8
- tenant:
name: example-tenant
semaphores:
- licensed-software
source:
gerrit:
config-projects:
...
The existing in-repo semaphores will remain as they are today -- they
will not be deprecated (they are still very useful on their own for
most other use cases).
If an in-repo semaphore is defined with the same name as a global
semaphore, that will become a configuration error. The global
semaphore will take precedence.
Implementation
--------------
The user-visible configuration is described above.
Current semaphores are stored in the ZooKeeper path
``/zuul/semaphores/<tenant>/<semaphore>``. Global semaphores will use
a similar scheme without the tenant name:
``/zuul/global-semaphores/<semaphore>``.
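The path construction could be sketched as follows; this is a
hypothetical helper written for illustration, not the actual
implementation, and the function name is invented:

```python
from urllib.parse import quote_plus

SEMAPHORE_ROOT = "/zuul/semaphores"
GLOBAL_SEMAPHORE_ROOT = "/zuul/global-semaphores"

def semaphore_path(name, tenant=None):
    # Semaphore names are URL-quoted so they are safe as znode names.
    key = quote_plus(name)
    if tenant is None:
        # Global semaphores live directly under the global root.
        return f"{GLOBAL_SEMAPHORE_ROOT}/{key}"
    # Tenant-scoped semaphores are namespaced by tenant name.
    return f"{SEMAPHORE_ROOT}/{tenant}/{key}"
```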
Locking, releasing, and leak cleanup will all behave similarly to the
current per-tenant semaphores. On release, a per-tenant semaphore
broadcasts a PipelineSemaphoreReleaseEvent to all pipelines in order
to trigger a pipeline run and start any jobs which may be waiting on
the semaphore. A global semaphore will do the same, but for every
pipeline of every tenant which includes the semaphore.
Alternatives
------------
We could add a field to the in-repo definitions of semaphores which
indicates that the semaphore should be global. As this has the
ability to affect other tenants, we would need to restrict this to
config-projects only. However, that still opens the possibility of
one tenant affecting another via the contents of a config-project.
Some method of allowing the administrator to control this via the
tenant config file would still likely be necessary. As long as that's
the case, it seems simpler to just define the semaphores there too.
We could outsource this to Nodepool. In fact, having nodepool manage
resources like this seems like a natural fit. However, the current
Nodepool implementation doesn't permit more than one provider to
satisfy a node request, so a hypothetical semaphore provider wouldn't
be able to be combined with a provider of actual test nodes.
Addressing this is in-scope and a worthwhile change for Nodepool, but
it is potentially a large and complex change. Additionally, the idea
of waiting for a semaphore before submitting requests for real
resources adds a new dimension to even that idea -- Nodepool would
need to know whether to run the semaphore provider first or last
depending on the desired resource acquisition order. Meanwhile, since
Zuul does have the concept of semaphores already and they almost fit
this use case, this seems like a reasonable change to make in Zuul
regardless of any potential Nodepool changes.


@ -23,4 +23,3 @@ documentation instead.
enhanced-regional-executors
tenant-resource-quota
community-matrix
global-semaphores


@ -414,6 +414,13 @@ This is a reference for object layout in Zookeeper.
An election to decide which scheduler will report system-wide stats
(such as total node requests).
.. path:: zuul/global-semaphores/<semaphore>
:type: SemaphoreHandler
Represents a global semaphore (shared by multiple tenants).
Information about which builds hold the semaphore is stored in the
znode data.
.. path:: zuul/semaphores/<tenant>/<semaphore>
:type: SemaphoreHandler


@ -363,6 +363,57 @@ configuration. Some examples of tenant definitions are:
to add finer filtering to admin rules, for example filtering by the ``iss``
claim (generally equal to the issuer ID).
.. attr:: semaphores
A list of names of :attr:`global-semaphore` objects to allow
jobs in this tenant to access.
.. _global_semaphore:
Global Semaphore
----------------
Semaphores are normally defined in in-repo configuration (see
:ref:`semaphore`); however, to support use cases where semaphores
represent constrained global resources that may be used by multiple
Zuul tenants, semaphores may also be defined in the main tenant
configuration file.
In order for a job to use a global semaphore, the semaphore must first
be defined in the tenant configuration file with
:attr:`global-semaphore` and then added to each tenant which should
have access to it with :attr:`tenant.semaphores`. Once that is done,
Zuul jobs may use that semaphore in the same way they would use a
normal tenant-scoped semaphore.
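For example, a job in any tenant granted access could reference the
global semaphore exactly like a tenant-scoped one (the job and
semaphore names here are illustrative):

```yaml
- job:
    name: licensed-build
    semaphores: licensed-software
```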
If any tenant which is granted access to a global semaphore also has a
tenant-scoped semaphore defined with the same name, that definition
will be treated as a configuration error and subsequently ignored in
favor of the global semaphore.
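The precedence can be sketched as a lookup that consults the global
definitions first; this is a minimal illustrative model, not the
actual layout code, and all names are hypothetical:

```python
class SemaphoreLookup:
    """Illustrative precedence: global definitions shadow tenant ones."""

    def __init__(self, global_semaphores, tenant_semaphores):
        self.global_semaphores = global_semaphores  # name -> max value
        self.tenant_semaphores = tenant_semaphores  # name -> max value

    def get_max(self, name):
        # A global semaphore always wins over a same-named tenant one.
        if name in self.global_semaphores:
            return self.global_semaphores[name]
        # Fall back to the tenant-scoped definition, or an implied
        # semaphore with max=1 if the name is undefined.
        return self.tenant_semaphores.get(name, 1)
```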
An example definition looks similar to the normal semaphore object:
.. code-block:: yaml
- global-semaphore:
name: global-semaphore-foo
max: 5
.. attr:: global-semaphore
The following attributes are available:
.. attr:: name
:required:
The name of the semaphore, referenced by jobs.
.. attr:: max
:default: 1
The maximum number of running jobs which can use this semaphore.
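Putting the two pieces together, a tenant configuration granting
access might look like the following (tenant and project names are
illustrative):

```yaml
- global-semaphore:
    name: licensed-software
    max: 8

- tenant:
    name: example-tenant
    semaphores:
      - licensed-software
    source:
      gerrit:
        config-projects:
          - common-config
```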
.. _admin_rule_definition:
Access Rule


@ -0,0 +1,5 @@
---
features:
- |
Support for global (cross-tenant) semaphores has been added. See
:ref:`global_semaphore`.


@ -0,0 +1,10 @@
- tenant:
name: tenant-two
semaphores:
- global-semaphore
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project2


@ -0,0 +1 @@
---


@ -0,0 +1,52 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
- event: comment-added
comment: '^(Patch Set [0-9]+:\n\n)?(?i:recheck)$'
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- job:
name: base
parent: null
run: playbooks/run.yaml
- semaphore:
name: common-semaphore
max: 10
- job:
name: test-global-semaphore
semaphores: global-semaphore
- job:
name: test-common-semaphore
semaphores: common-semaphore


@ -0,0 +1 @@
test


@ -0,0 +1,19 @@
# Not actually the global semaphore -- this will be overridden
- semaphore:
name: global-semaphore
max: 2
- semaphore:
name: project1-semaphore
max: 11
- job:
name: test-project1-semaphore
semaphores: project1-semaphore
- project:
check:
jobs:
- test-global-semaphore
- test-common-semaphore
- test-project1-semaphore


@ -0,0 +1 @@
test


@ -0,0 +1,14 @@
- semaphore:
name: project2-semaphore
max: 12
- job:
name: test-project2-semaphore
semaphores: project2-semaphore
- project:
check:
jobs:
- test-global-semaphore
- test-common-semaphore
- test-project2-semaphore


@ -0,0 +1 @@
test


@ -0,0 +1,20 @@
# Not actually the global semaphore -- this tenant doesn't have it, so
# this semaphore will be used.
- semaphore:
name: global-semaphore
max: 999
- semaphore:
name: project3-semaphore
max: 13
- job:
name: test-project3-semaphore
semaphores: project3-semaphore
- project:
check:
jobs:
- test-global-semaphore
- test-common-semaphore
- test-project3-semaphore


@ -0,0 +1,34 @@
- global-semaphore:
name: global-semaphore
max: 100
- tenant:
name: tenant-one
semaphores:
- global-semaphore
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project1
- tenant:
name: tenant-two
semaphores:
- global-semaphore
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project2
- tenant:
name: tenant-three
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project3


@ -0,0 +1,137 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import zuul.configloader
from tests.base import ZuulTestCase
class TestGlobalSemaphores(ZuulTestCase):
tenant_config_file = 'config/global-semaphores/main.yaml'
def assertSemaphores(self, tenant, semaphores):
for k, v in semaphores.items():
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(k)),
v, k)
def assertSemaphoresMax(self, tenant, semaphores):
for k, v in semaphores.items():
abide = tenant.semaphore_handler.abide
semaphore = tenant.layout.getSemaphore(abide, k)
self.assertEqual(semaphore.max, v, k)
def test_semaphore_scope(self):
# This tests global and tenant semaphore scope
self.executor_server.hold_jobs_in_build = True
tenant1 = self.scheds.first.sched.abide.tenants.get('tenant-one')
tenant2 = self.scheds.first.sched.abide.tenants.get('tenant-two')
tenant3 = self.scheds.first.sched.abide.tenants.get('tenant-three')
# The different max values will tell us that we have the right
# semaphore objects. Each tenant has one tenant-scope
# semaphore in a tenant-specific project, and one tenant-scope
# semaphore with a common definition. Tenants 1 and 2 share a
# global-scope semaphore, and tenant 3 has a tenant-scope
# semaphore with the same name.
# Here is what is defined in each tenant:
# Tenant-one:
# * global-semaphore: scope:global max:100 definition:main.yaml
# * common-semaphore: scope:tenant max:10 definition:common-config
# * project1-semaphore: scope:tenant max:11 definition:project1
# * (global-semaphore): scope:tenant max:2 definition:project1
# [unused since it shadows the actual global-semaphore]
# Tenant-two:
# * global-semaphore: scope:global max:100 definition:main.yaml
# * common-semaphore: scope:tenant max:10 definition:common-config
# * project2-semaphore: scope:tenant max:12 definition:project2
# Tenant-three:
# * global-semaphore: scope:global max:999 definition:project3
# * common-semaphore: scope:tenant max:10 definition:common-config
# * project3-semaphore: scope:tenant max:13 definition:project3
self.assertSemaphoresMax(tenant1, {'global-semaphore': 100,
'common-semaphore': 10,
'project1-semaphore': 11,
'project2-semaphore': 1,
'project3-semaphore': 1})
self.assertSemaphoresMax(tenant2, {'global-semaphore': 100,
'common-semaphore': 10,
'project1-semaphore': 1,
'project2-semaphore': 12,
'project3-semaphore': 1})
# This "global" semaphore is really tenant-scoped; it just has
# the same name.
self.assertSemaphoresMax(tenant3, {'global-semaphore': 999,
'common-semaphore': 10,
'project1-semaphore': 1,
'project2-semaphore': 1,
'project3-semaphore': 13})
# We should have a config error in tenant1 due to the
# redefinition.
self.assertEquals(len(tenant1.layout.loading_errors), 1)
self.assertEquals(len(tenant2.layout.loading_errors), 0)
self.assertEquals(len(tenant3.layout.loading_errors), 0)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project3', 'master', 'C')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Checking the number of holders tells us whether we are
# using global or tenant-scoped semaphores. Each in-use
# semaphore in a tenant should have only one holder except the
# global-scope semaphore shared between tenants 1 and 2.
self.assertSemaphores(tenant1, {'global-semaphore': 2,
'common-semaphore': 1,
'project1-semaphore': 1,
'project2-semaphore': 0,
'project3-semaphore': 0})
self.assertSemaphores(tenant2, {'global-semaphore': 2,
'common-semaphore': 1,
'project1-semaphore': 0,
'project2-semaphore': 1,
'project3-semaphore': 0})
self.assertSemaphores(tenant3, {'global-semaphore': 1,
'common-semaphore': 1,
'project1-semaphore': 0,
'project2-semaphore': 0,
'project3-semaphore': 1})
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
class TestGlobalSemaphoresBroken(ZuulTestCase):
validate_tenants = []
tenant_config_file = 'config/global-semaphores/broken.yaml'
# This test raises a config error during the startup of the test
# case which makes the first scheduler fail during its startup.
# The second (or any additional) scheduler won't even run as the
# startup is serialized in tests/base.py.
# Thus it doesn't make sense to execute this test with multiple
# schedulers.
scheduler_count = 1
def setUp(self):
self.assertRaises(zuul.configloader.GlobalSemaphoreNotFoundError,
super().setUp)
def test_broken_global_semaphore_config(self):
pass


@ -8128,8 +8128,8 @@ class TestSemaphoreInRepo(ZuulTestCase):
item_dynamic_layout = pipeline.manager._layout_cache.get(
queue_item.layout_uuid)
self.assertIsNotNone(item_dynamic_layout)
dynamic_test_semaphore = \
item_dynamic_layout.semaphores.get('test-semaphore')
dynamic_test_semaphore = item_dynamic_layout.getSemaphore(
self.scheds.first.sched.abide, 'test-semaphore')
self.assertEqual(dynamic_test_semaphore.max, 1)
# one build must be in queue, one semaphore acquired
@ -8152,7 +8152,8 @@ class TestSemaphoreInRepo(ZuulTestCase):
# now that change A was merged, the new semaphore max must be effective
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(tenant.layout.semaphores.get('test-semaphore').max, 2)
self.assertEqual(tenant.layout.getSemaphore(
self.scheds.first.sched.abide, 'test-semaphore').max, 2)
# two builds must be in queue, two semaphores acquired
self.assertEqual(len(self.builds), 2)


@ -189,6 +189,16 @@ class ProjectNotPermittedError(Exception):
super(ProjectNotPermittedError, self).__init__(message)
class GlobalSemaphoreNotFoundError(Exception):
def __init__(self, semaphore):
message = textwrap.dedent("""\
The global semaphore "{semaphore}" was not found. All
global semaphores must be added to the main configuration
file by the Zuul administrator.""")
message = textwrap.fill(message.format(semaphore=semaphore))
super(GlobalSemaphoreNotFoundError, self).__init__(message)
class YAMLDuplicateKeyError(ConfigurationSyntaxError):
def __init__(self, key, node, context, start_mark):
intro = textwrap.fill(textwrap.dedent("""\
@ -1448,6 +1458,26 @@ class AuthorizationRuleParser(object):
return a
class GlobalSemaphoreParser(object):
def __init__(self):
self.log = logging.getLogger("zuul.GlobalSemaphoreParser")
self.schema = self.getSchema()
def getSchema(self):
semaphore = {vs.Required('name'): str,
'max': int,
}
return vs.Schema(semaphore)
def fromYaml(self, conf):
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1),
global_scope=True)
semaphore.freeze()
return semaphore
class ParseContext(object):
"""Hold information about a particular run of the parser"""
@ -1561,6 +1591,7 @@ class TenantParser(object):
'default-parent': str,
'default-ansible-version': vs.Any(str, float),
'admin-rules': to_list(str),
'semaphores': to_list(str),
'authentication-realm': str,
# TODO: Ignored, allowed for backwards compat, remove for v5.
'report-build-page': bool,
@ -1593,6 +1624,11 @@ class TenantParser(object):
tenant.authorization_rules = conf['admin-rules']
if conf.get('authentication-realm') is not None:
tenant.default_auth_realm = conf['authentication-realm']
if conf.get('semaphores') is not None:
tenant.global_semaphores = set(as_list(conf['semaphores']))
for semaphore_name in tenant.global_semaphores:
if semaphore_name not in abide.semaphores:
raise GlobalSemaphoreNotFoundError(semaphore_name)
tenant.web_root = conf.get('web-root', self.globals.web_root)
if tenant.web_root and not tenant.web_root.endswith('/'):
tenant.web_root += '/'
@ -1665,7 +1701,7 @@ class TenantParser(object):
if self.scheduler:
tenant.semaphore_handler = SemaphoreHandler(
self.zk_client, self.statsd, tenant.name, tenant.layout
self.zk_client, self.statsd, tenant.name, tenant.layout, abide
)
# Only call the postConfig hook if we have a scheduler as this will
# change data in ZooKeeper. In case we are in a zuul-web context,
@ -2380,6 +2416,7 @@ class ConfigLoader(object):
connections, zk_client, scheduler, merger, keystorage,
zuul_globals, statsd)
self.admin_rule_parser = AuthorizationRuleParser()
self.global_semaphore_parser = GlobalSemaphoreParser()
def expandConfigPath(self, config_path):
if config_path:
@ -2436,6 +2473,12 @@ class ConfigLoader(object):
admin_rule = self.admin_rule_parser.fromYaml(conf_admin_rule)
abide.admin_rules[admin_rule.name] = admin_rule
def loadSemaphores(self, abide, unparsed_abide):
abide.semaphores.clear()
for conf_semaphore in unparsed_abide.semaphores:
semaphore = self.global_semaphore_parser.fromYaml(conf_semaphore)
abide.semaphores[semaphore.name] = semaphore
def loadTPCs(self, abide, unparsed_abide, tenants=None):
if tenants:
tenants_to_load = {t: unparsed_abide.tenants[t] for t in tenants


@ -6650,11 +6650,13 @@ class UnparsedAbideConfig(object):
self.ltime = -1
self.tenants = {}
self.admin_rules = []
self.semaphores = []
def extend(self, conf):
if isinstance(conf, UnparsedAbideConfig):
self.tenants.update(conf.tenants)
self.admin_rules.extend(conf.admin_rules)
self.semaphores.extend(conf.semaphores)
return
if not isinstance(conf, list):
@ -6673,6 +6675,8 @@ class UnparsedAbideConfig(object):
self.tenants[value["name"]] = value
elif key == 'admin-rule':
self.admin_rules.append(value)
elif key == 'global-semaphore':
self.semaphores.append(value)
else:
raise ConfigItemUnknownError(item)
@ -6681,6 +6685,7 @@ class UnparsedAbideConfig(object):
"uuid": self.uuid,
"tenants": self.tenants,
"admin_rules": self.admin_rules,
"semaphores": self.semaphores,
}
@classmethod
@ -6690,6 +6695,7 @@ class UnparsedAbideConfig(object):
unparsed_abide.ltime = ltime
unparsed_abide.tenants = data["tenants"]
unparsed_abide.admin_rules = data["admin_rules"]
unparsed_abide.semaphores = data["semaphores"]
return unparsed_abide
@ -6961,6 +6967,9 @@ class Layout(object):
# It's ok to have a duplicate semaphore definition, but only if
# they are in different branches of the same repo, and have
# the same values.
if semaphore.name in self.tenant.global_semaphores:
raise Exception("Semaphore %s shadows a global semaphore and "
"will be ignored" % (semaphore.name))
other = self.semaphores.get(semaphore.name)
if other is not None:
if not semaphore.source_context.isSameProject(
@ -6980,6 +6989,19 @@ class Layout(object):
return
self.semaphores[semaphore.name] = semaphore
def getSemaphore(self, abide, semaphore_name):
if semaphore_name in self.tenant.global_semaphores:
return abide.semaphores[semaphore_name]
semaphore = self.semaphores.get(semaphore_name)
if semaphore:
return semaphore
# Return an implied semaphore with max=1
# TODO: consider deprecating implied semaphores to avoid typo
# config errors
semaphore = Semaphore(semaphore_name)
semaphore.freeze()
return semaphore
def addQueue(self, queue):
# Change queues must be unique and cannot be overridden.
if queue.name in self.queues:
@ -7364,9 +7386,10 @@ class Layout(object):
class Semaphore(ConfigObject):
def __init__(self, name, max=1):
def __init__(self, name, max=1, global_scope=False):
super(Semaphore, self).__init__()
self.name = name
self.global_scope = global_scope
self.max = int(max)
def __ne__(self, other):
@ -7442,6 +7465,7 @@ class Tenant(object):
self.authorization_rules = []
self.default_auth_realm = None
self.global_semaphores = set()
def __repr__(self):
return f"<Tenant {self.name}>"
@ -7645,6 +7669,7 @@ class UnparsedBranchCache(object):
class Abide(object):
def __init__(self):
self.admin_rules = {}
self.semaphores = {}
self.tenants = {}
# tenant -> project -> list(tpcs)
# The project TPCs are stored as a list as we don't check for


@ -1295,6 +1295,7 @@ class Scheduler(threading.Thread):
try:
abide = Abide()
loader.loadAdminRules(abide, unparsed_abide)
loader.loadSemaphores(abide, unparsed_abide)
loader.loadTPCs(abide, unparsed_abide)
for tenant_name in tenants_to_load:
loader.loadTenant(abide, tenant_name, self.ansible_manager,
@ -1352,8 +1353,9 @@ class Scheduler(threading.Thread):
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
loader.loadTPCs(self.abide, self.unparsed_abide)
loader.loadAdminRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
loader.loadTPCs(self.abide, self.unparsed_abide)
if event.smart:
# Consider caches always valid
@ -1981,8 +1983,9 @@ class Scheduler(threading.Thread):
tenant_config, from_script=script)
self.system_config_cache.set(self.unparsed_abide, self.globals)
loader.loadTPCs(self.abide, self.unparsed_abide)
loader.loadAdminRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
loader.loadTPCs(self.abide, self.unparsed_abide)
def updateSystemConfig(self):
with self.layout_lock:
@ -2001,8 +2004,9 @@ class Scheduler(threading.Thread):
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
loader.loadTPCs(self.abide, self.unparsed_abide)
loader.loadAdminRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
loader.loadTPCs(self.abide, self.unparsed_abide)
def process_pipelines(self, tenant, tenant_lock):
for pipeline in tenant.layout.pipelines.values():


@ -2070,8 +2070,9 @@ class ZuulWeb(object):
for tenant_name in deleted_tenants:
self.abide.clearTPCs(tenant_name)
loader.loadTPCs(self.abide, self.unparsed_abide)
loader.loadAdminRules(self.abide, self.unparsed_abide)
loader.loadSemaphores(self.abide, self.unparsed_abide)
loader.loadTPCs(self.abide, self.unparsed_abide)
def updateLayout(self):
self.log.debug("Updating layout state")


@ -39,14 +39,23 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
log = logging.getLogger("zuul.zk.SemaphoreHandler")
semaphore_root = "/zuul/semaphores"
global_semaphore_root = "/zuul/global-semaphores"
def __init__(self, client, statsd, tenant_name, layout):
def __init__(self, client, statsd, tenant_name, layout, abide):
super().__init__(client)
self.abide = abide
self.layout = layout
self.statsd = statsd
self.tenant_name = tenant_name
self.tenant_root = f"{self.semaphore_root}/{tenant_name}"
def _makePath(self, semaphore):
semaphore_key = quote_plus(semaphore.name)
if semaphore.global_scope:
return f"{self.global_semaphore_root}/{semaphore_key}"
else:
return f"{self.tenant_root}/{semaphore_key}"
def _emitStats(self, semaphore_path, num_holders):
if self.statsd is None:
return
@ -80,8 +89,8 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
return False
return True
def _acquire_one(self, log, item, job, request_resources, semaphore):
if semaphore.resources_first and request_resources:
def _acquire_one(self, log, item, job, request_resources, job_semaphore):
if job_semaphore.resources_first and request_resources:
# We're currently in the resource request phase and want to get the
# resources before locking. So we don't need to do anything here.
return True
@ -92,8 +101,8 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
# the resources phase.
pass
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
semaphore_path = self._makePath(semaphore)
semaphore_handle = {
"buildset_path": item.current_build_set.getPath(),
"job_name": job.name,
@ -139,10 +148,13 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
return holdersFromData(data), zstat
def getSemaphores(self):
try:
return self.kazoo_client.get_children(self.tenant_root)
except NoNodeError:
return []
ret = []
for root in (self.global_semaphore_root, self.tenant_root):
try:
ret.extend(self.kazoo_client.get_children(root))
except NoNodeError:
pass
return ret
def _release(self, log, semaphore_path, semaphore_handle, quiet,
legacy_handle=None):
@ -183,8 +195,8 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
log = get_annotated_logger(self.log, item.event)
for semaphore in job.semaphores:
self._release_one(log, item, job, semaphore, quiet)
for job_semaphore in job.semaphores:
self._release_one(log, item, job, job_semaphore, quiet)
# If a scheduler has been provided (which it is except in the
# case of a rollback from acquire in this class), broadcast an
@ -197,9 +209,9 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
self.tenant_name][pipeline_name].put(
event, needs_result=False)
def _release_one(self, log, item, job, semaphore, quiet):
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
def _release_one(self, log, item, job, job_semaphore, quiet):
semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
semaphore_path = self._makePath(semaphore)
semaphore_handle = {
"buildset_path": item.current_build_set.getPath(),
"job_name": job.name,
@ -209,16 +221,16 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
legacy_handle)
def semaphoreHolders(self, semaphore_name):
semaphore_key = quote_plus(semaphore_name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
semaphore_path = self._makePath(semaphore)
try:
holders, _ = self.getHolders(semaphore_path)
except NoNodeError:
holders = []
return holders
def _max_count(self, semaphore_name: str) -> int:
semaphore = self.layout.semaphores.get(semaphore_name)
def _max_count(self, semaphore_name):
semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
return 1 if semaphore is None else semaphore.max
def cleanupLeaks(self):
@ -240,8 +252,9 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
is not None):
continue
semaphore_key = quote_plus(semaphore_name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
semaphore = self.layout.getSemaphore(
self.abide, semaphore_name)
semaphore_path = self._makePath(semaphore)
self.log.error("Releasing leaked semaphore %s held by %s",
semaphore_path, holder)
self._release(self.log, semaphore_path, holder, quiet=False)