Add zuul-scheduler tenant-reconfigure
This is a new reconfiguration command which behaves like full-reconfigure but only for a single tenant. This can be useful after connection issues with code hosting systems, or potentially with Zuul cache bugs. Because this is the first command-socket command with an argument, some command-socket infrastructure changes are necessary. Additionally, this includes some minor changes to make the services more consistent around socket commands. Change-Id: Ib695ab8e7ae54790a0a0e4ac04fdad96d60ee0c9
This commit is contained in:
parent
1d4a6e0b71
commit
a160484a86
@ -40,7 +40,11 @@ not read from a git repository. Zuul supports two kinds of reconfigurations.
|
||||
The full reconfiguration refetches and reloads the configuration of
|
||||
all tenants. To do so, run ``zuul-scheduler full-reconfigure``. For
|
||||
example this can be used to fix any configuration inconsistencies
|
||||
after connection problems to Gerrit/Github.
|
||||
after connection problems with the code hosting system.
|
||||
|
||||
To perform the same actions as a full reconfiguration but for a single
|
||||
tenant, use ``zuul-scheduler tenant-reconfigure TENANT`` (where
|
||||
``TENANT`` is the name of the tenant to reconfigure).
|
||||
|
||||
The smart reconfiguration reloads only the tenants that changed their
|
||||
configuration in the tenant config file. To do so, run
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
A new command ``zuul-scheduler tenant-reconfigure`` has been
|
||||
added. It allows an operator to perform a reconfiguration of a
|
||||
single tenant. This may be helpful in clearing up issues after
|
||||
connection problems with the code hosting system.
|
@ -1,6 +1,6 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2016 Red Hat, Inc.
|
||||
# Copyright 2021 Acme Gating, LLC
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -4271,6 +4271,12 @@ class SchedulerTestApp:
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def tenantReconfigure(self, tenants):
    """Submit a non-smart reconfiguration limited to *tenants*.

    :param tenants: list of tenant names, passed through to
        ``self.sched.reconfigure`` unchanged.

    Any exception raised while submitting is logged and swallowed.
    """
    try:
        self.sched.reconfigure(
            self.config,
            smart=False,
            tenants=tenants,
        )
    except Exception:
        self.log.exception("Reconfiguration failed:")
|
||||
|
||||
|
||||
class SchedulerTestManager:
|
||||
def __init__(self, validate_tenants):
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -3797,6 +3798,34 @@ class TestScheduler(ZuulTestCase):
|
||||
else:
|
||||
time.sleep(0)
|
||||
|
||||
def test_tenant_reconfiguration_command_socket(self):
    "Test that single-tenant reconfiguration via command socket works"

    # record previous tenant reconfiguration state, which may not be set
    old = self.scheds.first.sched.tenant_layout_state.get(
        'tenant-one', EMPTY_LAYOUT_STATE)
    self.waitUntilSettled()

    command_socket = self.scheds.first.config.get(
        'scheduler', 'command_socket')
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(command_socket)
        # Wire format: the command name, a space, then a JSON list of
        # the command's positional arguments.
        s.sendall('tenant-reconfigure ["tenant-one"]\n'.encode('utf8'))

    # Wait for the tenant reconfiguration.  Note that waitUntilSettled
    # is not reliable here because the reconfigure event may arrive in
    # the event queue after waitUntilSettled.
    start = time.time()
    while True:
        if time.time() - start > 15:
            raise Exception("Timeout waiting for tenant reconfiguration")
        new = self.scheds.first.sched.tenant_layout_state.get(
            'tenant-one', EMPTY_LAYOUT_STATE)
        # The layout state for the tenant advances once the
        # reconfiguration has been processed.
        if old < new:
            break
        else:
            time.sleep(0)
|
||||
|
||||
def test_double_live_reconfiguration_shared_queue(self):
|
||||
# This was a real-world regression. A change is added to
|
||||
# gate; a reconfigure happens, a second change which depends
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -19,6 +20,7 @@ import configparser
|
||||
import daemon
|
||||
import extras
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import logging.config
|
||||
import os
|
||||
@ -100,6 +102,7 @@ class ZuulApp(object):
|
||||
self.args = None
|
||||
self.config = None
|
||||
self.connections = {}
|
||||
self.commands = {}
|
||||
|
||||
def _get_version(self):
|
||||
from zuul.version import version_info as zuul_version_info
|
||||
@ -116,14 +119,47 @@ class ZuulApp(object):
|
||||
help='show zuul version')
|
||||
return parser
|
||||
|
||||
def addSubCommands(self, parser, commands):
    """Expose command-socket commands as argparse sub-commands.

    :param parser: the argparse parser to extend.
    :param commands: iterable of commandsocket.Command items.

    Each command is also recorded in ``self.commands`` under its name.
    The selected sub-command's name is stored on the parsed namespace
    as ``command``.
    """
    online = parser.add_subparsers(
        title='Online commands',
        description=('The following commands may be used to affect '
                     'the running process.'),
    )
    for cmd_cls in commands:
        self.commands[cmd_cls.name] = cmd_cls
        sub = online.add_parser(cmd_cls.name, help=cmd_cls.help)
        sub.set_defaults(command=cmd_cls.name)
        # Every declared argument becomes a positional of the sub-command.
        for argument in cmd_cls.args:
            sub.add_argument(
                argument.name,
                help=argument.help,
                default=argument.default,
            )
|
||||
|
||||
def handleCommands(self):
    """Send a requested online command to the running process and exit.

    Does nothing when the parsed arguments name no registered command.
    Otherwise the command name, followed by a JSON list of its argument
    values when it has any, is passed to ``self.sendCommand`` and the
    process exits with status 0.
    """
    name = getattr(self.args, 'command', None)
    if name not in self.commands:
        return
    command = self.commands[name]
    values = [getattr(self.args, argument.name)
              for argument in command.args]
    message = command.name
    if values:
        # Arguments travel as a single JSON-encoded list after the name.
        message += ' ' + json.dumps(values)
    self.sendCommand(message)
    sys.exit(0)
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
parser = self.createParser()
|
||||
self.args = parser.parse_args(args)
|
||||
|
||||
if hasattr(self.args, 'foreground') and self.args.foreground:
|
||||
if getattr(self.args, 'foreground', False):
|
||||
self.args.nodaemon = True
|
||||
else:
|
||||
self.args.nodaemon = False
|
||||
|
||||
if getattr(self.args, 'command', None):
|
||||
self.args.nodaemon = True
|
||||
|
||||
return parser
|
||||
|
||||
def readConfig(self):
|
||||
@ -219,7 +255,7 @@ class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta):
|
||||
with daemon.DaemonContext(pidfile=pid, umask=0o022):
|
||||
self.run()
|
||||
|
||||
def send_command(self, cmd):
|
||||
def sendCommand(self, cmd):
|
||||
command_socket = get_default(
|
||||
self.config, self.app_name, 'command_socket',
|
||||
'/var/lib/zuul/%s.socket' % self.app_name)
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013-2014 OpenStack Foundation
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -33,15 +34,11 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
||||
parser.add_argument('--keep-jobdir', dest='keep_jobdir',
|
||||
action='store_true',
|
||||
help='keep local jobdirs after run completes')
|
||||
parser.add_argument('command',
|
||||
choices=zuul.executor.server.COMMANDS,
|
||||
nargs='?')
|
||||
self.addSubCommands(parser, zuul.executor.server.COMMANDS)
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
super(Executor, self).parseArguments()
|
||||
if self.args.command:
|
||||
self.args.nodaemon = True
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
if self.config.has_option('executor', 'sigterm_method'):
|
||||
@ -79,9 +76,7 @@ class Executor(zuul.cmd.ZuulDaemonApp):
|
||||
self.log_streamer_pid = child_pid
|
||||
|
||||
def run(self):
|
||||
if self.args.command in zuul.executor.server.COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
self.handleCommands()
|
||||
|
||||
self.configure_connections(source_only=True)
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -14,12 +15,10 @@
|
||||
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
import zuul.cmd
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.fingergw import COMMANDS, FingerGateway
|
||||
from zuul.lib import fingergw
|
||||
|
||||
|
||||
class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
||||
@ -32,29 +31,20 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
||||
|
||||
def __init__(self):
|
||||
super(FingerGatewayApp, self).__init__()
|
||||
self.gateway: Optional[FingerGateway] = None
|
||||
self.gateway = None
|
||||
|
||||
def createParser(self):
|
||||
parser = super(FingerGatewayApp, self).createParser()
|
||||
parser.add_argument('command',
|
||||
choices=COMMANDS,
|
||||
nargs='?')
|
||||
self.addSubCommands(parser, fingergw.COMMANDS)
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
super(FingerGatewayApp, self).parseArguments()
|
||||
if self.args.command:
|
||||
self.args.nodaemon = True
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Main entry point for the FingerGatewayApp.
|
||||
|
||||
Called by the main() method of the parent class.
|
||||
'''
|
||||
if self.args.command in COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
self.handleCommands()
|
||||
|
||||
self.setup_logging('fingergw', 'log_config')
|
||||
self.log = logging.getLogger('zuul.fingergw')
|
||||
@ -63,7 +53,7 @@ class FingerGatewayApp(zuul.cmd.ZuulDaemonApp):
|
||||
self.config, 'fingergw', 'command_socket',
|
||||
'/var/lib/zuul/%s.socket' % self.app_name)
|
||||
|
||||
self.gateway = FingerGateway(
|
||||
self.gateway = fingergw.FingerGateway(
|
||||
self.config,
|
||||
cmdsock,
|
||||
self.getPidFile(),
|
||||
|
@ -1,6 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013-2014 OpenStack Foundation
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -18,7 +19,7 @@ import signal
|
||||
import sys
|
||||
|
||||
import zuul.cmd
|
||||
from zuul.merger.server import COMMANDS, MergeServer
|
||||
import zuul.merger.server
|
||||
|
||||
|
||||
class Merger(zuul.cmd.ZuulDaemonApp):
|
||||
@ -27,31 +28,23 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
||||
|
||||
def createParser(self):
|
||||
parser = super(Merger, self).createParser()
|
||||
parser.add_argument('command',
|
||||
choices=COMMANDS,
|
||||
nargs='?')
|
||||
self.addSubCommands(parser, zuul.merger.server.COMMANDS)
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
super(Merger, self).parseArguments()
|
||||
if self.args.command:
|
||||
self.args.nodaemon = True
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
self.merger.stop()
|
||||
self.merger.join()
|
||||
sys.exit(0)
|
||||
|
||||
def run(self):
|
||||
if self.args.command in COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
self.handleCommands()
|
||||
|
||||
self.configure_connections(source_only=True)
|
||||
|
||||
self.setup_logging('merger', 'log_config')
|
||||
|
||||
self.merger = MergeServer(self.config, self.connections)
|
||||
self.merger = zuul.merger.server.MergeServer(
|
||||
self.config, self.connections)
|
||||
self.merger.start()
|
||||
|
||||
if self.args.nodaemon:
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -38,16 +39,9 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
'listed, all tenants will be validated. '
|
||||
'Note: this requires ZooKeeper and '
|
||||
'will distribute work to mergers.')
|
||||
parser.add_argument('command',
|
||||
choices=zuul.scheduler.COMMANDS,
|
||||
nargs='?')
|
||||
self.addSubCommands(parser, zuul.scheduler.COMMANDS)
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
super(Scheduler, self).parseArguments()
|
||||
if self.args.command:
|
||||
self.args.nodaemon = True
|
||||
|
||||
def fullReconfigure(self):
|
||||
self.log.debug("Reconfiguration triggered")
|
||||
self.readConfig()
|
||||
@ -66,15 +60,22 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def tenantReconfigure(self, tenants):
    """Re-read the configuration and reconfigure only *tenants*.

    :param tenants: list of tenant names handed to the scheduler.

    Failures while submitting the reconfiguration are logged, not raised.
    """
    self.log.debug("Tenant reconfiguration triggered")
    # Refresh the config and logging setup before submitting.
    self.readConfig()
    self.setup_logging('scheduler', 'log_config')
    try:
        self.sched.reconfigure(
            self.config,
            smart=False,
            tenants=tenants,
        )
    except Exception:
        self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
self.sched.stop()
|
||||
self.sched.join()
|
||||
sys.exit(0)
|
||||
|
||||
def run(self):
|
||||
if self.args.command in zuul.scheduler.COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
self.handleCommands()
|
||||
|
||||
self.setup_logging('scheduler', 'log_config')
|
||||
self.log = logging.getLogger("zuul.Scheduler")
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -30,16 +31,9 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
||||
|
||||
def createParser(self):
|
||||
parser = super().createParser()
|
||||
parser.add_argument('command',
|
||||
choices=zuul.web.COMMANDS,
|
||||
nargs='?')
|
||||
self.addSubCommands(parser, zuul.web.COMMANDS)
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
super().parseArguments()
|
||||
if self.args.command:
|
||||
self.args.nodaemon = True
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
self.web.stop()
|
||||
|
||||
@ -81,9 +75,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
||||
self.authenticators.configure(self.config)
|
||||
|
||||
def run(self):
|
||||
if self.args.command in zuul.web.COMMANDS:
|
||||
self.send_command(self.args.command)
|
||||
sys.exit(0)
|
||||
self.handleCommands()
|
||||
|
||||
self.setup_logging('web', 'log_config')
|
||||
self.log = logging.getLogger("zuul.WebServer")
|
||||
|
@ -1,5 +1,5 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2021 Acme Gating, LLC
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -80,8 +80,6 @@ from zuul.zk.system import ZuulSystem
|
||||
from zuul.zk.zkobject import ZKContext
|
||||
|
||||
BUFFER_LINES_FOR_SYNTAX = 200
|
||||
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
|
||||
'unverbose', 'keep', 'nokeep', 'repl', 'norepl']
|
||||
DEFAULT_FINGER_PORT = 7900
|
||||
DEFAULT_STREAM_PORT = 19885
|
||||
BLACKLISTED_ANSIBLE_CONNECTION_TYPES = [
|
||||
@ -95,6 +93,40 @@ BLACKLISTED_VARS = dict(
|
||||
)
|
||||
|
||||
|
||||
class VerboseCommand(commandsocket.Command):
    """Command-socket command ``verbose``; declares no arguments."""

    name = 'verbose'
    help = 'Enable Ansible verbose mode'
|
||||
|
||||
|
||||
class UnVerboseCommand(commandsocket.Command):
    """Command-socket command ``unverbose``; declares no arguments."""

    name = 'unverbose'
    help = 'Disable Ansible verbose mode'
|
||||
|
||||
|
||||
class KeepCommand(commandsocket.Command):
    """Command-socket command ``keep``; declares no arguments."""

    name = 'keep'
    help = 'Keep build directories after completion'
|
||||
|
||||
|
||||
class NoKeepCommand(commandsocket.Command):
    """Command-socket command ``nokeep``; declares no arguments."""

    name = 'nokeep'
    help = 'Remove build directories after completion'
|
||||
|
||||
|
||||
COMMANDS = [
|
||||
commandsocket.StopCommand,
|
||||
commandsocket.PauseCommand,
|
||||
commandsocket.UnPauseCommand,
|
||||
commandsocket.GracefulCommand,
|
||||
VerboseCommand,
|
||||
UnVerboseCommand,
|
||||
KeepCommand,
|
||||
NoKeepCommand,
|
||||
commandsocket.ReplCommand,
|
||||
commandsocket.NoReplCommand,
|
||||
]
|
||||
|
||||
|
||||
class NodeRequestError(Exception):
|
||||
pass
|
||||
|
||||
@ -3173,18 +3205,18 @@ class ExecutorServer(BaseMergeServer):
|
||||
self.governor_lock = threading.Lock()
|
||||
self.run_lock = threading.Lock()
|
||||
self.verbose = False
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
pause=self.pause,
|
||||
unpause=self.unpause,
|
||||
graceful=self.graceful,
|
||||
verbose=self.verboseOn,
|
||||
unverbose=self.verboseOff,
|
||||
keep=self.keep,
|
||||
nokeep=self.nokeep,
|
||||
repl=self.start_repl,
|
||||
norepl=self.stop_repl,
|
||||
)
|
||||
self.command_map = {
|
||||
commandsocket.StopCommand.name: self.stop,
|
||||
commandsocket.PauseCommand.name: self.pause,
|
||||
commandsocket.UnPauseCommand.name: self.unpause,
|
||||
commandsocket.GracefulCommand.name: self.graceful,
|
||||
VerboseCommand.name: self.verboseOn,
|
||||
UnVerboseCommand.name: self.verboseOff,
|
||||
KeepCommand.name: self.keep,
|
||||
NoKeepCommand.name: self.nokeep,
|
||||
commandsocket.ReplCommand.name: self.startRepl,
|
||||
commandsocket.NoReplCommand.name: self.stopRepl,
|
||||
}
|
||||
self.log_console_port = log_console_port
|
||||
self.repl = None
|
||||
|
||||
@ -3456,7 +3488,7 @@ class ExecutorServer(BaseMergeServer):
|
||||
# ZooKeeper. We do this as one of the last steps to ensure
|
||||
# that all ZK related components can be stopped first.
|
||||
super().stop()
|
||||
self.stop_repl()
|
||||
self.stopRepl()
|
||||
self.monitoring_server.stop()
|
||||
self.log.debug("Stopped")
|
||||
|
||||
@ -3510,13 +3542,13 @@ class ExecutorServer(BaseMergeServer):
|
||||
def nokeep(self):
|
||||
self.keep_jobdir = False
|
||||
|
||||
def start_repl(self):
|
||||
def startRepl(self):
|
||||
if self.repl:
|
||||
return
|
||||
self.repl = zuul.lib.repl.REPLServer(self)
|
||||
self.repl.start()
|
||||
|
||||
def stop_repl(self):
|
||||
def stopRepl(self):
|
||||
if not self.repl:
|
||||
# not running
|
||||
return
|
||||
@ -3526,9 +3558,9 @@ class ExecutorServer(BaseMergeServer):
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
try:
|
||||
command = self.command_socket.get().decode('utf8')
|
||||
command, args = self.command_socket.get()
|
||||
if command != '_stop':
|
||||
self.command_map[command]()
|
||||
self.command_map[command](*args)
|
||||
except Exception:
|
||||
self.log.exception("Exception while processing command")
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2014 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2016 Red Hat
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -14,6 +15,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@ -22,6 +24,49 @@ import threading
|
||||
from zuul.lib.queue import NamedQueue
|
||||
|
||||
|
||||
class Command:
    """Base description of a command that can be sent over the socket.

    Subclasses set ``name`` (the word sent over the socket, also used as
    the CLI sub-command name), ``help`` (the CLI help text) and,
    optionally, ``args`` (a sequence of Argument classes in positional
    order).
    """

    name = None
    help = None
    # An immutable empty default: a list here would be one object shared
    # by every subclass that does not override ``args``, so an accidental
    # in-place mutation would leak into all of them.
    args = ()
|
||||
|
||||
|
||||
class Argument:
    """Base description of one positional argument of a Command.

    Subclasses override the attributes below as needed.
    """

    # Name of the argument on the command line.
    name = None
    # Help text shown for the argument.
    help = None
    # NOTE(review): no code visible in this change reads ``required`` --
    # confirm whether it is consumed elsewhere.
    required = None
    # Value used when the argument is not supplied.
    default = None
|
||||
|
||||
|
||||
class StopCommand(Command):
    """Command-socket command ``stop``; declares no arguments."""

    name = 'stop'
    help = 'Stop the running process'
|
||||
|
||||
|
||||
class GracefulCommand(Command):
    """Command-socket command ``graceful``; declares no arguments."""

    name = 'graceful'
    help = 'Stop after completing existing work'
|
||||
|
||||
|
||||
class PauseCommand(Command):
    """Command-socket command ``pause``; declares no arguments."""

    name = 'pause'
    help = 'Stop accepting new work'
|
||||
|
||||
|
||||
class UnPauseCommand(Command):
    """Command-socket command ``unpause``; declares no arguments."""

    name = 'unpause'
    help = 'Resume accepting new work'
|
||||
|
||||
|
||||
class ReplCommand(Command):
    """Command-socket command ``repl``; declares no arguments."""

    name = 'repl'
    help = 'Enable the REPL for debugging'
|
||||
|
||||
|
||||
class NoReplCommand(Command):
    """Command-socket command ``norepl``; declares no arguments."""

    name = 'norepl'
    help = 'Disable the REPL'
|
||||
|
||||
|
||||
class CommandSocket(object):
|
||||
log = logging.getLogger("zuul.CommandSocket")
|
||||
|
||||
@ -54,7 +99,7 @@ class CommandSocket(object):
|
||||
# either handle '_stop' or just ignore the unknown command and
|
||||
# then check to see if they should continue to run before
|
||||
# re-entering their loop.
|
||||
self.queue.put(b'_stop')
|
||||
self.queue.put(('_stop', []))
|
||||
self.socket_thread.join()
|
||||
|
||||
def _socketListener(self):
|
||||
@ -70,11 +115,17 @@ class CommandSocket(object):
|
||||
buf = buf.strip()
|
||||
self.log.debug("Received %s from socket" % (buf,))
|
||||
s.close()
|
||||
|
||||
buf = buf.decode('utf8')
|
||||
parts = buf.split(' ', 1)
|
||||
# Because we use '_stop' internally to wake up a
|
||||
# waiting thread, don't allow it to actually be
|
||||
# injected externally.
|
||||
if buf != b'_stop':
|
||||
self.queue.put(buf)
|
||||
args = parts[1:]
|
||||
if args:
|
||||
args = json.loads(args[0])
|
||||
if parts[0] != '_stop':
|
||||
self.queue.put((parts[0], args))
|
||||
except Exception:
|
||||
self.log.exception("Exception in socket handler")
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -22,7 +23,7 @@ from typing import Optional
|
||||
|
||||
from zuul.exceptions import StreamingError
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.lib import commandsocket
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.monitoring import MonitoringServer
|
||||
from zuul.version import get_version_string
|
||||
@ -30,7 +31,9 @@ from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import ComponentRegistry, FingerGatewayComponent
|
||||
from zuul.zk.executor import ExecutorApi
|
||||
|
||||
COMMANDS = ['stop']
|
||||
COMMANDS = [
|
||||
commandsocket.StopCommand,
|
||||
]
|
||||
|
||||
|
||||
class RequestHandler(streamer_utils.BaseFingerRequestHandler):
|
||||
@ -168,9 +171,9 @@ class FingerGateway(object):
|
||||
else:
|
||||
self.tls_listen = False
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
)
|
||||
self.command_map = {
|
||||
commandsocket.StopCommand.name: self.stop,
|
||||
}
|
||||
|
||||
self.hostname = get_default(config, 'fingergw', 'hostname',
|
||||
socket.getfqdn())
|
||||
@ -199,9 +202,9 @@ class FingerGateway(object):
|
||||
def _runCommand(self):
|
||||
while self.command_running:
|
||||
try:
|
||||
command = self.command_socket.get().decode('utf8')
|
||||
command, args = self.command_socket.get()
|
||||
if command != '_stop':
|
||||
self.command_map[command]()
|
||||
self.command_map[command](*args)
|
||||
else:
|
||||
return
|
||||
except Exception:
|
||||
@ -239,7 +242,8 @@ class FingerGateway(object):
|
||||
# Start the command processor after the server and privilege drop
|
||||
if self.command_socket_path:
|
||||
self.log.debug("Starting command processor")
|
||||
self.command_socket = CommandSocket(self.command_socket_path)
|
||||
self.command_socket = commandsocket.CommandSocket(
|
||||
self.command_socket_path)
|
||||
self.command_socket.start()
|
||||
self.command_running = True
|
||||
self.command_thread = threading.Thread(
|
||||
|
@ -1,4 +1,5 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -39,7 +40,12 @@ from zuul.zk.components import MergerComponent
|
||||
from zuul.zk.event_queues import PipelineResultEventQueue
|
||||
from zuul.zk.merger import MergerApi
|
||||
|
||||
COMMANDS = ['stop', 'pause', 'unpause', 'graceful']
|
||||
COMMANDS = [
|
||||
commandsocket.StopCommand,
|
||||
commandsocket.PauseCommand,
|
||||
commandsocket.UnPauseCommand,
|
||||
commandsocket.GracefulCommand
|
||||
]
|
||||
|
||||
|
||||
class BaseRepoLocks(metaclass=ABCMeta):
|
||||
@ -475,14 +481,14 @@ class MergeServer(BaseMergeServer):
|
||||
self.component_info)
|
||||
self.monitoring_server.start()
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
self.command_map = {
|
||||
commandsocket.StopCommand.name: self.stop,
|
||||
# Stop for the mergers is always graceful. We add this alias
|
||||
# to make it clearer to users that they can gracefully stop.
|
||||
graceful=self.stop,
|
||||
pause=self.pause,
|
||||
unpause=self.unpause,
|
||||
)
|
||||
commandsocket.GracefulCommand.name: self.stop,
|
||||
commandsocket.PauseCommand.name: self.pause,
|
||||
commandsocket.UnPauseCommand.name: self.unpause,
|
||||
}
|
||||
command_socket = get_default(
|
||||
self.config, 'merger', 'command_socket',
|
||||
'/var/lib/zuul/merger.socket')
|
||||
@ -527,8 +533,8 @@ class MergeServer(BaseMergeServer):
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
try:
|
||||
command = self.command_socket.get().decode('utf8')
|
||||
command, args = self.command_socket.get()
|
||||
if command != '_stop':
|
||||
self.command_map[command]()
|
||||
self.command_map[command](*args)
|
||||
except Exception:
|
||||
self.log.exception("Exception while processing command")
|
||||
|
@ -1,5 +1,5 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2021 Acme Gating, LLC
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -5555,18 +5555,21 @@ class ReconfigureEvent(ManagementEvent):
|
||||
"""Reconfigure the scheduler. The layout will be (re-)loaded from
|
||||
the path specified in the configuration."""
|
||||
|
||||
def __init__(self, smart=False):
|
||||
def __init__(self, smart=False, tenants=None):
|
||||
super(ReconfigureEvent, self).__init__()
|
||||
self.smart = smart
|
||||
self.tenants = tenants
|
||||
|
||||
def toDict(self):
|
||||
d = super().toDict()
|
||||
d["smart"] = self.smart
|
||||
d["tenants"] = self.tenants
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
event = cls(data.get("smart", False))
|
||||
event = cls(data.get("smart", False),
|
||||
data.get("tenants", None))
|
||||
event.updateFromDict(data)
|
||||
return event
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2013 Antoine "hashar" Musso
|
||||
# Copyright 2013 Wikimedia Foundation Inc.
|
||||
# Copyright 2021-2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -105,7 +106,36 @@ from zuul.zk.system import ZuulSystem
|
||||
from zuul.zk.zkobject import ZKContext
|
||||
from zuul.zk.election import SessionAwareElection
|
||||
|
||||
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
|
||||
|
||||
class FullReconfigureCommand(commandsocket.Command):
    """Command-socket command ``full-reconfigure``; declares no arguments."""

    name = 'full-reconfigure'
    help = 'Perform a reconfiguration of all tenants'
|
||||
|
||||
|
||||
class SmartReconfigureCommand(commandsocket.Command):
    """Command-socket command ``smart-reconfigure``; declares no arguments."""

    name = 'smart-reconfigure'
    help = 'Perform a reconfiguration of updated tenants'
|
||||
|
||||
|
||||
class TenantArgument(commandsocket.Argument):
    """Positional ``tenant`` argument naming the tenant to act on."""

    name = 'tenant'
    help = 'The name of the tenant'
|
||||
|
||||
|
||||
class TenantReconfigureCommand(commandsocket.Command):
    """Command-socket command ``tenant-reconfigure``.

    Declares a single positional argument, the tenant name.
    """

    name = 'tenant-reconfigure'
    help = 'Perform a reconfiguration of a specific tenant'
    # Exactly one argument; it arrives at the handler as ``tenant_name``.
    args = [TenantArgument]
|
||||
|
||||
|
||||
COMMANDS = [
|
||||
FullReconfigureCommand,
|
||||
SmartReconfigureCommand,
|
||||
TenantReconfigureCommand,
|
||||
commandsocket.StopCommand,
|
||||
commandsocket.ReplCommand,
|
||||
commandsocket.NoReplCommand,
|
||||
]
|
||||
|
||||
|
||||
class SchedulerStatsElection(SessionAwareElection):
|
||||
@ -163,11 +193,13 @@ class Scheduler(threading.Thread):
|
||||
# Only used by tests in order to quiesce the main run loop
|
||||
self.run_handler_lock = threading.Lock()
|
||||
self.command_map = {
|
||||
'stop': self.stop,
|
||||
'full-reconfigure': self.fullReconfigureCommandHandler,
|
||||
'smart-reconfigure': self.smartReconfigureCommandHandler,
|
||||
'repl': self.start_repl,
|
||||
'norepl': self.stop_repl,
|
||||
FullReconfigureCommand.name: self.fullReconfigureCommandHandler,
|
||||
SmartReconfigureCommand.name: self.smartReconfigureCommandHandler,
|
||||
TenantReconfigureCommand.name:
|
||||
self.tenantReconfigureCommandHandler,
|
||||
commandsocket.StopCommand.name: self.stop,
|
||||
commandsocket.ReplCommand.name: self.startRepl,
|
||||
commandsocket.NoReplCommand.name: self.stopRepl,
|
||||
}
|
||||
self._stopped = False
|
||||
|
||||
@ -330,7 +362,7 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Waiting for layout update thread")
|
||||
self.layout_update_event.set()
|
||||
self.layout_update_thread.join()
|
||||
self.stop_repl()
|
||||
self.stopRepl()
|
||||
self._command_running = False
|
||||
self.log.debug("Stopping command socket")
|
||||
self.command_socket.stop()
|
||||
@ -347,9 +379,9 @@ class Scheduler(threading.Thread):
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
try:
|
||||
command = self.command_socket.get().decode('utf8')
|
||||
command, args = self.command_socket.get()
|
||||
if command != '_stop':
|
||||
self.command_map[command]()
|
||||
self.command_map[command](*args)
|
||||
except Exception:
|
||||
self.log.exception("Exception while processing command")
|
||||
|
||||
@ -777,13 +809,16 @@ class Scheduler(threading.Thread):
|
||||
def smartReconfigureCommandHandler(self):
|
||||
self._zuul_app.smartReconfigure()
|
||||
|
||||
def start_repl(self):
|
||||
def tenantReconfigureCommandHandler(self, tenant_name):
    """Handle the ``tenant-reconfigure`` socket command.

    :param tenant_name: name of the single tenant to reconfigure.
    """
    # The application-level call takes a list of tenant names.
    tenants = [tenant_name]
    self._zuul_app.tenantReconfigure(tenants)
|
||||
|
||||
def startRepl(self):
|
||||
if self.repl:
|
||||
return
|
||||
self.repl = zuul.lib.repl.REPLServer(self)
|
||||
self.repl.start()
|
||||
|
||||
def stop_repl(self):
|
||||
def stopRepl(self):
|
||||
if not self.repl:
|
||||
return
|
||||
self.repl.stop()
|
||||
@ -872,10 +907,10 @@ class Scheduler(threading.Thread):
|
||||
self.wake_event.set()
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
|
||||
def reconfigure(self, config, smart=False):
|
||||
def reconfigure(self, config, smart=False, tenants=None):
|
||||
self.log.debug("Submitting reconfiguration event")
|
||||
|
||||
event = ReconfigureEvent(smart=smart)
|
||||
event = ReconfigureEvent(smart=smart, tenants=tenants)
|
||||
event.zuul_event_ltime = self.zk_client.getCurrentLtime()
|
||||
event.ack_ref = threading.Event()
|
||||
self.reconfigure_event_queue.put(event)
|
||||
@ -1174,7 +1209,8 @@ class Scheduler(threading.Thread):
|
||||
# a request
|
||||
reconfigured_tenants = []
|
||||
with self.layout_lock:
|
||||
self.log.info("Reconfiguration beginning (smart=%s)", event.smart)
|
||||
self.log.info("Reconfiguration beginning (smart=%s, tenants=%s)",
|
||||
event.smart, event.tenants)
|
||||
start = time.monotonic()
|
||||
|
||||
# Update runtime related system attributes from config
|
||||
@ -1213,6 +1249,8 @@ class Scheduler(threading.Thread):
|
||||
new_tenant = self.unparsed_abide.tenants.get(tenant_name)
|
||||
if old_tenant == new_tenant:
|
||||
continue
|
||||
if event.tenants and tenant_name not in event.tenants:
|
||||
continue
|
||||
|
||||
old_tenant = self.abide.tenants.get(tenant_name)
|
||||
if event.smart:
|
||||
@ -1240,8 +1278,9 @@ class Scheduler(threading.Thread):
|
||||
self._reconfigureDeleteTenant(ctx, old_tenant)
|
||||
|
||||
duration = round(time.monotonic() - start, 3)
|
||||
self.log.info("Reconfiguration complete (smart: %s, "
|
||||
"duration: %s seconds)", event.smart, duration)
|
||||
self.log.info("Reconfiguration complete (smart: %s, tenants: %s, "
|
||||
"duration: %s seconds)", event.smart, event.tenants,
|
||||
duration)
|
||||
if event.smart:
|
||||
self.log.info("Reconfigured tenants: %s", reconfigured_tenants)
|
||||
|
||||
|
@ -83,7 +83,11 @@ from zuul.web.logutil import ZuulCherrypyLogManager
|
||||
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
|
||||
cherrypy.tools.websocket = WebSocketTool()
|
||||
|
||||
COMMANDS = ['stop', 'repl', 'norepl']
|
||||
COMMANDS = [
|
||||
commandsocket.StopCommand,
|
||||
commandsocket.ReplCommand,
|
||||
commandsocket.NoReplCommand,
|
||||
]
|
||||
|
||||
|
||||
def get_request_logger(logger=None):
|
||||
@ -1772,9 +1776,9 @@ class ZuulWeb(object):
|
||||
self.repl = None
|
||||
|
||||
self.command_map = {
|
||||
'stop': self.stop,
|
||||
'repl': self.start_repl,
|
||||
'norepl': self.stop_repl,
|
||||
commandsocket.StopCommand.name: self.stop,
|
||||
commandsocket.ReplCommand.name: self.startRepl,
|
||||
commandsocket.NoReplCommand.name: self.stopRepl,
|
||||
}
|
||||
|
||||
self.finger_tls_key = get_default(
|
||||
@ -1973,7 +1977,7 @@ class ZuulWeb(object):
|
||||
self.system_config_cache_wake_event.set()
|
||||
self.system_config_thread.join()
|
||||
self.zk_client.disconnect()
|
||||
self.stop_repl()
|
||||
self.stopRepl()
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
self.monitoring_server.stop()
|
||||
@ -1985,19 +1989,19 @@ class ZuulWeb(object):
|
||||
def runCommand(self):
    """Dispatch commands received on the command socket.

    Loops while ``self._command_running`` is true.  Each item taken
    from the command socket is a ``(command, args)`` pair; the handler
    registered in ``self.command_map`` is called with ``args`` as
    positional arguments.  Any exception is logged and the loop
    continues.
    """
    while self._command_running:
        try:
            command, args = self.command_socket.get()
            # '_stop' is only a wake-up marker so the loop can re-check
            # its running flag; it has no handler.
            if command != '_stop':
                # Forward the arguments, as the other services'
                # command loops do, so a command that declares
                # arguments is not silently called without them.
                self.command_map[command](*args)
        except Exception:
            self.log.exception("Exception while processing command")
|
||||
|
||||
def start_repl(self):
|
||||
def startRepl(self):
|
||||
if self.repl:
|
||||
return
|
||||
self.repl = zuul.lib.repl.REPLServer(self)
|
||||
self.repl.start()
|
||||
|
||||
def stop_repl(self):
|
||||
def stopRepl(self):
|
||||
if not self.repl:
|
||||
return
|
||||
self.repl.stop()
|
||||
|
Loading…
x
Reference in New Issue
Block a user