diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 8719c76fb7..fcf11615f8 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -116,7 +116,7 @@ class RPCListener(object): if errors: job.sendWorkException(errors.encode('utf8')) else: - self.sched.addEvent(event) + self.sched.enqueue(event) job.sendWorkComplete() def handle_promote(self, job): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 0c653e7aec..15520ac792 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -109,6 +109,18 @@ class PromoteEvent(ManagementEvent): self.change_ids = change_ids +class EnqueueEvent(ManagementEvent): + """Enqueue a change into a pipeline + + :arg TriggerEvent trigger_event: a TriggerEvent describing the + trigger, pipeline, and change to enqueue + """ + + def __init__(self, trigger_event): + super(EnqueueEvent, self).__init__() + self.trigger_event = trigger_event + + class ResultEvent(object): """An event that needs to modify the pipeline state due to a result from an external system.""" @@ -522,6 +534,14 @@ class Scheduler(threading.Thread): event.wait() self.log.debug("Promotion complete") + def enqueue(self, trigger_event): + event = EnqueueEvent(trigger_event) + self.management_event_queue.put(event) + self.wake_event.set() + self.log.debug("Waiting for enqueue") + event.wait() + self.log.debug("Enqueue complete") + def exit(self): self.log.debug("Prepare to exit") self._pause = True @@ -691,6 +711,17 @@ class Scheduler(threading.Thread): quiet=True, ignore_requirements=True) + def _doEnqueueEvent(self, event): + project = self.layout.projects.get(event.project_name) + pipeline = self.layout.pipelines[event.forced_pipeline] + trigger = self.triggers.get(event.trigger_name) + change = event.getChange(project, trigger) + self.log.debug("Event %s for change %s was directly assigned " + "to pipeline %s" % (event, change, self)) + self.log.info("Adding %s, %s to %s" % + (project, change, pipeline)) + pipeline.manager.addChange(change, ignore_requirements=True) + def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") waiting = False @@ -800,6 +831,8 @@ class Scheduler(threading.Thread): self._doReconfigureEvent(event) elif isinstance(event, PromoteEvent): self._doPromoteEvent(event) + elif isinstance(event, EnqueueEvent): + self._doEnqueueEvent(event.trigger_event) else: self.log.error("Unable to handle event %s" % event) event.done()