Perform pre-launch merge checks

Move merge scheduling to its own function in the pipeline manager,
and call it in the main _processOneItem loop once the item has
entered the active window, in addition to the previous location in
the executor when getting the layout information.
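
A rough sketch of the resulting flow, using simplified stand-in classes
rather than the real Zuul objects (names here are illustrative only; the
actual changes are in the pipeline manager diff below):

    class FakeBuildSet(object):
        NEW, PENDING, COMPLETE = range(3)

        def __init__(self):
            self.merge_state = self.NEW

    class FakePipelineManager(object):
        def prepareItem(self, build_set):
            # Runs on every _processOneItem pass once the item is active;
            # kicks off the merge the first time through.
            if build_set.merge_state == build_set.NEW:
                return self.scheduleMerge(build_set)
            return True

        def scheduleMerge(self, build_set):
            # Submit a merge job for the item and mark it pending; the
            # item is not ready until the merger reports back.
            build_set.merge_state = build_set.PENDING
            return False

        def prepareJobs(self, build_set):
            # Layout and job-graph preparation only proceed once the
            # merge has completed.
            return build_set.merge_state == build_set.COMPLETE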

Since we are now scheduling merges for all items in any pipeline,
make sure we properly handle both Ref and Change objects.
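
A minimal sketch of that distinction, assuming duck-typed change objects as
in the manager diff below (the helper name here is hypothetical; the real
code lives in _makeMergerItem):

    def make_merger_item(change):
        # Changes (e.g. Gerrit patchsets) carry a refspec and branch;
        # Refs (e.g. tag or branch updates) only carry old/new revisions.
        number = patchset = refspec = branch = oldrev = newrev = None
        if hasattr(change, 'number'):
            number = change.number
            patchset = change.patchset
            refspec = change.refspec
            branch = change.branch
        elif hasattr(change, 'newrev'):
            oldrev = change.oldrev
            newrev = change.newrev
        return dict(refspec=refspec, branch=branch, number=number,
                    patchset=patchset, oldrev=oldrev, newrev=newrev)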

Also, if the executor encounters a merger failure, immediately report
that result.
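
A simplified sketch of the executor-side behavior, assuming a gear-style
job object with a sendWorkComplete() method (the real code is
doMergeChanges in the executor diff below):

    import json

    def do_merge_changes(job, merger, items):
        commit = merger.mergeChanges(items)
        if not commit:
            # Merge conflict: report MERGER_FAILURE immediately rather
            # than running any playbooks.
            job.sendWorkComplete(json.dumps(dict(result='MERGER_FAILURE')))
        return commit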

Change-Id: I1c9db6993994bf8e841ecd8554c37a3ec0afc798
Co-Authored-By: Adam Gandelman <adamg@ubuntu.com>
Story: 2000773
Task: 3468
Author: K Jonathan Harker
Date:   2017-03-15 19:07:11 -07:00
Parent: 49bee7b6c6
Commit: ae04e4ce8f
5 changed files with 196 additions and 52 deletions


@@ -770,12 +770,15 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def runPlaybooks(self, args):
def doMergeChanges(self, items):
# Get a merger in order to update the repos involved in this job.
commit = super(RecordingAnsibleJob, self).doMergeChanges(items)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')
return commit
def recordResult(self, result):
build = self.executor_server.job_builds[self.job.unique]
build.jobdir = self.jobdir
result = super(RecordingAnsibleJob, self).runPlaybooks(args)
self.executor_server.lock.acquire()
self.executor_server.build_history.append(
BuildHistory(name=build.name, result=result, changes=build.changes,
@@ -786,6 +789,13 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
self.executor_server.running_builds.remove(build)
del self.executor_server.job_builds[self.job.unique]
self.executor_server.lock.release()
def runPlaybooks(self, args):
build = self.executor_server.job_builds[self.job.unique]
build.jobdir = self.jobdir
result = super(RecordingAnsibleJob, self).runPlaybooks(args)
self.recordResult(result)
return result
def runAnsible(self, cmd, timeout, trusted=False):


@@ -924,18 +924,17 @@ class TestScheduler(ZuulTestCase):
a = source.getChange(event, refresh=True)
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
@skip("Disabled for early v3 development")
def test_build_configuration_conflict(self):
"Test that merge conflicts are handled"
def test_project_merge_conflict(self):
"Test that gate merge conflicts are handled properly"
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/conflict-project',
'master', 'A')
A.addPatchset(['conflict'])
B = self.fake_gerrit.addFakeChange('org/conflict-project',
'master', 'B')
B.addPatchset(['conflict'])
C = self.fake_gerrit.addFakeChange('org/conflict-project',
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
files={'conflict': 'foo'})
B = self.fake_gerrit.addFakeChange('org/project',
'master', 'B',
files={'conflict': 'bar'})
C = self.fake_gerrit.addFakeChange('org/project',
'master', 'C')
A.addApproval('code-review', 2)
B.addApproval('code-review', 2)
@@ -949,15 +948,13 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(B.reported, 1)
self.assertEqual(C.reported, 1)
self.gearman_server.release('.*-merge')
self.gearman_server.release('project-merge')
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.gearman_server.release('project-merge')
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
self.gearman_server.release('project-merge')
self.waitUntilSettled()
self.assertEqual(len(self.history), 2) # A and C merge jobs
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
@@ -968,7 +965,97 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
self.assertEqual(len(self.history), 6)
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='1,1 3,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1 3,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1 3,1'),
], ordered=False)
def test_delayed_merge_conflict(self):
"Test that delayed check merge conflicts are handled properly"
# Hold jobs in the gearman queue so that we can test whether
# the executor returns a merge failure after the scheduler has
# successfully merged.
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
files={'conflict': 'foo'})
B = self.fake_gerrit.addFakeChange('org/project',
'master', 'B',
files={'conflict': 'bar'})
C = self.fake_gerrit.addFakeChange('org/project',
'master', 'C')
C.setDependsOn(B, 1)
# A enters the gate queue; B and C enter the check queue
A.addApproval('code-review', 2)
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
self.waitUntilSettled()
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 0) # Check does not report start
self.assertEqual(C.reported, 0) # Check does not report start
# A merges while B and C are queued in check
# Release A project-merge
queue = self.gearman_server.getQueue()
self.release(queue[0])
self.waitUntilSettled()
# Release A project-test*
# gate has higher precedence, so A's test jobs are added in
# front of the merge jobs for B and C
queue = self.gearman_server.getQueue()
self.release(queue[0])
self.release(queue[1])
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(C.data['status'], 'NEW')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 0)
self.assertEqual(C.reported, 0)
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
# B and C report merge conflicts
# Release B project-merge
queue = self.gearman_server.getQueue()
self.release(queue[0])
self.waitUntilSettled()
# Release C
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'NEW')
self.assertEqual(C.data['status'], 'NEW')
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 1)
self.assertEqual(C.reported, 1)
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='MERGER_FAILURE', changes='2,1'),
dict(name='project-merge', result='MERGER_FAILURE',
changes='2,1 3,1'),
], ordered=False)
def test_post(self):
"Test that post jobs run"


@@ -467,7 +467,10 @@ class ExecutorServer(object):
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
if args.get('files'):
result['commit'], result['files'] = ret
if ret:
result['commit'], result['files'] = ret
else:
result['commit'], result['files'] = (None, None)
else:
result['commit'] = ret
job.sendWorkComplete(json.dumps(result))
@@ -552,11 +555,13 @@ class AnsibleJob(object):
project['name']))
repo.remotes.origin.config_writer.set('url', project['url'])
# Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root)
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
commit = merger.mergeChanges(merge_items) # noqa
commit = self.doMergeChanges(merge_items)
if not commit:
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
return
else:
commit = args['items'][-1]['newrev'] # noqa
@@ -596,6 +601,15 @@ class AnsibleJob(object):
result = dict(result=result)
self.job.sendWorkComplete(json.dumps(result))
def doMergeChanges(self, items):
# Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root)
commit = merger.mergeChanges(items) # noqa
if not commit: # merge conflict
result = dict(result='MERGER_FAILURE')
self.job.sendWorkComplete(json.dumps(result))
return commit
def runPlaybooks(self, args):
result = None


@@ -440,11 +440,15 @@ class PipelineManager(object):
# the merger.
number = None
patchset = None
refspec = None
branch = None
oldrev = None
newrev = None
if hasattr(item.change, 'number'):
number = item.change.number
patchset = item.change.patchset
refspec = item.change.refspec
branch = item.change.branch
elif hasattr(item.change, 'newrev'):
oldrev = item.change.oldrev
newrev = item.change.newrev
@@ -456,8 +460,8 @@ class PipelineManager(object):
item.change.project),
connection_name=connection_name,
merge_mode=item.current_build_set.getMergeMode(project),
refspec=item.change.refspec,
branch=item.change.branch,
refspec=refspec,
branch=branch,
ref=item.current_build_set.ref,
number=number,
patchset=patchset,
@@ -515,30 +519,54 @@ class PipelineManager(object):
if build_set.merge_state == build_set.COMPLETE:
if build_set.unable_to_merge:
return None
self.log.debug("Preparing dynamic layout for: %s" % item.change)
return self._loadDynamicLayout(item)
build_set.merge_state = build_set.PENDING
self.log.debug("Preparing dynamic layout for: %s" % item.change)
def scheduleMerge(self, item, files=None):
build_set = item.current_build_set
if not hasattr(item.change, 'branch'):
self.log.debug("Change %s does not have an associated branch, "
"not scheduling a merge job for item %s" %
(item.change, item))
build_set.merge_state = build_set.COMPLETE
return True
self.log.debug("Scheduling merge for item %s (files: %s)" %
(item, files))
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
merger_items = map(self._makeMergerItem, all_items)
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
self.sched.merger.mergeChanges(merger_items,
item.current_build_set,
['zuul.yaml', '.zuul.yaml'],
files,
self.pipeline.precedence)
return False
def prepareLayout(self, item):
# Get a copy of the layout in the context of the current
# queue.
# Returns True if the ref is ready, false otherwise
if not item.current_build_set.ref:
item.current_build_set.setConfiguration()
if not item.current_build_set.layout:
item.current_build_set.layout = self.getLayout(item)
if not item.current_build_set.layout:
def prepareItem(self, item):
# This runs on every iteration of _processOneItem
# Returns True if the item is ready, false otherwise
build_set = item.current_build_set
if not build_set.ref:
build_set.setConfiguration()
if build_set.merge_state == build_set.NEW:
return self.scheduleMerge(item, ['zuul.yaml', '.zuul.yaml'])
if build_set.config_error:
return False
if item.current_build_set.config_error:
return True
def prepareJobs(self, item):
# This only runs once the item is in the pipeline's action window
# Returns True if the item is ready, false otherwise
build_set = item.current_build_set
if not build_set.layout:
build_set.layout = self.getLayout(item)
if not build_set.layout:
return False
if not item.job_graph:
try:
item.freezeJobGraph()
@@ -553,11 +581,13 @@ class PipelineManager(object):
def _processOneItem(self, item, nnfi):
changed = False
ready = False
failing_reasons = [] # Reasons this item is failing
item_ahead = item.item_ahead
if item_ahead and (not item_ahead.live):
item_ahead = None
change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
if self.checkForChangesNeededBy(item.change, change_queue) is not True:
# It's not okay to enqueue this change, we should remove it.
@@ -572,10 +602,11 @@ class PipelineManager(object):
except exceptions.MergeFailure:
pass
return (True, nnfi)
dep_items = self.getFailingDependentItems(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
dep_items = self.getFailingDependentItems(item)
if dep_items:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
@@ -594,15 +625,16 @@ class PipelineManager(object):
changed = True
self.cancelJobs(item)
if actionable:
ready = self.prepareLayout(item)
ready = self.prepareItem(item) and self.prepareJobs(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
if item.current_build_set.config_error:
failing_reasons.append("it has an invalid configuration")
if ready and self.provisionNodes(item):
changed = True
if actionable and ready and self.executeJobs(item):
if ready and self.executeJobs(item):
changed = True
if item.didAnyJobFail():
failing_reasons.append("at least one job failed")
if (not item.live) and (not item.items_behind):
@@ -740,10 +772,11 @@ class PipelineManager(object):
# TODOv3(jeblair): consider a new reporter action for this
actions = self.pipeline.merge_failure_actions
item.setReportedResult('CONFIG_ERROR')
elif item.didMergerFail():
actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE')
elif not item.getJobs():
# We don't send empty reports with +1,
# and the same for -1's (merge failures or transient errors)
# as they cannot be followed by +1's
# We don't send empty reports with +1
self.log.debug("No jobs for change %s" % item.change)
actions = []
elif item.didAllJobsSucceed():
@@ -751,9 +784,6 @@ class PipelineManager(object):
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
elif item.didMergerFail():
actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE')
else:
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')


@@ -111,7 +111,10 @@ class MergeServer(object):
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
if args.get('files'):
result['commit'], result['files'] = ret
if ret:
result['commit'], result['files'] = ret
else:
result['commit'], result['files'] = (None, None)
else:
result['commit'] = ret
job.sendWorkComplete(json.dumps(result))