From 289f5930facc76a9d5f3328a3c0a11be59eca596 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 27 Jul 2017 15:02:29 -0700 Subject: [PATCH] Ensure ref-updated jobs run with their ref We were incorrectly preparing the current state of the repo for ref updated (eg, post) jobs. This ensures that we run with the actual supplied ref, even if the remote has moved on since then. Change-Id: I52f05406246e6e39805fd8365412f3cb77fe3a0a --- tests/base.py | 29 ++++++++----- tests/unit/test_executor.py | 20 +++++++-- tests/unit/test_github_driver.py | 26 ++++++++---- tests/unit/test_push_reqs.py | 9 ++-- tests/unit/test_scheduler.py | 18 ++++++-- zuul/executor/server.py | 16 +++++++ zuul/manager/__init__.py | 24 +++++------ zuul/manager/independent.py | 3 ++ zuul/merger/client.py | 5 +++ zuul/merger/merger.py | 72 ++++++++++++++++++++++++++++---- zuul/merger/server.py | 12 ++++++ 11 files changed, 185 insertions(+), 49 deletions(-) diff --git a/tests/base.py b/tests/base.py index 35c8324544..5a06f85abd 100755 --- a/tests/base.py +++ b/tests/base.py @@ -139,7 +139,8 @@ class FakeGerritChange(object): 'Verified': ('Verified', -2, 2)} def __init__(self, gerrit, number, project, branch, subject, - status='NEW', upstream_root=None, files={}): + status='NEW', upstream_root=None, files={}, + parent=None): self.gerrit = gerrit self.source = gerrit self.reported = 0 @@ -174,16 +175,18 @@ class FakeGerritChange(object): 'url': 'https://hostname/%s' % number} self.upstream_root = upstream_root - self.addPatchset(files=files) + self.addPatchset(files=files, parent=parent) self.data['submitRecords'] = self.getSubmitRecords() self.open = status == 'NEW' - def addFakeChangeToRepo(self, msg, files, large): + def addFakeChangeToRepo(self, msg, files, large, parent): path = os.path.join(self.upstream_root, self.project) repo = git.Repo(path) + if parent is None: + parent = 'refs/tags/init' ref = GerritChangeReference.create( repo, '1/%s/%s' % (self.number, self.latest_patchset), - 'refs/tags/init') + parent) repo.head.reference = ref zuul.merger.merger.reset_repo_to_head(repo) repo.git.clean('-x', '-f', '-d') @@ -211,7 +214,7 @@ class FakeGerritChange(object): repo.heads['master'].checkout() return r - def addPatchset(self, files=None, large=False): + def addPatchset(self, files=None, large=False, parent=None): self.latest_patchset += 1 if not files: fn = '%s-%s' % (self.branch.replace('/', '_'), self.number) @@ -219,7 +222,7 @@ class FakeGerritChange(object): (self.branch, self.number, self.latest_patchset)) files = {fn: data} msg = self.subject + '-' + str(self.latest_patchset) - c = self.addFakeChangeToRepo(msg, files, large) + c = self.addFakeChangeToRepo(msg, files, large, parent) ps_files = [{'file': '/COMMIT_MSG', 'type': 'ADDED'}, {'file': 'README', @@ -469,12 +472,12 @@ class FakeGerritConnection(gerritconnection.GerritConnection): self.upstream_root = upstream_root def addFakeChange(self, project, branch, subject, status='NEW', - files=None): + files=None, parent=None): """Add a change to the fake Gerrit.""" self.change_number += 1 c = FakeGerritChange(self, self.change_number, project, branch, subject, upstream_root=self.upstream_root, - status=status, files=files) + status=status, files=files, parent=parent) self.changes[self.change_number] = c return c @@ -863,6 +866,13 @@ class FakeGithubPullRequest(object): } return (name, data) + def setMerged(self, commit_message): + self.is_merged = True + self.merge_message = commit_message + + repo = self._getRepo() + repo.heads[self.branch].commit = repo.commit(self.head_sha) + class FakeGithubConnection(githubconnection.GithubConnection): log = logging.getLogger("zuul.test.FakeGithubConnection") @@ -1011,8 +1021,7 @@ class FakeGithubConnection(githubconnection.GithubConnection): self.merge_not_allowed_count -= 1 raise MergeFailure('Merge was not successful due to mergeability' ' conflict') - pull_request.is_merged = True - pull_request.merge_message = commit_message + pull_request.setMerged(commit_message) def getCommitStatuses(self, project, sha): return self.statuses.get(project, {}).get(sha, []) diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py index 46f3b26e66..f69113586a 100755 --- a/tests/unit/test_executor.py +++ b/tests/unit/test_executor.py @@ -338,19 +338,31 @@ class TestExecutorRepos(ZuulTestCase): p1 = "review.example.com/org/project1" p2 = "review.example.com/org/project2" projects = [p1, p2] + upstream = self.getUpstreamRepos(projects) A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') event = A.getRefUpdatedEvent() A.setMerged() + A_commit = str(upstream[p1].commit('master')) + self.log.debug("A commit: %s" % A_commit) + + # Add another commit to the repo that merged right after this + # one to make sure that our post job runs with the one that we + # intended rather than simply the current repo state. + B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B', + parent='refs/changes/1/1/1') + B.setMerged() + B_commit = str(upstream[p1].commit('master')) + self.log.debug("B commit: %s" % B_commit) + self.fake_gerrit.addEvent(event) self.waitUntilSettled() - upstream = self.getUpstreamRepos(projects) states = [ - {p1: dict(commit=str(upstream[p1].commit('master')), - present=[A], branch='master'), + {p1: dict(commit=A_commit, + present=[A], absent=[B], branch='master'), p2: dict(commit=str(upstream[p2].commit('master')), - absent=[A], branch='master'), + absent=[A, B], branch='master'), }, ] diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index 0e199dfa82..4077cca2d0 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -125,16 +125,20 @@ class TestGithubDriver(ZuulTestCase): def test_push_event(self): self.executor_server.hold_jobs_in_build = True - old_sha = random_sha1() - new_sha = random_sha1() - self.fake_github.emitEvent( - self.fake_github.getPushEvent('org/project', 'refs/heads/master', - old_sha, new_sha)) + A = self.fake_github.openFakePullRequest('org/project', 'master', 'A') + old_sha = '0' * 40 + new_sha = A.head_sha + A.setMerged("merging A") + pevent = self.fake_github.getPushEvent(project='org/project', + ref='refs/heads/master', + old_rev=old_sha, + new_rev=new_sha) + self.fake_github.emitEvent(pevent) self.waitUntilSettled() build_params = self.builds[0].parameters self.assertEqual('refs/heads/master', build_params['zuul']['ref']) - self.assertEqual(old_sha, build_params['zuul']['oldrev']) + self.assertFalse('oldrev' in build_params['zuul']) self.assertEqual(new_sha, build_params['zuul']['newrev']) self.executor_server.hold_jobs_in_build = False @@ -366,9 +370,15 @@ class TestGithubDriver(ZuulTestCase): project = 'org/project2' # pipeline reports pull status both on start and success self.executor_server.hold_jobs_in_build = True - pevent = self.fake_github.getPushEvent(project=project, - ref='refs/heads/master') + A = self.fake_github.openFakePullRequest(project, 'master', 'A') + old_sha = '0' * 40 + new_sha = A.head_sha + A.setMerged("merging A") + pevent = self.fake_github.getPushEvent(project=project, + ref='refs/heads/master', + old_rev=old_sha, + new_rev=new_sha) self.fake_github.emitEvent(pevent) self.waitUntilSettled() diff --git a/tests/unit/test_push_reqs.py b/tests/unit/test_push_reqs.py index d3a1febe8c..80c3be97c1 100644 --- a/tests/unit/test_push_reqs.py +++ b/tests/unit/test_push_reqs.py @@ -25,12 +25,13 @@ class TestPushRequirements(ZuulTestCase): def test_push_requirements(self): self.executor_server.hold_jobs_in_build = True - # Create a github change, add a change and emit a push event A = self.fake_github.openFakePullRequest('org/project1', 'master', 'A') - old_sha = A.head_sha + new_sha = A.head_sha + A.setMerged("merging A") pevent = self.fake_github.getPushEvent(project='org/project1', ref='refs/heads/master', - old_rev=old_sha) + new_rev=new_sha) + self.fake_github.emitEvent(pevent) self.waitUntilSettled() @@ -43,7 +44,7 @@ class TestPushRequirements(ZuulTestCase): # Make a gerrit change, and emit a ref-updated event B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') self.fake_gerrit.addEvent(B.getRefUpdatedEvent()) - + B.setMerged() self.waitUntilSettled() # All but one pipeline should be skipped, increasing builds by 1 diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 2b1a30fa3f..d77a7be5f5 100755 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -1103,6 +1103,12 @@ class TestScheduler(ZuulTestCase): def test_post(self): "Test that post jobs run" + p = "review.example.com/org/project" + upstream = self.getUpstreamRepos([p]) + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.setMerged() + A_commit = str(upstream[p].commit('master')) + self.log.debug("A commit: %s" % A_commit) e = { "type": "ref-updated", @@ -1111,7 +1117,7 @@ class TestScheduler(ZuulTestCase): }, "refUpdate": { "oldRev": "90f173846e3af9154517b88543ffbd1691f31366", - "newRev": "d479a0bfcb34da57a31adb2a595c0cf687812543", + "newRev": A_commit, "refName": "master", "project": "org/project", } @@ -1156,7 +1162,7 @@ class TestScheduler(ZuulTestCase): "refUpdate": { "oldRev": "90f173846e3af9154517b88543ffbd1691f31366", "newRev": "0000000000000000000000000000000000000000", - "refName": "master", + "refName": "testbranch", "project": "org/project", } } @@ -3054,6 +3060,12 @@ class TestScheduler(ZuulTestCase): def test_client_enqueue_ref(self): "Test that the RPC client can enqueue a ref" + p = "review.example.com/org/project" + upstream = self.getUpstreamRepos([p]) + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.setMerged() + A_commit = str(upstream[p].commit('master')) + self.log.debug("A commit: %s" % A_commit) client = zuul.rpcclient.RPCClient('127.0.0.1', self.gearman_server.port) @@ -3065,7 +3077,7 @@ class TestScheduler(ZuulTestCase): trigger='gerrit', ref='master', oldrev='90f173846e3af9154517b88543ffbd1691f31366', - newrev='d479a0bfcb34da57a31adb2a595c0cf687812543') + newrev=A_commit) self.waitUntilSettled() job_names = [x.name for x in self.history] self.assertEqual(len(self.history), 1) diff --git a/zuul/executor/server.py b/zuul/executor/server.py index b166111542..8d23cb77c3 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -625,6 +625,7 @@ class ExecutorServer(object): self.hostname) self.merger_worker.registerFunction("merger:merge") self.merger_worker.registerFunction("merger:cat") + self.merger_worker.registerFunction("merger:refstate") def stop(self): self.log.debug("Stopping") @@ -721,6 +722,9 @@ class ExecutorServer(object): elif job.name == 'merger:merge': self.log.debug("Got merge job: %s" % job.unique) self.merge(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -800,6 +804,14 @@ class ExecutorServer(object): files=files) job.sendWorkComplete(json.dumps(result)) + def refstate(self, job): + args = json.loads(job.arguments) + with self.merger_lock: + success, repo_state = self.merger.getRepoState(args['items']) + result = dict(updated=success, + repo_state=repo_state) + job.sendWorkComplete(json.dumps(result)) + def merge(self, job): args = json.loads(job.arguments) with self.merger_lock: @@ -954,6 +966,10 @@ class AnsibleJob(object): # a work complete result, don't run any jobs return + state_items = [i for i in args['items'] if not i.get('number')] + if state_items: + merger.setRepoState(state_items, args['repo_state']) + for project in args['projects']: repo = repos[project['canonical_name']] # If this project is the Zuul project and this is a ref diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index dfb3238a2b..8282f86a44 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -13,6 +13,7 @@ import logging from zuul import exceptions +from zuul import model class DynamicChangeQueueContextManager(object): @@ -483,20 +484,18 @@ class PipelineManager(object): def scheduleMerge(self, item, files=None, dirs=None): build_set = item.current_build_set - if not hasattr(item.change, 'branch'): - self.log.debug("Change %s does not have an associated branch, " - "not scheduling a merge job for item %s" % - (item.change, item)) - build_set.merge_state = build_set.COMPLETE - return True - self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" % (item, files, dirs)) build_set = item.current_build_set build_set.merge_state = build_set.PENDING - self.sched.merger.mergeChanges(build_set.merger_items, - item.current_build_set, files, dirs, - precedence=self.pipeline.precedence) + if isinstance(item.change, model.Change): + self.sched.merger.mergeChanges(build_set.merger_items, + item.current_build_set, files, dirs, + precedence=self.pipeline.precedence) + else: + self.sched.merger.getRepoState(build_set.merger_items, + item.current_build_set, + precedence=self.pipeline.precedence) return False def prepareItem(self, item): @@ -675,12 +674,13 @@ class PipelineManager(object): build_set = event.build_set item = build_set.item build_set.merge_state = build_set.COMPLETE + build_set.repo_state = event.repo_state if event.merged: build_set.commit = event.commit build_set.files.setFiles(event.files) - build_set.repo_state = event.repo_state elif event.updated: - build_set.commit = item.change.newrev + build_set.commit = (item.change.newrev or + '0000000000000000000000000000000000000000') if not build_set.commit: self.log.info("Unable to merge change %s" % item.change) item.setUnableToMerge() diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 06c9a01a1a..7b0a9f53c3 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -44,6 +44,9 @@ class IndependentPipelineManager(PipelineManager): if hasattr(change, 'number'): history = history or [] history.append(change.number) + else: + # Don't enqueue dependencies ahead of a non-change ref. + return True ret = self.checkForChangesNeededBy(change, change_queue) if ret in [True, False]: diff --git a/zuul/merger/client.py b/zuul/merger/client.py index dd9c8d5518..5191a44f9c 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -116,6 +116,11 @@ class MergeClient(object): repo_state=repo_state) self.submitJob('merger:merge', data, build_set, precedence) + def getRepoState(self, items, build_set, + precedence=zuul.model.PRECEDENCE_NORMAL): + data = dict(items=items) + self.submitJob('merger:refstate', data, build_set, precedence) + def getFiles(self, connection_name, project_name, branch, files, dirs=[], precedence=zuul.model.PRECEDENCE_HIGH): data = dict(connection=connection_name, diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index c5d1f2ad55..ed98696ec6 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -20,6 +20,8 @@ import logging import zuul.model +NULL_REF = '0000000000000000000000000000000000000000' + def reset_repo_to_head(repo): # This lets us reset the repo even if there is a file in the root @@ -178,8 +180,13 @@ class Repo(object): self.setRef(path, hexsha, repo) unseen.discard(path) for path in unseen: - self.log.debug("Delete reference %s", path) - git.refs.SymbolicReference.delete(repo, ref.path) + self.deleteRef(path, repo) + + def deleteRef(self, path, repo=None): + if repo is None: + repo = self.createRepoObject() + self.log.debug("Delete reference %s", path) + git.refs.SymbolicReference.delete(repo, path) def checkout(self, ref): repo = self.createRepoObject() @@ -369,6 +376,16 @@ class Merger(object): recent[key] = ref.object project[ref.path] = ref.object.hexsha + def _alterRepoState(self, connection_name, project_name, + repo_state, path, hexsha): + projects = repo_state.setdefault(connection_name, {}) + project = projects.setdefault(project_name, {}) + if hexsha == NULL_REF: + if path in project: + del project[path] + else: + project[path] = hexsha + def _restoreRepoState(self, connection_name, project_name, repo, repo_state): projects = repo_state.get(connection_name, {}) @@ -470,12 +487,8 @@ class Merger(object): if repo_state is None: repo_state = {} for item in items: - if item.get("number") and item.get("patchset"): - self.log.debug("Merging for change %s,%s." % - (item["number"], item["patchset"])) - elif item.get("newrev") and item.get("oldrev"): - self.log.debug("Merging for rev %s with oldrev %s." % - (item["newrev"], item["oldrev"])) + self.log.debug("Merging for change %s,%s" % + (item["number"], item["patchset"])) commit = self._mergeItem(item, recent, repo_state) if not commit: return None @@ -492,6 +505,49 @@ class Merger(object): ret_recent[k] = v.hexsha return commit.hexsha, read_files, repo_state, ret_recent + def setRepoState(self, items, repo_state): + # Sets the repo state for the items + seen = set() + for item in items: + repo = self.getRepo(item['connection'], item['project']) + key = (item['connection'], item['project'], item['branch']) + + if key in seen: + continue + + repo.reset() + self._restoreRepoState(item['connection'], item['project'], repo, + repo_state) + + def getRepoState(self, items): + # Gets the repo state for items. Generally this will be + # called in any non-change pipeline. We will return the repo + # state for each item, but manipulated with any information in + # the item (eg, if it creates a ref, that will be in the repo + # state regardless of the actual state). + seen = set() + recent = {} + repo_state = {} + for item in items: + repo = self.getRepo(item['connection'], item['project']) + key = (item['connection'], item['project'], item['branch']) + if key not in seen: + try: + repo.reset() + except Exception: + self.log.exception("Unable to reset repo %s" % repo) + return (False, {}) + + self._saveRepoState(item['connection'], item['project'], repo, + repo_state, recent) + + if item.get('newrev'): + # This is a ref update rather than a branch tip, so make sure + # our returned state includes this change. + self._alterRepoState(item['connection'], item['project'], + repo_state, item['ref'], item['newrev']) + return (True, repo_state) + def getFiles(self, connection_name, project_name, branch, files, dirs=[]): repo = self.getRepo(connection_name, project_name) return repo.getFiles(files, dirs, branch=branch) diff --git a/zuul/merger/server.py b/zuul/merger/server.py index c342e1ac83..fc599c1177 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -58,6 +58,7 @@ class MergeServer(object): def register(self): self.worker.registerFunction("merger:merge") self.worker.registerFunction("merger:cat") + self.worker.registerFunction("merger:refstate") def stop(self): self.log.debug("Stopping") @@ -80,6 +81,9 @@ class MergeServer(object): elif job.name == 'merger:cat': self.log.debug("Got cat job: %s" % job.unique) self.cat(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -104,6 +108,14 @@ class MergeServer(object): recent) = ret job.sendWorkComplete(json.dumps(result)) + def refstate(self, job): + args = json.loads(job.arguments) + + success, repo_state = self.merger.getItemRepoState(args['items']) + result = dict(updated=success, + repo_state=repo_state) + job.sendWorkComplete(json.dumps(result)) + def cat(self, job): args = json.loads(job.arguments) self.merger.updateRepo(args['connection'], args['project'])