diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst index 126e4e2e0a..86b01efc37 100644 --- a/doc/source/admin/components.rst +++ b/doc/source/admin/components.rst @@ -224,6 +224,11 @@ The following sections of ``zuul.conf`` are used by the scheduler: .. attr:: scheduler + .. attr:: command_socket + :default: /var/lib/zuul/scheduler.socket + + Path to command socket file for the scheduler process. + .. attr:: tenant_config :required: diff --git a/tests/base.py b/tests/base.py index 2c55f30078..79ad63dcdc 100755 --- a/tests/base.py +++ b/tests/base.py @@ -2066,6 +2066,9 @@ class ZuulTestCase(BaseTestCase): FIXTURE_DIR, self.config.get('scheduler', 'tenant_config'))) self.config.set('scheduler', 'state_dir', self.state_root) + self.config.set( + 'scheduler', 'command_socket', + os.path.join(self.test_root, 'scheduler.socket')) self.config.set('merger', 'git_dir', self.merger_src_root) self.config.set('executor', 'git_dir', self.executor_src_root) self.config.set('executor', 'private_key_file', self.private_key_file) diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 539d55b17a..7722d6e9cf 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -22,6 +22,7 @@ import signal import zuul.cmd from zuul.lib.config import get_default from zuul.lib.statsd import get_statsd_config +import zuul.scheduler # No zuul imports here because they pull in paramiko which must not be # imported until after the daemonization. @@ -37,6 +38,18 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): super(Scheduler, self).__init__() self.gear_server_pid = None + def createParser(self): + parser = super(Scheduler, self).createParser() + parser.add_argument('command', + choices=zuul.scheduler.COMMANDS, + nargs='?') + return parser + + def parseArguments(self, args=None): + super(Scheduler, self).parseArguments() + if self.args.command: + self.args.nodaemon = True + def reconfigure_handler(self, signum, frame): signal.signal(signal.SIGHUP, signal.SIG_IGN) self.log.debug("Reconfiguration triggered") @@ -48,8 +61,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.log.exception("Reconfiguration failed:") signal.signal(signal.SIGHUP, self.reconfigure_handler) - def exit_handler(self, signum, frame): - signal.signal(signal.SIGUSR1, signal.SIG_IGN) + def exit_handler(self): self.sched.exit() self.sched.join() self.stop_gear_server() @@ -104,6 +116,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): def run(self): # See comment at top of file about zuul imports import zuul.scheduler + if self.args.command in zuul.scheduler.COMMANDS: + self.send_command(self.args.command) + sys.exit(0) + # See comment at top of file about zuul imports import zuul.executor.client import zuul.merger.client import zuul.nodepool @@ -162,14 +178,17 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): webapp.start() signal.signal(signal.SIGHUP, self.reconfigure_handler) - signal.signal(signal.SIGUSR1, self.exit_handler) - signal.signal(signal.SIGTERM, self.term_handler) - while True: - try: - signal.pause() - except KeyboardInterrupt: - print("Ctrl + C: asking scheduler to exit nicely...\n") - self.exit_handler(signal.SIGINT, None) + + if self.args.nodaemon: + while True: + try: + signal.pause() + except KeyboardInterrupt: + print("Ctrl + C: asking scheduler to exit nicely...\n") + self.exit_handler() + sys.exit(0) + else: + self.sched.join() def main(): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 7dee00d6ab..b978979d32 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -30,10 +30,13 @@ from zuul import model from zuul import exceptions from zuul import version as zuul_version from zuul import rpclistener +from zuul.lib import commandsocket from zuul.lib.config import get_default from zuul.lib.statsd import get_statsd import zuul.lib.queue +COMMANDS = ['stop'] + class ManagementEvent(object): """An event that should be processed within the main queue run loop""" @@ -215,6 +218,9 @@ class Scheduler(threading.Thread): self.wake_event = threading.Event() self.layout_lock = threading.Lock() self.run_handler_lock = threading.Lock() + self.command_map = dict( + stop=self.stop, + ) self._pause = False self._exit = False self._stopped = False @@ -243,6 +249,11 @@ class Scheduler(threading.Thread): time_dir = self._get_time_database_dir() self.time_database = model.TimeDataBase(time_dir) + command_socket = get_default( + self.config, 'scheduler', 'command_socket', + '/var/lib/zuul/scheduler.socket') + self.command_socket = commandsocket.CommandSocket(command_socket) + self.zuul_version = zuul_version.version_info.release_string() self.last_reconfigured = None self.tenant_last_reconfigured = {} @@ -250,6 +261,14 @@ class Scheduler(threading.Thread): def start(self): super(Scheduler, self).start() + self._command_running = True + self.log.debug("Starting command processor") + self.command_socket.start() + self.command_thread = threading.Thread(target=self.runCommand, + name='command') + self.command_thread.daemon = True + self.command_thread.start() + self.rpc.start() self.stats_thread.start() @@ -261,6 +280,17 @@ class Scheduler(threading.Thread): self.stats_thread.join() self.rpc.stop() self.rpc.join() + self._command_running = False + self.command_socket.stop() + + def runCommand(self): + while self._command_running: + try: + command = self.command_socket.get().decode('utf8') + if command != '_stop': + self.command_map[command]() + except Exception: + self.log.exception("Exception while processing command") def registerConnections(self, connections, webapp, load=True): # load: whether or not to trigger the onLoad for the connection. This