Update providers on launcher

When a scheduler updates the layout state for a tenant, all launchers
should reload their provider objects.

This adds a layout update thread like we use on the web servers,
except that instead of updating the full layout, it loads new Provider
objects.

Change-Id: Ia0a297907dba83a7e40f9d040d70da48537482bf
This commit is contained in:
James E. Blair
2024-06-28 10:55:22 -07:00
parent 12ddfb4ab3
commit f0d3c2c263
6 changed files with 141 additions and 28 deletions

View File

@@ -1,6 +1,6 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat, Inc.
# Copyright 2021-2022 Acme Gating, LLC
# Copyright 2021-2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -1781,7 +1781,9 @@ class BaseTestCase(testtools.TestCase):
log_defaults_from_env = os.environ.get(
'OS_LOG_DEFAULTS',
'git.cmd=INFO,'
'kazoo.client=WARNING,kazoo.recipe=WARNING')
'kazoo.client=WARNING,kazoo.recipe=WARNING,'
'botocore=WARNING'
)
if log_defaults_from_env:
for default in log_defaults_from_env.split(','):
@@ -2279,7 +2281,16 @@ class ZuulTestCase(BaseTestCase):
self.history = self.executor_server.build_history
self.builds = self.executor_server.running_builds
self.launcher = zuul.launcher.server.Launcher(self.config)
launcher_connections = TestConnectionRegistry(
self.config, self.test_config,
self.additional_event_queues,
self.upstream_root, self.poller_events,
self.git_url_with_auth, self.addCleanup)
launcher_connections.configure(self.config, providers=True)
self.launcher = zuul.launcher.server.Launcher(
self.config,
launcher_connections)
self.launcher.start()
self.scheds = SchedulerTestManager(self.validate_tenants,

View File

@@ -12,12 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from zuul.provider import BaseProvider
from moto import mock_aws
from tests.base import (
ZuulTestCase,
iterate_timeout,
simple_layout,
)
@@ -45,16 +44,13 @@ class TestAwsDriver(ZuulTestCase):
@simple_layout('layouts/nodepool.yaml', enable_nodepool=True)
def test_aws_launcher(self):
# Temporary test until replaced by launcher to show that we
# can deserialize a provider without a layout.
canonical_name = (
'review.example.com%2Forg%2Fcommon-config/aws-us-east-1-main'
)
path = (f'/zuul/tenant/tenant-one/'
f'provider/{canonical_name}/config')
with self.createZKContext() as context:
provider = BaseProvider.fromZK(
context, path, self.scheds.first.sched.connections
)
for _ in iterate_timeout(
30, "scheduler and launcher to have the same layout"):
if (self.scheds.first.sched.local_layout_state.get("tenant-one") ==
self.launcher.local_layout_state.get("tenant-one")):
break
providers = self.launcher.tenant_providers['tenant-one']
self.assertEqual(1, len(providers))
provider = providers[0]
endpoint = provider.getEndpoint()
self.assertTrue(len(endpoint.testListAmis()) > 1)

View File

@@ -43,7 +43,8 @@ class LauncherApp(zuul.cmd.ZuulDaemonApp):
self.setup_logging('launcher', 'log_config')
self.log = logging.getLogger('zuul.launcher')
self.launcher = Launcher(self.config)
self.configure_connections(providers=True)
self.launcher = Launcher(self.config, self.connections)
self.launcher.start()
if self.args.nodaemon:

View File

@@ -1,4 +1,5 @@
# Copyright 2024 BMW Group
# Copyright 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -22,6 +23,12 @@ from zuul.version import get_version_string
from zuul.zk import ZooKeeperClient
from zuul.zk.components import LauncherComponent
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.layout import (
LayoutProvidersStore,
LayoutStateStore,
)
from zuul.zk.locks import tenant_read_lock
from zuul.zk.zkobject import ZKContext
COMMANDS = (
commandsocket.StopCommand,
@@ -31,9 +38,11 @@ COMMANDS = (
class Launcher:
log = logging.getLogger("zuul.Launcher")
def __init__(self, config):
def __init__(self, config, connections):
self._running = False
self.config = config
self.connections = connections
self.tenant_providers = {}
self.tracing = tracing.Tracing(self.config)
self.zk_client = ZooKeeperClient.fromConfig(self.config)
@@ -47,6 +56,7 @@ class Launcher:
self.component_info = LauncherComponent(
self.zk_client, self.hostname, version=get_version_string())
self.component_info.register()
self.wake_event = threading.Event()
self.command_map = {
commandsocket.StopCommand.name: self.stop,
@@ -57,22 +67,30 @@ class Launcher:
self.command_socket = commandsocket.CommandSocket(command_socket)
self._command_running = False
self.tenant_layout_state = LayoutStateStore(
self.zk_client, self.wake_event.set)
self.layout_providers_store = LayoutProvidersStore(
self.zk_client, self.connections)
self.local_layout_state = {}
self.launcher_thread = threading.Thread(
target=self.run,
name="Launcher",
)
self.wake_event = threading.Event()
def run(self):
    """Main loop of the launcher thread.

    Blocks on ``wake_event`` until it is signaled, clears it, and then
    performs a single ``_run()`` pass.  Any exception raised by a pass is
    logged and the loop keeps going, so one failed pass does not end the
    thread.

    If ``_running`` has been cleared by the time the thread wakes up, the
    loop exits immediately instead of performing one more pass.
    """
    while self._running:
        self.wake_event.wait()
        self.wake_event.clear()
        if not self._running:
            # We were woken while shutting down rather than for new work;
            # skip the pass so we do not touch ZooKeeper during teardown.
            # NOTE(review): presumably stop() clears _running and sets
            # wake_event to unblock this thread -- confirm in stop().
            break
        try:
            self._run()
        except Exception:
            self.log.exception("Error in main thread:")
# Perform one pass of launcher work.  Called from the run() loop, which
# logs any exception raised here.  The only work done in a pass is
# refreshing the per-tenant provider objects.
def _run(self):
self.updateTenantProviders()
def start(self):
self.log.debug("Starting launcher thread")
self._running = True
self.launcher_thread.start()
self.log.debug("Starting command processor")
self._command_running = True
self.command_socket.start()
@@ -80,6 +98,11 @@ class Launcher:
target=self.runCommand, name="command")
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting launcher thread")
self._running = True
self.launcher_thread.start()
self.component_info.state = self.component_info.RUNNING
def stop(self):
@@ -89,6 +112,7 @@ class Launcher:
self.component_info.state = self.component_info.STOPPED
self._command_running = False
self.command_socket.stop()
self.connections.stop()
self.log.debug("Stopped launcher")
def join(self):
@@ -106,3 +130,38 @@ class Launcher:
self.command_map[command](*args)
except Exception:
self.log.exception("Exception while processing command")
# Build a ZKContext bound to this launcher's ZooKeeper client.
#
#   lock: passed through as the context's lock argument; the caller in
#         this class passes the tenant read lock it is holding.
#   log:  logger passed through to the context.
#
# The third ZKContext argument is always None here.
# NOTE(review): that argument is presumably a stop event -- confirm
# against the ZKContext signature.
def createZKContext(self, lock, log):
return ZKContext(self.zk_client, lock, None, log)
def updateTenantProviders(self):
    """Refresh the providers of every tenant this launcher knows about.

    The set of tenants visited is the union of the tenants we already
    hold providers for and the tenants present in the layout state
    store, so that both newly added and deleted tenants are processed.
    The per-tenant work (including the "did anything change" check) is
    delegated to ``_updateTenantProviders``.

    Returns:
        Always ``True``.
    """
    tenant_names = {*self.tenant_providers, *self.tenant_layout_state}
    for tenant_name in tenant_names:
        self._updateTenantProviders(tenant_name)
    return True
# Reload the provider objects of a single tenant when the layout state in
# the layout state store differs from the one this launcher last loaded.
#
#   tenant_name: name of the tenant to check and, if needed, reload.
#
# Updates self.tenant_providers and self.local_layout_state in place;
# no value is returned.
def _updateTenantProviders(self, tenant_name):
# Reload the tenant if the layout changed.
if (self.local_layout_state.get(tenant_name)
== self.tenant_layout_state.get(tenant_name)):
# The locally recorded state already matches the store: nothing to do.
return
self.log.debug("Updating tenant %s", tenant_name)
with tenant_read_lock(self.zk_client, tenant_name, self.log) as tlock:
# Read the state again now that the tenant read lock is held, instead
# of reusing the value from the comparison above.
layout_state = self.tenant_layout_state.get(tenant_name)
if layout_state:
with self.createZKContext(tlock, self.log) as context:
# The store's get() yields providers lazily, so materialize them into
# a list here.
providers = list(self.layout_providers_store.get(
context, tenant_name))
self.tenant_providers[tenant_name] = providers
for provider in providers:
self.log.debug("Loaded provider %s", provider.name)
# Remember which layout state these providers belong to, so the
# equality check at the top returns early until the state changes.
self.local_layout_state[tenant_name] = layout_state
else:
# The tenant has no layout state: drop whatever we held for it.
self.tenant_providers.pop(tenant_name, None)
self.local_layout_state.pop(tenant_name, None)

View File

@@ -106,7 +106,11 @@ from zuul.zk.event_queues import (
TENANT_ROOT,
)
from zuul.zk.exceptions import LockException
from zuul.zk.layout import LayoutState, LayoutStateStore
from zuul.zk.layout import (
LayoutProvidersStore,
LayoutState,
LayoutStateStore,
)
from zuul.zk.locks import (
locked,
tenant_read_lock,
@@ -332,6 +336,8 @@ class Scheduler(threading.Thread):
self.tenant_layout_state = LayoutStateStore(
self.zk_client,
self.layout_update_event.set)
self.layout_providers_store = LayoutProvidersStore(
self.zk_client, self.connections)
self.local_layout_state = {}
command_socket = get_default(
@@ -1841,8 +1847,8 @@ class Scheduler(threading.Thread):
branch_cache_min_ltimes=branch_cache_min_ltimes,
)
# Write the new provider configs
for provider in tenant.layout.providers.values():
provider.internalCreate(context)
self.layout_providers_store.set(context, tenant.name,
tenant.layout.providers.values())
# Save the min_ltimes which are sharded before we atomically
# update the layout state.
self.tenant_layout_state.setMinLtimes(layout_state, min_ltimes)

View File

@@ -1,5 +1,5 @@
# Copyright 2020 BMW Group
# Copyright 2022 Acme Gating, LLC
# Copyright 2022, 2024 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -22,7 +22,8 @@ import time
from kazoo.exceptions import NoNodeError
from zuul.zk import sharding, ZooKeeperBase
from zuul.zk import sharding, ZooKeeperBase, ZooKeeperSimpleBase
from zuul.provider import BaseProvider
@total_ordering
@@ -216,3 +217,42 @@ class LayoutStateStore(ZooKeeperBase, MutableMapping):
with suppress(NoNodeError):
self.kazoo_client.delete(path, recursive=True)
self.log.debug("Finished layout data cleanup")
class LayoutProvidersStore(ZooKeeperSimpleBase):
    """Reads and writes the provider objects of a tenant in ZooKeeper.

    Providers are looked up below
    ``<tenant_root>/<tenant>/provider/<repo>/<provider>/config``.
    """

    log = logging.getLogger("zuul.LayoutProvidersStore")

    tenant_root = "/zuul/tenant"

    def __init__(self, client, connections):
        """Create a store.

        :param client: passed to the base class constructor.
        :param connections: handed to ``BaseProvider.fromZK`` whenever a
            provider is loaded.
        """
        super().__init__(client)
        self.connections = connections

    def _providerRoot(self, tenant_name):
        # Root znode under which all providers of a tenant live.
        return f"{self.tenant_root}/{tenant_name}/provider"

    def _listChildren(self, path):
        # Child names of ``path``; a missing node counts as "no children".
        try:
            return self.kazoo_client.get_children(path)
        except NoNodeError:
            return []

    def get(self, context, tenant_name):
        """Yield the providers stored for ``tenant_name``.

        This is a generator: nothing is read from ZooKeeper until it is
        iterated.  The tree is walked repo by repo, and each provider is
        loaded with ``BaseProvider.fromZK`` from its ``config`` node.
        Missing tenant or repo nodes simply yield nothing.

        :param context: passed through to ``BaseProvider.fromZK``.
        :param tenant_name: tenant whose providers should be loaded.
        """
        root = self._providerRoot(tenant_name)
        for repo in self._listChildren(root):
            repo_path = f"{root}/{repo}"
            for provider_name in self._listChildren(repo_path):
                config_path = f"{repo_path}/{provider_name}/config"
                yield BaseProvider.fromZK(
                    context, config_path, self.connections
                )

    def set(self, context, tenant_name, providers):
        """Replace the stored providers of ``tenant_name``.

        The tenant's existing provider subtree is removed first, the
        provider root is re-created, and then ``internalCreate(context)``
        is called on every given provider.

        :param context: passed to each provider's ``internalCreate``.
        :param tenant_name: tenant whose providers are replaced.
        :param providers: iterable of provider objects to store.
        """
        self.clear(tenant_name)
        self.kazoo_client.ensure_path(self._providerRoot(tenant_name))
        for provider in providers:
            provider.internalCreate(context)

    def clear(self, tenant_name):
        """Recursively delete the provider subtree of ``tenant_name``."""
        self.kazoo_client.delete(
            self._providerRoot(tenant_name), recursive=True
        )