diff --git a/tests/base.py b/tests/base.py
index 346556bee7..f0ff64afc5 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2880,8 +2880,8 @@ class FakeBuild(object):
         # the complexity around multi-node jobs here
         # (self.nodes[0].label?)
         self.node = None
-        if len(self.parameters.get('nodes')) == 1:
-            self.node = self.parameters['nodes'][0]['label']
+        if len(self.parameters['nodeset']['nodes']) == 1:
+            self.node = self.parameters['nodeset']['nodes'][0]['label']
         self.unique = self.parameters['zuul']['build']
         self.pipeline = self.parameters['zuul']['pipeline']
         self.project = self.parameters['zuul']['project']['name']
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index 016f23b174..43478572c9 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -80,25 +80,26 @@ class TestNodepool(TestNodepoolBase):
         self.assertEqual(request.state, 'fulfilled')

         # Accept the nodes
-        accepted = self.nodepool.checkNodeRequest(request, request.id)
-        self.assertTrue(accepted)
+        new_nodeset = self.nodepool.checkNodeRequest(
+            request, request.id, nodeset)
+        self.assertIsNotNone(new_nodeset)

         # acceptNodes will be called on the executor, but only if the
         # noderequest was accepted before.
-        self.nodepool.acceptNodes(request)
-        nodeset = request.nodeset
+        executor_nodeset = nodeset.copy()
+        self.nodepool.acceptNodes(request, executor_nodeset)

-        for node in nodeset.getNodes():
+        for node in executor_nodeset.getNodes():
             self.assertIsNotNone(node.lock)
             self.assertEqual(node.state, 'ready')

         # Mark the nodes in use
-        self.nodepool.useNodeSet(nodeset)
-        for node in nodeset.getNodes():
+        self.nodepool.useNodeSet(executor_nodeset)
+        for node in executor_nodeset.getNodes():
             self.assertEqual(node.state, 'in-use')

         # Return the nodes
-        self.nodepool.returnNodeSet(nodeset)
-        for node in nodeset.getNodes():
+        self.nodepool.returnNodeSet(executor_nodeset)
+        for node in executor_nodeset.getNodes():
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, 'used')

@@ -151,12 +152,16 @@ class TestNodepool(TestNodepoolBase):
         self.assertEqual(request.state, 'fulfilled')

         # Accept the nodes, passing a different ID
-        accepted = self.nodepool.checkNodeRequest(request, "invalid")
-        self.assertFalse(accepted)
+        new_nodeset = self.nodepool.checkNodeRequest(
+            request, "invalid", nodeset)
+        self.assertIsNone(new_nodeset)

         # Don't call acceptNodes here as the node request wasn't accepted.
-        nodeset = request.nodeset
-        for node in nodeset.getNodes():
+        # Nothing we have done has returned an updated nodeset with
+        # real node records, so we need to do that ourselves to verify
+        # they are still unused.
+        for node_id, node in zip(request.nodes, nodeset.getNodes()):
+            self.nodepool.zk_nodepool.updateNode(node, node_id)
             self.assertIsNone(node.lock)
             self.assertEqual(node.state, 'ready')

@@ -177,10 +182,10 @@ class TestNodepool(TestNodepoolBase):
         self.zk_nodepool.deleteNodeRequest(request)

         # Accept the nodes
-        accepted = self.nodepool.checkNodeRequest(request, request.id)
-        self.assertFalse(accepted)
+        new_nodeset = self.nodepool.checkNodeRequest(
+            request, request.id, nodeset)
+        self.assertIsNone(new_nodeset)

         # Don't call acceptNodes here as the node request wasn't accepted.
-        nodeset = request.nodeset
         for node in nodeset.getNodes():
             self.assertIsNone(node.lock)

@@ -206,70 +211,6 @@ class TestNodepool(TestNodepoolBase):
         self.assertEqual(request2.state, 'fulfilled')
         self.assertTrue(request2.state_time < request1.state_time)

-    def test_get_node_request_with_nodeset(self):
-        # Test that we are able to deserialize a node request from ZK and
-        # update the node information while providing a valid NodeSet.
-        nodeset = model.NodeSet()
-        nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
-        nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
-        job = model.Job('testjob')
-        job.nodeset = nodeset
-
-        request = self.nodepool.requestNodes(
-            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
-        self.waitForRequests()
-        self.assertEqual(len(self.provisioned_requests), 1)
-        self.assertEqual(request.state, 'fulfilled')
-
-        # Look up the node request from ZooKeeper while providing the original
-        # nodeset.
-        restored_request = self.zk_nodepool.getNodeRequest(request.id, nodeset)
-
-        # As we've provided the origial nodeset when retrieving the node
-        # request from ZooKeeper, they should be the same
-        self.assertEqual(restored_request.nodeset, nodeset)
-
-        # And the nodeset should contain the original data
-        restored_nodes = restored_request.nodeset.getNodes()
-        self.assertEqual(restored_nodes[0].name, ['controller', 'foo'])
-        self.assertEqual(restored_nodes[1].name, ['compute'])
-
-    def test_get_node_request_without_nodeset(self):
-        # Test that we are able to deserialize a node request from ZK and
-        # update the node information without providing a NodeSet object.
-
-        # This is used in case something went wrong when processing the
-        # NodesProvisionedEvents in the scheduler and the original NodeRequest
-        # and/or NodeSet objects are not available anymore.
-        nodeset = model.NodeSet()
-        nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
-        nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
-        job = model.Job('testjob')
-        job.nodeset = nodeset
-
-        request = self.nodepool.requestNodes(
-            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
-        self.waitForRequests()
-        self.assertEqual(len(self.provisioned_requests), 1)
-        self.assertEqual(request.state, 'fulfilled')
-
-        # Look up the node request from ZooKeeper while providing no nodeset
-        # will result in a fake nodeset being created for the node update.
-        restored_request = self.zk_nodepool.getNodeRequest(request.id)
-
-        # As we didn't provide a nodeset, the nodepool client will create a
-        # fake one to look up the node information from nodepool.
-        self.assertFalse(nodeset == restored_request.nodeset)
-
-        restored_nodes = restored_request.nodeset.getNodes()
-        self.assertEqual(len(restored_nodes), 2)
-        self.assertEqual(restored_nodes[0].label, 'ubuntu-xenial')
-        self.assertEqual(restored_nodes[1].label, 'ubuntu-xenial')
-        # As the nodes were faked, they don't have the same name like the
-        # original ones from the config
-        self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0")
-        self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1")
-

 class TestNodepoolResubmit(TestNodepoolBase):
     def setUp(self):
@@ -291,7 +232,8 @@ class TestNodepoolResubmit(TestNodepoolBase):
         self.disconnect_event.wait()
         self.zk_client.client.stop()
         self.zk_client.client.start()
-        self.nodepool.checkNodeRequest(self.request, self.request.id)
+        self.nodepool.checkNodeRequest(
+            self.request, self.request.id, self.nodeset)

     def test_node_request_disconnect_late(self):
         # Test that node requests are re-submitted after a disconnect
@@ -304,6 +246,7 @@ class TestNodepoolResubmit(TestNodepoolBase):
         nodeset = model.NodeSet()
         nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial'))
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
+        self.nodeset = nodeset
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.request = self.nodepool.requestNodes(
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 4cf0390f4a..f5eff0204f 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -3336,8 +3336,8 @@ class TestScheduler(ZuulTestCase):
         p = self.history[0].parameters
         self.assertEqual(p['timeout'], 40)
-        self.assertEqual(len(p['nodes']), 1)
-        self.assertEqual(p['nodes'][0]['label'], 'new')
+        self.assertEqual(len(p['nodeset']['nodes']), 1)
+        self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new')
         self.assertEqual([x['path'] for x in p['pre_playbooks']],
                          ['base-pre', 'py27-pre'])
         self.assertEqual([x['path'] for x in p['post_playbooks']],
@@ -3347,8 +3347,8 @@ class TestScheduler(ZuulTestCase):
         p = self.history[1].parameters
         self.assertEqual(p['timeout'], 50)
-        self.assertEqual(len(p['nodes']), 1)
-        self.assertEqual(p['nodes'][0]['label'], 'old')
+        self.assertEqual(len(p['nodeset']['nodes']), 1)
+        self.assertEqual(p['nodeset']['nodes'][0]['label'], 'old')
         self.assertEqual([x['path'] for x in p['pre_playbooks']],
                          ['base-pre', 'py27-pre', 'py27-diablo-pre'])
         self.assertEqual([x['path'] for x in p['post_playbooks']],
@@ -3359,8 +3359,8 @@ class TestScheduler(ZuulTestCase):
         p = self.history[2].parameters
         self.assertEqual(p['timeout'], 40)
-        self.assertEqual(len(p['nodes']), 1)
-        self.assertEqual(p['nodes'][0]['label'], 'new')
+        self.assertEqual(len(p['nodeset']['nodes']), 1)
+        self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new')
         self.assertEqual([x['path'] for x in p['pre_playbooks']],
                          ['base-pre', 'py27-pre', 'py27-essex-pre'])
         self.assertEqual([x['path'] for x in p['post_playbooks']],
diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py
index a15337af78..480231d068 100644
--- a/tests/unit/test_web.py
+++ b/tests/unit/test_web.py
@@ -1020,15 +1020,6 @@ class TestWeb(BaseTestWeb):
             'projects': [],
             'branch': 'master',
             'cleanup_playbooks': [],
-            'groups': [],
-            'nodes': [{
-                'comment': None,
-                'hold_job': None,
-                'id': None,
-                'label': 'label1',
-                'name': ['controller'],
-                'state': 'unknown'
-            }],
             'nodeset': {
                 'groups': [],
                 'name': '',
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 3a67a64938..9c7a7a3459 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -61,7 +61,7 @@ class ExecutorClient(object):
             job, uuid, nodeset, item.change, dependent_changes)

         params = zuul.executor.common.construct_build_params(
-            uuid, self.sched, nodeset,
+            uuid, self.sched,
             job, item, pipeline, dependent_changes, merger_items,
             redact_secrets_and_keys=False)
         # TODO: deprecate and remove this variable?
@@ -111,9 +111,10 @@ class ExecutorClient(object):
         # availability zone we can get executor_zone from only the first
         # node.
         executor_zone = None
-        if params["nodes"] and params["nodes"][0].get('attributes'):
-            executor_zone = params[
-                "nodes"][0]['attributes'].get('executor-zone')
+        if len(nodeset.nodes):
+            node = nodeset.getNodes()[0]
+            if node.attributes:
+                executor_zone = node.attributes.get('executor-zone')

         zone_known = False
         if executor_zone:
diff --git a/zuul/executor/common.py b/zuul/executor/common.py
index da7d2ffe57..4156f3ba0b 100644
--- a/zuul/executor/common.py
+++ b/zuul/executor/common.py
@@ -17,7 +17,7 @@ import os

 from zuul.lib import strings


-def construct_build_params(uuid, sched, nodeset, job, item, pipeline,
+def construct_build_params(uuid, sched, job, item, pipeline,
                            dependent_changes=[], merger_items=[],
                            redact_secrets_and_keys=True):
     """Returns a list of all the parameters needed to build a job.
@@ -124,16 +124,7 @@ def construct_build_params(uuid, sched, job, item, pipeline,
     params['cleanup_playbooks'] = [make_playbook(x)
                                    for x in job.cleanup_run]

-    # TODO(corvus): Remove nodes and groups since they're included in
-    # nodeset
-    nodes = []
-    for node in nodeset.getNodes():
-        n = node.toDict()
-        n.update(dict(name=node.name, label=node.label))
-        nodes.append(n)
-    params['nodes'] = nodes
-    params['groups'] = [group.toDict() for group in nodeset.getGroups()]
-    params["nodeset"] = nodeset.toDict()
+    params["nodeset"] = job.nodeset.toDict()
     params['ssh_keys'] = []
     if pipeline.post_review:
         if redact_secrets_and_keys:
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 22167d6651..4df32c4516 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1104,8 +1104,7 @@ class AnsibleJob(object):
         if node_request_id:
             zk_nodepool = self.executor_server.nodepool.zk_nodepool
             self.node_request = zk_nodepool.getNodeRequest(
-                self.arguments["noderequest_id"], self.nodeset
-            )
+                self.arguments["noderequest_id"])

             if self.node_request is None:
                 self.log.error(
@@ -1122,10 +1121,11 @@ class AnsibleJob(object):
         if self.node_request:
             self.log.debug("Locking nodeset")
             try:
-                self.executor_server.nodepool.acceptNodes(self.node_request)
+                self.executor_server.nodepool.acceptNodes(
+                    self.node_request, self.nodeset)
             except Exception:
                 self.log.exception(
-                    "Error locking nodeset %s", self.node_request.nodeset
+                    "Error locking nodeset %s", self.nodeset
                 )
                 raise NodeRequestError

@@ -1133,13 +1133,13 @@ class AnsibleJob(object):
         if self.node_request:
             try:
                 self.executor_server.nodepool.returnNodeSet(
-                    self.node_request.nodeset,
+                    self.nodeset,
                     self,
                     zuul_event_id=self.zuul_event_id,
                 )
             except Exception:
                 self.log.exception(
-                    "Unable to return nodeset %s", self.node_request.nodeset
+                    "Unable to return nodeset %s", self.nodeset
                 )

     def _base_job_data(self):
@@ -1347,9 +1347,7 @@ class AnsibleJob(object):
         # start to run tasks on nodes (prepareVars in particular uses
         # Ansible to freeze hostvars).
         if self.node_request:
-            self.executor_server.nodepool.useNodeSet(
-                self.node_request.nodeset, self
-            )
+            self.executor_server.nodepool.useNodeSet(self.nodeset, self)

         # This prepares each playbook and the roles needed for each.
         self.preparePlaybooks(args)

@@ -3786,7 +3784,7 @@ class ExecutorServer(BaseMergeServer):
             if request is not None:
                 self.log.debug("Got autohold %s", request)
                 self.nodepool.holdNodeSet(
-                    ansible_job.node_request.nodeset, request, ansible_job
+                    ansible_job.nodeset, request, ansible_job
                 )
                 return True
         return False
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 0f8011e9f3..44eb6aef25 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -1501,11 +1501,12 @@ class PipelineManager(metaclass=ABCMeta):
                 repo_state[connection] = event.repo_state[connection]
         build_set.repo_state_state = build_set.COMPLETE

-    def onNodesProvisioned(self, request, build_set):
+    def onNodesProvisioned(self, request, nodeset, build_set):
         # TODOv3(jeblair): handle provisioning failure here
         log = get_annotated_logger(self.log, request.event_id)

-        build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
+        if nodeset is not None:
+            build_set.jobNodeRequestComplete(request.job_name, nodeset)
         # TODO (felix): Check if the failed is still needed as the
         # NodesProvisionedEvents are now in ZooKeeper.
         if request.failed or not request.fulfilled:
@@ -1519,7 +1520,7 @@ class PipelineManager(metaclass=ABCMeta):

         log.info("Completed node request %s for job %s of item %s "
                  "with nodes %s",
-                 request, request.job_name, build_set.item, request.nodeset)
+                 request, request.job_name, build_set.item, request.nodes)

     def reportItem(self, item):
         log = get_annotated_logger(self.log, item.event)
diff --git a/zuul/model.py b/zuul/model.py
index 5529545594..73ad5c0624 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -619,6 +619,7 @@ class Node(ConfigObject):
         self.hold_expiration = None
         self.resources = None
         self.allocated_to = None
+        self.attributes = {}

     @property
     def state(self):
@@ -798,14 +799,15 @@ class NodeRequest(object):
     """A request for a set of nodes."""

     def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
-                 job_name, nodeset, provider, relative_priority,
+                 job_name, labels, provider, relative_priority,
                  event_id=None):
         self.requestor = requestor
         self.build_set_uuid = build_set_uuid
         self.tenant_name = tenant_name
         self.pipeline_name = pipeline_name
         self.job_name = job_name
-        self.nodeset = nodeset
+        self.labels = labels
+        self.nodes = []
         self._state = STATE_REQUESTED
         self.requested_time = time.time()
         self.state_time = time.time()
@@ -826,7 +828,7 @@ class NodeRequest(object):
         # Reset the node request for re-submission
         self._zk_data = {}
         # Remove any real node information
-        self.nodeset = self.nodeset.copy()
+        self.nodes = []
         self.id = None
         self.state = STATE_REQUESTED
         self.stat = None
@@ -849,7 +851,7 @@ class NodeRequest(object):
         self.state_time = time.time()

     def __repr__(self):
-        return '<NodeRequest %s %s>' % (self.id, self.nodeset)
+        return '<NodeRequest %s %s>' % (self.id, self.labels)

     def toDict(self):
         """
@@ -868,9 +870,7 @@ class NodeRequest(object):
             "pipeline_name": self.pipeline_name,
             "job_name": self.job_name,
         }
-        nodes = [n.label for n in self.nodeset.getNodes()]
-        # These are immutable once set
-        d.setdefault('node_types', nodes)
+        d.setdefault('node_types', self.labels)
         d.setdefault('requestor', self.requestor)
         d.setdefault('created_time', self.created_time)
         d.setdefault('provider', self.provider)
@@ -893,6 +893,7 @@ class NodeRequest(object):
         # to errors at other places where we rely on that info.
         if 'tenant_name' in data:
             self.tenant_name = data['tenant_name']
+        self.nodes = data.get('nodes', [])

     @classmethod
     def fromDict(cls, data):
@@ -910,7 +911,7 @@ class NodeRequest(object):
             tenant_name=requestor_data["tenant_name"],
             pipeline_name=requestor_data["pipeline_name"],
             job_name=requestor_data["job_name"],
-            nodeset=None,
+            labels=data["node_types"],
             provider=data["provider"],
             relative_priority=data.get("relative_priority", 0),
         )
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 694d3bc810..59f1bdb219 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -70,13 +70,13 @@ class Nodepool(object):
             if dt:
                 pipe.timing(key, dt)
-            for node in request.nodeset.getNodes():
-                pipe.incr(key + '.label.%s' % node.label)
+            for label in request.labels:
+                pipe.incr(key + '.label.%s' % label)
                 if dt:
-                    pipe.timing(key + '.label.%s' % node.label, dt)
-            pipe.incr(key + '.size.%s' % len(request.nodeset.nodes))
+                    pipe.timing(key + '.label.%s' % label, dt)
+            pipe.incr(key + '.size.%s' % len(request.labels))
             if dt:
-                pipe.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
+                pipe.timing(key + '.size.%s' % len(request.labels), dt)
             pipe.send()

     def emitStatsResources(self):
@@ -112,19 +112,17 @@ class Nodepool(object):
     def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
                      provider, priority, relative_priority, event=None):
         log = get_annotated_logger(self.log, event)
-        # Create a copy of the nodeset to represent the actual nodes
-        # returned by nodepool.
-        nodeset = job.nodeset.copy()
+        labels = [n.label for n in job.nodeset.getNodes()]
         if event:
             event_id = event.zuul_event_id
         else:
             event_id = None
         req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
-                                pipeline_name, job.name, nodeset, provider,
+                                pipeline_name, job.name, labels, provider,
                                 relative_priority, event_id)
         self.requests[req.uid] = req

-        if nodeset.nodes:
+        if job.nodeset.nodes:
             self.zk_nodepool.submitNodeRequest(
                 req, priority, self._updateNodeRequest)
             # Logged after submission so that we have the request id
@@ -314,7 +312,7 @@ class Nodepool(object):
                 except Exception:
                     log.exception("Exception storing node %s "
                                   "while unlocking:", node)
-        self._unlockNodes(nodeset.getNodes())
+        self.unlockNodeSet(nodeset)

         if not ansible_job:
             return
@@ -354,25 +352,26 @@ class Nodepool(object):
         except Exception:
             self.log.exception("Error unlocking node:")

-    def lockNodeSet(self, nodeset, request_id):
+    def lockNodes(self, request, nodeset):
         # Try to lock all of the supplied nodes.  If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
-            for node in nodeset.getNodes():
-                if node.allocated_to != request_id:
+            for node_id, node in zip(request.nodes, nodeset.getNodes()):
+                self.zk_nodepool.updateNode(node, node_id)
+                if node.allocated_to != request.id:
                     raise Exception("Node %s allocated to %s, not %s" %
-                                    (node.id, node.allocated_to, request_id))
+                                    (node.id, node.allocated_to, request.id))
                 self.log.debug("Locking node %s" % (node,))
                 self.zk_nodepool.lockNode(node, timeout=30)
                 # Check the allocated_to again to ensure that nodepool didn't
                 # re-allocate the nodes to a different node request while we
                 # were locking them.
-                if node.allocated_to != request.id:
+                if node.allocated_to != request.id:
                     raise Exception(
                         "Node %s was reallocated during locking %s, not %s" %
-                        (node.id, node.allocated_to, request_id))
+                        (node.id, node.allocated_to, request.id))
                 locked_nodes.append(node)
         except Exception:
             self.log.exception("Error locking nodes:")
@@ -420,34 +419,35 @@ class Nodepool(object):

         return True

-    def checkNodeRequest(self, request, request_id):
+    def checkNodeRequest(self, request, request_id, job_nodeset):
         """
         Called by the scheduler when it wants to accept a node request for
         potential use of its nodes.  The nodes themselves will be accepted
         and locked by the executor when the corresponding job is started.

-        :returns: False if there is a problem with the request (canceled or
-            retrying), True if it is ready to be acted upon (success or
-            failure).
+        :returns: A new NodeSet object which contains information from
+            nodepool about the actual allocated nodes.
         """
         log = get_annotated_logger(self.log, request.event_id)
         log.info("Accepting node request %s", request)

+        # A copy of the nodeset with information about the real nodes
+        nodeset = job_nodeset.copy()
+
         if request_id != request.id:
             log.info("Skipping node accept for %s (resubmitted as %s)",
                      request_id, request.id)
-            return False
+            return None

         if request.canceled:
             log.info("Ignoring canceled node request %s", request)
             # The request was already deleted when it was canceled
-            return False
+            return None

         # If we didn't request nodes and the request is fulfilled then just
         # return. We don't have to do anything in this case. Further don't
         # even ask ZK for the request as empty requests are not put into ZK.
-        if not request.nodeset.nodes and request.fulfilled:
-            return True
+        if not request.labels and request.fulfilled:
+            return nodeset

         # Make sure the request still exists. It's possible it could have
         # disappeared if we lost the ZK session between when the fulfillment
@@ -464,7 +464,10 @@ class Nodepool(object):
                 request.reset()
                 self.zk_nodepool.submitNodeRequest(
                     request, priority, self._updateNodeRequest)
-                return False
+                return None
+            else:
+                for node_id, node in zip(request.nodes, nodeset.getNodes()):
+                    self.zk_nodepool.updateNode(node, node_id)
         except Exception:
             # If we cannot retrieve the node request from ZK we probably lost
             # the connection and thus the ZK session. Resubmitting the node
             # request probably doesn't make sense at this point in time as it
             # is likely to directly fail again. So just log the problem
             # with zookeeper and fail here.
             log.exception("Error getting node request %s:", request_id)
             request.failed = True
-            return True
+            return nodeset

-        return True
+        return nodeset

-    def acceptNodes(self, request):
+    def acceptNodes(self, request, nodeset):
+        # Accept the nodes supplied by request, mutate nodeset with
+        # the real node information.
         locked = False
         if request.fulfilled:
             # If the request succeeded, try to lock the nodes.
             try:
-                self.lockNodeSet(request.nodeset, request.id)
+                nodes = self.lockNodes(request, nodeset)
                 locked = True
             except Exception:
                 log = get_annotated_logger(self.log, request.event_id)
@@ -495,6 +500,7 @@ class Nodepool(object):

         if request.failed:
             raise Exception("Accepting nodes failed")
+        return nodes

     def deleteNodeRequest(self, request, locked=False):
         log = get_annotated_logger(self.log, request.event_id)
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 496d493c1a..3c5c8f0c26 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -540,13 +540,10 @@ class RPCListener(RPCListenerBase):
         if not job:
             gear_job.sendWorkComplete(json.dumps(None))
             return
-        # TODO: check if this is frozen?
-        nodeset = job.nodeset
         job.setBase(tenant.layout)
         uuid = '0' * 32
         params = zuul.executor.common.construct_build_params(
-            uuid, self.sched, nodeset,
-            job, item, pipeline)
+            uuid, self.sched, job, item, pipeline)
         gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder))

     def handle_allowed_labels_get(self, job):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 10c02c11fd..d63b3eaf45 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2049,26 +2049,26 @@ class Scheduler(threading.Thread):
         if not request:
             return

-        ready = self.nodepool.checkNodeRequest(request, request_id)
-        if not ready:
+        log = get_annotated_logger(self.log, request.event_id)
+        job = build_set.item.getJob(request.job_name)
+        if job is None:
+            log.warning("Item %s does not contain job %s "
+                        "for node request %s",
+                        build_set.item, request.job_name, request)
+            build_set.removeJobNodeRequest(request.job_name)
             return

-        log = get_annotated_logger(self.log, request.event_id)
+        nodeset = self.nodepool.checkNodeRequest(request, request_id,
+                                                 job.nodeset)
+        if nodeset is None:
+            return

         # If the request failed, we must directly delete it as the nodes will
         # never be accepted.
         if request.state == STATE_FAILED:
             self.nodepool.deleteNodeRequest(request)

-        if request.job_name not in [x.name for x in build_set.item.getJobs()]:
-            log.warning("Item %s does not contain job %s "
-                        "for node request %s",
-                        build_set.item, request.job_name, request)
-            build_set.removeJobNodeRequest(request.job_name)
-            self.nodepool.deleteNodeRequest(request)
-            return
-
-        pipeline.manager.onNodesProvisioned(request, build_set)
+        pipeline.manager.onNodesProvisioned(request, nodeset, build_set)

     def formatStatusJSON(self, tenant_name):
         # TODOv3(jeblair): use tenants
diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py
index e9e2465b2e..c07d731964 100644
--- a/zuul/zk/nodepool.py
+++ b/zuul/zk/nodepool.py
@@ -19,7 +19,7 @@ from kazoo.exceptions import NoNodeError, LockTimeout
 from kazoo.recipe.lock import Lock

 import zuul.model
-from zuul.model import HoldRequest, Node, NodeRequest, NodeSet
+from zuul.model import HoldRequest, NodeRequest
 from zuul.zk import ZooKeeperClient, ZooKeeperBase
 from zuul.zk.exceptions import LockException

@@ -343,14 +343,11 @@ class ZooKeeperNodepool(ZooKeeperBase):

         self.kazoo_client.DataWatch(path, callback)

-    def getNodeRequest(self, node_request_id, nodeset=None):
+    def getNodeRequest(self, node_request_id):
         """
         Retrieve a NodeRequest from a given path in ZooKeeper.

-        The serialized version of the NodeRequest doesn't contain a NodeSet, so
-        we have to add this to the request manually. The nodeset provided to
-        this method will be set on the NodeRequest before updating it. This
-        will ensure that all nodes are updated as well.
+        :param str node_request_id: The ID of the node request to retrieve.
         """
         path = f"{self.REQUEST_ROOT}/{node_request_id}"

@@ -365,28 +362,6 @@ class ZooKeeperNodepool(ZooKeeperBase):
         json_data = json.loads(data.decode("utf-8"))
         obj = NodeRequest.fromDict(json_data)

-        if nodeset is None:
-            # If no NodeSet is provided, we create one on-the-fly based on the
-            # list of labels (node_types) stored in the NodeRequest's znode
-            # data.
-            # This is necessary as the logic in the updateNodeRequest() method
-            # below will update each node "in-place" an thus, we have to ensure
-            # that the NodeRequest has a valid NodeSet with all nodes available
-            # in advance.
-            # This is only used to return the nodes to nodepool in case
-            # something went wrong and the original NodeRequest and/or NodeSet
-            # objects are not available anymore.
-            nodeset = NodeSet()
-            for i, node_type in enumerate(json_data["node_types"]):
-                node = Node(name=f"{node_type}-{i}", label=node_type)
-                nodeset.addNode(node)
-
-        obj.nodeset = nodeset
-        # Using updateNodeRequest() here will ensure that the nodes are also
-        # updated. Doing the update in here directly rather than calling it
-        # from the outside afterwards, saves us one call to ZooKeeper as the
-        # data we just retrieved can directly be reused.
-        self.updateNodeRequest(obj, data)
         obj.id = node_request_id
         obj.stat = stat
         return obj
@@ -441,10 +416,6 @@ class ZooKeeperNodepool(ZooKeeperBase):
         path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
         data, stat = self.kazoo_client.get(path)
         data = json.loads(data.decode('utf8'))
-        request_nodes = list(node_request.nodeset.getNodes())
-        for i, nodeid in enumerate(data.get('nodes', [])):
-            request_nodes[i].id = nodeid
-            self._updateNode(request_nodes[i])
         node_request.updateFromDict(data)

     def storeNode(self, node):
@@ -459,13 +430,18 @@ class ZooKeeperNodepool(ZooKeeperBase):
         path = '%s/%s' % (self.NODES_ROOT, node.id)
         self.kazoo_client.set(path, json.dumps(node.toDict()).encode('utf8'))

-    def _updateNode(self, node):
+    def updateNode(self, node, node_id):
        """
        Refresh an existing node.

        :param Node node: The node to update.
+
+        :param str node_id: The ID of the node to update.  The
+            existing node.id attribute (if any) will be ignored
+            and replaced with this.
        """
-        node_path = '%s/%s' % (self.NODES_ROOT, node.id)
+        node_path = '%s/%s' % (self.NODES_ROOT, node_id)
+        node.id = node_id
        node_data, node_stat = self.kazoo_client.get(node_path)
        node_data = json.loads(node_data.decode('utf8'))
        node.updateFromDict(node_data)
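
The net effect of the patch on the node-request lifecycle, as exercised by the test_nodepool hunks above: NodeRequest now carries only labels and node IDs, and callers supply and populate NodeSet copies themselves. A minimal usage sketch, assuming a configured zuul.nodepool.Nodepool instance ("nodepool") and a job carrying a NodeSet ("job"); illustrative only, not part of the patch:

    # Scheduler side: the request records labels only; the real node
    # IDs arrive later in request.nodes.
    request = nodepool.requestNodes(
        "build-set-uuid", job, "tenant", "pipeline", "provider", 0, 0)

    # On the NodesProvisionedEvent, pass the job's nodeset in;
    # checkNodeRequest() returns a populated copy, or None if the
    # request was resubmitted or canceled.
    nodeset = nodepool.checkNodeRequest(request, request.id, job.nodeset)

    # Executor side: lock and consume the nodes through the copied
    # nodeset rather than the removed request.nodeset attribute.
    if nodeset is not None:
        nodepool.acceptNodes(request, nodeset)
        nodepool.useNodeSet(nodeset)
        # ... run the job ...
        nodepool.returnNodeSet(nodeset)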