Merge "Add playbook semaphores"
This commit is contained in:
@@ -59,6 +59,7 @@ from zuul.model import (
|
||||
Change,
|
||||
ChangeManagementEvent,
|
||||
PipelinePostConfigEvent,
|
||||
SemaphoreReleaseEvent,
|
||||
PipelineSemaphoreReleaseEvent,
|
||||
DequeueEvent,
|
||||
EnqueueEvent,
|
||||
@@ -1944,6 +1945,27 @@ class Scheduler(threading.Thread):
|
||||
pipeline.manager.removeItem(item)
|
||||
return
|
||||
|
||||
def _doSemaphoreReleaseEvent(self, event, pipeline):
|
||||
tenant = pipeline.tenant
|
||||
semaphore = tenant.layout.getSemaphore(
|
||||
self.abide, event.semaphore_name)
|
||||
if semaphore.global_scope:
|
||||
tenants = [t for t in self.abide.tenants.values()
|
||||
if event.semaphore_name in t.global_semaphores]
|
||||
else:
|
||||
tenants = [tenant]
|
||||
for tenant in tenants:
|
||||
for pipeline_name in tenant.layout.pipelines.keys():
|
||||
if (tenant.name == pipeline.tenant.name and
|
||||
pipeline_name == pipeline.name):
|
||||
# This pipeline is already awake because it is
|
||||
# where this event originated.
|
||||
continue
|
||||
event = PipelineSemaphoreReleaseEvent()
|
||||
self.pipeline_management_events[
|
||||
tenant.name][pipeline_name].put(
|
||||
event, needs_result=False)
|
||||
|
||||
def _areAllBuildsComplete(self):
|
||||
self.log.debug("Checking if all builds are complete")
|
||||
waiting = False
|
||||
@@ -2570,6 +2592,8 @@ class Scheduler(threading.Thread):
|
||||
self._doFilesChangesCompletedEvent(event, pipeline)
|
||||
elif isinstance(event, NodesProvisionedEvent):
|
||||
self._doNodesProvisionedEvent(event, pipeline)
|
||||
elif isinstance(event, SemaphoreReleaseEvent):
|
||||
self._doSemaphoreReleaseEvent(event, pipeline)
|
||||
else:
|
||||
self.log.error("Unable to handle event %s", event)
|
||||
|
||||
@@ -2944,8 +2968,11 @@ class Scheduler(threading.Thread):
|
||||
buildset.addBuild(fakebuild)
|
||||
finally:
|
||||
# Release the semaphore in any case
|
||||
tenant = buildset.item.pipeline.tenant
|
||||
tenant.semaphore_handler.release(self, item, job)
|
||||
pipeline = buildset.item.pipeline
|
||||
tenant = pipeline.tenant
|
||||
event_queue = self.pipeline_result_events[
|
||||
tenant.name][pipeline.name]
|
||||
tenant.semaphore_handler.release(event_queue, item, job)
|
||||
|
||||
def createZKContext(self, lock, log):
|
||||
return ZKContext(self.zk_client, lock, self.stop_event, log)
|
||||
|
||||
Reference in New Issue
Block a user