Send synthetic merge completed events on cleanup

When a merger crashes, the scheduler identifies merge jobs which
were left in an incomplete state and cleans them up.  However there
may be queue items waiting for merge complete events, and nothing
generates those in this case.

Update the merge job cleanup procedure to mimic the executor job
cleanup procedure which, in addition to deleting the incomplete job
requests, also creates synthetic complete events in order to prompt
the scheduler to resume processing.

Change-Id: Idea384f636a0cd9e8c82ee92d3f5a65bef0889f2
This commit is contained in:
James E. Blair 2021-09-20 10:31:54 -07:00
parent d6c738abdb
commit 66008900a8
2 changed files with 48 additions and 2 deletions

View File

@ -17,10 +17,17 @@ from uuid import uuid4
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
from zuul.model import MergeRequest, PRECEDENCE_HIGH, PRECEDENCE_NORMAL
from zuul.model import (
FilesChangesCompletedEvent,
MergeCompletedEvent,
MergeRequest,
PRECEDENCE_HIGH,
PRECEDENCE_NORMAL,
)
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound
from kazoo.exceptions import BadVersionError
from kazoo.exceptions import BadVersionError, NoNodeError
class MergeClient(object):
@ -34,6 +41,9 @@ class MergeClient(object):
self.git_timeout = get_default(
self.config, 'merger', 'git_timeout', 300)
self.merger_api = self._merger_api_class(self.sched.zk_client)
self.result_events = PipelineResultEventQueue.createRegistry(
self.sched.zk_client
)
def submitJob(
self,
@ -135,6 +145,40 @@ class MergeClient(object):
self.log.exception("Exception cleaning up lost merge request:")
def cleanupLostMergeRequest(self, merge_request):
# Provide a result either via a result future or a result event
if merge_request.result_path:
self.log.debug(
"Merge request cleanup providing synchronous result "
"via future for %s", merge_request)
result = {}
self.merger_api.reportResult(merge_request, result)
elif merge_request.build_set_uuid:
self.log.debug(
"Merge request cleanup providing asynchronous result "
"via result event for %s", merge_request)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.build_set_uuid, files=None
)
else:
event = MergeCompletedEvent(
merge_request.uuid,
merge_request.build_set_uuid,
merged=False,
updated=False,
commit=None,
files=None,
repo_state=None,
item_in_branches=None,
)
try:
self.result_events[merge_request.tenant_name][
merge_request.pipeline_name].put(event)
except NoNodeError:
self.log.warning("Pipeline was removed: %s",
merge_request.pipeline_name)
merge_request.state = MergeRequest.COMPLETED
try:
self.merger_api.update(merge_request)

View File

@ -587,6 +587,7 @@ class Scheduler(threading.Thread):
self.log.debug("Starting build request cleanup")
self.executor.cleanupLostBuildRequests()
finally:
self.log.debug("Finished build request cleanup")
self.build_request_cleanup_lock.release()
def _runMergeRequestCleanup(self):
@ -596,6 +597,7 @@ class Scheduler(threading.Thread):
self.log.debug("Starting merge request cleanup")
self.merger.cleanupLostMergeRequests()
finally:
self.log.debug("Finished merge request cleanup")
self.merge_request_cleanup_lock.release()
def _runConnectionCleanup(self):