Move the merger to a JSON-compatible API

In order to split out the merger as a new component, remove any
direct Zuul model object manipulation, instead passing in a
lists of dictionaries for merge instructions.  Change the merger
algorithm so that it is compatible with this new method and makes
no assumptions about whether it has merged any changes previously.

There is likely to be a minor performance impact as some information
which was previously kept in-memory will now be fetched from the git
index.

The merger is also no-longer pre-populated with clones of git repos
at startup.  Some tests were adjusted to accomodate this.

Change-Id: I3df86ae36b4969d568d1fb03df1e6569553d1226
This commit is contained in:
James E. Blair 2014-01-24 13:38:51 -08:00
parent 78acec9757
commit ac2c324e4f
3 changed files with 118 additions and 91 deletions

View File

@ -66,9 +66,10 @@ logging.basicConfig(level=logging.DEBUG,
def repack_repo(path):
output = subprocess.Popen(
['git', '--git-dir=%s/.git' % path, 'repack', '-afd'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
output = subprocess.Popen(cmd, close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode:
raise Exception("git repack returned %d" % output.returncode)
@ -158,6 +159,7 @@ class FakeChange(object):
repo.head.reference = 'master'
repo.head.reset(index=True, working_tree=True)
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
return r
def addPatchset(self, files=[], large=False):
@ -944,11 +946,17 @@ class TestScheduler(testtools.TestCase):
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_shas = [c.hexsha for c in repo.iter_commits(ref)]
commit_messages = ['%s-1' % commit.subject for commit in commits]
self.log.debug("Checking if job %s has changes; commit_messages %s;"
" repo_messages %s; sha %s" % (job, commit_messages,
repo_messages, sha))
for msg in commit_messages:
if msg not in repo_messages:
self.log.debug(" messages do not match")
return False
if repo_shas[0] != sha:
self.log.debug(" sha does not match")
return False
self.log.debug(" OK")
return True
def registerJobs(self):
@ -2343,6 +2351,10 @@ class TestScheduler(testtools.TestCase):
def test_merger_repack_large_change(self):
"Test that the merger works with large changes after a repack"
# https://bugs.launchpad.net/zuul/+bug/1078946
# This test assumes the repo is already cloned; make sure it is
url = self.sched.triggers['gerrit'].getGitUrl(
self.sched.layout.projects['org/project1'])
self.sched.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")

View File

@ -15,7 +15,8 @@
import git
import os
import logging
import model
import zuul.model
class ZuulReference(git.Reference):
@ -83,7 +84,14 @@ class Repo(object):
def getBranchHead(self, branch):
repo = self.createRepoObject()
branch_head = repo.heads[branch]
return branch_head
return branch_head.commit
def getCommitFromRef(self, refname):
repo = self.createRepoObject()
if not refname in repo.refs:
return None
ref = repo.refs[refname]
return ref.commit
def checkout(self, ref):
repo = self.createRepoObject()
@ -96,6 +104,7 @@ class Repo(object):
self.log.debug("Cherry-picking %s" % ref)
self.fetch(ref)
repo.git.cherry_pick("FETCH_HEAD")
return repo.head.commit
def merge(self, ref, strategy=None):
repo = self.createRepoObject()
@ -106,6 +115,7 @@ class Repo(object):
self.fetch(ref)
self.log.debug("Merging %s with args %s" % (ref, args))
repo.git.merge(*args)
return repo.head.commit
def fetch(self, ref):
repo = self.createRepoObject()
@ -161,27 +171,34 @@ class Merger(object):
os.environ['GIT_SSH'] = name
def addProject(self, project, url):
repo = None
try:
path = os.path.join(self.working_root, project.name)
path = os.path.join(self.working_root, project)
repo = Repo(url, path, self.email, self.username)
self.repos[project] = repo
except:
except Exception:
self.log.exception("Unable to add project %s" % project)
return repo
def getRepo(self, project):
return self.repos.get(project, None)
def getRepo(self, project, url):
if project in self.repos:
return self.repos[project]
if not url:
raise Exception("Unable to set up repo for project %s"
" without a url" % (project,))
return self.addProject(project, url)
def updateRepo(self, project):
repo = self.getRepo(project)
def updateRepo(self, project, url):
repo = self.getRepo(project, url)
try:
self.log.info("Updating local repository %s", project)
repo.update()
except:
self.log.exception("Unable to update %s", project)
def _mergeChange(self, change, ref, target_ref):
repo = self.getRepo(change.project)
def _mergeChange(self, item, ref):
repo = self.getRepo(item['project'], item['url'])
try:
repo.checkout(ref)
except:
@ -189,90 +206,77 @@ class Merger(object):
return False
try:
mode = change.project.merge_mode
if mode == model.MERGER_MERGE:
repo.merge(change.refspec)
elif mode == model.MERGER_MERGE_RESOLVE:
repo.merge(change.refspec, 'resolve')
elif mode == model.MERGER_CHERRY_PICK:
repo.cherryPick(change.refspec)
mode = item['merge_mode']
if mode == zuul.model.MERGER_MERGE:
commit = repo.merge(item['refspec'])
elif mode == zuul.model.MERGER_MERGE_RESOLVE:
commit = repo.merge(item['refspec'], 'resolve')
elif mode == zuul.model.MERGER_CHERRY_PICK:
commit = repo.cherryPick(item['refspec'])
else:
raise Exception("Unsupported merge mode: %s" % mode)
except Exception:
# Log exceptions at debug level because they are
# usually benign merge conflicts
self.log.debug("Unable to merge %s" % change, exc_info=True)
self.log.debug("Unable to merge %s" % item, exc_info=True)
return False
try:
# Keep track of the last commit, it's the commit that
# will be passed to jenkins because it's the commit
# for the triggering change
zuul_ref = change.branch + '/' + target_ref
commit = repo.createZuulRef(zuul_ref, 'HEAD').hexsha
except:
self.log.exception("Unable to set zuul ref %s for change %s" %
(zuul_ref, change))
return False
return commit
def mergeChanges(self, items, target_ref=None):
# Merge shortcuts:
# if this is the only change just merge it against its branch.
# elif there are changes ahead of us that are from the same project and
# branch we can merge against the commit associated with that change
# instead of going back up the tree.
#
# Shortcuts assume some external entity is checking whether or not
# changes from other projects can merge.
commit = False
item = items[-1]
sibling_filter = lambda i: (i.change.project == item.change.project and
i.change.branch == item.change.branch)
sibling_items = filter(sibling_filter, items)
# Only current change to merge against tip of change.branch
if len(sibling_items) == 1:
repo = self.getRepo(item.change.project)
def _mergeItem(self, item, recent):
self.log.debug("Processing refspec %s for project %s / %s ref %s" %
(item['refspec'], item['project'], item['branch'],
item['ref']))
repo = self.getRepo(item['project'], item['url'])
key = (item['project'], item['branch'])
# See if we have a commit for this change already in this repo
zuul_ref = item['branch'] + '/' + item['ref']
commit = repo.getCommitFromRef(zuul_ref)
if commit:
self.log.debug("Found commit %s for ref %s" % (commit, zuul_ref))
# Store this as the most recent commit for this
# project-branch
recent[key] = commit
return commit
self.log.debug("Unable to find commit for ref %s" % (zuul_ref,))
# We need to merge the change
# Get the most recent commit for this project-branch
base = recent.get(key)
if not base:
# There is none, so use the branch tip
# we need to reset here in order to call getBranchHead
self.log.debug("No base commit found for %s" % (key,))
try:
repo.reset()
except:
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return False
commit = self._mergeChange(item.change,
repo.getBranchHead(item.change.branch),
target_ref=target_ref)
# Sibling changes exist. Merge current change against newest sibling.
elif (len(sibling_items) >= 2 and
sibling_items[-2].current_build_set.commit):
last_commit = sibling_items[-2].current_build_set.commit
commit = self._mergeChange(item.change, last_commit,
target_ref=target_ref)
# Either change did not merge or we did not need to merge as there were
# previous merge conflicts.
if not commit:
return commit
project_branches = []
for i in reversed(items):
# Here we create all of the necessary zuul refs and potentially
# push them back to Gerrit.
if (i.change.project, i.change.branch) in project_branches:
continue
repo = self.getRepo(i.change.project)
if (i.change.project != item.change.project or
i.change.branch != item.change.branch):
# Create a zuul ref for all dependent changes project
# branch combinations as this is the ref that jenkins will
# use to test. The ref for change has already been set so
# we skip it here.
try:
zuul_ref = i.change.branch + '/' + target_ref
repo.createZuulRef(zuul_ref, i.current_build_set.commit)
except:
self.log.exception("Unable to set zuul ref %s for "
"change %s" % (zuul_ref, i.change))
return False
project_branches.append((i.change.project, i.change.branch))
return None
base = repo.getBranchHead(item['branch'])
else:
self.log.debug("Found base commit %s for %s" % (base, key,))
# Merge the change
commit = self._mergeChange(item, base)
# Store this commit as the most recent for this project-branch
recent[key] = commit
# Set the Zuul ref for this item to point to the most recent
# commits of each project-branch
for key, commit in recent.items():
project, branch = key
try:
repo = self.getRepo(project, None)
zuul_ref = branch + '/' + item['ref']
repo.createZuulRef(zuul_ref, commit)
except Exception:
self.log.exception("Unable to set zuul ref %s for "
"item %s" % (zuul_ref, item))
return None
return commit
def mergeChanges(self, items):
recent = {}
commit = None
for item in items:
commit = self._mergeItem(item, recent)
if not commit:
return None
return commit.hexsha

View File

@ -377,9 +377,6 @@ class Scheduler(threading.Thread):
# location.
self.merger = merger.Merger(merge_root, sshkey,
merge_email, merge_name)
for project in self.layout.projects.values():
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.addProject(project, url)
def setLauncher(self, launcher):
self.launcher = launcher
@ -708,7 +705,8 @@ class Scheduler(threading.Thread):
# This is done before enqueuing the changes to avoid calling an
# update per pipeline accepting the change.
self.log.info("Fetching references for %s" % project)
self.merger.updateRepo(project)
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.updateRepo(project, url)
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
@ -1040,6 +1038,18 @@ class BasePipelineManager(object):
self.dequeueItem(item)
self.reportStats(item)
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by
# the merger.
return dict(project=item.change.project.name,
url=self.sched.triggers['gerrit'].getGitUrl(
item.change.project),
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
branch=item.change.branch,
ref=item.current_build_set.ref,
)
def prepareRef(self, item):
# Returns False on success.
# Returns True if we were unable to prepare the ref.
@ -1051,7 +1061,8 @@ class BasePipelineManager(object):
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
commit = self.sched.merger.mergeChanges(all_items, ref)
merger_items = map(self._makeMergerItem, all_items)
commit = self.sched.merger.mergeChanges(merger_items)
item.current_build_set.commit = commit
if not commit:
self.log.info("Unable to merge change %s" % item.change)