From 66008900a845a8b16922aa6c4d79bab05f28948d Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 20 Sep 2021 10:31:54 -0700 Subject: [PATCH] Send synthetic merge completed events on cleanup When a merger crashes, the scheduler identifies merge jobs which were left in an incomplete state and cleans them up. However, there may be queue items waiting for merge completed events, and nothing generates those in this case. Update the merge job cleanup procedure to mimic the executor job cleanup procedure, which, in addition to deleting the incomplete job requests, also creates synthetic completed events in order to prompt the scheduler to resume processing. Change-Id: Idea384f636a0cd9e8c82ee92d3f5a65bef0889f2 --- zuul/merger/client.py | 48 +++++++++++++++++++++++++++++++++++++++++-- zuul/scheduler.py | 2 ++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/zuul/merger/client.py b/zuul/merger/client.py index ae6fb31bfb..40e4198827 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -17,10 +17,17 @@ from uuid import uuid4 from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger -from zuul.model import MergeRequest, PRECEDENCE_HIGH, PRECEDENCE_NORMAL +from zuul.model import ( + FilesChangesCompletedEvent, + MergeCompletedEvent, + MergeRequest, + PRECEDENCE_HIGH, + PRECEDENCE_NORMAL, +) +from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.merger import MergerApi from zuul.zk.exceptions import JobRequestNotFound -from kazoo.exceptions import BadVersionError +from kazoo.exceptions import BadVersionError, NoNodeError class MergeClient(object): @@ -34,6 +41,9 @@ class MergeClient(object): self.git_timeout = get_default( self.config, 'merger', 'git_timeout', 300) self.merger_api = self._merger_api_class(self.sched.zk_client) + self.result_events = PipelineResultEventQueue.createRegistry( + self.sched.zk_client + ) def submitJob( self, @@ -135,6 +145,40 @@ class MergeClient(object): 
self.log.exception("Exception cleaning up lost merge request:") def cleanupLostMergeRequest(self, merge_request): + # Provide a result either via a result future or a result event + if merge_request.result_path: + self.log.debug( + "Merge request cleanup providing synchronous result " + "via future for %s", merge_request) + result = {} + self.merger_api.reportResult(merge_request, result) + + elif merge_request.build_set_uuid: + self.log.debug( + "Merge request cleanup providing asynchronous result " + "via result event for %s", merge_request) + if merge_request.job_type == MergeRequest.FILES_CHANGES: + event = FilesChangesCompletedEvent( + merge_request.build_set_uuid, files=None + ) + else: + event = MergeCompletedEvent( + merge_request.uuid, + merge_request.build_set_uuid, + merged=False, + updated=False, + commit=None, + files=None, + repo_state=None, + item_in_branches=None, + ) + try: + self.result_events[merge_request.tenant_name][ + merge_request.pipeline_name].put(event) + except NoNodeError: + self.log.warning("Pipeline was removed: %s", + merge_request.pipeline_name) + merge_request.state = MergeRequest.COMPLETED try: self.merger_api.update(merge_request) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 46e9280682..c55b7022d2 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -587,6 +587,7 @@ class Scheduler(threading.Thread): self.log.debug("Starting build request cleanup") self.executor.cleanupLostBuildRequests() finally: + self.log.debug("Finished build request cleanup") self.build_request_cleanup_lock.release() def _runMergeRequestCleanup(self): @@ -596,6 +597,7 @@ class Scheduler(threading.Thread): self.log.debug("Starting merge request cleanup") self.merger.cleanupLostMergeRequests() finally: + self.log.debug("Finished merge request cleanup") self.merge_request_cleanup_lock.release() def _runConnectionCleanup(self):