Use previously stored repo state on executor

When the initial speculative merge for a change is performed at
the request of the pipeline manager, the repo state used to
construct that merge is saved in a data structure.  Pass that
structure to the executor when running jobs so that, after cloning
each repo into the jobdir, the repos are made to appear the same
as those on the merger before it started its merge.  The subsequent
merge operations on the executor will then repeat the same steps,
producing the same content (though the actual commits will differ
due to timestamps).
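
For illustration, the saved state is a nested mapping keyed by
connection, project, and ref path; a minimal hypothetical example
(placeholder connection, project, and sha values) of what the merger
records and the executor later replays onto its clones:

    # Hypothetical repo_state snapshot saved by the merger:
    # connection -> project -> ref path -> commit sha.
    repo_state = {
        'gerrit': {
            'org/project': {
                'refs/heads/master':
                    '0123456789abcdef0123456789abcdef01234567',
            },
        },
    }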

It would be more efficient to have the executors pull changes from
the mergers; however, that would require the mergers to run an
accessible git service, which is one of the things that adds
significant complexity to a zuul deployment.  This method only
requires that the mergers be able to initiate outgoing connections
to gearman and sources.

Because the initial merge may happen well before jobs are executed,
save the dependency chain for a given BuildSet when its configuration
is being finalized.  This will cause us to save not only the repository
configuration that the merger uses, but also the exact sequence of
changes applied on top of that state.  (Currently, we build the series
of changes we apply before running each job; however, the queue state
can change (especially if items are merged) in the period between the
initial merge and job launch).
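
A condensed sketch of that freezing step (adapted from the
BuildSet.setConfiguration change in the diff below; not the complete
method):

    def setConfiguration(self):
        # Freeze the dependency chain the first time the build set's
        # configuration is finalized; later queue changes no longer
        # affect what the executor will merge.
        if self.dependent_items is None:
            items = []
            next_item = self.item.item_ahead
            while next_item:
                items.append(next_item)
                next_item = next_item.item_ahead
            self.dependent_items = items
        if self.merger_items is None:
            # Oldest item first, ending with this item.
            items = [self.item] + self.dependent_items
            items.reverse()
            self.merger_items = [i.makeMergerItem() for i in items]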

The initial merge is performed before we have a shadow layout for the
item, yet we must specify a merge mode for each project for which we
merge a change.  Currently, we default to the 'merge-resolve' merge
mode for every project during the initial speculative merge, and the
secondary merge on the executor then uses the correct merge mode,
since we have a layout at that point.  With this change, where we are
trying to replicate the initial merge exactly, we can't rely on that
behavior any more.  Instead, when determining the merge mode to use
for a project, we consult the shadow layout of the nearest item
ahead, then the current live layout, and only if both of those fail
do we fall back to the default.  This means that a change which
modifies a project's merge-mode will not itself be merged using the
new mode; subsequent changes will.  This seems to be the best we can
do, short of detecting this case and merging such changes twice, and
the case seems rare enough that we don't need to do that.
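
A condensed sketch of that lookup (mirroring the BuildSet.getMergeMode
change in the diff below; the trailing default return and its constant
name are assumptions based on the 'merge-resolve' default described
above):

    def getMergeMode(self):
        # Prefer the shadow layout of the nearest item ahead, then
        # the live pipeline layout, then fall back to the default.
        item = self.item
        layout = None
        while item:
            layout = item.current_build_set.layout
            if layout:
                break
            item = item.item_ahead
        if not layout:
            layout = self.item.pipeline.layout
        if layout:
            project_config = layout.project_configs.get(
                self.item.change.project.canonical_name)
            if project_config:
                return project_config.merge_mode
        return MERGER_MERGE_RESOLVE  # assumed default: 'merge-resolve'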

The test_delayed_merge_conflict method is updated to essentially invert
the meaning of the test.  Since the old behavior was for the initial
merge check to be completely independent of the executor merge, this
test examined the case where the initial merge worked but between that
time and when the executor performed its merge, a conflicting change
landed.  That should no longer be possible since the executor merge
now uses the results of the initial merge.  We keep the test, but invert
its final assertion -- instead of checking for a merge conflict being
reported, we check that no merge conflict is reported.

Change-Id: I34cd58ec9775c1d151db02034c342bd971af036f
James E. Blair 2017-04-28 15:44:14 -07:00
parent 34c7daaaa4
commit 1960d687c9
9 changed files with 100 additions and 43 deletions


@ -1189,9 +1189,10 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def doMergeChanges(self, items):
def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
commit = super(RecordingAnsibleJob, self).doMergeChanges(items)
commit = super(RecordingAnsibleJob, self).doMergeChanges(
items, repo_state)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')
return commit


@ -994,8 +994,9 @@ class TestScheduler(ZuulTestCase):
"Test that delayed check merge conflicts are handled properly"
# Hold jobs in the gearman queue so that we can test whether
# the executor returns a merge failure after the scheduler has
# successfully merged.
# the executor successfully merges a change based on an old
# repo state (frozen by the scheduler) which would otherwise
# conflict.
self.gearman_server.hold_jobs_in_queue = True
A = self.fake_gerrit.addFakeChange('org/project',
'master', 'A',
@ -1068,9 +1069,12 @@ class TestScheduler(ZuulTestCase):
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='MERGER_FAILURE', changes='2,1'),
dict(name='project-merge', result='MERGER_FAILURE',
changes='2,1 3,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
dict(name='project-test1', result='SUCCESS', changes='2,1'),
dict(name='project-test2', result='SUCCESS', changes='2,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1 3,1'),
dict(name='project-test1', result='SUCCESS', changes='2,1 3,1'),
dict(name='project-test2', result='SUCCESS', changes='2,1 3,1'),
], ordered=False)
def test_post(self):


@ -169,7 +169,8 @@ class ExecutorClient(object):
self.log.debug("Function %s is not registered" % name)
return False
def execute(self, job, item, pipeline, dependent_items=[]):
def execute(self, job, item, pipeline, dependent_items=[],
merger_items=[]):
tenant = pipeline.layout.tenant
uuid = str(uuid4().hex)
self.log.info(
@ -179,8 +180,11 @@ class ExecutorClient(object):
item.current_build_set.getJobNodeSet(job.name),
item.change,
[x.change for x in dependent_items]))
dependent_items = dependent_items[:]
dependent_items.reverse()
all_items = dependent_items + [item]
# TODOv3(jeblair): This ansible vars data structure will
# replace the environment variables below.
project = dict(
@ -210,7 +214,7 @@ class ExecutorClient(object):
changes_str = '^'.join(
['%s:%s:%s' % (i.change.project.name, i.change.branch,
i.change.refspec)
for i in dependent_items + [item]])
for i in all_items])
params['ZUUL_BRANCH'] = item.change.branch
params['ZUUL_CHANGES'] = changes_str
params['ZUUL_REF'] = ('refs/zuul/%s/%s' %
@ -220,7 +224,7 @@ class ExecutorClient(object):
zuul_changes = ' '.join(['%s,%s' % (i.change.number,
i.change.patchset)
for i in dependent_items + [item]])
for i in all_items])
params['ZUUL_CHANGE_IDS'] = zuul_changes
params['ZUUL_CHANGE'] = str(item.change.number)
params['ZUUL_PATCHSET'] = str(item.change.patchset)
@ -253,13 +257,11 @@ class ExecutorClient(object):
# ZUUL_OLDREV
# ZUUL_NEWREV
all_items = dependent_items + [item]
merger_items = [i.makeMergerItem() for i in all_items]
params['job'] = job.name
params['timeout'] = job.timeout
params['items'] = merger_items
params['projects'] = []
params['repo_state'] = item.current_build_set.repo_state
if job.name != 'noop':
params['playbooks'] = [x.toDict() for x in job.run]


@ -586,7 +586,7 @@ class AnsibleJob(object):
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
if not self.doMergeChanges(merge_items):
if not self.doMergeChanges(merge_items, args['repo_state']):
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
return
@ -632,10 +632,10 @@ class AnsibleJob(object):
result = dict(result=result)
self.job.sendWorkComplete(json.dumps(result))
def doMergeChanges(self, items):
def doMergeChanges(self, items, repo_state):
# Get a merger in order to update the repos involved in this job.
merger = self.executor_server._getMerger(self.jobdir.src_root)
ret = merger.mergeChanges(items) # noqa
ret = merger.mergeChanges(items, repo_state=repo_state)
if not ret: # merge conflict
result = dict(result='MERGER_FAILURE')
self.job.sendWorkComplete(json.dumps(result))


@ -192,17 +192,6 @@ class PipelineManager(object):
def getFailingDependentItems(self, item):
return None
def getDependentItems(self, item):
orig_item = item
items = []
while item.item_ahead:
items.append(item.item_ahead)
item = item.item_ahead
self.log.info("Change %s depends on changes %s" %
(orig_item.change,
[x.change for x in items]))
return items
def getItemForChange(self, change):
for item in self.pipeline.getAllItems():
if item.change.equals(change):
@ -364,7 +353,7 @@ class PipelineManager(object):
def _executeJobs(self, item, jobs):
self.log.debug("Executing jobs for change %s" % item.change)
dependent_items = self.getDependentItems(item)
build_set = item.current_build_set
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
@ -372,7 +361,8 @@ class PipelineManager(object):
self.sched.nodepool.useNodeSet(nodeset)
build = self.sched.executor.execute(job, item,
self.pipeline,
dependent_items)
build_set.dependent_items,
build_set.merger_items)
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
@ -502,13 +492,9 @@ class PipelineManager(object):
self.log.debug("Scheduling merge for item %s (files: %s)" %
(item, files))
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
merger_items = [i.makeMergerItem() for i in all_items]
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
self.sched.merger.mergeChanges(merger_items,
self.sched.merger.mergeChanges(build_set.merger_items,
item.current_build_set, files,
precedence=self.pipeline.precedence)
return False
@ -683,6 +669,7 @@ class PipelineManager(object):
if event.merged:
build_set.commit = event.commit
build_set.files.setFiles(event.files)
build_set.repo_state = event.repo_state
elif event.updated:
build_set.commit = item.change.newrev
if not build_set.commit:


@ -130,6 +130,7 @@ class MergeClient(object):
updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
repo_state = data.get('repo_state', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
@ -137,7 +138,8 @@ class MergeClient(object):
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set, zuul_url,
merged, updated, commit, files)
merged, updated, commit, files,
repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)


@ -14,6 +14,7 @@
# under the License.
import git
import gitdb
import os
import logging
@ -128,6 +129,22 @@ class Repo(object):
repo = self.createRepoObject()
return repo.refs
def setRefs(self, refs):
repo = self.createRepoObject()
current_refs = {}
for ref in repo.refs:
current_refs[ref.path] = ref
unseen = set(current_refs.keys())
for path, hexsha in refs.items():
binsha = gitdb.util.to_bin_sha(hexsha)
obj = git.objects.Object.new_from_sha(repo, binsha)
self.log.debug("Create reference %s", path)
git.refs.Reference.create(repo, path, obj, force=True)
unseen.discard(path)
for path in unseen:
self.log.debug("Delete reference %s", path)
git.refs.SymbolicReference.delete(repo, path)
def checkout(self, ref):
repo = self.createRepoObject()
self.log.debug("Checking out %s" % ref)
@ -299,8 +316,21 @@ class Merger(object):
for ref in repo.getRefs():
if ref.path.startswith('refs/zuul'):
continue
if ref.path.startswith('refs/remotes'):
continue
project[ref.path] = ref.object.hexsha
def _restoreRepoState(self, connection_name, project_name, repo,
repo_state):
projects = repo_state.get(connection_name, {})
project = projects.get(project_name, {})
if not project:
# We don't have a state for this project.
return
self.log.debug("Restore repo state for project %s/%s",
connection_name, project_name)
repo.setRefs(project)
def _mergeChange(self, item, ref):
repo = self.getRepo(item['connection'], item['project'])
try:
@ -349,6 +379,9 @@ class Merger(object):
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return None
self._restoreRepoState(item['connection'], item['project'], repo,
repo_state)
base = repo.getBranchHead(item['branch'])
# Save the repo state so that later mergers can repeat
# this process.
@ -368,6 +401,7 @@ class Merger(object):
# commits of each project-branch
for key, mrc in recent.items():
connection, project, branch = key
zuul_ref = None
try:
repo = self.getRepo(connection, project)
zuul_ref = branch + '/' + item['ref']


@ -1131,7 +1131,6 @@ class BuildSet(object):
def __init__(self, item):
self.item = item
self.other_changes = []
self.builds = {}
self.result = None
self.next_build_set = None
@ -1139,6 +1138,8 @@ class BuildSet(object):
self.ref = None
self.commit = None
self.zuul_url = None
self.dependent_items = None
self.merger_items = None
self.unable_to_merge = False
self.config_error = None # None or an error message string.
self.failing_reasons = []
@ -1146,6 +1147,7 @@ class BuildSet(object):
self.nodesets = {} # job -> nodeset
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
self.repo_state = {}
self.layout = None
self.tries = {}
@ -1159,13 +1161,19 @@ class BuildSet(object):
# The change isn't enqueued until after it's created
# so we don't know what the other changes ahead will be
# until jobs start.
if not self.other_changes:
if self.dependent_items is None:
items = []
next_item = self.item.item_ahead
while next_item:
self.other_changes.append(next_item.change)
items.append(next_item)
next_item = next_item.item_ahead
self.dependent_items = items
if not self.ref:
self.ref = 'Z' + uuid4().hex
if self.merger_items is None:
items = [self.item] + self.dependent_items
items.reverse()
self.merger_items = [i.makeMergerItem() for i in items]
def getStateName(self, state_num):
return self.states_map.get(
@ -1217,9 +1225,26 @@ class BuildSet(object):
return self.tries.get(job_name, 0)
def getMergeMode(self):
if self.layout:
# We may be called before this build set has a shadow layout
# (ie, we are called to perform the merge to create that
# layout). It's possible that the change we are merging will
# update the merge-mode for the project, but there's not much
# we can do about that here. Instead, do the best we can by
# using the nearest shadow layout to determine the merge mode,
# or if that fails, the current live layout, or if that fails,
# use the default: merge-resolve.
item = self.item
layout = None
while item:
layout = item.current_build_set.layout
if layout:
break
item = item.item_ahead
if not layout:
layout = self.item.pipeline.layout
if layout:
project = self.item.change.project
project_config = self.layout.project_configs.get(
project_config = layout.project_configs.get(
project.canonical_name)
if project_config:
return project_config.merge_mode


@ -138,16 +138,18 @@ class MergeCompletedEvent(ResultEvent):
:arg bool merged: Whether the merge succeeded (changes with refs).
:arg bool updated: Whether the repo was updated (changes without refs).
:arg str commit: The SHA of the merged commit (changes with refs).
:arg dict repo_state: The starting repo state before the merge.
"""
def __init__(self, build_set, zuul_url, merged, updated, commit,
files):
files, repo_state):
self.build_set = build_set
self.zuul_url = zuul_url
self.merged = merged
self.updated = updated
self.commit = commit
self.files = files
self.repo_state = repo_state
class NodesProvisionedEvent(ResultEvent):
@ -316,11 +318,11 @@ class Scheduler(threading.Thread):
self.log.debug("Done adding complete event for build: %s" % build)
def onMergeCompleted(self, build_set, zuul_url, merged, updated,
commit, files):
commit, files, repo_state):
self.log.debug("Adding merge complete event for build set: %s" %
build_set)
event = MergeCompletedEvent(build_set, zuul_url, merged,
updated, commit, files)
updated, commit, files, repo_state)
self.result_event_queue.put(event)
self.wake_event.set()