Make queue processing more efficient

Instead of processing all of the queues each time any event happens,
first process all of the events (which are really queued up just so
that they can modify the pipeline data safely within the main run loop)
and then run the queue processor (which will then launch or cancel jobs
based on the new state of the queues).

This adds a lock to the run processor which is only needed so that
the test suite can safely inspect the state of the scheduler to
determine whether it is idle.

The result queue is made somewhat more generic in preparation for
handling merge result events.

Change-Id: I6bbb52f77b070df04a110a9d61b426265b1e89cc
This commit is contained in:
James E. Blair 2014-02-06 07:09:22 -08:00
parent bc03496b99
commit a84f0e4179
2 changed files with 100 additions and 60 deletions

View File

@ -1083,13 +1083,16 @@ class TestScheduler(testtools.TestCase):
self.fake_gerrit.event_queue.join() self.fake_gerrit.event_queue.join()
self.sched.trigger_event_queue.join() self.sched.trigger_event_queue.join()
self.sched.result_event_queue.join() self.sched.result_event_queue.join()
self.sched.run_handler_lock.acquire()
if (self.sched.trigger_event_queue.empty() and if (self.sched.trigger_event_queue.empty() and
self.sched.result_event_queue.empty() and self.sched.result_event_queue.empty() and
self.fake_gerrit.event_queue.empty() and self.fake_gerrit.event_queue.empty() and
self.areAllBuildsWaiting()): self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
self.worker.lock.release() self.worker.lock.release()
self.log.debug("...settled.") self.log.debug("...settled.")
return return
self.sched.run_handler_lock.release()
self.worker.lock.release() self.worker.lock.release()
self.sched.wake_event.wait(0.1) self.sched.wake_event.wait(0.1)

View File

@ -109,6 +109,33 @@ class PromoteEvent(ManagementEvent):
self.change_ids = change_ids self.change_ids = change_ids
class ResultEvent(object):
"""An event that needs to modify the pipeline state due to a
result from an external system."""
pass
class BuildStartedEvent(ResultEvent):
"""A build has started.
:arg Build build: The build which has started.
"""
def __init__(self, build):
self.build = build
class BuildCompletedEvent(ResultEvent):
"""A build has completed
:arg Build build: The build which has completed.
"""
def __init__(self, build):
self.build = build
class Scheduler(threading.Thread): class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler") log = logging.getLogger("zuul.Scheduler")
@ -117,6 +144,7 @@ class Scheduler(threading.Thread):
self.daemon = True self.daemon = True
self.wake_event = threading.Event() self.wake_event = threading.Event()
self.layout_lock = threading.Lock() self.layout_lock = threading.Lock()
self.run_handler_lock = threading.Lock()
self._pause = False self._pause = False
self._exit = False self._exit = False
self._stopped = False self._stopped = False
@ -414,7 +442,8 @@ class Scheduler(threading.Thread):
def onBuildStarted(self, build): def onBuildStarted(self, build):
self.log.debug("Adding start event for build: %s" % build) self.log.debug("Adding start event for build: %s" % build)
build.start_time = time.time() build.start_time = time.time()
self.result_event_queue.put(('started', build)) event = BuildStartedEvent(build)
self.result_event_queue.put(event)
self.wake_event.set() self.wake_event.set()
self.log.debug("Done adding start event for build: %s" % build) self.log.debug("Done adding start event for build: %s" % build)
@ -434,7 +463,8 @@ class Scheduler(threading.Thread):
statsd.incr(key) statsd.incr(key)
except: except:
self.log.exception("Exception reporting runtime stats") self.log.exception("Exception reporting runtime stats")
self.result_event_queue.put(('completed', build)) event = BuildCompletedEvent(build)
self.result_event_queue.put(event)
self.wake_event.set() self.wake_event.set()
self.log.debug("Done adding complete event for build: %s" % build) self.log.debug("Done adding complete event for build: %s" % build)
@ -617,8 +647,6 @@ class Scheduler(threading.Thread):
item.change, item.change,
enqueue_time=item.enqueue_time, enqueue_time=item.enqueue_time,
quiet=True) quiet=True)
while pipeline.manager.processQueue():
pass
def _areAllBuildsComplete(self): def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete") self.log.debug("Checking if all builds are complete")
@ -646,35 +674,37 @@ class Scheduler(threading.Thread):
if self._stopped: if self._stopped:
return return
self.log.debug("Run handler awake") self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
try: try:
if not self.management_event_queue.empty(): while not self.management_event_queue.empty():
self.process_management_queue() self.process_management_queue()
# Give result events priority -- they let us stop builds, # Give result events priority -- they let us stop builds,
# whereas trigger evensts cause us to launch builds. # whereas trigger evensts cause us to launch builds.
if not self.result_event_queue.empty(): while not self.result_event_queue.empty():
self.process_result_queue() self.process_result_queue()
elif not self._pause:
if not self.trigger_event_queue.empty(): if not self._pause:
while not self.trigger_event_queue.empty():
self.process_event_queue() self.process_event_queue()
if self._pause and self._areAllBuildsComplete(): if self._pause and self._areAllBuildsComplete():
self._doPauseEvent() self._doPauseEvent()
if not self._pause: for pipeline in self.layout.pipelines.values():
if not (self.trigger_event_queue.empty() and while pipeline.manager.processQueue():
self.result_event_queue.empty()): pass
self.wake_event.set()
else:
if not self.result_event_queue.empty():
self.wake_event.set()
if self._maintain_trigger_cache: if self._maintain_trigger_cache:
self.maintainTriggerCache() self.maintainTriggerCache()
self._maintain_trigger_cache = False self._maintain_trigger_cache = False
except: except Exception:
self.log.exception("Exception in run handler:") self.log.exception("Exception in run handler:")
# There may still be more events to process
self.wake_event.set()
finally:
self.run_handler_lock.release()
def maintainTriggerCache(self): def maintainTriggerCache(self):
relevant = set() relevant = set()
@ -692,35 +722,34 @@ class Scheduler(threading.Thread):
self.log.debug("Fetching trigger event") self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get() event = self.trigger_event_queue.get()
self.log.debug("Processing trigger event %s" % event) self.log.debug("Processing trigger event %s" % event)
project = self.layout.projects.get(event.project_name) try:
if not project: project = self.layout.projects.get(event.project_name)
self.log.warning("Project %s not found" % event.project_name) if not project:
self.log.warning("Project %s not found" % event.project_name)
return
# Preprocessing for ref-update events
if event.ref:
# Make sure the local git repo is up-to-date with the
# remote one. We better have the new ref before
# enqueuing the changes. This is done before
# enqueuing the changes to avoid calling an update per
# pipeline accepting the change.
self.log.info("Fetching references for %s" % project)
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.updateRepo(project.name, url)
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
self.triggers.get(event.trigger_name))
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
if pipeline.manager.eventMatches(event, change):
self.log.info("Adding %s, %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change)
finally:
self.trigger_event_queue.task_done() self.trigger_event_queue.task_done()
return
# Preprocessing for ref-update events
if event.ref:
# Make sure the local git repo is up-to-date with the remote one.
# We better have the new ref before enqueuing the changes.
# This is done before enqueuing the changes to avoid calling an
# update per pipeline accepting the change.
self.log.info("Fetching references for %s" % project)
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.updateRepo(project.name, url)
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
self.triggers.get(event.trigger_name))
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
if pipeline.manager.eventMatches(event, change):
self.log.info("Adding %s, %s to %s" %
(project, change, pipeline))
pipeline.manager.addChange(change)
while pipeline.manager.processQueue():
pass
self.trigger_event_queue.task_done()
def process_management_queue(self): def process_management_queue(self):
self.log.debug("Fetching management event") self.log.debug("Fetching management event")
@ -740,19 +769,31 @@ class Scheduler(threading.Thread):
def process_result_queue(self): def process_result_queue(self):
self.log.debug("Fetching result event") self.log.debug("Fetching result event")
event_type, build = self.result_event_queue.get() event = self.result_event_queue.get()
self.log.debug("Processing result event %s" % build) self.log.debug("Processing result event %s" % event)
try:
if isinstance(event, BuildStartedEvent):
self._doBuildStartedEvent(event)
elif isinstance(event, BuildCompletedEvent):
self._doBuildCompletedEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
finally:
self.result_event_queue.task_done()
def _doBuildStartedEvent(self, event):
for pipeline in self.layout.pipelines.values(): for pipeline in self.layout.pipelines.values():
if event_type == 'started': if pipeline.manager.onBuildStarted(event.build):
if pipeline.manager.onBuildStarted(build): return
self.result_event_queue.task_done() self.log.warning("Build %s not found by any queue manager" %
return (event.build))
elif event_type == 'completed':
if pipeline.manager.onBuildCompleted(build): def _doBuildCompletedEvent(self, event):
self.result_event_queue.task_done() for pipeline in self.layout.pipelines.values():
return if pipeline.manager.onBuildCompleted(event.build):
self.log.warning("Build %s not found by any queue manager" % (build)) return
self.result_event_queue.task_done() self.log.warning("Build %s not found by any queue manager" %
(event.build))
def formatStatusHTML(self): def formatStatusHTML(self):
ret = '<html><pre>' ret = '<html><pre>'
@ -1232,8 +1273,6 @@ class BasePipelineManager(object):
self.log.debug("Build %s started" % build) self.log.debug("Build %s started" % build)
self.updateBuildDescriptions(build.build_set) self.updateBuildDescriptions(build.build_set)
while self.processQueue():
pass
return True return True
def onBuildCompleted(self, build): def onBuildCompleted(self, build):
@ -1253,8 +1292,6 @@ class BasePipelineManager(object):
self.log.debug("Change %s status is now:\n %s" % self.log.debug("Change %s status is now:\n %s" %
(change, self.pipeline.formatStatus(change))) (change, self.pipeline.formatStatus(change)))
self.updateBuildDescriptions(build.build_set) self.updateBuildDescriptions(build.build_set)
while self.processQueue():
pass
return True return True
def reportItem(self, item): def reportItem(self, item):