Assign waiting static nodes by request priority
The static provider will create nodes in state "building" in case a label is currently not available. When a freed up node can be assigned to a waiting node we must use the request prio to decide which node to update. Before nodepool ordered the waiting nodes by creation time. This can lead to node requests being fulfilled in the wrong order in certain cases. Change-Id: Iae4091b1055d6bb0933f51ce1bbf860e62843206
This commit is contained in:
parent
a07bb0a0ae
commit
56acaf51d3
|
@ -12,13 +12,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import threading
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from operator import attrgetter
|
||||
|
||||
from collections import Counter
|
||||
from collections import namedtuple
|
||||
from collections import Counter, defaultdict, namedtuple
|
||||
|
||||
from nodepool import exceptions
|
||||
from nodepool import nodeutils
|
||||
|
@ -101,17 +100,29 @@ class StaticNodeProvider(Provider):
|
|||
return nodes
|
||||
|
||||
def getWaitingNodesOfType(self, labels):
|
||||
nodes = []
|
||||
"""Get all waiting nodes of a type.
|
||||
|
||||
Nodes are sorted in ascending order by the associated request's
|
||||
priority, which means that they are in descending order of the
|
||||
priority value (a lower value means the request has a higher
|
||||
priority).
|
||||
"""
|
||||
nodes_by_prio = defaultdict(list)
|
||||
for node in self.zk.nodeIterator():
|
||||
if (node.provider != self.provider.name or
|
||||
node.state != zk.BUILDING or
|
||||
not set(node.type).issubset(labels)
|
||||
not set(node.type).issubset(labels) or
|
||||
not node.allocated_to
|
||||
):
|
||||
continue
|
||||
nodes.append(node)
|
||||
return list(
|
||||
sorted(nodes, key=attrgetter("created_time"), reverse=True)
|
||||
)
|
||||
request = self.zk.getNodeRequest(node.allocated_to, cached=True)
|
||||
if request is None:
|
||||
continue
|
||||
nodes_by_prio[request.priority].append(node)
|
||||
|
||||
return list(itertools.chain.from_iterable(
|
||||
nodes_by_prio[p] for p in sorted(nodes_by_prio, reverse=True)
|
||||
))
|
||||
|
||||
def checkNodeLiveness(self, node):
|
||||
node_tuple = nodeTuple(node)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import logging
|
||||
import math
|
||||
import operator
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
|
@ -174,14 +175,8 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
# which express a preference for a specific provider.
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
# further sort, then finally, the submission order.
|
||||
requests = list(self.zk.nodeRequestIterator())
|
||||
requests.sort(key=lambda r: (r.id.split('-')[0],
|
||||
r.relative_priority,
|
||||
r.id.split('-')[1]))
|
||||
|
||||
requests = sorted(self.zk.nodeRequestIterator(),
|
||||
key=operator.attrgetter("priority"))
|
||||
for req in requests:
|
||||
if not self.running:
|
||||
return True
|
||||
|
|
|
@ -365,29 +365,47 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
req_waiting1 = zk.NodeRequest()
|
||||
req_waiting1.state = zk.REQUESTED
|
||||
req_waiting1.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting1)
|
||||
self.zk.storeNodeRequest(req_waiting1, priority="300")
|
||||
req_waiting1 = self.waitForNodeRequest(req_waiting1, zk.PENDING)
|
||||
|
||||
req_waiting2 = zk.NodeRequest()
|
||||
req_waiting2.state = zk.REQUESTED
|
||||
req_waiting2.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting2)
|
||||
self.zk.storeNodeRequest(req_waiting2, priority="200")
|
||||
req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.PENDING)
|
||||
|
||||
req_waiting3 = zk.NodeRequest()
|
||||
req_waiting3.state = zk.REQUESTED
|
||||
req_waiting3.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting3, priority="200")
|
||||
req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.PENDING)
|
||||
|
||||
self.zk.unlockNode(node)
|
||||
self.waitForNodeDeletion(node)
|
||||
|
||||
req_waiting1 = self.waitForNodeRequest(req_waiting1, zk.FULFILLED)
|
||||
req_waiting2 = self.zk.getNodeRequest(req_waiting2.id)
|
||||
self.assertEqual(req_waiting2.state, zk.PENDING)
|
||||
req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.FULFILLED)
|
||||
req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
|
||||
self.assertEqual(req_waiting1.state, zk.PENDING)
|
||||
req_waiting3 = self.zk.getNodeRequest(req_waiting3.id)
|
||||
self.assertEqual(req_waiting3.state, zk.PENDING)
|
||||
|
||||
node_waiting1 = self.zk.getNode(req_waiting1.nodes[0])
|
||||
self.zk.lockNode(node_waiting1)
|
||||
node_waiting1.state = zk.USED
|
||||
self.zk.storeNode(node_waiting1)
|
||||
self.zk.unlockNode(node_waiting1)
|
||||
node_waiting2 = self.zk.getNode(req_waiting2.nodes[0])
|
||||
self.zk.lockNode(node_waiting2)
|
||||
node_waiting2.state = zk.USED
|
||||
self.zk.storeNode(node_waiting2)
|
||||
self.zk.unlockNode(node_waiting2)
|
||||
|
||||
self.waitForNodeRequest(req_waiting2, zk.FULFILLED)
|
||||
req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.FULFILLED)
|
||||
req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
|
||||
self.assertEqual(req_waiting1.state, zk.PENDING)
|
||||
|
||||
node_waiting3 = self.zk.getNode(req_waiting3.nodes[0])
|
||||
self.zk.lockNode(node_waiting3)
|
||||
node_waiting3.state = zk.USED
|
||||
self.zk.storeNode(node_waiting3)
|
||||
self.zk.unlockNode(node_waiting3)
|
||||
|
||||
self.waitForNodeRequest(req_waiting1, zk.FULFILLED)
|
||||
|
||||
def test_static_handler_race_cleanup(self):
|
||||
configfile = self.setup_config('static-basic.yaml')
|
||||
|
@ -395,6 +413,16 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
pool.start()
|
||||
node = self.waitForNodes('fake-label')[0]
|
||||
|
||||
pool_workers = pool.getPoolWorkers("static-provider")
|
||||
|
||||
# Dummy node request that is not handled by the static provider
|
||||
req = zk.NodeRequest()
|
||||
req.state = zk.REQUESTED
|
||||
req.node_types.append('fake-label')
|
||||
# Mark request as declined by the static provider
|
||||
req.declined_by.extend(w.launcher_id for w in pool_workers)
|
||||
self.zk.storeNodeRequest(req)
|
||||
|
||||
# Create the result of a race between re-registration of a
|
||||
# ready node and a new building node.
|
||||
data = node.toDict()
|
||||
|
@ -403,6 +431,7 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
"hostname": "",
|
||||
"username": "",
|
||||
"connection_port": 22,
|
||||
"allocated_to": req.id,
|
||||
})
|
||||
building_node = zk.Node.fromDict(data)
|
||||
self.zk.storeNode(building_node)
|
||||
|
@ -410,9 +439,8 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
|
||||
# Node will be deregistered and assigned to the building node
|
||||
self.waitForNodeDeletion(node)
|
||||
nodes = self.waitForNodes('fake-label')
|
||||
self.assertEqual(len(nodes), 1)
|
||||
self.assertEqual(building_node.id, nodes[0].id)
|
||||
node = self.zk.getNode(building_node.id)
|
||||
self.assertEqual(node.state, zk.READY)
|
||||
|
||||
building_node.state = zk.USED
|
||||
self.zk.storeNode(building_node)
|
||||
|
|
|
@ -493,6 +493,14 @@ class NodeRequest(BaseModel):
|
|||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def priority(self):
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
# further sort, then finally, the submission order.
|
||||
precedence, sequence = self.id.split('-')
|
||||
return precedence, self.relative_priority, sequence
|
||||
|
||||
def toDict(self):
|
||||
'''
|
||||
Convert a NodeRequest object's attributes to a dictionary.
|
||||
|
|
Loading…
Reference in New Issue