Merge "Add periodic stats to launcher"

commit 5cd4724520
Zuul 2025-05-07 20:53:22 +00:00, committed by Gerrit Code Review
4 changed files with 122 additions and 3 deletions

View File

@@ -306,6 +306,12 @@ This is a reference for object layout in Zookeeper.
    A zone-specific executor build request queue.  The contents are the
    same as above.
 
+.. path:: zuul/launcher/stats-election
+   :type: LauncherStatsElection
+
+   An election to decide which launcher will report system-wide
+   launcher stats (such as total nodes).
+
 .. path:: zuul/layout/<tenant>
 
    The layout state for the tenant.  Contains the cache and time data
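
For readers unfamiliar with the pattern, the path added above backs a standard ZooKeeper leader election: every launcher volunteers, one candidate wins, and the winner keeps leadership for the life of its ZooKeeper session. Below is a minimal sketch of the same idea using the plain kazoo Election recipe rather than Zuul's SessionAwareElection wrapper; the host and identifier are illustrative.

# Minimal leader-election sketch with kazoo (illustrative, not Zuul's
# actual implementation; host and identifier are placeholders).
from kazoo.client import KazooClient

def report_stats():
    # Runs only on the election winner; returning releases leadership
    # and lets the next queued candidate win.
    print("this launcher reports system-wide stats")

client = KazooClient(hosts="localhost:2181")
client.start()
election = client.Election("/zuul/launcher/stats-election", "launcher-1")
election.run(report_stats)  # blocks while contending, then calls the function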

View File

@@ -2554,6 +2554,7 @@ class ZuulTestCase(BaseTestCase):
             self.config,
             launcher_connections)
         launcher._start_cleanup = False
+        launcher._stats_interval = 1
         launcher.start()
         return launcher
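
The one-line addition works because _stats_interval is a class-level default (set to 30 seconds later in this change); assigning to the instance before start() shadows it, so the test emits stats every second instead of waiting half a minute. A generic sketch of the pattern, with illustrative names:

import threading

class Reporter:
    _interval = 30  # class-level production default, in seconds

    def start(self):
        self._stop = threading.Event()
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        # wait() doubles as the sleep and the shutdown check
        while not self._stop.wait(self._interval):
            print("tick")

r = Reporter()
r._interval = 1  # instance attribute shadows the class default
r.start()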

View File

@@ -501,6 +501,16 @@ class TestLauncher(LauncherBaseTestCase):
         self.assertEqual(A.reported, 2)
         self.assertEqual(self.getJobFromHistory('check-job').node,
                          'debian-normal')
+        pname = 'review_example_com%2Forg%2Fcommon-config/aws-us-east-1-main'
+        self.assertReportedStat(
+            f'zuul.provider.{pname}.nodes.state.requested',
+            kind='g')
+        self.assertReportedStat(
+            f'zuul.provider.{pname}.label.debian-normal.nodes.state.requested',
+            kind='g')
+        self.assertReportedStat(
+            'zuul.nodes.state.requested',
+            kind='g')
 
     @simple_layout('layouts/nodepool-empty-nodeset.yaml', enable_nodepool=True)
     def test_empty_nodeset(self):
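
assertReportedStat here checks that a gauge (kind='g') with the given name reached the test's statsd socket. Statsd metrics are plain UDP datagrams of the form name:value|type, with "g" marking a gauge, so a self-contained version of the check looks roughly like this (a sketch, not Zuul's actual fixture):

# Sketch of a gauge assertion over captured statsd datagrams
# (illustrative; Zuul's real helper lives in its test base class).
def assert_gauge_reported(packets, name):
    for packet in packets:
        for line in packet.decode().splitlines():
            metric, _, rest = line.partition(":")
            if metric == name and rest.endswith("|g"):
                return  # found a gauge sample for this metric
    raise AssertionError(f"no gauge reported for {name}")

# Packets would normally come from a UDP socket the test points
# statsd at; this literal stands in for one received datagram.
assert_gauge_reported([b"zuul.nodes.state.requested:3|g"],
                      "zuul.nodes.state.requested")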

View File

@@ -16,6 +16,7 @@
 from concurrent.futures import ThreadPoolExecutor
 import concurrent.futures
 import collections
+from contextlib import nullcontext
 import errno
 import fcntl
 import logging
@@ -43,9 +44,11 @@ from zuul.zk.image_registry import (
     ImageUploadRegistry,
 )
 from zuul.lib.logutil import get_annotated_logger
+from zuul.lib.statsd import get_statsd, normalize_statsd_name
 from zuul.version import get_version_string
 from zuul.zk import ZooKeeperClient
 from zuul.zk.components import COMPONENT_REGISTRY, LauncherComponent
+from zuul.zk.election import SessionAwareElection
 from zuul.zk.exceptions import LockException
 from zuul.zk.event_queues import (
     PipelineResultEventQueue,
@@ -95,6 +98,14 @@ class ProviderNodeError(Exception):
     pass
 
 
+class LauncherStatsElection(SessionAwareElection):
+    """Election for emitting launcher stats."""
+
+    def __init__(self, client):
+        self.election_root = "/zuul/launcher/stats-election"
+        super().__init__(client.client, self.election_root)
+
+
 class DeleteJob:
     log = logging.getLogger("zuul.Launcher")
@@ -808,6 +819,7 @@ class Launcher:
     MAX_SLEEP = 1
     DELETE_TIMEOUT = 600
     MAX_QUOTA_AGE = 5 * 60  # How long to keep the quota information cached
+    _stats_interval = 30
 
     def __init__(self, config, connections):
         self._running = True
@@ -825,6 +837,12 @@ class Launcher:
         # launcher
         self.endpoints = {}
         self.statsd = get_statsd(config)
+        if self.statsd:
+            self.statsd_timer = self.statsd.timer
+        else:
+            self.statsd_timer = nullcontext
 
         self._provider_quota_cache = cachetools.TTLCache(
             maxsize=8192, ttl=self.MAX_QUOTA_AGE)
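
Binding statsd_timer to contextlib.nullcontext when statsd is unconfigured lets call sites time a block unconditionally: nullcontext accepts (and ignores) the metric key, so no if-statsd guard is needed where the timer is used. A sketch of the call-site effect (the metric name is illustrative):

from contextlib import nullcontext
import time

statsd = None  # as if get_statsd(config) found no statsd configured
statsd_timer = statsd.timer if statsd else nullcontext

# Works identically whether statsd_timer is a real statsd timer
# context manager or the no-op nullcontext:
with statsd_timer("zuul.launcher.some_operation"):
    time.sleep(0.01)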
@@ -848,6 +866,7 @@ class Launcher:
         self.component_info.register()
 
         self.wake_event = threading.Event()
         self.stop_event = threading.Event()
+        self.join_event = threading.Event()
 
         self.connection_filter = get_default(
             self.config, "launcher", "connection_filter")
@@ -907,6 +926,9 @@
             thread_name_prefix="UploadWorker",
         )
         self.cleanup_worker = CleanupWorker(self)
+        self.stats_election = LauncherStatsElection(self.zk_client)
+        self.stats_thread = threading.Thread(target=self.runStatsElection)
+        self.stats_thread.daemon = True
 
     def _layoutUpdatedCallback(self):
         self.layout_updated_event.set()
@@ -1655,14 +1677,20 @@
         self.log.debug("Starting cleanup thread")
         self.cleanup_worker.start()
+        self.stats_thread.start()
 
     def stop(self):
         self.log.debug("Stopping launcher")
         self._running = False
         self.stop_event.set()
         self.wake_event.set()
         self.component_info.state = self.component_info.STOPPED
         self.stopRepl()
         self._command_running = False
         self.command_socket.stop()
+        self.log.debug("Stopping stats thread")
+        self.stats_election.cancel()
+        self.stats_thread.join()
         self.cleanup_worker.stop()
         self.cleanup_worker.join()
         self.upload_executor.shutdown()
@@ -1675,9 +1703,9 @@
     def join(self):
         self.log.debug("Joining launcher")
         self.launcher_thread.join()
-        # Don't set the stop event until after the main thread is
+        # Don't set the join event until after the main thread is
         # joined because doing so will terminate the ZKContext.
-        self.stop_event.set()
+        self.join_event.set()
         self.api.stop()
         self.zk_client.disconnect()
         self.tracing.stop()
@@ -1706,7 +1734,7 @@
         self.repl = None
 
     def createZKContext(self, lock, log):
-        return ZKContext(self.zk_client, lock, self.stop_event, log)
+        return ZKContext(self.zk_client, lock, self.join_event, log)
 
     def updateTenantProviders(self):
         # We need to handle new and deleted tenants, so we need to
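
The renames above are the point of these two hunks: shutdown is now split across two events. stop_event asks the worker loops to exit, while join_event, which the ZKContext watches and which would abort ZooKeeper operations mid-flight, is only set after the main thread has been joined. A stripped-down sketch of that ordering (names illustrative, not Zuul's actual ZKContext):

import threading
import time

stop_event = threading.Event()   # "please finish up"
join_event = threading.Event()   # "abort any remaining ZK work"

def worker():
    while not stop_event.is_set():
        # In the real launcher, ZK writes check join_event (via the
        # ZKContext), so in-flight work completes during shutdown.
        assert not join_event.is_set(), "context terminated mid-operation"
        time.sleep(0.1)

t = threading.Thread(target=worker)
t.start()
stop_event.set()   # stop(): request exit
t.join()           # join(): wait for in-flight work to finish
join_event.set()   # only now is it safe to invalidate the ZK context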
@@ -2089,3 +2117,77 @@ class Launcher:
             total.subtract(node.quota)
             log.debug("Node required quota: %s", node.quota)
         return total.nonNegative()
+
+    def runStatsElection(self):
+        while self._running:
+            try:
+                self.log.debug("Running stats election")
+                self.stats_election.run(self.runStats)
+            except Exception:
+                self.log.exception("Exception running stats election:")
+                time.sleep(1)
+
+    def runStats(self):
+        self.log.debug("Won stats election")
+        while not self.stop_event.wait(self._stats_interval):
+            if not self.stats_election.is_still_valid():
+                self.log.debug("Stats election no longer valid")
+                return
+            try:
+                self._runStats()
+            except Exception:
+                self.log.exception("Error in periodic stats:")
+
+    def _runStats(self):
+        if not self.statsd:
+            return
+
+        node_defaults = {}
+        for state in model.ProviderNode.State:
+            node_defaults[state] = 0
+        nodes = node_defaults.copy()
+        provider_nodes = collections.defaultdict(
+            lambda: node_defaults.copy())
+        provider_label_nodes = collections.defaultdict(
+            lambda: collections.defaultdict(
+                lambda: node_defaults.copy()))
+
+        for node in self.api.nodes_cache.getItems():
+            nodes[node.state] += 1
+            provider_nodes[node.provider][node.state] += 1
+            provider_label_nodes[node.provider][node.label][node.state] += 1
+
+        providers = {}
+        for tenant_name, tenant_providers in self.tenant_providers.items():
+            for tenant_provider in tenant_providers:
+                providers[tenant_provider.canonical_name] = tenant_provider
+
+        for provider in providers.values():
+            safe_pname = normalize_statsd_name(provider.canonical_name)
+            limits = provider.getQuotaLimits().getResources()
+            # zuul.provider.<provider>.limit.<limit> gauge
+            for limit, value in limits.items():
+                safe_limit = normalize_statsd_name(limit)
+                self.statsd.gauge(
+                    f'zuul.provider.{safe_pname}.limit.{safe_limit}',
+                    value)
+            # zuul.provider.<provider>.nodes.state.<state> gauge
+            for state, value in provider_nodes[
+                    provider.canonical_name].items():
+                self.statsd.gauge(
+                    f'zuul.provider.{safe_pname}.nodes.state.{state.value}',
+                    value)
+            # zuul.provider.<provider>.label.<label>.nodes.state.<state> gauge
+            for label, label_nodes in provider_label_nodes[
+                    provider.canonical_name].items():
+                safe_label = normalize_statsd_name(label)
+                for state, value in label_nodes.items():
+                    self.statsd.gauge(
+                        f'zuul.provider.{safe_pname}.label.{safe_label}.'
+                        f'nodes.state.{state.value}',
+                        value)
+
+        # zuul.nodes.state.<state> gauge
+        for state, value in nodes.items():
+            self.statsd.gauge(
+                f'zuul.nodes.state.{state.value}',
+                value)
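
To make the aggregation concrete, here is a worked example with two hypothetical nodes, printing the gauge lines as they would appear on the statsd wire (the provider name and states are illustrative; real names pass through normalize_statsd_name first, and the real states come from model.ProviderNode.State):

import collections

node_defaults = {s: 0 for s in ("requested", "building", "ready")}
nodes = node_defaults.copy()
provider_nodes = collections.defaultdict(lambda: node_defaults.copy())

# Two hypothetical nodes, as if read from the nodes cache:
for provider, state in [("aws-us-east-1-main", "requested"),
                        ("aws-us-east-1-main", "ready")]:
    nodes[state] += 1
    provider_nodes[provider][state] += 1

for state, value in nodes.items():
    print(f"zuul.nodes.state.{state}:{value}|g")
for provider, counts in provider_nodes.items():
    for state, value in counts.items():
        print(f"zuul.provider.{provider}.nodes.state.{state}:{value}|g")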