From d27a96dd092a409b9b726778d598749a0543bab9 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 10 Jul 2014 13:25:13 -0700 Subject: [PATCH] Cause the enqueue command to ignore pipeline reqs The enqueue command typically is used after a Zuul restart. It may also be useful when the normal method of triggering a change is not possible, or perhaps too time consuming. In all of those cases, it may be desirable to bypass pipeline requirements (and in the first case, it may be necessary since the act of enqueing a change in a pipeline may cause it, as a side effect, to no longer satisfy the requirements for that pipeline). To resolve, this, ignore the pipeline requirements when a change is enqueued into a pipeline as the result of an administrative enqueue command. Change-Id: Ic672452cd99355e3ee91df3c649f944ef75958a9 --- zuul/rpclistener.py | 2 +- zuul/scheduler.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 8719c76fb7..fcf11615f8 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -116,7 +116,7 @@ class RPCListener(object): if errors: job.sendWorkException(errors.encode('utf8')) else: - self.sched.addEvent(event) + self.sched.enqueue(event) job.sendWorkComplete() def handle_promote(self, job): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 0c653e7aec..15520ac792 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -109,6 +109,18 @@ class PromoteEvent(ManagementEvent): self.change_ids = change_ids +class EnqueueEvent(ManagementEvent): + """Enqueue a change into a pipeline + + :arg TriggerEvent trigger_event: a TriggerEvent describing the + trigger, pipeline, and change to enqueue + """ + + def __init__(self, trigger_event): + super(EnqueueEvent, self).__init__() + self.trigger_event = trigger_event + + class ResultEvent(object): """An event that needs to modify the pipeline state due to a result from an external system.""" @@ -522,6 +534,14 @@ class Scheduler(threading.Thread): event.wait() self.log.debug("Promotion complete") + def enqueue(self, trigger_event): + event = EnqueueEvent(trigger_event) + self.management_event_queue.put(event) + self.wake_event.set() + self.log.debug("Waiting for enqueue") + event.wait() + self.log.debug("Enqueue complete") + def exit(self): self.log.debug("Prepare to exit") self._pause = True @@ -691,6 +711,17 @@ class Scheduler(threading.Thread): quiet=True, ignore_requirements=True) + def _doEnqueueEvent(self, event): + project = self.layout.projects.get(event.project_name) + pipeline = self.layout.pipelines[event.forced_pipeline] + trigger = self.triggers.get(event.trigger_name) + change = event.getChange(project, trigger) + self.log.debug("Event %s for change %s was directly assigned " + "to pipeline %s" % (event, change, self)) + self.log.info("Adding %s, %s to %s" % + (project, change, pipeline)) + pipeline.manager.addChange(change, ignore_requirements=True) + def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") waiting = False @@ -800,6 +831,8 @@ class Scheduler(threading.Thread): self._doReconfigureEvent(event) elif isinstance(event, PromoteEvent): self._doPromoteEvent(event) + elif isinstance(event, EnqueueEvent): + self._doEnqueueEvent(event.trigger_event) else: self.log.error("Unable to handle event %s" % event) event.done()