Ensure ref-updated jobs run with their ref
We were incorrectly preparing the current state of the repo for ref updated (eg, post) jobs. This ensures that we run with the actual supplied ref, even if the remote has moved on since then. Change-Id: I52f05406246e6e39805fd8365412f3cb77fe3a0a
This commit is contained in:
parent
7a27545180
commit
289f5930fa
|
@ -139,7 +139,8 @@ class FakeGerritChange(object):
|
|||
'Verified': ('Verified', -2, 2)}
|
||||
|
||||
def __init__(self, gerrit, number, project, branch, subject,
|
||||
status='NEW', upstream_root=None, files={}):
|
||||
status='NEW', upstream_root=None, files={},
|
||||
parent=None):
|
||||
self.gerrit = gerrit
|
||||
self.source = gerrit
|
||||
self.reported = 0
|
||||
|
@ -174,16 +175,18 @@ class FakeGerritChange(object):
|
|||
'url': 'https://hostname/%s' % number}
|
||||
|
||||
self.upstream_root = upstream_root
|
||||
self.addPatchset(files=files)
|
||||
self.addPatchset(files=files, parent=parent)
|
||||
self.data['submitRecords'] = self.getSubmitRecords()
|
||||
self.open = status == 'NEW'
|
||||
|
||||
def addFakeChangeToRepo(self, msg, files, large):
|
||||
def addFakeChangeToRepo(self, msg, files, large, parent):
|
||||
path = os.path.join(self.upstream_root, self.project)
|
||||
repo = git.Repo(path)
|
||||
if parent is None:
|
||||
parent = 'refs/tags/init'
|
||||
ref = GerritChangeReference.create(
|
||||
repo, '1/%s/%s' % (self.number, self.latest_patchset),
|
||||
'refs/tags/init')
|
||||
parent)
|
||||
repo.head.reference = ref
|
||||
zuul.merger.merger.reset_repo_to_head(repo)
|
||||
repo.git.clean('-x', '-f', '-d')
|
||||
|
@ -211,7 +214,7 @@ class FakeGerritChange(object):
|
|||
repo.heads['master'].checkout()
|
||||
return r
|
||||
|
||||
def addPatchset(self, files=None, large=False):
|
||||
def addPatchset(self, files=None, large=False, parent=None):
|
||||
self.latest_patchset += 1
|
||||
if not files:
|
||||
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
|
||||
|
@ -219,7 +222,7 @@ class FakeGerritChange(object):
|
|||
(self.branch, self.number, self.latest_patchset))
|
||||
files = {fn: data}
|
||||
msg = self.subject + '-' + str(self.latest_patchset)
|
||||
c = self.addFakeChangeToRepo(msg, files, large)
|
||||
c = self.addFakeChangeToRepo(msg, files, large, parent)
|
||||
ps_files = [{'file': '/COMMIT_MSG',
|
||||
'type': 'ADDED'},
|
||||
{'file': 'README',
|
||||
|
@ -469,12 +472,12 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
|||
self.upstream_root = upstream_root
|
||||
|
||||
def addFakeChange(self, project, branch, subject, status='NEW',
|
||||
files=None):
|
||||
files=None, parent=None):
|
||||
"""Add a change to the fake Gerrit."""
|
||||
self.change_number += 1
|
||||
c = FakeGerritChange(self, self.change_number, project, branch,
|
||||
subject, upstream_root=self.upstream_root,
|
||||
status=status, files=files)
|
||||
status=status, files=files, parent=parent)
|
||||
self.changes[self.change_number] = c
|
||||
return c
|
||||
|
||||
|
@ -863,6 +866,13 @@ class FakeGithubPullRequest(object):
|
|||
}
|
||||
return (name, data)
|
||||
|
||||
def setMerged(self, commit_message):
|
||||
self.is_merged = True
|
||||
self.merge_message = commit_message
|
||||
|
||||
repo = self._getRepo()
|
||||
repo.heads[self.branch].commit = repo.commit(self.head_sha)
|
||||
|
||||
|
||||
class FakeGithubConnection(githubconnection.GithubConnection):
|
||||
log = logging.getLogger("zuul.test.FakeGithubConnection")
|
||||
|
@ -1011,8 +1021,7 @@ class FakeGithubConnection(githubconnection.GithubConnection):
|
|||
self.merge_not_allowed_count -= 1
|
||||
raise MergeFailure('Merge was not successful due to mergeability'
|
||||
' conflict')
|
||||
pull_request.is_merged = True
|
||||
pull_request.merge_message = commit_message
|
||||
pull_request.setMerged(commit_message)
|
||||
|
||||
def getCommitStatuses(self, project, sha):
|
||||
return self.statuses.get(project, {}).get(sha, [])
|
||||
|
|
|
@ -338,19 +338,31 @@ class TestExecutorRepos(ZuulTestCase):
|
|||
p1 = "review.example.com/org/project1"
|
||||
p2 = "review.example.com/org/project2"
|
||||
projects = [p1, p2]
|
||||
upstream = self.getUpstreamRepos(projects)
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
event = A.getRefUpdatedEvent()
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p1].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
# Add another commit to the repo that merged right after this
|
||||
# one to make sure that our post job runs with the one that we
|
||||
# intended rather than simply the current repo state.
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B',
|
||||
parent='refs/changes/1/1/1')
|
||||
B.setMerged()
|
||||
B_commit = str(upstream[p1].commit('master'))
|
||||
self.log.debug("B commit: %s" % B_commit)
|
||||
|
||||
self.fake_gerrit.addEvent(event)
|
||||
self.waitUntilSettled()
|
||||
|
||||
upstream = self.getUpstreamRepos(projects)
|
||||
states = [
|
||||
{p1: dict(commit=str(upstream[p1].commit('master')),
|
||||
present=[A], branch='master'),
|
||||
{p1: dict(commit=A_commit,
|
||||
present=[A], absent=[B], branch='master'),
|
||||
p2: dict(commit=str(upstream[p2].commit('master')),
|
||||
absent=[A], branch='master'),
|
||||
absent=[A, B], branch='master'),
|
||||
},
|
||||
]
|
||||
|
||||
|
|
|
@ -125,16 +125,20 @@ class TestGithubDriver(ZuulTestCase):
|
|||
def test_push_event(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
old_sha = random_sha1()
|
||||
new_sha = random_sha1()
|
||||
self.fake_github.emitEvent(
|
||||
self.fake_github.getPushEvent('org/project', 'refs/heads/master',
|
||||
old_sha, new_sha))
|
||||
A = self.fake_github.openFakePullRequest('org/project', 'master', 'A')
|
||||
old_sha = '0' * 40
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project='org/project',
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha,
|
||||
new_rev=new_sha)
|
||||
self.fake_github.emitEvent(pevent)
|
||||
self.waitUntilSettled()
|
||||
|
||||
build_params = self.builds[0].parameters
|
||||
self.assertEqual('refs/heads/master', build_params['zuul']['ref'])
|
||||
self.assertEqual(old_sha, build_params['zuul']['oldrev'])
|
||||
self.assertFalse('oldrev' in build_params['zuul'])
|
||||
self.assertEqual(new_sha, build_params['zuul']['newrev'])
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
|
@ -366,9 +370,15 @@ class TestGithubDriver(ZuulTestCase):
|
|||
project = 'org/project2'
|
||||
# pipeline reports pull status both on start and success
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
pevent = self.fake_github.getPushEvent(project=project,
|
||||
ref='refs/heads/master')
|
||||
|
||||
A = self.fake_github.openFakePullRequest(project, 'master', 'A')
|
||||
old_sha = '0' * 40
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project=project,
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha,
|
||||
new_rev=new_sha)
|
||||
self.fake_github.emitEvent(pevent)
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
|
|
@ -25,12 +25,13 @@ class TestPushRequirements(ZuulTestCase):
|
|||
def test_push_requirements(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Create a github change, add a change and emit a push event
|
||||
A = self.fake_github.openFakePullRequest('org/project1', 'master', 'A')
|
||||
old_sha = A.head_sha
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project='org/project1',
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha)
|
||||
new_rev=new_sha)
|
||||
|
||||
self.fake_github.emitEvent(pevent)
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
@ -43,7 +44,7 @@ class TestPushRequirements(ZuulTestCase):
|
|||
# Make a gerrit change, and emit a ref-updated event
|
||||
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
|
||||
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
|
||||
|
||||
B.setMerged()
|
||||
self.waitUntilSettled()
|
||||
|
||||
# All but one pipeline should be skipped, increasing builds by 1
|
||||
|
|
|
@ -1103,6 +1103,12 @@ class TestScheduler(ZuulTestCase):
|
|||
|
||||
def test_post(self):
|
||||
"Test that post jobs run"
|
||||
p = "review.example.com/org/project"
|
||||
upstream = self.getUpstreamRepos([p])
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
e = {
|
||||
"type": "ref-updated",
|
||||
|
@ -1111,7 +1117,7 @@ class TestScheduler(ZuulTestCase):
|
|||
},
|
||||
"refUpdate": {
|
||||
"oldRev": "90f173846e3af9154517b88543ffbd1691f31366",
|
||||
"newRev": "d479a0bfcb34da57a31adb2a595c0cf687812543",
|
||||
"newRev": A_commit,
|
||||
"refName": "master",
|
||||
"project": "org/project",
|
||||
}
|
||||
|
@ -1156,7 +1162,7 @@ class TestScheduler(ZuulTestCase):
|
|||
"refUpdate": {
|
||||
"oldRev": "90f173846e3af9154517b88543ffbd1691f31366",
|
||||
"newRev": "0000000000000000000000000000000000000000",
|
||||
"refName": "master",
|
||||
"refName": "testbranch",
|
||||
"project": "org/project",
|
||||
}
|
||||
}
|
||||
|
@ -3054,6 +3060,12 @@ class TestScheduler(ZuulTestCase):
|
|||
|
||||
def test_client_enqueue_ref(self):
|
||||
"Test that the RPC client can enqueue a ref"
|
||||
p = "review.example.com/org/project"
|
||||
upstream = self.getUpstreamRepos([p])
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
self.gearman_server.port)
|
||||
|
@ -3065,7 +3077,7 @@ class TestScheduler(ZuulTestCase):
|
|||
trigger='gerrit',
|
||||
ref='master',
|
||||
oldrev='90f173846e3af9154517b88543ffbd1691f31366',
|
||||
newrev='d479a0bfcb34da57a31adb2a595c0cf687812543')
|
||||
newrev=A_commit)
|
||||
self.waitUntilSettled()
|
||||
job_names = [x.name for x in self.history]
|
||||
self.assertEqual(len(self.history), 1)
|
||||
|
|
|
@ -625,6 +625,7 @@ class ExecutorServer(object):
|
|||
self.hostname)
|
||||
self.merger_worker.registerFunction("merger:merge")
|
||||
self.merger_worker.registerFunction("merger:cat")
|
||||
self.merger_worker.registerFunction("merger:refstate")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -721,6 +722,9 @@ class ExecutorServer(object):
|
|||
elif job.name == 'merger:merge':
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
self.merge(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
|
@ -800,6 +804,14 @@ class ExecutorServer(object):
|
|||
files=files)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
success, repo_state = self.merger.getRepoState(args['items'])
|
||||
result = dict(updated=success,
|
||||
repo_state=repo_state)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
|
@ -954,6 +966,10 @@ class AnsibleJob(object):
|
|||
# a work complete result, don't run any jobs
|
||||
return
|
||||
|
||||
state_items = [i for i in args['items'] if not i.get('number')]
|
||||
if state_items:
|
||||
merger.setRepoState(state_items, args['repo_state'])
|
||||
|
||||
for project in args['projects']:
|
||||
repo = repos[project['canonical_name']]
|
||||
# If this project is the Zuul project and this is a ref
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
import logging
|
||||
|
||||
from zuul import exceptions
|
||||
from zuul import model
|
||||
|
||||
|
||||
class DynamicChangeQueueContextManager(object):
|
||||
|
@ -483,20 +484,18 @@ class PipelineManager(object):
|
|||
def scheduleMerge(self, item, files=None, dirs=None):
|
||||
build_set = item.current_build_set
|
||||
|
||||
if not hasattr(item.change, 'branch'):
|
||||
self.log.debug("Change %s does not have an associated branch, "
|
||||
"not scheduling a merge job for item %s" %
|
||||
(item.change, item))
|
||||
build_set.merge_state = build_set.COMPLETE
|
||||
return True
|
||||
|
||||
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
|
||||
(item, files, dirs))
|
||||
build_set = item.current_build_set
|
||||
build_set.merge_state = build_set.PENDING
|
||||
self.sched.merger.mergeChanges(build_set.merger_items,
|
||||
item.current_build_set, files, dirs,
|
||||
precedence=self.pipeline.precedence)
|
||||
if isinstance(item.change, model.Change):
|
||||
self.sched.merger.mergeChanges(build_set.merger_items,
|
||||
item.current_build_set, files, dirs,
|
||||
precedence=self.pipeline.precedence)
|
||||
else:
|
||||
self.sched.merger.getRepoState(build_set.merger_items,
|
||||
item.current_build_set,
|
||||
precedence=self.pipeline.precedence)
|
||||
return False
|
||||
|
||||
def prepareItem(self, item):
|
||||
|
@ -675,12 +674,13 @@ class PipelineManager(object):
|
|||
build_set = event.build_set
|
||||
item = build_set.item
|
||||
build_set.merge_state = build_set.COMPLETE
|
||||
build_set.repo_state = event.repo_state
|
||||
if event.merged:
|
||||
build_set.commit = event.commit
|
||||
build_set.files.setFiles(event.files)
|
||||
build_set.repo_state = event.repo_state
|
||||
elif event.updated:
|
||||
build_set.commit = item.change.newrev
|
||||
build_set.commit = (item.change.newrev or
|
||||
'0000000000000000000000000000000000000000')
|
||||
if not build_set.commit:
|
||||
self.log.info("Unable to merge change %s" % item.change)
|
||||
item.setUnableToMerge()
|
||||
|
|
|
@ -44,6 +44,9 @@ class IndependentPipelineManager(PipelineManager):
|
|||
if hasattr(change, 'number'):
|
||||
history = history or []
|
||||
history.append(change.number)
|
||||
else:
|
||||
# Don't enqueue dependencies ahead of a non-change ref.
|
||||
return True
|
||||
|
||||
ret = self.checkForChangesNeededBy(change, change_queue)
|
||||
if ret in [True, False]:
|
||||
|
|
|
@ -116,6 +116,11 @@ class MergeClient(object):
|
|||
repo_state=repo_state)
|
||||
self.submitJob('merger:merge', data, build_set, precedence)
|
||||
|
||||
def getRepoState(self, items, build_set,
|
||||
precedence=zuul.model.PRECEDENCE_NORMAL):
|
||||
data = dict(items=items)
|
||||
self.submitJob('merger:refstate', data, build_set, precedence)
|
||||
|
||||
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
|
||||
precedence=zuul.model.PRECEDENCE_HIGH):
|
||||
data = dict(connection=connection_name,
|
||||
|
|
|
@ -20,6 +20,8 @@ import logging
|
|||
|
||||
import zuul.model
|
||||
|
||||
NULL_REF = '0000000000000000000000000000000000000000'
|
||||
|
||||
|
||||
def reset_repo_to_head(repo):
|
||||
# This lets us reset the repo even if there is a file in the root
|
||||
|
@ -178,8 +180,13 @@ class Repo(object):
|
|||
self.setRef(path, hexsha, repo)
|
||||
unseen.discard(path)
|
||||
for path in unseen:
|
||||
self.log.debug("Delete reference %s", path)
|
||||
git.refs.SymbolicReference.delete(repo, ref.path)
|
||||
self.deleteRef(path, repo)
|
||||
|
||||
def deleteRef(self, path, repo=None):
|
||||
if repo is None:
|
||||
repo = self.createRepoObject()
|
||||
self.log.debug("Delete reference %s", path)
|
||||
git.refs.SymbolicReference.delete(repo, path)
|
||||
|
||||
def checkout(self, ref):
|
||||
repo = self.createRepoObject()
|
||||
|
@ -369,6 +376,16 @@ class Merger(object):
|
|||
recent[key] = ref.object
|
||||
project[ref.path] = ref.object.hexsha
|
||||
|
||||
def _alterRepoState(self, connection_name, project_name,
|
||||
repo_state, path, hexsha):
|
||||
projects = repo_state.setdefault(connection_name, {})
|
||||
project = projects.setdefault(project_name, {})
|
||||
if hexsha == NULL_REF:
|
||||
if path in project:
|
||||
del project[path]
|
||||
else:
|
||||
project[path] = hexsha
|
||||
|
||||
def _restoreRepoState(self, connection_name, project_name, repo,
|
||||
repo_state):
|
||||
projects = repo_state.get(connection_name, {})
|
||||
|
@ -470,12 +487,8 @@ class Merger(object):
|
|||
if repo_state is None:
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
if item.get("number") and item.get("patchset"):
|
||||
self.log.debug("Merging for change %s,%s." %
|
||||
(item["number"], item["patchset"]))
|
||||
elif item.get("newrev") and item.get("oldrev"):
|
||||
self.log.debug("Merging for rev %s with oldrev %s." %
|
||||
(item["newrev"], item["oldrev"]))
|
||||
self.log.debug("Merging for change %s,%s" %
|
||||
(item["number"], item["patchset"]))
|
||||
commit = self._mergeItem(item, recent, repo_state)
|
||||
if not commit:
|
||||
return None
|
||||
|
@ -492,6 +505,49 @@ class Merger(object):
|
|||
ret_recent[k] = v.hexsha
|
||||
return commit.hexsha, read_files, repo_state, ret_recent
|
||||
|
||||
def setRepoState(self, items, repo_state):
|
||||
# Sets the repo state for the items
|
||||
seen = set()
|
||||
for item in items:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
|
||||
if key in seen:
|
||||
continue
|
||||
|
||||
repo.reset()
|
||||
self._restoreRepoState(item['connection'], item['project'], repo,
|
||||
repo_state)
|
||||
|
||||
def getRepoState(self, items):
|
||||
# Gets the repo state for items. Generally this will be
|
||||
# called in any non-change pipeline. We will return the repo
|
||||
# state for each item, but manipulated with any information in
|
||||
# the item (eg, if it creates a ref, that will be in the repo
|
||||
# state regardless of the actual state).
|
||||
seen = set()
|
||||
recent = {}
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
if key not in seen:
|
||||
try:
|
||||
repo.reset()
|
||||
except Exception:
|
||||
self.log.exception("Unable to reset repo %s" % repo)
|
||||
return (False, {})
|
||||
|
||||
self._saveRepoState(item['connection'], item['project'], repo,
|
||||
repo_state, recent)
|
||||
|
||||
if item.get('newrev'):
|
||||
# This is a ref update rather than a branch tip, so make sure
|
||||
# our returned state includes this change.
|
||||
self._alterRepoState(item['connection'], item['project'],
|
||||
repo_state, item['ref'], item['newrev'])
|
||||
return (True, repo_state)
|
||||
|
||||
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
|
||||
repo = self.getRepo(connection_name, project_name)
|
||||
return repo.getFiles(files, dirs, branch=branch)
|
||||
|
|
|
@ -58,6 +58,7 @@ class MergeServer(object):
|
|||
def register(self):
|
||||
self.worker.registerFunction("merger:merge")
|
||||
self.worker.registerFunction("merger:cat")
|
||||
self.worker.registerFunction("merger:refstate")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -80,6 +81,9 @@ class MergeServer(object):
|
|||
elif job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
|
@ -104,6 +108,14 @@ class MergeServer(object):
|
|||
recent) = ret
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
|
||||
success, repo_state = self.merger.getItemRepoState(args['items'])
|
||||
result = dict(updated=success,
|
||||
repo_state=repo_state)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def cat(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
self.merger.updateRepo(args['connection'], args['project'])
|
||||
|
|
Loading…
Reference in New Issue