Add graceful exit.

A SIGUSR1 will cause zuul to queue new events, wait for existing
jobs to finish, then save the queue and exit.

It will likely take quite a while to complete (perhaps an hour),
so it's not implemented as a SIGTERM handler.

Can be used in an init script to implement a graceful restart.

Change-Id: I09fce571e971f16b5d20c5d69d595a05c7f6a4ba
This commit is contained in:
James E. Blair 2012-07-06 10:24:01 -07:00
parent 11700c3787
commit 5d5bc2b92e
3 changed files with 94 additions and 12 deletions

View File

@ -12,3 +12,4 @@ sshkey=/home/jenkins/.ssh/id_rsa
layout_config=/etc/zuul/layout.yaml
log_config=/etc/zuul/logging.yaml
pidfile=/var/run/zuul/zuul.pid
state_dir=/var/lib/zuul

View File

@ -69,6 +69,10 @@ class Server(object):
self.sched.reconfigure(self.config)
signal.signal(signal.SIGHUP, self.reconfigure_handler)
def exit_handler(self, signum, frame):
    """Signal handler that asks the scheduler to shut down.

    ``signum`` and ``frame`` are the standard signal-handler arguments;
    neither is used.
    """
    # Ignore any further SIGUSR1 before handing off to the scheduler.
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGUSR1, ignore)
    scheduler = self.sched
    scheduler.exit()
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
@ -85,7 +89,9 @@ class Server(object):
self.sched.start()
self.sched.reconfigure(self.config)
self.sched.resume()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
while True:
signal.pause()
@ -95,6 +101,21 @@ if __name__ == '__main__':
server.parse_arguments()
server.read_config()
if server.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
test_fn = os.path.join(state_dir, 'test')
try:
f = open(test_fn, 'w')
f.close()
os.unlink(test_fn)
except:
print
print "Unable to write to state directory: %s" % state_dir
print
raise
if server.config.has_option('zuul', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('zuul', 'pidfile'))
else:

View File

@ -18,6 +18,7 @@ import threading
import logging
import re
import yaml
import pickle
from model import Job, Change, Project, ChangeQueue, EventFilter
@ -29,6 +30,9 @@ class Scheduler(threading.Thread):
threading.Thread.__init__(self)
self.wake_event = threading.Event()
self.reconfigure_complete_event = threading.Event()
self._pause = False
self._reconfigure = False
self._exit = False
self.launcher = None
self.trigger = None
@ -160,21 +164,77 @@ class Scheduler(threading.Thread):
self.wake_event.set()
# Request a reconfiguration using `config` and block until it has completed.
#
# Stores `config` on the scheduler, raises the pause/reconfigure flags, wakes
# the run loop via wake_event, then waits on reconfigure_complete_event and
# clears it so the next call can wait again.
#
# NOTE(review): this listing sets both `_reconfigure_flag` and the
# `_pause`/`_reconfigure` pair, and logs two "reconfigure" debug lines. It
# looks like pre- and post-change lines of a diff shown together -- confirm
# which of them belong in the final method.
def reconfigure(self, config):
self.log.debug("Reconfigure")
self.log.debug("Prepare to reconfigure")
self.config = config
self._reconfigure_flag = True
self._pause = True
self._reconfigure = True
self.wake_event.set()
self.log.debug("Waiting for reconfiguration")
self.reconfigure_complete_event.wait()
self.reconfigure_complete_event.clear()
self.log.debug("Reconfiguration complete")
# Carry out a pending reconfiguration: call self._init(), re-parse the layout
# file named by the 'layout_config' option of the [zuul] section, clear
# _reconfigure_flag and set reconfigure_complete_event.
#
# NOTE(review): _doPauseEvent in this same listing performs the same
# reconfiguration steps using `_pause`/`_reconfigure`; presumably this method
# is the pre-change version that it replaces -- confirm before relying on it.
def _doReconfigure(self):
self.log.debug("Performing reconfiguration")
self._init()
self._parseConfig(self.config.get('zuul', 'layout_config'))
self._reconfigure_flag = False
self.reconfigure_complete_event.set()
def exit(self):
    """Request a graceful shutdown of the scheduler.

    Raises the pause and exit flags and wakes the run loop, then returns
    immediately; the shutdown itself is not performed here.
    """
    log = self.log
    log.debug("Prepare to exit")
    # Chained targets bind left to right, so _pause is set before _exit.
    self._pause = self._exit = True
    self.wake_event.set()
    log.debug("Waiting for exit")
def _get_queue_pickle_file(self):
    """Return the path of the file used to persist the event queue.

    The file is ``queue.pickle`` inside the ``state_dir`` option of the
    ``[zuul]`` config section, with ``~`` expanded.
    """
    # Fall back to the same default the server's startup check applies when
    # state_dir is not configured; reading the option unconditionally would
    # raise NoOptionError and make saving/loading the queue impossible.
    if self.config.has_option('zuul', 'state_dir'):
        state_dir = self.config.get('zuul', 'state_dir')
    else:
        state_dir = '/var/lib/zuul'
    return os.path.join(os.path.expanduser(state_dir), 'queue.pickle')
def _save_queue(self):
    """Drain the trigger event queue and pickle its events to disk.

    Every event currently in ``trigger_event_queue`` is removed, in queue
    order.  If any were found they are written as a list to the queue
    pickle file; when the queue is empty no file is written.
    """
    pickle_file = self._get_queue_pickle_file()
    events = []
    while not self.trigger_event_queue.empty():
        events.append(self.trigger_event_queue.get())
    self.log.debug("Queue length is %s" % len(events))
    if events:
        self.log.debug("Saving queue")
        # Close the file explicitly instead of leaving it to garbage
        # collection: the exit path calls os._exit() right after this,
        # which skips normal interpreter cleanup.
        with open(pickle_file, 'wb') as f:
            pickle.dump(events, f)
def _load_queue(self):
    """Re-queue events from the saved queue pickle file, if one exists.

    Each saved event is put back on ``trigger_event_queue`` in the order
    it was stored.  When no file exists this only logs that fact.
    """
    pickle_file = self._get_queue_pickle_file()
    if not os.path.exists(pickle_file):
        self.log.debug("No queue file found")
        return
    self.log.debug("Loading queue")
    # NOTE(review): unpickling can execute arbitrary code, so the state
    # directory must only be writable by trusted users.
    with open(pickle_file, 'rb') as f:
        events = pickle.load(f)
    self.log.debug("Queue length is %s" % len(events))
    for event in events:
        self.trigger_event_queue.put(event)
def _delete_queue(self):
    """Remove the saved queue pickle file if it is present."""
    saved_queue = self._get_queue_pickle_file()
    if not os.path.exists(saved_queue):
        # Nothing was saved, so there is nothing to remove.
        return
    self.log.debug("Deleting saved queue")
    os.unlink(saved_queue)
def resume(self):
    """Restore any saved queue, remove the saved file, and wake the run loop.

    Loading and deleting are each best-effort: a failure in either is
    logged with its traceback and processing continues.  Deletion is
    attempted whether or not loading succeeded.
    """
    # Catch Exception rather than using a bare except so that
    # KeyboardInterrupt and SystemExit are not swallowed here.
    try:
        self._load_queue()
    except Exception:
        self.log.exception("Unable to load queue")
    try:
        self._delete_queue()
    except Exception:
        self.log.exception("Unable to delete saved queue")
    self.log.debug("Resuming queue processing")
    self.wake_event.set()
def _doPauseEvent(self):
    """Perform whichever action the scheduler was paused for.

    If an exit was requested, the trigger event queue is saved and the
    process is terminated with status 0; that call does not return.
    If a reconfiguration was requested, the scheduler is re-initialised
    via ``_init()``, the layout file named by the ``layout_config`` option
    is re-parsed, the pause is lifted and ``reconfigure_complete_event``
    is set.  With neither flag raised this does nothing.
    """
    if self._exit:
        self.log.debug("Exiting")
        self._save_queue()
        # os._exit ends the process immediately, bypassing normal
        # interpreter shutdown.
        os._exit(0)
    if self._reconfigure:
        self.log.debug("Performing reconfiguration")
        self._init()
        self._parseConfig(self.config.get('zuul', 'layout_config'))
        self._pause = False
        # Clear the request flag as well, so a completed reconfiguration
        # is not mistaken for a pending one the next time we are paused.
        self._reconfigure = False
        self.reconfigure_complete_event.set()
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@ -196,17 +256,17 @@ class Scheduler(threading.Thread):
self.wake_event.clear()
self.log.debug("Run handler awake")
try:
if not self._reconfigure_flag:
if not self._pause:
if not self.trigger_event_queue.empty():
self.process_event_queue()
if not self.result_event_queue.empty():
self.process_result_queue()
if self._reconfigure_flag and self._areAllBuildsComplete():
self._doReconfigure()
if self._pause and self._areAllBuildsComplete():
self._doPauseEvent()
if not self._reconfigure_flag:
if not self._pause:
if not (self.trigger_event_queue.empty() and
self.result_event_queue.empty()):
self.wake_event.set()