From dbe13ce07697c04ea13d956f2cf26a91a9fc3ee3 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 25 Aug 2021 16:56:33 -0700 Subject: [PATCH] Remove nodeset from NodeRequest To make things simpler for schedulers to handle node provisioned events for node requests which they may not have in their local pipeline state, we need to make the pipeline storage of node requests simpler. That starts by removing the nodeset object as an attribute of the NodeRequest object. This means that the scheduler can work with a node request object without relying on having the associated nodeset. It also simplifies the ZooKeeper code that deserializes NodeRequests (as it doesn't have to create fake NodeSet objects too). And finally, it simplifies what must be stored in the pipeline and queue item structures, which will also come in handy later. Two tests designed to verify that the request->nodeset magic deserialization worked have been removed since they are no longer applicable. Change-Id: I70ae083765d5cd9a4fd1afc2442bf22d6c52ba0b --- tests/base.py | 4 +- tests/unit/test_nodepool.py | 105 ++++++++--------------------------- tests/unit/test_scheduler.py | 12 ++-- tests/unit/test_web.py | 9 --- zuul/executor/client.py | 9 +-- zuul/executor/common.py | 13 +---- zuul/executor/server.py | 18 +++--- zuul/manager/__init__.py | 7 ++- zuul/model.py | 17 +++--- zuul/nodepool.py | 66 ++++++++++++---------- zuul/rpclistener.py | 5 +- zuul/scheduler.py | 24 ++++---- zuul/zk/nodepool.py | 44 ++++----------- 13 files changed, 119 insertions(+), 214 deletions(-) diff --git a/tests/base.py b/tests/base.py index 346556bee7..f0ff64afc5 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2880,8 +2880,8 @@ class FakeBuild(object): # the complexity around multi-node jobs here # (self.nodes[0].label?) self.node = None - if len(self.parameters.get('nodes')) == 1: - self.node = self.parameters['nodes'][0]['label'] + if len(self.parameters['nodeset']['nodes']) == 1: + self.node = self.parameters['nodeset']['nodes'][0]['label'] self.unique = self.parameters['zuul']['build'] self.pipeline = self.parameters['zuul']['pipeline'] self.project = self.parameters['zuul']['project']['name'] diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py index 016f23b174..43478572c9 100644 --- a/tests/unit/test_nodepool.py +++ b/tests/unit/test_nodepool.py @@ -80,25 +80,26 @@ class TestNodepool(TestNodepoolBase): self.assertEqual(request.state, 'fulfilled') # Accept the nodes - accepted = self.nodepool.checkNodeRequest(request, request.id) - self.assertTrue(accepted) + new_nodeset = self.nodepool.checkNodeRequest( + request, request.id, nodeset) + self.assertIsNotNone(new_nodeset) # acceptNodes will be called on the executor, but only if the # noderequest was accepted before. - self.nodepool.acceptNodes(request) - nodeset = request.nodeset + executor_nodeset = nodeset.copy() + self.nodepool.acceptNodes(request, executor_nodeset) - for node in nodeset.getNodes(): + for node in executor_nodeset.getNodes(): self.assertIsNotNone(node.lock) self.assertEqual(node.state, 'ready') # Mark the nodes in use - self.nodepool.useNodeSet(nodeset) - for node in nodeset.getNodes(): + self.nodepool.useNodeSet(executor_nodeset) + for node in executor_nodeset.getNodes(): self.assertEqual(node.state, 'in-use') # Return the nodes - self.nodepool.returnNodeSet(nodeset) - for node in nodeset.getNodes(): + self.nodepool.returnNodeSet(executor_nodeset) + for node in executor_nodeset.getNodes(): self.assertIsNone(node.lock) self.assertEqual(node.state, 'used') @@ -151,12 +152,16 @@ class TestNodepool(TestNodepoolBase): self.assertEqual(request.state, 'fulfilled') # Accept the nodes, passing a different ID - accepted = self.nodepool.checkNodeRequest(request, "invalid") - self.assertFalse(accepted) + new_nodeset = self.nodepool.checkNodeRequest( + request, "invalid", nodeset) + self.assertIsNone(new_nodeset) # Don't call acceptNodes here as the node request wasn't accepted. - nodeset = request.nodeset - for node in nodeset.getNodes(): + # Nothing we have done has returned an updated nodeset with + # real node records, so we need to do that ourselves to verify + # they are still unused. + for node_id, node in zip(request.nodes, nodeset.getNodes()): + self.nodepool.zk_nodepool.updateNode(node, node_id) self.assertIsNone(node.lock) self.assertEqual(node.state, 'ready') @@ -177,10 +182,10 @@ class TestNodepool(TestNodepoolBase): self.zk_nodepool.deleteNodeRequest(request) # Accept the nodes - accepted = self.nodepool.checkNodeRequest(request, request.id) - self.assertFalse(accepted) + new_nodeset = self.nodepool.checkNodeRequest( + request, request.id, nodeset) + self.assertIsNone(new_nodeset) # Don't call acceptNodes here as the node request wasn't accepted. - nodeset = request.nodeset for node in nodeset.getNodes(): self.assertIsNone(node.lock) @@ -206,70 +211,6 @@ class TestNodepool(TestNodepoolBase): self.assertEqual(request2.state, 'fulfilled') self.assertTrue(request2.state_time < request1.state_time) - def test_get_node_request_with_nodeset(self): - # Test that we are able to deserialize a node request from ZK and - # update the node information while providing a valid NodeSet. - nodeset = model.NodeSet() - nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial')) - nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) - job = model.Job('testjob') - job.nodeset = nodeset - - request = self.nodepool.requestNodes( - "test-uuid", job, "tenant", "pipeline", "provider", 0, 0) - self.waitForRequests() - self.assertEqual(len(self.provisioned_requests), 1) - self.assertEqual(request.state, 'fulfilled') - - # Look up the node request from ZooKeeper while providing the original - # nodeset. - restored_request = self.zk_nodepool.getNodeRequest(request.id, nodeset) - - # As we've provided the origial nodeset when retrieving the node - # request from ZooKeeper, they should be the same - self.assertEqual(restored_request.nodeset, nodeset) - - # And the nodeset should contain the original data - restored_nodes = restored_request.nodeset.getNodes() - self.assertEqual(restored_nodes[0].name, ['controller', 'foo']) - self.assertEqual(restored_nodes[1].name, ['compute']) - - def test_get_node_request_without_nodeset(self): - # Test that we are able to deserialize a node request from ZK and - # update the node information without providing a NodeSet object. - - # This is used in case something went wrong when processing the - # NodesProvisionedEvents in the scheduler and the original NodeRequest - # and/or NodeSet objects are not available anymore. - nodeset = model.NodeSet() - nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial')) - nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) - job = model.Job('testjob') - job.nodeset = nodeset - - request = self.nodepool.requestNodes( - "test-uuid", job, "tenant", "pipeline", "provider", 0, 0) - self.waitForRequests() - self.assertEqual(len(self.provisioned_requests), 1) - self.assertEqual(request.state, 'fulfilled') - - # Look up the node request from ZooKeeper while providing no nodeset - # will result in a fake nodeset being created for the node update. - restored_request = self.zk_nodepool.getNodeRequest(request.id) - - # As we didn't provide a nodeset, the nodepool client will create a - # fake one to look up the node information from nodepool. - self.assertFalse(nodeset == restored_request.nodeset) - - restored_nodes = restored_request.nodeset.getNodes() - self.assertEqual(len(restored_nodes), 2) - self.assertEqual(restored_nodes[0].label, 'ubuntu-xenial') - self.assertEqual(restored_nodes[1].label, 'ubuntu-xenial') - # As the nodes were faked, they don't have the same name like the - # original ones from the config - self.assertEqual(restored_nodes[0].name, "ubuntu-xenial-0") - self.assertEqual(restored_nodes[1].name, "ubuntu-xenial-1") - class TestNodepoolResubmit(TestNodepoolBase): def setUp(self): @@ -291,7 +232,8 @@ class TestNodepoolResubmit(TestNodepoolBase): self.disconnect_event.wait() self.zk_client.client.stop() self.zk_client.client.start() - self.nodepool.checkNodeRequest(self.request, self.request.id) + self.nodepool.checkNodeRequest( + self.request, self.request.id, self.nodeset) def test_node_request_disconnect_late(self): # Test that node requests are re-submitted after a disconnect @@ -304,6 +246,7 @@ class TestNodepoolResubmit(TestNodepoolBase): nodeset = model.NodeSet() nodeset.addNode(model.Node(['controller'], 'ubuntu-xenial')) nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) + self.nodeset = nodeset job = model.Job('testjob') job.nodeset = nodeset self.request = self.nodepool.requestNodes( diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 4cf0390f4a..f5eff0204f 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -3336,8 +3336,8 @@ class TestScheduler(ZuulTestCase): p = self.history[0].parameters self.assertEqual(p['timeout'], 40) - self.assertEqual(len(p['nodes']), 1) - self.assertEqual(p['nodes'][0]['label'], 'new') + self.assertEqual(len(p['nodeset']['nodes']), 1) + self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new') self.assertEqual([x['path'] for x in p['pre_playbooks']], ['base-pre', 'py27-pre']) self.assertEqual([x['path'] for x in p['post_playbooks']], @@ -3347,8 +3347,8 @@ class TestScheduler(ZuulTestCase): p = self.history[1].parameters self.assertEqual(p['timeout'], 50) - self.assertEqual(len(p['nodes']), 1) - self.assertEqual(p['nodes'][0]['label'], 'old') + self.assertEqual(len(p['nodeset']['nodes']), 1) + self.assertEqual(p['nodeset']['nodes'][0]['label'], 'old') self.assertEqual([x['path'] for x in p['pre_playbooks']], ['base-pre', 'py27-pre', 'py27-diablo-pre']) self.assertEqual([x['path'] for x in p['post_playbooks']], @@ -3359,8 +3359,8 @@ class TestScheduler(ZuulTestCase): p = self.history[2].parameters self.assertEqual(p['timeout'], 40) - self.assertEqual(len(p['nodes']), 1) - self.assertEqual(p['nodes'][0]['label'], 'new') + self.assertEqual(len(p['nodeset']['nodes']), 1) + self.assertEqual(p['nodeset']['nodes'][0]['label'], 'new') self.assertEqual([x['path'] for x in p['pre_playbooks']], ['base-pre', 'py27-pre', 'py27-essex-pre']) self.assertEqual([x['path'] for x in p['post_playbooks']], diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index a15337af78..480231d068 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -1020,15 +1020,6 @@ class TestWeb(BaseTestWeb): 'projects': [], 'branch': 'master', 'cleanup_playbooks': [], - 'groups': [], - 'nodes': [{ - 'comment': None, - 'hold_job': None, - 'id': None, - 'label': 'label1', - 'name': ['controller'], - 'state': 'unknown' - }], 'nodeset': { 'groups': [], 'name': '', diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 3a67a64938..9c7a7a3459 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -61,7 +61,7 @@ class ExecutorClient(object): job, uuid, nodeset, item.change, dependent_changes) params = zuul.executor.common.construct_build_params( - uuid, self.sched, nodeset, + uuid, self.sched, job, item, pipeline, dependent_changes, merger_items, redact_secrets_and_keys=False) # TODO: deprecate and remove this variable? @@ -111,9 +111,10 @@ class ExecutorClient(object): # availability zone we can get executor_zone from only the first # node. executor_zone = None - if params["nodes"] and params["nodes"][0].get('attributes'): - executor_zone = params[ - "nodes"][0]['attributes'].get('executor-zone') + if len(nodeset.nodes): + node = nodeset.getNodes()[0] + if node.attributes: + executor_zone = node.attributes.get('executor-zone') zone_known = False if executor_zone: diff --git a/zuul/executor/common.py b/zuul/executor/common.py index da7d2ffe57..4156f3ba0b 100644 --- a/zuul/executor/common.py +++ b/zuul/executor/common.py @@ -17,7 +17,7 @@ import os from zuul.lib import strings -def construct_build_params(uuid, sched, nodeset, job, item, pipeline, +def construct_build_params(uuid, sched, job, item, pipeline, dependent_changes=[], merger_items=[], redact_secrets_and_keys=True): """Returns a list of all the parameters needed to build a job. @@ -124,16 +124,7 @@ def construct_build_params(uuid, sched, nodeset, job, item, pipeline, params['cleanup_playbooks'] = [make_playbook(x) for x in job.cleanup_run] - # TODO(corvus): Remove nodes and groups since they're included in - # nodeset - nodes = [] - for node in nodeset.getNodes(): - n = node.toDict() - n.update(dict(name=node.name, label=node.label)) - nodes.append(n) - params['nodes'] = nodes - params['groups'] = [group.toDict() for group in nodeset.getGroups()] - params["nodeset"] = nodeset.toDict() + params["nodeset"] = job.nodeset.toDict() params['ssh_keys'] = [] if pipeline.post_review: if redact_secrets_and_keys: diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 22167d6651..4df32c4516 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -1104,8 +1104,7 @@ class AnsibleJob(object): if node_request_id: zk_nodepool = self.executor_server.nodepool.zk_nodepool self.node_request = zk_nodepool.getNodeRequest( - self.arguments["noderequest_id"], self.nodeset - ) + self.arguments["noderequest_id"]) if self.node_request is None: self.log.error( @@ -1122,10 +1121,11 @@ class AnsibleJob(object): if self.node_request: self.log.debug("Locking nodeset") try: - self.executor_server.nodepool.acceptNodes(self.node_request) + self.executor_server.nodepool.acceptNodes( + self.node_request, self.nodeset) except Exception: self.log.exception( - "Error locking nodeset %s", self.node_request.nodeset + "Error locking nodeset %s", self.nodeset ) raise NodeRequestError @@ -1133,13 +1133,13 @@ class AnsibleJob(object): if self.node_request: try: self.executor_server.nodepool.returnNodeSet( - self.node_request.nodeset, + self.nodeset, self, zuul_event_id=self.zuul_event_id, ) except Exception: self.log.exception( - "Unable to return nodeset %s", self.node_request.nodeset + "Unable to return nodeset %s", self.nodeset ) def _base_job_data(self): @@ -1347,9 +1347,7 @@ class AnsibleJob(object): # start to run tasks on nodes (prepareVars in particular uses # Ansible to freeze hostvars). if self.node_request: - self.executor_server.nodepool.useNodeSet( - self.node_request.nodeset, self - ) + self.executor_server.nodepool.useNodeSet(self.nodeset, self) # This prepares each playbook and the roles needed for each. self.preparePlaybooks(args) @@ -3786,7 +3784,7 @@ class ExecutorServer(BaseMergeServer): if request is not None: self.log.debug("Got autohold %s", request) self.nodepool.holdNodeSet( - ansible_job.node_request.nodeset, request, ansible_job + ansible_job.nodeset, request, ansible_job ) return True return False diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 0f8011e9f3..44eb6aef25 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -1501,11 +1501,12 @@ class PipelineManager(metaclass=ABCMeta): repo_state[connection] = event.repo_state[connection] build_set.repo_state_state = build_set.COMPLETE - def onNodesProvisioned(self, request, build_set): + def onNodesProvisioned(self, request, nodeset, build_set): # TODOv3(jeblair): handle provisioning failure here log = get_annotated_logger(self.log, request.event_id) - build_set.jobNodeRequestComplete(request.job_name, request.nodeset) + if nodeset is not None: + build_set.jobNodeRequestComplete(request.job_name, nodeset) # TODO (felix): Check if the failed is still needed as the # NodesProvisionedEvents are now in ZooKeeper. if request.failed or not request.fulfilled: @@ -1519,7 +1520,7 @@ class PipelineManager(metaclass=ABCMeta): log.info("Completed node request %s for job %s of item %s " "with nodes %s", - request, request.job_name, build_set.item, request.nodeset) + request, request.job_name, build_set.item, request.nodes) def reportItem(self, item): log = get_annotated_logger(self.log, item.event) diff --git a/zuul/model.py b/zuul/model.py index 5529545594..73ad5c0624 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -619,6 +619,7 @@ class Node(ConfigObject): self.hold_expiration = None self.resources = None self.allocated_to = None + self.attributes = {} @property def state(self): @@ -798,14 +799,15 @@ class NodeRequest(object): """A request for a set of nodes.""" def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name, - job_name, nodeset, provider, relative_priority, + job_name, labels, provider, relative_priority, event_id=None): self.requestor = requestor self.build_set_uuid = build_set_uuid self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.job_name = job_name - self.nodeset = nodeset + self.labels = labels + self.nodes = [] self._state = STATE_REQUESTED self.requested_time = time.time() self.state_time = time.time() @@ -826,7 +828,7 @@ class NodeRequest(object): # Reset the node request for re-submission self._zk_data = {} # Remove any real node information - self.nodeset = self.nodeset.copy() + self.nodes = [] self.id = None self.state = STATE_REQUESTED self.stat = None @@ -849,7 +851,7 @@ class NodeRequest(object): self.state_time = time.time() def __repr__(self): - return '' % (self.id, self.nodeset) + return '' % (self.id, self.labels) def toDict(self): """ @@ -868,9 +870,7 @@ class NodeRequest(object): "pipeline_name": self.pipeline_name, "job_name": self.job_name, } - nodes = [n.label for n in self.nodeset.getNodes()] - # These are immutable once set - d.setdefault('node_types', nodes) + d.setdefault('node_types', self.labels) d.setdefault('requestor', self.requestor) d.setdefault('created_time', self.created_time) d.setdefault('provider', self.provider) @@ -893,6 +893,7 @@ class NodeRequest(object): # to errors at other places where we rely on that info. if 'tenant_name' in data: self.tenant_name = data['tenant_name'] + self.nodes = data.get('nodes', []) @classmethod def fromDict(cls, data): @@ -910,7 +911,7 @@ class NodeRequest(object): tenant_name=requestor_data["tenant_name"], pipeline_name=requestor_data["pipeline_name"], job_name=requestor_data["job_name"], - nodeset=None, + labels=data["node_types"], provider=data["provider"], relative_priority=data.get("relative_priority", 0), ) diff --git a/zuul/nodepool.py b/zuul/nodepool.py index 694d3bc810..59f1bdb219 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -70,13 +70,13 @@ class Nodepool(object): if dt: pipe.timing(key, dt) - for node in request.nodeset.getNodes(): - pipe.incr(key + '.label.%s' % node.label) + for label in request.labels: + pipe.incr(key + '.label.%s' % label) if dt: - pipe.timing(key + '.label.%s' % node.label, dt) - pipe.incr(key + '.size.%s' % len(request.nodeset.nodes)) + pipe.timing(key + '.label.%s' % label, dt) + pipe.incr(key + '.size.%s' % len(request.labels)) if dt: - pipe.timing(key + '.size.%s' % len(request.nodeset.nodes), dt) + pipe.timing(key + '.size.%s' % len(request.labels), dt) pipe.send() def emitStatsResources(self): @@ -112,19 +112,17 @@ class Nodepool(object): def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name, provider, priority, relative_priority, event=None): log = get_annotated_logger(self.log, event) - # Create a copy of the nodeset to represent the actual nodes - # returned by nodepool. - nodeset = job.nodeset.copy() + labels = [n.label for n in job.nodeset.getNodes()] if event: event_id = event.zuul_event_id else: event_id = None req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name, - pipeline_name, job.name, nodeset, provider, + pipeline_name, job.name, labels, provider, relative_priority, event_id) self.requests[req.uid] = req - if nodeset.nodes: + if job.nodeset.nodes: self.zk_nodepool.submitNodeRequest( req, priority, self._updateNodeRequest) # Logged after submission so that we have the request id @@ -314,7 +312,7 @@ class Nodepool(object): except Exception: log.exception("Exception storing node %s " "while unlocking:", node) - self._unlockNodes(nodeset.getNodes()) + self.unlockNodeSet(nodeset) if not ansible_job: return @@ -354,25 +352,26 @@ class Nodepool(object): except Exception: self.log.exception("Error unlocking node:") - def lockNodeSet(self, nodeset, request_id): + def lockNodes(self, request, nodeset): # Try to lock all of the supplied nodes. If any lock fails, # try to unlock any which have already been locked before # re-raising the error. locked_nodes = [] try: - for node in nodeset.getNodes(): - if node.allocated_to != request_id: + for node_id, node in zip(request.nodes, nodeset.getNodes()): + self.zk_nodepool.updateNode(node, node_id) + if node.allocated_to != request.id: raise Exception("Node %s allocated to %s, not %s" % - (node.id, node.allocated_to, request_id)) + (node.id, node.allocated_to, request.id)) self.log.debug("Locking node %s" % (node,)) self.zk_nodepool.lockNode(node, timeout=30) # Check the allocated_to again to ensure that nodepool didn't # re-allocate the nodes to a different node request while we # were locking them. - if node.allocated_to != request_id: + if node.allocated_to != request.id: raise Exception( "Node %s was reallocated during locking %s, not %s" % - (node.id, node.allocated_to, request_id)) + (node.id, node.allocated_to, request.id)) locked_nodes.append(node) except Exception: self.log.exception("Error locking nodes:") @@ -420,34 +419,35 @@ class Nodepool(object): return True - def checkNodeRequest(self, request, request_id): + def checkNodeRequest(self, request, request_id, job_nodeset): """ Called by the scheduler when it wants to accept a node request for potential use of its nodes. The nodes itself will be accepted and locked by the executor when the corresponding job is started. - :returns: False if there is a problem with the request (canceled or - retrying), True if it is ready to be acted upon (success or - failure). + :returns: A new NodeSet object which contains information from + nodepool about the actual allocated nodes. """ log = get_annotated_logger(self.log, request.event_id) log.info("Accepting node request %s", request) + # A copy of the nodeset with information about the real nodes + nodeset = job_nodeset.copy() if request_id != request.id: log.info("Skipping node accept for %s (resubmitted as %s)", request_id, request.id) - return False + return None if request.canceled: log.info("Ignoring canceled node request %s", request) # The request was already deleted when it was canceled - return False + return None # If we didn't request nodes and the request is fulfilled then just # reutrn. We don't have to do anything in this case. Further don't even # ask ZK for the request as empty requests are not put into ZK. - if not request.nodeset.nodes and request.fulfilled: - return True + if not request.labels and request.fulfilled: + return nodeset # Make sure the request still exists. It's possible it could have # disappeared if we lost the ZK session between when the fulfillment @@ -464,7 +464,10 @@ class Nodepool(object): request.reset() self.zk_nodepool.submitNodeRequest( request, priority, self._updateNodeRequest) - return False + return None + else: + for node_id, node in zip(request.nodes, nodeset.getNodes()): + self.zk_nodepool.updateNode(node, node_id) except Exception: # If we cannot retrieve the node request from ZK we probably lost # the connection and thus the ZK session. Resubmitting the node @@ -473,16 +476,18 @@ class Nodepool(object): # with zookeeper and fail here. log.exception("Error getting node request %s:", request_id) request.failed = True - return True + return nodeset - return True + return nodeset - def acceptNodes(self, request): + def acceptNodes(self, request, nodeset): + # Accept the nodes supplied by request, mutate nodeset with + # the real node information. locked = False if request.fulfilled: # If the request succeeded, try to lock the nodes. try: - self.lockNodeSet(request.nodeset, request.id) + nodes = self.lockNodes(request, nodeset) locked = True except Exception: log = get_annotated_logger(self.log, request.event_id) @@ -495,6 +500,7 @@ class Nodepool(object): if request.failed: raise Exception("Accepting nodes failed") + return nodes def deleteNodeRequest(self, request, locked=False): log = get_annotated_logger(self.log, request.event_id) diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 496d493c1a..3c5c8f0c26 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -540,13 +540,10 @@ class RPCListener(RPCListenerBase): if not job: gear_job.sendWorkComplete(json.dumps(None)) return - # TODO: check if this is frozen? - nodeset = job.nodeset job.setBase(tenant.layout) uuid = '0' * 32 params = zuul.executor.common.construct_build_params( - uuid, self.sched, nodeset, - job, item, pipeline) + uuid, self.sched, job, item, pipeline) gear_job.sendWorkComplete(json.dumps(params, cls=ZuulJSONEncoder)) def handle_allowed_labels_get(self, job): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 10c02c11fd..d63b3eaf45 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2049,26 +2049,26 @@ class Scheduler(threading.Thread): if not request: return - ready = self.nodepool.checkNodeRequest(request, request_id) - if not ready: + log = get_annotated_logger(self.log, request.event_id) + job = build_set.item.getJob(request.job_name) + if job is None: + log.warning("Item %s does not contain job %s " + "for node request %s", + build_set.item, request.job_name, request) + build_set.removeJobNodeRequest(request.job_name) return - log = get_annotated_logger(self.log, request.event_id) + nodeset = self.nodepool.checkNodeRequest(request, request_id, + job.nodeset) + if nodeset is None: + return # If the request failed, we must directly delete it as the nodes will # never be accepted. if request.state == STATE_FAILED: self.nodepool.deleteNodeRequest(request) - if request.job_name not in [x.name for x in build_set.item.getJobs()]: - log.warning("Item %s does not contain job %s " - "for node request %s", - build_set.item, request.job_name, request) - build_set.removeJobNodeRequest(request.job_name) - self.nodepool.deleteNodeRequest(request) - return - - pipeline.manager.onNodesProvisioned(request, build_set) + pipeline.manager.onNodesProvisioned(request, nodeset, build_set) def formatStatusJSON(self, tenant_name): # TODOv3(jeblair): use tenants diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py index e9e2465b2e..c07d731964 100644 --- a/zuul/zk/nodepool.py +++ b/zuul/zk/nodepool.py @@ -19,7 +19,7 @@ from kazoo.exceptions import NoNodeError, LockTimeout from kazoo.recipe.lock import Lock import zuul.model -from zuul.model import HoldRequest, Node, NodeRequest, NodeSet +from zuul.model import HoldRequest, NodeRequest from zuul.zk import ZooKeeperClient, ZooKeeperBase from zuul.zk.exceptions import LockException @@ -343,14 +343,11 @@ class ZooKeeperNodepool(ZooKeeperBase): self.kazoo_client.DataWatch(path, callback) - def getNodeRequest(self, node_request_id, nodeset=None): + def getNodeRequest(self, node_request_id): """ Retrieve a NodeRequest from a given path in ZooKeeper. - The serialized version of the NodeRequest doesn't contain a NodeSet, so - we have to add this to the request manually. The nodeset provided to - this method will be set on the NodeRequest before updating it. This - will ensure that all nodes are updated as well. + :param str node_request_id: The ID of the node request to retrieve. """ path = f"{self.REQUEST_ROOT}/{node_request_id}" @@ -365,28 +362,6 @@ class ZooKeeperNodepool(ZooKeeperBase): json_data = json.loads(data.decode("utf-8")) obj = NodeRequest.fromDict(json_data) - if nodeset is None: - # If no NodeSet is provided, we create one on-the-fly based on the - # list of labels (node_types) stored in the NodeRequest's znode - # data. - # This is necessary as the logic in the updateNodeRequest() method - # below will update each node "in-place" an thus, we have to ensure - # that the NodeRequest has a valid NodeSet with all nodes available - # in advance. - # This is only used to return the nodes to nodepool in case - # something went wrong and the original NodeRequest and/or NodeSet - # objects are not available anymore. - nodeset = NodeSet() - for i, node_type in enumerate(json_data["node_types"]): - node = Node(name=f"{node_type}-{i}", label=node_type) - nodeset.addNode(node) - - obj.nodeset = nodeset - # Using updateNodeRequest() here will ensure that the nodes are also - # updated. Doing the update in here directly rather than calling it - # from the outside afterwards, saves us one call to ZooKeeper as the - # data we just retrieved can directly be reused. - self.updateNodeRequest(obj, data) obj.id = node_request_id obj.stat = stat return obj @@ -441,10 +416,6 @@ class ZooKeeperNodepool(ZooKeeperBase): path = '%s/%s' % (self.REQUEST_ROOT, node_request.id) data, stat = self.kazoo_client.get(path) data = json.loads(data.decode('utf8')) - request_nodes = list(node_request.nodeset.getNodes()) - for i, nodeid in enumerate(data.get('nodes', [])): - request_nodes[i].id = nodeid - self._updateNode(request_nodes[i]) node_request.updateFromDict(data) def storeNode(self, node): @@ -459,13 +430,18 @@ class ZooKeeperNodepool(ZooKeeperBase): path = '%s/%s' % (self.NODES_ROOT, node.id) self.kazoo_client.set(path, json.dumps(node.toDict()).encode('utf8')) - def _updateNode(self, node): + def updateNode(self, node, node_id): """ Refresh an existing node. :param Node node: The node to update. + + :param str node_id: The ID of the node to update. The + existing node.id attribute (if any) will be ignored + and replaced with this. """ - node_path = '%s/%s' % (self.NODES_ROOT, node.id) + node_path = '%s/%s' % (self.NODES_ROOT, node_id) + node.id = node_id node_data, node_stat = self.kazoo_client.get(node_path) node_data = json.loads(node_data.decode('utf8')) node.updateFromDict(node_data)