Merge "Annotate merger logs with event id"

Zuul 2019-05-20 21:45:53 +00:00 committed by Gerrit Code Review
commit b45b375554
10 changed files with 275 additions and 178 deletions

View File

@ -1588,10 +1588,11 @@ class RecordingMergeClient(zuul.merger.client.MergeClient):
self.history = {}
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
self.history.setdefault(name, [])
self.history[name].append((data, build_set))
return super().submitJob(name, data, build_set, precedence)
return super().submitJob(
name, data, build_set, precedence, event=event)
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):

View File

@ -68,12 +68,12 @@ class TestMergerRepo(ZuulTestCase):
self.assertEqual(
os.path.join(self.upstream_root, 'org/project1'),
work_repo.createRepoObject().remotes[0].url,
work_repo.createRepoObject(None).remotes[0].url,
message="Parent clone still point to upstream project1")
self.assertEqual(
os.path.join(self.upstream_root, 'org/project2'),
sub_repo.createRepoObject().remotes[0].url,
sub_repo.createRepoObject(None).remotes[0].url,
message="Sub repository points to upstream project2")
def test_set_refs(self):
@ -121,7 +121,7 @@ class TestMergerRepo(ZuulTestCase):
# test, we try cloning again.
with testtools.ExpectedException(git.exc.GitCommandError,
r'.*exit code\(-9\)'):
work_repo._ensure_cloned()
work_repo._ensure_cloned(None)
def test_fetch_timeout(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
@ -197,7 +197,7 @@ class TestMergerRepo(ZuulTestCase):
fn = os.path.join(cache_repo.local_path, 'commit_filename')
with open(fn, 'a') as f:
f.write("test")
repo = cache_repo.createRepoObject()
repo = cache_repo.createRepoObject(None)
repo.index.add([fn])
repo.index.commit('test commit')

View File

@ -2041,7 +2041,7 @@ class TestScheduler(ZuulTestCase):
trusted, project = tenant.getProject('org/project')
url = self.fake_gerrit.getGitUrl(project)
self.executor_server.merger._addProject('review.example.com',
'org/project', url, None)
'org/project', url, None, None)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addPatchset(large=True)
# TODOv3(jeblair): add hostname to upstream root

View File

@ -2732,6 +2732,7 @@ class ExecutorServer(object):
def fileschanges(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
task = self.update(args['connection'], args['project'])
task.wait()
lock = self.repo_locks.getRepoLock(
@ -2740,29 +2741,35 @@ class ExecutorServer(object):
files = self.merger.getFilesChanges(
args['connection'], args['project'],
args['branch'],
args['tosha'])
args['tosha'], zuul_event_id=zuul_event_id)
result = dict(updated=True,
files=files)
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))
def refstate(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
success, repo_state = self.merger.getRepoState(
args['items'], repo_locks=self.repo_locks)
result = dict(updated=success,
repo_state=repo_state)
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))
def merge(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
ret = self.merger.mergeChanges(args['items'], args.get('files'),
args.get('dirs', []),
args.get('repo_state'),
repo_locks=self.repo_locks)
repo_locks=self.repo_locks,
zuul_event_id=zuul_event_id)
result = dict(merged=(ret is not None))
if ret is None:
result['commit'] = result['files'] = result['repo_state'] = None
else:
(result['commit'], result['files'], result['repo_state'],
recent, orig_commit) = ret
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))
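
The three handlers above (fileschanges, refstate, merge) follow the same pattern: read zuul_event_id out of the gearman job arguments, thread it into the merger call so merger-side logs carry the id, and echo it back in the result so the submitter can annotate its own logs on completion. A minimal runnable sketch of that round trip; FakeJob, FakeMerger and the literal values are stand-ins for the real gear job and merger, not part of this change:

import json

class FakeMerger:
    # Stand-in for self.merger; only the keyword contract matters here.
    def getFilesChanges(self, connection, project, branch, tosha,
                        zuul_event_id=None):
        return ['zuul.yaml']

class FakeJob:
    # Stand-in for a gear job carrying JSON-encoded arguments.
    def __init__(self, arguments):
        self.arguments = arguments
        self.result = None

    def sendWorkComplete(self, data):
        self.result = data

def fileschanges(merger, job):
    args = json.loads(job.arguments)
    # The event id travels inside the job payload, not the gear protocol.
    zuul_event_id = args.get('zuul_event_id')
    files = merger.getFilesChanges(
        args['connection'], args['project'], args['branch'],
        args['tosha'], zuul_event_id=zuul_event_id)
    result = dict(updated=True, files=files)
    # Echo the id so the submitting side can annotate completion logs.
    result['zuul_event_id'] = zuul_event_id
    job.sendWorkComplete(json.dumps(result))

job = FakeJob(json.dumps(dict(connection='gerrit', project='org/project',
                              branch='master', tosha=None,
                              zuul_event_id='deadbeef')))
fileschanges(FakeMerger(), job)
assert json.loads(job.result)['zuul_event_id'] == 'deadbeef'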

View File

@ -18,7 +18,12 @@ from zuul.model import TriggerEvent
def get_annotated_logger(logger, event, build=None):
extra = {}
# Note(tobiash): When running with python 3.5 log adapters cannot be
# stacked. We need to detect this case and modify the original one.
if isinstance(logger, EventIdLogAdapter):
extra = logger.extra
else:
extra = {}
if event is not None:
if isinstance(event, TriggerEvent):
@ -29,6 +34,9 @@ def get_annotated_logger(logger, event, build=None):
if build is not None:
extra['build'] = build
if isinstance(logger, EventIdLogAdapter):
return logger
return EventIdLogAdapter(logger, extra)
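
For context, a hedged sketch of what get_annotated_logger and EventIdLogAdapter plausibly look like; only the fragments above are part of this diff, so the adapter body, the "[e: ...]" prefix format, and the getattr-based id lookup are assumptions:

import logging

class EventIdLogAdapter(logging.LoggerAdapter):
    # Assumed implementation: carry the event (and optional build) id in
    # `extra` and prefix each message with it.
    def process(self, msg, kwargs):
        event = self.extra.get('event')
        if event is not None:
            msg = '[e: %s] %s' % (event, msg)
        return msg, kwargs

def get_annotated_logger(logger, event, build=None):
    # When adapters cannot be stacked (python 3.5 note above), reuse the
    # existing adapter's extra dict instead of wrapping again.
    if isinstance(logger, EventIdLogAdapter):
        extra = logger.extra
    else:
        extra = {}
    if event is not None:
        # `event` may be a TriggerEvent-like object or a bare id string,
        # as MergeClient.onBuildCompleted passes below.
        extra['event'] = getattr(event, 'zuul_event_id', event)
    if build is not None:
        extra['build'] = build
    if isinstance(logger, EventIdLogAdapter):
        return logger
    return EventIdLogAdapter(logger, extra)

# Usage: every log line emitted through `log` now carries the event id.
log = get_annotated_logger(logging.getLogger('zuul.Merger'), 'deadbeef')
log.info('Updating local repository %s/%s', 'review', 'org/project')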

View File

@ -623,11 +623,13 @@ class PipelineManager(object):
if isinstance(item.change, model.Change):
self.sched.merger.mergeChanges(build_set.merger_items,
item.current_build_set, files, dirs,
precedence=self.pipeline.precedence)
precedence=self.pipeline.precedence,
event=item.event)
else:
self.sched.merger.getRepoState(build_set.merger_items,
item.current_build_set,
precedence=self.pipeline.precedence)
precedence=self.pipeline.precedence,
event=item.event)
return False
def scheduleFilesChanges(self, item):
@ -638,7 +640,8 @@ class PipelineManager(object):
self.sched.merger.getFilesChanges(
item.change.project.connection_name, item.change.project.name,
item.change.ref, item.change.branch, build_set=build_set)
item.change.ref, item.change.branch, build_set=build_set,
event=item.event)
return False
def prepareItem(self, item):

View File

@ -21,6 +21,7 @@ import gear
import zuul.model
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
def getJobData(job):
@ -100,63 +101,92 @@ class MergeClient(object):
return False
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
log = get_annotated_logger(self.log, event)
uuid = str(uuid4().hex)
job = MergeJob(name,
json.dumps(data),
unique=uuid)
job.build_set = build_set
self.log.debug("Submitting job %s with data %s" % (job, data))
log.debug("Submitting job %s with data %s", job, data)
self.jobs.add(job)
self.gearman.submitJob(job, precedence=precedence,
timeout=300)
return job
def mergeChanges(self, items, build_set, files=None, dirs=None,
repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL):
repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL,
event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(items=items,
files=files,
dirs=dirs,
repo_state=repo_state)
self.submitJob('merger:merge', data, build_set, precedence)
repo_state=repo_state,
zuul_event_id=zuul_event_id)
self.submitJob('merger:merge', data, build_set, precedence,
event=event)
def getRepoState(self, items, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(items=items)
self.submitJob('merger:refstate', data, build_set, precedence)
precedence=zuul.model.PRECEDENCE_NORMAL,
event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(items=items, zuul_event_id=zuul_event_id)
self.submitJob('merger:refstate', data, build_set, precedence,
event=event)
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
precedence=zuul.model.PRECEDENCE_HIGH):
precedence=zuul.model.PRECEDENCE_HIGH, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(connection=connection_name,
project=project_name,
branch=branch,
files=files,
dirs=dirs)
job = self.submitJob('merger:cat', data, None, precedence)
dirs=dirs,
zuul_event_id=zuul_event_id)
job = self.submitJob('merger:cat', data, None, precedence, event=event)
return job
def getFilesChanges(self, connection_name, project_name, branch,
tosha=None, precedence=zuul.model.PRECEDENCE_HIGH,
build_set=None):
build_set=None, event=None):
if event is not None:
zuul_event_id = event.zuul_event_id
else:
zuul_event_id = None
data = dict(connection=connection_name,
project=project_name,
branch=branch,
tosha=tosha)
tosha=tosha,
zuul_event_id=zuul_event_id)
job = self.submitJob('merger:fileschanges', data, build_set,
precedence)
precedence, event=event)
return job
def onBuildCompleted(self, job):
data = getJobData(job)
zuul_event_id = data.get('zuul_event_id')
log = get_annotated_logger(self.log, zuul_event_id)
merged = data.get('merged', False)
job.updated = data.get('updated', False)
commit = data.get('commit')
files = data.get('files', {})
repo_state = data.get('repo_state', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
(job, merged, job.updated, commit))
log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s", job, merged, job.updated, commit)
job.setComplete()
if job.build_set:
if job.name == 'merger:fileschanges':
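
Putting the client side together: the originating event is reduced to its zuul_event_id, embedded in the job payload, and used to annotate the submission log; when the result comes back, the echoed id re-annotates the completion log even though the original event object is no longer at hand. A sketch under those assumptions; gearman and the function names stand in for the real gear client, while the payload shape and the logutil import mirror the diff:

import json
import logging
from uuid import uuid4

from zuul.lib.logutil import get_annotated_logger

def submit_merge(gearman, items, event=None):
    log = get_annotated_logger(logging.getLogger('zuul.MergeClient'), event)
    zuul_event_id = event.zuul_event_id if event is not None else None
    data = dict(items=items, zuul_event_id=zuul_event_id)
    log.debug("Submitting job merger:merge with data %s", data)
    # Hypothetical submission call; the real client wraps this in MergeJob.
    return gearman.submitJob('merger:merge', json.dumps(data),
                             unique=uuid4().hex)

def on_build_completed(job_data):
    # The merger echoed zuul_event_id in its result, so completion logs
    # can be tagged with the same id without the original event object.
    log = get_annotated_logger(logging.getLogger('zuul.MergeClient'),
                               job_data.get('zuul_event_id'))
    log.info("Merge complete, merged: %s", job_data.get('merged', False))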

View File

@ -27,6 +27,8 @@ import paramiko
import zuul.model
from zuul.lib.logutil import get_annotated_logger
NULL_REF = '0000000000000000000000000000000000000000'
@ -79,11 +81,12 @@ class Repo(object):
def __init__(self, remote, local, email, username, speed_limit, speed_time,
sshkey=None, cache_path=None, logger=None, git_timeout=300,
retry_attempts=3, retry_interval=30):
retry_attempts=3, retry_interval=30, zuul_event_id=None):
if logger is None:
self.log = logging.getLogger("zuul.Repo")
else:
self.log = logger
log = get_annotated_logger(self.log, zuul_event_id)
self.env = {
'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit,
'GIT_HTTP_LOW_SPEED_TIME': speed_time,
@ -104,13 +107,13 @@ class Repo(object):
try:
self._setup_known_hosts()
except Exception:
self.log.exception("Unable to set up known_hosts for %s" % remote)
log.exception("Unable to set up known_hosts for %s", remote)
try:
self._ensure_cloned()
self._ensure_cloned(zuul_event_id)
self._git_set_remote_url(
git.Repo(self.local_path), self.remote_url)
except Exception:
self.log.exception("Unable to initialize repo for %s" % remote)
log.exception("Unable to initialize repo for %s", remote)
def __repr__(self):
return "<Repo {} {}>".format(hex(id(self)), self.local_path)
@ -145,7 +148,8 @@ class Repo(object):
# connection and DoS Gerrit.
client.close()
def _ensure_cloned(self):
def _ensure_cloned(self, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git'))
if self._initialized and repo_is_cloned:
try:
@ -167,9 +171,9 @@ class Repo(object):
else:
clone_url = self.remote_url
self.log.debug("Cloning from %s to %s" % (
redact_url(clone_url), self.local_path))
self._git_clone(clone_url)
log.debug("Cloning from %s to %s",
redact_url(clone_url), self.local_path)
self._git_clone(clone_url, zuul_event_id)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
@ -193,7 +197,8 @@ class Repo(object):
def isInitialized(self):
return self._initialized
def _git_clone(self, url):
def _git_clone(self, url, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
mygit = git.cmd.Git(os.getcwd())
mygit.update_environment(**self.env)
@ -206,12 +211,12 @@ class Repo(object):
except Exception:
if attempt < self.retry_attempts:
time.sleep(self.retry_interval)
self.log.warning("Retry %s: Clone %s" % (
attempt, self.local_path))
log.warning("Retry %s: Clone %s", attempt, self.local_path)
else:
raise
def _git_fetch(self, repo, remote, ref=None, **kwargs):
def _git_fetch(self, repo, remote, zuul_event_id, ref=None, **kwargs):
log = get_annotated_logger(self.log, zuul_event_id)
for attempt in range(1, self.retry_attempts + 1):
try:
with timeout_handler(self.local_path):
@ -245,9 +250,9 @@ class Repo(object):
shutil.rmtree(self.local_path)
else:
time.sleep(self.retry_interval)
self.log.exception("Retry %s: Fetch %s %s %s" % (
log.exception("Retry %s: Fetch %s %s %s" % (
attempt, self.local_path, remote, ref))
self._ensure_cloned()
self._ensure_cloned(zuul_event_id)
else:
raise
@ -255,16 +260,17 @@ class Repo(object):
with repo.remotes.origin.config_writer as config_writer:
config_writer.set('url', url)
def createRepoObject(self):
self._ensure_cloned()
def createRepoObject(self, zuul_event_id):
self._ensure_cloned(zuul_event_id)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
return repo
def reset(self):
self.log.debug("Resetting repository %s" % self.local_path)
self.update()
repo = self.createRepoObject()
def reset(self, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Resetting repository %s", self.local_path)
self.update(zuul_event_id=zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
origin = repo.remotes.origin
seen = set()
head = None
@ -282,10 +288,10 @@ class Repo(object):
seen.add(ref.remote_head)
if head is None:
head = ref.remote_head
self.log.debug("Reset to %s", head)
log.debug("Reset to %s", head)
repo.head.reference = head
for ref in stale_refs:
self.log.debug("Delete stale ref %s", ref.remote_head)
log.debug("Delete stale ref %s", ref.remote_head)
# A stale ref means the upstream branch (e.g. foobar) was deleted
# so we need to delete both our local head (if existing) and the
# remote tracking head. Both repo.heads and ref.remote_head
@ -296,89 +302,94 @@ class Repo(object):
break
git.refs.RemoteReference.delete(repo, ref, force=True)
def prune(self):
repo = self.createRepoObject()
def prune(self, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
origin = repo.remotes.origin
stale_refs = origin.stale_refs
if stale_refs:
self.log.debug("Pruning stale refs: %s", stale_refs)
log.debug("Pruning stale refs: %s", stale_refs)
git.refs.RemoteReference.delete(repo, force=True, *stale_refs)
def getBranchHead(self, branch):
repo = self.createRepoObject()
def getBranchHead(self, branch, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
branch_head = repo.heads[branch]
return branch_head.commit
def hasBranch(self, branch):
repo = self.createRepoObject()
def hasBranch(self, branch, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
origin = repo.remotes.origin
return branch in origin.refs
def getBranches(self):
def getBranches(self, zuul_event_id=None):
# TODO(jeblair): deprecate with override-branch; replaced by
# getRefs().
repo = self.createRepoObject()
repo = self.createRepoObject(zuul_event_id)
return [x.name for x in repo.heads]
def getCommitFromRef(self, refname):
repo = self.createRepoObject()
def getCommitFromRef(self, refname, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
if refname not in repo.refs:
return None
ref = repo.refs[refname]
return ref.commit
def getRefs(self):
repo = self.createRepoObject()
def getRefs(self, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
return repo.refs
def setRef(self, path, hexsha, repo=None):
self.log.debug("Create reference %s at %s in %s",
path, hexsha, self.local_path)
def setRef(self, path, hexsha, repo=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Create reference %s at %s in %s",
path, hexsha, self.local_path)
if repo is None:
repo = self.createRepoObject()
repo = self.createRepoObject(zuul_event_id)
binsha = gitdb.util.to_bin_sha(hexsha)
obj = git.objects.Object.new_from_sha(repo, binsha)
git.refs.Reference.create(repo, path, obj, force=True)
def setRefs(self, refs, keep_remotes=False):
repo = self.createRepoObject()
def setRefs(self, refs, keep_remotes=False, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
current_refs = {}
for ref in repo.refs:
current_refs[ref.path] = ref
unseen = set(current_refs.keys())
for path, hexsha in refs.items():
self.setRef(path, hexsha, repo)
self.setRef(path, hexsha, repo, zuul_event_id=zuul_event_id)
unseen.discard(path)
ref = current_refs.get(path)
if keep_remotes and ref:
unseen.discard('refs/remotes/origin/{}'.format(ref.name))
for path in unseen:
self.deleteRef(path, repo)
self.deleteRef(path, repo, zuul_event_id=zuul_event_id)
def setRemoteRef(self, branch, rev):
repo = self.createRepoObject()
def setRemoteRef(self, branch, rev, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
try:
origin_ref = repo.remotes.origin.refs[branch]
except IndexError:
self.log.warning("No remote ref found for branch %s", branch)
log.warning("No remote ref found for branch %s", branch)
return
self.log.debug("Updating remote reference %s to %s", origin_ref, rev)
log.debug("Updating remote reference %s to %s", origin_ref, rev)
origin_ref.commit = rev
def deleteRef(self, path, repo=None):
def deleteRef(self, path, repo=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
if repo is None:
repo = self.createRepoObject()
self.log.debug("Delete reference %s", path)
repo = self.createRepoObject(zuul_event_id)
log.debug("Delete reference %s", path)
git.refs.SymbolicReference.delete(repo, path)
def checkout(self, ref):
repo = self.createRepoObject()
def checkout(self, ref, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
# NOTE(pabelanger): We need to check for detached repo head, otherwise
# gitpython will raise an exception if we access the reference.
if not repo.head.is_detached and repo.head.reference == ref:
self.log.debug("Repo is already at %s" % ref)
log.debug("Repo is already at %s" % ref)
else:
self.log.debug("Checking out %s" % ref)
log.debug("Checking out %s" % ref)
# Perform a hard reset to the correct ref before checking out so
# that we clean up anything that might be left over from a merge
# while still only preparing the working copy once.
@ -389,26 +400,28 @@ class Repo(object):
return repo.head.commit
def cherryPick(self, ref):
repo = self.createRepoObject()
self.log.debug("Cherry-picking %s" % ref)
self.fetch(ref)
def cherryPick(self, ref, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
log.debug("Cherry-picking %s", ref)
self.fetch(ref, zuul_event_id=zuul_event_id)
repo.git.cherry_pick("FETCH_HEAD")
return repo.head.commit
def merge(self, ref, strategy=None):
repo = self.createRepoObject()
def merge(self, ref, strategy=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
args = []
if strategy:
args += ['-s', strategy]
args.append('FETCH_HEAD')
self.fetch(ref)
self.log.debug("Merging %s with args %s" % (ref, args))
self.fetch(ref, zuul_event_id=zuul_event_id)
log.debug("Merging %s with args %s", ref, args)
repo.git.merge(*args)
return repo.head.commit
def fetch(self, ref):
repo = self.createRepoObject()
def fetch(self, ref, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
# NOTE: The following is currently not applicable, but if we
# switch back to fetch methods from GitPython, we need to
# consider it:
@ -416,37 +429,39 @@ class Repo(object):
# interpret it improperly causing an AssertionError. Because the
# data was fetched properly subsequent fetches don't seem to fail.
# So try again if an AssertionError is caught.
self._git_fetch(repo, 'origin', ref)
self._git_fetch(repo, 'origin', zuul_event_id, ref=ref)
def revParse(self, ref):
repo = self.createRepoObject()
def revParse(self, ref, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
return repo.git.rev_parse(ref)
def fetchFrom(self, repository, ref):
repo = self.createRepoObject()
self._git_fetch(repo, repository, ref)
def fetchFrom(self, repository, ref, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
self._git_fetch(repo, repository, zuul_event_id, ref=ref)
def push(self, local, remote):
repo = self.createRepoObject()
self.log.debug("Pushing %s:%s to %s" % (local, remote,
self.remote_url))
def push(self, local, remote, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
log.debug("Pushing %s:%s to %s", local, remote, self.remote_url)
repo.remotes.origin.push('%s:%s' % (local, remote))
def update(self):
repo = self.createRepoObject()
self.log.debug("Updating repository %s" % self.local_path)
def update(self, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
log.debug("Updating repository %s" % self.local_path)
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
# behavior covered by 'git --fetch', so we run both
# commands in that case. Starting with 1.9, 'git fetch
# --tags' is all that is necessary. See
# https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20
self._git_fetch(repo, 'origin')
self._git_fetch(repo, 'origin', tags=True)
self._git_fetch(repo, 'origin', zuul_event_id)
self._git_fetch(repo, 'origin', zuul_event_id, tags=True)
def getFiles(self, files, dirs=[], branch=None, commit=None):
def getFiles(self, files, dirs=[], branch=None, commit=None,
zuul_event_id=None):
ret = {}
repo = self.createRepoObject()
repo = self.createRepoObject(zuul_event_id)
if branch:
tree = repo.heads[branch].commit.tree
else:
@ -466,10 +481,11 @@ class Repo(object):
'utf-8')
return ret
def getFilesChanges(self, branch, tosha=None):
repo = self.createRepoObject()
self.fetch(branch)
head = repo.commit(self.revParse('FETCH_HEAD'))
def getFilesChanges(self, branch, tosha=None, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
self.fetch(branch, zuul_event_id=zuul_event_id)
head = repo.commit(
self.revParse('FETCH_HEAD', zuul_event_id=zuul_event_id))
files = set()
if tosha:
@ -480,19 +496,22 @@ class Repo(object):
files.update(head.stats.files.keys())
return list(files)
def deleteRemote(self, remote):
repo = self.createRepoObject()
def deleteRemote(self, remote, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
repo.delete_remote(repo.remotes[remote])
def setRemoteUrl(self, url):
def setRemoteUrl(self, url, zuul_event_id=None):
if self.remote_url == url:
return
self.log.debug("Set remote url to %s" % redact_url(url))
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Set remote url to %s", redact_url(url))
self.remote_url = url
self._git_set_remote_url(self.createRepoObject(), self.remote_url)
self._git_set_remote_url(
self.createRepoObject(zuul_event_id),
self.remote_url)
def mapLine(self, commit, filename, lineno):
repo = self.createRepoObject()
def mapLine(self, commit, filename, lineno, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
# Trace the specified line back to the specified commit and
# return the line number in that commit.
cur_commit = None
@ -534,7 +553,7 @@ class Merger(object):
# behavior e.g. to keep the 'origin' remote intact.
self.execution_context = execution_context
def _addProject(self, hostname, project_name, url, sshkey):
def _addProject(self, hostname, project_name, url, sshkey, zuul_event_id):
repo = None
key = '/'.join([hostname, project_name])
try:
@ -547,15 +566,17 @@ class Merger(object):
repo = Repo(
url, path, self.email, self.username, self.speed_limit,
self.speed_time, sshkey=sshkey, cache_path=cache_path,
logger=self.logger, git_timeout=self.git_timeout)
logger=self.logger, git_timeout=self.git_timeout,
zuul_event_id=zuul_event_id)
self.repos[key] = repo
except Exception:
self.log.exception("Unable to add project %s/%s" %
(hostname, project_name))
log = get_annotated_logger(self.log, zuul_event_id)
log.exception("Unable to add project %s/%s",
hostname, project_name)
return repo
def getRepo(self, connection_name, project_name):
def getRepo(self, connection_name, project_name, zuul_event_id=None):
source = self.connections.getSource(connection_name)
project = source.getProject(project_name)
hostname = project.canonical_hostname
@ -571,23 +592,29 @@ class Merger(object):
raise Exception("Unable to set up repo for project %s/%s"
" without a url" %
(connection_name, project_name,))
return self._addProject(hostname, project_name, url, sshkey)
return self._addProject(hostname, project_name, url, sshkey,
zuul_event_id)
def updateRepo(self, connection_name, project_name):
repo = self.getRepo(connection_name, project_name)
def updateRepo(self, connection_name, project_name, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
try:
self.log.info("Updating local repository %s/%s",
connection_name, project_name)
log.info("Updating local repository %s/%s",
connection_name, project_name)
repo.reset()
except Exception:
self.log.exception("Unable to update %s/%s",
connection_name, project_name)
log.exception("Unable to update %s/%s",
connection_name, project_name)
def checkoutBranch(self, connection_name, project_name, branch):
self.log.info("Checking out %s/%s branch %s",
connection_name, project_name, branch)
repo = self.getRepo(connection_name, project_name)
repo.checkout(branch)
def checkoutBranch(self, connection_name, project_name, branch,
zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
log.info("Checking out %s/%s branch %s",
connection_name, project_name, branch)
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
repo.checkout(branch, zuul_event_id=zuul_event_id)
def _saveRepoState(self, connection_name, project_name, repo,
repo_state, recent):
@ -616,51 +643,58 @@ class Merger(object):
project[path] = hexsha
def _restoreRepoState(self, connection_name, project_name, repo,
repo_state):
repo_state, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
projects = repo_state.get(connection_name, {})
project = projects.get(project_name, {})
if not project:
# We don't have a state for this project.
return
self.log.debug("Restore repo state for project %s/%s",
connection_name, project_name)
repo.setRefs(project, keep_remotes=self.execution_context)
log.debug("Restore repo state for project %s/%s",
connection_name, project_name)
repo.setRefs(project, keep_remotes=self.execution_context,
zuul_event_id=zuul_event_id)
def _mergeChange(self, item, ref):
repo = self.getRepo(item['connection'], item['project'])
def _mergeChange(self, item, ref, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.getRepo(item['connection'], item['project'],
zuul_event_id=zuul_event_id)
try:
repo.checkout(ref)
repo.checkout(ref, zuul_event_id=zuul_event_id)
except Exception:
self.log.exception("Unable to checkout %s" % ref)
log.exception("Unable to checkout %s", ref)
return None, None
try:
mode = item['merge_mode']
if mode == zuul.model.MERGER_MERGE:
commit = repo.merge(item['ref'])
commit = repo.merge(item['ref'], zuul_event_id=zuul_event_id)
elif mode == zuul.model.MERGER_MERGE_RESOLVE:
commit = repo.merge(item['ref'], 'resolve')
commit = repo.merge(item['ref'], 'resolve',
zuul_event_id=zuul_event_id)
elif mode == zuul.model.MERGER_CHERRY_PICK:
commit = repo.cherryPick(item['ref'])
commit = repo.cherryPick(item['ref'],
zuul_event_id=zuul_event_id)
else:
raise Exception("Unsupported merge mode: %s" % mode)
except git.GitCommandError:
# Log git exceptions at debug level because they are
# usually benign merge conflicts
self.log.debug("Unable to merge %s" % item, exc_info=True)
log.debug("Unable to merge %s", item, exc_info=True)
return None, None
except Exception:
self.log.exception("Exception while merging a change:")
log.exception("Exception while merging a change:")
return None, None
orig_commit = repo.revParse('FETCH_HEAD')
return orig_commit, commit
def _mergeItem(self, item, recent, repo_state):
self.log.debug("Processing ref %s for project %s/%s / %s uuid %s" %
(item['ref'], item['connection'],
item['project'], item['branch'],
item['buildset_uuid']))
def _mergeItem(self, item, recent, repo_state, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Processing ref %s for project %s/%s / %s uuid %s" %
(item['ref'], item['connection'],
item['project'], item['branch'],
item['buildset_uuid']))
repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch'])
@ -670,14 +704,14 @@ class Merger(object):
if not base:
# There is none, so use the branch tip
# we need to reset here in order to call getBranchHead
self.log.debug("No base commit found for %s" % (key,))
log.debug("No base commit found for %s" % (key,))
try:
repo.reset()
repo.reset(zuul_event_id=zuul_event_id)
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
log.exception("Unable to reset repo %s" % repo)
return None, None
self._restoreRepoState(item['connection'], item['project'], repo,
repo_state)
repo_state, zuul_event_id)
base = repo.getBranchHead(item['branch'])
# Save the repo state so that later mergers can repeat
@ -685,16 +719,17 @@ class Merger(object):
self._saveRepoState(item['connection'], item['project'], repo,
repo_state, recent)
else:
self.log.debug("Found base commit %s for %s" % (base, key,))
log.debug("Found base commit %s for %s" % (base, key,))
if self.execution_context:
# Set origin branch to the rev of the current (speculative) base.
# This allows tools to determine the commits that are part of a
# change by looking at origin/master..master.
repo.setRemoteRef(item['branch'], base)
repo.setRemoteRef(item['branch'], base,
zuul_event_id=zuul_event_id)
# Merge the change
orig_commit, commit = self._mergeChange(item, base)
orig_commit, commit = self._mergeChange(item, base, zuul_event_id)
if not commit:
return None, None
# Store this commit as the most recent for this project-branch
@ -702,7 +737,8 @@ class Merger(object):
return orig_commit, commit
def mergeChanges(self, items, files=None, dirs=None, repo_state=None,
repo_locks=None):
repo_locks=None, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
# connection+project+branch -> commit
recent = {}
commit = None
@ -719,9 +755,10 @@ class Merger(object):
else:
lock = nullcontext()
with lock:
self.log.debug("Merging for change %s,%s" %
(item["number"], item["patchset"]))
orig_commit, commit = self._mergeItem(item, recent, repo_state)
log.debug("Merging for change %s,%s" %
(item["number"], item["patchset"]))
orig_commit, commit = self._mergeItem(
item, recent, repo_state, zuul_event_id)
if not commit:
return None
if files or dirs:
@ -737,11 +774,12 @@ class Merger(object):
ret_recent[k] = v.hexsha
return commit.hexsha, read_files, repo_state, ret_recent, orig_commit
def setRepoState(self, items, repo_state):
def setRepoState(self, items, repo_state, zuul_event_id=None):
# Sets the repo state for the items
seen = set()
for item in items:
repo = self.getRepo(item['connection'], item['project'])
repo = self.getRepo(item['connection'], item['project'],
zuul_event_id=zuul_event_id)
key = (item['connection'], item['project'], item['branch'])
if key in seen:
@ -749,7 +787,7 @@ class Merger(object):
repo.reset()
self._restoreRepoState(item['connection'], item['project'], repo,
repo_state)
repo_state, zuul_event_id)
def getRepoState(self, items, repo_locks=None):
# Gets the repo state for items. Generally this will be
@ -794,6 +832,7 @@ class Merger(object):
return repo.getFiles(files, dirs, branch=branch)
def getFilesChanges(self, connection_name, project_name, branch,
tosha=None):
repo = self.getRepo(connection_name, project_name)
return repo.getFilesChanges(branch, tosha)
tosha=None, zuul_event_id=None):
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
return repo.getFilesChanges(branch, tosha, zuul_event_id=zuul_event_id)
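
The bulk of the merger changes repeat one mechanical pattern: each Repo and Merger method grows an optional zuul_event_id keyword, builds a per-call annotated logger instead of using self.log directly, and forwards the id to every helper it calls so retries, resets and fetches all log under the same event. Reduced to its skeleton (RepoSketch and the stubbed bodies are illustrative, not the real Repo):

import logging

from zuul.lib.logutil import get_annotated_logger

class RepoSketch:
    log = logging.getLogger("zuul.Repo")

    def checkout(self, ref, zuul_event_id=None):
        # Per-call annotated logger; self.log itself stays unannotated.
        log = get_annotated_logger(self.log, zuul_event_id)
        log.debug("Checking out %s", ref)
        self.fetch(ref, zuul_event_id=zuul_event_id)

    def fetch(self, ref, zuul_event_id=None):
        # The id is forwarded rather than stored, so one Repo instance can
        # serve many events concurrently without mixing annotations.
        self._git_fetch('origin', zuul_event_id, ref=ref)

    def _git_fetch(self, remote, zuul_event_id, ref=None):
        log = get_annotated_logger(self.log, zuul_event_id)
        log.debug("Fetching %s from %s (stubbed)", ref, remote)

RepoSketch().checkout('master', zuul_event_id='deadbeef')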

View File

@ -137,23 +137,27 @@ class MergeServer(object):
def merge(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
ret = self.merger.mergeChanges(
args['items'], args.get('files'),
args.get('dirs'), args.get('repo_state'))
args.get('dirs'), args.get('repo_state'),
zuul_event_id=zuul_event_id)
result = dict(merged=(ret is not None))
if ret is None:
result['commit'] = result['files'] = result['repo_state'] = None
else:
(result['commit'], result['files'], result['repo_state'],
recent, orig_commit) = ret
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))
def refstate(self, job):
args = json.loads(job.arguments)
zuul_event_id = args.get('zuul_event_id')
success, repo_state = self.merger.getRepoState(args['items'])
result = dict(updated=success,
repo_state=repo_state)
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))
def cat(self, job):
@ -168,9 +172,13 @@ class MergeServer(object):
def fileschanges(self, job):
args = json.loads(job.arguments)
self.merger.updateRepo(args['connection'], args['project'])
zuul_event_id = args.get('zuul_event_id')
self.merger.updateRepo(args['connection'], args['project'],
zuul_event_id=zuul_event_id)
files = self.merger.getFilesChanges(
args['connection'], args['project'], args['branch'], args['tosha'])
args['connection'], args['project'], args['branch'], args['tosha'],
zuul_event_id=zuul_event_id)
result = dict(updated=True,
files=files)
result['zuul_event_id'] = zuul_event_id
job.sendWorkComplete(json.dumps(result))

View File

@ -48,6 +48,7 @@ class ManagementEvent(object):
def __init__(self):
self._wait_event = threading.Event()
self._exc_info = None
self.zuul_event_id = None
def exception(self, exc_info):
self._exc_info = exc_info
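
Finally, management events gain a zuul_event_id attribute (None until one is assigned), so code that only has the event object in hand can still produce annotated logs. A small hedged example; the scheduler logger name and the id value are illustrative:

import logging
import threading

from zuul.lib.logutil import get_annotated_logger

class ManagementEventSketch:
    # Mirrors the constructor above; the rest of the real class is omitted.
    def __init__(self):
        self._wait_event = threading.Event()
        self._exc_info = None
        self.zuul_event_id = None

event = ManagementEventSketch()
event.zuul_event_id = 'deadbeef'   # hypothetical id attached on receipt
log = get_annotated_logger(logging.getLogger('zuul.Scheduler'),
                           event.zuul_event_id)
log.debug("Processing management event %s", event)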