zuul/zuul/merger/merger.py

754 lines
29 KiB
Python

# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
from urllib.parse import urlsplit, urlunsplit, urlparse
import logging
import os
import re
import shutil
import time
import git
import gitdb
import paramiko
import zuul.model
# The all-zero object name. A repo-state update whose new hexsha equals this
# value removes the ref instead of setting it.
NULL_REF = '0' * 40
def reset_repo_to_head(repo):
    """Hard-reset the working tree of *repo* to HEAD.

    The trailing '--' disambiguates the revision HEAD from a file named
    'HEAD' in the repository root; GitPython does not add it on its own
    (this should become unnecessary once
    https://github.com/gitpython-developers/GitPython/pull/319 merges).

    A GitCommandError with exit status 1 is tolerated, because newer git
    versions use that status to report unstaged modifications remaining
    after the reset. Any other failure is re-raised.
    """
    reset_args = ('--hard', 'HEAD', '--')
    try:
        repo.git.reset(*reset_args)
    except git.GitCommandError as err:
        unstaged_left_over = err.status == 1
        if not unstaged_left_over:
            raise
def redact_url(url):
    """Return *url* with embedded credentials masked for safe display.

    A URL without a password is returned unchanged. Otherwise everything
    in the network location up to and including the last '@' (username
    and password) is replaced by '******@'.
    """
    parts = urlsplit(url)
    if parts.password is None:
        return url
    masked_netloc = re.sub('.*@', '******@', parts.netloc)
    return urlunsplit(parts._replace(netloc=masked_netloc))
@contextmanager
def timeout_handler(path):
    """Context manager that deletes *path* when a git command times out.

    A GitCommandError with status -9 raised inside the ``with`` body is
    treated as a timeout. The repository at *path* could then be in a bad
    state, so it is removed before the error is re-raised. Every
    GitCommandError propagates to the caller either way.
    """
    try:
        yield
    except git.exc.GitCommandError as err:
        timed_out = err.status == -9
        if timed_out:
            shutil.rmtree(path)
        raise
class Repo(object):
    """A local git clone of one remote project, driven through GitPython.

    The working copy lives at ``local``. If ``cache_path`` is set, the
    initial clone is taken from that path and the ``origin`` URL is then
    rewritten to ``remote``. Clone and fetch are attempted up to
    ``retry_attempts`` times, ``retry_interval`` seconds apart, and each of
    those git invocations is killed after ``git_timeout`` seconds.
    """

    # Matches the "commit <sha>" header line of ``git log`` output.
    commit_re = re.compile(r'^commit ([0-9a-f]{40})$')
    # Matches a ``git log -L`` hunk header; group 1 is the starting line
    # number in the newer revision. Line counts may have several digits.
    diff_re = re.compile(r'^@@ -\d+,\d+ \+(\d+),\d+ @@$')

    def __init__(self, remote, local, email, username, speed_limit, speed_time,
                 sshkey=None, cache_path=None, logger=None, git_timeout=300,
                 retry_attempts=3, retry_interval=30):
        if logger is None:
            self.log = logging.getLogger("zuul.Repo")
        else:
            self.log = logger
        # Environment applied to every git command run through this object.
        self.env = {
            'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
            'GIT_HTTP_LOW_SPEED_TIME': speed_time,
        }
        self.git_timeout = git_timeout
        self.sshkey = sshkey
        if sshkey:
            self.env['GIT_SSH_COMMAND'] = 'ssh -i %s' % (sshkey,)
        self.remote_url = remote
        self.local_path = local
        self.email = email
        self.username = username
        self.cache_path = cache_path
        self._initialized = False
        self.retry_attempts = retry_attempts
        self.retry_interval = retry_interval
        # Setup failures are logged, not raised: createRepoObject() calls
        # _ensure_cloned() again, which clones if the repo is still missing.
        try:
            self._setup_known_hosts()
        except Exception:
            # The remote URL may embed credentials; never log it verbatim.
            self.log.exception("Unable to set up known_hosts for %s" %
                               self._redacted_url(remote))
        try:
            self._ensure_cloned()
            self._git_set_remote_url(
                git.Repo(self.local_path), self.remote_url)
        except Exception:
            self.log.exception("Unable to initialize repo for %s" %
                               self._redacted_url(remote))

    @staticmethod
    def _redacted_url(url):
        """Return *url* with credentials masked, for use in log messages.

        A URL that urlsplit() rejects with ValueError is replaced by a
        placeholder, so logging inside an error handler cannot itself fail.
        """
        try:
            return redact_url(url)
        except ValueError:
            return '<unparseable url>'

    def _setup_known_hosts(self):
        """Accept the SSH host key of an ssh remote into ~/.ssh/known_hosts.

        Does nothing when the remote URL scheme does not contain 'ssh'.
        Otherwise ensures ~/.ssh/known_hosts exists and opens (then always
        closes) one paramiko connection using AutoAddPolicy, so an unknown
        host key is accepted instead of rejected.
        """
        url = urlparse(self.remote_url)
        if 'ssh' not in url.scheme:
            return
        port = url.port or 22
        username = url.username or self.username
        path = os.path.expanduser('~/.ssh')
        os.makedirs(path, exist_ok=True)
        path = os.path.expanduser('~/.ssh/known_hosts')
        if not os.path.exists(path):
            with open(path, 'w'):
                pass
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        # NOTE(review): persisting the new key relies on paramiko saving to
        # the file passed to load_host_keys() when AutoAddPolicy fires.
        client.load_host_keys(path)
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(url.hostname,
                           username=username,
                           port=port,
                           key_filename=self.sshkey)
        finally:
            # If we don't close on exceptions to connect we can leak the
            # connection and DoS Gerrit.
            client.close()

    def _ensure_cloned(self):
        """Make sure a usable clone exists at local_path, cloning if needed.

        If the repo was already initialized and GitPython can open it, this
        returns immediately. A clone that fails to open is treated as
        corrupt, deleted and cloned again. After a fresh clone a local
        branch is created for every remote branch. Otherwise user.email and
        user.name are written to the repo config (when set), and the origin
        URL is rewritten if the clone came from the cache.
        """
        repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git'))
        if self._initialized and repo_is_cloned:
            try:
                # validate that the repo isn't corrupt
                git.Repo(self.local_path)
                return
            except Exception:
                # the repo is corrupt, delete the local path
                shutil.rmtree(self.local_path)
                repo_is_cloned = False
                self._initialized = False

        # If the repo does not exist, clone the repo.
        rewrite_url = False
        if not repo_is_cloned:
            if self.cache_path:
                clone_url = self.cache_path
                rewrite_url = True
            else:
                clone_url = self.remote_url
            self.log.debug("Cloning from %s to %s" % (
                redact_url(clone_url), self.local_path))
            self._git_clone(clone_url)

        repo = git.Repo(self.local_path)
        repo.git.update_environment(**self.env)
        # Create local branches corresponding to all the remote branches
        if not repo_is_cloned:
            origin = repo.remotes.origin
            for ref in origin.refs:
                if ref.remote_head == 'HEAD':
                    continue
                repo.create_head(ref.remote_head, ref, force=True)
        with repo.config_writer() as config_writer:
            if self.email:
                config_writer.set_value('user', 'email', self.email)
            if self.username:
                config_writer.set_value('user', 'name', self.username)
            config_writer.write()
        if rewrite_url:
            self._git_set_remote_url(repo, self.remote_url)
        self._initialized = True

    def isInitialized(self):
        """Return whether _ensure_cloned() has completed successfully."""
        return self._initialized

    def _git_clone(self, url):
        """Clone *url* into local_path, retrying on any failure."""
        mygit = git.cmd.Git(os.getcwd())
        mygit.update_environment(**self.env)

        for attempt in range(1, self.retry_attempts + 1):
            try:
                with timeout_handler(self.local_path):
                    mygit.clone(git.cmd.Git.polish_url(url), self.local_path,
                                kill_after_timeout=self.git_timeout)
                break
            except Exception:
                if attempt < self.retry_attempts:
                    time.sleep(self.retry_interval)
                    self.log.warning("Retry %s: Clone %s" % (
                        attempt, self.local_path))
                else:
                    raise

    def _git_fetch(self, repo, remote, ref=None, **kwargs):
        """Run ``git fetch remote ref`` in *repo*, retrying on failure.

        Extra keyword arguments are passed to git as options (e.g.
        ``tags=True``). The last failure is re-raised.
        """
        for attempt in range(1, self.retry_attempts + 1):
            try:
                with timeout_handler(self.local_path):
                    repo.git.fetch(remote, ref,
                                   kill_after_timeout=self.git_timeout,
                                   **kwargs)
                break
            except Exception as e:
                if attempt < self.retry_attempts:
                    # Only git command errors carry stderr; any other
                    # exception must still go through the retry path.
                    stderr = getattr(e, 'stderr', None) or ''
                    if 'fatal: bad config' in stderr:
                        # This error can be introduced by a merge conflict
                        # in the .gitmodules which was left by the last
                        # merge operation. In this case reset and clean
                        # the repo and try again immediately.
                        reset_repo_to_head(repo)
                        repo.git.clean('-x', '-f', '-d')
                    else:
                        time.sleep(self.retry_interval)
                    # *remote* may be a URL (see fetchFrom); mask credentials.
                    self.log.exception("Retry %s: Fetch %s %s %s" % (
                        attempt, self.local_path,
                        self._redacted_url(remote), ref))
                    # A timeout deletes the clone; make sure it exists again.
                    self._ensure_cloned()
                else:
                    raise

    def _git_set_remote_url(self, repo, url):
        """Set the URL of the ``origin`` remote of *repo* to *url*."""
        with repo.remotes.origin.config_writer as config_writer:
            config_writer.set('url', url)

    def createRepoObject(self):
        """Return a GitPython Repo for local_path, cloning first if needed."""
        self._ensure_cloned()
        repo = git.Repo(self.local_path)
        repo.git.update_environment(**self.env)
        return repo

    def reset(self):
        """Fetch from origin and reset local branches to their remote state.

        Every non-stale remote branch gets a local branch forced to it, the
        work tree is hard-reset and cleaned on the first such branch, and
        finally remote branches that no longer exist upstream are deleted
        both locally and as remote-tracking refs.
        """
        self.log.debug("Resetting repository %s" % self.local_path)
        self.update()
        repo = self.createRepoObject()
        origin = repo.remotes.origin
        head = None
        stale_refs = origin.stale_refs
        # Update our local heads to match the remote, and pick one to
        # reset the repo to. We don't delete anything at this point
        # because we want to make sure the repo is in a state stable
        # enough for git to operate.
        for ref in origin.refs:
            if ref.remote_head == 'HEAD':
                continue
            if ref in stale_refs:
                continue
            repo.create_head(ref.remote_head, ref, force=True)
            if head is None:
                head = ref.remote_head
        self.log.debug("Reset to %s", head)
        repo.head.reference = head
        reset_repo_to_head(repo)
        repo.git.clean('-x', '-f', '-d')
        for ref in stale_refs:
            self.log.debug("Delete stale ref %s", ref.remote_head)
            # A stale ref means the upstream branch (e.g. foobar) was deleted
            # so we need to delete both our local head (if existing) and the
            # remote tracking head. Both repo.heads and ref.remote_head
            # contain the pure branch name so they can be compared easily.
            for local_head in repo.heads:
                if local_head.name == ref.remote_head:
                    repo.delete_head(ref.remote_head, force=True)
                    break
            git.refs.RemoteReference.delete(repo, ref, force=True)

    def prune(self):
        """Delete remote-tracking refs whose upstream branch is gone."""
        repo = self.createRepoObject()
        origin = repo.remotes.origin
        stale_refs = origin.stale_refs
        if stale_refs:
            self.log.debug("Pruning stale refs: %s", stale_refs)
            git.refs.RemoteReference.delete(repo, force=True, *stale_refs)

    def getBranchHead(self, branch):
        """Return the commit at the tip of local branch *branch*."""
        repo = self.createRepoObject()
        branch_head = repo.heads[branch]
        return branch_head.commit

    def hasBranch(self, branch):
        """Return whether *branch* is among origin's remote refs."""
        repo = self.createRepoObject()
        origin = repo.remotes.origin
        return branch in origin.refs

    def getBranches(self):
        """Return the names of all local branches."""
        # TODO(jeblair): deprecate with override-branch; replaced by
        # getRefs().
        repo = self.createRepoObject()
        return [x.name for x in repo.heads]

    def getCommitFromRef(self, refname):
        """Return the commit *refname* points to, or None if it is absent."""
        repo = self.createRepoObject()
        if refname not in repo.refs:
            return None
        ref = repo.refs[refname]
        return ref.commit

    def getRefs(self):
        """Return all refs of the local repository."""
        repo = self.createRepoObject()
        return repo.refs

    def setRef(self, path, hexsha, repo=None):
        """Create or force-move ref *path* to the object *hexsha*."""
        self.log.debug("Create reference %s at %s in %s",
                       path, hexsha, self.local_path)
        if repo is None:
            repo = self.createRepoObject()
        binsha = gitdb.util.to_bin_sha(hexsha)
        obj = git.objects.Object.new_from_sha(repo, binsha)
        git.refs.Reference.create(repo, path, obj, force=True)

    def setRefs(self, refs, keep_remotes=False):
        """Make the repo's refs match the mapping *refs* (path -> hexsha).

        Existing refs not named in *refs* are deleted. With keep_remotes,
        'refs/remotes/origin/<name>' is kept for every listed path that
        already existed as a ref named <name>.
        """
        repo = self.createRepoObject()
        current_refs = {}
        for ref in repo.refs:
            current_refs[ref.path] = ref
        unseen = set(current_refs.keys())
        for path, hexsha in refs.items():
            self.setRef(path, hexsha, repo)
            unseen.discard(path)
            ref = current_refs.get(path)
            if keep_remotes and ref:
                unseen.discard('refs/remotes/origin/{}'.format(ref.name))
        for path in unseen:
            self.deleteRef(path, repo)

    def setRemoteRef(self, branch, rev):
        """Point origin's remote-tracking ref for *branch* at *rev*.

        Logs a warning and does nothing if there is no such remote ref.
        """
        repo = self.createRepoObject()
        try:
            origin_ref = repo.remotes.origin.refs[branch]
        except IndexError:
            self.log.warning("No remote ref found for branch %s", branch)
            return
        self.log.debug("Updating remote reference %s to %s", origin_ref, rev)
        origin_ref.commit = rev

    def deleteRef(self, path, repo=None):
        """Delete the ref at *path*."""
        if repo is None:
            repo = self.createRepoObject()
        self.log.debug("Delete reference %s", path)
        git.refs.SymbolicReference.delete(repo, path)

    def checkout(self, ref):
        """Hard-reset, check out *ref* and return the resulting HEAD commit."""
        repo = self.createRepoObject()
        self.log.debug("Checking out %s" % ref)
        # Perform a hard reset before checking out so that we clean up
        # anything that might be left over from a merge.
        reset_repo_to_head(repo)
        repo.git.checkout(ref)
        return repo.head.commit

    def cherryPick(self, ref):
        """Fetch *ref* from origin, cherry-pick it and return the new HEAD."""
        repo = self.createRepoObject()
        self.log.debug("Cherry-picking %s" % ref)
        self.fetch(ref)
        repo.git.cherry_pick("FETCH_HEAD")
        return repo.head.commit

    def merge(self, ref, strategy=None):
        """Fetch *ref* from origin, merge it and return the new HEAD.

        *strategy*, if given, is passed to ``git merge -s``.
        """
        repo = self.createRepoObject()
        args = []
        if strategy:
            args += ['-s', strategy]
        args.append('FETCH_HEAD')
        self.fetch(ref)
        self.log.debug("Merging %s with args %s" % (ref, args))
        repo.git.merge(*args)
        return repo.head.commit

    def fetch(self, ref):
        """Fetch *ref* from origin (result available as FETCH_HEAD)."""
        repo = self.createRepoObject()
        # NOTE: The following is currently not applicable, but if we
        # switch back to fetch methods from GitPython, we need to
        # consider it:
        #   The git.remote.fetch method may read in git progress info and
        #   interpret it improperly causing an AssertionError. Because the
        #   data was fetched properly subsequent fetches don't seem to fail.
        #   So try again if an AssertionError is caught.
        self._git_fetch(repo, 'origin', ref)

    def revParse(self, ref):
        """Return the output of ``git rev-parse ref``."""
        repo = self.createRepoObject()
        return repo.git.rev_parse(ref)

    def fetchFrom(self, repository, ref):
        """Fetch *ref* from an arbitrary *repository* (name or URL)."""
        repo = self.createRepoObject()
        self._git_fetch(repo, repository, ref)

    def push(self, local, remote):
        """Push local ref *local* to ref *remote* on origin."""
        repo = self.createRepoObject()
        # The remote URL may embed credentials; never log it verbatim.
        self.log.debug("Pushing %s:%s to %s" % (
            local, remote, self._redacted_url(self.remote_url)))
        repo.remotes.origin.push('%s:%s' % (local, remote))

    def update(self):
        """Fetch branches and tags from origin."""
        repo = self.createRepoObject()
        self.log.debug("Updating repository %s" % self.local_path)
        if repo.git.version_info[:2] < (1, 9):
            # Before 1.9, 'git fetch --tags' did not include the
            # behavior covered by 'git fetch', so we run both
            # commands in that case. Starting with 1.9, 'git fetch
            # --tags' is all that is necessary. See
            # https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
            self._git_fetch(repo, 'origin')
        self._git_fetch(repo, 'origin', tags=True)

    def getFiles(self, files, dirs=None, branch=None, commit=None):
        """Read file contents from the tip of *branch* or from *commit*.

        :param files: top-level paths to read; missing ones map to None.
        :param dirs: directories whose ``*.yaml`` files (recursively) are
            also read. None or empty means no directories.
        :returns: dict mapping path to decoded UTF-8 content (or None).
        """
        ret = {}
        repo = self.createRepoObject()
        if branch:
            tree = repo.heads[branch].commit.tree
        else:
            tree = repo.commit(commit).tree
        for fn in files:
            if fn in tree:
                ret[fn] = tree[fn].data_stream.read().decode('utf-8')
            else:
                ret[fn] = None
        if dirs:
            for dn in dirs:
                if dn not in tree:
                    continue
                for entry in tree[dn].traverse():
                    # traverse() also yields sub-trees and submodules; only
                    # blobs have file content that can be decoded.
                    if entry.type != 'blob':
                        continue
                    if entry.path.endswith(".yaml"):
                        ret[entry.path] = entry.data_stream.read().decode(
                            'utf-8')
        return ret

    def getFilesChanges(self, branch, tosha=None):
        """Return the paths changed by the fetched head of *branch*.

        When *tosha* is given, the changes of ancestors are included too,
        walking back until a merge base with *tosha* is reached.
        """
        repo = self.createRepoObject()
        self.fetch(branch)
        head = repo.commit(self.revParse('FETCH_HEAD'))
        files = set()
        merge_bases = []
        if tosha is not None:
            merge_bases = repo.merge_base(head, tosha)
        files.update(head.stats.files.keys())
        if merge_bases:
            base_shas = {b.hexsha for b in merge_bases}
            for cmt in head.iter_parents():
                if cmt.hexsha in base_shas:
                    break
                files.update(cmt.stats.files.keys())
        return list(files)

    def deleteRemote(self, remote):
        """Remove the git remote named *remote*."""
        repo = self.createRepoObject()
        repo.delete_remote(repo.remotes[remote])

    def setRemoteUrl(self, url):
        """Point origin at *url* if it differs from the current remote."""
        if self.remote_url == url:
            return
        self.log.debug("Set remote url to %s" % redact_url(url))
        self.remote_url = url
        self._git_set_remote_url(self.createRepoObject(), self.remote_url)

    def mapLine(self, commit, filename, lineno):
        """Trace line *lineno* of *filename* back to *commit*.

        Returns the corresponding line number in *commit*, or None if the
        ``git log -L`` output does not reach *commit*.
        """
        repo = self.createRepoObject()
        cur_commit = None
        out = repo.git.log(L='%s,%s:%s' % (lineno, lineno, filename))
        for line in out.split('\n'):
            if cur_commit is None:
                m = self.commit_re.match(line)
                if m:
                    if m.group(1) == commit:
                        cur_commit = commit
                continue
            m = self.diff_re.match(line)
            if m:
                return int(m.group(1))
        return None
class Merger(object):
    """Build speculative git states for a sequence of items.

    One Repo is kept per (canonical hostname, project) under working_root.
    Items are merged (or cherry-picked) on top of the most recent commit
    for their project-branch, and the refs seen along the way are recorded
    as a "repo state" so the same state can be restored elsewhere.
    """

    def __init__(self, working_root, connections, email, username,
                 speed_limit, speed_time, cache_root=None, logger=None,
                 execution_context=False):
        # Kept separately from self.log so it can be forwarded to each Repo
        # (None lets Repo pick its own default logger).
        self.logger = logger
        if logger is None:
            self.log = logging.getLogger("zuul.Merger")
        else:
            self.log = logger
        self.repos = {}
        self.working_root = working_root
        os.makedirs(working_root, exist_ok=True)
        self.connections = connections
        self.email = email
        self.username = username
        self.speed_limit = speed_limit
        self.speed_time = speed_time
        self.cache_root = cache_root
        # Flag to determine if the merger is used for preparing repositories
        # for job execution. This flag can be used to enable executor specific
        # behavior e.g. to keep the 'origin' remote intact.
        self.execution_context = execution_context

    def _addProject(self, hostname, project_name, url, sshkey):
        """Create a Repo for the project and register it in self.repos.

        Returns the new Repo, or None if constructing it raised (the error
        is logged).
        """
        repo = None
        key = '/'.join([hostname, project_name])
        try:
            path = os.path.join(self.working_root, hostname, project_name)
            if self.cache_root:
                cache_path = os.path.join(self.cache_root, hostname,
                                          project_name)
            else:
                cache_path = None
            repo = Repo(
                url, path, self.email, self.username, self.speed_limit,
                self.speed_time, sshkey, cache_path, self.logger)

            self.repos[key] = repo
        except Exception:
            self.log.exception("Unable to add project %s/%s" %
                               (hostname, project_name))
        return repo

    def getRepo(self, connection_name, project_name):
        """Return the Repo for a project, creating it on first use.

        An existing Repo has its remote URL refreshed from the source. The
        result may be None if creating the Repo failed (see _addProject).

        :raises Exception: if a new project has no git URL.
        """
        source = self.connections.getSource(connection_name)
        project = source.getProject(project_name)
        hostname = project.canonical_hostname
        url = source.getGitUrl(project)
        key = '/'.join([hostname, project_name])
        if key in self.repos:
            repo = self.repos[key]
            repo.setRemoteUrl(url)
            return repo
        sshkey = self.connections.connections.get(connection_name).\
            connection_config.get('sshkey')
        if not url:
            raise Exception("Unable to set up repo for project %s/%s"
                            " without a url" %
                            (connection_name, project_name,))
        return self._addProject(hostname, project_name, url, sshkey)

    def updateRepo(self, connection_name, project_name):
        """Reset a project's repo to its remote state; errors are logged."""
        repo = self.getRepo(connection_name, project_name)
        try:
            self.log.info("Updating local repository %s/%s",
                          connection_name, project_name)
            repo.reset()
        except Exception:
            self.log.exception("Unable to update %s/%s",
                               connection_name, project_name)

    def checkoutBranch(self, connection_name, project_name, branch):
        """Check out *branch* in the project's repo."""
        self.log.info("Checking out %s/%s branch %s",
                      connection_name, project_name, branch)
        repo = self.getRepo(connection_name, project_name)
        repo.checkout(branch)

    def _saveRepoState(self, connection_name, project_name, repo,
                       repo_state, recent):
        """Record every ref of *repo* in repo_state[connection][project].

        refs/zuul/* and refs/remotes/* are skipped. Each branch head is
        also stored in *recent* under (connection, project, branch) unless
        an entry for that key already exists.
        """
        projects = repo_state.setdefault(connection_name, {})
        project = projects.setdefault(project_name, {})
        for ref in repo.getRefs():
            if ref.path.startswith('refs/zuul/'):
                continue
            if ref.path.startswith('refs/remotes/'):
                continue
            if ref.path.startswith('refs/heads/'):
                branch = ref.path[len('refs/heads/'):]
                key = (connection_name, project_name, branch)
                if key not in recent:
                    recent[key] = ref.object
            project[ref.path] = ref.object.hexsha

    def _alterRepoState(self, connection_name, project_name,
                        repo_state, path, hexsha):
        """Set ref *path* to *hexsha* in repo_state; NULL_REF removes it."""
        projects = repo_state.setdefault(connection_name, {})
        project = projects.setdefault(project_name, {})
        if hexsha == NULL_REF:
            if path in project:
                del project[path]
        else:
            project[path] = hexsha

    def _restoreRepoState(self, connection_name, project_name, repo,
                          repo_state):
        """Apply the saved refs for a project to *repo*.

        Does nothing when repo_state holds no refs for the project. In an
        execution context the origin remote-tracking refs are kept.
        """
        projects = repo_state.get(connection_name, {})
        project = projects.get(project_name, {})
        if not project:
            # We don't have a state for this project.
            return
        self.log.debug("Restore repo state for project %s/%s",
                       connection_name, project_name)
        repo.setRefs(project, keep_remotes=self.execution_context)

    def _mergeChange(self, item, ref):
        """Check out *ref* and apply the item's change per its merge mode.

        Returns (FETCH_HEAD sha, resulting commit), or (None, None) if the
        checkout or the merge failed.
        """
        repo = self.getRepo(item['connection'], item['project'])
        try:
            repo.checkout(ref)
        except Exception:
            self.log.exception("Unable to checkout %s" % ref)
            return None, None

        try:
            mode = item['merge_mode']
            if mode == zuul.model.MERGER_MERGE:
                commit = repo.merge(item['ref'])
            elif mode == zuul.model.MERGER_MERGE_RESOLVE:
                commit = repo.merge(item['ref'], 'resolve')
            elif mode == zuul.model.MERGER_CHERRY_PICK:
                commit = repo.cherryPick(item['ref'])
            else:
                raise Exception("Unsupported merge mode: %s" % mode)
        except git.GitCommandError:
            # Log git exceptions at debug level because they are
            # usually benign merge conflicts
            self.log.debug("Unable to merge %s" % item, exc_info=True)
            return None, None
        except Exception:
            self.log.exception("Exception while merging a change:")
            return None, None

        orig_commit = repo.revParse('FETCH_HEAD')
        return orig_commit, commit

    def _mergeItem(self, item, recent, repo_state):
        """Merge one item onto the latest commit for its project-branch.

        If no earlier item touched the project-branch, the repo is reset,
        the saved repo state is restored and the branch tip is used as the
        base; the resulting state is then saved. On success the new commit
        becomes the most recent one for the project-branch.

        Returns (FETCH_HEAD sha, merged commit) or (None, None) on failure.
        """
        self.log.debug("Processing ref %s for project %s/%s / %s uuid %s" %
                       (item['ref'], item['connection'],
                        item['project'], item['branch'],
                        item['buildset_uuid']))
        repo = self.getRepo(item['connection'], item['project'])
        key = (item['connection'], item['project'], item['branch'])

        # We need to merge the change
        # Get the most recent commit for this project-branch
        base = recent.get(key)
        if not base:
            # There is none, so use the branch tip
            # we need to reset here in order to call getBranchHead
            self.log.debug("No base commit found for %s" % (key,))
            try:
                repo.reset()
            except Exception:
                self.log.exception("Unable to reset repo %s" % repo)
                return None, None
            self._restoreRepoState(item['connection'], item['project'], repo,
                                   repo_state)

            base = repo.getBranchHead(item['branch'])
            # Save the repo state so that later mergers can repeat
            # this process.
            self._saveRepoState(item['connection'], item['project'], repo,
                                repo_state, recent)
        else:
            self.log.debug("Found base commit %s for %s" % (base, key,))

        if self.execution_context:
            # Set origin branch to the rev of the current (speculative) base.
            # This allows tools to determine the commits that are part of a
            # change by looking at origin/master..master.
            repo.setRemoteRef(item['branch'], base)

        # Merge the change
        orig_commit, commit = self._mergeChange(item, base)
        if not commit:
            return None, None
        # Store this commit as the most recent for this project-branch
        recent[key] = commit
        return orig_commit, commit

    def mergeChanges(self, items, files=None, dirs=None, repo_state=None):
        """Merge *items* in order, each on top of the previous ones.

        Returns None as soon as one item fails to merge. Otherwise returns
        (last commit sha, files read per item, repo_state, recent commit
        sha per (connection, project, branch), FETCH_HEAD sha of the last
        item). Files are only read when *files* or *dirs* is given.
        """
        # NOTE(review): an empty *items* leaves commit as None and the final
        # commit.hexsha raises AttributeError; callers presumably never pass
        # an empty list.
        # connection+project+branch -> commit
        recent = {}
        commit = None
        read_files = []
        # connection -> project -> ref -> commit
        if repo_state is None:
            repo_state = {}
        for item in items:
            self.log.debug("Merging for change %s,%s" %
                           (item["number"], item["patchset"]))
            orig_commit, commit = self._mergeItem(item, recent, repo_state)
            if not commit:
                return None
            if files or dirs:
                repo = self.getRepo(item['connection'], item['project'])
                repo_files = repo.getFiles(files, dirs, commit=commit)
                read_files.append(dict(
                    connection=item['connection'],
                    project=item['project'],
                    branch=item['branch'],
                    files=repo_files))
        ret_recent = {}
        for k, v in recent.items():
            ret_recent[k] = v.hexsha
        return commit.hexsha, read_files, repo_state, ret_recent, orig_commit

    def setRepoState(self, items, repo_state):
        """Reset each item's repo and restore the refs from *repo_state*.

        Each (connection, project, branch) is processed only once.
        """
        seen = set()
        for item in items:
            repo = self.getRepo(item['connection'], item['project'])
            key = (item['connection'], item['project'], item['branch'])

            if key in seen:
                continue
            # Record the key; without this the check above never skips
            # anything and every item resets the repo again.
            seen.add(key)

            repo.reset()
            self._restoreRepoState(item['connection'], item['project'], repo,
                                   repo_state)

    def getRepoState(self, items):
        """Return (success, repo_state) for *items*.

        Generally this will be called in any non-change pipeline. Each
        (connection, project, branch) is reset and recorded once. Items
        carrying a 'newrev' (ref updates rather than branch tips) are then
        applied on top, so the returned state includes them regardless of
        the actual repository state. Returns (False, {}) if a reset fails.
        """
        seen = set()
        recent = {}
        repo_state = {}
        for item in items:
            repo = self.getRepo(item['connection'], item['project'])
            key = (item['connection'], item['project'], item['branch'])
            if key not in seen:
                try:
                    repo.reset()
                except Exception:
                    self.log.exception("Unable to reset repo %s" % repo)
                    return (False, {})

                self._saveRepoState(item['connection'], item['project'], repo,
                                    repo_state, recent)
                # Without recording the key, a later item would re-save the
                # state and overwrite (or resurrect) refs altered below.
                seen.add(key)

            if item.get('newrev'):
                # This is a ref update rather than a branch tip, so make sure
                # our returned state includes this change.
                self._alterRepoState(item['connection'], item['project'],
                                     repo_state, item['ref'], item['newrev'])
        return (True, repo_state)

    def getFiles(self, connection_name, project_name, branch, files,
                 dirs=None):
        """Read *files* (and *.yaml files under *dirs*) from *branch*."""
        repo = self.getRepo(connection_name, project_name)
        return repo.getFiles(files, dirs, branch=branch)

    def getFilesChanges(self, connection_name, project_name, branch,
                        tosha=None):
        """Return the files changed on *branch* (see Repo.getFilesChanges)."""
        repo = self.getRepo(connection_name, project_name)
        return repo.getFilesChanges(branch, tosha)