diff --git a/releasenotes/notes/requestor-data-field-f69ad4a4b1e6e0ce.yaml b/releasenotes/notes/requestor-data-field-f69ad4a4b1e6e0ce.yaml
new file mode 100644
index 0000000000..595202bb55
--- /dev/null
+++ b/releasenotes/notes/requestor-data-field-f69ad4a4b1e6e0ce.yaml
@@ -0,0 +1,5 @@
+---
+upgrade:
+  - |
+    Zuul now requires Nodepool version 4.2.0 or later due to an
+    internal API change.
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index e212ef8ff5..6e756d811b 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -59,7 +59,8 @@ class TestNodepoolIntegration(BaseTestCase):
         nodeset.addNode(model.Node(['controller'], 'fake-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -89,7 +90,8 @@ class TestNodepoolIntegration(BaseTestCase):
         nodeset.addNode(model.Node(['controller'], 'invalid-label'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, model.STATE_FAILED)
@@ -104,7 +106,8 @@ class TestNodepoolIntegration(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.zk_client.client.stop()
         self.zk_client.client.start()
         self.fake_nodepool.paused = False
@@ -122,7 +125,8 @@ class TestNodepoolIntegration(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.paused = True
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index f3b8352c62..f08e47fdd9 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -70,7 +70,8 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -107,7 +108,8 @@ class TestNodepool(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.pause()
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.zk_client.client.stop()
         self.zk_client.client.start()
         self.fake_nodepool.unpause()
@@ -124,7 +126,8 @@ class TestNodepool(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.pause()
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.nodepool.cancelRequest(request)
 
         self.waitForRequests()
@@ -138,7 +141,8 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -161,7 +165,8 @@ class TestNodepool(BaseTestCase):
         nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
         job = model.Job('testjob')
         job.nodeset = nodeset
-        request = self.nodepool.requestNodes(None, job, 0)
+        request = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 1)
         self.assertEqual(request.state, 'fulfilled')
@@ -187,8 +192,10 @@ class TestNodepool(BaseTestCase):
         job = model.Job('testjob')
         job.nodeset = nodeset
         self.fake_nodepool.pause()
-        request1 = self.nodepool.requestNodes(None, job, 1)
-        request2 = self.nodepool.requestNodes(None, job, 0)
+        request1 = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 1)
+        request2 = self.nodepool.requestNodes(
+            "test-uuid", job, "tenant", "pipeline", "provider", 0, 0)
         self.fake_nodepool.unpause()
         self.waitForRequests()
         self.assertEqual(len(self.provisioned_requests), 2)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 249a3d8aa3..0c10bc661e 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -622,17 +622,47 @@ class PipelineManager(metaclass=ABCMeta):
         build_set = item.current_build_set
         log.debug("Requesting nodes for change %s", item.change)
         if self.sched.use_relative_priority:
-            priority = item.getNodePriority()
+            relative_priority = item.getNodePriority()
         else:
-            priority = 0
+            relative_priority = 0
         for job in jobs:
+            provider = self._getPausedParentProvider(build_set, job)
+            priority = self._calculateNodeRequestPriority(build_set, job)
+            tenant_name = build_set.item.pipeline.tenant.name
+            pipeline_name = build_set.item.pipeline.name
             req = self.sched.nodepool.requestNodes(
-                build_set, job, priority, event=item.event)
+                build_set.uuid, job, tenant_name, pipeline_name, provider,
+                priority, relative_priority, event=item.event)
             log.debug("Adding node request %s for job %s to item %s",
                       req, job, item)
             build_set.setJobNodeRequest(job.name, req)
         return True
 
+    def _getPausedParent(self, build_set, job):
+        job_graph = build_set.item.job_graph
+        if job_graph:
+            for parent in job_graph.getParentJobsRecursively(job.name):
+                build = build_set.getBuild(parent.name)
+                if build.paused:
+                    return build
+        return None
+
+    def _getPausedParentProvider(self, build_set, job):
+        build = self._getPausedParent(build_set, job)
+        if build:
+            nodeset = build_set.getJobNodeSet(build.job.name)
+            if nodeset and nodeset.nodes:
+                return list(nodeset.nodes.values())[0].provider
+        return None
+
+    def _calculateNodeRequestPriority(self, build_set, job):
+        precedence_adjustment = 0
+        precedence = build_set.item.pipeline.precedence
+        if self._getPausedParent(build_set, job):
+            precedence_adjustment = -1
+        initial_precedence = model.PRIORITY_MAP[precedence]
+        return max(0, initial_precedence + precedence_adjustment)
+
     def _executeJobs(self, item, jobs):
         log = get_annotated_logger(self.log, item.event)
         log.debug("Executing jobs for change %s", item.change)
@@ -1452,25 +1482,24 @@ class PipelineManager(metaclass=ABCMeta):
                 repo_state[connection] = event.repo_state[connection]
         build_set.repo_state_state = build_set.COMPLETE
 
-    def onNodesProvisioned(self, event):
+    def onNodesProvisioned(self, event, build_set):
         # TODOv3(jeblair): handle provisioning failure here
         request = event.request
         log = get_annotated_logger(self.log, request.event_id)
 
-        build_set = request.build_set
-        build_set.jobNodeRequestComplete(request.job.name,
-                                         request.nodeset)
+        build_set.jobNodeRequestComplete(request.job_name, request.nodeset)
         if request.failed or not request.fulfilled:
             log.info("Node request %s: failure for %s",
-                     request, request.job.name)
-            build_set.item.setNodeRequestFailure(request.job)
-            self._resumeBuilds(request.build_set)
+                     request, request.job_name)
+            job = build_set.item.getJob(request.job_name)
+            build_set.item.setNodeRequestFailure(job)
+            self._resumeBuilds(build_set)
             tenant = build_set.item.pipeline.tenant
-            tenant.semaphore_handler.release(build_set.item, request.job)
+            tenant.semaphore_handler.release(build_set.item, job)
 
         log.info("Completed node request %s for job %s of item %s "
                  "with nodes %s",
-                 request, request.job, build_set.item, request.nodeset)
+                 request, request.job_name, build_set.item, request.nodeset)
 
     def reportItem(self, item):
         log = get_annotated_logger(self.log, item.event)
diff --git a/zuul/model.py b/zuul/model.py
index 960f1e3f30..3974cd797a 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -791,11 +791,13 @@ class NodeSet(ConfigObject):
 class NodeRequest(object):
     """A request for a set of nodes."""
 
-    def __init__(self, requestor, build_set, job, nodeset, relative_priority,
-                 event=None):
+    def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
+                 job_name, nodeset, provider, relative_priority, event=None):
         self.requestor = requestor
-        self.build_set = build_set
-        self.job = job
+        self.build_set_uuid = build_set_uuid
+        self.tenant_name = tenant_name
+        self.pipeline_name = pipeline_name
+        self.job_name = job_name
         self.nodeset = nodeset
         self._state = STATE_REQUESTED
         self.requested_time = time.time()
@@ -804,7 +806,7 @@ class NodeRequest(object):
         self.stat = None
         self.uid = uuid4().hex
         self.relative_priority = relative_priority
-        self.provider = self._getPausedParentProvider()
+        self.provider = provider
         self.id = None
         self._zk_data = {}  # Data that we read back from ZK
         if event is not None:
@@ -816,37 +818,6 @@ class NodeRequest(object):
         self.failed = False
         self.canceled = False
 
-    def _getPausedParent(self):
-        if self.build_set:
-            job_graph = self.build_set.item.job_graph
-            if job_graph:
-                for parent in job_graph.getParentJobsRecursively(
-                        self.job.name):
-                    build = self.build_set.getBuild(parent.name)
-                    if build.paused:
-                        return build
-        return None
-
-    def _getPausedParentProvider(self):
-        build = self._getPausedParent()
-        if build:
-            nodeset = self.build_set.getJobNodeSet(build.job.name)
-            if nodeset and nodeset.nodes:
-                return list(nodeset.nodes.values())[0].provider
-        return None
-
-    @property
-    def priority(self):
-        precedence_adjustment = 0
-        if self.build_set:
-            precedence = self.build_set.item.pipeline.precedence
-            if self._getPausedParent():
-                precedence_adjustment = -1
-        else:
-            precedence = PRECEDENCE_NORMAL
-        initial_precedence = PRIORITY_MAP[precedence]
-        return max(0, initial_precedence + precedence_adjustment)
-
     @property
     def fulfilled(self):
         return (self._state == STATE_FULFILLED) and not self.failed
@@ -866,8 +837,22 @@ class NodeRequest(object):
         return '<NodeRequest %s %s>' % (self.id, self.nodeset)
 
     def toDict(self):
+        """
+        Serialize a NodeRequest so it can be stored in ZooKeeper.
+
+        Any additional information must be stored in the requestor_data field,
+        so Nodepool doesn't strip the information when it fulfills the request.
+        """
         # Start with any previously read data
         d = self._zk_data.copy()
+        # The requestor_data is opaque to nodepool and won't be touched by
+        # nodepool when it fulfills the request.
+        d["requestor_data"] = {
+            "build_set_uuid": self.build_set_uuid,
+            "tenant_name": self.tenant_name,
+            "pipeline_name": self.pipeline_name,
+            "job_name": self.job_name,
+        }
         nodes = [n.label for n in self.nodeset.getNodes()]
         # These are immutable once set
         d.setdefault('node_types', nodes)
@@ -891,21 +876,20 @@ class NodeRequest(object):
     def fromDict(cls, data):
         """Deserialize a NodeRequest from the data in ZooKeeper.
 
-        When nodepool answeres/updates the NodeRequest in ZooKeeper, it strips
-        any additional attributes. Thus, we can only deserialize a NodeRequest
-        with a subset of attributes.
+        Any additional information must be stored in the requestor_data field,
+        so Nodepool doesn't strip the information when it fulfills the request.
         """
+        # The requestor_data contains zuul-specific information which is opaque
+        # to nodepool and returned as-is when the NodeRequest is fulfilled.
+        requestor_data = data["requestor_data"]
         request = cls(
             requestor=data["requestor"],
-            # TODO (felix): Check if the build_set and job parameters are still
-            # required on the NodeRequest. In the current implementation they
-            # aren't available when the node request is deserialized on the
-            # executor because those values couldn't be serialized to ZK in the
-            # first place. So there might be a good chance that they aren't
-            # needed on the scheduler as well.
-            build_set=None,
-            job=None,
+            build_set_uuid=requestor_data["build_set_uuid"],
+            tenant_name=requestor_data["tenant_name"],
+            pipeline_name=requestor_data["pipeline_name"],
+            job_name=requestor_data["job_name"],
             nodeset=None,
+            provider=data["provider"],
             relative_priority=data.get("relative_priority", 0),
         )
 
@@ -4119,6 +4103,7 @@ class NodesProvisionedEvent(ResultEvent):
 
     def __init__(self, request):
         self.request = request
+        self.build_set_uuid = request.build_set_uuid
         self.request_id = request.id
 
     def toDict(self):
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 77839f4c28..b5b3048a24 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -111,17 +111,20 @@ class Nodepool(object):
             self.statsd.incr(
                 key, value * duration, project=project, resource=resource)
 
-    def requestNodes(self, build_set, job, relative_priority, event=None):
+    def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
+                     provider, priority, relative_priority, event=None):
         log = get_annotated_logger(self.log, event)
         # Create a copy of the nodeset to represent the actual nodes
         # returned by nodepool.
         nodeset = job.nodeset.copy()
-        req = model.NodeRequest(self.hostname, build_set, job,
-                                nodeset, relative_priority, event=event)
+        req = model.NodeRequest(self.hostname, build_set_uuid, tenant_name,
+                                pipeline_name, job.name, nodeset, provider,
+                                relative_priority, event=event)
         self.requests[req.uid] = req
 
         if nodeset.nodes:
-            self.zk_nodepool.submitNodeRequest(req, self._updateNodeRequest)
+            self.zk_nodepool.submitNodeRequest(
+                req, priority, self._updateNodeRequest)
             # Logged after submission so that we have the request id
             log.info("Submitted node request %s", req)
             self.emitStats(req)
@@ -375,7 +378,7 @@ class Nodepool(object):
             self._unlockNodes(locked_nodes)
             raise
 
-    def _updateNodeRequest(self, request, deleted):
+    def _updateNodeRequest(self, request, priority, deleted):
         log = get_annotated_logger(self.log, request.event_id)
         # Return False to indicate that we should stop watching the
         # node.
@@ -395,7 +398,7 @@ class Nodepool(object):
             log.debug("Resubmitting lost node request %s", request)
             request.id = None
             self.zk_nodepool.submitNodeRequest(
-                request, self._updateNodeRequest)
+                request, priority, self._updateNodeRequest)
             # Stop watching this request node
             return False
         elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
@@ -453,11 +456,14 @@ class Nodepool(object):
             if not self.zk_nodepool.nodeRequestExists(request):
                 log.info("Request %s no longer exists, resubmitting",
                          request.id)
+                # Look up the priority from the old request id before
+                # resetting it.
+                priority = request.id.partition("-")[0]
                 request.id = None
                 request.state = model.STATE_REQUESTED
                 self.requests[request.uid] = request
                 self.zk_nodepool.submitNodeRequest(
-                    request, self._updateNodeRequest)
+                    request, priority, self._updateNodeRequest)
                 return False
         except Exception:
             # If we cannot retrieve the node request from ZK we probably lost
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 88a75e5e77..2de54a56ef 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1126,17 +1126,22 @@ class Scheduler(threading.Thread):
                 for request_job, request in \
                         item.current_build_set.node_requests.items():
                     requests_to_cancel.append(
-                        (item.current_build_set, request))
+                        (
+                            item.current_build_set,
+                            request,
+                            item.getJob(request_job),
+                        )
+                    )
 
         for build in builds_to_cancel:
             self.log.info(
                 "Canceling build %s during reconfiguration" % (build,))
             self.cancelJob(build.build_set, build.job, build=build)
-        for build_set, request in requests_to_cancel:
+        for build_set, request, request_job in requests_to_cancel:
             self.log.info(
                 "Canceling node request %s during reconfiguration",
                 request)
-            self.cancelJob(build_set, request.job)
+            self.cancelJob(build_set, request_job)
         for name, old_pipeline in old_tenant.layout.pipelines.items():
             new_pipeline = tenant.layout.pipelines.get(name)
             if not new_pipeline:
@@ -1206,18 +1211,23 @@ class Scheduler(threading.Thread):
                 for request_job, request in \
                         item.current_build_set.node_requests.items():
                     requests_to_cancel.append(
-                        (item.current_build_set, request))
+                        (
+                            item.current_build_set,
+                            request,
+                            item.getJob(request_job),
+                        )
+                    )
 
         for build in builds_to_cancel:
             self.log.info(
                 "Canceling build %s during reconfiguration" % (build,))
             self.cancelJob(build.build_set, build.job,
                            build=build, force=True)
-        for build_set, request in requests_to_cancel:
+        for build_set, request, request_job in requests_to_cancel:
             self.log.info(
                 "Canceling node request %s during reconfiguration",
                 request)
-            self.cancelJob(build_set, request.job, force=True)
+            self.cancelJob(build_set, request_job, force=True)
 
     def _doPromoteEvent(self, event):
         tenant = self.abide.tenants.get(event.tenant_name)
@@ -1852,7 +1862,9 @@ class Scheduler(threading.Thread):
     def _doNodesProvisionedEvent(self, event):
         request = event.request
         request_id = event.request_id
-        build_set = request.build_set
+        tenant_name = request.tenant_name
+        pipeline_name = request.pipeline_name
+
         log = get_annotated_logger(self.log, request.event_id)
 
         ready = self.nodepool.checkNodeRequest(request, request_id)
@@ -1864,31 +1876,34 @@ class Scheduler(threading.Thread):
         if request.state == STATE_FAILED:
             self.nodepool.deleteNodeRequest(request)
 
-        if build_set is not build_set.item.current_build_set:
-            log.warning("Build set %s is not current "
-                        "for node request %s", build_set, request)
+        # Look up the pipeline by its name
+        # TODO (felix): The pipeline lookup can be removed once the
+        # NodesProvisionedEvents are in ZooKeeper.
+        pipeline = None
+        tenant = self.abide.tenants.get(tenant_name)
+        for pl in tenant.layout.pipelines.values():
+            if pl.name == pipeline_name:
+                pipeline = pl
+                break
+
+        build_set = self._getBuildSetFromPipeline(event, pipeline)
+        if not build_set:
             if request.fulfilled:
                 self.nodepool.returnNodeSet(request.nodeset,
                                             zuul_event_id=request.event_id)
             return
-        if request.job.name not in [x.name for x in build_set.item.getJobs()]:
+
+        if request.job_name not in [x.name for x in build_set.item.getJobs()]:
             log.warning("Item %s does not contain job %s "
                         "for node request %s",
-                        build_set.item, request.job.name, request)
-            build_set.removeJobNodeRequest(request.job.name)
+                        build_set.item, request.job_name, request)
+            build_set.removeJobNodeRequest(request.job_name)
             if request.fulfilled:
                 self.nodepool.returnNodeSet(request.nodeset,
                                             zuul_event_id=request.event_id)
             return
-        pipeline = build_set.item.pipeline
-        if not pipeline:
-            log.warning("Build set %s is not associated with a pipeline "
-                        "for node request %s", build_set, request)
-            if request.fulfilled:
-                self.nodepool.returnNodeSet(request.nodeset,
-                                            zuul_event_id=request.event_id)
-            return
-        pipeline.manager.onNodesProvisioned(event)
+
+        pipeline.manager.onNodesProvisioned(event, build_set)
 
     def formatStatusJSON(self, tenant_name):
         # TODOv3(jeblair): use tenants
diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py
index 6d4f342df8..3101f8d2f1 100644
--- a/zuul/zk/nodepool.py
+++ b/zuul/zk/nodepool.py
@@ -366,7 +366,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
             self.log.exception(
                 "Exception in hold request cache update for event: %s", event)
 
-    def submitNodeRequest(self, node_request, watcher):
+    def submitNodeRequest(self, node_request, priority, watcher):
         """
         Submit a request for nodes to Nodepool.
 
@@ -385,7 +385,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
         node_request.created_time = time.time()
         data = node_request.toDict()
 
-        path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, node_request.priority)
+        path = '{}/{:0>3}-'.format(self.REQUEST_ROOT, priority)
         path = self.kazoo_client.create(path, json.dumps(data).encode('utf8'),
                                         makepath=True, sequence=True,
                                         ephemeral=True)
@@ -396,7 +396,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
             if value:
                 self.updateNodeRequest(node_request, value)
             deleted = (value is None)  # data *are* none
-            return watcher(node_request, deleted)
+            return watcher(node_request, priority, deleted)
 
         self.kazoo_client.DataWatch(path, callback)
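A note for reviewers on the two mechanisms this change combines. First, all zuul-specific bookkeeping (build set UUID, tenant, pipeline, and job name) now travels inside the requestor_data field of the node request; Nodepool 4.2.0 and later treats that field as opaque and returns it unmodified when it fulfills the request, which is why the scheduler no longer needs to keep build_set/job references on the NodeRequest itself. Second, the request priority is no longer a property computed on the NodeRequest; it is passed explicitly to submitNodeRequest() and encoded as the zero-padded prefix of the sequential znode name, so it can later be recovered from request.id when a lost request is resubmitted. The sketch below is a simplified stand-in, not the real zuul.model.NodeRequest; the FakeNodeRequest class and the REQUEST_ROOT value are illustrative only.

```python
import json

REQUEST_ROOT = "/nodepool/requests"  # illustrative znode root


class FakeNodeRequest:
    """Simplified stand-in for zuul.model.NodeRequest."""

    def __init__(self, requestor, build_set_uuid, tenant_name,
                 pipeline_name, job_name, provider, relative_priority):
        self.requestor = requestor
        self.build_set_uuid = build_set_uuid
        self.tenant_name = tenant_name
        self.pipeline_name = pipeline_name
        self.job_name = job_name
        self.provider = provider
        self.relative_priority = relative_priority

    def toDict(self):
        # Zuul-specific fields are nested under requestor_data, which
        # Nodepool (>= 4.2.0) returns untouched on fulfillment.
        return {
            "requestor": self.requestor,
            "provider": self.provider,
            "relative_priority": self.relative_priority,
            "requestor_data": {
                "build_set_uuid": self.build_set_uuid,
                "tenant_name": self.tenant_name,
                "pipeline_name": self.pipeline_name,
                "job_name": self.job_name,
            },
        }

    @classmethod
    def fromDict(cls, data):
        # Only requestor_data survives the round trip intact, so all
        # deserialized zuul state must come from it.
        rd = data["requestor_data"]
        return cls(data["requestor"], rd["build_set_uuid"],
                   rd["tenant_name"], rd["pipeline_name"], rd["job_name"],
                   data["provider"], data.get("relative_priority", 0))


# Serialize/deserialize as if the request went through ZooKeeper.
original = FakeNodeRequest("zuul.example.org", "some-build-set-uuid",
                           "tenant", "check", "testjob", None, 0)
restored = FakeNodeRequest.fromDict(json.loads(json.dumps(original.toDict())))
assert restored.build_set_uuid == original.build_set_uuid
assert restored.job_name == original.job_name

# The priority is encoded in the znode path, not in the request data:
priority = 100
path_prefix = "{}/{:0>3}-".format(REQUEST_ROOT, priority)
request_id = path_prefix.rsplit("/", 1)[-1] + "0000000001"  # e.g. from ZK
assert request_id.partition("-")[0] == "100"  # recovered on resubmit
```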