From 975f3523a80416d2ce28050d39fed7aefbf70faf Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Thu, 17 Oct 2019 09:49:00 +0200 Subject: [PATCH] Fix register race condition in static provider The fix for a data race in https://review.opendev.org/#/c/687261/ introduced another race condition since nodes are now (re-)registered from multiple threads. We have to introduce a lock to avoid data races between the cleanup and node deleted worker. Change-Id: Icd2cb3da82ce05a41b63bee5e76ef6406b59f12f --- nodepool/driver/static/provider.py | 65 +++++++++++++++++------------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/nodepool/driver/static/provider.py b/nodepool/driver/static/provider.py index a82152e12..c41c0713f 100644 --- a/nodepool/driver/static/provider.py +++ b/nodepool/driver/static/provider.py @@ -13,6 +13,7 @@ # under the License. import logging +import threading from operator import attrgetter from collections import Counter @@ -46,6 +47,9 @@ class StaticNodeProvider(Provider): def __init__(self, provider, *args): self.provider = provider + # Lock to avoid data races when registering nodes from + # multiple threads (e.g. cleanup and deleted node worker). + self._register_lock = threading.Lock() def checkHost(self, node): '''Check node is reachable''' @@ -371,18 +375,19 @@ class StaticNodeProvider(Provider): return True def cleanupLeakedResources(self): - registered = self.getRegisteredNodes() - for pool in self.provider.pools.values(): - for node in pool.nodes: - try: - self.syncNodeCount(registered, node, pool) - except Exception: - self.log.exception("Couldn't sync node:") - continue - try: - self.assignReadyNodes(node, pool) - except Exception: - self.log.exception("Couldn't assign ready nodes:") + with self._register_lock: + registered = self.getRegisteredNodes() + for pool in self.provider.pools.values(): + for node in pool.nodes: + try: + self.syncNodeCount(registered, node, pool) + except Exception: + self.log.exception("Couldn't sync node:") + continue + try: + self.assignReadyNodes(node, pool) + except Exception: + self.log.exception("Couldn't assign ready nodes:") def assignReadyNodes(self, node, pool): waiting_nodes = self.getWaitingNodesOfType(node["labels"]) @@ -413,22 +418,24 @@ class StaticNodeProvider(Provider): if static_node is None: return - try: - registered = self.getRegisteredNodes() - except Exception: - self.log.exception( - "Cannot get registered hostnames for node re-registration:") - return - current_count = registered[nodeTuple(node)] + with self._register_lock: + try: + registered = self.getRegisteredNodes() + except Exception: + self.log.exception( + "Cannot get registered hostnames for node re-registration:" + ) + return + current_count = registered[nodeTuple(node)] - # It's possible we were not able to de-register nodes due to a config - # change (because they were in use). In that case, don't bother to - # reregister. - if current_count >= static_node["max-parallel-jobs"]: - return + # It's possible we were not able to de-register nodes due to a + # config change (because they were in use). In that case, don't + # bother to reregister. + if current_count >= static_node["max-parallel-jobs"]: + return - try: - self.registerNodeFromConfig( - 1, node.provider, node.pool, static_node) - except Exception: - self.log.exception("Cannot re-register deleted node %s", node) + try: + self.registerNodeFromConfig( + 1, node.provider, node.pool, static_node) + except Exception: + self.log.exception("Cannot re-register deleted node %s", node)