Populate missing change cache entries

The drivers are expected to populate the change cache before
passing trigger events to the scheduler so that all the difficult
work is done outside the main loop.  Further, the cache cleanup
is designed to accommodate this so that events in-flight don't have
their change cache entries removed early.

However, at several points since moving the change cache into ZK,
programming errors have caused us to encounter enqueued changes
without entries in the cache.  This usually causes Zuul to abort
pipeline processing and is unrecoverable.

We should continue to address all instances of those since they
represent Zuul not working as designed.  However, it would be nice
if Zuul was able to recover from this.

To that end, this change allows missing changes to be added to the
change cache.

That is primarily accomplished by adjusting the Source.getChange
method to accept a ChangeKey instead of an Event.  Events are only
available when the triggering event happens, whereas a ChangeKey
is available when loading the pipeline state.

A ChangeKey represents the minimal distinguishing characteristics
of a change, and so can be used in all cases.  Some drivers obtain
extra information from events, so we still pass it into the getChange
method if available, but it's entirely optional -- we should still
get a workable Change object whether or not it's supplied.

Ref (and derived: Branch, Tag) objects currently only store their
newrev attribute in the ChangeKey, however we need to be able to
create Ref objects with an oldrev as well.  Since the old and new
revs of a Ref are not inherent to the ref but rather the generating
event, we can't get that from the source system.  So we need to
extend the ChangeKey object to include that.  Adding an extra
attribute is troublesome since the ChangeKey is not a ZKObject and
therefore doesn't have access to the model api version.  However,
it's not too much of a stretch to say that the "revision" field
(which like all ChangeKey fields is driver-dependent) should include
the old and new revs.  Therefore, in these cases the field is
upgraded in a backwards compatible way to include old and newrev
in the standard "old..new" git encoding format.  We also need to
support "None" since that is a valid value in Zuul.

So that we can continue to identify cache errors, any time we encounter
a change key that is not in the cache and we also don't have an
event object, we log an error.

Almost all of this commit is the refactor to accept change keys
instead of events in getChange.  The functional change to populate
the cache if it's missing basically consists of just removing
getChangeByKey and replacing it with getChange.  A test which deletes
the cache midway through is added.

Change-Id: I4252bea6430cd434dbfaacd583db584cc796dfaa
This commit is contained in:
James E. Blair 2022-02-03 14:33:08 -08:00
parent 6e991361ac
commit df220cd4d6
22 changed files with 524 additions and 354 deletions

View File

@ -1442,20 +1442,23 @@ class TestScheduler(ZuulTestCase):
event.change_number = '1'
event.patch_number = '2'
a = source.getChange(event)
a = source.getChange(source.getChangeKey(event), event=event)
mgr = tenant.layout.pipelines['gate'].manager
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('Code-Review', 2)
a = source.getChange(event, refresh=True)
a = source.getChange(source.getChangeKey(event),
refresh=True, event=event)
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('Approved', 1)
a = source.getChange(event, refresh=True)
a = source.getChange(source.getChangeKey(event),
refresh=True, event=event)
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.setWorkInProgress(True)
a = source.getChange(event, refresh=True)
a = source.getChange(source.getChangeKey(event),
refresh=True, event=event)
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
def test_project_merge_conflict(self):
@ -4294,7 +4297,8 @@ class TestScheduler(ZuulTestCase):
_, project = tenant.getProject('org/project')
for branch in project.source.getProjectBranches(project, tenant):
event = self._create_dummy_event(project, branch)
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
cached_versions[branch] = change.cache_version
# The pipeline triggers every second, so we should have seen
@ -4342,7 +4346,8 @@ class TestScheduler(ZuulTestCase):
_, project = tenant.getProject('org/project')
for branch in project.source.getProjectBranches(project, tenant):
event = self._create_dummy_event(project, branch)
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
# Make sure the timer driver refreshed the cache
self.assertGreater(change.cache_version,
cached_versions[branch])

View File

@ -166,6 +166,32 @@ class TestScaleOutScheduler(ZuulTestCase):
dict(name='project-test2', result='SUCCESS', changes='1,1 2,1'),
], ordered=False)
def test_change_cache_error(self):
# Test that if a change is deleted from the change cache,
# pipeline processing can continue
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
# Delete the change cache
for connection in self.scheds.first.connections.connections.values():
if hasattr(connection, '_change_cache'):
connection.maintainCache([], max_age=0)
# Release
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
def test_pipeline_summary(self):
# Test that we can deal with a truncated pipeline summary
self.executor_server.hold_jobs_in_build = True

View File

@ -1286,7 +1286,7 @@ class DummySource:
def getProject(self, project_name):
return project_name
def getChangeByKey(self, key):
def getChange(self, change_key):
return DummyChange('project')

View File

@ -334,8 +334,5 @@ class ZKChangeCacheMixin:
self._change_cache.updateChangeWithRetry(change.cache_stat.key,
change, _update_attrs)
def getChangeByKey(self, key):
return self._change_cache.get(key)
def estimateCacheDataSize(self):
return self._change_cache.estimateDataSize()

View File

@ -324,10 +324,8 @@ class GerritEventConnector(threading.Thread):
# cache as it may be a dependency
if event.change_number:
refresh = True
key = ChangeKey(self.connection.connection_name, None,
'GerritChange', str(event.change_number),
str(event.patch_number))
if self.connection._change_cache.get(key) is None:
change_key = self.connection.source.getChangeKey(event)
if self.connection._change_cache.get(change_key) is None:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@ -350,8 +348,7 @@ class GerritEventConnector(threading.Thread):
# we need to update those objects by reference so that they
# have the correct/new information and also avoid hitting
# gerrit multiple times.
self.connection._getChange(event.change_number,
event.patch_number,
self.connection._getChange(change_key,
refresh=True, event=event)
@ -743,110 +740,101 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref and event.ref.startswith('refs/tags/'):
change = self._getTag(event, refresh=refresh)
elif event.ref and not event.ref.startswith('refs/'):
# Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
change = self._getBranch(event, branch=event.ref,
ref=f'refs/heads/{event.ref}',
refresh=refresh)
elif event.ref and event.ref.startswith('refs/heads/'):
# From the timer trigger or Post 2.13 Gerrit
change = self._getBranch(event,
branch=event.ref[len('refs/heads/'):],
ref=event.ref, refresh=refresh)
elif event.ref:
# catch-all ref (ie, not a branch or head)
change = self._getRef(event, refresh=refresh)
else:
self.log.warning("Unable to get change for %s" % (event,))
change = None
return change
def getChange(self, change_key, refresh=False, event=None):
if change_key.connection_name != self.connection_name:
return None
if change_key.change_type == 'GerritChange':
return self._getChange(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Tag':
return self._getTag(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Branch':
return self._getBranch(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Ref':
return self._getRef(change_key, refresh=refresh, event=event)
def _getChange(self, number, patchset, refresh=False, history=None,
def _getChange(self, change_key, refresh=False, history=None,
event=None):
# Ensure number and patchset are str
number = str(number)
patchset = str(patchset)
key = ChangeKey(self.connection_name, None,
'GerritChange', number, patchset)
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
if change and not refresh:
return change
if not change:
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
change = GerritChange(None)
change.number = number
change.patchset = patchset
return self._updateChange(key, change, event, history)
change.number = change_key.stable_id
change.patchset = change_key.revision
return self._updateChange(change_key, change, event, history)
def _getTag(self, event, refresh=False):
tag = event.ref[len('refs/tags/'):]
key = ChangeKey(self.connection_name, event.project_name,
'Tag', tag, event.newrev)
change = self._change_cache.get(key)
def _getTag(self, change_key, refresh=False, event=None):
tag = change_key.stable_id
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
project = self.source.getProject(event.project_name)
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Tag(project)
change.tag = tag
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getWebUrl(project, sha=event.newrev)
change.ref = f'refs/tags/{tag}'
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self._getWebUrl(project, sha=change.newrev)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _getBranch(self, event, branch, ref, refresh=False):
key = ChangeKey(self.connection_name, event.project_name,
'Branch', branch, event.newrev)
change = self._change_cache.get(key)
def _getBranch(self, change_key, refresh=False, event=None):
branch = change_key.stable_id
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
project = self.source.getProject(event.project_name)
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Branch(project)
change.branch = branch
change.ref = ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getWebUrl(project, sha=event.newrev)
change.ref = f'refs/heads/{branch}'
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self._getWebUrl(project, sha=change.newrev)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _getRef(self, event, refresh=False):
key = ChangeKey(self.connection_name, event.project_name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
def _getRef(self, change_key, refresh=False, event=None):
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
project = self.source.getProject(event.project_name)
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self._getWebUrl(project, sha=event.newrev)
change.ref = change_key.stable_id
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self._getWebUrl(project, sha=change.newrev)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _getDependsOnFromCommit(self, message, change, event):
@ -939,7 +927,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
dep_num, dep_ps = data.depends_on
log.debug("Updating %s: Getting git-dependent change %s,%s",
change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(dep_key, history=history,
event=event)
# This is a git commit dependency. So we only ignore it if it is
# already merged. So even if it is "ABANDONED", we should not
@ -953,7 +943,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
change.message, change, event):
log.debug("Updating %s: Getting commit-dependent "
"change %s,%s", change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(dep_key, history=history,
event=event)
if dep.open and dep not in needs_changes:
compat_needs_changes.append(dep.cache_key)
@ -965,7 +957,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
try:
log.debug("Updating %s: Getting git-needed change %s,%s",
change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(dep_key, history=history,
event=event)
if (dep.open and dep.is_current_patchset and
dep not in needed_by_changes):
@ -987,8 +981,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# change). In case the dep is already in history we already
# refreshed this change so refresh is not needed in this case.
refresh = (dep_num, dep_ps) not in history
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(
dep_num, dep_ps, refresh=refresh, history=history,
dep_key, refresh=refresh, history=history,
event=event)
if (dep.open and dep.is_current_patchset
and dep not in needed_by_changes):
@ -1094,8 +1090,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
changes = [] # type: List[GerritChange]
for record in data:
try:
changes.append(
self._getChange(record.number, record.current_patchset))
change_key = ChangeKey(self.connection_name, None,
'GerritChange',
str(record.number),
str(record.current_patchset))
changes.append(self._getChange(change_key))
except Exception:
self.log.exception("Unable to query change %s",
record.number)

View File

@ -23,7 +23,6 @@ from zuul.model import Change, TriggerEvent
from zuul.driver.util import time_to_seconds
from zuul import exceptions
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes

View File

@ -22,6 +22,7 @@ from zuul.model import Project
from zuul.driver.gerrit.gerritmodel import GerritRefFilter
from zuul.driver.util import scalar_or_list, to_list
from zuul.lib.dependson import find_dependency_headers
from zuul.zk.change_cache import ChangeKey
class GerritSource(BaseSource):
@ -54,8 +55,36 @@ class GerritSource(BaseSource):
def postConfig(self):
pass
def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh)
def getChangeKey(self, event):
connection_name = self.connection.connection_name
if event.change_number:
return ChangeKey(connection_name, None,
'GerritChange',
str(event.change_number),
str(event.patch_number))
revision = f'{event.oldrev}..{event.newrev}'
if event.ref and event.ref.startswith('refs/tags/'):
tag = event.ref[len('refs/tags/'):]
return ChangeKey(connection_name, event.project_name,
'Tag', tag, revision)
if event.ref and not event.ref.startswith('refs/'):
# Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
return ChangeKey(connection_name, event.project_name,
'Branch', event.ref, revision)
if event.ref and event.ref.startswith('refs/heads/'):
# From the timer trigger or Post 2.13 Gerrit
branch = event.ref[len('refs/heads/'):]
return ChangeKey(connection_name, event.project_name,
'Branch', branch, revision)
if event.ref:
# catch-all ref (ie, not a branch or head)
return ChangeKey(connection_name, event.project_name,
'Ref', event.ref, revision)
self.log.warning("Unable to format change key for %s" % (event,))
def getChange(self, change_key, refresh=False, event=None):
return self.connection.getChange(change_key, refresh=refresh,
event=event)
def getChangeByURL(self, url, event):
try:
@ -76,14 +105,13 @@ class GerritSource(BaseSource):
results = self.connection.simpleQuery(query, event=event)
if not results:
return None
change = self.connection._getChange(
results[0].number, results[0].current_patchset,
event=event)
change_key = ChangeKey(self.connection.connection_name, None,
'GerritChange',
str(results[0].number),
str(results[0].current_patchset))
change = self.connection._getChange(change_key, event=event)
return change
def getChangeByKey(self, key):
return self.connection.getChangeByKey(key)
def getChangesDependingOn(self, change, projects, tenant):
changes = []
if not change.uris:
@ -107,8 +135,11 @@ class GerritSource(BaseSource):
if key in seen:
continue
seen.add(key)
change = self.connection._getChange(
result.number, result.current_patchset)
change_key = ChangeKey(self.connection.connection_name, None,
'GerritChange',
str(result.number),
str(result.current_patchset))
change = self.connection._getChange(change_key)
changes.append(change)
return changes

View File

@ -25,7 +25,6 @@ from zuul.driver.git.gitwatcher import GitWatcher
from zuul.model import Ref, Branch
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
@ -102,40 +101,40 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
refs[ref] = sha
return refs
def getChange(self, event, refresh=False):
key = ChangeKey(self.connection_name, event.project_name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
def getChange(self, change_key, refresh=False, event=None):
change = self._change_cache.get(change_key)
if change:
return change
if event.ref and event.ref.startswith('refs/heads/'):
branch = event.ref[len('refs/heads/'):]
project = self.getProject(event.project_name)
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
if change_key.change_type == 'Branch':
branch = change_key.stable_id
change = Branch(project)
change.branch = branch
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.ref = f'refs/heads/{branch}'
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = ""
change.files = self.getChangeFilesUpdated(
event.project_name, change.branch, event.oldrev)
elif event.ref:
change_key.project_name, branch, change_key.oldrev)
elif change_key.change_type == 'Ref':
# catch-all ref (ie, not a branch or head)
project = self.getProject(event.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.ref = change_key.stable_id
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = ""
else:
self.log.warning("Unable to get change for %s", event)
self.log.warning("Unable to get change for %s", change_key)
return None
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def getProjectBranches(self, project, tenant, min_ltime=-1):
@ -161,7 +160,8 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
# Force changes cache update before passing
# the event to the scheduler
self.getChange(event)
change_key = self.source.getChangeKey(event)
self.getChange(change_key)
self.logEvent(event)
# Pass the event to the scheduler
self.sched.addTriggerEvent(self.driver_name, event)

View File

@ -17,7 +17,6 @@ import re
from zuul.model import TriggerEvent
from zuul.model import EventFilter
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes

View File

@ -15,6 +15,7 @@
import logging
from zuul.source import BaseSource
from zuul.model import Project
from zuul.zk.change_cache import ChangeKey
class GitSource(BaseSource):
@ -35,15 +36,25 @@ class GitSource(BaseSource):
def canMerge(self, change, allow_needs, event=None, allow_refresh=False):
raise NotImplementedError()
def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh)
def getChangeKey(self, event):
connection_name = self.connection.connection_name
revision = f'{event.oldrev}..{event.newrev}'
if event.ref and event.ref.startswith('refs/heads/'):
branch = event.ref[len('refs/heads/'):]
return ChangeKey(connection_name, event.project_name,
'Branch', branch, revision)
if event.ref:
return ChangeKey(connection_name, event.project_name,
'Ref', event.ref, revision)
self.log.warning("Unable to format change key for %s" % (self,))
def getChange(self, change_key, refresh=False, event=None):
return self.connection.getChange(change_key, refresh=refresh,
event=event)
def getChangeByURL(self, url, event):
return None
def getChangeByKey(self, key):
return self.connection.getChangeByKey(key)
def getChangesDependingOn(self, change, projects, tenant):
return []

View File

@ -397,12 +397,10 @@ class GithubEventProcessor(object):
project = self.connection.source.getProject(event.project_name)
change = None
if event.change_number:
change = self.connection._getChange(
project,
event.change_number,
event.patch_number,
refresh=True,
event=event)
change_key = self.connection.source.getChangeKey(event)
change = self.connection._getChange(change_key,
refresh=True,
event=event)
self.log.debug("Refreshed change %s,%s",
event.change_number, event.patch_number)
@ -1296,47 +1294,43 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return self._github_client_manager.getGithubClient(
project_name=project_name, zuul_event_id=zuul_event_id)
def getChange(self, event, refresh=False):
"""Get the change representing an event."""
def getChange(self, change_key, refresh=False, event=None):
if change_key.connection_name != self.connection_name:
return None
if change_key.change_type == 'PullRequest':
return self._getChange(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Tag':
return self._getTag(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Branch':
return self._getBranch(change_key, refresh=refresh, event=event)
elif change_key.change_type == 'Ref':
return self._getRef(change_key, refresh=refresh, event=event)
project = self.source.getProject(event.project_name)
if event.change_number:
change = self._getChange(project, event.change_number,
event.patch_number, refresh=refresh,
event=event)
else:
if event.ref and event.ref.startswith('refs/tags/'):
change = self._getTag(project, event, refresh=refresh)
elif event.ref and event.ref.startswith('refs/heads/'):
change = self._getBranch(project, event, refresh=refresh)
else:
change = self._getRef(project, event, refresh=refresh)
return change
def _getChange(self, project, number, patchset=None, refresh=False,
event=None):
def _getChange(self, change_key, refresh=False, event=None):
# Note(tobiash): We force the pull request number to int centrally here
# because it can originate from different sources (github event, manual
# enqueue event) where some might just parse the string and forward it.
number = int(number)
key = ChangeKey(self.connection_name, project.name,
'PullRequest', str(number),
str(patchset))
change = self._change_cache.get(key)
number = int(change_key.stable_id)
change = self._change_cache.get(change_key)
if change and not refresh:
return change
project = self.source.getProject(change_key.project_name)
if not change:
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
change = PullRequest(project.name)
change.project = project
change.number = number
change.patchset = patchset
change.patchset = change_key.revision
# This can be called multi-threaded during github event
# preprocessing. In order to avoid data races perform locking
# by cached key. Try to acquire the lock non-blocking at first.
# If the lock is already taken we're currently updating the very
# same change right now and would likely get the same data again.
lock = self._change_update_lock.setdefault(key, threading.Lock())
lock = self._change_update_lock.setdefault(change_key,
threading.Lock())
if lock.acquire(blocking=False):
try:
pull = self.getPull(change.project.name, change.number,
@ -1346,11 +1340,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self._updateChange(c, event, pull)
change = self._change_cache.updateChangeWithRetry(
key, change, _update_change)
change_key, change, _update_change)
finally:
# We need to remove the lock here again so we don't leak
# them.
del self._change_update_lock[key]
del self._change_update_lock[change_key]
lock.release()
else:
# We didn't get the lock so we don't need to update the same
@ -1363,75 +1357,81 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.debug('Finished updating change %s', change)
return change
def _getTag(self, project, event, refresh=False):
tag = event.ref[len('refs/tags/'):]
key = ChangeKey(self.connection_name, project.name,
'Tag', tag, event.newrev)
change = self._change_cache.get(key)
def _getTag(self, change_key, refresh=False, event=None):
tag = change_key.stable_id
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Tag(project)
change.tag = tag
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.ref = f'refs/tags/{tag}'
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
# Build the url pointing to this tag/release on GitHub.
change.url = self.getGitwebUrl(project, sha=event.newrev, tag=tag)
change.url = self.getGitwebUrl(project, sha=change.newrev, tag=tag)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _getBranch(self, project, event, refresh=False):
branch = event.ref[len('refs/heads/'):]
key = ChangeKey(self.connection_name, project.name,
'Branch', branch, event.newrev)
change = self._change_cache.get(key)
def _getBranch(self, change_key, refresh=False, event=None):
branch = change_key.stable_id
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Branch(project)
change.branch = branch
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
change.ref = f'refs/heads/{branch}'
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self.getGitwebUrl(project, sha=change.newrev)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _getRef(self, project, event, refresh=False):
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
def _getRef(self, change_key, refresh=False, event=None):
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
change_key, change, lambda c: None)
return change
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
change.ref = change_key.stable_id
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self.getGitwebUrl(project, sha=change.newrev)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def getChangesDependingOn(self, change, projects, tenant):
@ -1491,8 +1491,9 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
for key in keys:
(proj, num, sha) = key
project = self.source.getProject(proj)
change = self._getChange(project, int(num), patchset=sha)
dep_change_key = ChangeKey(self.connection_name, proj,
'PullRequest', str(num), str(sha))
change = self._getChange(dep_change_key)
changes.append(change)
return changes
@ -1770,10 +1771,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
cached_pr_numbers = self._sha_pr_cache.get(project_name, sha)
if len(cached_pr_numbers) > 1:
raise Exception('Multiple pulls found with head sha %s' % sha)
project = self.getProject(project_name)
if len(cached_pr_numbers) == 1:
for pr in cached_pr_numbers:
pr_body = self._getChange(project, pr, sha, event=event).pr
pr_change_key = ChangeKey(self.connection_name, project_name,
'PullRequest', str(pr), str(sha))
pr_body = self._getChange(pr_change_key, event=event).pr
return pr_body
github = self.getGithubClient(project_name, zuul_event_id=event)
@ -1789,8 +1791,10 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# with multiple pulls found.
found_pr_body = None
for item in issues:
pr_body = self._getChange(
project, item.issue.number, sha, event=event).pr
pr_change_key = ChangeKey(self.connection_name, project_name,
'PullRequest', str(item.issue.number),
str(sha))
pr_body = self._getChange(pr_change_key, event=event).pr
self._sha_pr_cache.update(project_name, pr_body)
if pr_body['head']['sha'] == sha:
if found_pr_body:

View File

@ -22,6 +22,7 @@ from zuul.source import BaseSource
from zuul.model import Project
from zuul.driver.github.githubmodel import GithubRefFilter
from zuul.driver.util import scalar_or_list, to_list
from zuul.zk.change_cache import ChangeKey
class GithubSource(BaseSource):
@ -64,8 +65,31 @@ class GithubSource(BaseSource):
"""Called after configuration has been processed."""
pass
def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh)
def getChangeKey(self, event):
    """Build the ChangeKey identifying the change an event refers to.

    An event with a change number yields a ``PullRequest`` key whose
    stable id is the change number and whose revision is the patch
    number (both stringified).  Otherwise the key is derived from the
    event's ref: ``refs/tags/<x>`` yields a ``Tag`` key for ``<x>``,
    ``refs/heads/<x>`` a ``Branch`` key for ``<x>``, and any other ref
    a ``Ref`` key holding the full ref name.  For ref-based keys the
    revision is the string ``"<oldrev>..<newrev>"``.

    :param event: object providing ``change_number``, ``patch_number``,
        ``project_name``, ``ref``, ``oldrev`` and ``newrev``.
    :returns: a ChangeKey, or None (after logging a warning) when the
        event has neither a change number nor a ref.
    """
    connection_name = self.connection.connection_name
    if event.change_number:
        return ChangeKey(connection_name, event.project_name,
                         'PullRequest',
                         str(event.change_number),
                         str(event.patch_number))
    ref = event.ref
    if not ref:
        # Report the event that could not be keyed; logging the source
        # object gives no hint about which event was affected.
        self.log.warning("Unable to format change key for %s", event)
        return None
    revision = f'{event.oldrev}..{event.newrev}'
    # Tags and branches are keyed by their short name (prefix removed).
    for prefix, change_type in (('refs/tags/', 'Tag'),
                                ('refs/heads/', 'Branch')):
        if ref.startswith(prefix):
            return ChangeKey(connection_name, event.project_name,
                             change_type, ref[len(prefix):], revision)
    return ChangeKey(connection_name, event.project_name,
                     'Ref', ref, revision)
def getChange(self, change_key, refresh=False, event=None):
return self.connection.getChange(change_key, refresh=refresh,
event=event)
change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*")
@ -88,16 +112,13 @@ class GithubSource(BaseSource):
if not pull:
return None
proj = pull.get('base').get('repo').get('full_name')
project = self.getProject(proj)
change = self.connection._getChange(
project, num,
patchset=pull.get('head').get('sha'),
event=event)
change_key = ChangeKey(self.connection.connection_name, proj,
'PullRequest',
str(num),
pull.get('head').get('sha'))
change = self.connection._getChange(change_key, event=event)
return change
def getChangeByKey(self, key):
return self.connection.getChangeByKey(key)
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(change, projects, tenant)

View File

@ -41,7 +41,6 @@ from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
@ -237,12 +236,8 @@ class GitlabEventConnector(threading.Thread):
event.timestamp = timestamp
event.project_hostname = self.connection.canonical_hostname
if event.change_number:
project = self.connection.source.getProject(event.project_name)
self.connection._getChange(project,
event.change_number,
event.patch_number,
refresh=True,
url=event.change_url,
change_key = self.connection.source.getChangeKey(event)
self.connection._getChange(change_key, refresh=True,
event=event)
# If this event references a branch and we're excluding
@ -562,36 +557,39 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
project.name)
return cloneurl
def getChange(self, event, refresh=False):
project = self.source.getProject(event.project_name)
if event.change_number:
def getChange(self, change_key, refresh=False, event=None):
if change_key.connection_name != self.connection_name:
return None
if change_key.change_type == 'MergeRequest':
self.log.info("Getting change for %s#%s" % (
project, event.change_number))
change = self._getChange(
project, event.change_number, event.patch_number,
refresh=refresh, event=event)
change_key.project_name, change_key.stable_id))
change = self._getChange(change_key,
refresh=refresh, event=event)
else:
self.log.info("Getting change for %s ref:%s" % (
project, event.ref))
change = self._getNonMRRef(project, event)
change_key.project_name, change_key.stable_id))
change = self._getNonMRRef(change_key, event=event)
return change
def _getChange(self, project, number, patch_number=None,
refresh=False, url=None, event=None):
def _getChange(self, change_key, refresh=False, event=None):
log = get_annotated_logger(self.log, event)
key = ChangeKey(self.connection_name, project.name,
'MergeRequest', str(number),
str(patch_number))
change = self._change_cache.get(key)
number = int(change_key.stable_id)
change = self._change_cache.get(change_key)
if change and not refresh:
log.debug("Getting change from cache %s" % str(key))
log.debug("Getting change from cache %s" % str(change_key))
return change
project = self.source.getProject(change_key.project_name)
if not change:
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
if event:
url = event.change_url
change = MergeRequest(project.name)
change.project = project
change.number = number
# patch_number is the tips commit SHA of the MR
change.patchset = patch_number
change.patchset = change_key.revision
change.url = url or self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
@ -603,10 +601,46 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def _update_change(c):
self._updateChange(c, event, mr)
change = self._change_cache.updateChangeWithRetry(key, change,
change = self._change_cache.updateChangeWithRetry(change_key, change,
_update_change)
return change
def _getNonMRRef(self, change_key, refresh=False, event=None):
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
change_key, change, lambda c: None)
return change
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
if change_key.change_type == 'Tag':
change = Tag(project)
tag = change_key.stable_id
change.tag = tag
change.ref = f'refs/tags/{tag}'
elif change_key.change_type == 'Branch':
branch = change_key.stable_id
change = Branch(project)
change.branch = branch
change.ref = f'refs/heads/{branch}'
else:
change = Ref(project)
change.ref = change_key.stable_id
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self.getGitwebUrl(project, sha=change.newrev)
# Explicitly set files to None and let the pipelines processor
# call the merger asynchronuously
change.files = None
try:
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(change_key)
return change
def _updateChange(self, change, event, mr):
log = get_annotated_logger(self.log, event)
change.mr = mr
@ -632,36 +666,6 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.info("Updated change from Gitlab %s" % change)
return change
def _getNonMRRef(self, project, event, refresh=False):
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda c: None)
return change
if event.ref and event.ref.startswith('refs/tags/'):
change = Tag(project)
change.tag = event.tag
elif event.ref and event.ref.startswith('refs/heads/'):
change = Branch(project)
change.branch = event.branch
else:
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
# Explicitly set files to None and let the pipelines processor
# call the merger asynchronously
change.files = None
try:
self._change_cache.set(key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
return change
def canMerge(self, change, allow_needs, event=None):
log = get_annotated_logger(self.log, event)
can_merge = True if change.merge_status == "can_be_merged" else False

View File

@ -18,9 +18,9 @@ import urllib
from zuul.model import Project
from zuul.source import BaseSource
from zuul.driver.gitlab.gitlabmodel import GitlabRefFilter
from zuul.driver.util import scalar_or_list, to_list
from zuul.zk.change_cache import ChangeKey
class GitlabSource(BaseSource):
@ -57,8 +57,30 @@ class GitlabSource(BaseSource):
"""Called after configuration has been processed."""
raise NotImplementedError()
def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh)
def getChangeKey(self, event):
    """Build the ChangeKey identifying the change an event refers to.

    An event with a change number yields a ``MergeRequest`` key whose
    stable id is the change number and whose revision is the patch
    number (both stringified).  Otherwise the key is derived from the
    event's ref: ``refs/tags/<x>`` yields a ``Tag`` key for ``<x>``,
    ``refs/heads/<x>`` a ``Branch`` key for ``<x>``, and any other ref
    a ``Ref`` key holding the full ref name.  For ref-based keys the
    revision is the string ``"<oldrev>..<newrev>"``.

    :param event: object providing ``change_number``, ``patch_number``,
        ``project_name``, ``ref``, ``oldrev`` and ``newrev``.
    :returns: a ChangeKey, or None (after logging a warning) when the
        event has neither a change number nor a ref.
    """
    connection_name = self.connection.connection_name
    if event.change_number:
        return ChangeKey(connection_name, event.project_name,
                         'MergeRequest',
                         str(event.change_number),
                         str(event.patch_number))
    ref = event.ref
    if not ref:
        # Report the event that could not be keyed; logging the source
        # object gives no hint about which event was affected.
        self.log.warning("Unable to format change key for %s", event)
        return None
    revision = f'{event.oldrev}..{event.newrev}'
    # Tags and branches are keyed by their short name (prefix removed).
    for prefix, change_type in (('refs/tags/', 'Tag'),
                                ('refs/heads/', 'Branch')):
        if ref.startswith(prefix):
            return ChangeKey(connection_name, event.project_name,
                             change_type, ref[len(prefix):], revision)
    return ChangeKey(connection_name, event.project_name,
                     'Ref', ref, revision)
def getChange(self, change_key, refresh=False, event=None):
return self.connection.getChange(change_key, refresh=refresh,
event=event)
def getChangeByURL(self, url, event):
try:
@ -76,15 +98,12 @@ class GitlabSource(BaseSource):
mr = self.connection.getMR(project_name, num)
if not mr:
return None
project = self.getProject(project_name)
change = self.connection._getChange(
project, num, mr['sha'], url=url,
event=event)
change_key = ChangeKey(self.connection.connection_name, project_name,
'MergeRequest',
str(num), mr['sha'])
change = self.connection._getChange(change_key, event=event)
return change
def getChangeByKey(self, key):
return self.connection.getChangeByKey(key)
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)

View File

@ -33,7 +33,6 @@ from zuul.lib import dependson
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
@ -212,12 +211,8 @@ class PagureEventConnector(threading.Thread):
if event:
event.timestamp = timestamp
if event.change_number:
project = self.connection.source.getProject(event.project_name)
self.connection._getChange(project,
event.change_number,
event.patch_number,
refresh=True,
url=event.change_url,
change_key = self.connection.source.getChangeKey(event)
self.connection._getChange(change_key, refresh=True,
event=event)
event.project_hostname = self.connection.canonical_hostname
self.connection.logEvent(event)
@ -600,81 +595,91 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def getGitUrl(self, project):
return '%s/%s' % (self.cloneurl, project.name)
def getChange(self, event, refresh=False):
project = self.source.getProject(event.project_name)
if event.change_number:
def getChange(self, change_key, refresh=False, event=None):
if change_key.connection_name != self.connection_name:
return None
if change_key.change_type == 'PullRequest':
self.log.info("Getting change for %s#%s" % (
project, event.change_number))
change = self._getChange(
project, event.change_number, event.patch_number,
refresh=refresh, event=event)
change_key.project_name, change_key.stable_id))
change = self._getChange(change_key,
refresh=refresh, event=event)
else:
self.log.info("Getting change for %s ref:%s" % (
project, event.ref))
change = self._getNonPRRef(project, event, refresh=refresh)
change_key.project_name, change_key.stable_id))
change = self._getNonPRRef(change_key, event=event)
return change
def _getChange(self, project, number, patchset=None,
refresh=False, url=None, event=None):
key = ChangeKey(self.connection_name, project.name,
'PullRequest', str(number),
str(patchset))
change = self._change_cache.get(key)
def _getChange(self, change_key, refresh=False, event=None):
log = get_annotated_logger(self.log, event)
number = int(change_key.stable_id)
change = self._change_cache.get(change_key)
if change and not refresh:
self.log.debug("Getting change from cache %s" % str(key))
log.debug("Getting change from cache %s" % str(change_key))
return change
project = self.source.getProject(change_key.project_name)
if not change:
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
if event:
url = event.change_url
change = PullRequest(project.name)
change.project = project
change.number = number
# patchset is the tips commit of the PR
change.patchset = patchset
change.url = url
change.patchset = change_key.revision
change.url = url or self.getPullUrl(project.name, number)
change.uris = [
'%s/%s/pull/%s' % (self.baseurl, project, number),
'%s/%s/pull/%s' % (self.baseurl, project.name, number),
]
self.log.debug("Getting change pr#%s from project %s" % (
log.debug("Getting change pr#%s from project %s" % (
number, project.name))
self.log.info("Updating change from pagure %s" % change)
log.info("Updating change from pagure %s" % change)
pull = self.getPull(change.project.name, change.number)
def _update_change(c):
self._updateChange(c, event, pull)
change = self._change_cache.updateChangeWithRetry(key, change,
change = self._change_cache.updateChangeWithRetry(change_key, change,
_update_change)
return change
def _getNonPRRef(self, project, event, refresh=False):
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
def _getNonPRRef(self, change_key, refresh=False, event=None):
change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
key, change, lambda: None)
change_key, change, lambda: None)
return change
if event.ref and event.ref.startswith('refs/tags/'):
if not event:
self.log.error("Change %s not found in cache and no event",
change_key)
project = self.source.getProject(change_key.project_name)
if change_key.change_type == 'Tag':
change = Tag(project)
change.tag = event.tag
elif event.ref and event.ref.startswith('refs/heads/'):
tag = change_key.stable_id
change.tag = tag
change.ref = f'refs/tags/{tag}'
elif change_key.change_type == 'Branch':
branch = change_key.stable_id
change = Branch(project)
change.branch = event.branch
change.branch = branch
change.ref = f'refs/heads/{branch}'
else:
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
change.ref = change_key.stable_id
change.oldrev = change_key.oldrev
change.newrev = change_key.newrev
change.url = self.getGitwebUrl(project, sha=change.newrev)
# Pagure does not send files details in the git-receive event.
# Explicitly set files to None and let the pipelines processor
# call the merger asynchronously
change.files = None
try:
self._change_cache.set(key, change)
self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
change = self._change_cache.get(key)
change = self._change_cache.get(change_key)
return change
def _hasRequiredStatusChecks(self, change):

View File

@ -18,9 +18,9 @@ import logging
from zuul.source import BaseSource
from zuul.model import Project
from zuul.driver.pagure.paguremodel import PagureRefFilter
from zuul.driver.util import scalar_or_list, to_list
from zuul.zk.change_cache import ChangeKey
class PagureSource(BaseSource):
@ -61,8 +61,30 @@ class PagureSource(BaseSource):
"""Called after configuration has been processed."""
raise NotImplementedError()
def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh)
def getChangeKey(self, event):
    """Build the ChangeKey identifying the change an event refers to.

    An event with a change number yields a ``PullRequest`` key whose
    stable id is the change number and whose revision is the patch
    number (both stringified).  Otherwise the key is derived from the
    event's ref: ``refs/tags/<x>`` yields a ``Tag`` key for ``<x>``,
    ``refs/heads/<x>`` a ``Branch`` key for ``<x>``, and any other ref
    a ``Ref`` key holding the full ref name.  For ref-based keys the
    revision is the string ``"<oldrev>..<newrev>"``.

    :param event: object providing ``change_number``, ``patch_number``,
        ``project_name``, ``ref``, ``oldrev`` and ``newrev``.
    :returns: a ChangeKey, or None (after logging a warning) when the
        event has neither a change number nor a ref.
    """
    connection_name = self.connection.connection_name
    if event.change_number:
        return ChangeKey(connection_name, event.project_name,
                         'PullRequest',
                         str(event.change_number),
                         str(event.patch_number))
    ref = event.ref
    if not ref:
        # Report the event that could not be keyed; logging the source
        # object gives no hint about which event was affected.
        self.log.warning("Unable to format change key for %s", event)
        return None
    revision = f'{event.oldrev}..{event.newrev}'
    # Tags and branches are keyed by their short name (prefix removed).
    for prefix, change_type in (('refs/tags/', 'Tag'),
                                ('refs/heads/', 'Branch')):
        if ref.startswith(prefix):
            return ChangeKey(connection_name, event.project_name,
                             change_type, ref[len(prefix):], revision)
    return ChangeKey(connection_name, event.project_name,
                     'Ref', ref, revision)
def getChange(self, change_key, refresh=False, event=None):
return self.connection.getChange(change_key, refresh=refresh,
event=event)
def getChangeByURL(self, url, event):
try:
@ -80,17 +102,12 @@ class PagureSource(BaseSource):
pull = self.connection.getPull(project_name, num, event=event)
if not pull:
return None
project = self.getProject(project_name)
change = self.connection._getChange(
project, num,
patchset=pull.get('commit_stop'),
url=url,
event=event)
change_key = ChangeKey(self.connection.connection_name, project_name,
'PullRequest',
str(num), pull.get('commit_stop'))
change = self.connection._getChange(change_key, event=event)
return change
def getChangeByKey(self, key):
return self.connection.getChangeByKey(key)
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)

View File

@ -180,8 +180,10 @@ class TimerDriver(Driver, TriggerInterface):
event.timestamp = time.time()
# Refresh the branch in order to update the item in the
# change cache.
change_key = project.source.getChangeKey(event)
with self.project_update_locks[project.canonical_name]:
project.source.getChange(event, refresh=True)
project.source.getChange(change_key, refresh=True,
event=event)
log = get_annotated_logger(self.log, event)
log.debug("Adding event")
self.sched.addTriggerEvent(self.name, event)

View File

@ -218,7 +218,7 @@ class PipelineManager(metaclass=ABCMeta):
change = self._change_cache.get(key)
if change is None:
source = self.sched.connections.getSource(key.connection_name)
change = source.getChangeByKey(key)
change = source.getChange(key)
if change is None:
self.log.error("Unable to resolve change from key %s", key)
self._change_cache[change.cache_key] = change

View File

@ -5479,7 +5479,7 @@ class Change(Branch):
key = ChangeKey.fromReference(reference)
if key not in related:
source = sched.connections.getSource(key.connection_name)
change = source.getChangeByKey(key)
change = source.getChange(key)
change.getRelatedChanges(sched, related)
def getSafeAttributes(self):

View File

@ -1664,7 +1664,8 @@ class Scheduler(threading.Thread):
(trusted, project) = tenant.getProject(canonical_name)
if project is None:
raise ValueError('Unknown project %s' % event.project_name)
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
if change.project.name != project.name:
if event.change:
item = 'Change %s' % event.change
@ -1698,7 +1699,9 @@ class Scheduler(threading.Thread):
if project is None:
raise ValueError(f'Unknown project {event.project_name}')
try:
change = project.source.getChange(event, refresh=True)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key,
event=event, refresh=True)
except Exception as exc:
raise ValueError('Unknown change') from exc
@ -1716,7 +1719,8 @@ class Scheduler(threading.Thread):
trusted, project = tenant.getProject(canonical_name)
if project is None:
return
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
for shared_queue in pipeline.queues:
for item in shared_queue.queue:
if item.change.project != change.project:
@ -1961,7 +1965,8 @@ class Scheduler(threading.Thread):
return
try:
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
except exceptions.ChangeNotFound as e:
log.debug("Unable to get change %s from source %s",
e.change, project.source)
@ -2039,7 +2044,8 @@ class Scheduler(threading.Thread):
if project is None:
return
try:
change = project.source.getChange(event)
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
except exceptions.ChangeNotFound as e:
log.debug("Unable to get change %s from source %s",
e.change, project.source)

View File

@ -58,14 +58,25 @@ class BaseSource(object, metaclass=abc.ABCMeta):
"""Called after configuration has been processed."""
@abc.abstractmethod
def getChange(self, event):
"""Get the change representing an event.
def getChangeKey(self, event):
"""Get a ChangeKey from a ChangeManagementEvent or TriggerEvent"""
@abc.abstractmethod
def getChange(self, change_key, refresh=False, event=None):
"""Get the change represented by a change_key
This method is called very frequently, and should generally
return quickly. The connection is expected to cache change
objects and automatically update them as related events are
received.
The event is optional, and if present may be used to annotate
log entries and supply additional information about the change
if a refresh is necessary.
If the change key does not correspond to this source, return
None.
"""
@abc.abstractmethod
@ -98,14 +109,6 @@ class BaseSource(object, metaclass=abc.ABCMeta):
raise
return dep
def getChangeByKey(self, key):
"""Get the change corresponding to the supplied cache key.
The key may not correspond to this source. Return None if it
doesn't.
"""
raise NotImplementedError
@abc.abstractmethod
def getChangesDependingOn(self, change, projects, tenant):
"""Return changes which depend on changes at the supplied URIs.

View File

@ -56,7 +56,9 @@ class ChangeKey:
refers to a change, it should use ChangeKey.reference. This is a
dictionary with structured information about the change. The
contents can be used to construct a ChangeKey, and that can be
used to pull the Change from the cache.
used to pull the Change from the cache. The reference is used by
other objects in ZooKeeper to refer to changes, so the
serialization format must be stable or backwards compatible.
The cache itself uses a sha256 digest of the reference as the
actual cache key in ZK. This reduces and stabilizes the length of
@ -111,6 +113,26 @@ class ChangeKey:
self.stable_id == str_or_none(other.stable_id),
])
# Convenience methods for drivers that encode old/newrev in
# revision. Revision is not guaranteed to use this format.
@property
def oldrev(self):
    """Return the old revision encoded in ``revision``, if any.

    For revisions of the form ``"<oldrev>..<newrev>"`` this is the
    text before the first ``..``.  None is returned when the revision
    is unset, contains no ``..`` separator, or spells the old revision
    as the literal string ``'None'``.
    """
    revision = self.revision
    # An unset revision has nothing to split; return None rather than
    # failing on the membership test.
    if not revision or '..' not in revision:
        return None
    old = revision.split('..')[0]
    return None if old == 'None' else old
@property
def newrev(self):
    """Return the new revision encoded in ``revision``.

    For revisions of the form ``"<oldrev>..<newrev>"`` this is the
    text after the first ``..`` (None when that text is the literal
    string ``'None'``).  A revision without the separator is returned
    unchanged, and an unset revision yields None.
    """
    revision = self.revision
    # An unset revision has nothing to split; return None rather than
    # failing on the membership test.
    if not revision:
        return None
    if '..' not in revision:
        return revision
    new = revision.split('..')[1]
    return None if new == 'None' else new
class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
@ -246,7 +268,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
# max_age will any change in it be removed.
for key in to_keep.copy():
source = sched.connections.getSource(key.connection_name)
change = source.getChangeByKey(key)
change = source.getChange(key)
change.getRelatedChanges(sched, to_keep)
to_prune = set(outdated_versions.keys()) - to_keep
for key in to_prune: