From a84f0e41792159b37c4114959199c525c28b5001 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 6 Feb 2014 07:09:22 -0800 Subject: [PATCH] Make queue processing more efficient Instead of processing all of the queues each time any event happens, first process all of the events (which are really queued up just so that they can modify the pipeline data safely within the main run loop) and then run the queue processor (which will then launch or cancel jobs based on the new state of the queues). This adds a lock to the run processor which is only needed so that the test suite can safely inspect the state of the scheduler to determine whether it is idle. The result queue is made somewhat more generic in preparation for handling merge result events. Change-Id: I6bbb52f77b070df04a110a9d61b426265b1e89cc --- tests/test_scheduler.py | 3 + zuul/scheduler.py | 157 +++++++++++++++++++++++++--------------- 2 files changed, 100 insertions(+), 60 deletions(-) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 3d3602ee5b..67c4709c67 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1083,13 +1083,16 @@ class TestScheduler(testtools.TestCase): self.fake_gerrit.event_queue.join() self.sched.trigger_event_queue.join() self.sched.result_event_queue.join() + self.sched.run_handler_lock.acquire() if (self.sched.trigger_event_queue.empty() and self.sched.result_event_queue.empty() and self.fake_gerrit.event_queue.empty() and self.areAllBuildsWaiting()): + self.sched.run_handler_lock.release() self.worker.lock.release() self.log.debug("...settled.") return + self.sched.run_handler_lock.release() self.worker.lock.release() self.sched.wake_event.wait(0.1) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index cd954f179a..805d334afd 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -109,6 +109,33 @@ class PromoteEvent(ManagementEvent): self.change_ids = change_ids +class ResultEvent(object): + """An event that needs to modify the pipeline state due to a + result from an external system.""" + + pass + + +class BuildStartedEvent(ResultEvent): + """A build has started. + + :arg Build build: The build which has started. + """ + + def __init__(self, build): + self.build = build + + +class BuildCompletedEvent(ResultEvent): + """A build has completed + + :arg Build build: The build which has completed. + """ + + def __init__(self, build): + self.build = build + + class Scheduler(threading.Thread): log = logging.getLogger("zuul.Scheduler") @@ -117,6 +144,7 @@ class Scheduler(threading.Thread): self.daemon = True self.wake_event = threading.Event() self.layout_lock = threading.Lock() + self.run_handler_lock = threading.Lock() self._pause = False self._exit = False self._stopped = False @@ -414,7 +442,8 @@ class Scheduler(threading.Thread): def onBuildStarted(self, build): self.log.debug("Adding start event for build: %s" % build) build.start_time = time.time() - self.result_event_queue.put(('started', build)) + event = BuildStartedEvent(build) + self.result_event_queue.put(event) self.wake_event.set() self.log.debug("Done adding start event for build: %s" % build) @@ -434,7 +463,8 @@ class Scheduler(threading.Thread): statsd.incr(key) except: self.log.exception("Exception reporting runtime stats") - self.result_event_queue.put(('completed', build)) + event = BuildCompletedEvent(build) + self.result_event_queue.put(event) self.wake_event.set() self.log.debug("Done adding complete event for build: %s" % build) @@ -617,8 +647,6 @@ class Scheduler(threading.Thread): item.change, enqueue_time=item.enqueue_time, quiet=True) - while pipeline.manager.processQueue(): - pass def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") @@ -646,35 +674,37 @@ class Scheduler(threading.Thread): if self._stopped: return self.log.debug("Run handler awake") + self.run_handler_lock.acquire() try: - if not self.management_event_queue.empty(): + while not self.management_event_queue.empty(): self.process_management_queue() # Give result events priority -- they let us stop builds, # whereas trigger evensts cause us to launch builds. - if not self.result_event_queue.empty(): + while not self.result_event_queue.empty(): self.process_result_queue() - elif not self._pause: - if not self.trigger_event_queue.empty(): + + if not self._pause: + while not self.trigger_event_queue.empty(): self.process_event_queue() if self._pause and self._areAllBuildsComplete(): self._doPauseEvent() - if not self._pause: - if not (self.trigger_event_queue.empty() and - self.result_event_queue.empty()): - self.wake_event.set() - else: - if not self.result_event_queue.empty(): - self.wake_event.set() + for pipeline in self.layout.pipelines.values(): + while pipeline.manager.processQueue(): + pass if self._maintain_trigger_cache: self.maintainTriggerCache() self._maintain_trigger_cache = False - except: + except Exception: self.log.exception("Exception in run handler:") + # There may still be more events to process + self.wake_event.set() + finally: + self.run_handler_lock.release() def maintainTriggerCache(self): relevant = set() @@ -692,35 +722,34 @@ class Scheduler(threading.Thread): self.log.debug("Fetching trigger event") event = self.trigger_event_queue.get() self.log.debug("Processing trigger event %s" % event) - project = self.layout.projects.get(event.project_name) - if not project: - self.log.warning("Project %s not found" % event.project_name) + try: + project = self.layout.projects.get(event.project_name) + if not project: + self.log.warning("Project %s not found" % event.project_name) + return + + # Preprocessing for ref-update events + if event.ref: + # Make sure the local git repo is up-to-date with the + # remote one. We better have the new ref before + # enqueuing the changes. This is done before + # enqueuing the changes to avoid calling an update per + # pipeline accepting the change. + self.log.info("Fetching references for %s" % project) + url = self.triggers['gerrit'].getGitUrl(project) + self.merger.updateRepo(project.name, url) + + for pipeline in self.layout.pipelines.values(): + change = event.getChange(project, + self.triggers.get(event.trigger_name)) + if event.type == 'patchset-created': + pipeline.manager.removeOldVersionsOfChange(change) + if pipeline.manager.eventMatches(event, change): + self.log.info("Adding %s, %s to %s" % + (project, change, pipeline)) + pipeline.manager.addChange(change) + finally: self.trigger_event_queue.task_done() - return - - # Preprocessing for ref-update events - if event.ref: - # Make sure the local git repo is up-to-date with the remote one. - # We better have the new ref before enqueuing the changes. - # This is done before enqueuing the changes to avoid calling an - # update per pipeline accepting the change. - self.log.info("Fetching references for %s" % project) - url = self.triggers['gerrit'].getGitUrl(project) - self.merger.updateRepo(project.name, url) - - for pipeline in self.layout.pipelines.values(): - change = event.getChange(project, - self.triggers.get(event.trigger_name)) - if event.type == 'patchset-created': - pipeline.manager.removeOldVersionsOfChange(change) - if pipeline.manager.eventMatches(event, change): - self.log.info("Adding %s, %s to %s" % - (project, change, pipeline)) - pipeline.manager.addChange(change) - while pipeline.manager.processQueue(): - pass - - self.trigger_event_queue.task_done() def process_management_queue(self): self.log.debug("Fetching management event") @@ -740,19 +769,31 @@ class Scheduler(threading.Thread): def process_result_queue(self): self.log.debug("Fetching result event") - event_type, build = self.result_event_queue.get() - self.log.debug("Processing result event %s" % build) + event = self.result_event_queue.get() + self.log.debug("Processing result event %s" % event) + try: + if isinstance(event, BuildStartedEvent): + self._doBuildStartedEvent(event) + elif isinstance(event, BuildCompletedEvent): + self._doBuildCompletedEvent(event) + else: + self.log.error("Unable to handle event %s" % event) + finally: + self.result_event_queue.task_done() + + def _doBuildStartedEvent(self, event): for pipeline in self.layout.pipelines.values(): - if event_type == 'started': - if pipeline.manager.onBuildStarted(build): - self.result_event_queue.task_done() - return - elif event_type == 'completed': - if pipeline.manager.onBuildCompleted(build): - self.result_event_queue.task_done() - return - self.log.warning("Build %s not found by any queue manager" % (build)) - self.result_event_queue.task_done() + if pipeline.manager.onBuildStarted(event.build): + return + self.log.warning("Build %s not found by any queue manager" % + (event.build)) + + def _doBuildCompletedEvent(self, event): + for pipeline in self.layout.pipelines.values(): + if pipeline.manager.onBuildCompleted(event.build): + return + self.log.warning("Build %s not found by any queue manager" % + (event.build)) def formatStatusHTML(self): ret = '
'
@@ -1232,8 +1273,6 @@ class BasePipelineManager(object):
 
         self.log.debug("Build %s started" % build)
         self.updateBuildDescriptions(build.build_set)
-        while self.processQueue():
-            pass
         return True
 
     def onBuildCompleted(self, build):
@@ -1253,8 +1292,6 @@ class BasePipelineManager(object):
         self.log.debug("Change %s status is now:\n %s" %
                        (change, self.pipeline.formatStatus(change)))
         self.updateBuildDescriptions(build.build_set)
-        while self.processQueue():
-            pass
         return True
 
     def reportItem(self, item):