Use connection to qualify projects in merger

Fully qualify projects in the merger with connection names.
This lets us drop the URL parameter (which always seemed
unecessary, as the merger can figure that out on its own given a
uniquely identified project).

On disk, use the canonical hostname, so that the checked out
versions of repositories include the canonical hostname, and so that
repos on mergers survive changes in connection names.

This simplifies both the API and the JSON data structure passed to
the merger.

The addProject method of the merger is flagged as an internal method
now, as all "public" API methods indirectly call it.

In the executor, after cloning and merging are completed, the 'origin'
remote is removed from the resulting repositories since it may not
be valid for use within a running job.

Change-Id: Idcc9808948b018a271b32492766a96876979d1fa
This commit is contained in:
James E. Blair 2017-04-27 12:03:15 -07:00
parent a04b079f4c
commit 2a53567014
10 changed files with 157 additions and 132 deletions

View File

@ -701,7 +701,8 @@ class FakeBuild(object):
"""
for change in changes:
path = os.path.join(self.jobdir.src_root, change.project)
hostname = change.gerrit.canonical_hostname
path = os.path.join(self.jobdir.src_root, hostname, change.project)
try:
repo = git.Repo(path)
except NoSuchPathError as e:

View File

@ -69,11 +69,13 @@ class TestOpenStack(AnsibleZuulTestCase):
# Check that a change to nova triggered a keystone clone
executor_git_dir = os.path.join(self.executor_src_root,
'review.example.com',
'openstack', 'keystone', '.git')
self.assertTrue(os.path.exists(executor_git_dir),
msg='openstack/keystone should be cloned.')
jobdir_git_dir = os.path.join(build.jobdir.src_root,
'review.example.com',
'openstack', 'keystone', '.git')
self.assertTrue(os.path.exists(jobdir_git_dir),
msg='openstack/keystone should be cloned.')
@ -90,11 +92,13 @@ class TestOpenStack(AnsibleZuulTestCase):
# Check that a change to keystone triggered a nova clone
executor_git_dir = os.path.join(self.executor_src_root,
'review.example.com',
'openstack', 'nova', '.git')
self.assertTrue(os.path.exists(executor_git_dir),
msg='openstack/nova should be cloned.')
jobdir_git_dir = os.path.join(build.jobdir.src_root,
'review.example.com',
'openstack', 'nova', '.git')
self.assertTrue(os.path.exists(jobdir_git_dir),
msg='openstack/nova should be cloned.')

View File

@ -1470,10 +1470,12 @@ class TestScheduler(ZuulTestCase):
self.assertEmptyQueues()
self.build_history = []
path = os.path.join(self.merger_src_root, "org/project")
path = os.path.join(self.merger_src_root, "review.example.com",
"org/project")
if os.path.exists(path):
repack_repo(path)
path = os.path.join(self.executor_src_root, "org/project")
path = os.path.join(self.executor_src_root, "review.example.com",
"org/project")
if os.path.exists(path):
repack_repo(path)
@ -1497,15 +1499,19 @@ class TestScheduler(ZuulTestCase):
tenant = self.sched.abide.tenants.get('tenant-one')
trusted, project = tenant.getProject('org/project')
url = self.fake_gerrit.getGitUrl(project)
self.merge_server.merger.addProject('org/project', url)
self.merge_server.merger._addProject('review.example.com',
'org/project', url)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project")
# TODOv3(jeblair): add hostname to upstream root
path = os.path.join(self.upstream_root, 'org/project')
repack_repo(path)
path = os.path.join(self.merger_src_root, "org/project")
path = os.path.join(self.merger_src_root, 'review.example.com',
'org/project')
if os.path.exists(path):
repack_repo(path)
path = os.path.join(self.executor_src_root, "org/project")
path = os.path.join(self.executor_src_root, 'review.example.com',
'org/project')
if os.path.exists(path):
repack_repo(path)
@ -3881,8 +3887,6 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
queue = self.gearman_server.getQueue()
ref = self.getParameter(queue[-1], 'ZUUL_REF')
self.gearman_server.hold_jobs_in_queue = False
self.gearman_server.release()
self.waitUntilSettled()
@ -3890,21 +3894,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.executor_server.release('.*-merge')
self.waitUntilSettled()
path = os.path.join(self.builds[0].jobdir.src_root, "org/project1")
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_messages.reverse()
correct_messages = [
'initial commit', 'add content from fixture', 'A-1']
self.assertEqual(repo_messages, correct_messages)
path = os.path.join(self.builds[0].jobdir.src_root, "org/project2")
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_messages.reverse()
correct_messages = [
'initial commit', 'add content from fixture', 'B-1']
self.assertEqual(repo_messages, correct_messages)
self.assertTrue(self.builds[0].hasChanges(A, B))
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -4684,7 +4674,8 @@ class TestSchedulerMerges(ZuulTestCase):
build = self.builds[-1]
ref = self.getParameter(build, 'ZUUL_REF')
path = os.path.join(build.jobdir.src_root, project)
path = os.path.join(build.jobdir.src_root, 'review.example.com',
project)
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
repo_messages.reverse()
@ -4754,8 +4745,8 @@ class TestSchedulerMerges(ZuulTestCase):
build = self.builds[-1]
self.assertEqual(self.getParameter(build, 'ZUUL_BRANCH'), 'mp')
ref = self.getParameter(build, 'ZUUL_REF')
path = os.path.join(
build.jobdir.src_root, 'org/project-merge-branches')
path = os.path.join(build.jobdir.src_root, 'review.example.com',
'org/project-merge-branches')
repo = git.Repo(path)
repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
@ -4799,8 +4790,8 @@ class TestSchedulerMerges(ZuulTestCase):
self.log.debug("Got Zuul ref for change A: %s" % ref_A)
self.log.debug("Got Zuul commit for change A: %s" % commit_A)
path = os.path.join(
job_A.jobdir.src_root, "org/project-merge-branches")
path = os.path.join(job_A.jobdir.src_root, 'review.example.com',
'org/project-merge-branches')
repo = git.Repo(path)
repo_messages = [c.message.strip()
for c in repo.iter_commits(ref_A)]
@ -4821,8 +4812,8 @@ class TestSchedulerMerges(ZuulTestCase):
self.log.debug("Got Zuul ref for change B: %s" % ref_B)
self.log.debug("Got Zuul commit for change B: %s" % commit_B)
path = os.path.join(
job_B.jobdir.src_root, "org/project-merge-branches")
path = os.path.join(job_B.jobdir.src_root, 'review.example.com',
'org/project-merge-branches')
repo = git.Repo(path)
repo_messages = [c.message.strip()
for c in repo.iter_commits(ref_B)]
@ -4842,8 +4833,8 @@ class TestSchedulerMerges(ZuulTestCase):
commit_C = self.getParameter(job_C, 'ZUUL_COMMIT')
self.log.debug("Got Zuul ref for change C: %s" % ref_C)
self.log.debug("Got Zuul commit for change C: %s" % commit_C)
path = os.path.join(
job_C.jobdir.src_root, "org/project-merge-branches")
path = os.path.join(job_C.jobdir.src_root, 'review.example.com',
'org/project-merge-branches')
repo = git.Repo(path)
repo_messages = [c.message.strip()
for c in repo.iter_commits(ref_C)]

View File

@ -890,9 +890,10 @@ class TenantParser(object):
project.unparsed_config = model.UnparsedTenantConfig()
# Get main config files. These files are permitted the
# full range of configuration.
url = project.source.getGitUrl(project)
job = merger.getFiles(project.name, url, 'master',
files=['zuul.yaml', '.zuul.yaml'])
job = merger.getFiles(
project.source.connection.connection_name,
project.name, 'master',
files=['zuul.yaml', '.zuul.yaml'])
job.source_context = model.SourceContext(project, 'master',
'', True)
jobs.append(job)
@ -910,7 +911,6 @@ class TenantParser(object):
project.unparsed_config = model.UnparsedTenantConfig()
# Get in-project-repo config files which have a restricted
# set of options.
url = project.source.getGitUrl(project)
# For each branch in the repo, get the zuul.yaml for that
# branch. Remember the branch and then implicitly add a
# branch selector to each job there. This makes the
@ -918,8 +918,10 @@ class TenantParser(object):
for branch in project.source.getProjectBranches(project):
project.unparsed_branch_config[branch] = \
model.UnparsedTenantConfig()
job = merger.getFiles(project.name, url, branch,
files=['.zuul.yaml'])
job = merger.getFiles(
project.source.connection.connection_name,
project.name, branch,
files=['.zuul.yaml'])
job.source_context = model.SourceContext(
project, branch, '', False)
jobs.append(job)
@ -1068,7 +1070,8 @@ class ConfigLoader(object):
for branch in branches:
incdata = None
data = files.getFile(project.name, branch, fn)
data = files.getFile(project.source.connection.connection_name,
project.name, branch, fn)
if data:
source_context = model.SourceContext(project, branch,
fn, trusted)

View File

@ -284,18 +284,16 @@ class ExecutorClient(object):
(trusted, project) = tenant.getProject(repo)
connection = project.source.connection
params['projects'].append(
dict(name=project.name,
connection_name=connection.connection_name,
url=project.source.getGitUrl(project)))
dict(connection=connection.connection_name,
name=project.name))
projects.add(project)
for item in all_items:
if item.change.project not in projects:
project = item.change.project
connection = item.change.project.source.connection
params['projects'].append(
dict(name=project.name,
connection_name=connection.connection_name,
url=project.source.getGitUrl(project)))
dict(connection=connection.connection_name,
name=project.name))
projects.add(project)
build = Build(job, uuid)

View File

@ -71,11 +71,6 @@ class Watchdog(object):
def stop(self):
self._running = False
# TODOv3(mordred): put git repos in a hierarchy that includes source
# hostname, eg: git.openstack.org/openstack/nova. Also, configure
# sources to have an alias, so that the review.openstack.org source
# repos end up in git.openstack.org.
class JobDirPlaybook(object):
def __init__(self, root):
@ -162,13 +157,14 @@ class JobDir(object):
class UpdateTask(object):
def __init__(self, project, url):
self.project = project
self.url = url
def __init__(self, connection_name, project_name):
self.connection_name = connection_name
self.project_name = project_name
self.event = threading.Event()
def __eq__(self, other):
if other.project == self.project:
if (other.connection_name == self.connection_name and
other.project_name == self.project_name):
return True
return False
@ -408,15 +404,16 @@ class ExecutorServer(object):
if task is None:
# We are asked to stop
return
self.log.info("Updating repo %s from %s" % (task.project, task.url))
self.merger.updateRepo(task.project, task.url)
self.log.debug("Finished updating repo %s from %s" %
(task.project, task.url))
self.log.info("Updating repo %s/%s" % (
task.connection_name, task.project_name))
self.merger.updateRepo(task.connection_name, task.project_name)
self.log.debug("Finished updating repo %s/%s" %
(task.connection_name, task.project_name))
task.setComplete()
def update(self, project, url):
def update(self, connection_name, project_name):
# Update a repository in the main merger
task = UpdateTask(project, url)
task = UpdateTask(connection_name, project_name)
task = self.update_queue.put(task)
return task
@ -475,9 +472,9 @@ class ExecutorServer(object):
def cat(self, job):
args = json.loads(job.arguments)
task = self.update(args['project'], args['url'])
task = self.update(args['connection'], args['project'])
task.wait()
files = self.merger.getFiles(args['project'], args['url'],
files = self.merger.getFiles(args['connection'], args['project'],
args['branch'], args['files'])
result = dict(updated=True,
files=files,
@ -562,21 +559,31 @@ class AnsibleJob(object):
tasks = []
for project in args['projects']:
self.log.debug("Job %s: updating project %s" %
(self.job.unique, project['name']))
(self.job.unique, project))
tasks.append(self.executor_server.update(
project['name'], project['url']))
project['connection'], project['name']))
for task in tasks:
task.wait()
self.log.debug("Job %s: git updates complete" % (self.job.unique,))
repos = []
for project in args['projects']:
self.log.debug("Cloning %s" % (project['name'],))
self.log.debug("Cloning %s/%s" % (project['connection'],
project['name'],))
source = self.executor_server.connections.getSource(
project['connection'])
project_object = source.getProject(project['name'])
url = source.getGitUrl(project_object)
repo = git.Repo.clone_from(
os.path.join(self.executor_server.merge_root,
source.canonical_hostname,
project['name']),
os.path.join(self.jobdir.src_root,
source.canonical_hostname,
project['name']))
repo.remotes.origin.config_writer.set('url', project['url'])
repo.remotes.origin.config_writer.set('url', url)
repos.append(repo)
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
@ -588,6 +595,11 @@ class AnsibleJob(object):
else:
commit = args['items'][-1]['newrev'] # noqa
# Delete the origin remote from each repo we set up since
# it will not be valid within the jobs.
for repo in repos:
repo.delete_remote(repo.remotes.origin)
# is the playbook in a repo that we have already prepared?
self.preparePlaybookRepos(args)
@ -753,17 +765,16 @@ class AnsibleJob(object):
source = self.executor_server.connections.getSource(
playbook['connection'])
project = source.getProject(playbook['project'])
# TODO(jeblair): construct the url in the merger itself
url = source.getGitUrl(project)
if not playbook['trusted']:
# This is a project repo, so it is safe to use the already
# checked out version (from speculative merging) of the
# playbook
for i in args['items']:
if (i['connection_name'] == playbook['connection'] and
if (i['connection'] == playbook['connection'] and
i['project'] == playbook['project']):
# We already have this repo prepared
path = os.path.join(self.jobdir.src_root,
project.canonical_hostname,
project.name,
playbook['path'])
jobdir_playbook.path = self.findPlaybook(
@ -776,9 +787,11 @@ class AnsibleJob(object):
# tip into a dedicated space.
merger = self.executor_server._getMerger(jobdir_playbook.root)
merger.checkoutBranch(project.name, url, playbook['branch'])
merger.checkoutBranch(playbook['connection'], project.name,
playbook['branch'])
path = os.path.join(jobdir_playbook.root,
project.canonical_hostname,
project.name,
playbook['path'])
jobdir_playbook.path = self.findPlaybook(
@ -819,8 +832,6 @@ class AnsibleJob(object):
source = self.executor_server.connections.getSource(
role['connection'])
project = source.getProject(role['project'])
# TODO(jeblair): construct the url in the merger itself
url = source.getGitUrl(project)
role_repo = None
if not role['trusted']:
# This is a project repo, so it is safe to use the already
@ -828,12 +839,13 @@ class AnsibleJob(object):
# role
for i in args['items']:
if (i['connection_name'] == role['connection'] and
if (i['connection'] == role['connection'] and
i['project'] == role['project']):
# We already have this repo prepared;
# copy it into location.
path = os.path.join(self.jobdir.src_root,
project.canonical_hostname,
project.name)
link = os.path.join(root, role['name'])
os.symlink(path, link)
@ -846,13 +858,15 @@ class AnsibleJob(object):
if not role_repo:
merger = self.executor_server._getMerger(root)
merger.checkoutBranch(project.name, url, 'master')
role_repo = os.path.join(root, project.name)
merger.checkoutBranch(role['connection'], project.name,
'master')
role_repo = os.path.join(root, project.canonical_hostname,
project.name)
role_path = self.findRole(role_repo, trusted=role['trusted'])
if role_path is None:
# In the case of a bare role, add the containing directory
role_path = root
role_path = os.path.join(root, project.canonical_hostname)
self.jobdir.roles_path.append(role_path)
def prepareAnsibleFiles(self, args):

View File

@ -113,16 +113,16 @@ class MergeClient(object):
files=files)
self.submitJob('merger:merge', data, build_set, precedence)
def updateRepo(self, project, url, build_set,
def updateRepo(self, connection_name, project_name, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(project=project,
url=url)
data = dict(connection=connection_name,
project=project_name)
self.submitJob('merger:update', data, build_set, precedence)
def getFiles(self, project, url, branch, files,
def getFiles(self, connection_name, project_name, branch, files,
precedence=zuul.model.PRECEDENCE_HIGH):
data = dict(project=project,
url=url,
data = dict(connection=connection_name,
project=project_name,
branch=branch,
files=files)
job = self.submitJob('merger:cat', data, None, precedence)

View File

@ -234,49 +234,60 @@ class Merger(object):
elif 'GIT_SSH' in os.environ:
del os.environ['GIT_SSH']
def addProject(self, project, url):
def _addProject(self, hostname, project_name, url):
repo = None
key = '/'.join([hostname, project_name])
try:
path = os.path.join(self.working_root, project)
path = os.path.join(self.working_root, hostname, project_name)
repo = Repo(url, path, self.email, self.username)
self.repos[project] = repo
self.repos[key] = repo
except Exception:
self.log.exception("Unable to add project %s" % project)
self.log.exception("Unable to add project %s/%s" %
(hostname, project_name))
return repo
def getRepo(self, project, url):
if project in self.repos:
return self.repos[project]
def getRepo(self, connection_name, project_name):
source = self.connections.getSource(connection_name)
project = source.getProject(project_name)
hostname = project.canonical_hostname
url = source.getGitUrl(project)
key = '/'.join([hostname, project_name])
if key in self.repos:
return self.repos[key]
if not url:
raise Exception("Unable to set up repo for project %s"
" without a url" % (project,))
return self.addProject(project, url)
raise Exception("Unable to set up repo for project %s/%s"
" without a url" %
(connection_name, project_name,))
return self._addProject(hostname, project_name, url)
def updateRepo(self, project, url):
def updateRepo(self, connection_name, project_name):
# TODOv3(jhesketh): Reimplement
# da90a50b794f18f74de0e2c7ec3210abf79dda24 after merge..
# Likely we'll handle connection context per projects differently.
# self._setGitSsh()
repo = self.getRepo(project, url)
repo = self.getRepo(connection_name, project_name)
try:
self.log.info("Updating local repository %s", project)
self.log.info("Updating local repository %s/%s",
connection_name, project_name)
repo.reset()
except Exception:
self.log.exception("Unable to update %s", project)
self.log.exception("Unable to update %s/%s",
connection_name, project_name)
def checkoutBranch(self, project, url, branch):
repo = self.getRepo(project, url)
def checkoutBranch(self, connection_name, project_name, branch):
repo = self.getRepo(connection_name, project_name)
if repo.hasBranch(branch):
self.log.info("Checking out branch %s of %s" % (branch, project))
self.log.info("Checking out branch %s of %s/%s" %
(branch, connection_name, project_name))
head = repo.getBranchHead(branch)
repo.checkout(head)
else:
raise Exception("Project %s does not have branch %s" %
(project, branch))
raise Exception("Project %s/%s does not have branch %s" %
(connection_name, project_name, branch))
def _mergeChange(self, item, ref):
repo = self.getRepo(item['project'], item['url'])
repo = self.getRepo(item['connection'], item['project'])
try:
repo.checkout(ref)
except Exception:
@ -305,16 +316,16 @@ class Merger(object):
return commit
def _mergeItem(self, item, recent):
self.log.debug("Processing refspec %s for project %s / %s ref %s" %
(item['refspec'], item['project'], item['branch'],
item['ref']))
repo = self.getRepo(item['project'], item['url'])
key = (item['project'], item['branch'])
self.log.debug("Processing refspec %s for project %s/%s / %s ref %s" %
(item['refspec'], item['connection'],
item['project'], item['branch'], item['ref']))
repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch'])
# See if we have a commit for this change already in this repo
zuul_ref = item['branch'] + '/' + item['ref']
with repo.createRepoObject().git.custom_environment(
GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection_name'])):
GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection'])):
commit = repo.getCommitFromRef(zuul_ref)
if commit:
self.log.debug(
@ -342,7 +353,7 @@ class Merger(object):
self.log.debug("Found base commit %s for %s" % (base, key,))
# Merge the change
with repo.createRepoObject().git.custom_environment(
GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection_name'])):
GIT_SSH_COMMAND=self._get_ssh_cmd(item['connection'])):
commit = self._mergeChange(item, base)
if not commit:
return None
@ -351,9 +362,9 @@ class Merger(object):
# Set the Zuul ref for this item to point to the most recent
# commits of each project-branch
for key, mrc in recent.items():
project, branch = key
connection, project, branch = key
try:
repo = self.getRepo(project, None)
repo = self.getRepo(connection, project)
zuul_ref = branch + '/' + item['ref']
repo.createZuulRef(zuul_ref, mrc)
except Exception:
@ -377,15 +388,17 @@ class Merger(object):
if not commit:
return None
if files:
repo = self.getRepo(item['project'], item['url'])
repo = self.getRepo(item['connection'], item['project'])
repo_files = repo.getFiles(files, commit=commit)
read_files.append(dict(project=item['project'],
branch=item['branch'],
files=repo_files))
read_files.append(dict(
connection=item['connection'],
project=item['project'],
branch=item['branch'],
files=repo_files))
if files:
return commit.hexsha, read_files
return commit.hexsha
def getFiles(self, project, url, branch, files):
repo = self.getRepo(project, url)
def getFiles(self, connection_name, project_name, branch, files):
repo = self.getRepo(connection_name, project_name)
return repo.getFiles(files, branch=branch)

View File

@ -121,16 +121,15 @@ class MergeServer(object):
def update(self, job):
args = json.loads(job.arguments)
self.merger.updateRepo(args['project'],
args['url'])
self.merger.updateRepo(args['connection'], args['project'])
result = dict(updated=True,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
def cat(self, job):
args = json.loads(job.arguments)
self.merger.updateRepo(args['project'], args['url'])
files = self.merger.getFiles(args['project'], args['url'],
self.merger.updateRepo(args['connection'], args['project'])
files = self.merger.getFiles(args['connection'], args['project'],
args['branch'], args['files'])
result = dict(updated=True,
files=files,

View File

@ -1103,20 +1103,23 @@ class RepoFiles(object):
"""
def __init__(self):
self.projects = {}
self.connections = {}
def __repr__(self):
return '<RepoFiles %s>' % self.projects
return '<RepoFiles %s>' % self.connections
def setFiles(self, items):
self.projects = {}
self.hostnames = {}
for item in items:
project = self.projects.setdefault(item['project'], {})
connection = self.connections.setdefault(
item['connection'], {})
project = connection.setdefault(item['project'], {})
branch = project.setdefault(item['branch'], {})
branch.update(item['files'])
def getFile(self, project, branch, fn):
return self.projects.get(project, {}).get(branch, {}).get(fn)
def getFile(self, connection_name, project_name, branch, fn):
host = self.connections.get(connection_name, {})
return host.get(project_name, {}).get(branch, {}).get(fn)
class BuildSet(object):
@ -1713,11 +1716,10 @@ class QueueItem(object):
branch = None
source = self.change.project.source
connection_name = source.connection.connection_name
project = self.change.project.name
project = self.change.project
return dict(project=project,
url=source.getGitUrl(self.change.project),
connection_name=connection_name,
return dict(project=project.name,
connection=connection_name,
merge_mode=self.current_build_set.getMergeMode(),
refspec=refspec,
branch=branch,