Merge "Ensure ref-updated jobs run with their ref" into feature/zuulv3
This commit is contained in:
commit
b1439a554f
|
@ -139,7 +139,8 @@ class FakeGerritChange(object):
|
|||
'Verified': ('Verified', -2, 2)}
|
||||
|
||||
def __init__(self, gerrit, number, project, branch, subject,
|
||||
status='NEW', upstream_root=None, files={}):
|
||||
status='NEW', upstream_root=None, files={},
|
||||
parent=None):
|
||||
self.gerrit = gerrit
|
||||
self.source = gerrit
|
||||
self.reported = 0
|
||||
|
@ -174,16 +175,18 @@ class FakeGerritChange(object):
|
|||
'url': 'https://hostname/%s' % number}
|
||||
|
||||
self.upstream_root = upstream_root
|
||||
self.addPatchset(files=files)
|
||||
self.addPatchset(files=files, parent=parent)
|
||||
self.data['submitRecords'] = self.getSubmitRecords()
|
||||
self.open = status == 'NEW'
|
||||
|
||||
def addFakeChangeToRepo(self, msg, files, large):
|
||||
def addFakeChangeToRepo(self, msg, files, large, parent):
|
||||
path = os.path.join(self.upstream_root, self.project)
|
||||
repo = git.Repo(path)
|
||||
if parent is None:
|
||||
parent = 'refs/tags/init'
|
||||
ref = GerritChangeReference.create(
|
||||
repo, '1/%s/%s' % (self.number, self.latest_patchset),
|
||||
'refs/tags/init')
|
||||
parent)
|
||||
repo.head.reference = ref
|
||||
zuul.merger.merger.reset_repo_to_head(repo)
|
||||
repo.git.clean('-x', '-f', '-d')
|
||||
|
@ -211,7 +214,7 @@ class FakeGerritChange(object):
|
|||
repo.heads['master'].checkout()
|
||||
return r
|
||||
|
||||
def addPatchset(self, files=None, large=False):
|
||||
def addPatchset(self, files=None, large=False, parent=None):
|
||||
self.latest_patchset += 1
|
||||
if not files:
|
||||
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
|
||||
|
@ -219,7 +222,7 @@ class FakeGerritChange(object):
|
|||
(self.branch, self.number, self.latest_patchset))
|
||||
files = {fn: data}
|
||||
msg = self.subject + '-' + str(self.latest_patchset)
|
||||
c = self.addFakeChangeToRepo(msg, files, large)
|
||||
c = self.addFakeChangeToRepo(msg, files, large, parent)
|
||||
ps_files = [{'file': '/COMMIT_MSG',
|
||||
'type': 'ADDED'},
|
||||
{'file': 'README',
|
||||
|
@ -469,12 +472,12 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
|||
self.upstream_root = upstream_root
|
||||
|
||||
def addFakeChange(self, project, branch, subject, status='NEW',
|
||||
files=None):
|
||||
files=None, parent=None):
|
||||
"""Add a change to the fake Gerrit."""
|
||||
self.change_number += 1
|
||||
c = FakeGerritChange(self, self.change_number, project, branch,
|
||||
subject, upstream_root=self.upstream_root,
|
||||
status=status, files=files)
|
||||
status=status, files=files, parent=parent)
|
||||
self.changes[self.change_number] = c
|
||||
return c
|
||||
|
||||
|
@ -955,6 +958,13 @@ class FakeGithubPullRequest(object):
|
|||
}
|
||||
return (name, data)
|
||||
|
||||
def setMerged(self, commit_message):
|
||||
self.is_merged = True
|
||||
self.merge_message = commit_message
|
||||
|
||||
repo = self._getRepo()
|
||||
repo.heads[self.branch].commit = repo.commit(self.head_sha)
|
||||
|
||||
|
||||
class FakeGithubConnection(githubconnection.GithubConnection):
|
||||
log = logging.getLogger("zuul.test.FakeGithubConnection")
|
||||
|
@ -1102,8 +1112,7 @@ class FakeGithubConnection(githubconnection.GithubConnection):
|
|||
self.merge_not_allowed_count -= 1
|
||||
raise MergeFailure('Merge was not successful due to mergeability'
|
||||
' conflict')
|
||||
pull_request.is_merged = True
|
||||
pull_request.merge_message = commit_message
|
||||
pull_request.setMerged(commit_message)
|
||||
|
||||
def setCommitStatus(self, project, sha, state, url='', description='',
|
||||
context='default', user='zuul'):
|
||||
|
|
|
@ -348,19 +348,31 @@ class TestExecutorRepos(ZuulTestCase):
|
|||
p1 = "review.example.com/org/project1"
|
||||
p2 = "review.example.com/org/project2"
|
||||
projects = [p1, p2]
|
||||
upstream = self.getUpstreamRepos(projects)
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
event = A.getRefUpdatedEvent()
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p1].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
# Add another commit to the repo that merged right after this
|
||||
# one to make sure that our post job runs with the one that we
|
||||
# intended rather than simply the current repo state.
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B',
|
||||
parent='refs/changes/1/1/1')
|
||||
B.setMerged()
|
||||
B_commit = str(upstream[p1].commit('master'))
|
||||
self.log.debug("B commit: %s" % B_commit)
|
||||
|
||||
self.fake_gerrit.addEvent(event)
|
||||
self.waitUntilSettled()
|
||||
|
||||
upstream = self.getUpstreamRepos(projects)
|
||||
states = [
|
||||
{p1: dict(commit=str(upstream[p1].commit('master')),
|
||||
present=[A], branch='master'),
|
||||
{p1: dict(commit=A_commit,
|
||||
present=[A], absent=[B], branch='master'),
|
||||
p2: dict(commit=str(upstream[p2].commit('master')),
|
||||
absent=[A], branch='master'),
|
||||
absent=[A, B], branch='master'),
|
||||
},
|
||||
]
|
||||
|
||||
|
|
|
@ -125,16 +125,20 @@ class TestGithubDriver(ZuulTestCase):
|
|||
def test_push_event(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
old_sha = random_sha1()
|
||||
new_sha = random_sha1()
|
||||
self.fake_github.emitEvent(
|
||||
self.fake_github.getPushEvent('org/project', 'refs/heads/master',
|
||||
old_sha, new_sha))
|
||||
A = self.fake_github.openFakePullRequest('org/project', 'master', 'A')
|
||||
old_sha = '0' * 40
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project='org/project',
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha,
|
||||
new_rev=new_sha)
|
||||
self.fake_github.emitEvent(pevent)
|
||||
self.waitUntilSettled()
|
||||
|
||||
build_params = self.builds[0].parameters
|
||||
self.assertEqual('refs/heads/master', build_params['zuul']['ref'])
|
||||
self.assertEqual(old_sha, build_params['zuul']['oldrev'])
|
||||
self.assertFalse('oldrev' in build_params['zuul'])
|
||||
self.assertEqual(new_sha, build_params['zuul']['newrev'])
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
|
@ -371,9 +375,15 @@ class TestGithubDriver(ZuulTestCase):
|
|||
project = 'org/project2'
|
||||
# pipeline reports pull status both on start and success
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
pevent = self.fake_github.getPushEvent(project=project,
|
||||
ref='refs/heads/master')
|
||||
|
||||
A = self.fake_github.openFakePullRequest(project, 'master', 'A')
|
||||
old_sha = '0' * 40
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project=project,
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha,
|
||||
new_rev=new_sha)
|
||||
self.fake_github.emitEvent(pevent)
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
|
|
@ -25,12 +25,13 @@ class TestPushRequirements(ZuulTestCase):
|
|||
def test_push_requirements(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Create a github change, add a change and emit a push event
|
||||
A = self.fake_github.openFakePullRequest('org/project1', 'master', 'A')
|
||||
old_sha = A.head_sha
|
||||
new_sha = A.head_sha
|
||||
A.setMerged("merging A")
|
||||
pevent = self.fake_github.getPushEvent(project='org/project1',
|
||||
ref='refs/heads/master',
|
||||
old_rev=old_sha)
|
||||
new_rev=new_sha)
|
||||
|
||||
self.fake_github.emitEvent(pevent)
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
@ -43,7 +44,7 @@ class TestPushRequirements(ZuulTestCase):
|
|||
# Make a gerrit change, and emit a ref-updated event
|
||||
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
|
||||
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
|
||||
|
||||
B.setMerged()
|
||||
self.waitUntilSettled()
|
||||
|
||||
# All but one pipeline should be skipped, increasing builds by 1
|
||||
|
|
|
@ -1103,6 +1103,12 @@ class TestScheduler(ZuulTestCase):
|
|||
|
||||
def test_post(self):
|
||||
"Test that post jobs run"
|
||||
p = "review.example.com/org/project"
|
||||
upstream = self.getUpstreamRepos([p])
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
e = {
|
||||
"type": "ref-updated",
|
||||
|
@ -1111,7 +1117,7 @@ class TestScheduler(ZuulTestCase):
|
|||
},
|
||||
"refUpdate": {
|
||||
"oldRev": "90f173846e3af9154517b88543ffbd1691f31366",
|
||||
"newRev": "d479a0bfcb34da57a31adb2a595c0cf687812543",
|
||||
"newRev": A_commit,
|
||||
"refName": "master",
|
||||
"project": "org/project",
|
||||
}
|
||||
|
@ -1156,7 +1162,7 @@ class TestScheduler(ZuulTestCase):
|
|||
"refUpdate": {
|
||||
"oldRev": "90f173846e3af9154517b88543ffbd1691f31366",
|
||||
"newRev": "0000000000000000000000000000000000000000",
|
||||
"refName": "master",
|
||||
"refName": "testbranch",
|
||||
"project": "org/project",
|
||||
}
|
||||
}
|
||||
|
@ -3080,6 +3086,12 @@ class TestScheduler(ZuulTestCase):
|
|||
|
||||
def test_client_enqueue_ref(self):
|
||||
"Test that the RPC client can enqueue a ref"
|
||||
p = "review.example.com/org/project"
|
||||
upstream = self.getUpstreamRepos([p])
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.setMerged()
|
||||
A_commit = str(upstream[p].commit('master'))
|
||||
self.log.debug("A commit: %s" % A_commit)
|
||||
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
self.gearman_server.port)
|
||||
|
@ -3091,7 +3103,7 @@ class TestScheduler(ZuulTestCase):
|
|||
trigger='gerrit',
|
||||
ref='master',
|
||||
oldrev='90f173846e3af9154517b88543ffbd1691f31366',
|
||||
newrev='d479a0bfcb34da57a31adb2a595c0cf687812543')
|
||||
newrev=A_commit)
|
||||
self.waitUntilSettled()
|
||||
job_names = [x.name for x in self.history]
|
||||
self.assertEqual(len(self.history), 1)
|
||||
|
|
|
@ -625,6 +625,7 @@ class ExecutorServer(object):
|
|||
self.hostname)
|
||||
self.merger_worker.registerFunction("merger:merge")
|
||||
self.merger_worker.registerFunction("merger:cat")
|
||||
self.merger_worker.registerFunction("merger:refstate")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -721,6 +722,9 @@ class ExecutorServer(object):
|
|||
elif job.name == 'merger:merge':
|
||||
self.log.debug("Got merge job: %s" % job.unique)
|
||||
self.merge(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
|
@ -800,6 +804,14 @@ class ExecutorServer(object):
|
|||
files=files)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
success, repo_state = self.merger.getRepoState(args['items'])
|
||||
result = dict(updated=success,
|
||||
repo_state=repo_state)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
|
@ -954,6 +966,10 @@ class AnsibleJob(object):
|
|||
# a work complete result, don't run any jobs
|
||||
return
|
||||
|
||||
state_items = [i for i in args['items'] if not i.get('number')]
|
||||
if state_items:
|
||||
merger.setRepoState(state_items, args['repo_state'])
|
||||
|
||||
for project in args['projects']:
|
||||
repo = repos[project['canonical_name']]
|
||||
# If this project is the Zuul project and this is a ref
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
import logging
|
||||
|
||||
from zuul import exceptions
|
||||
from zuul import model
|
||||
|
||||
|
||||
class DynamicChangeQueueContextManager(object):
|
||||
|
@ -483,20 +484,18 @@ class PipelineManager(object):
|
|||
def scheduleMerge(self, item, files=None, dirs=None):
|
||||
build_set = item.current_build_set
|
||||
|
||||
if not hasattr(item.change, 'branch'):
|
||||
self.log.debug("Change %s does not have an associated branch, "
|
||||
"not scheduling a merge job for item %s" %
|
||||
(item.change, item))
|
||||
build_set.merge_state = build_set.COMPLETE
|
||||
return True
|
||||
|
||||
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
|
||||
(item, files, dirs))
|
||||
build_set = item.current_build_set
|
||||
build_set.merge_state = build_set.PENDING
|
||||
self.sched.merger.mergeChanges(build_set.merger_items,
|
||||
item.current_build_set, files, dirs,
|
||||
precedence=self.pipeline.precedence)
|
||||
if isinstance(item.change, model.Change):
|
||||
self.sched.merger.mergeChanges(build_set.merger_items,
|
||||
item.current_build_set, files, dirs,
|
||||
precedence=self.pipeline.precedence)
|
||||
else:
|
||||
self.sched.merger.getRepoState(build_set.merger_items,
|
||||
item.current_build_set,
|
||||
precedence=self.pipeline.precedence)
|
||||
return False
|
||||
|
||||
def prepareItem(self, item):
|
||||
|
@ -675,12 +674,13 @@ class PipelineManager(object):
|
|||
build_set = event.build_set
|
||||
item = build_set.item
|
||||
build_set.merge_state = build_set.COMPLETE
|
||||
build_set.repo_state = event.repo_state
|
||||
if event.merged:
|
||||
build_set.commit = event.commit
|
||||
build_set.files.setFiles(event.files)
|
||||
build_set.repo_state = event.repo_state
|
||||
elif event.updated:
|
||||
build_set.commit = item.change.newrev
|
||||
build_set.commit = (item.change.newrev or
|
||||
'0000000000000000000000000000000000000000')
|
||||
if not build_set.commit:
|
||||
self.log.info("Unable to merge change %s" % item.change)
|
||||
item.setUnableToMerge()
|
||||
|
|
|
@ -44,6 +44,9 @@ class IndependentPipelineManager(PipelineManager):
|
|||
if hasattr(change, 'number'):
|
||||
history = history or []
|
||||
history.append(change.number)
|
||||
else:
|
||||
# Don't enqueue dependencies ahead of a non-change ref.
|
||||
return True
|
||||
|
||||
ret = self.checkForChangesNeededBy(change, change_queue)
|
||||
if ret in [True, False]:
|
||||
|
|
|
@ -116,6 +116,11 @@ class MergeClient(object):
|
|||
repo_state=repo_state)
|
||||
self.submitJob('merger:merge', data, build_set, precedence)
|
||||
|
||||
def getRepoState(self, items, build_set,
|
||||
precedence=zuul.model.PRECEDENCE_NORMAL):
|
||||
data = dict(items=items)
|
||||
self.submitJob('merger:refstate', data, build_set, precedence)
|
||||
|
||||
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
|
||||
precedence=zuul.model.PRECEDENCE_HIGH):
|
||||
data = dict(connection=connection_name,
|
||||
|
|
|
@ -20,6 +20,8 @@ import logging
|
|||
|
||||
import zuul.model
|
||||
|
||||
NULL_REF = '0000000000000000000000000000000000000000'
|
||||
|
||||
|
||||
def reset_repo_to_head(repo):
|
||||
# This lets us reset the repo even if there is a file in the root
|
||||
|
@ -178,8 +180,13 @@ class Repo(object):
|
|||
self.setRef(path, hexsha, repo)
|
||||
unseen.discard(path)
|
||||
for path in unseen:
|
||||
self.log.debug("Delete reference %s", path)
|
||||
git.refs.SymbolicReference.delete(repo, ref.path)
|
||||
self.deleteRef(path, repo)
|
||||
|
||||
def deleteRef(self, path, repo=None):
|
||||
if repo is None:
|
||||
repo = self.createRepoObject()
|
||||
self.log.debug("Delete reference %s", path)
|
||||
git.refs.SymbolicReference.delete(repo, path)
|
||||
|
||||
def checkout(self, ref):
|
||||
repo = self.createRepoObject()
|
||||
|
@ -369,6 +376,16 @@ class Merger(object):
|
|||
recent[key] = ref.object
|
||||
project[ref.path] = ref.object.hexsha
|
||||
|
||||
def _alterRepoState(self, connection_name, project_name,
|
||||
repo_state, path, hexsha):
|
||||
projects = repo_state.setdefault(connection_name, {})
|
||||
project = projects.setdefault(project_name, {})
|
||||
if hexsha == NULL_REF:
|
||||
if path in project:
|
||||
del project[path]
|
||||
else:
|
||||
project[path] = hexsha
|
||||
|
||||
def _restoreRepoState(self, connection_name, project_name, repo,
|
||||
repo_state):
|
||||
projects = repo_state.get(connection_name, {})
|
||||
|
@ -470,12 +487,8 @@ class Merger(object):
|
|||
if repo_state is None:
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
if item.get("number") and item.get("patchset"):
|
||||
self.log.debug("Merging for change %s,%s." %
|
||||
(item["number"], item["patchset"]))
|
||||
elif item.get("newrev") and item.get("oldrev"):
|
||||
self.log.debug("Merging for rev %s with oldrev %s." %
|
||||
(item["newrev"], item["oldrev"]))
|
||||
self.log.debug("Merging for change %s,%s" %
|
||||
(item["number"], item["patchset"]))
|
||||
commit = self._mergeItem(item, recent, repo_state)
|
||||
if not commit:
|
||||
return None
|
||||
|
@ -492,6 +505,49 @@ class Merger(object):
|
|||
ret_recent[k] = v.hexsha
|
||||
return commit.hexsha, read_files, repo_state, ret_recent
|
||||
|
||||
def setRepoState(self, items, repo_state):
|
||||
# Sets the repo state for the items
|
||||
seen = set()
|
||||
for item in items:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
|
||||
if key in seen:
|
||||
continue
|
||||
|
||||
repo.reset()
|
||||
self._restoreRepoState(item['connection'], item['project'], repo,
|
||||
repo_state)
|
||||
|
||||
def getRepoState(self, items):
|
||||
# Gets the repo state for items. Generally this will be
|
||||
# called in any non-change pipeline. We will return the repo
|
||||
# state for each item, but manipulated with any information in
|
||||
# the item (eg, if it creates a ref, that will be in the repo
|
||||
# state regardless of the actual state).
|
||||
seen = set()
|
||||
recent = {}
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
if key not in seen:
|
||||
try:
|
||||
repo.reset()
|
||||
except Exception:
|
||||
self.log.exception("Unable to reset repo %s" % repo)
|
||||
return (False, {})
|
||||
|
||||
self._saveRepoState(item['connection'], item['project'], repo,
|
||||
repo_state, recent)
|
||||
|
||||
if item.get('newrev'):
|
||||
# This is a ref update rather than a branch tip, so make sure
|
||||
# our returned state includes this change.
|
||||
self._alterRepoState(item['connection'], item['project'],
|
||||
repo_state, item['ref'], item['newrev'])
|
||||
return (True, repo_state)
|
||||
|
||||
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
|
||||
repo = self.getRepo(connection_name, project_name)
|
||||
return repo.getFiles(files, dirs, branch=branch)
|
||||
|
|
|
@ -58,6 +58,7 @@ class MergeServer(object):
|
|||
def register(self):
|
||||
self.worker.registerFunction("merger:merge")
|
||||
self.worker.registerFunction("merger:cat")
|
||||
self.worker.registerFunction("merger:refstate")
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
|
@ -80,6 +81,9 @@ class MergeServer(object):
|
|||
elif job.name == 'merger:cat':
|
||||
self.log.debug("Got cat job: %s" % job.unique)
|
||||
self.cat(job)
|
||||
elif job.name == 'merger:refstate':
|
||||
self.log.debug("Got refstate job: %s" % job.unique)
|
||||
self.refstate(job)
|
||||
else:
|
||||
self.log.error("Unable to handle job %s" % job.name)
|
||||
job.sendWorkFail()
|
||||
|
@ -104,6 +108,14 @@ class MergeServer(object):
|
|||
recent) = ret
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def refstate(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
|
||||
success, repo_state = self.merger.getItemRepoState(args['items'])
|
||||
result = dict(updated=success,
|
||||
repo_state=repo_state)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def cat(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
self.merger.updateRepo(args['connection'], args['project'])
|
||||
|
|
Loading…
Reference in New Issue