Ansible launcher: add pause/unpause support

Change-Id: I55c68153a80477c657d7bc5d22e463c37a494eb6
This commit is contained in:
James E. Blair 2016-06-01 15:33:54 -07:00
parent 8fc762bc5e
commit a6a5004029
2 changed files with 71 additions and 20 deletions

View File

@ -29,8 +29,9 @@ import sys
import signal
import zuul.cmd
import zuul.launcher.ansiblelaunchserver
# No zuul imports here because they pull in paramiko which must not be
# No zuul imports that pull in paramiko here; it must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
@ -50,7 +51,8 @@ class Launcher(zuul.cmd.ZuulApp):
parser.add_argument('--keep-jobdir', dest='keep_jobdir',
action='store_true',
help='keep local jobdirs after run completes')
parser.add_argument('command', choices=['reconfigure', 'stop'],
parser.add_argument('command',
choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
nargs='?')
self.args = parser.parse_args()
@ -66,21 +68,12 @@ class Launcher(zuul.cmd.ZuulApp):
s.connect(path)
s.sendall('%s\n' % cmd)
def send_reconfigure(self):
self.send_command('reconfigure')
sys.exit(0)
def send_stop(self):
self.send_command('stop')
sys.exit(0)
def exit_handler(self):
self.launcher.stop()
self.launcher.join()
def main(self, daemon=True):
# See comment at top of file about zuul imports
import zuul.launcher.ansiblelaunchserver
self.setup_logging('launcher', 'log_config')
@ -109,11 +102,8 @@ def main():
server.parse_arguments()
server.read_config()
if server.args.command == 'reconfigure':
server.send_reconfigure()
sys.exit(0)
elif server.args.command == 'stop':
server.send_stop()
if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
server.send_command(server.args.command)
sys.exit(0)
server.configure_connections()

View File

@ -37,6 +37,9 @@ import zuul.ansible.plugins.callback_plugins
from zuul.lib import commandsocket
COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause']
def boolify(x):
if isinstance(x, str):
return bool(int(x))
@ -77,6 +80,7 @@ class LaunchServer(object):
self.config = config
self.keep_jobdir = keep_jobdir
self.hostname = socket.gethostname()
self.registered_functions = set()
self.node_workers = {}
self.jobs = {}
self.builds = {}
@ -203,9 +207,16 @@ class LaunchServer(object):
del self.jobs[name]
def register(self):
new_functions = set()
if self.accept_nodes:
self.worker.registerFunction("node-assign:zuul")
self.worker.registerFunction("stop:%s" % self.hostname)
new_functions.add("node-assign:zuul")
new_functions.add("stop:%s" % self.hostname)
for function in new_functions - self.registered_functions:
self.worker.registerFunction(function)
for function in self.registered_functions - new_functions:
self.worker.unRegisterFunction(function)
self.registered_functions = new_functions
def reconfigure(self):
self.log.debug("Reconfiguring")
@ -219,6 +230,32 @@ class LaunchServer(object):
"to worker:")
self.log.debug("Reconfiguration complete")
def pause(self):
self.log.debug("Pausing")
self.accept_nodes = False
self.register()
for node in self.node_workers.values():
try:
if node.isAlive():
node.queue.put(dict(action='pause'))
except Exception:
self.log.exception("Exception sending pause command "
"to worker:")
self.log.debug("Paused")
def unpause(self):
self.log.debug("Unpausing")
self.accept_nodes = True
self.register()
for node in self.node_workers.values():
try:
if node.isAlive():
node.queue.put(dict(action='unpause'))
except Exception:
self.log.exception("Exception sending unpause command "
"to worker:")
self.log.debug("Unpaused")
def stop(self):
self.log.debug("Stopping")
# First, stop accepting new jobs
@ -254,8 +291,12 @@ class LaunchServer(object):
command = self.command_socket.get()
if command == 'reconfigure':
self.reconfigure()
if command == 'stop':
elif command == 'stop':
self.stop()
elif command == 'pause':
self.pause()
elif command == 'unpause':
self.unpause()
except Exception:
self.log.exception("Exception while processing command")
@ -376,6 +417,10 @@ class NodeWorker(object):
self.labels = labels
self.thread = None
self.registered_functions = set()
# If the unpaused Event is set, that means we should run jobs.
# If it is clear, then we are paused and should not run jobs.
self.unpaused = threading.Event()
self.unpaused.set()
self._running = True
self.queue = Queue.Queue()
self.manager_name = manager_name
@ -434,9 +479,17 @@ class NodeWorker(object):
# will be set by the queue thread.
self.log.debug("Submitting stop request")
self._running = False
self.unpaused.set()
self.queue.put(dict(action='stop'))
self.queue.join()
def pause(self):
self.unpaused.clear()
self.worker.stopWaitingForJobs()
def unpause(self):
self.unpaused.set()
def _runQueue(self):
item = self.queue.get()
try:
@ -449,6 +502,12 @@ class NodeWorker(object):
else:
self._job_complete_event.wait()
self.worker.shutdown()
if item['action'] == 'pause':
self.log.debug("Received pause request")
self.pause()
if item['action'] == 'unpause':
self.log.debug("Received unpause request")
self.unpause()
elif item['action'] == 'reconfigure':
self.log.debug("Received reconfigure request")
self.register()
@ -461,6 +520,8 @@ class NodeWorker(object):
def runGearman(self):
while self._running:
try:
self.unpaused.wait()
if self._running:
self._runGearman()
except Exception:
self.log.exception("Exception in gearman manager:")