Parallelize static resource cleanup

Similar to the parallel registration of static nodes on startup we also
need to optimize the leaked resource cleanup. The problem here is the
same in that node registration was serialized and could be slowed down
significantly by a lot of timeouts.

This in turn blocked node re-registration in the node deleted
notification, which also needs the register lock.

Change-Id: Ibb4f759b1a98d564fe5eab065824239cec72364b
This commit is contained in:
Simon Westphahl
2024-10-09 09:05:26 +02:00
parent 2edaec0d5f
commit e1cb6dba85

View File

@@ -443,19 +443,26 @@ class StaticNodeProvider(Provider, QuotaSupport):
def cleanupLeakedResources(self): def cleanupLeakedResources(self):
if self._idle: if self._idle:
return return
with self._register_lock: with self._register_lock:
self.getRegisteredNodes() self.getRegisteredNodes()
for pool in self.provider.pools.values(): with ThreadPoolExecutor() as executor:
for static_node in pool.nodes: for pool in self.provider.pools.values():
try: synced_nodes = []
self.syncNodeCount(static_node, pool) for static_node in pool.nodes:
except StaticNodeError as exc: synced_nodes.append((static_node, executor.submit(
self.log.warning("Couldn't sync node: %s", exc) self.syncNodeCount, static_node, pool)))
continue
except Exception: for static_node, result in synced_nodes:
self.log.exception("Couldn't sync node %s:", try:
nodeTuple(static_node)) result.result()
continue except StaticNodeError as exc:
self.log.warning("Couldn't sync node: %s", exc)
continue
except Exception:
self.log.exception("Couldn't sync node %s:",
nodeTuple(static_node))
continue
def getRequestHandler(self, poolworker, request): def getRequestHandler(self, poolworker, request):
return StaticNodeRequestHandler(poolworker, request) return StaticNodeRequestHandler(poolworker, request)