Merge "Assign waiting static nodes by request priority"
This commit is contained in:
commit
49997a424d
|
@ -12,13 +12,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import threading
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from operator import attrgetter
|
||||
|
||||
from collections import Counter
|
||||
from collections import namedtuple
|
||||
from collections import Counter, defaultdict, namedtuple
|
||||
|
||||
from nodepool import exceptions
|
||||
from nodepool import nodeutils
|
||||
|
@ -102,17 +101,29 @@ class StaticNodeProvider(Provider):
|
|||
return nodes
|
||||
|
||||
def getWaitingNodesOfType(self, labels):
|
||||
nodes = []
|
||||
"""Get all waiting nodes of a type.
|
||||
|
||||
Nodes are sorted in ascending order by the associated request's
|
||||
priority, which means that they are in descending order of the
|
||||
priority value (a lower value means the request has a higher
|
||||
priority).
|
||||
"""
|
||||
nodes_by_prio = defaultdict(list)
|
||||
for node in self.zk.nodeIterator():
|
||||
if (node.provider != self.provider.name or
|
||||
node.state != zk.BUILDING or
|
||||
not set(node.type).issubset(labels)
|
||||
not set(node.type).issubset(labels) or
|
||||
not node.allocated_to
|
||||
):
|
||||
continue
|
||||
nodes.append(node)
|
||||
return list(
|
||||
sorted(nodes, key=attrgetter("created_time"), reverse=True)
|
||||
)
|
||||
request = self.zk.getNodeRequest(node.allocated_to, cached=True)
|
||||
if request is None:
|
||||
continue
|
||||
nodes_by_prio[request.priority].append(node)
|
||||
|
||||
return list(itertools.chain.from_iterable(
|
||||
nodes_by_prio[p] for p in sorted(nodes_by_prio, reverse=True)
|
||||
))
|
||||
|
||||
def checkNodeLiveness(self, node):
|
||||
node_tuple = nodeTuple(node)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import logging
|
||||
import math
|
||||
import operator
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
|
@ -107,14 +108,8 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
# which express a preference for a specific provider.
|
||||
launchers = self.zk.getRegisteredLaunchers()
|
||||
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
# further sort, then finally, the submission order.
|
||||
requests = list(self.zk.nodeRequestIterator())
|
||||
requests.sort(key=lambda r: (r.id.split('-')[0],
|
||||
r.relative_priority,
|
||||
r.id.split('-')[1]))
|
||||
|
||||
requests = sorted(self.zk.nodeRequestIterator(),
|
||||
key=operator.attrgetter("priority"))
|
||||
for req in requests:
|
||||
if not self.running:
|
||||
return True
|
||||
|
|
|
@ -367,29 +367,47 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
req_waiting1 = zk.NodeRequest()
|
||||
req_waiting1.state = zk.REQUESTED
|
||||
req_waiting1.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting1)
|
||||
self.zk.storeNodeRequest(req_waiting1, priority="300")
|
||||
req_waiting1 = self.waitForNodeRequest(req_waiting1, zk.PENDING)
|
||||
|
||||
req_waiting2 = zk.NodeRequest()
|
||||
req_waiting2.state = zk.REQUESTED
|
||||
req_waiting2.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting2)
|
||||
self.zk.storeNodeRequest(req_waiting2, priority="200")
|
||||
req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.PENDING)
|
||||
|
||||
req_waiting3 = zk.NodeRequest()
|
||||
req_waiting3.state = zk.REQUESTED
|
||||
req_waiting3.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting3, priority="200")
|
||||
req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.PENDING)
|
||||
|
||||
self.zk.unlockNode(node)
|
||||
self.waitForNodeDeletion(node)
|
||||
|
||||
req_waiting1 = self.waitForNodeRequest(req_waiting1, zk.FULFILLED)
|
||||
req_waiting2 = self.zk.getNodeRequest(req_waiting2.id)
|
||||
self.assertEqual(req_waiting2.state, zk.PENDING)
|
||||
req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.FULFILLED)
|
||||
req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
|
||||
self.assertEqual(req_waiting1.state, zk.PENDING)
|
||||
req_waiting3 = self.zk.getNodeRequest(req_waiting3.id)
|
||||
self.assertEqual(req_waiting3.state, zk.PENDING)
|
||||
|
||||
node_waiting1 = self.zk.getNode(req_waiting1.nodes[0])
|
||||
self.zk.lockNode(node_waiting1)
|
||||
node_waiting1.state = zk.USED
|
||||
self.zk.storeNode(node_waiting1)
|
||||
self.zk.unlockNode(node_waiting1)
|
||||
node_waiting2 = self.zk.getNode(req_waiting2.nodes[0])
|
||||
self.zk.lockNode(node_waiting2)
|
||||
node_waiting2.state = zk.USED
|
||||
self.zk.storeNode(node_waiting2)
|
||||
self.zk.unlockNode(node_waiting2)
|
||||
|
||||
self.waitForNodeRequest(req_waiting2, zk.FULFILLED)
|
||||
req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.FULFILLED)
|
||||
req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
|
||||
self.assertEqual(req_waiting1.state, zk.PENDING)
|
||||
|
||||
node_waiting3 = self.zk.getNode(req_waiting3.nodes[0])
|
||||
self.zk.lockNode(node_waiting3)
|
||||
node_waiting3.state = zk.USED
|
||||
self.zk.storeNode(node_waiting3)
|
||||
self.zk.unlockNode(node_waiting3)
|
||||
|
||||
self.waitForNodeRequest(req_waiting1, zk.FULFILLED)
|
||||
|
||||
def test_static_handler_race_cleanup(self):
|
||||
configfile = self.setup_config('static-basic.yaml')
|
||||
|
@ -397,6 +415,16 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
pool.start()
|
||||
node = self.waitForNodes('fake-label')[0]
|
||||
|
||||
pool_workers = pool.getPoolWorkers("static-provider")
|
||||
|
||||
# Dummy node request that is not handled by the static provider
|
||||
req = zk.NodeRequest()
|
||||
req.state = zk.REQUESTED
|
||||
req.node_types.append('fake-label')
|
||||
# Mark request as declined by the static provider
|
||||
req.declined_by.extend(w.launcher_id for w in pool_workers)
|
||||
self.zk.storeNodeRequest(req)
|
||||
|
||||
# Create the result of a race between re-registration of a
|
||||
# ready node and a new building node.
|
||||
data = node.toDict()
|
||||
|
@ -405,6 +433,7 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
"hostname": "",
|
||||
"username": "",
|
||||
"connection_port": 22,
|
||||
"allocated_to": req.id,
|
||||
})
|
||||
building_node = zk.Node.fromDict(data)
|
||||
self.zk.storeNode(building_node)
|
||||
|
@ -412,9 +441,8 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
|
||||
# Node will be deregistered and assigned to the building node
|
||||
self.waitForNodeDeletion(node)
|
||||
nodes = self.waitForNodes('fake-label')
|
||||
self.assertEqual(len(nodes), 1)
|
||||
self.assertEqual(building_node.id, nodes[0].id)
|
||||
node = self.zk.getNode(building_node.id)
|
||||
self.assertEqual(node.state, zk.READY)
|
||||
|
||||
building_node.state = zk.USED
|
||||
self.zk.storeNode(building_node)
|
||||
|
|
|
@ -493,6 +493,14 @@ class NodeRequest(BaseModel):
|
|||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def priority(self):
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
# further sort, then finally, the submission order.
|
||||
precedence, sequence = self.id.split('-')
|
||||
return precedence, self.relative_priority, sequence
|
||||
|
||||
def toDict(self):
|
||||
'''
|
||||
Convert a NodeRequest object's attributes to a dictionary.
|
||||
|
|
Loading…
Reference in New Issue