Store initial repo state in the merger

When we ask a merger to speculatively merge changes, record the
complete starting state of each repo (defined as all of the refs
other than Zuul refs) and return that at the completion of all
of the merges.

This will be used so that when a pipeline manager asks a
merger to speculatively merge a change, the process can later
be repeated by the (potentially multiple) executors which will
end up running jobs for the change.  Between the time that the
merger runs and the time that the jobs run, the underlying repos
may have changed.  Recording the initial repo state ensures that
every executor starts from the same state as the merger did.

The facility that used saved Zuul refs within the merger repo
to short-cut the merge sequence when an additional change is added
to a previously completed merge sequence is removed, because in that
case we would not be able to know the original repo state for the
earlier merge sequence.  This is slightly less efficient; however,
we are proposing removing Zuul refs anyway due to the maintenance
burden they cause.

Change-Id: If0215d53c3b08877ded7276955a55fc5e617b244
This commit is contained in:
James E. Blair 2017-04-28 13:31:27 -07:00
parent 8d144dc4f8
commit 34c7daaaa4
5 changed files with 50 additions and 47 deletions

View File

@ -484,16 +484,14 @@ class ExecutorServer(object):
def merge(self, job): def merge(self, job):
args = json.loads(job.arguments) args = json.loads(job.arguments)
ret = self.merger.mergeChanges(args['items'], args.get('files')) ret = self.merger.mergeChanges(args['items'], args.get('files'),
args.get('repo_state'))
result = dict(merged=(ret is not None), result = dict(merged=(ret is not None),
zuul_url=self.zuul_url) zuul_url=self.zuul_url)
if args.get('files'): if ret is None:
if ret: result['commit'] = result['files'] = result['repo_state'] = None
result['commit'], result['files'] = ret
else:
result['commit'], result['files'] = (None, None)
else: else:
result['commit'] = ret result['commit'], result['files'], result['repo_state'] = ret
job.sendWorkComplete(json.dumps(result)) job.sendWorkComplete(json.dumps(result))
@ -588,13 +586,10 @@ class AnsibleJob(object):
merge_items = [i for i in args['items'] if i.get('refspec')] merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items: if merge_items:
commit = self.doMergeChanges(merge_items) if not self.doMergeChanges(merge_items):
if not commit:
# There was a merge conflict and we have already sent # There was a merge conflict and we have already sent
# a work complete result, don't run any jobs # a work complete result, don't run any jobs
return return
else:
commit = args['items'][-1]['newrev'] # noqa
# Delete the origin remote from each repo we set up since # Delete the origin remote from each repo we set up since
# it will not be valid within the jobs. # it will not be valid within the jobs.
@ -640,11 +635,12 @@ class AnsibleJob(object):
def doMergeChanges(self, items): def doMergeChanges(self, items):
# Get a merger in order to update the repos involved in this job. # Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root) merger = self.executor_server._getMerger(self.jobdir.src_root)
commit = merger.mergeChanges(items) # noqa ret = merger.mergeChanges(items) # noqa
if not commit: # merge conflict if not ret: # merge conflict
result = dict(result='MERGER_FAILURE') result = dict(result='MERGER_FAILURE')
self.job.sendWorkComplete(json.dumps(result)) self.job.sendWorkComplete(json.dumps(result))
return commit return False
return True
def runPlaybooks(self, args): def runPlaybooks(self, args):
result = None result = None

View File

@ -509,9 +509,8 @@ class PipelineManager(object):
build_set = item.current_build_set build_set = item.current_build_set
build_set.merge_state = build_set.PENDING build_set.merge_state = build_set.PENDING
self.sched.merger.mergeChanges(merger_items, self.sched.merger.mergeChanges(merger_items,
item.current_build_set, item.current_build_set, files,
files, precedence=self.pipeline.precedence)
self.pipeline.precedence)
return False return False
def prepareItem(self, item): def prepareItem(self, item):

View File

@ -107,10 +107,11 @@ class MergeClient(object):
timeout=300) timeout=300)
return job return job
def mergeChanges(self, items, build_set, files=None, def mergeChanges(self, items, build_set, files=None, repo_state=None,
precedence=zuul.model.PRECEDENCE_NORMAL): precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(items=items, data = dict(items=items,
files=files) files=files,
repo_state=repo_state)
self.submitJob('merger:merge', data, build_set, precedence) self.submitJob('merger:merge', data, build_set, precedence)
def getFiles(self, connection_name, project_name, branch, files, def getFiles(self, connection_name, project_name, branch, files,

View File

@ -124,6 +124,10 @@ class Repo(object):
ref = repo.refs[refname] ref = repo.refs[refname]
return ref.commit return ref.commit
def getRefs(self):
repo = self.createRepoObject()
return repo.refs
def checkout(self, ref): def checkout(self, ref):
repo = self.createRepoObject() repo = self.createRepoObject()
self.log.debug("Checking out %s" % ref) self.log.debug("Checking out %s" % ref)
@ -285,6 +289,18 @@ class Merger(object):
raise Exception("Project %s/%s does not have branch %s" % raise Exception("Project %s/%s does not have branch %s" %
(connection_name, project_name, branch)) (connection_name, project_name, branch))
def _saveRepoState(self, connection_name, project_name, repo,
repo_state):
projects = repo_state.setdefault(connection_name, {})
project = projects.setdefault(project_name, {})
if project:
# We already have a state for this project.
return
for ref in repo.getRefs():
if ref.path.startswith('refs/zuul'):
continue
project[ref.path] = ref.object.hexsha
def _mergeChange(self, item, ref): def _mergeChange(self, item, ref):
repo = self.getRepo(item['connection'], item['project']) repo = self.getRepo(item['connection'], item['project'])
try: try:
@ -314,27 +330,13 @@ class Merger(object):
return commit return commit
def _mergeItem(self, item, recent): def _mergeItem(self, item, recent, repo_state):
self.log.debug("Processing refspec %s for project %s/%s / %s ref %s" % self.log.debug("Processing refspec %s for project %s/%s / %s ref %s" %
(item['refspec'], item['connection'], (item['refspec'], item['connection'],
item['project'], item['branch'], item['ref'])) item['project'], item['branch'], item['ref']))
repo = self.getRepo(item['connection'], item['project']) repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch']) key = (item['connection'], item['project'], item['branch'])
# See if we have a commit for this change already in this repo
zuul_ref = item['branch'] + '/' + item['ref']
with repo.createRepoObject().git.custom_environment(
GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection'])):
commit = repo.getCommitFromRef(zuul_ref)
if commit:
self.log.debug(
"Found commit %s for ref %s" % (commit, zuul_ref))
# Store this as the most recent commit for this
# project-branch
recent[key] = commit
return commit
self.log.debug("Unable to find commit for ref %s" % (zuul_ref,))
# We need to merge the change # We need to merge the change
# Get the most recent commit for this project-branch # Get the most recent commit for this project-branch
base = recent.get(key) base = recent.get(key)
@ -348,6 +350,10 @@ class Merger(object):
self.log.exception("Unable to reset repo %s" % repo) self.log.exception("Unable to reset repo %s" % repo)
return None return None
base = repo.getBranchHead(item['branch']) base = repo.getBranchHead(item['branch'])
# Save the repo state so that later mergers can repeat
# this process.
self._saveRepoState(item['connection'], item['project'], repo,
repo_state)
else: else:
self.log.debug("Found base commit %s for %s" % (base, key,)) self.log.debug("Found base commit %s for %s" % (base, key,))
# Merge the change # Merge the change
@ -365,17 +371,22 @@ class Merger(object):
try: try:
repo = self.getRepo(connection, project) repo = self.getRepo(connection, project)
zuul_ref = branch + '/' + item['ref'] zuul_ref = branch + '/' + item['ref']
repo.createZuulRef(zuul_ref, mrc) if not repo.getCommitFromRef(zuul_ref):
repo.createZuulRef(zuul_ref, mrc)
except Exception: except Exception:
self.log.exception("Unable to set zuul ref %s for " self.log.exception("Unable to set zuul ref %s for "
"item %s" % (zuul_ref, item)) "item %s" % (zuul_ref, item))
return None return None
return commit return commit
def mergeChanges(self, items, files=None): def mergeChanges(self, items, files=None, repo_state=None):
# connection+project+branch -> commit
recent = {} recent = {}
commit = None commit = None
read_files = [] read_files = []
# connection -> project -> ref -> commit
if repo_state is None:
repo_state = {}
for item in items: for item in items:
if item.get("number") and item.get("patchset"): if item.get("number") and item.get("patchset"):
self.log.debug("Merging for change %s,%s." % self.log.debug("Merging for change %s,%s." %
@ -383,7 +394,7 @@ class Merger(object):
elif item.get("newrev") and item.get("oldrev"): elif item.get("newrev") and item.get("oldrev"):
self.log.debug("Merging for rev %s with oldrev %s." % self.log.debug("Merging for rev %s with oldrev %s." %
(item["newrev"], item["oldrev"])) (item["newrev"], item["oldrev"]))
commit = self._mergeItem(item, recent) commit = self._mergeItem(item, recent, repo_state)
if not commit: if not commit:
return None return None
if files: if files:
@ -394,9 +405,7 @@ class Merger(object):
project=item['project'], project=item['project'],
branch=item['branch'], branch=item['branch'],
files=repo_files)) files=repo_files))
if files: return commit.hexsha, read_files, repo_state
return commit.hexsha, read_files
return commit.hexsha
def getFiles(self, connection_name, project_name, branch, files): def getFiles(self, connection_name, project_name, branch, files):
repo = self.getRepo(connection_name, project_name) repo = self.getRepo(connection_name, project_name)

View File

@ -103,16 +103,14 @@ class MergeServer(object):
def merge(self, job): def merge(self, job):
args = json.loads(job.arguments) args = json.loads(job.arguments)
ret = self.merger.mergeChanges(args['items'], args.get('files')) ret = self.merger.mergeChanges(args['items'], args.get('files'),
args.get('repo_state'))
result = dict(merged=(ret is not None), result = dict(merged=(ret is not None),
zuul_url=self.zuul_url) zuul_url=self.zuul_url)
if args.get('files'): if ret is None:
if ret: result['commit'] = result['files'] = result['repo_state'] = None
result['commit'], result['files'] = ret
else:
result['commit'], result['files'] = (None, None)
else: else:
result['commit'] = ret result['commit'], result['files'], result['repo_state'] = ret
job.sendWorkComplete(json.dumps(result)) job.sendWorkComplete(json.dumps(result))
def cat(self, job): def cat(self, job):