Merge "Cache branches in connections/sources"

Zuul, 2018-08-10 16:39:18 +00:00, committed by Gerrit Code Review
commit 4895b9b40f
10 changed files with 188 additions and 27 deletions

View File

@@ -311,6 +311,10 @@ class TestExecutorRepos(ZuulTestCase):
         p1 = 'review.example.com/org/project1'
         projects = [p1]
         self.create_branch('org/project1', 'stable/havana')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project1', 'stable/havana'))
+        self.waitUntilSettled()
         # The pipeline triggers every second, so we should have seen
         # several by now.

View File

@@ -806,6 +806,12 @@ class TestGithubDriver(ZuulTestCase):
         repo = github.repo_from_project(project)
         repo._create_branch(branch)
+        self.fake_github.emitEvent(
+            self.fake_github.getPushEvent(
+                project,
+                ref='refs/heads/%s' % branch))
+        self.waitUntilSettled()
         A = self.fake_github.openFakePullRequest(project, branch, 'A')
         old_sha = A.head_sha
         A.setMerged("merging A")

View File

@@ -155,6 +155,11 @@ class TestScheduler(ZuulTestCase):
     def test_job_branch(self):
         "Test the correct variant of a job runs on a branch"
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
+        self.waitUntilSettled()
         A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -181,6 +186,10 @@ class TestScheduler(ZuulTestCase):
         self.executor_server.merger_worker.unRegisterFunction(f)
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
+        self.waitUntilSettled()
         A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
@@ -2707,7 +2716,15 @@ class TestScheduler(ZuulTestCase):
     @simple_layout('layouts/job-variants.yaml')
     def test_job_branch_variants(self):
         self.create_branch('org/project', 'stable/diablo')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable/diablo'))
         self.create_branch('org/project', 'stable/essex')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable/essex'))
+        self.waitUntilSettled()
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
@@ -3876,6 +3893,10 @@ class TestScheduler(ZuulTestCase):
         self.addCleanup(client.shutdown)
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
+        self.waitUntilSettled()
         self.executor_server.hold_jobs_in_build = True
         self.commitConfigUpdate('common-config', 'layouts/timer.yaml')
         self.sched.reconfigure(self.config)
@@ -5495,6 +5516,10 @@ class TestSchedulerTemplatedProject(ZuulTestCase):
         # This tests that there are no implied branch matchers added
         # to project templates in unbranched projects.
         self.create_branch('org/layered-project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/layered-project', 'stable'))
+        self.waitUntilSettled()
         A = self.fake_gerrit.addFakeChange(
             'org/layered-project', 'stable', 'A')
@@ -5664,6 +5689,10 @@ class TestSchedulerMerges(ZuulTestCase):
     def test_merge_branch(self):
         "Test that the right commits are on alternate branches"
         self.create_branch('org/project-merge-branches', 'mp')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project-merge-branches', 'mp'))
+        self.waitUntilSettled()
         self.executor_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange(
@@ -5709,6 +5738,10 @@ class TestSchedulerMerges(ZuulTestCase):
     def test_merge_multi_branch(self):
         "Test that dependent changes on multiple branches are merged"
         self.create_branch('org/project-merge-branches', 'mp')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project-merge-branches', 'mp'))
+        self.waitUntilSettled()
         self.executor_server.hold_jobs_in_build = True
         A = self.fake_gerrit.addFakeChange(
@@ -6400,6 +6433,10 @@ class TestSchedulerBranchMatcher(ZuulTestCase):
         not be run on a change to that branch.
         '''
         self.create_branch('org/project', 'featureA')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'featureA'))
+        self.waitUntilSettled()
         A = self.fake_gerrit.addFakeChange('org/project', 'featureA', 'A')
         A.addApproval('Code-Review', 2)
         self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

View File

@@ -2791,6 +2791,10 @@ class TestRoleBranches(RoleTestCase):
         # dependency chain.
         # First we create some branch-specific content in project1:
         self.create_branch('project1', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'project1', 'stable'))
+        self.waitUntilSettled()
         # A pre-playbook with unique stable branch content.
         p = self._addPlaybook('project1', 'stable',
@@ -3143,6 +3147,10 @@ class TestPragma(ZuulTestCase):
     def test_no_pragma(self):
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
+        self.waitUntilSettled()
         with open(os.path.join(FIXTURE_DIR,
                                'config/pragma/git/',
                                'org_project/nopragma.yaml')) as f:
@@ -3166,6 +3174,10 @@ class TestPragma(ZuulTestCase):
     def test_pragma(self):
         self.create_branch('org/project', 'stable')
+        self.fake_gerrit.addEvent(
+            self.fake_gerrit.getFakeBranchCreatedEvent(
+                'org/project', 'stable'))
+        self.waitUntilSettled()
         with open(os.path.join(FIXTURE_DIR,
                                'config/pragma/git/',
                                'org_project/pragma.yaml')) as f:

View File

@@ -1267,7 +1267,7 @@ class TenantParser(object):
         }
         return vs.Schema(tenant)
 
-    def fromYaml(self, abide, project_key_dir, conf, old_tenant):
+    def fromYaml(self, abide, project_key_dir, conf):
         self.getSchema()(conf)
         tenant = model.Tenant(conf['name'])
         if conf.get('max-nodes-per-job') is not None:
@@ -1289,7 +1289,7 @@ class TenantParser(object):
             tenant.addUntrustedProject(tpc)
         for tpc in config_tpcs + untrusted_tpcs:
-            self._getProjectBranches(tenant, tpc, old_tenant)
+            self._getProjectBranches(tenant, tpc)
             self._resolveShadowProjects(tenant, tpc)
         # We prepare a stack to store config loading issues
@@ -1335,16 +1335,9 @@ class TenantParser(object):
                 shadow_projects.append(project)
         tpc.shadow_projects = frozenset(shadow_projects)
 
-    def _getProjectBranches(self, tenant, tpc, old_tenant):
-        # If we're performing a tenant reconfiguration, we will have
-        # an old_tenant object, however, we may be doing so because of
-        # a branch creation event, so if we don't have any cached
-        # data, query the branches again as well.
-        if old_tenant and tpc.parsed_branch_config:
-            branches = old_tenant.getProjectBranches(tpc.project)[:]
-        else:
-            branches = sorted(tpc.project.source.getProjectBranches(
-                tpc.project, tenant))
+    def _getProjectBranches(self, tenant, tpc):
+        branches = sorted(tpc.project.source.getProjectBranches(
+            tpc.project, tenant))
         if 'master' in branches:
             branches.remove('master')
             branches = ['master'] + branches
@@ -1901,7 +1894,7 @@ class ConfigLoader(object):
         for conf_tenant in unparsed_abide.tenants:
             # When performing a full reload, do not use cached data.
             tenant = self.tenant_parser.fromYaml(abide, project_key_dir,
-                                                 conf_tenant, old_tenant=None)
+                                                 conf_tenant)
             abide.tenants[tenant.name] = tenant
             if len(tenant.layout.loading_errors):
                 self.log.warning(
@@ -1923,7 +1916,7 @@ class ConfigLoader(object):
             new_tenant = self.tenant_parser.fromYaml(
                 new_abide,
                 project_key_dir,
-                tenant.unparsed_config, old_tenant=tenant)
+                tenant.unparsed_config)
             new_abide.tenants[tenant.name] = new_tenant
             if len(new_tenant.layout.loading_errors):
                 self.log.warning(
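
Note on the configloader change above: because the connections now cache branch lists (see the driver hunks below), TenantParser._getProjectBranches no longer needs the old_tenant fast path; querying the source on every tenant load is cheap. The surrounding context lines keep the existing ordering rule, restated here as a standalone sketch (order_branches is a hypothetical helper, not part of Zuul):

    def order_branches(branches):
        """Return a sorted branch list with 'master' first, if present."""
        branches = sorted(branches)
        if 'master' in branches:
            branches.remove('master')
            branches = ['master'] + branches
        return branches

    # order_branches(['stable/essex', 'master', 'stable/diablo'])
    # -> ['master', 'stable/diablo', 'stable/essex']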

View File

@@ -68,12 +68,25 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
     def registerScheduler(self, sched):
         self.sched = sched
 
+    def clearBranchCache(self):
+        """Clear the branch cache for this connection.
+
+        This is called immediately prior to performing a full
+        reconfiguration. The branch cache should be cleared so that a
+        full reconfiguration can be used to correct any errors in
+        cached data.
+        """
+        pass
+
     def maintainCache(self, relevant):
         """Make cache contain relevant changes.
 
         This lets the user supply a list of change objects that are
         still in use. Anything in our cache that isn't in the supplied
         list should be safe to remove from the cache."""
+        pass
 
     def getWebController(self, zuul_web):
         """Return a cherrypy web controller to register with zuul-web.

View File

@@ -142,9 +142,13 @@ class GerritEventConnector(threading.Thread):
         if event.oldrev == '0' * 40:
             event.branch_created = True
             event.branch = event.ref
+            project = self.connection.source.getProject(event.project_name)
+            self.connection._clearBranchCache(project)
         if event.newrev == '0' * 40:
             event.branch_deleted = True
             event.branch = event.ref
+            project = self.connection.source.getProject(event.project_name)
+            self.connection._clearBranchCache(project)
 
         self._getChange(event)
         self.connection.logEvent(event)
@@ -292,6 +296,7 @@ class GerritConnection(BaseConnection):
     def __init__(self, driver, connection_name, connection_config):
         super(GerritConnection, self).__init__(driver, connection_name,
                                                connection_config)
+        self._project_branch_cache = {}
         if 'server' not in self.connection_config:
             raise Exception('server is required for gerrit connections in '
                             '%s' % self.connection_name)
@@ -377,6 +382,15 @@ class GerritConnection(BaseConnection):
     def addProject(self, project: Project) -> None:
         self.projects[project.name] = project
 
+    def clearBranchCache(self):
+        self._project_branch_cache = {}
+
+    def _clearBranchCache(self, project):
+        try:
+            del self._project_branch_cache[project.name]
+        except KeyError:
+            pass
+
     def maintainCache(self, relevant):
         # This lets the user supply a list of change objects that are
         # still in use. Anything in our cache that isn't in the supplied
@@ -763,9 +777,14 @@ class GerritConnection(BaseConnection):
         return changes
 
     def getProjectBranches(self, project: Project, tenant) -> List[str]:
+        branches = self._project_branch_cache.get(project.name)
+        if branches is not None:
+            return branches
         refs = self.getInfoRefs(project)
         heads = [str(k[len('refs/heads/'):]) for k in refs.keys()
                  if k.startswith('refs/heads/')]
+        self._project_branch_cache[project.name] = heads
         return heads
 
     def addEvent(self, data):
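
For Gerrit the branch list itself is still derived from the advertised refs; only the cache lookup and store around it are new. A standalone illustration of the heads extraction that getProjectBranches now caches per project (the ref names and SHAs below are made up):

    # Example input resembling the mapping returned by getInfoRefs().
    refs = {
        'refs/heads/master': 'aaaa000000000000000000000000000000000000',
        'refs/heads/stable/havana': 'bbbb000000000000000000000000000000000000',
        'refs/tags/1.0': 'cccc000000000000000000000000000000000000',
    }

    heads = [str(k[len('refs/heads/'):]) for k in refs.keys()
             if k.startswith('refs/heads/')]
    # heads == ['master', 'stable/havana']; tags are ignored, and this list
    # is what ends up stored in _project_branch_cache for the project.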

View File

@@ -259,6 +259,7 @@ class GithubEventConnector(threading.Thread):
             event.branch_deleted = True
 
         if event.branch:
+            project = self.connection.source.getProject(event.project_name)
             if event.branch_deleted:
                 # We currently cannot determine if a deleted branch was
                 # protected so we need to assume it was. GitHub doesn't allow
@@ -268,15 +269,19 @@ class GithubEventConnector(threading.Thread):
                 # of the branch.
                 # FIXME(tobiash): Find a way to handle that case
                 event.branch_protected = True
+                self.connection._clearBranchCache(project)
             elif event.branch_created:
                 # A new branch never can be protected because that needs to be
                 # configured after it has been created.
                 event.branch_protected = False
+                self.connection._clearBranchCache(project)
             else:
                 # An updated branch can be protected or not so we have to ask
                 # GitHub whether it is.
                 b = self.connection.getBranch(event.project_name, event.branch)
                 event.branch_protected = b.get('protected')
+                self.connection.checkBranchCache(project, event.branch,
+                                                 event.branch_protected)
 
         return event
@@ -457,7 +462,8 @@ class GithubConnection(BaseConnection):
         super(GithubConnection, self).__init__(
             driver, connection_name, connection_config)
         self._change_cache = {}
-        self._project_branch_cache = {}
+        self._project_branch_cache_include_unprotected = {}
+        self._project_branch_cache_exclude_unprotected = {}
         self.projects = {}
         self.git_ssh_key = self.connection_config.get('sshkey')
         self.server = self.connection_config.get('server', 'github.com')
@@ -929,10 +935,22 @@ class GithubConnection(BaseConnection):
     def addProject(self, project):
         self.projects[project.name] = project
 
-    def getProjectBranches(self, project, tenant):
-        github = self.getGithubClient(project.name)
-        exclude_unprotected = tenant.getExcludeUnprotectedBranches(project)
+    def clearBranchCache(self):
+        self._project_branch_cache_exclude_unprotected = {}
+        self._project_branch_cache_include_unprotected = {}
+
+    def getProjectBranches(self, project, tenant):
+        exclude_unprotected = tenant.getExcludeUnprotectedBranches(project)
+        if exclude_unprotected:
+            cache = self._project_branch_cache_exclude_unprotected
+        else:
+            cache = self._project_branch_cache_include_unprotected
+        branches = cache.get(project.name)
+        if branches is not None:
+            return branches
+        github = self.getGithubClient(project.name)
 
         url = github.session.build_url('repos', project.name,
                                        'branches')
@@ -946,23 +964,21 @@ class GithubConnection(BaseConnection):
                 url, headers=headers, params=params)
             # check if we need to do further paged calls
-            url = resp.links.get(
-                'next', {}).get('url')
+            url = resp.links.get('next', {}).get('url')
             if resp.status_code == 403:
                 self.log.error(str(resp))
                 rate_limit = github.rate_limit()
                 if rate_limit['resources']['core']['remaining'] == 0:
                     self.log.warning(
-                        "Rate limit exceeded, using stale branch list")
-                # failed to list branches so use a stale branch list
-                return self._project_branch_cache.get(project.name, [])
+                        "Rate limit exceeded, using empty branch list")
+                return []
             branches.extend([x['name'] for x in resp.json()])
         self.log_rate_limit(self.log, github)
-        self._project_branch_cache[project.name] = branches
-        return self._project_branch_cache[project.name]
+        cache[project.name] = branches
+        return branches
 
     def getBranch(self, project_name, branch):
         github = self.getGithubClient(project_name)
@@ -1329,6 +1345,46 @@ class GithubConnection(BaseConnection):
         log.debug('GitHub API rate limit remaining: %s reset: %s',
                   remaining, reset)
 
+    def _clearBranchCache(self, project):
+        self.log.debug("Clearing branch cache for %s", project.name)
+        for cache in [
+                self._project_branch_cache_exclude_unprotected,
+                self._project_branch_cache_include_unprotected,
+        ]:
+            try:
+                del cache[project.name]
+            except KeyError:
+                pass
+
+    def checkBranchCache(self, project, branch, protected):
+        # If the branch appears in the exclude_unprotected cache but
+        # is unprotected, clear the exclude cache.
+        # If the branch does not appear in the exclude_unprotected
+        # cache but is protected, clear the exclude cache.
+        # All branches should always appear in the include_unprotected
+        # cache, so we never clear it.
+        cache = self._project_branch_cache_exclude_unprotected
+        branches = cache.get(project.name, [])
+        if (branch in branches) and (not protected):
+            self.log.debug("Clearing protected branch cache for %s",
+                           project.name)
+            try:
+                del cache[project.name]
+            except KeyError:
+                pass
+            return
+        if (branch not in branches) and (protected):
+            self.log.debug("Clearing protected branch cache for %s",
+                           project.name)
+            try:
+                del cache[project.name]
+            except KeyError:
+                pass
+            return
 
 class GithubWebController(BaseWebController):
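
The comments in checkBranchCache above compress the staleness rule for the protected-only (exclude_unprotected) cache into a few lines. The same rule, restated as a standalone sketch (exclude_cache_is_stale is a hypothetical helper, not Zuul code):

    def exclude_cache_is_stale(cached_protected_branches, branch, protected):
        """Return True if the protected-only branch cache should be dropped."""
        # Cached as protected, but the event says the branch is not protected.
        if branch in cached_protected_branches and not protected:
            return True
        # Not cached as protected, but the event says the branch is protected.
        if branch not in cached_protected_branches and protected:
            return True
        return False

    # A branch that just became protected is missing from the cache -> stale:
    # exclude_cache_is_stale(['master'], 'feature/x', protected=True)  -> True
    # An already-cached protected branch leaves the cache valid:
    # exclude_cache_is_stale(['master'], 'master', protected=True)     -> False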

View File

@@ -639,6 +639,10 @@ class Scheduler(threading.Thread):
         self.config = event.config
         try:
             self.log.info("Full reconfiguration beginning")
+            for connection in self.connections.connections.values():
+                self.log.debug("Clear branch cache for: %s" % connection)
+                connection.clearBranchCache()
+
             loader = configloader.ConfigLoader(
                 self.connections, self, self.merger)
             tenant_config, script = self._checkTenantSourceConf(self.config)
@@ -665,6 +669,8 @@ class Scheduler(threading.Thread):
             # If a change landed to a project, clear out the cached
             # config of the changed branch before reconfiguring.
             for (project, branch) in event.project_branches:
+                self.log.debug("Clearing unparsed config: %s @%s",
+                               project.canonical_name, branch)
                 self.abide.clearUnparsedConfigCache(project.canonical_name,
                                                     branch)
             old_tenant = self.abide.tenants[event.tenant_name]
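
A full reconfiguration cannot trust any cached branch data, so the scheduler clears every connection's cache up front; tenant reconfigurations instead rely on the event-driven invalidation added in the driver hunks. A minimal sketch of that fan-out, assuming a registry object with a name-to-connection mapping (ToyConnectionRegistry is illustrative only):

    class ToyConnectionRegistry:
        """Illustrative stand-in for Zuul's connection registry."""

        def __init__(self, connections):
            self.connections = connections  # name -> connection object

    def clear_all_branch_caches(registry):
        # Mirrors the loop added above: drop every connection's branch cache
        # before tenants are reloaded from scratch.
        for connection in registry.connections.values():
            connection.clearBranchCache()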

View File

@@ -49,7 +49,14 @@ class BaseSource(object, metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def getChange(self, event):
-        """Get the change representing an event."""
+        """Get the change representing an event.
+
+        This method is called very frequently, and should generally
+        return quickly. The connection is expected to cache change
+        objects and automatically update them as related events are
+        received.
+        """
 
     @abc.abstractmethod
     def getChangeByURL(self, url):
@@ -91,7 +98,15 @@ class BaseSource(object, metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def getProjectBranches(self, project, tenant):
-        """Get branches for a project"""
+        """Get branches for a project
+
+        This method is called very frequently, and should generally
+        return quickly. The connection is expected to cache branch
+        lists for all projects queried, and further, to automatically
+        clear or update that cache when it observes branch creation or
+        deletion events.
+        """
 
     @abc.abstractmethod
     def getRequireFilters(self, config):
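
The docstrings added above state an expectation on implementations rather than behaviour of this abstract base class. Assuming, as in the Gerrit and GitHub drivers changed here, that the connection owns the branch cache, a concrete source can meet the "return quickly" expectation by delegating (ToySource and ToyCachedSource are illustrative names):

    import abc


    class ToySource(abc.ABC):
        def __init__(self, connection):
            self.connection = connection

        @abc.abstractmethod
        def getProjectBranches(self, project, tenant):
            """Get branches for a project."""


    class ToyCachedSource(ToySource):
        def getProjectBranches(self, project, tenant):
            # Cheap call: the connection answers from its branch cache and
            # only hits the remote API on a miss.
            return self.connection.getProjectBranches(project, tenant)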