diff --git a/tests/base.py b/tests/base.py
index 9ee5838f79..9ec8e54291 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -919,14 +919,45 @@ class FakeNodepool(object):
             reqs.append(data)
         return reqs

+    def makeNode(self, request_id, node_type):
+        now = time.time()
+        path = '/nodepool/nodes/'
+        data = dict(type=node_type,
+                    provider='test-provider',
+                    region='test-region',
+                    az=None,
+                    public_ipv4='127.0.0.1',
+                    private_ipv4=None,
+                    public_ipv6=None,
+                    allocated_to=request_id,
+                    state='ready',
+                    state_time=now,
+                    created_time=now,
+                    updated_time=now,
+                    image_id=None,
+                    launcher='fake-nodepool')
+        data = json.dumps(data)
+        path = self.client.create(path, data,
+                                  makepath=True,
+                                  sequence=True)
+        nodeid = path.split("/")[-1]
+        return nodeid
+
     def fulfillRequest(self, request):
         if request['state'] == 'fulfilled':
             return
         request = request.copy()
-        request['state'] = 'fulfilled'
-        request['state_time'] = time.time()
         oid = request['_oid']
         del request['_oid']
+
+        nodes = []
+        for node in request['node_types']:
+            nodeid = self.makeNode(oid, node)
+            nodes.append(nodeid)
+
+        request['state'] = 'fulfilled'
+        request['state_time'] = time.time()
+        request['nodes'] = nodes
         path = self.REQUEST_ROOT + '/' + oid
         data = json.dumps(request)
         self.log.debug("Fulfilling node request: %s %s" % (oid, data))
diff --git a/tests/test_nodepool.py b/tests/test_nodepool.py
index 3fb03354ec..b5b9b17690 100644
--- a/tests/test_nodepool.py
+++ b/tests/test_nodepool.py
@@ -70,6 +70,14 @@ class TestNodepool(BaseTestCase):
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')

+        # Accept the nodes
+        self.nodepool.acceptNodes(request)
+        nodeset = request.nodeset
+
+        for node in nodeset.getNodes():
+            self.assertIsNotNone(node.lock)
+            self.assertEqual(node.state, 'ready')
+
     def test_node_request_disconnect(self):
         # Test that node requests are re-submitted after disconnect

diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 7a4c7cca70..213cfc44c4 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -615,6 +615,7 @@ class PipelineManager(object):
             item.setUnableToMerge()

     def onNodesProvisioned(self, event):
+        # TODOv3(jeblair): handle provisioning failure here
         request = event.request
         build_set = request.build_set
         build_set.jobNodeRequestComplete(request.job.name, request,
diff --git a/zuul/model.py b/zuul/model.py
index 745deff7b4..48ff98270d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -354,10 +354,36 @@ class Node(object):
         self.name = name
         self.image = image
         self.id = None
+        self.lock = None
+        # Attributes from Nodepool
+        self._state = 'unknown'
+        self.state_time = time.time()
+        self.public_ipv4 = None
+        self.private_ipv4 = None
+        self.public_ipv6 = None
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, value):
+        # TODOv3(jeblair): reinstate
+        # if value not in STATES:
+        #     raise TypeError("'%s' is not a valid state" % value)
+        self._state = value
+        self.state_time = time.time()

     def __repr__(self):
         return '<Node %s %s:%s>' % (self.id, self.name, self.image)

+    def updateFromDict(self, data):
+        self._state = data['state']
+        self.state_time = data['state_time']
+        self.public_ipv4 = data.get('public_ipv4')
+        self.private_ipv4 = data.get('private_ipv4')
+        self.public_ipv6 = data.get('public_ipv6')
+

 class NodeSet(object):
     """A set of nodes.
@@ -407,6 +433,9 @@ class NodeRequest(object):
         self.stat = None
         self.uid = uuid4().hex
         self.id = None
+        # Zuul internal failure flag (not stored in ZK so it's not
+        # overwritten).
+        self.failed = False

     @property
     def state(self):
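
Note: a quick sketch of the Node contract the zuul/model.py hunk above introduces, assuming the patched zuul.model.Node; the node name, image, id, and dict values are illustrative, and the dict mirrors the fields FakeNodepool.makeNode writes:

    import time
    from zuul.model import Node

    node = Node('controller', 'fake-image')  # illustrative name/image
    node.id = '0000000042'                   # illustrative ZK sequence id
    node.updateFromDict({
        'state': 'ready',
        'state_time': time.time(),
        'public_ipv4': '127.0.0.1',
        'private_ipv4': None,
        'public_ipv6': None,
    })
    assert node.state == 'ready'
    # Assigning through the property also refreshes node.state_time;
    # the STATES validation is commented out above, so any string is
    # currently accepted.
    node.state = 'building'
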
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 9d0d803f35..903d90c8c6 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -41,6 +41,34 @@ class Nodepool(object):
     def returnNodes(self, nodes, used=True):
         pass

+    def unlockNodeset(self, nodeset):
+        self._unlockNodes(nodeset.getNodes())
+
+    def _unlockNodes(self, nodes):
+        for node in nodes:
+            try:
+                self.sched.zk.unlockNode(node)
+            except Exception:
+                self.log.exception("Error unlocking node:")
+
+    def lockNodeset(self, nodeset):
+        self._lockNodes(nodeset.getNodes())
+
+    def _lockNodes(self, nodes):
+        # Try to lock all of the supplied nodes.  If any lock fails,
+        # try to unlock any which have already been locked before
+        # re-raising the error.
+        locked_nodes = []
+        try:
+            for node in nodes:
+                self.log.debug("Locking node: %s" % (node,))
+                self.sched.zk.lockNode(node)
+                locked_nodes.append(node)
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            self._unlockNodes(locked_nodes)
+            raise
+
     def _updateNodeRequest(self, request, deleted):
         # Return False to indicate that we should stop watching the
         # node.
@@ -50,10 +78,45 @@ class Nodepool(object):
             return False

         if request.state == 'fulfilled':
+            self.log.info("Node request %s fulfilled" % (request,))
+
+            # Give our results to the scheduler.
             self.sched.onNodesProvisioned(request)
             del self.requests[request.uid]
+
+            # Stop watching this request node.
             return False
+        # TODOv3(jeblair): handle allocation failure
         elif deleted:
             self.log.debug("Resubmitting lost node request %s" % (request,))
             self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
         return True
+
+    def acceptNodes(self, request):
+        # Called by the scheduler when it wants to accept and lock
+        # nodes for (potential) use.
+
+        self.log.debug("Accepting node request: %s" % (request,))
+
+        # First, try to lock the nodes.
+        locked = False
+        try:
+            self.lockNodeset(request.nodeset)
+            locked = True
+        except Exception:
+            self.log.exception("Error locking nodes:")
+            request.failed = True
+
+        # Regardless of whether locking succeeded, delete the
+        # request.
+        self.log.debug("Deleting node request: %s" % (request,))
+        try:
+            self.sched.zk.deleteNodeRequest(request)
+        except Exception:
+            self.log.exception("Error deleting node request:")
+            request.failed = True
+            # If deleting the request failed, and we did lock the
+            # nodes, unlock the nodes since we're not going to use
+            # them.
+            if locked:
+                self.unlockNodeset(request.nodeset)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4a6cc9312b..270e055f2d 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -800,6 +800,14 @@ class Scheduler(threading.Thread):
     def _doNodesProvisionedEvent(self, event):
         request = event.request
         build_set = request.build_set
+
+        try:
+            self.nodepool.acceptNodes(request)
+        except Exception:
+            self.log.exception("Unable to accept nodes from request %s:"
+                               % (request,))
+            return
+
         if build_set is not build_set.item.current_build_set:
             self.log.warning("Build set %s is not current" % (build_set,))
             self.nodepool.returnNodes(request.nodes, used=False)
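
Note: _lockNodes in the zuul/nodepool.py hunk above is an all-or-nothing acquire. A standalone sketch of the same pattern, with lock_one/unlock_one as hypothetical stand-ins for zk.lockNode/zk.unlockNode:

    def lock_all(items, lock_one, unlock_one):
        # Acquire in order; on any failure, release whatever was
        # already acquired before re-raising, so no partial set of
        # locks is left held.
        acquired = []
        try:
            for item in items:
                lock_one(item)
                acquired.append(item)
        except Exception:
            for item in reversed(acquired):
                try:
                    unlock_one(item)
                except Exception:
                    pass  # best effort, mirroring _unlockNodes
            raise
        return acquired
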
diff --git a/zuul/zk.py b/zuul/zk.py
index 190d7b487a..4f7d736007 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,8 @@ import logging
 import six
 import time
 from kazoo.client import KazooClient, KazooState
+from kazoo import exceptions as kze
+from kazoo.recipe.lock import Lock

 # States:
 # We are building this node but it is not ready for use.
@@ -29,6 +31,10 @@ DELETING = 'deleting'
 STATES = set([BUILDING, READY, DELETING])


+class LockException(Exception):
+    pass
+
+
 class ZooKeeperConnectionConfig(object):
     '''
     Represents the connection parameters for a ZooKeeper server.
@@ -178,6 +184,7 @@ class ZooKeeper(object):
     log = logging.getLogger("zuul.zk.ZooKeeper")

     REQUEST_ROOT = '/nodepool/requests'
+    NODE_ROOT = '/nodepool/nodes'

     def __init__(self):
         '''
@@ -300,7 +307,69 @@ class ZooKeeper(object):
             if data:
                 data = self._strToDict(data)
                 node_request.updateFromDict(data)
+                request_nodes = node_request.nodeset.getNodes()
+                for i, nodeid in enumerate(data.get('nodes', [])):
+                    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
+                    node_data, node_stat = self.client.get(node_path)
+                    node_data = self._strToDict(node_data)
+                    request_nodes[i].id = nodeid
+                    request_nodes[i].updateFromDict(node_data)
             deleted = (data is None)  # data *are* none
             return watcher(node_request, deleted)

         self.client.DataWatch(path, callback)
+
+    def deleteNodeRequest(self, node_request):
+        '''
+        Delete a request for nodes.
+
+        :param NodeRequest node_request: A NodeRequest with the
+            contents of the request.
+        '''
+
+        path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
+        try:
+            self.client.delete(path)
+        except kze.NoNodeError:
+            pass
+
+    def lockNode(self, node, blocking=True, timeout=None):
+        '''
+        Lock a node.
+
+        This should be called as soon as a request is fulfilled and
+        the lock held for as long as the node is in-use.  It can be
+        used by nodepool to detect if Zuul has gone offline and the
+        node should be reclaimed.
+
+        :param Node node: The node which should be locked.
+        '''
+
+        lock_path = '%s/%s/lock' % (self.NODE_ROOT, node.id)
+        try:
+            lock = Lock(self.client, lock_path)
+            have_lock = lock.acquire(blocking, timeout)
+        except kze.LockTimeout:
+            raise LockException(
+                "Timeout trying to acquire lock %s" % lock_path)
+
+        # If we aren't blocking, it's possible we didn't get the lock
+        # because someone else has it.
+        if not have_lock:
+            raise LockException("Did not get lock on %s" % lock_path)
+
+        node.lock = lock
+
+    def unlockNode(self, node):
+        '''
+        Unlock a node.
+
+        The node must already have been locked.
+
+        :param Node node: The node which should be unlocked.
+        '''
+
+        if node.lock is None:
+            raise LockException("Node %s does not hold a lock" % (node,))
+        node.lock.release()
+        node.lock = None
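
Note: lockNode/unlockNode above wrap the stock kazoo Lock recipe at /nodepool/nodes/<id>/lock. A minimal standalone sketch of that recipe, assuming a reachable ZooKeeper; the server address and node id are illustrative:

    from kazoo.client import KazooClient
    from kazoo.exceptions import LockTimeout
    from kazoo.recipe.lock import Lock

    client = KazooClient(hosts='127.0.0.1:2181')  # illustrative address
    client.start()
    lock = Lock(client, '/nodepool/nodes/0000000000/lock')  # illustrative id
    try:
        if lock.acquire(blocking=True, timeout=10):
            try:
                pass  # hold the lock for as long as the node is in use
            finally:
                lock.release()
    except LockTimeout:
        pass  # zuul.zk.lockNode converts this into LockException
    client.stop()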