From de6eaacc8f69863c1793d92ff7d7a2254c2b1a0d Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Mon, 20 Apr 2020 10:56:20 +0200 Subject: [PATCH] Parallelize initial static node synchronization We currently register static nodes serially on startup of the static node provider. This is not a problem as long as all static nodes are reachable. However if there are multiple static nodes unreachable and fail with timeout the startup of the provider can take very long since it must wait for many timeouts one after another. In order to handle this better parallelize the initial sync of static nodes. Change-Id: I4e9b12de277ef0c3140815fb61fed612be2d9396 --- nodepool/driver/static/provider.py | 50 +++++++++++++---------- nodepool/tests/unit/test_driver_static.py | 4 +- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/nodepool/driver/static/provider.py b/nodepool/driver/static/provider.py index c44f7f6dc..faca1c77e 100644 --- a/nodepool/driver/static/provider.py +++ b/nodepool/driver/static/provider.py @@ -14,6 +14,7 @@ import logging import threading +from concurrent.futures.thread import ThreadPoolExecutor from operator import attrgetter from collections import Counter @@ -317,29 +318,36 @@ class StaticNodeProvider(Provider): registered = self.getRegisteredNodes() static_nodes = {} - for pool in self.provider.pools.values(): - for node in pool.nodes: - try: - self.syncNodeCount(registered, node, pool) - except StaticNodeError as exc: - self.log.warning("Couldn't sync node: %s", exc) - continue - except Exception: - self.log.exception("Couldn't sync node %s:", - nodeTuple(node)) - continue + with ThreadPoolExecutor() as executor: + for pool in self.provider.pools.values(): + synced_nodes = [] + for node in pool.nodes: + synced_nodes.append((node, executor.submit( + self.syncNodeCount, registered, node, pool))) - try: - self.updateNodeFromConfig(node) - except StaticNodeError as exc: - self.log.warning("Couldn't update static node: %s", exc) - continue - except Exception: - self.log.exception("Couldn't update static node %s:", - nodeTuple(node)) - continue + for node, result in synced_nodes: + try: + result.result() + except StaticNodeError as exc: + self.log.warning("Couldn't sync node: %s", exc) + continue + except Exception: + self.log.exception("Couldn't sync node %s:", + nodeTuple(node)) + continue - static_nodes[nodeTuple(node)] = node + try: + self.updateNodeFromConfig(node) + except StaticNodeError as exc: + self.log.warning( + "Couldn't update static node: %s", exc) + continue + except Exception: + self.log.exception("Couldn't update static node %s:", + nodeTuple(node)) + continue + + static_nodes[nodeTuple(node)] = node # De-register nodes to synchronize with our configuration. # This case covers any registered nodes that no longer appear in diff --git a/nodepool/tests/unit/test_driver_static.py b/nodepool/tests/unit/test_driver_static.py index 057f08e4e..56984557b 100644 --- a/nodepool/tests/unit/test_driver_static.py +++ b/nodepool/tests/unit/test_driver_static.py @@ -119,8 +119,8 @@ class TestDriverStatic(tests.DBTestCase): nodes = self.waitForNodes('fake-label') self.assertEqual(len(nodes), 1) - registered_ids = {n.id for n in self.zk.nodeIterator()} - self.assertEqual(registered_ids, {'0000000001', '0000000002'}) + registered_labels = {n.type[0] for n in self.zk.nodeIterator()} + self.assertEqual(registered_labels, {'fake-label', 'other-label'}) def test_static_unresolvable(self): '''