From e1cb6dba85eb8e1018466b0cf3ed9becb6d9e46c Mon Sep 17 00:00:00 2001
From: Simon Westphahl
Date: Wed, 9 Oct 2024 09:05:26 +0200
Subject: [PATCH] Parallelize static resource cleanup

Similar to the parallel registration of static nodes on startup we also
need to optimize the leaked resource cleanup. The problem here is the
same in that node registration was serialized and could be slowed down
significantly by a lot of timeouts. This in turn blocked node
re-registration in the node deleted notification, which also needs the
register lock.

Change-Id: Ibb4f759b1a98d564fe5eab065824239cec72364b
---
 nodepool/driver/static/provider.py | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/nodepool/driver/static/provider.py b/nodepool/driver/static/provider.py
index 8f03075bb..3b8659ad5 100644
--- a/nodepool/driver/static/provider.py
+++ b/nodepool/driver/static/provider.py
@@ -443,19 +443,26 @@ class StaticNodeProvider(Provider, QuotaSupport):
     def cleanupLeakedResources(self):
         if self._idle:
             return
+
         with self._register_lock:
             self.getRegisteredNodes()
-            for pool in self.provider.pools.values():
-                for static_node in pool.nodes:
-                    try:
-                        self.syncNodeCount(static_node, pool)
-                    except StaticNodeError as exc:
-                        self.log.warning("Couldn't sync node: %s", exc)
-                        continue
-                    except Exception:
-                        self.log.exception("Couldn't sync node %s:",
-                                           nodeTuple(static_node))
-                        continue
+            with ThreadPoolExecutor() as executor:
+                for pool in self.provider.pools.values():
+                    synced_nodes = []
+                    for static_node in pool.nodes:
+                        synced_nodes.append((static_node, executor.submit(
+                            self.syncNodeCount, static_node, pool)))
+
+                    for static_node, result in synced_nodes:
+                        try:
+                            result.result()
+                        except StaticNodeError as exc:
+                            self.log.warning("Couldn't sync node: %s", exc)
+                            continue
+                        except Exception:
+                            self.log.exception("Couldn't sync node %s:",
+                                               nodeTuple(static_node))
+                            continue
 
     def getRequestHandler(self, poolworker, request):
         return StaticNodeRequestHandler(poolworker, request)