diff --git a/zuul/cmd/launcher.py b/zuul/cmd/launcher.py index 2ba4b85af1..1dbc3ee232 100644 --- a/zuul/cmd/launcher.py +++ b/zuul/cmd/launcher.py @@ -29,8 +29,9 @@ import sys import signal import zuul.cmd +import zuul.launcher.ansiblelaunchserver -# No zuul imports here because they pull in paramiko which must not be +# No zuul imports that pull in paramiko here; it must not be # imported until after the daemonization. # https://github.com/paramiko/paramiko/issues/59 # Similar situation with gear and statsd. @@ -50,7 +51,8 @@ class Launcher(zuul.cmd.ZuulApp): parser.add_argument('--keep-jobdir', dest='keep_jobdir', action='store_true', help='keep local jobdirs after run completes') - parser.add_argument('command', choices=['reconfigure', 'stop'], + parser.add_argument('command', + choices=zuul.launcher.ansiblelaunchserver.COMMANDS, nargs='?') self.args = parser.parse_args() @@ -66,21 +68,12 @@ class Launcher(zuul.cmd.ZuulApp): s.connect(path) s.sendall('%s\n' % cmd) - def send_reconfigure(self): - self.send_command('reconfigure') - sys.exit(0) - - def send_stop(self): - self.send_command('stop') - sys.exit(0) - def exit_handler(self): self.launcher.stop() self.launcher.join() def main(self, daemon=True): # See comment at top of file about zuul imports - import zuul.launcher.ansiblelaunchserver self.setup_logging('launcher', 'log_config') @@ -109,11 +102,8 @@ def main(): server.parse_arguments() server.read_config() - if server.args.command == 'reconfigure': - server.send_reconfigure() - sys.exit(0) - elif server.args.command == 'stop': - server.send_stop() + if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS: + server.send_command(server.args.command) sys.exit(0) server.configure_connections() diff --git a/zuul/launcher/ansiblelaunchserver.py b/zuul/launcher/ansiblelaunchserver.py index 5ecc954a48..c4c6ffc4e6 100644 --- a/zuul/launcher/ansiblelaunchserver.py +++ b/zuul/launcher/ansiblelaunchserver.py @@ -37,6 +37,9 @@ import zuul.ansible.plugins.callback_plugins from zuul.lib import commandsocket +COMMANDS = ['reconfigure', 'stop', 'pause', 'unpause'] + + def boolify(x): if isinstance(x, str): return bool(int(x)) @@ -77,6 +80,7 @@ class LaunchServer(object): self.config = config self.keep_jobdir = keep_jobdir self.hostname = socket.gethostname() + self.registered_functions = set() self.node_workers = {} self.jobs = {} self.builds = {} @@ -203,9 +207,16 @@ class LaunchServer(object): del self.jobs[name] def register(self): + new_functions = set() if self.accept_nodes: - self.worker.registerFunction("node-assign:zuul") - self.worker.registerFunction("stop:%s" % self.hostname) + new_functions.add("node-assign:zuul") + new_functions.add("stop:%s" % self.hostname) + + for function in new_functions - self.registered_functions: + self.worker.registerFunction(function) + for function in self.registered_functions - new_functions: + self.worker.unRegisterFunction(function) + self.registered_functions = new_functions def reconfigure(self): self.log.debug("Reconfiguring") @@ -219,6 +230,32 @@ class LaunchServer(object): "to worker:") self.log.debug("Reconfiguration complete") + def pause(self): + self.log.debug("Pausing") + self.accept_nodes = False + self.register() + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.queue.put(dict(action='pause')) + except Exception: + self.log.exception("Exception sending pause command " + "to worker:") + self.log.debug("Paused") + + def unpause(self): + self.log.debug("Unpausing") + self.accept_nodes = True + self.register() + for node in self.node_workers.values(): + try: + if node.isAlive(): + node.queue.put(dict(action='unpause')) + except Exception: + self.log.exception("Exception sending unpause command " + "to worker:") + self.log.debug("Unpaused") + def stop(self): self.log.debug("Stopping") # First, stop accepting new jobs @@ -254,8 +291,12 @@ class LaunchServer(object): command = self.command_socket.get() if command == 'reconfigure': self.reconfigure() - if command == 'stop': + elif command == 'stop': self.stop() + elif command == 'pause': + self.pause() + elif command == 'unpause': + self.unpause() except Exception: self.log.exception("Exception while processing command") @@ -376,6 +417,10 @@ class NodeWorker(object): self.labels = labels self.thread = None self.registered_functions = set() + # If the unpaused Event is set, that means we should run jobs. + # If it is clear, then we are paused and should not run jobs. + self.unpaused = threading.Event() + self.unpaused.set() self._running = True self.queue = Queue.Queue() self.manager_name = manager_name @@ -434,9 +479,17 @@ class NodeWorker(object): # will be set by the queue thread. self.log.debug("Submitting stop request") self._running = False + self.unpaused.set() self.queue.put(dict(action='stop')) self.queue.join() + def pause(self): + self.unpaused.clear() + self.worker.stopWaitingForJobs() + + def unpause(self): + self.unpaused.set() + def _runQueue(self): item = self.queue.get() try: @@ -449,6 +502,12 @@ class NodeWorker(object): else: self._job_complete_event.wait() self.worker.shutdown() + if item['action'] == 'pause': + self.log.debug("Received pause request") + self.pause() + if item['action'] == 'unpause': + self.log.debug("Received unpause request") + self.unpause() elif item['action'] == 'reconfigure': self.log.debug("Received reconfigure request") self.register() @@ -461,7 +520,9 @@ class NodeWorker(object): def runGearman(self): while self._running: try: - self._runGearman() + self.unpaused.wait() + if self._running: + self._runGearman() except Exception: self.log.exception("Exception in gearman manager:")