diff --git a/doc/source/developer/zookeeper.rst b/doc/source/developer/zookeeper.rst index f15085063e..bc2ad56477 100644 --- a/doc/source/developer/zookeeper.rst +++ b/doc/source/developer/zookeeper.rst @@ -306,6 +306,12 @@ This is a reference for object layout in Zookeeper. A zone-specific executor build request queue. The contents are the same as above. +.. path:: zuul/launcher/stats-election + :type: LauncherStatsElection + + An election to decide which launcher will report system-wide + launcher stats (such as total nodes). + .. path:: zuul/layout/ The layout state for the tenant. Contains the cache and time data diff --git a/tests/base.py b/tests/base.py index ddfb2c4ecd..b0a6c97ba0 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2554,6 +2554,7 @@ class ZuulTestCase(BaseTestCase): self.config, launcher_connections) launcher._start_cleanup = False + launcher._stats_interval = 1 launcher.start() return launcher diff --git a/tests/unit/test_launcher.py b/tests/unit/test_launcher.py index 6ce37a6eec..c6d493d008 100644 --- a/tests/unit/test_launcher.py +++ b/tests/unit/test_launcher.py @@ -501,6 +501,16 @@ class TestLauncher(LauncherBaseTestCase): self.assertEqual(A.reported, 2) self.assertEqual(self.getJobFromHistory('check-job').node, 'debian-normal') + pname = 'review_example_com%2Forg%2Fcommon-config/aws-us-east-1-main' + self.assertReportedStat( + f'zuul.provider.{pname}.nodes.state.requested', + kind='g') + self.assertReportedStat( + f'zuul.provider.{pname}.label.debian-normal.nodes.state.requested', + kind='g') + self.assertReportedStat( + 'zuul.nodes.state.requested', + kind='g') @simple_layout('layouts/nodepool-empty-nodeset.yaml', enable_nodepool=True) def test_empty_nodeset(self): diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index a699dac45f..9f30d7973c 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -16,6 +16,7 @@ from concurrent.futures import ThreadPoolExecutor import concurrent.futures import collections +from contextlib import nullcontext import errno import fcntl import logging @@ -43,9 +44,11 @@ from zuul.zk.image_registry import ( ImageUploadRegistry, ) from zuul.lib.logutil import get_annotated_logger +from zuul.lib.statsd import get_statsd, normalize_statsd_name from zuul.version import get_version_string from zuul.zk import ZooKeeperClient from zuul.zk.components import COMPONENT_REGISTRY, LauncherComponent +from zuul.zk.election import SessionAwareElection from zuul.zk.exceptions import LockException from zuul.zk.event_queues import ( PipelineResultEventQueue, @@ -95,6 +98,14 @@ class ProviderNodeError(Exception): pass +class LauncherStatsElection(SessionAwareElection): + """Election for emitting launcher stats.""" + + def __init__(self, client): + self.election_root = "/zuul/launcher/stats-election" + super().__init__(client.client, self.election_root) + + class DeleteJob: log = logging.getLogger("zuul.Launcher") @@ -808,6 +819,7 @@ class Launcher: MAX_SLEEP = 1 DELETE_TIMEOUT = 600 MAX_QUOTA_AGE = 5 * 60 # How long to keep the quota information cached + _stats_interval = 30 def __init__(self, config, connections): self._running = True @@ -825,6 +837,12 @@ class Launcher: # launcher self.endpoints = {} + self.statsd = get_statsd(config) + if self.statsd: + self.statsd_timer = self.statsd.timer + else: + self.statsd_timer = nullcontext + self._provider_quota_cache = cachetools.TTLCache( maxsize=8192, ttl=self.MAX_QUOTA_AGE) @@ -848,6 +866,7 @@ class Launcher: self.component_info.register() self.wake_event = threading.Event() 
         self.stop_event = threading.Event()
+        self.join_event = threading.Event()
 
         self.connection_filter = get_default(
             self.config, "launcher", "connection_filter")
@@ -907,6 +926,9 @@ class Launcher:
             thread_name_prefix="UploadWorker",
         )
         self.cleanup_worker = CleanupWorker(self)
+        self.stats_election = LauncherStatsElection(self.zk_client)
+        self.stats_thread = threading.Thread(target=self.runStatsElection)
+        self.stats_thread.daemon = True
 
     def _layoutUpdatedCallback(self):
         self.layout_updated_event.set()
@@ -1655,14 +1677,20 @@ class Launcher:
         self.log.debug("Starting cleanup thread")
         self.cleanup_worker.start()
 
+        self.stats_thread.start()
+
     def stop(self):
         self.log.debug("Stopping launcher")
         self._running = False
+        self.stop_event.set()
         self.wake_event.set()
         self.component_info.state = self.component_info.STOPPED
         self.stopRepl()
         self._command_running = False
         self.command_socket.stop()
+        self.log.debug("Stopping stats thread")
+        self.stats_election.cancel()
+        self.stats_thread.join()
         self.cleanup_worker.stop()
         self.cleanup_worker.join()
         self.upload_executor.shutdown()
@@ -1675,9 +1703,9 @@ class Launcher:
     def join(self):
         self.log.debug("Joining launcher")
         self.launcher_thread.join()
-        # Don't set the stop event until after the main thread is
+        # Don't set the join event until after the main thread is
         # joined because doing so will terminate the ZKContext.
-        self.stop_event.set()
+        self.join_event.set()
         self.api.stop()
         self.zk_client.disconnect()
         self.tracing.stop()
@@ -1706,7 +1734,7 @@ class Launcher:
             self.repl = None
 
     def createZKContext(self, lock, log):
-        return ZKContext(self.zk_client, lock, self.stop_event, log)
+        return ZKContext(self.zk_client, lock, self.join_event, log)
 
     def updateTenantProviders(self):
         # We need to handle new and deleted tenants, so we need to
@@ -2089,3 +2117,77 @@ class Launcher:
                 total.subtract(node.quota)
                 log.debug("Node required quota: %s", node.quota)
         return total.nonNegative()
+
+    def runStatsElection(self):
+        while self._running:
+            try:
+                self.log.debug("Running stats election")
+                self.stats_election.run(self.runStats)
+            except Exception:
+                self.log.exception("Exception running stats election:")
+                time.sleep(1)
+
+    def runStats(self):
+        self.log.debug("Won stats election")
+        while not self.stop_event.wait(self._stats_interval):
+            if not self.stats_election.is_still_valid():
+                self.log.debug("Stats election no longer valid")
+                return
+            try:
+                self._runStats()
+            except Exception:
+                self.log.exception("Error in periodic stats:")
+
+    def _runStats(self):
+        if not self.statsd:
+            return
+
+        node_defaults = {}
+        for state in model.ProviderNode.State:
+            node_defaults[state] = 0
+
+        nodes = node_defaults.copy()
+        provider_nodes = collections.defaultdict(
+            lambda: node_defaults.copy())
+        provider_label_nodes = collections.defaultdict(
+            lambda: collections.defaultdict(
+                lambda: node_defaults.copy()))
+
+        for node in self.api.nodes_cache.getItems():
+            nodes[node.state] += 1
+            provider_nodes[node.provider][node.state] += 1
+            provider_label_nodes[node.provider][node.label][node.state] += 1
+
+        providers = {}
+        for tenant_name, tenant_providers in self.tenant_providers.items():
+            for tenant_provider in tenant_providers:
+                providers[tenant_provider.canonical_name] = tenant_provider
+        for provider in providers.values():
+            safe_pname = normalize_statsd_name(provider.canonical_name)
+            limits = provider.getQuotaLimits().getResources()
+            # zuul.provider.<provider>.limit.<limit> gauge
+            for limit, value in limits.items():
+                safe_limit = normalize_statsd_name(limit)
+                self.statsd.gauge(
+                    f'zuul.provider.{safe_pname}.limit.{safe_limit}',
+                    value)
+            # zuul.provider.<provider>.nodes.state.<state> gauge
+            for state, value in provider_nodes[
+                    provider.canonical_name].items():
+                self.statsd.gauge(
+                    f'zuul.provider.{safe_pname}.nodes.state.{state.value}',
+                    value)
+            # zuul.provider.<provider>.label.
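
Review note: the `runStatsElection`/`runStats` pair above is a standard ZooKeeper leader-election pattern: every launcher contends for `/zuul/launcher/stats-election`, exactly one wins and emits the system-wide gauges, and another contender takes over if the leader's session is lost. A minimal sketch of the same pattern using plain kazoo follows; Zuul's `SessionAwareElection` additionally provides the `is_still_valid()` session check, and `run_stats_reporter`/`emit_stats` are illustrative names, not Zuul API.

```python
# Minimal sketch of the stats-election pattern, assuming plain kazoo;
# names other than the election path are illustrative, not Zuul's API.
import threading

from kazoo.client import KazooClient
from kazoo.recipe.election import Election


def run_stats_reporter(zk: KazooClient, emit_stats,
                       stop_event: threading.Event, interval: float = 30):
    election = Election(zk, "/zuul/launcher/stats-election")

    def leader():
        # wait() returns False on timeout and True once the event is
        # set, so this emits every `interval` seconds and exits
        # promptly on shutdown.
        while not stop_event.wait(interval):
            emit_stats()

    # Blocks until leadership is won, then runs leader(); returns when
    # leader() returns, or when cancel() unblocks a waiting contender.
    election.run(leader)
```

This mirrors the shutdown sequence in `stop()`: `stop_event.set()` makes a sitting leader exit its reporting loop, while `stats_election.cancel()` releases a launcher still blocked in the election, so `stats_thread.join()` cannot hang.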
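A second point worth calling out: `_runStats` pre-seeds every node state with zero before counting, so each interval publishes an explicit 0 for empty states instead of letting a gauge go stale at its last value. A condensed, self-contained sketch of that counting structure, with stand-in `State` and `Node` types rather than Zuul's model classes:

```python
# Sketch of the nested-defaultdict aggregation in _runStats(); State
# and Node are stand-ins, and the real state names may differ.
import collections
import enum


class State(enum.Enum):
    REQUESTED = 'requested'
    READY = 'ready'
    IN_USE = 'in-use'


Node = collections.namedtuple('Node', 'provider label state')


def aggregate(nodes):
    # Pre-seed every state with 0 so downstream gauges are zeroed
    # explicitly rather than going stale when a state empties out.
    defaults = {state: 0 for state in State}
    totals = defaults.copy()
    by_provider = collections.defaultdict(lambda: defaults.copy())
    by_label = collections.defaultdict(
        lambda: collections.defaultdict(lambda: defaults.copy()))
    for node in nodes:
        totals[node.state] += 1
        by_provider[node.provider][node.state] += 1
        by_label[node.provider][node.label][node.state] += 1
    return totals, by_provider, by_label


totals, by_provider, by_label = aggregate([
    Node('aws-us-east-1-main', 'debian-normal', State.REQUESTED),
    Node('aws-us-east-1-main', 'debian-normal', State.READY),
])
assert totals[State.REQUESTED] == 1
assert by_label['aws-us-east-1-main']['debian-normal'][State.READY] == 1
assert by_provider['aws-us-east-1-main'][State.IN_USE] == 0  # zero-seeded
```

The three levels of the structure correspond to the three gauge families asserted in the new test: `zuul.nodes.state.<state>`, `zuul.provider.<provider>.nodes.state.<state>`, and `zuul.provider.<provider>.label.<label>.nodes.state.<state>`.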