Send merge completed events even in case of error

The scheduler depends on merge completed events in order to advance
the lifecycle of a queue item.  Without them, items can be stuck in
the queue indefinitely.

In the case of certain merge errors, we may not have submitted a
result to the event queue.  This change ensures that a merge
completed event is always submitted, even on error.

Change-Id: I9527c79868ede31f1fa68faf93ff113ac786462b
This commit is contained in:
James E. Blair 2021-08-19 10:19:06 -07:00
parent 15b589c1e4
commit 9fa3c6ec6e
5 changed files with 86 additions and 55 deletions

13
tests/fixtures/git_fail.sh vendored Executable file
View File

@ -0,0 +1,13 @@
#!/bin/sh
# Fake git executable used by the test suite: echoes its arguments,
# then fails any "fetch", answers "version" successfully, and lets
# every other subcommand fall through with a zero exit status.
echo $*
if [ "$1" = "fetch" ]; then
    echo "Fake git error"
    exit 1
elif [ "$1" = "version" ]; then
    echo "git version 1.0.0"
    exit 0
fi

View File

@ -195,9 +195,10 @@ class TestMergerRepo(ZuulTestCase):
parent_path = os.path.join(self.upstream_root, 'org/project1')
self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
os.path.join(FIXTURE_DIR, 'fake_git.sh'))
self.patch(Repo, 'retry_attempts', 1)
work_repo = Repo(parent_path, self.workspace_root,
'none@example.org', 'User Name', '0', '0',
git_timeout=0.001, retry_attempts=1)
git_timeout=0.001)
# TODO: have the merger and repo classes catch fewer
# exceptions, including this one on initialization. For the
# test, we try cloning again.
@ -207,9 +208,9 @@ class TestMergerRepo(ZuulTestCase):
def test_fetch_timeout(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
self.patch(Repo, 'retry_attempts', 1)
work_repo = Repo(parent_path, self.workspace_root,
'none@example.org', 'User Name', '0', '0',
retry_attempts=1)
'none@example.org', 'User Name', '0', '0')
work_repo.git_timeout = 0.001
self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
os.path.join(FIXTURE_DIR, 'fake_git.sh'))
@ -219,9 +220,9 @@ class TestMergerRepo(ZuulTestCase):
def test_fetch_retry(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
self.patch(Repo, 'retry_interval', 1)
work_repo = Repo(parent_path, self.workspace_root,
'none@example.org', 'User Name', '0', '0',
retry_interval=1)
'none@example.org', 'User Name', '0', '0')
self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE',
os.path.join(FIXTURE_DIR, 'git_fetch_error.sh'))
work_repo.update()

View File

@ -36,6 +36,7 @@ from zuul.driver.gerrit import gerritreporter
import zuul.scheduler
import zuul.rpcclient
import zuul.model
import zuul.merger.merger
from tests.base import (
SSLZuulTestCase,
@ -45,6 +46,7 @@ from tests.base import (
iterate_timeout,
RecordingExecutorServer,
TestConnectionRegistry,
FIXTURE_DIR,
)
from zuul.zk.layout import LayoutState
@ -6506,6 +6508,17 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertTrue('YAY' in A.messages[0])
self.assertTrue('BOO' in A.messages[0])
def test_merge_error(self):
    """Verify the scheduler does not get stuck on a merger error.

    Swaps in a fake git executable that fails on fetch; the test
    passes as long as the queue settles again afterward.
    """
    self.waitUntilSettled()
    # Fail fast: one attempt instead of the default retry count.
    self.patch(zuul.merger.merger.Repo, 'retry_attempts', 1)
    change = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    # Point GitPython at the failing fixture script.
    fake_git = os.path.join(FIXTURE_DIR, 'git_fail.sh')
    self.patch(git.Git, 'GIT_PYTHON_GIT_EXECUTABLE', fake_git)
    self.fake_gerrit.addEvent(change.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'

View File

@ -68,10 +68,12 @@ def nullcontext():
class Repo(object):
commit_re = re.compile(r'^commit ([0-9a-f]{40})$')
diff_re = re.compile(r'^@@ -\d+,\d \+(\d+),\d @@$')
retry_attempts = 3
retry_interval = 30
def __init__(self, remote, local, email, username, speed_limit, speed_time,
sshkey=None, cache_path=None, logger=None, git_timeout=300,
retry_attempts=3, retry_interval=30, zuul_event_id=None):
zuul_event_id=None):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
@ -92,8 +94,6 @@ class Repo(object):
self.username = username
self.cache_path = cache_path
self._initialized = False
self.retry_attempts = retry_attempts
self.retry_interval = retry_interval
try:
self._setup_known_hosts()
except Exception:

View File

@ -317,61 +317,65 @@ class BaseMergeServer(metaclass=ABCMeta):
def completeMergeJob(self, merge_request, result):
log = get_annotated_logger(self.log, merge_request.event_id)
if result is not None:
payload = json.dumps(result)
self.log.debug("Completed %s job %s: payload size: %s",
merge_request.job_type, merge_request.uuid,
sys.getsizeof(payload))
merged = result.get("merged", False)
updated = result.get("updated", False)
commit = result.get("commit")
repo_state = result.get("repo_state", {})
item_in_branches = result.get("item_in_branches", [])
files = result.get("files", {})
# Always provide a result event, even if we have no
# information; otherwise items can get stuck in the queue.
if result is None:
result = {}
log.info(
"Merge %s complete, merged: %s, updated: %s, commit: %s, "
"branches: %s",
payload = json.dumps(result)
self.log.debug("Completed %s job %s: payload size: %s",
merge_request.job_type, merge_request.uuid,
sys.getsizeof(payload))
merged = result.get("merged", False)
updated = result.get("updated", False)
commit = result.get("commit")
repo_state = result.get("repo_state", {})
item_in_branches = result.get("item_in_branches", [])
files = result.get("files", {})
log.info(
"Merge %s complete, merged: %s, updated: %s, commit: %s, "
"branches: %s",
merge_request,
merged,
updated,
commit,
item_in_branches,
)
# Provide a result either via a result future or a result event
if merge_request.result_path:
log.debug(
"Providing synchronous result via future for %s",
merge_request,
merged,
updated,
commit,
item_in_branches,
)
self.merger_api.reportResult(merge_request, result)
# Provide a result either via a result future or a result event
if merge_request.result_path:
log.debug(
"Providing synchronous result via future for %s",
merge_request,
elif merge_request.build_set_uuid:
log.debug(
"Providing asynchronous result via result event for %s",
merge_request,
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.build_set_uuid, files
)
self.merger_api.reportResult(merge_request, result)
elif merge_request.build_set_uuid:
log.debug(
"Providing asynchronous result via result event for %s",
merge_request,
else:
event = MergeCompletedEvent(
merge_request.uuid,
merge_request.build_set_uuid,
merged,
updated,
commit,
files,
repo_state,
item_in_branches,
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
merge_request.build_set_uuid, files
)
else:
event = MergeCompletedEvent(
merge_request.uuid,
merge_request.build_set_uuid,
merged,
updated,
commit,
files,
repo_state,
item_in_branches,
)
tenant_name = merge_request.tenant_name
pipeline_name = merge_request.pipeline_name
tenant_name = merge_request.tenant_name
pipeline_name = merge_request.pipeline_name
self.result_events[tenant_name][pipeline_name].put(event)
self.result_events[tenant_name][pipeline_name].put(event)
# Set the merge request to completed, unlock and delete it. Although
# the state update is mainly for consistency reasons, it might come in