Clarify merger updates and resets

Several changes in an attempt to clarify exactly when updates and
resets should and do happen:

* Remove the repo_state argument from Merger.getRepo()

It was unclear under what circumstances the low-level repo object
honored repo_state (in practice, very few).  Remove it entirely and
rely on high-level Merger methods to deal with repo_state.

* Have merger.setRepoState() operate on one project instead of a
  list of items

Part of the reason we were passing repo_state to low-level
methods was to reset the state for required projects in the
executor.  Essentially there were three cases: projects of change
items, projects of non-change items, and projects of neither but
in required-projects.  The low-level repo_state usage only
handled the last case; the first is easy, and the second we
handled by creating a list of non-change items and passing it to
setRepoState on the merger.

A simpler method of handling all of that is to reduce it to two
cases: projects of change items (which need to be merged) and the
rest (which need to be restored).  If we do that, we can maintain
a set of projects we've seen while merging in the first case,
then iterate over all the remaining projects and call
setRepoState on each in the second.

* Remove the update call from Repo.reset()

This lets us call Repo.reset() frequently (i.e., at the start of
any operation that writes to the merger's git repo working dir)
without performing a git fetch.  We need to make sure we call
Repo.update() where necessary.

* Remove the reset call from Merger.updateRepo()

This will now only call repo.update(), and even that will only
happen if the repo_state says we should.  So we can safely call
this before any significant operations and know that it will
update the repo if necessary.

* Add an update() call to getRepoState()

Because we removed the update() call from Repo.reset(), we need
to add one here next to the existing call to reset().

* Add a reset call to getFiles()

getFiles() previously relied on the reset performed by updateRepo(),
which this change removes.

* Set execution_context to False on the executor's main merger

The execution_context parameter determines whether we manipulate the
origin remotes to point at the previous commit.  This should be set
for mergers that operate on the build work dir, but it should not
be set for the main merger within the executor (so the main merger
behaves just like a standalone merger).  It was previously set in
error for the executor's main merger, and this change corrects that.

* Add Merger.updateRepo() calls in the merger server merge method

The merger needs to update and reset each repo before merging changes.
Currently _mergeItem resets the repo the first time it encounters it.
But we still need to update the repo.  We don't want to update within
Merger.mergeChanges() itself because the executor performs batch
updates in parallel before starting a merge and we don't want to
re-do that work.
So instead we add it to the merger server invocation, so it's only
used in the merger:merge gearman function code path.

Change-Id: I740e958357dc7bf0a6506474c5991da12ab6264e
This commit is contained in:
James E. Blair 2021-04-16 16:33:51 -07:00
parent 397f708d56
commit d4c7d29360
5 changed files with 110 additions and 42 deletions

View File

@ -3007,10 +3007,10 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
super()._execute()
def doMergeChanges(self, merger, items, repo_state):
def doMergeChanges(self, *args, **kw):
# Get a merger in order to update the repos involved in this job.
commit = super(RecordingAnsibleJob, self).doMergeChanges(
merger, items, repo_state)
*args, **kw)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')

View File

@ -122,6 +122,7 @@ class TestMergerRepo(ZuulTestCase):
parent_repo.delete_head("foobar")
parent_repo.create_head("foobar/sub")
work_repo.update()
work_repo.reset()
work_repo.checkout("foobar/sub")
@ -150,6 +151,7 @@ class TestMergerRepo(ZuulTestCase):
parent_repo.create_head("foobar")
work_repo.update()
work_repo.reset()
work_repo.checkout("foobar")
@ -449,6 +451,7 @@ class TestMergerRepo(ZuulTestCase):
merger = self.executor_server.merger
merger.updateRepo('gerrit', 'org/project1')
repo = merger.getRepo('gerrit', 'org/project1')
repo.reset()
# Branches master and stable must exist
self.assertEqual(['master', 'stable'], repo.getBranches())
@ -462,6 +465,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project1',
repo_state=repo_state_no_update)
repo = merger.getRepo('gerrit', 'org/project1')
repo.reset()
self.assertEqual(['master', 'stable'], repo.getBranches())
# Update with repo state and expect update
@ -469,6 +473,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project1',
repo_state=repo_state_update_ref)
repo = merger.getRepo('gerrit', 'org/project1')
repo.reset()
self.assertEqual(['master', 'stable', 'stable2'], repo.getBranches())
# Test new rev causes update
@ -480,6 +485,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project1',
repo_state=repo_state_no_update)
repo = merger.getRepo('gerrit', 'org/project1')
repo.reset()
self.assertEqual(['master', 'stable', 'stable2'], repo.getBranches())
# Update with repo state and expect update
@ -487,6 +493,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project1',
repo_state=repo_state_update_rev)
repo = merger.getRepo('gerrit', 'org/project1')
repo.reset()
self.assertEqual(['master', 'stable', 'stable2', 'stable3'],
repo.getBranches())
@ -496,6 +503,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project2',
repo_state=repo_state_no_update)
repo = merger.getRepo('gerrit', 'org/project2')
repo.reset()
self.assertEqual(['master'],
repo.getBranches())
@ -506,6 +514,7 @@ class TestMergerRepo(ZuulTestCase):
merger.updateRepo('gerrit', 'org/project2',
repo_state=repo_state_no_update)
repo = merger.getRepo('gerrit', 'org/project2')
repo.reset()
self.assertEqual(['master', 'stable'],
repo.getBranches())
@ -814,6 +823,9 @@ class TestMerger(ZuulTestCase):
item_c = self._item_from_fake_change(C)
# Merge A -> B -> C
# TODO(corvus): remove this if we update in mergeChanges
for item in [item_a, item_b, item_c]:
merger.updateRepo(item['connection'], item['project'])
result = merger.mergeChanges([item_a, item_b, item_c])
self.assertIsNotNone(result)
merge_state = result[3]
@ -847,6 +859,7 @@ class TestMerger(ZuulTestCase):
if parent_repo.git.version_info[:2] < (2, 13):
Repo._cleanup_leaked_ref_dirs(parent_path, None, [])
cache_repo.update()
cache_repo.reset()
self.assertNotIn(foobar_zuul_ref, [r.path for r in repo.refs])
@ -859,6 +872,9 @@ class TestMerger(ZuulTestCase):
item_b = self._item_from_fake_change(B)
# Merge A -> B -> C
# TODO(corvus): remove this if we update in mergeChanges
for item in [item_a, item_b, item_c]:
merger.updateRepo(item['connection'], item['project'])
result = merger.mergeChanges([item_a, item_b, item_c])
self.assertIsNotNone(result)
merge_state = result[3]

View File

@ -1086,18 +1086,19 @@ class AnsibleJob(object):
repo = merger.getRepo(
project['connection'],
project['name'],
repo_state=repo_state,
process_worker=self.executor_server.process_worker)
repos[project['canonical_name']] = repo
# The commit ID of the original item (before merging). Used
# later for line mapping.
item_commit = None
# The set of repos which have had their state restored
restored_repos = set()
merge_items = [i for i in args['items'] if i.get('number')]
if merge_items:
item_commit = self.doMergeChanges(
merger, merge_items, repo_state)
merger, merge_items, repo_state, restored_repos)
if item_commit is None:
# There was a merge conflict and we have already sent
# a work complete result, don't run any jobs
@ -1108,10 +1109,11 @@ class AnsibleJob(object):
self._send_aborted()
return
state_items = [i for i in args['items'] if not i.get('number')]
if state_items:
for project in args['projects']:
if (project['connection'], project['name']) in restored_repos:
continue
merger.setRepoState(
state_items, repo_state,
project['connection'], project['name'], repo_state,
process_worker=self.executor_server.process_worker)
# Early abort if abort requested
@ -1286,7 +1288,7 @@ class AnsibleJob(object):
filecomments.updateLines(fc, new_lines)
def doMergeChanges(self, merger, items, repo_state):
def doMergeChanges(self, merger, items, repo_state, restored_repos):
try:
ret = merger.mergeChanges(
items, repo_state=repo_state,
@ -1314,6 +1316,7 @@ class AnsibleJob(object):
orig_commit = ret[4]
for key, commit in recent.items():
(connection, project, branch) = key
restored_repos.add((connection, project))
# Compare the commit with the repo state. If it's included in the
# repo state and it's the same we've set this ref already earlier
# and don't have to set it again.

View File

@ -397,7 +397,6 @@ class Repo(object):
def reset(self, zuul_event_id=None, build=None, process_worker=None):
log = get_annotated_logger(self.log, zuul_event_id, build=build)
log.debug("Resetting repository %s", self.local_path)
self.update(zuul_event_id=zuul_event_id, build=build)
self.createRepoObject(zuul_event_id, build=build)
if process_worker is None:
@ -761,7 +760,7 @@ class Merger(object):
self.execution_context = execution_context
def _addProject(self, hostname, connection_name, project_name, url, sshkey,
repo_state, zuul_event_id, process_worker=None):
zuul_event_id, process_worker=None):
repo = None
key = '/'.join([hostname, project_name])
try:
@ -777,11 +776,6 @@ class Merger(object):
logger=self.logger, git_timeout=self.git_timeout,
zuul_event_id=zuul_event_id)
# If we got a repo state restore it
if repo_state:
self._restoreRepoState(
connection_name, project_name, repo, repo_state,
zuul_event_id, process_worker=process_worker)
self.repos[key] = repo
except Exception:
log = get_annotated_logger(self.log, zuul_event_id)
@ -790,7 +784,7 @@ class Merger(object):
return repo
def getRepo(self, connection_name, project_name,
repo_state=None, zuul_event_id=None, process_worker=None):
zuul_event_id=None, process_worker=None):
source = self.connections.getSource(connection_name)
project = source.getProject(project_name)
hostname = project.canonical_hostname
@ -807,12 +801,19 @@ class Merger(object):
" without a url" %
(connection_name, project_name,))
return self._addProject(hostname, connection_name, project_name, url,
sshkey, repo_state, zuul_event_id,
sshkey, zuul_event_id,
process_worker=process_worker)
def updateRepo(self, connection_name, project_name, repo_state=None,
zuul_event_id=None,
build=None, process_worker=None):
"""Fetch from origin if needed
If repo_state is None, then this will always git fetch.
If repo_state is provided, then this may no-op if
the shas specified by repo_state are already present.
"""
log = get_annotated_logger(self.log, zuul_event_id, build=build)
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
@ -832,8 +833,7 @@ class Merger(object):
else:
log.info("Updating local repository %s/%s",
connection_name, project_name)
repo.reset(zuul_event_id=zuul_event_id, build=build,
process_worker=process_worker)
repo.update(zuul_event_id=zuul_event_id, build=build)
except Exception:
log.exception("Unable to update %s/%s",
connection_name, project_name)
@ -842,13 +842,20 @@ class Merger(object):
def checkoutBranch(self, connection_name, project_name, branch,
repo_state=None, zuul_event_id=None,
process_worker=None):
"""Check out a branch
Call Merger.updateRepo() first. This does not reset the repo,
and is expected to be called only after a fresh clone.
"""
log = get_annotated_logger(self.log, zuul_event_id)
log.info("Checking out %s/%s branch %s",
connection_name, project_name, branch)
repo = self.getRepo(connection_name, project_name,
repo_state=repo_state,
process_worker=process_worker,
zuul_event_id=zuul_event_id)
# We don't need to reset because this is only called by the
# executor after a clone.
if repo_state:
self._restoreRepoState(connection_name, project_name, repo,
repo_state, zuul_event_id,
@ -1006,6 +1013,11 @@ class Merger(object):
def mergeChanges(self, items, files=None, dirs=None, repo_state=None,
repo_locks=None, branches=None, zuul_event_id=None,
process_worker=None):
"""Merge changes
Call Merger.updateRepo() first.
"""
# _mergeItem calls reset as necessary.
log = get_annotated_logger(self.log, zuul_event_id)
# connection+project+branch -> commit
recent = {}
@ -1043,29 +1055,34 @@ class Merger(object):
ret_recent[k] = v.hexsha
return commit.hexsha, read_files, repo_state, ret_recent, orig_commit
def setRepoState(self, items, repo_state, zuul_event_id=None,
process_worker=None):
# Sets the repo state for the items
seen = set()
for item in items:
repo = self.getRepo(item['connection'], item['project'],
zuul_event_id=zuul_event_id)
key = (item['connection'], item['project'], item['branch'])
def setRepoState(self, connection_name, project_name, repo_state,
zuul_event_id=None, process_worker=None):
"""Set the repo state
if key in seen:
continue
Call Merger.updateRepo() first.
"""
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
repo.reset(zuul_event_id=zuul_event_id,
process_worker=process_worker)
self._restoreRepoState(item['connection'], item['project'], repo,
repo_state, zuul_event_id)
# TODO: why is reset required here?
repo.reset(zuul_event_id=zuul_event_id,
process_worker=process_worker)
self._restoreRepoState(connection_name, project_name, repo,
repo_state, zuul_event_id)
def getRepoState(self, items, repo_locks, branches=None):
# Gets the repo state for items. Generally this will be
# called in any non-change pipeline. We will return the repo
# state for each item, but manipulated with any information in
# the item (eg, if it creates a ref, that will be in the repo
# state regardless of the actual state).
"""Gets the repo state for items.
This will perform repo updates as needed, so there is no need
to call Merger.updateRepo() first.
"""
# Generally this will be called in any non-change pipeline. We
# will return the repo state for each item, but manipulated with
# any information in the item (eg, if it creates a ref, that
# will be in the repo state regardless of the actual state).
seen = set()
recent = {}
repo_state = {}
@ -1080,6 +1097,7 @@ class Merger(object):
key = (item['connection'], item['project'])
if key not in seen:
try:
repo.update()
repo.reset()
seen.add(key)
except Exception:
@ -1103,11 +1121,36 @@ class Merger(object):
return (True, repo_state, item_in_branches)
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
"""Get file contents on branch.
Call Merger.updateRepo() first to make sure the repo is up to
date.
"""
# We don't update the repo so that it can happen outside the
# lock.
repo = self.getRepo(connection_name, project_name)
# TODO: why is reset required here?
repo.reset()
# This does not fetch, update, or reset, it operates on the
# working state.
return repo.getFiles(files, dirs, branch=branch)
def getFilesChanges(self, connection_name, project_name, branch,
tosha=None, zuul_event_id=None):
"""Get a list of files changed in one or more commits
Gets files changed between tosha and branch (or just the
commit on branch if tosha is not specified).
Call Merger.updateRepo() first to make sure the repo is up to
date.
"""
# Note, the arguments to this method should be reworked. We
# fetch branch, and therefore it is typically actually a
# change ref. tosha is typically the branch name.
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
# This performs a fetch, and therefore update/reset are not
# required.
return repo.getFilesChanges(branch, tosha, zuul_event_id=zuul_event_id)

View File

@ -84,7 +84,8 @@ class BaseMergeServer(metaclass=ABCMeta):
# up-to-date copies of all the repos that are used by jobs, as
# well as to support the merger:cat functon to supply
# configuration information to Zuul when it starts.
self.merger = self._getMerger(self.merge_root, None)
self.merger = self._getMerger(self.merge_root, None,
execution_context=False)
# Repo locking is needed on the executor
self.repo_locks = self._repo_locks_class()
@ -102,7 +103,8 @@ class BaseMergeServer(metaclass=ABCMeta):
self.config,
self.merger_jobs)
def _getMerger(self, root, cache_root, logger=None):
def _getMerger(self, root, cache_root, logger=None,
execution_context=True):
return merger.Merger(
root,
self.connections,
@ -113,7 +115,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.merge_speed_time,
cache_root,
logger,
execution_context=True,
execution_context=execution_context,
git_timeout=self.git_timeout,
)
@ -122,6 +124,8 @@ class BaseMergeServer(metaclass=ABCMeta):
return nullcontext()
def _update(self, connection_name, project_name, zuul_event_id=None):
# The executor overrides _update so it can do the update
# asynchronously.
self.merger.updateRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
@ -189,6 +193,8 @@ class BaseMergeServer(metaclass=ABCMeta):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
for item in args['items']:
self._update(item['connection'], item['project'])
ret = self.merger.mergeChanges(
args['items'], args.get('files'),
args.get('dirs', []),