diff --git a/zuul/merger/client.py b/zuul/merger/client.py index ae6fb31bfb..40e4198827 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -17,10 +17,17 @@ from uuid import uuid4 from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger -from zuul.model import MergeRequest, PRECEDENCE_HIGH, PRECEDENCE_NORMAL +from zuul.model import ( + FilesChangesCompletedEvent, + MergeCompletedEvent, + MergeRequest, + PRECEDENCE_HIGH, + PRECEDENCE_NORMAL, +) +from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.merger import MergerApi from zuul.zk.exceptions import JobRequestNotFound -from kazoo.exceptions import BadVersionError +from kazoo.exceptions import BadVersionError, NoNodeError class MergeClient(object): @@ -34,6 +41,9 @@ class MergeClient(object): self.git_timeout = get_default( self.config, 'merger', 'git_timeout', 300) self.merger_api = self._merger_api_class(self.sched.zk_client) + self.result_events = PipelineResultEventQueue.createRegistry( + self.sched.zk_client + ) def submitJob( self, @@ -135,6 +145,40 @@ class MergeClient(object): self.log.exception("Exception cleaning up lost merge request:") def cleanupLostMergeRequest(self, merge_request): + # Provide a result either via a result future or a result event + if merge_request.result_path: + self.log.debug( + "Merge request cleanup providing synchronous result " + "via future for %s", merge_request) + result = {} + self.merger_api.reportResult(merge_request, result) + + elif merge_request.build_set_uuid: + self.log.debug( + "Merge request cleanup providing asynchronous result " + "via result event for %s", merge_request) + if merge_request.job_type == MergeRequest.FILES_CHANGES: + event = FilesChangesCompletedEvent( + merge_request.build_set_uuid, files=None + ) + else: + event = MergeCompletedEvent( + merge_request.uuid, + merge_request.build_set_uuid, + merged=False, + updated=False, + commit=None, + files=None, + repo_state=None, + item_in_branches=None, + ) + try: + self.result_events[merge_request.tenant_name][ + merge_request.pipeline_name].put(event) + except NoNodeError: + self.log.warning("Pipeline was removed: %s", + merge_request.pipeline_name) + merge_request.state = MergeRequest.COMPLETED try: self.merger_api.update(merge_request) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 46e9280682..c55b7022d2 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -587,6 +587,7 @@ class Scheduler(threading.Thread): self.log.debug("Starting build request cleanup") self.executor.cleanupLostBuildRequests() finally: + self.log.debug("Finished build request cleanup") self.build_request_cleanup_lock.release() def _runMergeRequestCleanup(self): @@ -596,6 +597,7 @@ class Scheduler(threading.Thread): self.log.debug("Starting merge request cleanup") self.merger.cleanupLostMergeRequests() finally: + self.log.debug("Finished merge request cleanup") self.merge_request_cleanup_lock.release() def _runConnectionCleanup(self):