Merge "Refactor reconfiguration into a management event"

This commit is contained in:
Jenkins 2013-12-11 16:40:07 +00:00 committed by Gerrit Code Review
commit d8481d062b
1 changed files with 43 additions and 13 deletions

View File

@ -60,6 +60,30 @@ class MergeFailure(Exception):
pass pass
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
def setComplete(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
return self._wait_event.is_set()
class ReconfigureEvent(ManagementEvent):
"""Reconfigure the scheduler. The layout will be (re-)loaded from
the path specified in the configuration.
:arg ConfigParser config: the new configuration
"""
def __init__(self, config):
super(ReconfigureEvent, self).__init__()
self.config = config
class Scheduler(threading.Thread): class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler") log = logging.getLogger("zuul.Scheduler")
@ -68,9 +92,7 @@ class Scheduler(threading.Thread):
self.daemon = True self.daemon = True
self.wake_event = threading.Event() self.wake_event = threading.Event()
self.layout_lock = threading.Lock() self.layout_lock = threading.Lock()
self.reconfigure_complete_event = threading.Event()
self._pause = False self._pause = False
self._reconfigure = False
self._exit = False self._exit = False
self._stopped = False self._stopped = False
self.launcher = None self.launcher = None
@ -81,6 +103,7 @@ class Scheduler(threading.Thread):
self.trigger_event_queue = Queue.Queue() self.trigger_event_queue = Queue.Queue()
self.result_event_queue = Queue.Queue() self.result_event_queue = Queue.Queue()
self.management_event_queue = Queue.Queue()
self.layout = model.Layout() self.layout = model.Layout()
def stop(self): def stop(self):
@ -366,12 +389,11 @@ class Scheduler(threading.Thread):
def reconfigure(self, config): def reconfigure(self, config):
self.log.debug("Prepare to reconfigure") self.log.debug("Prepare to reconfigure")
self.config = config event = ReconfigureEvent(config)
self._reconfigure = True self.management_event_queue.put(event)
self.wake_event.set() self.wake_event.set()
self.log.debug("Waiting for reconfiguration") self.log.debug("Waiting for reconfiguration")
self.reconfigure_complete_event.wait() event.wait()
self.reconfigure_complete_event.clear()
self.log.debug("Reconfiguration complete") self.log.debug("Reconfiguration complete")
def exit(self): def exit(self):
@ -434,10 +456,11 @@ class Scheduler(threading.Thread):
self._save_queue() self._save_queue()
os._exit(0) os._exit(0)
def _doReconfigureEvent(self): def _doReconfigureEvent(self, event):
# This is called in the scheduler loop after another thread sets # This is called in the scheduler loop after another thread submits
# the reconfigure flag # a request
self.layout_lock.acquire() self.layout_lock.acquire()
self.config = event.config
try: try:
self.log.debug("Performing reconfiguration") self.log.debug("Performing reconfiguration")
layout = self._parseConfig( layout = self._parseConfig(
@ -494,8 +517,6 @@ class Scheduler(threading.Thread):
except Exception: except Exception:
self.log.exception("Exception reporting initial " self.log.exception("Exception reporting initial "
"pipeline stats:") "pipeline stats:")
self._reconfigure = False
self.reconfigure_complete_event.set()
finally: finally:
self.layout_lock.release() self.layout_lock.release()
@ -526,8 +547,8 @@ class Scheduler(threading.Thread):
return return
self.log.debug("Run handler awake") self.log.debug("Run handler awake")
try: try:
if self._reconfigure: if not self.management_event_queue.empty():
self._doReconfigureEvent() self.process_management_queue()
# Give result events priority -- they let us stop builds, # Give result events priority -- they let us stop builds,
# whereas trigger evensts cause us to launch builds. # whereas trigger evensts cause us to launch builds.
@ -600,6 +621,15 @@ class Scheduler(threading.Thread):
self.trigger_event_queue.task_done() self.trigger_event_queue.task_done()
def process_management_queue(self):
self.log.debug("Fetching management event")
event = self.management_event_queue.get()
self.log.debug("Processing management event %s" % event)
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
event.setComplete()
self.management_event_queue.task_done()
def process_result_queue(self): def process_result_queue(self):
self.log.debug("Fetching result event") self.log.debug("Fetching result event")
event_type, build = self.result_event_queue.get() event_type, build = self.result_event_queue.get()