Merge "Add support for label quota"
commit 17d0112017
@@ -39,19 +39,17 @@ class StaticNodeRequestHandler(NodeRequestHandler):
         return True
 
     def hasRemainingQuota(self, ntype):
-        # A pool of static nodes can manage nodes with different labels.
-        # There is no global quota that we can exceed here. Return true
-        # so we can wait for the required node type and don't block
-        # other node requests.
-        return True
+        # We are always at quota since we cannot launch new nodes.
+        return False
 
     def launch(self, node):
-        self.log.debug("Waiting for node %s to be ready", node.id)
-        self.zk.watchNode(node, self._check_node_state)
+        # NOTE: We do not expect this to be called since hasRemainingQuota()
+        # returning False should prevent the call.
+        raise Exception("Node launching not supported by static driver")
 
     def launchesComplete(self):
-        node_states = [node.state for node in self.nodeset]
-        return all(s in self.DONE_STATES for s in node_states)
+        # We don't wait on a launch since we never actually launch.
+        return True
 
     def checkReusableNode(self, node):
        return self.manager.checkNodeLiveness(node)
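With label quota handled in the pool worker (see the launcher changes further down), the static handler no longer tracks waiting nodes itself: it reports no remaining quota and never launches. A minimal sketch of that caller-side contract, using hypothetical stand-in classes rather than nodepool's real NodeRequestHandler API:

# Hypothetical stand-ins; nodepool's real handler API differs in detail.
class SketchStaticHandler:
    def hasRemainingQuota(self, ntype):
        # Static nodes are never launched, so there is never spare quota.
        return False

    def launch(self, node):
        # Only reached if the caller ignores hasRemainingQuota().
        raise Exception("Node launching not supported by static driver")


def assign(handler, ntype, node):
    # Caller-side contract: consult hasRemainingQuota() before launch().
    if not handler.hasRemainingQuota(ntype):
        return False
    handler.launch(node)
    return True


assert assign(SketchStaticHandler(), "fake-label", object()) is False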
@@ -18,7 +18,7 @@ import math
 import threading
 from concurrent.futures.thread import ThreadPoolExecutor
 
-from collections import Counter, defaultdict, namedtuple
+from collections import Counter, namedtuple
 
 from nodepool import exceptions
 from nodepool import nodeutils
@@ -102,31 +102,6 @@ class StaticNodeProvider(Provider, QuotaSupport):
             nodes.append(node)
         return nodes
 
-    def getWaitingNodesOfType(self, labels):
-        """Get all waiting nodes of a type.
-
-        Nodes are sorted in ascending order by the associated request's
-        priority, which means that they are in descending order of the
-        priority value (a lower value means the request has a higher
-        priority).
-        """
-        nodes_by_prio = defaultdict(list)
-        for node in self.zk.nodeIterator():
-            if (node.provider != self.provider.name or
-                    node.state != zk.BUILDING or
-                    not set(node.type).issubset(labels) or
-                    not node.allocated_to
-            ):
-                continue
-            request = self.zk.getNodeRequest(node.allocated_to, cached=True)
-            if request is None:
-                continue
-            nodes_by_prio[request.priority].append(node)
-
-        return list(itertools.chain.from_iterable(
-            nodes_by_prio[p] for p in sorted(nodes_by_prio, reverse=True)
-        ))
-
     def checkNodeLiveness(self, node):
         node_tuple = nodeTuple(node)
         static_node = self.poolNodes().get(node_tuple)
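For reference, the removed helper grouped the 'waiting' placeholder nodes by the priority of the request they were allocated to and flattened the groups in descending priority value, so that popping from the end of the list yields the node belonging to the highest-priority (lowest value) request. A standalone sketch of just that ordering, with made-up request data instead of nodepool's ZooKeeper records:

import itertools
from collections import defaultdict

# Made-up data: request id -> priority value, plus waiting nodes allocated to
# those requests (a lower priority value means a higher priority).
request_priority = {"req-a": 300, "req-b": 200, "req-c": 200}
waiting_nodes = [("node-1", "req-a"), ("node-2", "req-b"), ("node-3", "req-c")]

nodes_by_prio = defaultdict(list)
for node_id, request_id in waiting_nodes:
    nodes_by_prio[request_priority[request_id]].append(node_id)

ordered = list(itertools.chain.from_iterable(
    nodes_by_prio[p] for p in sorted(nodes_by_prio, reverse=True)))

assert ordered == ["node-1", "node-2", "node-3"]
# pop() from the end returns a node for a priority-200 request before the
# priority-300 one, matching the removed docstring.
assert ordered.pop() == "node-3"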
@@ -178,9 +153,6 @@ class StaticNodeProvider(Provider, QuotaSupport):
         A node can be registered multiple times to support max-parallel-jobs.
         These nodes will share the same node tuple.
 
-        In case there are 'building' nodes waiting for a label, those nodes
-        will be updated and marked 'ready'.
-
         :param int count: Number of times to register this node.
         :param str provider_name: Name of the provider.
         :param str pool: Config of the pool owning the node.
@@ -188,14 +160,10 @@ class StaticNodeProvider(Provider, QuotaSupport):
         '''
         pool_name = pool.name
         host_keys = self.checkHost(static_node)
-        waiting_nodes = self.getWaitingNodesOfType(static_node["labels"])
         node_tuple = nodeTuple(static_node)
 
         for i in range(0, count):
-            try:
-                node = waiting_nodes.pop()
-            except IndexError:
-                node = zk.Node()
+            node = zk.Node()
             node.state = zk.READY
             node.provider = provider_name
             node.pool = pool_name
@@ -433,28 +401,6 @@ class StaticNodeProvider(Provider, QuotaSupport):
             except Exception:
                 self.log.exception("Couldn't sync node:")
                 continue
-            try:
-                self.assignReadyNodes(node, pool)
-            except StaticNodeError as exc:
-                self.log.warning("Couldn't assign ready node: %s", exc)
-            except Exception:
-                self.log.exception("Couldn't assign ready nodes:")
-
-    def assignReadyNodes(self, node, pool):
-        waiting_nodes = self.getWaitingNodesOfType(node["labels"])
-        if not waiting_nodes:
-            return
-        ready_nodes = self.getRegisteredReadyNodes(nodeTuple(node))
-        if not ready_nodes:
-            return
-
-        leaked_count = min(len(waiting_nodes), len(ready_nodes))
-        self.log.info("Found %s ready node(s) that can be assigned to a "
-                      "waiting node", leaked_count)
-
-        self.deregisterNode(leaked_count, nodeTuple(node))
-        self.registerNodeFromConfig(
-            leaked_count, self.provider.name, pool, node)
 
     def getRequestHandler(self, poolworker, request):
         return StaticNodeRequestHandler(poolworker, request)
@@ -508,3 +454,14 @@ class StaticNodeProvider(Provider, QuotaSupport):
 
     def unmanagedQuotaUsed(self):
         return QuotaInformation()
+
+    def getLabelQuota(self):
+        label_quota = Counter()
+        for pool in self.provider.pools.values():
+            for label in pool.labels:
+                label_quota[label] = 0
+        label_quota.update(
+            itertools.chain.from_iterable(
+                n.type for n in self.zk.nodeIterator()
+                if n.state == zk.READY and n.allocated_to is None))
+        return label_quota
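The new getLabelQuota() seeds every label configured on the provider's pools with zero and then counts each label carried by a ready, unallocated node. A self-contained sketch of the counting with plain sample data in place of the pool config and ZooKeeper node iterator:

import itertools
from collections import Counter

# Assumed sample data standing in for pool.labels and the node iterator.
pool_labels = [["small", "large"], ["large", "gpu"]]
ready_unallocated_node_types = [["small"], ["large"], ["large"]]

label_quota = Counter()
for labels in pool_labels:
    for label in labels:
        label_quota[label] = 0          # every configured label is present
label_quota.update(
    itertools.chain.from_iterable(ready_unallocated_node_types))

assert label_quota == Counter({"large": 2, "small": 1, "gpu": 0})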
@@ -20,6 +20,7 @@ import logging
 import math
 import threading
 import time
+from collections import defaultdict
 
 from kazoo import exceptions as kze
 
@@ -375,6 +376,13 @@ class QuotaSupport:
                                    "for quota:" % node)
         return used_quota
 
+    def getLabelQuota(self):
+        """Return available quota per label.
+
+        :returns: Mapping of labels to available quota
+        """
+        return defaultdict(lambda: math.inf)
+
 
 class RateLimitInstance:
     def __init__(self, limiter, logger, msg):
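This base implementation reports unlimited quota for every label, so drivers that don't override it never cause requests to be deferred on label quota. A small sketch of why a defaultdict with an infinite default behaves that way (the label names are invented):

import math
from collections import defaultdict

label_quota = defaultdict(lambda: math.inf)

# Any label, known or not, appears to have quota available.
assert label_quota["anything"] > 0
assert all(label_quota[label] > 0 for label in ("fake-label", "other-label"))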
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import logging
 import math
 import os
@@ -109,6 +110,14 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
         # which express a preference for a specific provider.
         launchers = self.zk.getRegisteredLaunchers()
 
+        pm = self.getProviderManager()
+        has_quota_support = isinstance(pm, QuotaSupport)
+        if has_quota_support:
+            # The label quota limits will be used for the whole loop since we
+            # don't want to accept lower priority requests when a label becomes
+            # available after we've already deferred higher priority requests.
+            label_quota = pm.getLabelQuota()
+
         pool = self.getPoolConfig()
         pool_labels = set(pool.labels)
 
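As the comment above notes, the quota snapshot is taken once per pass over the pending requests; re-reading it mid-loop could hand a freed-up label to a lower-priority request after a higher-priority one was already deferred. A toy illustration of consuming a single snapshot while walking requests in priority order (names are invented):

label_quota = {"fake-label": 1}          # snapshot taken once per pass
requests = ["high-prio", "low-prio"]     # already sorted by priority

accepted = []
for req in requests:
    if label_quota.get("fake-label", 0) > 0:
        accepted.append(req)
        label_quota["fake-label"] -= 1   # consume quota from the snapshot
    # otherwise the request would be deferred to a later pass

assert accepted == ["high-prio"]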
@@ -173,13 +182,18 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
                           req.provider, candidate_launchers)
                 continue
 
-            pm = self.getProviderManager()
+            if has_quota_support and not all(label_quota.get(l, math.inf) > 0
+                                             for l in req.node_types):
+                # Defer the request as we can't provide the required labels at
+                # the moment.
+                log.debug("Deferring request because labels are unavailable")
+                continue
 
             # check tenant quota if the request has a tenant associated
             # and there are resource limits configured for this tenant
             check_tenant_quota = req.tenant_name and req.tenant_name \
                 in self.nodepool.config.tenant_resource_limits \
-                and isinstance(pm, QuotaSupport)
+                and has_quota_support
 
             if check_tenant_quota and not self._hasTenantQuota(req, pm):
                 # Defer request for it to be handled and fulfilled at a later
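A label missing from the quota mapping is treated as unlimited via the math.inf default; whether the pool serves that label at all is still decided by the existing label checks. A short sketch of the availability test, with a plain dict standing in for the provider's label quota:

import math

label_quota = {"fake-label": 0}   # e.g. a static pool whose only node is in use

def labels_available(node_types):
    return all(label_quota.get(label, math.inf) > 0 for label in node_types)

assert labels_available(["unknown-label"]) is True
assert labels_available(["fake-label"]) is False
assert labels_available(["fake-label", "unknown-label"]) is False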
@@ -204,6 +218,13 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
             # Got a lock, so assign it
             log.info("Assigning node request %s" % req)
 
+            if has_quota_support:
+                # Adjust the label quota so we don't accept more requests
+                # than we have labels available.
+                for label in req.node_types:
+                    with contextlib.suppress(KeyError):
+                        label_quota[label] -= 1
+
             rh = pm.getRequestHandler(self, req)
             rh.run()
             if rh.paused:
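Depending on the driver, getLabelQuota() may return a Counter, a defaultdict, or another mapping, so decrementing a label the mapping does not track could raise KeyError; contextlib.suppress() makes the adjustment safe either way. A tiny sketch with a plain dict:

import contextlib

label_quota = {"fake-label": 1}   # plain mapping: unknown labels raise KeyError

for label in ("fake-label", "unknown-label"):
    with contextlib.suppress(KeyError):
        label_quota[label] -= 1   # skip labels the mapping does not track

assert label_quota == {"fake-label": 0}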
@@ -311,7 +311,6 @@ class TestDriverStatic(tests.DBTestCase):
         req_waiting.state = zk.REQUESTED
         req_waiting.node_types.append('fake-label')
         self.zk.storeNodeRequest(req_waiting)
-        req_waiting = self.waitForNodeRequest(req_waiting, zk.PENDING)
 
         req = zk.NodeRequest()
         req.state = zk.REQUESTED
@@ -320,7 +319,8 @@ class TestDriverStatic(tests.DBTestCase):
         req = self.waitForNodeRequest(req, zk.FULFILLED)
 
         req_waiting = self.zk.getNodeRequest(req_waiting.id)
-        self.assertEqual(req_waiting.state, zk.PENDING)
+        self.assertEqual(req_waiting.state, zk.REQUESTED)
+        self.assertEqual(req_waiting.declined_by, [])
 
         self.zk.unlockNode(node)
         self.waitForNodeDeletion(node)
@@ -344,7 +344,7 @@ class TestDriverStatic(tests.DBTestCase):
         req_waiting.state = zk.REQUESTED
         req_waiting.node_types.append('fake-label')
         self.zk.storeNodeRequest(req_waiting)
-        req_waiting = self.waitForNodeRequest(req_waiting, zk.PENDING)
+        req_waiting = self.waitForNodeRequest(req_waiting, zk.REQUESTED)
 
         # Make sure the node is not reallocated
         node = self.zk.getNode(req.nodes[0])
@@ -369,28 +369,25 @@ class TestDriverStatic(tests.DBTestCase):
         req_waiting1.state = zk.REQUESTED
         req_waiting1.node_types.append('fake-label')
         self.zk.storeNodeRequest(req_waiting1, priority="300")
-        req_waiting1 = self.waitForNodeRequest(req_waiting1, zk.PENDING)
 
         req_waiting2 = zk.NodeRequest()
         req_waiting2.state = zk.REQUESTED
         req_waiting2.node_types.append('fake-label')
         self.zk.storeNodeRequest(req_waiting2, priority="200")
-        req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.PENDING)
 
         req_waiting3 = zk.NodeRequest()
         req_waiting3.state = zk.REQUESTED
         req_waiting3.node_types.append('fake-label')
         self.zk.storeNodeRequest(req_waiting3, priority="200")
-        req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.PENDING)
 
         self.zk.unlockNode(node)
         self.waitForNodeDeletion(node)
 
         req_waiting2 = self.waitForNodeRequest(req_waiting2, zk.FULFILLED)
         req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
-        self.assertEqual(req_waiting1.state, zk.PENDING)
+        self.assertEqual(req_waiting1.state, zk.REQUESTED)
         req_waiting3 = self.zk.getNodeRequest(req_waiting3.id)
-        self.assertEqual(req_waiting3.state, zk.PENDING)
+        self.assertEqual(req_waiting3.state, zk.REQUESTED)
 
         node_waiting2 = self.zk.getNode(req_waiting2.nodes[0])
         self.zk.lockNode(node_waiting2)
@@ -400,7 +397,7 @@ class TestDriverStatic(tests.DBTestCase):
 
         req_waiting3 = self.waitForNodeRequest(req_waiting3, zk.FULFILLED)
         req_waiting1 = self.zk.getNodeRequest(req_waiting1.id)
-        self.assertEqual(req_waiting1.state, zk.PENDING)
+        self.assertEqual(req_waiting1.state, zk.REQUESTED)
 
         node_waiting3 = self.zk.getNode(req_waiting3.nodes[0])
         self.zk.lockNode(node_waiting3)
@@ -410,46 +407,6 @@ class TestDriverStatic(tests.DBTestCase):
 
         self.waitForNodeRequest(req_waiting1, zk.FULFILLED)
 
-    def test_static_handler_race_cleanup(self):
-        configfile = self.setup_config('static-basic.yaml')
-        pool = self.useNodepool(configfile, watermark_sleep=1)
-        pool.start()
-        node = self.waitForNodes('fake-label')[0]
-
-        pool_workers = pool.getPoolWorkers("static-provider")
-
-        # Dummy node request that is not handled by the static provider
-        req = zk.NodeRequest()
-        req.state = zk.REQUESTED
-        req.node_types.append('fake-label')
-        # Mark request as declined by the static provider
-        req.declined_by.extend(w.launcher_id for w in pool_workers)
-        self.zk.storeNodeRequest(req)
-
-        # Create the result of a race between re-registration of a
-        # ready node and a new building node.
-        data = node.toDict()
-        data.update({
-            "state": zk.BUILDING,
-            "hostname": "",
-            "username": "",
-            "connection_port": 22,
-            "allocated_to": req.id,
-        })
-        building_node = zk.Node.fromDict(data)
-        self.zk.storeNode(building_node)
-        self.zk.lockNode(building_node)
-
-        # Node will be deregistered and assigned to the building node
-        self.waitForNodeDeletion(node)
-        node = self.zk.getNode(building_node.id)
-        self.assertEqual(node.state, zk.READY)
-
-        building_node.state = zk.USED
-        self.zk.storeNode(building_node)
-        self.zk.unlockNode(building_node)
-        self.waitForNodeDeletion(building_node)
-
     def test_static_multinode_handler(self):
         configfile = self.setup_config('static.yaml')
         pool = self.useNodepool(configfile, watermark_sleep=1)