Don't call the merger for non-live items

When processing changes with dependencies in pipelines using the
independent pipeline manager we enqueue non-live items that represent
the dependent changes but don't run tests. Currently we perform a
merge operation also with every non-live item. Especially when
rebasing large stacks in Gerrit this can lead to a huge overhead of
merges. In this case we perform 1 + 2 + ... + n merges where n is the
size of the stack in Gerrit. E.g. for a rebase of a stack of 20 changes
we perform 210 merge operations.

Those merges are needed to get the changed config of every item ahead
because we need the resulting config changes to produce a correct
layout. Fortunately the merger already returns a list of file changes
per item. We can leverage this to populate the changed files of all
non-live changes ahead when we receive the merge result of the live
change. This makes it possible to skip the merges of the non-live
changes ahead and have one merge operation per change.

In the rebase example above this optimization reduces the number of
performed merges from 210 to 20.

Change-Id: I82848367bd6f191ec5ae5822a1f438070cde14e1
This commit is contained in:
Tobias Henkel 2019-03-16 14:18:34 +01:00
parent 9b6c7e60b5
commit 2b3a041e61
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
5 changed files with 308 additions and 9 deletions

View File

@ -1555,6 +1555,19 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
return hosts return hosts
class RecordingMergeClient(zuul.merger.client.MergeClient):
    """A merge client that records every submitted merge job.

    Tests inspect :attr:`history` to assert how many merge operations of
    which kind were performed (e.g. to verify that non-live items do not
    trigger merges).
    """

    def __init__(self, config, sched):
        super().__init__(config, sched)
        # Maps job name -> list of (data, build_set) tuples in
        # submission order.
        self.history = {}

    def submitJob(self, name, data, build_set,
                  precedence=zuul.model.PRECEDENCE_NORMAL):
        """Record the submission, then delegate to the real client."""
        # setdefault returns the (possibly new) list, so record in one
        # statement instead of a setdefault followed by an indexed append.
        self.history.setdefault(name, []).append((data, build_set))
        return super().submitJob(name, data, build_set, precedence)
class RecordingExecutorServer(zuul.executor.server.ExecutorServer): class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests. """An Ansible executor to be used in tests.
@ -2517,8 +2530,7 @@ class ZuulTestCase(BaseTestCase):
self.executor_client = zuul.executor.client.ExecutorClient( self.executor_client = zuul.executor.client.ExecutorClient(
self.config, self.sched) self.config, self.sched)
self.merge_client = zuul.merger.client.MergeClient( self.merge_client = RecordingMergeClient(self.config, self.sched)
self.config, self.sched)
self.merge_server = None self.merge_server = None
self.nodepool = zuul.nodepool.Nodepool(self.sched) self.nodepool = zuul.nodepool.Nodepool(self.sched)
self.zk = zuul.zk.ZooKeeper() self.zk = zuul.zk.ZooKeeper()

View File

@ -21,6 +21,7 @@ import git
import testtools import testtools
from zuul.merger.merger import Repo from zuul.merger.merger import Repo
from zuul.model import MERGER_MERGE_RESOLVE
from tests.base import ZuulTestCase, FIXTURE_DIR, simple_layout from tests.base import ZuulTestCase, FIXTURE_DIR, simple_layout
@ -329,3 +330,116 @@ class TestMergerWithAuthUrl(ZuulTestCase):
# the urls should differ # the urls should differ
self.assertNotEqual(first_url, second_url) self.assertNotEqual(first_url, second_url)
class TestMerger(ZuulTestCase):
    """Tests exercising the merger's multi-item merge results."""

    tenant_config_file = 'config/single-tenant/main.yaml'

    @staticmethod
    def _item_from_fake_change(fake_change):
        """Build a merger item dict from a fake Gerrit change."""
        return dict(
            number=fake_change.number,
            patchset=1,
            ref=fake_change.patchsets[0]['ref'],
            connection='gerrit',
            branch=fake_change.branch,
            project=fake_change.project,
            buildset_uuid='fake-uuid',
            merge_mode=MERGER_MERGE_RESOLVE,
        )

    def _assertFileChange(self, read_files, index, project, path, content,
                          branch='master'):
        """Assert that entry *index* of a merge result matches.

        Checks project, branch and the content returned for *path*.
        Extracted because every merge scenario below repeats the same
        three assertions per item.
        """
        entry = read_files[index]
        self.assertEqual(entry['project'], project)
        self.assertEqual(entry['branch'], branch)
        self.assertEqual(entry['files'][path], content)

    def test_merge_multiple_items(self):
        """
        Tests that the merger merges and returns the requested file
        changes per change and in the correct order.
        """
        merger = self.executor_server.merger
        files = ['zuul.yaml', '.zuul.yaml']
        dirs = ['zuul.d', '.zuul.d']

        # Simple change A
        file_dict_a = {'zuul.d/a.yaml': 'a'}
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
                                           files=file_dict_a)
        item_a = self._item_from_fake_change(A)

        # Simple change B
        file_dict_b = {'zuul.d/b.yaml': 'b'}
        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
                                           files=file_dict_b)
        item_b = self._item_from_fake_change(B)

        # Simple change C on top of A
        file_dict_c = {'zuul.d/a.yaml': 'a-with-c'}
        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C',
                                           files=file_dict_c,
                                           parent=A.patchsets[0]['ref'])
        item_c = self._item_from_fake_change(C)

        # Change in different project
        file_dict_d = {'zuul.d/a.yaml': 'a-in-project1'}
        D = self.fake_gerrit.addFakeChange('org/project1', 'master', 'D',
                                           files=file_dict_d)
        item_d = self._item_from_fake_change(D)

        # Merge A
        result = merger.mergeChanges([item_a], files=files, dirs=dirs)
        self.assertIsNotNone(result)
        hexsha, read_files, repo_state, ret_recent, orig_commit = result
        self.assertEqual(len(read_files), 1)
        self._assertFileChange(read_files, 0, 'org/project',
                               'zuul.d/a.yaml', 'a')

        # Merge A -> B
        result = merger.mergeChanges([item_a, item_b], files=files, dirs=dirs)
        self.assertIsNotNone(result)
        hexsha, read_files, repo_state, ret_recent, orig_commit = result
        self.assertEqual(len(read_files), 2)
        self._assertFileChange(read_files, 0, 'org/project',
                               'zuul.d/a.yaml', 'a')
        self._assertFileChange(read_files, 1, 'org/project',
                               'zuul.d/b.yaml', 'b')

        # Merge A -> B -> C
        result = merger.mergeChanges([item_a, item_b, item_c], files=files,
                                     dirs=dirs)
        self.assertIsNotNone(result)
        hexsha, read_files, repo_state, ret_recent, orig_commit = result
        self.assertEqual(len(read_files), 3)
        self._assertFileChange(read_files, 0, 'org/project',
                               'zuul.d/a.yaml', 'a')
        self._assertFileChange(read_files, 1, 'org/project',
                               'zuul.d/b.yaml', 'b')
        # C modifies the same file A introduced; the merge of C on top
        # must report C's content.
        self._assertFileChange(read_files, 2, 'org/project',
                               'zuul.d/a.yaml', 'a-with-c')

        # Merge A -> B -> C -> D
        result = merger.mergeChanges([item_a, item_b, item_c, item_d],
                                     files=files, dirs=dirs)
        self.assertIsNotNone(result)
        hexsha, read_files, repo_state, ret_recent, orig_commit = result
        self.assertEqual(len(read_files), 4)
        self._assertFileChange(read_files, 0, 'org/project',
                               'zuul.d/a.yaml', 'a')
        self._assertFileChange(read_files, 1, 'org/project',
                               'zuul.d/b.yaml', 'b')
        self._assertFileChange(read_files, 2, 'org/project',
                               'zuul.d/a.yaml', 'a-with-c')
        # D lives in a different project and must be reported as such.
        self._assertFileChange(read_files, 3, 'org/project1',
                               'zuul.d/a.yaml', 'a-in-project1')

View File

@ -2078,6 +2078,173 @@ class TestInRepoConfig(ZuulTestCase):
A.messages[0], "A should have debug info") A.messages[0], "A should have debug info")
class TestNonLiveMerges(ZuulTestCase):
    """Verify that non-live queue items do not trigger merge operations."""

    config_file = 'zuul-connections-gerrit-and-github.conf'
    tenant_config_file = 'config/in-repo/main.yaml'

    def test_non_live_merges(self):
        """
        This test checks that we don't do merges for non-live queue items.

        The test covers three scenarios:

        * Simple dependency chain:
          A -> B -> C

        * Dependency chain that includes a merge conflict:
          A -> B -> B_conflict (conflicts with A) -> C_conflict

        * Dependency chain with broken config in the middle:
          A_invalid (broken config) -> B_after_invalid (repairs broken
          config)
        """
        in_repo_conf_a = textwrap.dedent(
            """
            - job:
                name: project-test1
                run: playbooks/project-test.yaml

            - project:
                name: org/project
                check:
                  jobs:
                    - project-test1
            """)
        in_repo_playbook = textwrap.dedent(
            """
            - hosts: all
              tasks: []
            """)

        file_dict_a = {'.zuul.yaml': in_repo_conf_a,
                       'playbooks/project-test.yaml': in_repo_playbook}
        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
                                           files=file_dict_a)

        in_repo_conf_b = textwrap.dedent(
            """
            - job:
                name: project-test1
                run: playbooks/project-test.yaml

            - job:
                name: project-test2
                run: playbooks/project-test.yaml

            - project:
                name: org/project
                check:
                  jobs:
                    - project-test1
                    - project-test2
            """)

        file_dict_b = {'.zuul.yaml': in_repo_conf_b}
        # B is stacked on A both in git (parent ref) and via a Zuul
        # dependency.
        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
                                           files=file_dict_b,
                                           parent=A.patchsets[0]['ref'])
        B.setDependsOn(A, 1)

        in_repo_conf_c = textwrap.dedent(
            """
            - job:
                name: project-test1
                run: playbooks/project-test.yaml

            - job:
                name: project-test2
                run: playbooks/project-test.yaml

            - job:
                name: project-test3
                run: playbooks/project-test.yaml

            - project:
                name: org/project
                check:
                  jobs:
                    - project-test1
                    - project-test2
                    - project-test3
            """)

        file_dict_c = {'.zuul.yaml': in_repo_conf_c}
        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C',
                                           files=file_dict_c,
                                           parent=B.patchsets[0]['ref'])
        C.setDependsOn(B, 1)

        # B_conflict rewrites .zuul.yaml back to A's content while
        # depending on B, which changed the same file -> merge conflict.
        B_conflict = self.fake_gerrit.addFakeChange(
            'org/project', 'master', 'B_conflict', files=file_dict_a)
        B_conflict.setDependsOn(B, 1)
        C_conflict = self.fake_gerrit.addFakeChange(
            'org/project', 'master', 'C_conflict')
        C_conflict.setDependsOn(B_conflict, 1)

        # A_invalid references project-test1 without defining the job,
        # which makes its config invalid on its own.
        in_repo_conf_a_invalid = textwrap.dedent(
            """
            - project:
                name: org/project
                check:
                  jobs:
                    - project-test1
            """)

        file_dict_a_invalid = {'.zuul.yaml': in_repo_conf_a_invalid,
                               'playbooks/project-test.yaml': in_repo_playbook}
        A_invalid = self.fake_gerrit.addFakeChange(
            'org/project', 'master', 'A_invalid', files=file_dict_a_invalid)
        # B_after_invalid carries file_dict_b, which defines the missing
        # job and so repairs the broken config of A_invalid.
        B_after_invalid = self.fake_gerrit.addFakeChange(
            'org/project', 'master', 'B', files=file_dict_b,
            parent=A_invalid.patchsets[0]['ref'])
        B_after_invalid.setDependsOn(A_invalid, 1)

        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(B_conflict.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(C_conflict.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(A_invalid.getPatchsetCreatedEvent(1))
        self.fake_gerrit.addEvent(B_after_invalid.getPatchsetCreatedEvent(1))
        self.waitUntilSettled()

        self.assertEqual(A.reported, 1, "A should report")
        self.assertEqual(B.reported, 1, "B should report")
        self.assertEqual(C.reported, 1, "C should report")
        self.assertEqual(B_conflict.reported, 1, "B_conflict should report")
        self.assertEqual(C_conflict.reported, 1, "C_conflict should report")
        self.assertEqual(B_after_invalid.reported, 1,
                         "B_after_invalid should report")

        self.assertIn('Build succeeded', A.messages[0])
        self.assertIn('Build succeeded', B.messages[0])
        self.assertIn('Build succeeded', C.messages[0])
        self.assertIn('Merge Failed', B_conflict.messages[0])
        self.assertIn('Merge Failed', C_conflict.messages[0])
        self.assertIn('Zuul encountered a syntax error', A_invalid.messages[0])
        self.assertIn('Build succeeded', B_after_invalid.messages[0])

        self.assertHistory([
            # Change A
            dict(name='project-test1', result='SUCCESS', changes='1,1'),
            # Change B
            dict(name='project-test1', result='SUCCESS', changes='1,1 2,1'),
            dict(name='project-test2', result='SUCCESS', changes='1,1 2,1'),
            # Change C
            dict(name='project-test1', result='SUCCESS',
                 changes='1,1 2,1 3,1'),
            dict(name='project-test2', result='SUCCESS',
                 changes='1,1 2,1 3,1'),
            dict(name='project-test3', result='SUCCESS',
                 changes='1,1 2,1 3,1'),
            # Change B_after_invalid
            dict(name='project-test1', result='SUCCESS', changes='6,1 7,1'),
            dict(name='project-test2', result='SUCCESS', changes='6,1 7,1'),
        ], ordered=False)

        # We expect one merge call per change
        self.assertEqual(len(self.merge_client.history['merger:merge']), 7)
class TestJobContamination(AnsibleZuulTestCase): class TestJobContamination(AnsibleZuulTestCase):
config_file = 'zuul-connections-gerrit-and-github.conf' config_file = 'zuul-connections-gerrit-and-github.conf'

View File

@ -708,7 +708,7 @@ class PipelineManager(object):
change_queue.moveItem(item, nnfi) change_queue.moveItem(item, nnfi)
changed = True changed = True
self.cancelJobs(item) self.cancelJobs(item)
if actionable: if actionable and item.live:
ready = self.prepareItem(item) and self.prepareJobs(item) ready = self.prepareItem(item) and self.prepareJobs(item)
# Starting jobs reporting should only be done once if there are # Starting jobs reporting should only be done once if there are
# jobs to run for this item. # jobs to run for this item.
@ -720,14 +720,8 @@ class PipelineManager(object):
item.reported_start = True item.reported_start = True
if item.current_build_set.unable_to_merge: if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict") failing_reasons.append("it has a merge conflict")
if (not item.live) and (not dequeued):
self.dequeueItem(item)
changed = dequeued = True
if item.current_build_set.config_errors: if item.current_build_set.config_errors:
failing_reasons.append("it has an invalid configuration") failing_reasons.append("it has an invalid configuration")
if (not item.live) and (not dequeued):
self.dequeueItem(item)
changed = dequeued = True
if ready and self.provisionNodes(item): if ready and self.provisionNodes(item):
changed = True changed = True
if ready and self.executeJobs(item): if ready and self.executeJobs(item):
@ -855,6 +849,10 @@ class PipelineManager(object):
build_set.repo_state = event.repo_state build_set.repo_state = event.repo_state
if event.merged: if event.merged:
build_set.commit = event.commit build_set.commit = event.commit
items_ahead = item.getNonLiveItemsAhead()
for index, item in enumerate(items_ahead):
files = item.current_build_set.files
files.setFiles(event.files[:index + 1])
build_set.files.setFiles(event.files) build_set.files.setFiles(event.files)
elif event.updated: elif event.updated:
build_set.commit = (item.change.newrev or build_set.commit = (item.change.newrev or

View File

@ -2111,6 +2111,14 @@ class QueueItem(object):
return None return None
return self.job_graph.jobs.get(name) return self.job_graph.jobs.get(name)
def getNonLiveItemsAhead(self):
    """Return the consecutive non-live items ahead of this item.

    Walks the item_ahead chain, stopping at the first live item (or
    the head of the queue), and yields the collected items in reverse
    order, i.e. furthest ahead first.
    """
    chain = []
    cursor = self.item_ahead
    while cursor:
        if cursor.live:
            break
        chain.append(cursor)
        cursor = cursor.item_ahead
    return reversed(chain)
def haveAllJobsStarted(self): def haveAllJobsStarted(self):
if not self.hasJobGraph(): if not self.hasJobGraph():
return False return False