From ae04e4ce8fca33872f3677838d6a813d2c378e79 Mon Sep 17 00:00:00 2001 From: K Jonathan Harker Date: Wed, 15 Mar 2017 19:07:11 -0700 Subject: [PATCH] Perform pre-launch merge checks Move merge scheduling to its own function in the pipeline manager, and call it in the main _processOneItem loop once the item has entered the active window, in addition to the previous location in the executor when getting the layout information. Since we are now scheduling merges for all items in any pipeline, make sure we properly handle both Ref and Change objects. Also, if the executor encounters a merger failure, immediately report that result. Change-Id: I1c9db6993994bf8e841ecd8554c37a3ec0afc798 Co-Authored-By: Adam Gandelman Story: 2000773 Task: 3468 --- tests/base.py | 20 ++++-- tests/unit/test_scheduler.py | 119 ++++++++++++++++++++++++++++++----- zuul/executor/server.py | 22 +++++-- zuul/manager/__init__.py | 82 ++++++++++++++++-------- zuul/merger/server.py | 5 +- 5 files changed, 196 insertions(+), 52 deletions(-) diff --git a/tests/base.py b/tests/base.py index ffe988671e..826a8489d4 100755 --- a/tests/base.py +++ b/tests/base.py @@ -770,12 +770,15 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): class RecordingAnsibleJob(zuul.executor.server.AnsibleJob): - def runPlaybooks(self, args): + def doMergeChanges(self, items): + # Get a merger in order to update the repos involved in this job. + commit = super(RecordingAnsibleJob, self).doMergeChanges(items) + if not commit: # merge conflict + self.recordResult('MERGER_FAILURE') + return commit + + def recordResult(self, result): build = self.executor_server.job_builds[self.job.unique] - build.jobdir = self.jobdir - - result = super(RecordingAnsibleJob, self).runPlaybooks(args) - self.executor_server.lock.acquire() self.executor_server.build_history.append( BuildHistory(name=build.name, result=result, changes=build.changes, @@ -786,6 +789,13 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob): self.executor_server.running_builds.remove(build) del self.executor_server.job_builds[self.job.unique] self.executor_server.lock.release() + + def runPlaybooks(self, args): + build = self.executor_server.job_builds[self.job.unique] + build.jobdir = self.jobdir + + result = super(RecordingAnsibleJob, self).runPlaybooks(args) + self.recordResult(result) return result def runAnsible(self, cmd, timeout, trusted=False): diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 7de9be0a9f..4d4b319860 100755 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -924,18 +924,17 @@ class TestScheduler(ZuulTestCase): a = source.getChange(event, refresh=True) self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds())) - @skip("Disabled for early v3 development") - def test_build_configuration_conflict(self): - "Test that merge conflicts are handled" + def test_project_merge_conflict(self): + "Test that gate merge conflicts are handled properly" self.gearman_server.hold_jobs_in_queue = True - A = self.fake_gerrit.addFakeChange('org/conflict-project', - 'master', 'A') - A.addPatchset(['conflict']) - B = self.fake_gerrit.addFakeChange('org/conflict-project', - 'master', 'B') - B.addPatchset(['conflict']) - C = self.fake_gerrit.addFakeChange('org/conflict-project', + A = self.fake_gerrit.addFakeChange('org/project', + 'master', 'A', + files={'conflict': 'foo'}) + B = self.fake_gerrit.addFakeChange('org/project', + 'master', 'B', + files={'conflict': 'bar'}) + C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') A.addApproval('code-review', 2) B.addApproval('code-review', 2) @@ -949,15 +948,13 @@ class TestScheduler(ZuulTestCase): self.assertEqual(B.reported, 1) self.assertEqual(C.reported, 1) - self.gearman_server.release('.*-merge') + self.gearman_server.release('project-merge') self.waitUntilSettled() - self.gearman_server.release('.*-merge') + self.gearman_server.release('project-merge') self.waitUntilSettled() - self.gearman_server.release('.*-merge') + self.gearman_server.release('project-merge') self.waitUntilSettled() - self.assertEqual(len(self.history), 2) # A and C merge jobs - self.gearman_server.hold_jobs_in_queue = False self.gearman_server.release() self.waitUntilSettled() @@ -968,7 +965,97 @@ class TestScheduler(ZuulTestCase): self.assertEqual(A.reported, 2) self.assertEqual(B.reported, 2) self.assertEqual(C.reported, 2) - self.assertEqual(len(self.history), 6) + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project-merge', result='SUCCESS', changes='1,1 3,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1 3,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1 3,1'), + ], ordered=False) + + def test_delayed_merge_conflict(self): + "Test that delayed check merge conflicts are handled properly" + + # Hold jobs in the gearman queue so that we can test whether + # the executor returns a merge failure after the scheduler has + # successfully merged. + self.gearman_server.hold_jobs_in_queue = True + A = self.fake_gerrit.addFakeChange('org/project', + 'master', 'A', + files={'conflict': 'foo'}) + B = self.fake_gerrit.addFakeChange('org/project', + 'master', 'B', + files={'conflict': 'bar'}) + C = self.fake_gerrit.addFakeChange('org/project', + 'master', 'C') + C.setDependsOn(B, 1) + + # A enters the gate queue; B and C enter the check queue + A.addApproval('code-review', 2) + self.fake_gerrit.addEvent(A.addApproval('approved', 1)) + self.waitUntilSettled() + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 0) # Check does not report start + self.assertEqual(C.reported, 0) # Check does not report start + + # A merges while B and C are queued in check + # Release A project-merge + queue = self.gearman_server.getQueue() + self.release(queue[0]) + self.waitUntilSettled() + + # Release A project-test* + # gate has higher precedence, so A's test jobs are added in + # front of the merge jobs for B and C + queue = self.gearman_server.getQueue() + self.release(queue[0]) + self.release(queue[1]) + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(C.data['status'], 'NEW') + self.assertEqual(A.reported, 2) + self.assertEqual(B.reported, 0) + self.assertEqual(C.reported, 0) + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + ], ordered=False) + + # B and C report merge conflicts + # Release B project-merge + queue = self.gearman_server.getQueue() + self.release(queue[0]) + self.waitUntilSettled() + + # Release C + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(C.data['status'], 'NEW') + self.assertEqual(A.reported, 2) + self.assertEqual(B.reported, 1) + self.assertEqual(C.reported, 1) + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project-merge', result='MERGER_FAILURE', changes='2,1'), + dict(name='project-merge', result='MERGER_FAILURE', + changes='2,1 3,1'), + ], ordered=False) def test_post(self): "Test that post jobs run" diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 60b30c7d46..636ab6786b 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -467,7 +467,10 @@ class ExecutorServer(object): result = dict(merged=(ret is not None), zuul_url=self.zuul_url) if args.get('files'): - result['commit'], result['files'] = ret + if ret: + result['commit'], result['files'] = ret + else: + result['commit'], result['files'] = (None, None) else: result['commit'] = ret job.sendWorkComplete(json.dumps(result)) @@ -552,11 +555,13 @@ class AnsibleJob(object): project['name'])) repo.remotes.origin.config_writer.set('url', project['url']) - # Get a merger in order to update the repos involved in this job. - merger = self.executor_server._getMerger(self.jobdir.src_root) merge_items = [i for i in args['items'] if i.get('refspec')] if merge_items: - commit = merger.mergeChanges(merge_items) # noqa + commit = self.doMergeChanges(merge_items) + if not commit: + # There was a merge conflict and we have already sent + # a work complete result, don't run any jobs + return else: commit = args['items'][-1]['newrev'] # noqa @@ -596,6 +601,15 @@ class AnsibleJob(object): result = dict(result=result) self.job.sendWorkComplete(json.dumps(result)) + def doMergeChanges(self, items): + # Get a merger in order to update the repos involved in this job. + merger = self.executor_server._getMerger(self.jobdir.src_root) + commit = merger.mergeChanges(items) # noqa + if not commit: # merge conflict + result = dict(result='MERGER_FAILURE') + self.job.sendWorkComplete(json.dumps(result)) + return commit + def runPlaybooks(self, args): result = None diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 32f0cbba32..cb6341a576 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -440,11 +440,15 @@ class PipelineManager(object): # the merger. number = None patchset = None + refspec = None + branch = None oldrev = None newrev = None if hasattr(item.change, 'number'): number = item.change.number patchset = item.change.patchset + refspec = item.change.refspec + branch = item.change.branch elif hasattr(item.change, 'newrev'): oldrev = item.change.oldrev newrev = item.change.newrev @@ -456,8 +460,8 @@ class PipelineManager(object): item.change.project), connection_name=connection_name, merge_mode=item.current_build_set.getMergeMode(project), - refspec=item.change.refspec, - branch=item.change.branch, + refspec=refspec, + branch=branch, ref=item.current_build_set.ref, number=number, patchset=patchset, @@ -515,30 +519,54 @@ class PipelineManager(object): if build_set.merge_state == build_set.COMPLETE: if build_set.unable_to_merge: return None + self.log.debug("Preparing dynamic layout for: %s" % item.change) return self._loadDynamicLayout(item) - build_set.merge_state = build_set.PENDING - self.log.debug("Preparing dynamic layout for: %s" % item.change) + + def scheduleMerge(self, item, files=None): + build_set = item.current_build_set + + if not hasattr(item.change, 'branch'): + self.log.debug("Change %s does not have an associated branch, " + "not scheduling a merge job for item %s" % + (item.change, item)) + build_set.merge_state = build_set.COMPLETE + return True + + self.log.debug("Scheduling merge for item %s (files: %s)" % + (item, files)) dependent_items = self.getDependentItems(item) dependent_items.reverse() all_items = dependent_items + [item] merger_items = map(self._makeMergerItem, all_items) + build_set = item.current_build_set + build_set.merge_state = build_set.PENDING self.sched.merger.mergeChanges(merger_items, item.current_build_set, - ['zuul.yaml', '.zuul.yaml'], + files, self.pipeline.precedence) + return False - def prepareLayout(self, item): - # Get a copy of the layout in the context of the current - # queue. - # Returns True if the ref is ready, false otherwise - if not item.current_build_set.ref: - item.current_build_set.setConfiguration() - if not item.current_build_set.layout: - item.current_build_set.layout = self.getLayout(item) - if not item.current_build_set.layout: + def prepareItem(self, item): + # This runs on every iteration of _processOneItem + # Returns True if the item is ready, false otherwise + build_set = item.current_build_set + if not build_set.ref: + build_set.setConfiguration() + if build_set.merge_state == build_set.NEW: + return self.scheduleMerge(item, ['zuul.yaml', '.zuul.yaml']) + if build_set.config_error: return False - if item.current_build_set.config_error: + return True + + def prepareJobs(self, item): + # This only runs once the item is in the pipeline's action window + # Returns True if the item is ready, false otherwise + build_set = item.current_build_set + if not build_set.layout: + build_set.layout = self.getLayout(item) + if not build_set.layout: return False + if not item.job_graph: try: item.freezeJobGraph() @@ -553,11 +581,13 @@ class PipelineManager(object): def _processOneItem(self, item, nnfi): changed = False + ready = False + failing_reasons = [] # Reasons this item is failing + item_ahead = item.item_ahead if item_ahead and (not item_ahead.live): item_ahead = None change_queue = item.queue - failing_reasons = [] # Reasons this item is failing if self.checkForChangesNeededBy(item.change, change_queue) is not True: # It's not okay to enqueue this change, we should remove it. @@ -572,10 +602,11 @@ class PipelineManager(object): except exceptions.MergeFailure: pass return (True, nnfi) - dep_items = self.getFailingDependentItems(item) + actionable = change_queue.isActionable(item) item.active = actionable - ready = False + + dep_items = self.getFailingDependentItems(item) if dep_items: failing_reasons.append('a needed change is failing') self.cancelJobs(item, prime=False) @@ -594,15 +625,16 @@ class PipelineManager(object): changed = True self.cancelJobs(item) if actionable: - ready = self.prepareLayout(item) + ready = self.prepareItem(item) and self.prepareJobs(item) if item.current_build_set.unable_to_merge: failing_reasons.append("it has a merge conflict") if item.current_build_set.config_error: failing_reasons.append("it has an invalid configuration") if ready and self.provisionNodes(item): changed = True - if actionable and ready and self.executeJobs(item): + if ready and self.executeJobs(item): changed = True + if item.didAnyJobFail(): failing_reasons.append("at least one job failed") if (not item.live) and (not item.items_behind): @@ -740,10 +772,11 @@ class PipelineManager(object): # TODOv3(jeblair): consider a new reporter action for this actions = self.pipeline.merge_failure_actions item.setReportedResult('CONFIG_ERROR') + elif item.didMergerFail(): + actions = self.pipeline.merge_failure_actions + item.setReportedResult('MERGER_FAILURE') elif not item.getJobs(): - # We don't send empty reports with +1, - # and the same for -1's (merge failures or transient errors) - # as they cannot be followed by +1's + # We don't send empty reports with +1 self.log.debug("No jobs for change %s" % item.change) actions = [] elif item.didAllJobsSucceed(): @@ -751,9 +784,6 @@ class PipelineManager(object): actions = self.pipeline.success_actions item.setReportedResult('SUCCESS') self.pipeline._consecutive_failures = 0 - elif item.didMergerFail(): - actions = self.pipeline.merge_failure_actions - item.setReportedResult('MERGER_FAILURE') else: actions = self.pipeline.failure_actions item.setReportedResult('FAILURE') diff --git a/zuul/merger/server.py b/zuul/merger/server.py index c2738a2885..540105eeeb 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -111,7 +111,10 @@ class MergeServer(object): result = dict(merged=(ret is not None), zuul_url=self.zuul_url) if args.get('files'): - result['commit'], result['files'] = ret + if ret: + result['commit'], result['files'] = ret + else: + result['commit'], result['files'] = (None, None) else: result['commit'] = ret job.sendWorkComplete(json.dumps(result))