diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py index e8c3e728a..1c6cbb388 100644 --- a/nodepool/driver/utils.py +++ b/nodepool/driver/utils.py @@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread, try: dt = int((time.monotonic() - start_time) * 1000) self.recordLaunchStats(statsd_key, dt) - self.updateNodeStats(self.zk, self.provider_config) except Exception: self.log.exception("Exception while reporting stats:") diff --git a/nodepool/launcher.py b/nodepool/launcher.py index 192651f64..cabf5bf70 100755 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS SUSPEND_WAIT_TIME = 30 -class NodeDeleter(threading.Thread, stats.StatsReporter): +class NodeDeleter(threading.Thread): log = logging.getLogger("nodepool.NodeDeleter") def __init__(self, zk, provider_manager, node): threading.Thread.__init__(self, name='NodeDeleter for %s %s' % (node.provider, node.external_id)) - stats.StatsReporter.__init__(self) self._zk = zk self._provider_manager = provider_manager self._node = node @@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter): self.delete(self._zk, self._provider_manager, self._node, node_exists) - try: - self.updateNodeStats(self._zk, self._provider_manager.provider) - except Exception: - self.log.exception("Exception while reporting stats:") - -class PoolWorker(threading.Thread): +class PoolWorker(threading.Thread, stats.StatsReporter): ''' Class that manages node requests for a single provider pool. 
class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
    '''
    Worker that publishes node statistics to statsd.

    Every run (re)initializes the statsd reporter. When statsd is
    configured the worker contends in the election returned by
    ``zk.getStatsElection()``; ``_run_stats`` is handed to that election
    as the callback, so it is expected to run only in the process that
    wins it.
    '''

    def __init__(self, nodepool, interval):
        '''
        :param nodepool: Forwarded to BaseCleanupWorker. Presumably stored
            as ``self._nodepool`` there (this class calls
            ``self._nodepool.getZK()``) -- confirm against the base class.
        :param interval: Forwarded unchanged to BaseCleanupWorker.
        '''
        super().__init__(nodepool, interval, name='StatsWorker')
        self.log = logging.getLogger('nodepool.StatsWorker')
        # Set whenever a stats refresh is wanted, and by stop() to wake
        # the reporting loop.
        self.stats_event = threading.Event()
        # Created lazily in _run() once statsd is known to be configured.
        self.election = None

    def stop(self):
        '''
        Stop the worker: cancel the election, wake the reporting loop
        and delegate to the base class stop().
        '''
        self._running = False
        if self.election is not None:
            self.log.debug('Cancel leader election')
            self.election.cancel()
        # Wake _run_stats() so it can observe that _running is now False.
        self.stats_event.set()
        super().stop()

    def _run(self):
        '''
        Join the stats election and report stats while holding it.

        Returns immediately when statsd is not configured or the worker
        has already been stopped. Any exception is logged, not raised.
        '''
        try:
            stats.StatsReporter.__init__(self)

            if not self._statsd:
                return

            if self.election is None:
                zk = self._nodepool.getZK()
                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
                self.election = zk.getStatsElection(identifier)

            if not self._running:
                return

            self.election.run(self._run_stats)

        except Exception:
            self.log.exception('Exception in StatsWorker:')

    def _run_stats(self):
        '''
        Refresh node stats each time the stats event is set, until the
        worker is stopped.
        '''
        self.log.info('Won stats reporter election')

        # Register our event so we get signaled when stats need a refresh.
        zk = self._nodepool.getZK()
        zk.setNodeStatsEvent(self.stats_event)

        try:
            while self._running:
                # Without a timeout wait() only returns once the event is
                # set, so its return value does not need to be checked.
                self.stats_event.wait()

                if not self._running:
                    break

                self.log.debug('Updating stats')
                self.stats_event.clear()
                try:
                    self.updateNodeStats(zk)
                except Exception:
                    self.log.exception("Exception while reporting stats:")
                # The event is a single flag, so everything signaled
                # during this pause collapses into one further update.
                time.sleep(1)
        finally:
            # Unregister from node stats events, also when the loop is
            # left through an unexpected exception.
            zk.setNodeStatsEvent(None)
''' if not self._statsd: return states = {} + launchers = zk_conn.getRegisteredLaunchers() + labels = set() + for launcher in launchers: + labels.update(launcher.supported_labels) + providers = set() + for launcher in launchers: + providers.add(launcher.provider_name) + # Initialize things we know about to zero for state in zk.Node.VALID_STATES: key = 'nodepool.nodes.%s' % state states[key] = 0 - key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state) - states[key] = 0 + for provider in providers: + key = 'nodepool.provider.%s.nodes.%s' % (provider, state) + states[key] = 0 # Initialize label stats to 0 - for label in provider.getSupportedLabels(): + for label in labels: for state in zk.Node.VALID_STATES: key = 'nodepool.label.%s.nodes.%s' % (label, state) states[key] = 0 - # Note that we intentionally don't use caching here because we don't - # know when the next update will happen and thus need to report the - # correct most recent state. Otherwise we can end up in reporting - # a gauge with a node in state deleting = 1 and never update this for - # a long time. - # TODO(tobiash): Changing updateNodeStats to just run periodically will - # resolve this and we can operate on cached data. 
def updateProviderLimits(self, provider):
    '''
    Report the configured server limit of a provider to statsd.

    Sends the gauge ``nodepool.provider.<name>.max_servers`` holding the
    sum of ``max_servers`` over all pools of the provider. Pools whose
    ``max_servers`` is falsy (e.g. None or 0) do not contribute.

    Nothing is sent when statsd is not configured or when no provider
    is given.

    :param Provider provider: A config Provider object, or None.
    '''
    if not self._statsd:
        return

    # The PoolWorker caller passes the result of providers.get(), which
    # is None when the provider is missing from the current config.
    if provider is None:
        return

    pipeline = self._statsd.pipeline()

    # nodepool.provider.PROVIDER.max_servers
    key = 'nodepool.provider.%s.max_servers' % provider.name
    max_servers = sum(pool.max_servers
                      for pool in provider.pools.values()
                      if pool.max_servers)
    pipeline.gauge(key, max_servers)
    pipeline.send()
avoid + # having a full nodepool shutdown on upgrade. It can be + # removed later. + obj.provider_name = d.get('provider_name', 'unknown') obj.supported_labels = set(d.get('supported_labels', [])) return obj @@ -693,6 +700,7 @@ class ZooKeeper(object): NODE_ROOT = "/nodepool/nodes" REQUEST_ROOT = "/nodepool/requests" REQUEST_LOCK_ROOT = "/nodepool/requests-lock" + ELECTION_ROOT = "/nodepool/elections" # Log zookeeper retry every 10 seconds retry_log_rate = 10 @@ -710,10 +718,15 @@ class ZooKeeper(object): self._cached_node_requests = {} self.enable_cache = enable_cache + self.node_stats_event = None + # ======================================================================= # Private Methods # ======================================================================= + def _electionPath(self, election): + return "%s/%s" % (self.ELECTION_ROOT, election) + def _imagePath(self, image): return "%s/%s" % (self.IMAGE_ROOT, image) @@ -2106,6 +2119,10 @@ class ZooKeeper(object): node = Node.fromDict(d, node_id) node.stat = event.event_data.stat self._cached_nodes[node_id] = node + + # set the stats event so the stats reporting thread can act upon it + if self.node_stats_event is not None: + self.node_stats_event.set() elif event.event_type == TreeEvent.NODE_REMOVED: try: del self._cached_nodes[node_id] @@ -2113,6 +2130,13 @@ class ZooKeeper(object): # If it's already gone, don't care pass + # set the stats event so the stats reporting thread can act upon it + if self.node_stats_event is not None: + self.node_stats_event.set() + + def setNodeStatsEvent(self, event): + self.node_stats_event = event + def requestCacheListener(self, event): if hasattr(event.event_data, 'path'): @@ -2158,3 +2182,7 @@ class ZooKeeper(object): except KeyError: # If it's already gone, don't care pass + + def getStatsElection(self, identifier): + path = self._electionPath('stats') + return Election(self.client, path, identifier)