Merge "Parallelize initial static node synchronization"
This commit is contained in:
commit
3f082077c8
@ -14,6 +14,7 @@
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from operator import attrgetter
|
||||
|
||||
from collections import Counter
|
||||
@ -317,29 +318,36 @@ class StaticNodeProvider(Provider):
|
||||
registered = self.getRegisteredNodes()
|
||||
|
||||
static_nodes = {}
|
||||
for pool in self.provider.pools.values():
|
||||
for node in pool.nodes:
|
||||
try:
|
||||
self.syncNodeCount(registered, node, pool)
|
||||
except StaticNodeError as exc:
|
||||
self.log.warning("Couldn't sync node: %s", exc)
|
||||
continue
|
||||
except Exception:
|
||||
self.log.exception("Couldn't sync node %s:",
|
||||
nodeTuple(node))
|
||||
continue
|
||||
with ThreadPoolExecutor() as executor:
|
||||
for pool in self.provider.pools.values():
|
||||
synced_nodes = []
|
||||
for node in pool.nodes:
|
||||
synced_nodes.append((node, executor.submit(
|
||||
self.syncNodeCount, registered, node, pool)))
|
||||
|
||||
try:
|
||||
self.updateNodeFromConfig(node)
|
||||
except StaticNodeError as exc:
|
||||
self.log.warning("Couldn't update static node: %s", exc)
|
||||
continue
|
||||
except Exception:
|
||||
self.log.exception("Couldn't update static node %s:",
|
||||
nodeTuple(node))
|
||||
continue
|
||||
for node, result in synced_nodes:
|
||||
try:
|
||||
result.result()
|
||||
except StaticNodeError as exc:
|
||||
self.log.warning("Couldn't sync node: %s", exc)
|
||||
continue
|
||||
except Exception:
|
||||
self.log.exception("Couldn't sync node %s:",
|
||||
nodeTuple(node))
|
||||
continue
|
||||
|
||||
static_nodes[nodeTuple(node)] = node
|
||||
try:
|
||||
self.updateNodeFromConfig(node)
|
||||
except StaticNodeError as exc:
|
||||
self.log.warning(
|
||||
"Couldn't update static node: %s", exc)
|
||||
continue
|
||||
except Exception:
|
||||
self.log.exception("Couldn't update static node %s:",
|
||||
nodeTuple(node))
|
||||
continue
|
||||
|
||||
static_nodes[nodeTuple(node)] = node
|
||||
|
||||
# De-register nodes to synchronize with our configuration.
|
||||
# This case covers any registered nodes that no longer appear in
|
||||
|
@ -119,8 +119,8 @@ class TestDriverStatic(tests.DBTestCase):
|
||||
nodes = self.waitForNodes('fake-label')
|
||||
self.assertEqual(len(nodes), 1)
|
||||
|
||||
registered_ids = {n.id for n in self.zk.nodeIterator()}
|
||||
self.assertEqual(registered_ids, {'0000000001', '0000000002'})
|
||||
registered_labels = {n.type[0] for n in self.zk.nodeIterator()}
|
||||
self.assertEqual(registered_labels, {'fake-label', 'other-label'})
|
||||
|
||||
def test_static_unresolvable(self):
|
||||
'''
|
||||
|
Loading…
Reference in New Issue
Block a user