Thin the workspace repo clone

When the executor clones a repo from the cache to the workspace,
it performs a lot of unnecessary work:

* It checks out HEAD and prepares a workspace which we will
  immediately change.
* It copies all of the branch refs, half of which we will immediately
  delete, and in some configurations (exclude_unprotected_branches)
  we will immediately delete most of the rest.  Deleting refs with
  gitpython is much more expensive than creating them.

This change updates the initial clone to do none of those, instead
relying on the repo state restoration to take care of that for us.

Change-Id: Ie8846c48ccd6255953f46640f5559bb41491d425
This commit is contained in:
James E. Blair 2023-11-08 16:00:01 -08:00
parent 07e28cca57
commit 518194af1d
3 changed files with 102 additions and 74 deletions

View File

@ -231,7 +231,10 @@ class TestMergerRepo(ZuulTestCase):
repo = git.Repo(self.workspace_root) repo = git.Repo(self.workspace_root)
new_sha = repo.heads.foobar.commit.hexsha new_sha = repo.heads.foobar.commit.hexsha
work_repo.setRefs({'refs/heads/master': new_sha}, True) work_repo.setRefs({
'refs/heads/master': new_sha,
'refs/remotes/origin/master': new_sha,
})
self.assertEqual(work_repo.getBranchHead('master').hexsha, new_sha) self.assertEqual(work_repo.getBranchHead('master').hexsha, new_sha)
self.assertIn('master', repo.remotes.origin.refs) self.assertIn('master', repo.remotes.origin.refs)

View File

@ -996,6 +996,7 @@ class AnsibleJob(object):
# TODO(corvus): Remove default setting after 4.3.0; this is to handle # TODO(corvus): Remove default setting after 4.3.0; this is to handle
# scheduler/executor version skew. # scheduler/executor version skew.
self.scheme = self.job.workspace_scheme or zuul.model.SCHEME_GOLANG self.scheme = self.job.workspace_scheme or zuul.model.SCHEME_GOLANG
self.workspace_merger = None
self.log = get_annotated_logger( self.log = get_annotated_logger(
logger, self.zuul_event_id, build=build_request.uuid logger, self.zuul_event_id, build=build_request.uuid
) )
@ -1277,12 +1278,25 @@ class AnsibleJob(object):
extra_repo_state = extra_rs_path and ExtraRepoState.fromZK( extra_repo_state = extra_rs_path and ExtraRepoState.fromZK(
ctx, extra_rs_path) ctx, extra_rs_path)
d = {} d = {}
# Combine the two
for rs in (merge_repo_state, extra_repo_state): for rs in (merge_repo_state, extra_repo_state):
if not rs: if not rs:
continue continue
for connection in rs.state.keys(): for connection in rs.state.keys():
d.setdefault(connection, {}).update( d.setdefault(connection, {}).update(
rs.state.get(connection, {})) rs.state.get(connection, {}))
# Ensure that we have an origin ref for every local branch.
# Some of these will be overwritten later as we merge changes,
# but for starters, we can use the current head of each
# branch.
for connection_state in d.values():
for project_state in connection_state.values():
for path, hexsha in list(project_state.items()):
if path.startswith('refs/heads/'):
name = path[11:]
remote_name = f'refs/remotes/origin/{name}'
if remote_name not in connection_state:
project_state[remote_name] = hexsha
self.repo_state = d self.repo_state = d
def _base_job_data(self): def _base_job_data(self):
@ -1402,7 +1416,7 @@ class AnsibleJob(object):
job_output.write("{now} | Preparing job workspace\n".format( job_output.write("{now} | Preparing job workspace\n".format(
now=datetime.datetime.now() now=datetime.datetime.now()
)) ))
merger = self.executor_server._getMerger( self.workspace_merger = self.executor_server._getMerger(
self.jobdir.src_root, self.jobdir.src_root,
self.executor_server.merge_root, self.executor_server.merge_root,
logger=self.log, logger=self.log,
@ -1415,7 +1429,7 @@ class AnsibleJob(object):
'BuildCloneRepo', 'BuildCloneRepo',
attributes={'connection': project['connection'], attributes={'connection': project['connection'],
'project': project['name']}): 'project': project['name']}):
repo = merger.getRepo( repo = self.workspace_merger.getRepo(
project['connection'], project['connection'],
project['name'], project['name'],
process_worker=self.executor_server.process_worker) process_worker=self.executor_server.process_worker)
@ -1432,7 +1446,7 @@ class AnsibleJob(object):
with tracer.start_as_current_span( with tracer.start_as_current_span(
'BuildMergeChanges'): 'BuildMergeChanges'):
item_commit = self.doMergeChanges( item_commit = self.doMergeChanges(
merger, merge_items, self.repo_state, restored_repos) merge_items, self.repo_state, restored_repos)
if item_commit is None: if item_commit is None:
# There was a merge conflict and we have already sent # There was a merge conflict and we have already sent
# a work complete result, don't run any jobs # a work complete result, don't run any jobs
@ -1450,7 +1464,7 @@ class AnsibleJob(object):
'BuildSetRepoState', 'BuildSetRepoState',
attributes={'connection': project['connection'], attributes={'connection': project['connection'],
'project': project['name']}): 'project': project['name']}):
merger.setRepoState( self.workspace_merger.setRepoState(
project['connection'], project['name'], self.repo_state, project['connection'], project['name'], self.repo_state,
process_worker=self.executor_server.process_worker) process_worker=self.executor_server.process_worker)
@ -1555,7 +1569,7 @@ class AnsibleJob(object):
result = 'DISK_FULL' result = 'DISK_FULL'
data, secret_data = self.getResultData() data, secret_data = self.getResultData()
warnings = [] warnings = []
self.mapLines(merger, args, data, item_commit, warnings) self.mapLines(args, data, item_commit, warnings)
warnings.extend(get_warnings_from_result_data(data, logger=self.log)) warnings.extend(get_warnings_from_result_data(data, logger=self.log))
result_data = dict(result=result, result_data = dict(result=result,
error_detail=error_detail, error_detail=error_detail,
@ -1587,7 +1601,7 @@ class AnsibleJob(object):
self.log.exception("Unable to load result data:") self.log.exception("Unable to load result data:")
return data, secret_data return data, secret_data
def mapLines(self, merger, args, data, commit, warnings): def mapLines(self, args, data, commit, warnings):
# The data and warnings arguments are mutated in this method. # The data and warnings arguments are mutated in this method.
# If we received file comments, map the line numbers before # If we received file comments, map the line numbers before
@ -1612,8 +1626,8 @@ class AnsibleJob(object):
if (project['canonical_name'] != if (project['canonical_name'] !=
args['zuul']['project']['canonical_name']): args['zuul']['project']['canonical_name']):
continue continue
repo = merger.getRepo(project['connection'], repo = self.workspace_merger.getRepo(project['connection'],
project['name']) project['name'])
# If the repo doesn't exist, abort # If the repo doesn't exist, abort
if not repo: if not repo:
return return
@ -1661,9 +1675,9 @@ class AnsibleJob(object):
filecomments.updateLines(fc, new_lines) filecomments.updateLines(fc, new_lines)
def doMergeChanges(self, merger, items, repo_state, restored_repos): def doMergeChanges(self, items, repo_state, restored_repos):
try: try:
ret = merger.mergeChanges( ret = self.workspace_merger.mergeChanges(
items, repo_state=repo_state, items, repo_state=repo_state,
process_worker=self.executor_server.process_worker) process_worker=self.executor_server.process_worker)
except ValueError: except ValueError:
@ -1698,7 +1712,7 @@ class AnsibleJob(object):
repo_state_commit = project_repo_state.get( repo_state_commit = project_repo_state.get(
'refs/heads/%s' % branch) 'refs/heads/%s' % branch)
if repo_state_commit != commit: if repo_state_commit != commit:
repo = merger.getRepo(connection, project) repo = self.workspace_merger.getRepo(connection, project)
repo.setRef('refs/heads/' + branch, commit) repo.setRef('refs/heads/' + branch, commit)
return orig_commit return orig_commit
@ -2248,19 +2262,32 @@ class AnsibleJob(object):
logger=self.log, logger=self.log,
scheme=zuul.model.SCHEME_GOLANG, scheme=zuul.model.SCHEME_GOLANG,
cache_scheme=self.scheme) cache_scheme=self.scheme)
# Since we're not restoring the repo state and
# we're skipping the ref setup after cloning, we
# do need to at least ensure the branch we're
# going to check out exists.
repo = self.workspace_merger.getRepo(p['connection'],
p['name'])
repo_state = {
p['connection']: {
p['name']: {
f'refs/heads/{branch}':
repo.getBranchHead(branch).hexsha
}
}
}
break break
repo_state = None
if merger is None: if merger is None:
merger = self.executor_server._getMerger( merger = self.executor_server._getMerger(
pi.root, pi.root,
self.executor_server.merge_root, self.executor_server.merge_root,
logger=self.log, logger=self.log,
scheme=zuul.model.SCHEME_GOLANG) scheme=zuul.model.SCHEME_GOLANG)
# If we don't have this repo yet prepared we need to
# If we don't have this repo yet prepared we need to restore # restore the repo state. Otherwise we have
# the repo state. Otherwise we have speculative merges in the # speculative merges in the repo and must not restore
# repo and must not restore the repo state again. # the repo state again.
repo_state = self.repo_state repo_state = self.repo_state
self.log.debug("Cloning %s@%s into new untrusted space %s", self.log.debug("Cloning %s@%s into new untrusted space %s",

View File

@ -1,5 +1,6 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013-2014 OpenStack Foundation # Copyright 2013-2014 OpenStack Foundation
# Copyright 2021-2023 Acme Gating, LLC
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -14,12 +15,11 @@
# under the License. # under the License.
from contextlib import contextmanager from contextlib import contextmanager
from logging import Logger
from typing import Optional
from urllib.parse import urlsplit, urlunsplit, urlparse from urllib.parse import urlsplit, urlunsplit, urlparse
import hashlib import hashlib
import logging import logging
import math import math
import sys
import os import os
import re import re
import shutil import shutil
@ -29,7 +29,6 @@ from concurrent.futures.process import BrokenProcessPool
import git import git
import gitdb import gitdb
import paramiko import paramiko
from zuul.zk import ZooKeeperClient
from zuul.lib import strings from zuul.lib import strings
import zuul.model import zuul.model
@ -75,12 +74,13 @@ class Repo(object):
def __init__(self, remote, local, email, username, speed_limit, speed_time, def __init__(self, remote, local, email, username, speed_limit, speed_time,
sshkey=None, cache_path=None, logger=None, git_timeout=300, sshkey=None, cache_path=None, logger=None, git_timeout=300,
zuul_event_id=None, retry_timeout=None): zuul_event_id=None, retry_timeout=None, skip_refs=False):
if logger is None: if logger is None:
self.log = logging.getLogger("zuul.Repo") self.log = logging.getLogger("zuul.Repo")
else: else:
self.log = logger self.log = logger
log = get_annotated_logger(self.log, zuul_event_id) log = get_annotated_logger(self.log, zuul_event_id)
self.skip_refs = skip_refs
self.env = { self.env = {
'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit, 'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
'GIT_HTTP_LOW_SPEED_TIME': speed_time, 'GIT_HTTP_LOW_SPEED_TIME': speed_time,
@ -174,8 +174,10 @@ class Repo(object):
repo = git.Repo(self.local_path) repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env) repo.git.update_environment(**self.env)
# Create local branches corresponding to all the remote branches # Create local branches corresponding to all the remote
if not repo_is_cloned: # branches. Skip this when cloning the workspace repo since
# we will restore the refs there.
if not repo_is_cloned and not self.skip_refs:
origin = repo.remotes.origin origin = repo.remotes.origin
for ref in origin.refs: for ref in origin.refs:
if ref.remote_head == 'HEAD': if ref.remote_head == 'HEAD':
@ -237,11 +239,16 @@ class Repo(object):
mygit = git.cmd.Git(os.getcwd()) mygit = git.cmd.Git(os.getcwd())
mygit.update_environment(**self.env) mygit.update_environment(**self.env)
kwargs = dict(kill_after_timeout=self.git_timeout)
if self.skip_refs:
kwargs.update(dict(
no_checkout=True,
))
for attempt in range(1, self.retry_attempts + 1): for attempt in range(1, self.retry_attempts + 1):
try: try:
with timeout_handler(self.local_path): with timeout_handler(self.local_path):
mygit.clone(git.cmd.Git.polish_url(url), self.local_path, mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
kill_after_timeout=self.git_timeout) **kwargs)
break break
except Exception: except Exception:
if attempt < self.retry_attempts: if attempt < self.retry_attempts:
@ -379,7 +386,12 @@ class Repo(object):
else: else:
messages.append("Unable to detach HEAD to %s" % ref) messages.append("Unable to detach HEAD to %s" % ref)
else: else:
raise Exception("Couldn't detach HEAD to any existing commit") # There are no remote refs; proceed with the assumption we
# don't have a checkout yet.
if log:
log.debug("Couldn't detach HEAD to any existing commit")
else:
messages.append("Couldn't detach HEAD to any existing commit")
# Delete local heads that no longer exist on the remote end # Delete local heads that no longer exist on the remote end
zuul_refs_to_keep = [ zuul_refs_to_keep = [
@ -502,40 +514,43 @@ class Repo(object):
return 'Created reference %s at %s in %s' % ( return 'Created reference %s at %s in %s' % (
path, hexsha, repo.git_dir) path, hexsha, repo.git_dir)
def setRefs(self, refs, keep_remotes=False, zuul_event_id=None): def setRefs(self, refs, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id) repo = self.createRepoObject(zuul_event_id)
ref_log = get_annotated_logger( ref_log = get_annotated_logger(
logging.getLogger("zuul.Repo.Ref"), zuul_event_id) logging.getLogger("zuul.Repo.Ref"), zuul_event_id)
self._setRefs(repo, refs, keep_remotes=keep_remotes, log=ref_log) self._setRefs(repo, refs, log=ref_log)
@staticmethod @staticmethod
def setRefsAsync(local_path, refs, keep_remotes=False): def setRefsAsync(local_path, refs):
repo = git.Repo(local_path) repo = git.Repo(local_path)
messages = Repo._setRefs(repo, refs, keep_remotes=keep_remotes) messages = Repo._setRefs(repo, refs)
return messages return messages
@staticmethod @staticmethod
def _setRefs(repo, refs, keep_remotes=False, log=None): def _setRefs(repo, refs, log=None):
messages = [] messages = []
current_refs = {}
for ref in repo.refs: # Rewrite packed refs with our content. In practice, this
current_refs[ref.path] = ref # should take care of almost every ref in the repo, except
unseen = set(current_refs.keys()) # maybe HEAD and master.
for path, hexsha in refs.items(): refs_path = f"{repo.git_dir}/packed-refs"
if log: encoding = sys.getfilesystemencoding()
log.debug("Create reference %s at %s in %s", with open(refs_path, 'wb') as f:
path, hexsha, repo.git_dir) f.write(b'# pack-refs with: peeled fully-peeled sorted \n')
message = Repo._setRef(path, hexsha, repo) sorted_paths = sorted(refs.keys())
messages.append(message) for path in sorted_paths:
unseen.discard(path) hexsha = refs[path]
ref = current_refs.get(path) f.write(f'{hexsha} {path}\n'.encode(encoding))
if keep_remotes and ref: if log:
unseen.discard('refs/remotes/origin/{}'.format(ref.name)) log.debug("Set reference %s at %s in %s",
for path in unseen: path, hexsha, repo.git_dir)
if log:
log.debug("Delete reference %s", path) # Delete all the loose refs
message = Repo._deleteRef(path, repo) for dname in ('remotes', 'tags', 'heads'):
messages.append(message) path = f"{repo.git_dir}/refs/{dname}"
if os.path.exists(path):
shutil.rmtree(path)
return messages return messages
def setRemoteRef(self, branch, rev, zuul_event_id=None): def setRemoteRef(self, branch, rev, zuul_event_id=None):
@ -882,22 +897,10 @@ class MergerTree:
class Merger(object): class Merger(object):
def __init__( def __init__(self, working_root, connections, zk_client, email,
self, username, speed_limit, speed_time, cache_root=None,
working_root: str, logger=None, execution_context=False, git_timeout=300,
connections, scheme=None, cache_scheme=None):
zk_client: ZooKeeperClient,
email: str,
username: str,
speed_limit: str,
speed_time: str,
cache_root: Optional[str] = None,
logger: Optional[Logger] = None,
execution_context: bool = False,
git_timeout: int = 300,
scheme: str = None,
cache_scheme: str = None,
):
self.logger = logger self.logger = logger
if logger is None: if logger is None:
self.log = logging.getLogger("zuul.Merger") self.log = logging.getLogger("zuul.Merger")
@ -969,7 +972,8 @@ class Merger(object):
url, path, self.email, self.username, self.speed_limit, url, path, self.email, self.username, self.speed_limit,
self.speed_time, sshkey=sshkey, cache_path=cache_path, self.speed_time, sshkey=sshkey, cache_path=cache_path,
logger=self.logger, git_timeout=self.git_timeout, logger=self.logger, git_timeout=self.git_timeout,
zuul_event_id=zuul_event_id, retry_timeout=retry_timeout) zuul_event_id=zuul_event_id, retry_timeout=retry_timeout,
skip_refs=self.execution_context)
self.repos[key] = repo self.repos[key] = repo
except Exception: except Exception:
@ -1100,12 +1104,10 @@ class Merger(object):
log.debug("Restore repo state for project %s/%s", log.debug("Restore repo state for project %s/%s",
connection_name, project_name) connection_name, project_name)
if process_worker is None: if process_worker is None:
repo.setRefs(project, keep_remotes=self.execution_context, repo.setRefs(project, zuul_event_id=zuul_event_id)
zuul_event_id=zuul_event_id)
else: else:
job = process_worker.submit( job = process_worker.submit(
Repo.setRefsAsync, repo.local_path, project, Repo.setRefsAsync, repo.local_path, project)
keep_remotes=self.execution_context)
messages = job.result() messages = job.result()
ref_log = get_annotated_logger( ref_log = get_annotated_logger(
logging.getLogger("zuul.Repo.Ref"), zuul_event_id) logging.getLogger("zuul.Repo.Ref"), zuul_event_id)
@ -1286,10 +1288,6 @@ class Merger(object):
repo = self.getRepo(connection_name, project_name, repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id) zuul_event_id=zuul_event_id)
# TODO: why is reset required here?
repo.reset(zuul_event_id=zuul_event_id,
process_worker=process_worker)
self._restoreRepoState(connection_name, project_name, repo, self._restoreRepoState(connection_name, project_name, repo,
repo_state, zuul_event_id) repo_state, zuul_event_id)