diff --git a/tests/base.py b/tests/base.py
index af77ac790a..aa0f056679 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -3210,6 +3210,15 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
         job.arguments = json.dumps(args)
         super(RecordingExecutorServer, self).executeJob(job)
 
+    def lockNodes(self, job, event):
+        locked = super().lockNodes(job, event)
+        if not locked:
+            # If the nodes could not be locked, directly remove the build
+            # from the list of running builds as it will never be executed.
+            build = self.job_builds.get(job.unique)
+            self.running_builds.remove(build)
+        return locked
+
     def stopJob(self, job):
         self.log.debug("handle stop")
         parameters = json.loads(job.arguments)
diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py
index baca6e053e..d69f3d2db8 100644
--- a/tests/unit/test_executor.py
+++ b/tests/unit/test_executor.py
@@ -436,8 +436,16 @@ class TestAnsibleJob(ZuulTestCase):
     def setUp(self):
         super(TestAnsibleJob, self).setUp()
         ansible_version = AnsibleManager().default_version
-        args = '{"ansible_version": "%s"}' % ansible_version
-        job = gear.TextJob('executor:execute', args, unique='test')
+        args = {
+            "ansible_version": ansible_version,
+            "nodeset": {
+                "name": "dummy-node",
+                "node_request_id": 0,
+                "nodes": [],
+                "groups": [],
+            },
+        }
+        job = gear.TextJob('executor:execute', json.dumps(args), unique='test')
         self.test_job = zuul.executor.server.AnsibleJob(self.executor_server,
                                                         job)
 
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index 3b6bfca3ce..4ca5d71641 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -80,9 +80,14 @@ class TestNodepool(BaseTestCase):
         nodeset = request.nodeset
 
         for node in nodeset.getNodes():
-            self.assertIsNotNone(node.lock)
             self.assertEqual(node.state, 'ready')
 
+        # Lock the nodes
+        self.nodepool.lockNodeSet(nodeset)
+
+        for node in nodeset.getNodes():
+            self.assertIsNotNone(node.lock)
+
         # Mark the nodes in use
         self.nodepool.useNodeSet(nodeset)
         for node in nodeset.getNodes():
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index da5f6aec81..93f2e2d919 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -30,7 +30,6 @@
 from kazoo.exceptions import NoNodeError
 import git
 import testtools
 from testtools.matchers import AfterPreprocessing, MatchesRegex
-from zuul.scheduler import Scheduler
 import fixtures
 
 import zuul.change_matcher
@@ -45,7 +44,8 @@
 from tests.base import (
     repack_repo,
     simple_layout,
     iterate_timeout,
-    RecordingExecutorServer, TestConnectionRegistry,
+    RecordingExecutorServer,
+    TestConnectionRegistry,
 )
 
@@ -2279,10 +2279,10 @@ class TestScheduler(ZuulTestCase):
 
     @simple_layout('layouts/autohold.yaml')
     def test_autohold_request_expiration(self):
-        orig_exp = Scheduler.EXPIRED_HOLD_REQUEST_TTL
+        orig_exp = RecordingExecutorServer.EXPIRED_HOLD_REQUEST_TTL
 
         def reset_exp():
-            self.scheds.first.sched.EXPIRED_HOLD_REQUEST_TTL = orig_exp
+            self.executor_server.EXPIRED_HOLD_REQUEST_TTL = orig_exp
 
         self.addCleanup(reset_exp)
 
@@ -2313,7 +2313,7 @@ class TestScheduler(ZuulTestCase):
         # Temporarily shorten hold time so that the hold request can be
        # auto-deleted (which is done on another test failure). And wait
        # long enough for nodes to expire and request to delete.
-        self.scheds.first.sched.EXPIRED_HOLD_REQUEST_TTL = 1
+        self.executor_server.EXPIRED_HOLD_REQUEST_TTL = 1
         time.sleep(3)
 
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py
index 12c9a824b2..9d99ce0b94 100644
--- a/tests/unit/test_web.py
+++ b/tests/unit/test_web.py
@@ -346,6 +346,7 @@ class TestWeb(BaseTestWeb):
             'nodeset': {
                 'groups': [],
                 'name': '',
+                'node_request_id': None,
                 'nodes': [{'comment': None,
                            'hold_job': None,
                            'id': None,
@@ -393,6 +394,7 @@ class TestWeb(BaseTestWeb):
             'nodeset': {
                 'groups': [],
                 'name': '',
+                'node_request_id': None,
                 'nodes': [{'comment': None,
                            'hold_job': None,
                            'id': None,
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 02e75d622c..944a87ca12 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -172,7 +172,6 @@ class ExecutorClient(object):
             zuul_event_id=item.event.zuul_event_id,
         )
         build.parameters = params
-        build.nodeset = nodeset
 
         log.debug("Adding build %s of job %s to item %s",
                   build, job, item)
@@ -198,6 +197,15 @@ class ExecutorClient(object):
         # is up to date.
         attempts = build.build_set.getTries(job.name)
         params["zuul"]['attempts'] = attempts
+        params['max_attempts'] = job.attempts
+
+        # Store the nodeset in the job arguments, so we can lock it on the
+        # executor side.
+        # TODO (felix): The nodeset shouldn't be necessary anymore as we are
+        # now updating the nodes directly on the noderequest in the executor.
+        params["nodeset"] = nodeset.toDict()
+        params["noderequest_id"] = build.build_set.getJobNodeRequest(
+            job.name).id
 
         functions = getGearmanFunctions(self.gearman)
         function_name = 'executor:execute'
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index e4f1053fe5..425e829766 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -59,9 +59,14 @@ from zuul.executor.sensors.ram import RAMSensor
 from zuul.lib import commandsocket
 from zuul.merger.server import BaseMergeServer, RepoLocks
 from zuul.model import (
-    BuildCompletedEvent, BuildPausedEvent, BuildStartedEvent, BuildStatusEvent
+    BuildCompletedEvent,
+    BuildPausedEvent,
+    BuildStartedEvent,
+    BuildStatusEvent,
+    NodeSet,
+    SCHEME_GOLANG,
 )
-import zuul.model
+from zuul.nodepool import Nodepool
 from zuul.zk.event_queues import PipelineResultEventQueue
 
 BUFFER_LINES_FOR_SYNTAX = 200
@@ -827,12 +832,14 @@ class AnsibleJob(object):
         self.ansible_version = self.arguments.get('ansible_version')
         # TODO(corvus): Remove default setting after 4.3.0; this is to handle
         # scheduler/executor version skew.
-        self.scheme = self.arguments.get('workspace_scheme',
-                                         zuul.model.SCHEME_GOLANG)
+        self.scheme = self.arguments.get('workspace_scheme', SCHEME_GOLANG)
         self.log = get_annotated_logger(
             logger, self.zuul_event_id, build=job.unique)
         self.executor_server = executor_server
         self.job = job
+        self.nodeset = NodeSet.fromDict(self.arguments["nodeset"])
+        self.node_request = self.executor_server.nodepool.zk_nodepool.getNodeRequest(
+            self.arguments["noderequest_id"])
         self.jobdir = None
         self.proc = None
         self.proc_lock = threading.Lock()
@@ -996,8 +1003,9 @@ class AnsibleJob(object):
                 self.executor_server.finishJob(self.job.unique)
         except Exception:
             self.log.exception("Error finalizing job thread:")
-        self.log.info("Job execution took: %.3f seconds" % (
-            time.monotonic() - self.time_starting_build))
+
+        self.log.info("Job execution took: %.3f seconds",
+                      self.end_time - self.time_starting_build)
 
     def _base_job_data(self):
         return {
@@ -1840,7 +1848,7 @@ class AnsibleJob(object):
                     root,
                     self.executor_server.merge_root,
                     logger=self.log,
-                    scheme=zuul.model.SCHEME_GOLANG)
+                    scheme=SCHEME_GOLANG)
                 merger.checkoutBranch(
                     project.connection_name, project.name,
                     branch,
@@ -1880,7 +1888,7 @@ class AnsibleJob(object):
                         root,
                         self.jobdir.src_root,
                         logger=self.log,
-                        scheme=zuul.model.SCHEME_GOLANG,
+                        scheme=SCHEME_GOLANG,
                         cache_scheme=self.scheme)
                     break
 
@@ -1890,7 +1898,7 @@ class AnsibleJob(object):
                     root,
                     self.executor_server.merge_root,
                     logger=self.log,
-                    scheme=zuul.model.SCHEME_GOLANG)
+                    scheme=SCHEME_GOLANG)
 
             # If we don't have this repo yet prepared we need to restore
             # the repo state. Otherwise we have speculative merges in the
@@ -2671,6 +2679,9 @@ class ExecutorServer(BaseMergeServer):
     _job_class = AnsibleJob
     _repo_locks_class = RepoLocks
 
+    # Number of seconds past node expiration a hold request will remain
+    EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
+
     def __init__(
         self,
         config,
@@ -2823,6 +2834,7 @@ class ExecutorServer(BaseMergeServer):
 
         self.process_merge_jobs = get_default(self.config, 'executor',
                                               'merge_jobs', True)
+        self.nodepool = Nodepool(self.zk_client, self.hostname, self.statsd)
 
         self.result_events = PipelineResultEventQueue.createRegistry(
             self.zk_client
@@ -3149,18 +3161,43 @@ class ExecutorServer(BaseMergeServer):
     def executeJob(self, job):
         args = json.loads(job.arguments)
         zuul_event_id = args.get('zuul_event_id')
-        log = get_annotated_logger(self.log, zuul_event_id)
+        log = get_annotated_logger(self.log, zuul_event_id, build=job.unique)
         log.debug("Got %s job: %s", job.name, job.unique)
         if self.statsd:
             base_key = 'zuul.executor.{hostname}'
             self.statsd.incr(base_key + '.builds')
+
+        # Make sure the job is added to the job workers, as we have to look
+        # it up from there in the completeBuild() method to return the NodeSet.
         self.job_workers[job.unique] = self._job_class(self, job)
+
+        if not self.lockNodes(job, event=zuul_event_id):
+            # Don't start the build if the nodes could not be locked
+            return
+
         # Run manageLoad before starting the thread mostly for the
         # benefit of the unit tests to make the calculation of the
         # number of starting jobs more deterministic.
         self.manageLoad()
         self.job_workers[job.unique].run()
 
+    def lockNodes(self, job, event=None):
+        log = get_annotated_logger(self.log, event, build=job.unique)
+        try:
+            log.debug("Locking nodeset")
+            ansible_job = self.job_workers[job.unique]
+            self.nodepool.acceptNodes(ansible_job.node_request)
+            return True
+        except Exception:
+            data = dict(
+                result="NODE_FAILURE", exception=traceback.format_exc()
+            )
+            self.completeBuild(job, data)
+            # Remove the job from the job_workers as it was never started and
+            # thus won't finish on its own.
+            self.finishJob(job.unique)
+            return False
+
     def run_governor(self):
         while not self.governor_stop_event.wait(10):
             try:
@@ -3254,6 +3291,141 @@ class ExecutorServer(BaseMergeServer):
             except Exception:
                 log.exception("Exception sending stop command to worker:")
 
+    def _handleExpiredHoldRequest(self, request):
+        '''
+        Check if a hold request is expired and delete it if it is.
+
+        The 'expiration' attribute will be set to the clock time when the
+        hold request was used for the last time. If this is NOT set, then
+        the request is still active.
+
+        If a node expiration time is set on the request, and the request is
+        expired, *and* we've waited for a defined period past the node
+        expiration (EXPIRED_HOLD_REQUEST_TTL), then we will delete the hold
+        request.
+
+        :param request: Hold request
+        :returns: True if it is expired, False otherwise.
+        '''
+        if not request.expired:
+            return False
+
+        if not request.node_expiration:
+            # Request has been used up but there is no node expiration, so
+            # we don't auto-delete it.
+            return True
+
+        elapsed = time.time() - request.expired
+        if elapsed < self.EXPIRED_HOLD_REQUEST_TTL + request.node_expiration:
+            # Haven't reached our defined expiration lifetime, so don't
+            # auto-delete it yet.
+            return True
+
+        try:
+            self.nodepool.zk_nodepool.lockHoldRequest(request)
+            self.log.info("Removing expired hold request %s", request)
+            self.nodepool.zk_nodepool.deleteHoldRequest(request)
+        except Exception:
+            self.log.exception(
+                "Failed to delete expired hold request %s", request
+            )
+        finally:
+            try:
+                self.nodepool.zk_nodepool.unlockHoldRequest(request)
+            except Exception:
+                pass
+
+        return True
+
+    def _getAutoholdRequest(self, ansible_job):
+        args = ansible_job.arguments
+        autohold_key_base = (
+            args["zuul"]["tenant"],
+            args["zuul"]["project"]["canonical_name"],
+            args["zuul"]["job"],
+        )
+
+        class Scope(object):
+            """Enum defining a precedence/priority of autohold requests.
+
+            Autohold requests for specific refs should be fulfilled first,
+            before those for changes, and generic jobs.
+
+            Matching algorithm goes over all existing autohold requests, and
+            returns one with the highest number (in case of duplicated
+            requests the last one wins).
+            """
+            NONE = 0
+            JOB = 1
+            CHANGE = 2
+            REF = 3
+
+        # Do a partial match of the autohold key against all autohold
+        # requests, ignoring the last element of the key (ref filter),
+        # and finally do a regex match between ref filter from
+        # the autohold request and the build's change ref to check
+        # if it matches. Lastly, make sure that we match the most
+        # specific autohold request by comparing "scopes"
+        # of requests - the most specific is selected.
+        autohold = None
+        scope = Scope.NONE
+        self.log.debug("Checking build autohold key %s", autohold_key_base)
+        for request_id in self.nodepool.zk_nodepool.getHoldRequests():
+            request = self.nodepool.zk_nodepool.getHoldRequest(request_id)
+            if not request:
+                continue
+
+            if self._handleExpiredHoldRequest(request):
+                continue
+
+            ref_filter = request.ref_filter
+
+            if request.current_count >= request.max_count:
+                # This request has been used the max number of times
+                continue
+            elif not (
+                request.tenant == autohold_key_base[0]
+                and request.project == autohold_key_base[1]
+                and request.job == autohold_key_base[2]
+            ):
+                continue
+            elif not re.match(ref_filter, args["zuul"]["ref"]):
+                continue
+
+            if ref_filter == ".*":
+                candidate_scope = Scope.JOB
+            elif ref_filter.endswith(".*"):
+                candidate_scope = Scope.CHANGE
+            else:
+                candidate_scope = Scope.REF
+
+            self.log.debug(
+                "Build autohold key %s matched scope %s",
+                autohold_key_base,
+                candidate_scope,
+            )
+            if candidate_scope > scope:
+                scope = candidate_scope
+                autohold = request
+
+        return autohold
+
+    def _processAutohold(self, ansible_job, result):
+        # We explicitly only want to hold nodes for jobs if they have
+        # failed / retry_limit / post_failure and have an autohold request.
+        hold_list = ["FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"]
+        if result not in hold_list:
+            return False
+
+        request = self._getAutoholdRequest(ansible_job)
+        if request is not None:
+            self.log.debug("Got autohold %s", request)
+            self.nodepool.holdNodeSet(
+                ansible_job.node_request.nodeset, request, ansible_job
+            )
+            return True
+        return False
+
     def startBuild(self, job: gear.TextJob, data: Dict) -> None:
         # Mark the gearman job as started, as we are still using it for the
         # actual job execution. The data, however, will be passed to the
@@ -3307,7 +3479,47 @@
         # result dict for that.
         result["end_time"] = time.time()
 
+        # NOTE (felix): We store the end_time on the ansible job to calculate
+        # the in-use duration of locked nodes when the nodeset is returned.
+        ansible_job = self.job_workers[job.unique]
+        ansible_job.end_time = time.monotonic()
+
         params = json.loads(job.arguments)
+
+        # If the result is None, check if the build has reached its max
+        # attempts and if so set the result to RETRY_LIMIT.
+        # NOTE (felix): This must be done in order to correctly process the
+        # autohold in the next step, since we only want to hold the node if
+        # the build has reached a final result.
+        if result.get("result") is None:
+            attempts = params["zuul"]["attempts"]
+            max_attempts = params["max_attempts"]
+            if attempts >= max_attempts:
+                result["result"] = "RETRY_LIMIT"
+
+        zuul_event_id = params["zuul_event_id"]
+        log = get_annotated_logger(self.log, zuul_event_id, build=job.unique)
+
+        # Provide the hold information back to the scheduler via the build
+        # result.
+        try:
+            held = self._processAutohold(ansible_job, result.get("result"))
+            result["held"] = held
+            log.info("Held status set to %s", held)
+        except Exception:
+            log.exception("Unable to process autohold for %s", ansible_job)
+
+        # Regardless of any other conditions which might cause us not to pass
+        # the build result on to the scheduler/pipeline manager, make sure we
+        # return the nodes to nodepool.
+        try:
+            self.nodepool.returnNodeSet(
+                ansible_job.node_request.nodeset, ansible_job, zuul_event_id=zuul_event_id
+            )
+        except Exception:
+            self.log.exception(
+                "Unable to return nodeset %s", ansible_job.node_request.nodeset
+            )
+
         tenant_name = params["zuul"]["tenant"]
         pipeline_name = params["zuul"]["pipeline"]
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 74d05b08f2..530fa9bb93 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -635,10 +635,6 @@ class PipelineManager(metaclass=ABCMeta):
         for job in jobs:
             log.debug("Found job %s for change %s", job, item.change)
             try:
-                nodeset = item.current_build_set.getJobNodeSet(job.name)
-                self.sched.nodepool.useNodeSet(
-                    nodeset, build_set=item.current_build_set,
-                    event=item.event)
                 self.sched.executor.execute(
                     job, item, self.pipeline,
                     build_set.dependent_changes,
@@ -1437,7 +1433,6 @@ class PipelineManager(metaclass=ABCMeta):
         if request.failed or not request.fulfilled:
             log.info("Node request %s: failure for %s",
                      request, request.job.name)
-            build_set.item.setNodeRequestFailure(request.job)
             self._resumeBuilds(request.build_set)
         tenant = build_set.item.pipeline.tenant
         tenant.semaphore_handler.release(build_set.item, request.job)
diff --git a/zuul/model.py b/zuul/model.py
index 3289292218..a3c0549e8b 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -609,6 +609,7 @@ class Node(ConfigObject):
         self.username = None
         self.hold_expiration = None
         self.resources = None
+        self.allocated_to = None
 
     @property
     def state(self):
@@ -717,6 +718,7 @@ class NodeSet(ConfigObject):
         self.name = name or ''
         self.nodes = OrderedDict()
         self.groups = OrderedDict()
+        self.node_request_id = None
 
     def __ne__(self, other):
         return not self.__eq__(other)
@@ -728,9 +730,11 @@ class NodeSet(ConfigObject):
                 self.nodes == other.nodes)
 
     def toDict(self):
-        d = {}
-        d['name'] = self.name
-        d['nodes'] = []
+        d = {
+            "node_request_id": self.node_request_id,
+            "name": self.name,
+            "nodes": [],
+        }
         for node in self.nodes.values():
             d['nodes'].append(node.toDict(internal_attributes=True))
         d['groups'] = []
@@ -741,6 +745,7 @@ class NodeSet(ConfigObject):
     @classmethod
     def fromDict(cls, data):
         nodeset = cls(data["name"])
+        nodeset.node_request_id = data["node_request_id"]
         for node in data["nodes"]:
             nodeset.addNode(Node.fromDict(node))
         for group in data["groups"]:
@@ -749,6 +754,7 @@ class NodeSet(ConfigObject):
 
     def copy(self):
         n = NodeSet(self.name)
+        n.node_request_id = self.node_request_id
         for name, node in self.nodes.items():
             n.addNode(Node(node.name, node.label))
         for name, group in self.groups.items():
@@ -863,6 +869,10 @@ class NodeRequest(object):
     def toDict(self):
         # Start with any previously read data
         d = self._zk_data.copy()
+        # This is just the nodeset structure without data, but we need it to
+        # update the nodes on the executor side when the node request is
+        # fulfilled.
+        d["nodeset"] = self.nodeset.toDict()
         nodes = [n.label for n in self.nodeset.getNodes()]
         # These are immutable once set
         d.setdefault('node_types', nodes)
@@ -882,6 +892,32 @@ class NodeRequest(object):
         self.state_time = data['state_time']
         self.relative_priority = data.get('relative_priority', 0)
 
+    @classmethod
+    def fromDict(cls, data):
+        request = cls(
+            requestor=data["requestor"],
+            # TODO (felix): How to get the build_set and the job? They
+            # shouldn't be relevant for the current implementation, but we
+            # should know that they are missing.
+            build_set=None,
+            job=None,
+            nodeset=NodeSet.fromDict(data["nodeset"]),
+            relative_priority=data.get("relative_priority", 0),
+        )
+
+        request.created_time = data["created_time"]
+        request.provider = data["provider"]
+
+        # Set _state directly to bypass the setter and avoid overwriting the
+        # state_time.
+        request._state = data["state"]
+        request.state_time = data["state_time"]
+        request.event_id = data["event_id"]
+
+        request._zk_data = data
+
+        return request
+
 
 class Secret(ConfigObject):
     """A collection of private data.
@@ -2040,7 +2076,6 @@ class Build(object):
         self.worker = Worker()
         self.node_labels = []
         self.node_name = None
-        self.nodeset = None
         self.zuul_event_id = zuul_event_id
 
     def __repr__(self):
@@ -2261,7 +2296,7 @@ class BuildSet(object):
         if job_name in self.nodesets:
             raise Exception("Prior node request for %s" % (job_name))
         self.nodesets[job_name] = nodeset
-        del self.node_requests[job_name]
+        #del self.node_requests[job_name]
 
     def getTries(self, job_name):
         return self.tries.get(job_name, 0)
@@ -2942,14 +2977,6 @@ class QueueItem(object):
             fakebuild.result = 'SKIPPED'
             self.addBuild(fakebuild)
 
-    def setNodeRequestFailure(self, job):
-        fakebuild = Build(job, self.current_build_set, None)
-        fakebuild.start_time = time.time()
-        fakebuild.end_time = time.time()
-        self.addBuild(fakebuild)
-        fakebuild.result = 'NODE_FAILURE'
-        self.setResult(fakebuild)
-
     def setDequeuedNeedingChange(self):
         self.dequeued_needing_change = True
         self._setAllJobsSkipped()
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 69ce515f4e..62562d3a85 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -128,6 +128,10 @@ class Nodepool(object):
         # the executor side.
         self.sched.onNodesProvisioned(req)
         del self.requests[req.uid]
+
+        # Store the node_request id on the nodeset for later evaluation
+        nodeset.node_request_id = req.id
+
         return req
 
     def cancelRequest(self, request):
@@ -173,7 +177,7 @@ class Nodepool(object):
         except Exception:
             log.exception("Unable to unlock node request %s", request)
 
-    def holdNodeSet(self, nodeset, request, build):
+    def holdNodeSet(self, nodeset, request, ansible_job):
         '''
         Perform a hold on the given set of nodes.
 
@@ -197,7 +201,7 @@ class Nodepool(object):
             self.zk_nodepool.storeNode(node)
 
         request.nodes.append(dict(
-            build=build.uuid,
+            build=ansible_job.job.unique,
             nodes=[node.id for node in nodes],
         ))
         request.current_count += 1
@@ -225,21 +229,22 @@ class Nodepool(object):
             # _doBuildCompletedEvent, we always want to try to unlock it.
             self.zk_nodepool.unlockHoldRequest(request)
 
-    def useNodeSet(self, nodeset, build_set=None, event=None):
-        self.log.info("Setting nodeset %s in use" % (nodeset,))
+    def useNodeSet(self, nodeset, ansible_job=None, event=None):
+        self.log.info("Setting nodeset %s in use", nodeset)
         resources = defaultdict(int)
         for node in nodeset.getNodes():
             if node.lock is None:
                 raise Exception("Node %s is not locked" % (node,))
             node.state = model.STATE_IN_USE
             self.zk_nodepool.storeNode(node)
             if node.resources:
                 add_resources(resources, node.resources)
-        if build_set and resources:
+        if ansible_job and resources:
+            args = ansible_job.arguments
             # we have a buildset and thus also tenant and project so we
             # can emit project specific resource usage stats
-            tenant_name = build_set.item.layout.tenant.name
-            project_name = build_set.item.change.project.canonical_name
+            tenant_name = args["zuul"]["tenant"]
+            project_name = args["zuul"]["project"]["canonical_name"]
 
             self.current_resources_by_tenant.setdefault(
                 tenant_name, defaultdict(int))
@@ -252,24 +257,11 @@ class Nodepool(object):
                             resources)
         self.emitStatsResources()
 
-    def returnNodeSet(self, nodeset, build=None, zuul_event_id=None):
+    def returnNodeSet(self, nodeset, ansible_job=None, zuul_event_id=None):
         log = get_annotated_logger(self.log, zuul_event_id)
         log.info("Returning nodeset %s", nodeset)
         resources = defaultdict(int)
-        duration = None
-        project = None
-        tenant = None
-        if build:
-            project = build.build_set.item.change.project
-            tenant = build.build_set.item.pipeline.tenant.name
-        if (build and build.start_time and build.end_time and
-            build.build_set and build.build_set.item and
-            build.build_set.item.change and
-            build.build_set.item.change.project):
-            duration = build.end_time - build.start_time
-            log.info("Nodeset %s with %s nodes was in use "
-                     "for %s seconds for build %s for project %s",
-                     nodeset, len(nodeset.nodes), duration, build, project)
+
         for node in nodeset.getNodes():
             if node.lock is None:
                 log.error("Node %s is not locked", node)
@@ -285,21 +277,33 @@ class Nodepool(object):
                               "while unlocking:", node)
         self._unlockNodes(nodeset.getNodes())
 
+        if not ansible_job:
+            return
+
+        args = ansible_job.arguments
+        project = args["zuul"]["project"]["canonical_name"]
+        tenant = args["zuul"]["tenant"]
+        duration = 0
+        if ansible_job.end_time and ansible_job.time_starting_build:
+            duration = ansible_job.end_time - ansible_job.time_starting_build
+        log.info("Nodeset %s with %s nodes was in use "
+                 "for %s seconds for build %s for project %s",
+                 nodeset, len(nodeset.nodes), duration, ansible_job, project)
+
         # When returning a nodeset we need to update the gauges if we have a
         # build. Further we calculate resource*duration and increment their
         # tenant or project specific counters. With that we have both the
         # current value and also counters to be able to perform accounting.
-        if tenant and project and resources:
-            project_name = project.canonical_name
+        if resources:
             subtract_resources(
                 self.current_resources_by_tenant[tenant], resources)
             subtract_resources(
-                self.current_resources_by_project[project_name], resources)
+                self.current_resources_by_project[project], resources)
             self.emitStatsResources()
 
             if duration:
                 self.emitStatsResourceCounters(
-                    tenant, project_name, resources, duration)
+                    tenant, project, resources, duration)
 
     def unlockNodeSet(self, nodeset):
         self._unlockNodes(nodeset.getNodes())
@@ -312,15 +316,12 @@ class Nodepool(object):
                 self.log.exception("Error unlocking node:")
 
     def lockNodeSet(self, nodeset, request_id):
-        self._lockNodes(nodeset.getNodes(), request_id)
-
-    def _lockNodes(self, nodes, request_id):
         # Try to lock all of the supplied nodes. If any lock fails,
         # try to unlock any which have already been locked before
         # re-raising the error.
         locked_nodes = []
         try:
-            for node in nodes:
+            for node in nodeset.getNodes():
                 if node.allocated_to != request_id:
                     raise Exception("Node %s allocated to %s, not %s" %
                                     (node.id, node.allocated_to, request_id))
@@ -338,6 +339,9 @@ class Nodepool(object):
         # node.
         log.debug("Updating node request %s", request)
 
+        # Update the node_request id on the nodeset
+        request.nodeset.node_request_id = request.id
+
         if request.uid not in self.requests:
             log.debug("Request %s is unknown", request.uid)
             return False
@@ -372,7 +376,7 @@ class Nodepool(object):
 
         return True
 
-    def acceptNodes(self, request, request_id):
+    def acceptNodes(self, request):
         log = get_annotated_logger(self.log, request.event_id)
 
         # Called by the scheduler when it wants to accept and lock
@@ -382,11 +386,19 @@ class Nodepool(object):
 
         log.info("Accepting node request %s", request)
 
+        # TODO (felix): I think this shouldn't be a problem anymore as the
+        # node request is now directly retrieved from ZooKeeper before
+        # accepting the nodes on the executor.
+        """
         if request_id != request.id:
             log.info("Skipping node accept for %s (resubmitted as %s)",
                      request_id, request.id)
             return False
+        """
 
+        # TODO (felix): The canceled check might also not be necessary
+        # anymore as the executor won't be able to retrieve the NodeRequest
+        # from ZooKeeper if it was deleted.
         if request.canceled:
             log.info("Ignoring canceled node request %s", request)
             # The request was already deleted when it was canceled
@@ -419,13 +431,13 @@ class Nodepool(object):
             # request probably doesn't make sense at this point in time as it
             # is likely to directly fail again. So just log the problem
             # with zookeeper and fail here.
-            log.exception("Error getting node request %s:", request_id)
+            log.exception("Error getting node request %s:", request)
             request.failed = True
             return True
 
         locked = False
         if request.fulfilled:
-            # If the request suceeded, try to lock the nodes.
+            # If the request succeeded, try to lock the nodes.
             try:
                 self.lockNodeSet(request.nodeset, request.id)
                 locked = True
@@ -446,4 +458,6 @@ class Nodepool(object):
             # them.
             if locked:
                 self.unlockNodeSet(request.nodeset)
-        return True
+
+        if request.failed:
+            raise Exception("Accepting nodes failed")
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 96032c33aa..72016e82e3 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -18,7 +18,6 @@
 import json
 import logging
 import os
-import re
 import socket
 import sys
 import threading
@@ -110,9 +109,6 @@ class Scheduler(threading.Thread):
     _cleanup_interval = 60 * 60
     _merger_client_class = MergeClient
 
-    # Number of seconds past node expiration a hold request will remain
-    EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
-
     def __init__(self, config, connections, app, testonly=False):
         threading.Thread.__init__(self)
         self.daemon = True
@@ -1539,131 +1535,6 @@ class Scheduler(threading.Thread):
             return
         pipeline.manager.onBuildPaused(build)
 
-    def _handleExpiredHoldRequest(self, request):
-        '''
-        Check if a hold request is expired and delete it if it is.
-
-        The 'expiration' attribute will be set to the clock time when the
-        hold request was used for the last time. If this is NOT set, then
-        the request is still active.
-
-        If a node expiration time is set on the request, and the request is
-        expired, *and* we've waited for a defined period past the node
-        expiration (EXPIRED_HOLD_REQUEST_TTL), then we will delete the hold
-        request.
-
-        :returns: True if it is expired, False otherwise.
-        '''
-        if not request.expired:
-            return False
-
-        if not request.node_expiration:
-            # Request has been used up but there is no node expiration, so
-            # we don't auto-delete it.
-            return True
-
-        elapsed = time.time() - request.expired
-        if elapsed < self.EXPIRED_HOLD_REQUEST_TTL + request.node_expiration:
-            # Haven't reached our defined expiration lifetime, so don't
-            # auto-delete it yet.
-            return True
-
-        try:
-            self.zk_nodepool.lockHoldRequest(request)
-            self.log.info("Removing expired hold request %s", request)
-            self.zk_nodepool.deleteHoldRequest(request)
-        except Exception:
-            self.log.exception(
-                "Failed to delete expired hold request %s", request)
-        finally:
-            try:
-                self.zk_nodepool.unlockHoldRequest(request)
-            except Exception:
-                pass
-
-        return True
-
-    def _getAutoholdRequest(self, build):
-        change = build.build_set.item.change
-
-        autohold_key_base = (build.pipeline.tenant.name,
-                             change.project.canonical_name,
-                             build.job.name)
-
-        class Scope(object):
-            """Enum defining a precedence/priority of autohold requests.
-
-            Autohold requests for specific refs should be fulfilled first,
-            before those for changes, and generic jobs.
-
-            Matching algorithm goes over all existing autohold requests, and
-            returns one with the highest number (in case of duplicated
-            requests the last one wins).
-            """
-            NONE = 0
-            JOB = 1
-            CHANGE = 2
-            REF = 3
-
-        # Do a partial match of the autohold key against all autohold
-        # requests, ignoring the last element of the key (ref filter),
-        # and finally do a regex match between ref filter from
-        # the autohold request and the build's change ref to check
-        # if it matches. Lastly, make sure that we match the most
-        # specific autohold request by comparing "scopes"
-        # of requests - the most specific is selected.
-        autohold = None
-        scope = Scope.NONE
-        self.log.debug("Checking build autohold key %s", autohold_key_base)
-        for request_id in self.zk_nodepool.getHoldRequests():
-            request = self.zk_nodepool.getHoldRequest(request_id)
-            if not request:
-                continue
-
-            if self._handleExpiredHoldRequest(request):
-                continue
-
-            ref_filter = request.ref_filter
-
-            if request.current_count >= request.max_count:
-                # This request has been used the max number of times
-                continue
-            elif not (request.tenant == autohold_key_base[0] and
-                      request.project == autohold_key_base[1] and
-                      request.job == autohold_key_base[2]):
-                continue
-            elif not re.match(ref_filter, change.ref):
-                continue
-
-            if ref_filter == ".*":
-                candidate_scope = Scope.JOB
-            elif ref_filter.endswith(".*"):
-                candidate_scope = Scope.CHANGE
-            else:
-                candidate_scope = Scope.REF
-
-            self.log.debug("Build autohold key %s matched scope %s",
-                           autohold_key_base, candidate_scope)
-            if candidate_scope > scope:
-                scope = candidate_scope
-                autohold = request
-
-        return autohold
-
-    def _processAutohold(self, build):
-        # We explicitly only want to hold nodes for jobs if they have
-        # failed / retry_limit / post_failure and have an autohold request.
-        hold_list = ["FAILURE", "RETRY_LIMIT", "POST_FAILURE", "TIMED_OUT"]
-        if build.result not in hold_list:
-            return False
-
-        request = self._getAutoholdRequest(build)
-        if request is not None:
-            self.log.debug("Got autohold %s", request)
-            self.nodepool.holdNodeSet(build.nodeset, request, build)
-            return True
-        return False
-
     def _doBuildCompletedEvent(self, event):
         # Get the local build object from the executor client
         build = self.executor.builds.get(event.build)
@@ -1682,13 +1553,8 @@ class Scheduler(threading.Thread):
         build.error_detail = event_result.get("error_detail")
 
         if result is None:
-            if (
-                build.build_set.getTries(build.job.name) >= build.job.attempts
-            ):
-                result = "RETRY_LIMIT"
-            else:
-                build.retry = True
-        if result in ("DISCONNECT", "ABORTED"):
+            build.retry = True
+        if result == "ABORTED":
             # Always retry if the executor just went away
             build.retry = True
         if result == "MERGER_FAILURE":
@@ -1709,7 +1575,6 @@ class Scheduler(threading.Thread):
 
         result_data = event_result.get("data", {})
         warnings = event_result.get("warnings", [])
-
         log.info("Build complete, result %s, warnings %s", result, warnings)
 
         if build.retry:
@@ -1724,25 +1589,15 @@ class Scheduler(threading.Thread):
         build.end_time = event_result["end_time"]
         build.result_data = result_data
         build.build_set.warning_messages.extend(warnings)
+        build.held = event_result.get("held")
 
         build.result = result
         self._reportBuildStats(build)
 
-        # Regardless of any other conditions which might cause us not
-        # to pass this on to the pipeline manager, make sure we return
-        # the nodes to nodepool.
-        try:
-            build.held = self._processAutohold(build)
-            self.log.debug(
-                'build "%s" held status set to %s', build, build.held
-            )
-        except Exception:
-            log.exception("Unable to process autohold for %s", build)
-        try:
-            self.nodepool.returnNodeSet(build.nodeset, build=build,
-                                        zuul_event_id=build.zuul_event_id)
-        except Exception:
-            log.exception("Unable to return nodeset %s" % build.nodeset)
+        # The build is completed and the nodes were already returned by the
+        # executor. For consistency, also remove the node request from the
+        # build set.
+        build.build_set.removeJobNodeRequest(build.job.name)
 
         # The test suite expects the build to be removed from the
         # internal dict after it's added to the report queue.
@@ -1790,14 +1645,9 @@
 
     def _doNodesProvisionedEvent(self, event):
         request = event.request
-        request_id = event.request_id
         build_set = request.build_set
         log = get_annotated_logger(self.log, request.event_id)
 
-        ready = self.nodepool.acceptNodes(request, request_id)
-        if not ready:
-            return
-
         if build_set is not build_set.item.current_build_set:
             log.warning("Build set %s is not current "
                         "for node request %s", build_set, request)
@@ -1919,7 +1769,7 @@
                 nodeset = buildset.getJobNodeSet(job_name)
                 if nodeset:
                     self.nodepool.returnNodeSet(
-                        nodeset, build=build, zuul_event_id=item.event)
+                        nodeset, zuul_event_id=item.event)
 
                 # In the unlikely case that a build is removed and
                 # later added back, make sure we clear out the nodeset
diff --git a/zuul/zk/nodepool.py b/zuul/zk/nodepool.py
index 36a92500f1..2a4b4e3a58 100644
--- a/zuul/zk/nodepool.py
+++ b/zuul/zk/nodepool.py
@@ -21,7 +21,7 @@
 from kazoo.recipe.cache import TreeEvent
 from kazoo.recipe.lock import Lock
 
 import zuul.model
-from zuul.model import HoldRequest
+from zuul.model import HoldRequest, NodeRequest
 from zuul.zk import ZooKeeperClient, ZooKeeperBase
 from zuul.zk.exceptions import LockException
@@ -400,6 +400,41 @@ class ZooKeeperNodepool(ZooKeeperBase):
 
         self.kazoo_client.DataWatch(path, callback)
 
+    def getNodeRequest(self, node_request_id):
+        """
+        Retrieve a NodeRequest from a given path in ZooKeeper
+        """
+
+        # Create an empty NodeRequest object which will be updated with the
+        # current data in ZooKeeper. This will ensure that also the nodes are
+        # updated.
+        """
+        obj = NodeRequest(
+            requestor=None,
+            build_set=None,
+            job=None,
+            nodeset=None,
+            relative_priority=0,
+            event=None
+        )
+        obj.id = node_request_id
+        """
+
+        path = f"{self.REQUEST_ROOT}/{node_request_id}"
+        try:
+            data, stat = self.kazoo_client.get(path)
+        except NoNodeError:
+            return None
+
+        if not data:
+            return None
+
+        obj = NodeRequest.fromDict(json.loads(data.decode("utf-8")))
+        self.updateNodeRequest(obj, data)
+        obj.id = node_request_id
+        obj.stat = stat
+        return obj
+
     def deleteNodeRequest(self, node_request):
         """
         Delete a request for nodes.
@@ -455,6 +490,7 @@ class ZooKeeperNodepool(ZooKeeperBase):
                 request_nodes[i].id = nodeid
                 self._updateNode(request_nodes[i])
         node_request.updateFromDict(data)
+        #node_request.stat = stat
 
     def storeNode(self, node):
         """
@@ -509,15 +545,16 @@ class ZooKeeperNodepool(ZooKeeperBase):
         """
         Unlock a node.
 
-        The node must already have been locked.
-
         :param Node node: The node which should be unlocked.
         """
 
         if node.lock is None:
-            raise LockException("Node %s does not hold a lock" % (node,))
-        node.lock.release()
-        node.lock = None
+            # This could happen if acquiring the lock failed and shouldn't be
+            # treated as an error.
+            self.log.warning("Node %s does not hold a lock", node)
+        else:
+            node.lock.release()
+            node.lock = None
 
     def lockNodeRequest(self, request, blocking=True, timeout=None):
         """