Merge "Process semaphore release as tenant events"

This commit is contained in:
Zuul 2025-03-04 09:08:09 +00:00 committed by Gerrit Code Review
commit 364ffbca62
9 changed files with 108 additions and 29 deletions

View File

@ -233,3 +233,10 @@ Version 32
:Prior Zuul version: 11.1.0
:Description: Add topic query timestamp.
Affects schedulers.
Version 33
----------
:Prior Zuul version: 11.2.0
:Description: Send SemaphoreReleaseEvents to the tenant management event queue
instead of the pipeline trigger event queue.
Affects schedulers and executors.

View File

@ -30,6 +30,7 @@ from tests.base import (
)
from zuul.zk import ZooKeeperClient
from zuul.zk.branch_cache import BranchCache, BranchFlag
from zuul.zk.locks import management_queue_lock
from zuul.zk.zkobject import ZKContext
from tests.unit.test_zk import DummyConnection
@ -420,3 +421,42 @@ class TestBranchCacheUpgrade(BaseTestCase):
for branch_name in cache_project.branches.keys():
if branch_name not in project['branches']:
raise Exception(f"Unexpected branch {branch_name}")
class TestSemaphoreReleaseUpgrade(ZuulTestCase):
tenant_config_file = 'config/global-semaphores/main.yaml'
@model_version(32)
def test_model_32(self):
# This tests that a job finishing in one tenant will correctly
# start a job in another tenant waiting on the semaphore.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([])
self.assertBuilds([
dict(name='test-global-semaphore', changes='1,1'),
])
# Block tenant management event queues so we know that the
# semaphore release events are dispatched via the pipeline
# trigger event queue.
with (management_queue_lock(self.zk_client, "tenant-one"),
management_queue_lock(self.zk_client, "tenant-two")):
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='test-global-semaphore',
result='SUCCESS', changes='1,1'),
dict(name='test-global-semaphore',
result='SUCCESS', changes='2,1'),
], ordered=False)

View File

@ -82,7 +82,10 @@ from zuul.model import (
import zuul.model
from zuul.nodepool import Nodepool
from zuul.version import get_version_string
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.event_queues import (
PipelineResultEventQueue,
TenantManagementEventQueue,
)
from zuul.zk.blob_store import BlobStore
from zuul.zk.components import ExecutorComponent, COMPONENT_REGISTRY
from zuul.zk.exceptions import JobRequestNotFound
@ -3636,9 +3639,13 @@ class AnsibleJob(object):
self.RESULT_MAP[result], code))
if acquired_semaphores:
event_queue = self.executor_server.result_events[
self.build_request.tenant_name][
self.build_request.pipeline_name]
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.executor_server.management_events[
self.build_request.tenant_name]
else:
event_queue = self.executor_server.result_events[
self.build_request.tenant_name][
self.build_request.pipeline_name]
self.executor_server.semaphore_handler.releaseFromInfo(
self.log, event_queue, playbook.semaphores, semaphore_handle)
@ -3843,6 +3850,8 @@ class ExecutorServer(BaseMergeServer):
self.statsd)
self.launcher = LauncherClient(self.zk_client, None)
self.management_events = TenantManagementEventQueue.createRegistry(
self.zk_client)
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client)
self.build_worker = threading.Thread(

View File

@ -1260,10 +1260,13 @@ class PipelineManager(metaclass=ABCMeta):
# If we hit an exception we don't have a build in the
# current item so a potentially aquired semaphore must be
# released as it won't be released on dequeue of the item.
tenant = item.pipeline.tenant
pipeline = build_set.item.pipeline
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
tenant = pipeline.tenant
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[tenant.name]
else:
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release(event_queue, item, job)
except Exception:
log.exception("Exception while releasing semaphore")
@ -2127,8 +2130,12 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("Build %s of %s completed", build, item)
event_queue = self.sched.pipeline_result_events[
item.pipeline.tenant.name][item.pipeline.name]
pipeline = item.pipeline
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[pipeline.tenant.name]
else:
event_queue = self.sched.pipeline_result_events[
pipeline.tenant.name][pipeline.name]
item.pipeline.tenant.semaphore_handler.release(
event_queue, item, build.job)
@ -2284,8 +2291,11 @@ class PipelineManager(metaclass=ABCMeta):
self._resumeBuilds(build_set)
pipeline = build_set.item.pipeline
tenant = pipeline.tenant
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[tenant.name]
else:
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release(
event_queue, build_set.item, job)

View File

@ -7980,7 +7980,7 @@ class ResultEvent(AbstractEvent):
pass
class SemaphoreReleaseEvent(ResultEvent):
class SemaphoreReleaseEvent(ManagementEvent):
"""Enqueued after a semaphore has been released in order
to trigger a processing run.
@ -7990,16 +7990,19 @@ class SemaphoreReleaseEvent(ResultEvent):
"""
def __init__(self, semaphore_name):
super().__init__()
self.semaphore_name = semaphore_name
def toDict(self):
return {
"semaphore_name": self.semaphore_name,
}
d = super().toDict()
d["semaphore_name"] = self.semaphore_name
return d
@classmethod
def fromDict(cls, data):
return cls(data.get("semaphore_name"))
event = cls(data.get("semaphore_name"))
event.updateFromDict(data)
return event
def __repr__(self):
return (

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 32
MODEL_API = 33

View File

@ -2085,8 +2085,7 @@ class Scheduler(threading.Thread):
pipeline.manager.removeItem(item)
return
def _doSemaphoreReleaseEvent(self, event, pipeline):
tenant = pipeline.tenant
def _doSemaphoreReleaseEvent(self, event, tenant):
semaphore = tenant.layout.getSemaphore(
self.abide, event.semaphore_name)
if semaphore.global_scope:
@ -2096,11 +2095,6 @@ class Scheduler(threading.Thread):
tenants = [tenant]
for tenant in tenants:
for pipeline_name in tenant.layout.pipelines.keys():
if (tenant.name == pipeline.tenant.name and
pipeline_name == pipeline.name):
# This pipeline is already awake because it is
# where this event originated.
continue
event = PipelineSemaphoreReleaseEvent()
self.pipeline_management_events[
tenant.name][pipeline_name].put(
@ -2676,6 +2670,8 @@ class Scheduler(threading.Thread):
self._doTenantReconfigureEvent(event)
elif isinstance(event, (PromoteEvent, ChangeManagementEvent)):
event_forwarded = self._forward_management_event(event)
elif isinstance(event, SemaphoreReleaseEvent):
self._doSemaphoreReleaseEvent(event, tenant)
else:
self.log.error("Unable to handle event %s for tenant %s",
event, tenant.name)
@ -2799,7 +2795,10 @@ class Scheduler(threading.Thread):
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event, pipeline)
elif isinstance(event, SemaphoreReleaseEvent):
self._doSemaphoreReleaseEvent(event, pipeline)
# MODEL_API <= 32
# Kept for backward compatibility; semaphore release events
# are now processed in the management event queue.
self._doSemaphoreReleaseEvent(event, pipeline.tenant)
else:
self.log.error("Unable to handle event %s", event)
@ -3209,8 +3208,11 @@ class Scheduler(threading.Thread):
# Release the semaphore in any case
pipeline = buildset.item.pipeline
tenant = pipeline.tenant
event_queue = self.pipeline_result_events[
tenant.name][pipeline.name]
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.management_events[tenant.name]
else:
event_queue = self.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release(event_queue, item, job)
# Image related methods

View File

@ -41,6 +41,9 @@ RESULT_EVENT_TYPE_MAP = {
"FilesChangesCompletedEvent": model.FilesChangesCompletedEvent,
"MergeCompletedEvent": model.MergeCompletedEvent,
"NodesProvisionedEvent": model.NodesProvisionedEvent,
# MODEL_API <= 32
# Kept for backward compatibility; semaphore release events
# are now processed in the management event queue.
"SemaphoreReleaseEvent": model.SemaphoreReleaseEvent,
}
@ -49,6 +52,7 @@ MANAGEMENT_EVENT_TYPE_MAP = {
"EnqueueEvent": model.EnqueueEvent,
"PromoteEvent": model.PromoteEvent,
"ReconfigureEvent": model.ReconfigureEvent,
"SemaphoreReleaseEvent": model.SemaphoreReleaseEvent,
"TenantReconfigureEvent": model.TenantReconfigureEvent,
"PipelinePostConfigEvent": model.PipelinePostConfigEvent,
"PipelineSemaphoreReleaseEvent": model.PipelineSemaphoreReleaseEvent,

View File

@ -22,6 +22,7 @@ from kazoo.exceptions import BadVersionError, NoNodeError
from zuul.lib.logutil import get_annotated_logger
from zuul.model import SemaphoreReleaseEvent
from zuul.zk import ZooKeeperSimpleBase
from zuul.zk.event_queues import PipelineResultEventQueue
def holdersFromData(data):
@ -183,12 +184,15 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
def releaseFromInfo(self, log, event_queue, infos, handle, quiet=False):
for info in infos:
self._release_one(log, info, handle, quiet)
if event_queue:
if event_queue is not None:
# If a scheduler has been provided (which it is except
# in the case of a rollback from acquire in this
# class), broadcast an event to trigger pipeline runs.
event = SemaphoreReleaseEvent(info['name'])
event_queue.put(event)
if isinstance(event_queue, PipelineResultEventQueue):
event_queue.put(event)
else:
event_queue.put(event, needs_result=False)
def _release_one(self, log, info, handle, quiet=False):
while True: