Merge "Refresh cached branches in timer driver"
This commit is contained in:
commit
0279c3116e
@ -23,6 +23,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from unittest import mock, skip
|
from unittest import mock, skip
|
||||||
|
from uuid import uuid4
|
||||||
from kazoo.exceptions import NoNodeError
|
from kazoo.exceptions import NoNodeError
|
||||||
|
|
||||||
import git
|
import git
|
||||||
@ -4241,6 +4242,17 @@ class TestScheduler(ZuulTestCase):
|
|||||||
self.commitConfigUpdate('common-config', config_file)
|
self.commitConfigUpdate('common-config', config_file)
|
||||||
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
|
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
|
||||||
|
|
||||||
|
# Collect the currently cached branches in order to later check,
|
||||||
|
# that the timer driver refreshes the cache.
|
||||||
|
cached_versions = {}
|
||||||
|
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
|
||||||
|
for project_name in tenant.layout.project_configs:
|
||||||
|
_, project = tenant.getProject('org/project')
|
||||||
|
for branch in project.source.getProjectBranches(project, tenant):
|
||||||
|
event = self._create_dummy_event(project, branch)
|
||||||
|
change = project.source.getChange(event)
|
||||||
|
cached_versions[branch] = change.cache_version
|
||||||
|
|
||||||
# The pipeline triggers every second, so we should have seen
|
# The pipeline triggers every second, so we should have seen
|
||||||
# several by now.
|
# several by now.
|
||||||
for _ in iterate_timeout(60, 'jobs started'):
|
for _ in iterate_timeout(60, 'jobs started'):
|
||||||
@ -4282,6 +4294,26 @@ class TestScheduler(ZuulTestCase):
|
|||||||
self.assertEqual(job.name, 'project-bitrot')
|
self.assertEqual(job.name, 'project-bitrot')
|
||||||
self.assertIn(job.ref, ('refs/heads/stable', 'refs/heads/master'))
|
self.assertIn(job.ref, ('refs/heads/stable', 'refs/heads/master'))
|
||||||
|
|
||||||
|
for project_name in tenant.layout.project_configs:
|
||||||
|
_, project = tenant.getProject('org/project')
|
||||||
|
for branch in project.source.getProjectBranches(project, tenant):
|
||||||
|
event = self._create_dummy_event(project, branch)
|
||||||
|
change = project.source.getChange(event)
|
||||||
|
# Make sure the timer driver refreshed the cache
|
||||||
|
self.assertGreater(change.cache_version,
|
||||||
|
cached_versions[branch])
|
||||||
|
|
||||||
|
def _create_dummy_event(self, project, branch):
|
||||||
|
event = zuul.model.TriggerEvent()
|
||||||
|
event.type = 'test'
|
||||||
|
event.project_hostname = project.canonical_hostname
|
||||||
|
event.project_name = project.name
|
||||||
|
event.ref = f'refs/heads/{branch}'
|
||||||
|
event.branch = branch
|
||||||
|
event.zuul_event_id = str(uuid4().hex)
|
||||||
|
event.timestamp = time.time()
|
||||||
|
return event
|
||||||
|
|
||||||
def test_timer(self):
|
def test_timer(self):
|
||||||
"Test that a periodic job is triggered"
|
"Test that a periodic job is triggered"
|
||||||
self._test_timer('layouts/timer.yaml')
|
self._test_timer('layouts/timer.yaml')
|
||||||
|
@ -748,19 +748,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change = self._getChange(event.change_number, event.patch_number,
|
change = self._getChange(event.change_number, event.patch_number,
|
||||||
refresh=refresh)
|
refresh=refresh)
|
||||||
elif event.ref and event.ref.startswith('refs/tags/'):
|
elif event.ref and event.ref.startswith('refs/tags/'):
|
||||||
change = self._getTag(event)
|
change = self._getTag(event, refresh=refresh)
|
||||||
elif event.ref and not event.ref.startswith('refs/'):
|
elif event.ref and not event.ref.startswith('refs/'):
|
||||||
# Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
|
# Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
|
||||||
change = self._getBranch(event, branch=event.ref,
|
change = self._getBranch(event, branch=event.ref,
|
||||||
ref=f'refs/heads/{event.ref}')
|
ref=f'refs/heads/{event.ref}',
|
||||||
|
refresh=refresh)
|
||||||
elif event.ref and event.ref.startswith('refs/heads/'):
|
elif event.ref and event.ref.startswith('refs/heads/'):
|
||||||
# From the timer trigger or Post 2.13 Gerrit
|
# From the timer trigger or Post 2.13 Gerrit
|
||||||
change = self._getBranch(event,
|
change = self._getBranch(event,
|
||||||
branch=event.ref[len('refs/heads/'):],
|
branch=event.ref[len('refs/heads/'):],
|
||||||
ref=event.ref)
|
ref=event.ref, refresh=refresh)
|
||||||
elif event.ref:
|
elif event.ref:
|
||||||
# catch-all ref (ie, not a branch or head)
|
# catch-all ref (ie, not a branch or head)
|
||||||
change = self._getRef(event)
|
change = self._getRef(event, refresh=refresh)
|
||||||
else:
|
else:
|
||||||
self.log.warning("Unable to get change for %s" % (event,))
|
self.log.warning("Unable to get change for %s" % (event,))
|
||||||
change = None
|
change = None
|
||||||
@ -782,12 +783,15 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change.patchset = patchset
|
change.patchset = patchset
|
||||||
return self._updateChange(key, change, event, history)
|
return self._updateChange(key, change, event, history)
|
||||||
|
|
||||||
def _getTag(self, event):
|
def _getTag(self, event, refresh=False):
|
||||||
tag = event.ref[len('refs/tags/'):]
|
tag = event.ref[len('refs/tags/'):]
|
||||||
key = ChangeKey(self.connection_name, event.project_name,
|
key = ChangeKey(self.connection_name, event.project_name,
|
||||||
'Tag', tag, event.newrev)
|
'Tag', tag, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
project = self.source.getProject(event.project_name)
|
project = self.source.getProject(event.project_name)
|
||||||
change = Tag(project)
|
change = Tag(project)
|
||||||
@ -802,11 +806,14 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getBranch(self, event, branch, ref):
|
def _getBranch(self, event, branch, ref, refresh=False):
|
||||||
key = ChangeKey(self.connection_name, event.project_name,
|
key = ChangeKey(self.connection_name, event.project_name,
|
||||||
'Branch', branch, event.newrev)
|
'Branch', branch, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
project = self.source.getProject(event.project_name)
|
project = self.source.getProject(event.project_name)
|
||||||
change = Branch(project)
|
change = Branch(project)
|
||||||
@ -821,11 +828,14 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getRef(self, event):
|
def _getRef(self, event, refresh=False):
|
||||||
key = ChangeKey(self.connection_name, event.project_name,
|
key = ChangeKey(self.connection_name, event.project_name,
|
||||||
'Ref', event.ref, event.newrev)
|
'Ref', event.ref, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
project = self.source.getProject(event.project_name)
|
project = self.source.getProject(event.project_name)
|
||||||
change = Ref(project)
|
change = Ref(project)
|
||||||
|
@ -1306,11 +1306,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
event=event)
|
event=event)
|
||||||
else:
|
else:
|
||||||
if event.ref and event.ref.startswith('refs/tags/'):
|
if event.ref and event.ref.startswith('refs/tags/'):
|
||||||
change = self._getTag(project, event)
|
change = self._getTag(project, event, refresh=refresh)
|
||||||
elif event.ref and event.ref.startswith('refs/heads/'):
|
elif event.ref and event.ref.startswith('refs/heads/'):
|
||||||
change = self._getBranch(project, event)
|
change = self._getBranch(project, event, refresh=refresh)
|
||||||
else:
|
else:
|
||||||
change = self._getRef(project, event)
|
change = self._getRef(project, event, refresh=refresh)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getChange(self, project, number, patchset=None, refresh=False,
|
def _getChange(self, project, number, patchset=None, refresh=False,
|
||||||
@ -1363,12 +1363,15 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
log.debug('Finished updating change %s', change)
|
log.debug('Finished updating change %s', change)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getTag(self, project, event):
|
def _getTag(self, project, event, refresh=False):
|
||||||
tag = event.ref[len('refs/tags/'):]
|
tag = event.ref[len('refs/tags/'):]
|
||||||
key = ChangeKey(self.connection_name, project.name,
|
key = ChangeKey(self.connection_name, project.name,
|
||||||
'Tag', tag, event.newrev)
|
'Tag', tag, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
change = Tag(project)
|
change = Tag(project)
|
||||||
change.tag = tag
|
change.tag = tag
|
||||||
@ -1385,12 +1388,15 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getBranch(self, project, event):
|
def _getBranch(self, project, event, refresh=False):
|
||||||
branch = event.ref[len('refs/heads/'):]
|
branch = event.ref[len('refs/heads/'):]
|
||||||
key = ChangeKey(self.connection_name, project.name,
|
key = ChangeKey(self.connection_name, project.name,
|
||||||
'Branch', branch, event.newrev)
|
'Branch', branch, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
change = Branch(project)
|
change = Branch(project)
|
||||||
change.branch = branch
|
change.branch = branch
|
||||||
@ -1406,11 +1412,14 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getRef(self, project, event):
|
def _getRef(self, project, event, refresh=False):
|
||||||
key = ChangeKey(self.connection_name, project.name,
|
key = ChangeKey(self.connection_name, project.name,
|
||||||
'Ref', event.ref, event.newrev)
|
'Ref', event.ref, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
change = Ref(project)
|
change = Ref(project)
|
||||||
change.ref = event.ref
|
change.ref = event.ref
|
||||||
|
@ -632,11 +632,14 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
log.info("Updated change from Gitlab %s" % change)
|
log.info("Updated change from Gitlab %s" % change)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getNonMRRef(self, project, event):
|
def _getNonMRRef(self, project, event, refresh=False):
|
||||||
key = ChangeKey(self.connection_name, project.name,
|
key = ChangeKey(self.connection_name, project.name,
|
||||||
'Ref', event.ref, event.newrev)
|
'Ref', event.ref, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda c: None)
|
||||||
return change
|
return change
|
||||||
if event.ref and event.ref.startswith('refs/tags/'):
|
if event.ref and event.ref.startswith('refs/tags/'):
|
||||||
change = Tag(project)
|
change = Tag(project)
|
||||||
|
@ -611,7 +611,7 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
else:
|
else:
|
||||||
self.log.info("Getting change for %s ref:%s" % (
|
self.log.info("Getting change for %s ref:%s" % (
|
||||||
project, event.ref))
|
project, event.ref))
|
||||||
change = self._getNonPRRef(project, event)
|
change = self._getNonPRRef(project, event, refresh=refresh)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getChange(self, project, number, patchset=None,
|
def _getChange(self, project, number, patchset=None,
|
||||||
@ -646,11 +646,14 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||||||
_update_change)
|
_update_change)
|
||||||
return change
|
return change
|
||||||
|
|
||||||
def _getNonPRRef(self, project, event):
|
def _getNonPRRef(self, project, event, refresh=False):
|
||||||
key = ChangeKey(self.connection_name, project.name,
|
key = ChangeKey(self.connection_name, project.name,
|
||||||
'Ref', event.ref, event.newrev)
|
'Ref', event.ref, event.newrev)
|
||||||
change = self._change_cache.get(key)
|
change = self._change_cache.get(key)
|
||||||
if change:
|
if change:
|
||||||
|
if refresh:
|
||||||
|
self._change_cache.updateChangeWithRetry(
|
||||||
|
key, change, lambda: None)
|
||||||
return change
|
return change
|
||||||
if event.ref and event.ref.startswith('refs/tags/'):
|
if event.ref and event.ref.startswith('refs/tags/'):
|
||||||
change = Tag(project)
|
change = Tag(project)
|
||||||
|
@ -173,6 +173,9 @@ class TimerDriver(Driver, TriggerInterface):
|
|||||||
event.branch = branch
|
event.branch = branch
|
||||||
event.zuul_event_id = str(uuid4().hex)
|
event.zuul_event_id = str(uuid4().hex)
|
||||||
event.timestamp = time.time()
|
event.timestamp = time.time()
|
||||||
|
# Refresh the branch in order to update the item in the
|
||||||
|
# change cache.
|
||||||
|
project.source.getChange(event, refresh=True)
|
||||||
log = get_annotated_logger(self.log, event)
|
log = get_annotated_logger(self.log, event)
|
||||||
log.debug("Adding event")
|
log.debug("Adding event")
|
||||||
self.sched.addTriggerEvent(self.name, event)
|
self.sched.addTriggerEvent(self.name, event)
|
||||||
|
Loading…
Reference in New Issue
Block a user