Ansible launcher: use a socket for reconfiguration

Signals and multiprocessing don't mix well.  Instead of using
signals for things like stop and reconfiguration, use a socket
that accepts simple commands, and use the zuul-launcher
command to send them.

This implements reconfiguration and stopping.  Other commands
(eg, graceful stop, pause, etc) can be implemented later.

Change-Id: I14b1fdc5e3a20f4b1161dbc14705ad424ad13fbd
This commit is contained in:
James E. Blair 2016-05-27 16:45:26 -07:00
parent f5922b67cd
commit c4b2041cff
3 changed files with 170 additions and 28 deletions

View File

@ -24,6 +24,7 @@ pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
import logging
import os
import socket
import sys
import signal
@ -49,25 +50,35 @@ class Launcher(zuul.cmd.ZuulApp):
parser.add_argument('--keep-jobdir', dest='keep_jobdir',
action='store_true',
help='keep local jobdirs after run completes')
parser.add_argument('command', choices=['reconfigure', 'stop'],
nargs='?')
self.args = parser.parse_args()
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.log.debug("Reconfiguration triggered")
self.read_config()
self.setup_logging('launcher', 'log_config')
try:
self.launcher.reconfigure(self.config)
except Exception:
self.log.exception("Reconfiguration failed:")
signal.signal(signal.SIGHUP, self.reconfigure_handler)
def send_command(self, cmd):
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(path)
s.sendall('%s\n' % cmd)
def exit_handler(self, signum, frame):
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
def send_reconfigure(self):
self.send_command('reconfigure')
sys.exit(0)
def send_stop(self):
self.send_command('stop')
sys.exit(0)
def exit_handler(self):
self.launcher.stop()
self.launcher.join()
def main(self):
def main(self, daemon=True):
# See comment at top of file about zuul imports
import zuul.launcher.ansiblelaunchserver
@ -80,23 +91,31 @@ class Launcher(zuul.cmd.ZuulApp):
keep_jobdir=self.args.keep_jobdir)
self.launcher.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
while True:
try:
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking launcher to exit nicely...\n"
self.exit_handler(signal.SIGINT, None)
sys.exit(0)
if daemon:
self.launcher.join()
else:
while True:
try:
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking launcher to exit nicely...\n"
self.exit_handler()
sys.exit(0)
def main():
server = Launcher()
server.parse_arguments()
server.read_config()
if server.args.command == 'reconfigure':
server.send_reconfigure()
sys.exit(0)
elif server.args.command == 'stop':
server.send_stop()
sys.exit(0)
server.configure_connections()
if server.config.has_option('launcher', 'pidfile'):
@ -106,10 +125,10 @@ def main():
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
server.main()
server.main(False)
else:
with daemon.DaemonContext(pidfile=pid):
server.main()
server.main(True)
if __name__ == "__main__":

View File

@ -35,6 +35,7 @@ import zmq
import zuul.ansible.library
import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
def boolify(x):
@ -78,6 +79,9 @@ class LaunchServer(object):
self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname()
self.node_workers = {}
# This has the side effect of creating the logger; our logging
# config will handle the rest.
multiprocessing.get_logger()
self.mpmanager = multiprocessing.Manager()
self.jobs = self.mpmanager.dict()
self.builds = self.mpmanager.dict()
@ -90,6 +94,14 @@ class LaunchServer(object):
else:
self.accept_nodes = True
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
self.command_socket = commandsocket.CommandSocket(path)
for section in config.sections():
m = self.site_section_re.match(section)
if m:
@ -128,6 +140,7 @@ class LaunchServer(object):
self._gearman_running = True
self._zmq_running = True
self._reaper_running = True
self._command_running = True
# Setup ZMQ
self.zcontext = zmq.Context()
@ -147,6 +160,13 @@ class LaunchServer(object):
self.log.debug("Registering")
self.register()
# Start command socket
self.log.debug("Starting command processor")
self.command_socket.start()
self.command_thread = threading.Thread(target=self.runCommand)
self.command_thread.daemon = True
self.command_thread.start()
# Load JJB config
self.loadJobs()
@ -197,9 +217,8 @@ class LaunchServer(object):
self.worker.registerFunction("node-assign:zuul")
self.worker.registerFunction("stop:%s" % self.hostname)
def reconfigure(self, config):
def reconfigure(self):
self.log.debug("Reconfiguring")
self.config = config
self.loadJobs()
for node in self.node_workers.values():
try:
@ -212,22 +231,43 @@ class LaunchServer(object):
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
self._gearman_running = False
self._reaper_running = False
self.worker.shutdown()
# Then stop all of the workers
for node in self.node_workers.values():
try:
if node.isAlive():
node.stop()
except Exception:
self.log.exception("Exception sending stop command to worker:")
# Stop ZMQ afterwords so that the send queue is flushed
self._zmq_running = False
self.zmq_send_queue.put(None)
self.zmq_send_queue.join()
# Stop command processing
self._command_running = False
self.command_socket.stop()
# Join the gearman thread which was stopped earlier.
self.gearman_thread.join()
# The command thread is joined in the join() method of this
# class, which is called by the command shell.
self.log.debug("Stopped")
def join(self):
self.gearman_thread.join()
self.command_thread.join()
def runCommand(self):
while self._command_running:
try:
command = self.command_socket.get()
if command == 'reconfigure':
self.reconfigure()
if command == 'stop':
self.stop()
except Exception:
self.log.exception("Exception while processing command")
def runZMQ(self):
while self._zmq_running or not self.zmq_send_queue.empty():

83
zuul/lib/commandsocket.py Normal file
View File

@ -0,0 +1,83 @@
# Copyright 2014 OpenStack Foundation
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import socket
import threading
import Queue
class CommandSocket(object):
log = logging.getLogger("zuul.CommandSocket")
def __init__(self, path):
self.running = False
self.path = path
self.queue = Queue.Queue()
def start(self):
self.running = True
if os.path.exists(self.path):
os.unlink(self.path)
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(self.path)
self.socket.listen(1)
self.socket_thread = threading.Thread(target=self._socketListener)
self.socket_thread.daemon = True
self.socket_thread.start()
def stop(self):
# First, wake up our listener thread with a connection and
# tell it to stop running.
self.running = False
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.path)
s.sendall('_stop\n')
# The command '_stop' will be ignored by our listener, so
# directly inject it into the queue so that consumers of this
# class which are waiting in .get() are awakened. They can
# either handle '_stop' or just ignore the unknown command and
# then check to see if they should continue to run before
# re-entering their loop.
self.queue.put('_stop')
self.socket_thread.join()
def _socketListener(self):
while self.running:
try:
s, addr = self.socket.accept()
self.log.debug("Accepted socket connection %s" % (s,))
buf = ''
while True:
buf += s.recv(1)
if buf[-1] == '\n':
break
buf = buf.strip()
self.log.debug("Received %s from socket" % (buf,))
s.close()
# Because we use '_stop' internally to wake up a
# waiting thread, don't allow it to actually be
# injected externally.
if buf != '_stop':
self.queue.put(buf)
except Exception:
self.log.exception("Exception in socket handler")
def get(self):
if not self.running:
raise Exception("CommandSocket.get called while stopped")
return self.queue.get()