Add basic zuul-launcher client/server skeleton

* new zuul-launcher cmd
* basic launcher client/server
* launcher started as part of the tests

Change-Id: Ia8e15142492e08be400b6a6aa33381544ea9b4b7
This commit is contained in:
Simon Westphahl 2024-06-13 12:55:51 +02:00 committed by James E. Blair
parent 2df37b086b
commit 7cf667677f
11 changed files with 232 additions and 3 deletions

View File

@ -35,6 +35,7 @@ console_scripts =
zuul-web = zuul.cmd.web:main
zuul-fingergw = zuul.cmd.fingergw:main
zuul-manage-ansible = zuul.cmd.manage_ansible:main
zuul-launcher = zuul.cmd.launcher:main
[build_sphinx]
source-dir = doc/source

View File

@ -104,6 +104,8 @@ import zuul.driver.sql
import zuul.scheduler
import zuul.executor.server
import zuul.executor.client
import zuul.launcher.server
import zuul.launcher.client
import zuul.lib.ansible
import zuul.lib.connections
import zuul.lib.auth
@ -2209,6 +2211,9 @@ class ZuulTestCase(BaseTestCase):
self.config.set(
'web', 'command_socket',
os.path.join(self.test_root, 'web.socket'))
self.config.set(
'launcher', 'command_socket',
os.path.join(self.test_root, 'launcher.socket'))
self.statsd = FakeStatsd()
if self.config.has_section('statsd'):
@ -2272,6 +2277,9 @@ class ZuulTestCase(BaseTestCase):
self.history = self.executor_server.build_history
self.builds = self.executor_server.running_builds
self.launcher = zuul.launcher.server.Launcher(self.config)
self.launcher.start()
self.scheds = SchedulerTestManager(self.validate_tenants,
self.wait_for_init,
self.disable_pipelines)
@ -2356,8 +2364,8 @@ class ZuulTestCase(BaseTestCase):
config.read(os.path.join(FIXTURE_DIR, config_file))
sections = [
'zuul', 'scheduler', 'executor', 'merger', 'web', 'zookeeper',
'keystore', 'database',
'zuul', 'scheduler', 'executor', 'merger', 'web', 'launcher',
'zookeeper', 'keystore', 'database',
]
for section in sections:
if not config.has_section(section):
@ -2677,6 +2685,9 @@ class ZuulTestCase(BaseTestCase):
self.assertNoPipelineExceptions()
def shutdown(self):
# Note: when making changes to this sequence, check if
# corresponding changes need to happen in
# tests/upgrade/test_upgrade_old.py
self.log.debug("Shutting down after tests")
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -2687,6 +2698,8 @@ class ZuulTestCase(BaseTestCase):
self.executor_server.stop()
self.executor_server.join()
self.launcher.stop()
self.launcher.join()
self.scheds.execute(lambda app: app.sched.stop())
self.scheds.execute(lambda app: app.sched.join())
self.statsd.stop()

View File

@ -134,3 +134,6 @@ class TestComponentRegistry(ZuulTestCase):
)
self.assertComponentState("web", BaseComponent.RUNNING)
def test_launcher_component(self):
self.assertComponentState("launcher", BaseComponent.RUNNING)

View File

@ -248,14 +248,16 @@ class TestWeb(BaseTestWeb):
data = resp.json()
# The list should contain one of each kind: executor, scheduler, web
self.assertEqual(len(data), 3)
self.assertEqual(len(data), 4)
self.assertEqual(len(data["executor"]), 1)
self.assertEqual(len(data["launcher"]), 1)
self.assertEqual(len(data["scheduler"]), self.scheduler_count)
self.assertEqual(len(data["web"]), 1)
# Each component should contain hostname and state information
for key in ["hostname", "state", "version"]:
self.assertIn(key, data["executor"][0])
self.assertIn(key, data["launcher"][0])
self.assertIn(key, data["scheduler"][0])
self.assertIn(key, data["web"][0])

View File

@ -44,6 +44,8 @@ class TestUpgradeOld(ZuulTestCase):
self.executor_server.stop()
self.executor_server.join()
self.launcher.stop()
self.launcher.join()
self.statsd.stop()
self.statsd.join()
self.fake_nodepool.stop()

62
zuul/cmd/launcher.py Executable file
View File

@ -0,0 +1,62 @@
# Copyright 2024 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import signal
import zuul.cmd
from zuul.launcher.server import COMMANDS, Launcher
class LauncherApp(zuul.cmd.ZuulDaemonApp):
app_name = 'launcher'
app_description = 'The Zuul launcher.'
launcher = None
def __init__(self):
super().__init__()
def createParser(self):
parser = super().createParser()
self.addSubCommands(parser, COMMANDS)
return parser
def exit_handler(self, signum, frame):
if self.launcher:
self.launcher.stop()
self.launcher.join()
def run(self):
self.handleCommands()
self.setup_logging('launcher', 'log_config')
self.log = logging.getLogger('zuul.launcher')
self.launcher = Launcher(self.config)
self.launcher.start()
if self.args.nodaemon:
signal.signal(signal.SIGTERM, self.exit_handler)
while True:
try:
self.launcher.join()
break
except KeyboardInterrupt:
print("Ctrl + C: asking launcher to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
def main():
LauncherApp().main()

View File

23
zuul/launcher/client.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright 2024 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
class LauncherClient:
log = logging.getLogger("zuul.LauncherClient")
def __init__(self, config, sched):
self.config = config
self.sched = sched

108
zuul/launcher/server.py Normal file
View File

@ -0,0 +1,108 @@
# Copyright 2024 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import threading
from zuul.lib import commandsocket, tracing
from zuul.lib.config import get_default
from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.components import LauncherComponent
from zuul.zk.event_queues import PipelineResultEventQueue
COMMANDS = (
commandsocket.StopCommand,
)
class Launcher:
log = logging.getLogger("zuul.Launcher")
def __init__(self, config):
self._running = False
self.config = config
self.tracing = tracing.Tracing(self.config)
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.result_events = PipelineResultEventQueue.createRegistry(
self.zk_client
)
self.hostname = socket.getfqdn()
self.component_info = LauncherComponent(
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.command_map = {
commandsocket.StopCommand.name: self.stop,
}
command_socket = get_default(
self.config, "launcher", "command_socket",
"/var/lib/zuul/launcher.socket")
self.command_socket = commandsocket.CommandSocket(command_socket)
self._command_running = False
self.launcher_thread = threading.Thread(
target=self.run,
name="Launcher",
)
self.wake_event = threading.Event()
def run(self):
while self._running:
self.wake_event.wait()
self.wake_event.clear()
def start(self):
self.log.debug("Starting launcher thread")
self._running = True
self.launcher_thread.start()
self.log.debug("Starting command processor")
self._command_running = True
self.command_socket.start()
self.command_thread = threading.Thread(
target=self.runCommand, name="command")
self.command_thread.daemon = True
self.command_thread.start()
self.component_info.state = self.component_info.RUNNING
def stop(self):
self.log.debug("Stopping launcher")
self._running = False
self.wake_event.set()
self.component_info.state = self.component_info.STOPPED
self._command_running = False
self.command_socket.stop()
self.log.debug("Stopped launcher")
def join(self):
self.log.debug("Joining launcher")
self.launcher_thread.join()
self.zk_client.disconnect()
self.tracing.stop()
self.log.debug("Joined launcher")
def runCommand(self):
while self._command_running:
try:
command, args = self.command_socket.get()
if command != '_stop':
self.command_map[command](*args)
except Exception:
self.log.exception("Exception while processing command")

View File

@ -34,6 +34,7 @@ from kazoo.exceptions import NotEmptyError
from opentelemetry import trace
from zuul import configloader, exceptions
from zuul.launcher.client import LauncherClient
from zuul.lib import commandsocket
from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
@ -214,6 +215,7 @@ class Scheduler(threading.Thread):
_connection_cleanup_interval = IntervalTrigger(minutes=5, jitter=10)
_merger_client_class = MergeClient
_executor_client_class = ExecutorClient
_launcher_client_class = LauncherClient
def __init__(self, config, connections, app, wait_for_init,
disable_pipelines=False, testonly=False):
@ -348,6 +350,7 @@ class Scheduler(threading.Thread):
if not testonly:
self.executor = self._executor_client_class(self.config, self)
self.merger = self._merger_client_class(self.config, self)
self.launcher = self._launcher_client_class(self.config, self)
self.nodepool = nodepool.Nodepool(
self.zk_client, self.system.system_id, self.statsd,
scheduler=True)

View File

@ -194,6 +194,17 @@ class WebComponent(BaseComponent):
kind = "web"
class LauncherComponent(BaseComponent):
kind = "launcher"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.initial_state = {
"connection_filter": None,
}
self.content.update(self.initial_state)
class ComponentRegistry(ZooKeeperBase):
"""A component registry is organized like:
@ -220,6 +231,7 @@ class ComponentRegistry(ZooKeeperBase):
"merger": MergerComponent,
"fingergw": FingerGatewayComponent,
"web": WebComponent,
"launcher": LauncherComponent,
}
def __init__(self, client):