Merge "Don't pause static pool on single label quota"
This commit is contained in:
commit
83c7c1a3a6
|
@ -14,6 +14,7 @@
|
|||
|
||||
import logging
|
||||
|
||||
from nodepool import zk
|
||||
from nodepool.driver import NodeRequestHandler
|
||||
|
||||
|
||||
|
@ -21,6 +22,11 @@ class StaticNodeRequestHandler(NodeRequestHandler):
|
|||
log = logging.getLogger("nodepool.driver.static."
|
||||
"StaticNodeRequestHandler")
|
||||
|
||||
DONE_STATES = {zk.READY, zk.FAILED}
|
||||
|
||||
def _check_node_state(self, node, deleted):
|
||||
return not (node.state in self.DONE_STATES or deleted)
|
||||
|
||||
@property
|
||||
def alive_thread_count(self):
|
||||
# We don't spawn threads to launch nodes, so always return 1.
|
||||
|
@ -33,17 +39,19 @@ class StaticNodeRequestHandler(NodeRequestHandler):
|
|||
return True
|
||||
|
||||
def hasRemainingQuota(self, ntype):
|
||||
# We are always at quota since we cannot launch new nodes.
|
||||
return False
|
||||
# A pool of static nodes can manage nodes with different labels.
|
||||
# There is no global quota that we can exceed here. Return true
|
||||
# so we can wait for the required node type and don't block
|
||||
# other node requests.
|
||||
return True
|
||||
|
||||
def launch(self, node):
|
||||
# NOTE: We do not expect this to be called since hasRemainingQuota()
|
||||
# returning False should prevent the call.
|
||||
raise Exception("Node launching not supported by static driver")
|
||||
self.log.debug("Waiting for node %s to be ready", node.id)
|
||||
self.zk.watchNode(node, self._check_node_state)
|
||||
|
||||
def launchesComplete(self):
|
||||
# We don't wait on a launch since we never actually launch.
|
||||
return True
|
||||
node_states = [node.state for node in self.nodeset]
|
||||
return all(s in self.DONE_STATES for s in node_states)
|
||||
|
||||
def checkReusableNode(self, node):
|
||||
return self.manager.checkNodeLiveness(node)
|
||||
|
|
|
@ -92,6 +92,17 @@ class StaticNodeProvider(Provider):
|
|||
nodes.append(node)
|
||||
return nodes
|
||||
|
||||
def getWaitingNodesOfType(self, labels):
|
||||
nodes = []
|
||||
for node in self.zk.nodeIterator():
|
||||
if (node.provider != self.provider.name or
|
||||
node.state != zk.BUILDING or
|
||||
not set(node.type).issubset(labels)
|
||||
):
|
||||
continue
|
||||
nodes.append(node)
|
||||
return nodes
|
||||
|
||||
def checkNodeLiveness(self, node):
|
||||
static_node = self.poolNodes().get(node.hostname)
|
||||
if static_node is None:
|
||||
|
@ -137,15 +148,22 @@ class StaticNodeProvider(Provider):
|
|||
A node can be registered multiple times to support max-parallel-jobs.
|
||||
These nodes will share a hostname.
|
||||
|
||||
In case there are 'building' nodes waiting for a label, those nodes
|
||||
will be updated and marked 'ready'.
|
||||
|
||||
:param int count: Number of times to register this node.
|
||||
:param str provider_name: Name of the provider.
|
||||
:param str pool_name: Name of the pool owning the node.
|
||||
:param dict static_node: The node definition from the config file.
|
||||
'''
|
||||
host_keys = self.checkHost(static_node)
|
||||
waiting_nodes = self.getWaitingNodesOfType(static_node["labels"])
|
||||
|
||||
for i in range(0, count):
|
||||
node = zk.Node()
|
||||
try:
|
||||
node = waiting_nodes.pop()
|
||||
except IndexError:
|
||||
node = zk.Node()
|
||||
node.state = zk.READY
|
||||
node.provider = provider_name
|
||||
node.pool = pool_name
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
zookeeper-servers:
|
||||
- host: {zookeeper_host}
|
||||
port: {zookeeper_port}
|
||||
chroot: {zookeeper_chroot}
|
||||
|
||||
labels:
|
||||
- name: fake-label
|
||||
- name: fake-label2
|
||||
|
||||
providers:
|
||||
- name: static-provider
|
||||
driver: static
|
||||
pools:
|
||||
- name: main
|
||||
nodes:
|
||||
- name: fake-host-1
|
||||
labels:
|
||||
- fake-label
|
||||
- fake-label2
|
||||
host-key: ssh-rsa FAKEKEY
|
||||
timeout: 13
|
||||
connection-port: 22022
|
||||
username: zuul
|
||||
- name: fake-host-2
|
||||
labels:
|
||||
- fake-label2
|
||||
host-key: ssh-rsa FAKEKEY
|
||||
timeout: 13
|
||||
connection-port: 22022
|
||||
username: zuul
|
|
@ -240,6 +240,40 @@ class TestDriverStatic(tests.DBTestCase):
|
|||
node = self.waitForNodes('fake-label')
|
||||
self.assertEqual(len(node), 1)
|
||||
|
||||
def test_static_waiting_handler(self):
|
||||
configfile = self.setup_config('static-2-nodes-multilabel.yaml')
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
pool.start()
|
||||
|
||||
req = zk.NodeRequest()
|
||||
req.state = zk.REQUESTED
|
||||
req.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req)
|
||||
req = self.waitForNodeRequest(req, zk.FULFILLED)
|
||||
node = self.zk.getNode(req.nodes[0])
|
||||
self.zk.lockNode(node)
|
||||
node.state = zk.USED
|
||||
self.zk.storeNode(node)
|
||||
|
||||
req_waiting = zk.NodeRequest()
|
||||
req_waiting.state = zk.REQUESTED
|
||||
req_waiting.node_types.append('fake-label')
|
||||
self.zk.storeNodeRequest(req_waiting)
|
||||
req_waiting = self.waitForNodeRequest(req_waiting, zk.PENDING)
|
||||
|
||||
req = zk.NodeRequest()
|
||||
req.state = zk.REQUESTED
|
||||
req.node_types.append('fake-label2')
|
||||
self.zk.storeNodeRequest(req)
|
||||
req = self.waitForNodeRequest(req, zk.FULFILLED)
|
||||
|
||||
req_waiting = self.zk.getNodeRequest(req_waiting.id)
|
||||
self.assertEqual(req_waiting.state, zk.PENDING)
|
||||
|
||||
self.zk.unlockNode(node)
|
||||
self.waitForNodeDeletion(node)
|
||||
self.waitForNodeRequest(req_waiting, zk.FULFILLED)
|
||||
|
||||
def test_static_multinode_handler(self):
|
||||
configfile = self.setup_config('static.yaml')
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
|
|
|
@ -1900,6 +1900,27 @@ class ZooKeeper(object):
|
|||
path = self._nodePath(node.id)
|
||||
self.client.set(path, node.serialize())
|
||||
|
||||
def watchNode(self, node, callback):
|
||||
'''Watch an existing node for changes.
|
||||
|
||||
:param Node node: The node object to watch.
|
||||
:param callable callback: A callable object that will be invoked each
|
||||
time the node is updated. It is called with two arguments (node,
|
||||
deleted) where 'node' is the same argument passed to this method,
|
||||
and 'deleted' is a boolean which is True if the node no longer
|
||||
exists. The callback should return False when further updates are
|
||||
no longer necessary.
|
||||
'''
|
||||
def _callback_wrapper(data, stat):
|
||||
if data is not None:
|
||||
node.updateFromDict(self._bytesToDict(data))
|
||||
|
||||
deleted = data is None
|
||||
return callback(node, deleted)
|
||||
|
||||
path = self._nodePath(node.id)
|
||||
self.client.DataWatch(path, _callback_wrapper)
|
||||
|
||||
def deleteRawNode(self, node_id):
|
||||
'''
|
||||
Delete a znode for a Node.
|
||||
|
|
Loading…
Reference in New Issue