Reference change dependencies by key

In order to cache changes in Zookeeper we need to make change objects
JSON serializable. This means that we can no longer reference other
change objects directly. Instead we will use a cache key consisting of
the connection name and a connection specific cache key.

Those cache keys can be resolved by getting the source instance using
the connection name and then retrieving the change instance via the new
`getChangeByKey()` interface.

The pipeline manager provides a helper method for resolving a list of
cache keys. Cache keys that were resolved once are also cached by the
manager as long as the reference is needed by any change in the
pipeline. The cache will be cleaned up at the end of a run of the queue
processor.

Until we can run multiple schedulers the change cache in the pipeline
manager will avoid hitting Zookeeper every time we resolve a cache key.

Later on when we have the pipeline state in Zookeeper we probably want
to clear the change cache in the pipeline manager at the end of the
queue processor. This way we make sure the change is recent enough when
we start processing a pipeline.

Change-Id: I09845d65864edc0e54af5f24d3c7be8fe2f7a919
This commit is contained in:
Simon Westphahl 2021-08-24 14:49:50 +02:00
parent 22c379bf80
commit 88f84bc5d5
16 changed files with 123 additions and 34 deletions

View File

@ -41,7 +41,7 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project
from zuul.model import Ref, Tag, Branch, Project, CacheStat
from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection
# HTTP timeout in seconds
@ -798,6 +798,8 @@ class GerritConnection(BaseConnection):
change = GerritChange(None)
change.number = number
change.patchset = patchset
change.cache_stat = CacheStat((change.number, change.patchset),
None, None)
self._change_cache.setdefault(change.number, {})
self._change_cache[change.number][change.patchset] = change
try:
@ -902,8 +904,8 @@ class GerritConnection(BaseConnection):
# already merged. So even if it is "ABANDONED", we should not
# ignore it.
if (not dep.is_merged) and dep not in needs_changes:
git_needs_changes.append(dep)
needs_changes.add(dep)
git_needs_changes.append(dep.cache_key)
needs_changes.add(dep.cache_key)
change.git_needs_changes = git_needs_changes
compat_needs_changes = []
@ -914,8 +916,8 @@ class GerritConnection(BaseConnection):
dep = self._getChange(dep_num, dep_ps, history=history,
event=event)
if dep.open and dep not in needs_changes:
compat_needs_changes.append(dep)
needs_changes.add(dep)
compat_needs_changes.append(dep.cache_key)
needs_changes.add(dep.cache_key)
change.compat_needs_changes = compat_needs_changes
needed_by_changes = set()
@ -928,8 +930,8 @@ class GerritConnection(BaseConnection):
event=event)
if (dep.open and dep.is_current_patchset and
dep not in needed_by_changes):
git_needed_by_changes.append(dep)
needed_by_changes.add(dep)
git_needed_by_changes.append(dep.cache_key)
needed_by_changes.add(dep.cache_key)
except Exception:
log.exception("Failed to get git-needed change %s,%s",
dep_num, dep_ps)
@ -952,13 +954,20 @@ class GerritConnection(BaseConnection):
event=event)
if (dep.open and dep.is_current_patchset
and dep not in needed_by_changes):
compat_needed_by_changes.append(dep)
needed_by_changes.add(dep)
compat_needed_by_changes.append(dep.cache_key)
needed_by_changes.add(dep.cache_key)
except Exception:
log.exception("Failed to get commit-needed change %s,%s",
dep_num, dep_ps)
change.compat_needed_by_changes = compat_needed_by_changes
def getChangeByKey(self, key):
    """Return the cached change for a connection specific cache key.

    :arg key: Expected to be a two-element sequence of change number
        and patchset, matching how changes are stored in the change
        cache of this connection.
    :returns: The cached change, or None if the key is malformed or
        no matching change is cached.
    """
    try:
        number, patchset = key
        return self._change_cache[number][patchset]
    except (KeyError, TypeError, ValueError):
        # KeyError: the change number or patchset is not cached.
        # ValueError: the key does not have exactly two elements.
        # TypeError: the key is not iterable (e.g. None) or one of its
        # elements is unhashable; treat it like any other unknown key
        # instead of letting the lookup blow up.
        return None
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:

View File

@ -81,6 +81,9 @@ class GerritSource(BaseSource):
event=event)
return change
def getChangeByKey(self, key):
    """Look up a change by cache key via this source's connection.

    :arg key: Connection specific cache key.
    :returns: Whatever the connection's getChangeByKey() returns for
        the key.
    """
    change = self.connection.getChangeByKey(key)
    return change
def getChangesDependingOn(self, change, projects, tenant):
changes = []
if not change.uris:

View File

@ -44,7 +44,7 @@ from zuul.connection import CachedBranchConnection
from zuul.driver.github.graphql import GraphQLClient
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Branch, Tag, Project
from zuul.model import Ref, Branch, Tag, Project, CacheStat
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
from zuul.model import DequeueEvent
@ -1307,6 +1307,7 @@ class GithubConnection(CachedBranchConnection):
if lock.acquire(blocking=False):
try:
self._updateChange(change, event)
change.cache_stat = CacheStat(key, None, None)
self._change_cache[key] = change
if self.sched:
@ -1331,6 +1332,9 @@ class GithubConnection(CachedBranchConnection):
raise
return change
def getChangeByKey(self, key):
    """Return the change cached under key, or None if there is none.

    :arg key: Connection specific cache key as used in the change
        cache of this connection.
    """
    cached_change = self._change_cache.get(key)
    return cached_change
def getChangesDependingOn(self, change, projects, tenant):
changes = []
if not change.uris:

View File

@ -95,6 +95,9 @@ class GithubSource(BaseSource):
event=event)
return change
def getChangeByKey(self, key):
    """Look up a change by cache key via this source's connection.

    :arg key: Connection specific cache key.
    :returns: Whatever the connection's getChangeByKey() returns for
        the key.
    """
    change = self.connection.getChangeByKey(key)
    return change
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(change, projects, tenant)

View File

@ -32,7 +32,7 @@ from zuul.connection import CachedBranchConnection
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
from zuul.exceptions import MergeFailure
from zuul.model import Branch, Project, Ref, Tag
from zuul.model import Branch, CacheStat, Project, Ref, Tag
from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest
from zuul.zk.event_queues import ConnectionEventQueue
@ -535,6 +535,7 @@ class GitlabConnection(CachedBranchConnection):
change.patchset = patch_number
change.url = url or self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
change.cache_stat = CacheStat(key, None, None)
self._change_cache[key] = change
try:
log.debug("Getting change mr#%s from project %s" % (
@ -614,6 +615,9 @@ class GitlabConnection(CachedBranchConnection):
"Set approval: %s on MR %s#%s (%s)", approve,
project_name, number, patchset)
def getChangeByKey(self, key):
    """Return the change cached under key, or None if there is none.

    :arg key: Connection specific cache key as used in the change
        cache of this connection.
    """
    cached_change = self._change_cache.get(key)
    return cached_change
def getChangesDependingOn(self, change, projects, tenant):
""" Reverse lookup of MR depending on this one
"""

View File

@ -82,6 +82,9 @@ class GitlabSource(BaseSource):
event=event)
return change
def getChangeByKey(self, key):
    """Look up a change by cache key via this source's connection.

    :arg key: Connection specific cache key.
    :returns: Whatever the connection's getChangeByKey() returns for
        the key.
    """
    change = self.connection.getChangeByKey(key)
    return change
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)

View File

@ -26,7 +26,7 @@ import voluptuous as v
from zuul.connection import BaseConnection
from zuul.lib.logutil import get_annotated_logger
from zuul.web.handler import BaseWebController
from zuul.model import Ref, Branch, Tag
from zuul.model import Ref, Branch, Tag, CacheStat
from zuul.lib import dependson
from zuul.zk.event_queues import ConnectionEventQueue
@ -626,6 +626,7 @@ class PagureConnection(BaseConnection):
change.uris = [
'%s/%s/pull/%s' % (self.baseurl, project, number),
]
change.cache_stat = CacheStat(key, None, None)
self._change_cache[key] = change
try:
self.log.debug("Getting change pr#%s from project %s" % (
@ -762,6 +763,9 @@ class PagureConnection(BaseConnection):
number, project, flag.get('status')))
return flag.get('status')
def getChangeByKey(self, key):
    """Return the change cached under key, or None if there is none.

    :arg key: Connection specific cache key as used in the change
        cache of this connection.
    """
    cached_change = self._change_cache.get(key)
    return cached_change
def getChangesDependingOn(self, change, projects, tenant):
""" Reverse lookup of PR depending on this one
"""

View File

@ -88,6 +88,9 @@ class PagureSource(BaseSource):
event=event)
return change
def getChangeByKey(self, key):
    """Look up a change by cache key via this source's connection.

    :arg key: Connection specific cache key.
    :returns: Whatever the connection's getChangeByKey() returns for
        the key.
    """
    change = self.connection.getChangeByKey(key)
    return change
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)

View File

@ -111,7 +111,8 @@ class ZuulDriver(Driver, TriggerInterface):
# This is very inefficient, especially on systems with large
# numbers of github installations. This can be improved later
# with persistent storage of dependency information.
needed_by_changes = set(change.needed_by_changes)
needed_by_changes = set(
pipeline.manager.resolveChangeKeys(change.needed_by_changes))
for source in self.sched.connections.getSources():
log.debug(" Checking source: %s", source)
needed_by_changes.update(

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import logging
import textwrap
import time
@ -61,6 +62,9 @@ class PipelineManager(metaclass=ABCMeta):
# Cached dynamic layouts (layout uuid -> layout)
self._layout_cache = {}
self.sql = self.sched.sql
# A small local cache to avoid hitting the ZK-based connection
# change cache for multiple hits in the same pipeline run.
self._change_cache = {}
def __str__(self):
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
@ -150,6 +154,44 @@ class PipelineManager(metaclass=ABCMeta):
i.live]
return items.index(item)
def resolveChangeKeys(self, change_keys):
    """Resolve a list of cache keys to their change objects.

    Changes that were resolved before are served from the manager's
    local change cache; everything else is looked up through the
    source of the connection named in the key and then remembered.

    :arg change_keys: Iterable of ``(connection_name, connection_key)``
        tuples.
    :returns: List of changes in the same order as ``change_keys``.
    :raises LookupError: If the source does not know a change for one
        of the keys.
    """
    resolved_changes = []
    for key in change_keys:
        change = self._change_cache.get(key)
        if change is None:
            connection_name, connection_key = key
            source = self.sched.connections.getSource(connection_name)
            change = source.getChangeByKey(connection_key)
            if change is None:
                # Sources return None for unknown keys. Fail with a
                # clear message here rather than with an AttributeError
                # further down, and never hand None to callers that
                # expect change objects.
                raise LookupError(
                    "Unable to resolve change for cache key %s" % (key,))
            # Store under the key that was asked for, so the lookup
            # above hits next time and cleanup by referenced key works.
            self._change_cache[key] = change
        resolved_changes.append(change)
    return resolved_changes
def _maintainCache(self):
active_layout_uuids = set()
referenced_change_keys = set()
for item in self.pipeline.getAllItems():
if item.layout_uuid:
active_layout_uuids.add(item.layout_uuid)
if isinstance(item.change, model.Change):
referenced_change_keys.update(item.change.needs_changes)
referenced_change_keys.update(item.change.needed_by_changes)
# Clean up unused layouts in the cache
unused_layouts = set(self._layout_cache.keys()) - active_layout_uuids
if unused_layouts:
self.log.debug("Removing unused layouts %s from cache",
unused_layouts)
for uid in unused_layouts:
with contextlib.suppress(KeyError):
del self._layout_cache[uid]
# Clean up change cache
unused_keys = set(self._change_cache.keys()) - referenced_change_keys
for key in unused_keys:
with contextlib.suppress(KeyError):
del self._change_cache[key]
def isChangeAlreadyInPipeline(self, change):
# Checks live items in the pipeline
for item in self.pipeline.getAllItems():
@ -623,7 +665,7 @@ class PipelineManager(metaclass=ABCMeta):
change.project.connection_name)
source.setChangeAttributes(
change,
commit_needs_changes=[d for d in dependencies],
commit_needs_changes=[d.cache_key for d in dependencies],
refresh_deps=False)
def provisionNodes(self, item):
@ -1350,17 +1392,7 @@ class PipelineManager(metaclass=ABCMeta):
self.log.debug("Queue %s status is now:\n %s" %
(queue.name, status))
# Cleanup unused layouts in the cache
active_layout_uuids = {i.layout_uuid
for i in self.pipeline.getAllItems()
if i.layout_uuid}
unused_layouts = set(self._layout_cache.keys()) - active_layout_uuids
if unused_layouts:
self.log.debug("Removing unused layouts %s from cache",
unused_layouts)
for uid in unused_layouts:
del self._layout_cache[uid]
self._maintainCache()
self.log.debug("Finished queue processor: %s (changed: %s)" %
(self.pipeline.name, changed))
return changed

View File

@ -85,8 +85,8 @@ class DependentPipelineManager(SharedQueuePipelineManager):
for project, _ in change_queue.project_branches:
sources.add(project.source)
seen = set(change.needed_by_changes)
needed_by_changes = change.needed_by_changes[:]
needed_by_changes = self.resolveChangeKeys(change.needed_by_changes)
seen = set(needed_by_changes)
for source in sources:
log.debug(" Checking source: %s", source)
projects = [project_branch[0]
@ -157,7 +157,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
# a git level dependency, we need to enqueue it before the current
# change.
if (needed_change not in history or
needed_change in change.git_needs_changes):
needed_change.cache_key in change.git_needs_changes):
r = self.addChange(needed_change, event, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue, history=history,
@ -185,7 +185,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
changes_needed = []
# Ignore supplied change_queue
with self.getChangeQueue(change, event) as change_queue:
for needed_change in change.needs_changes:
for needed_change in self.resolveChangeKeys(change.needs_changes):
log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
@ -234,7 +234,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
if not item.change.needs_changes:
return None
failing_items = set()
for needed_change in item.change.needs_changes:
for needed_change in self.resolveChangeKeys(item.change.needs_changes):
needed_item = self.getItemForChange(needed_change)
if not needed_item:
continue

View File

@ -86,7 +86,7 @@ class IndependentPipelineManager(PipelineManager):
log.debug(" No changes needed")
return True
changes_needed = []
for needed_change in change.needs_changes:
for needed_change in self.resolveChangeKeys(change.needs_changes):
log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:

View File

@ -14,7 +14,7 @@
# under the License.
import abc
from collections import OrderedDict, defaultdict, UserDict
from collections import OrderedDict, defaultdict, namedtuple, UserDict
import copy
import json
import logging
@ -3538,6 +3538,10 @@ class Bundle:
return any(i.change.updatesConfig(tenant) for i in self.items)
# Cache info of a ref
CacheStat = namedtuple("CacheStat", ["key", "uuid", "version"])
class Ref(object):
"""An existing state of a Project."""
@ -3547,6 +3551,13 @@ class Ref(object):
self.oldrev = None
self.newrev = None
self.files = []
# Cache info about this ref:
# CacheStat(cache key, uuid, version)
self.cache_stat = None
@property
def cache_key(self):
    """Cache key of this ref as a ``(connection name, key)`` tuple.

    Combines the connection name of the ref's project with the key
    stored in ``cache_stat``; both attributes must be set before this
    property is read.
    """
    connection_name = self.project.connection_name
    return (connection_name, self.cache_stat.key)
def _id(self):
return self.newrev

View File

@ -223,8 +223,10 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
return msg
def _formatItemReportOtherBundleItems(self, item):
related_changes = item.pipeline.manager.resolveChangeKeys(
item.change.needs_changes)
return "Related changes:\n{}".format("\n".join(
c.url for c in item.change.needs_changes if c is not item.change))
c.url for c in related_changes if c is not item.change))
def _getItemReportJobsFields(self, item):
# Extract the report elements from an item

View File

@ -2190,7 +2190,9 @@ class Scheduler(threading.Thread):
for other_change in source.getCachedChanges():
if other_change.commit_needs_changes is None:
continue
for dep in other_change.commit_needs_changes:
for connection_name, key in other_change.commit_needs_changes:
dep_source = self.connections.getSource(connection_name)
dep = dep_source.getChangeByKey(key)
if change.isUpdateOf(dep):
source.setChangeAttributes(
other_change, refresh_deps=True)

View File

@ -76,6 +76,14 @@ class BaseSource(object, metaclass=abc.ABCMeta):
"""
def getChangeByKey(self, key):
    """Get the change corresponding to the supplied cache key.

    The key may not correspond to this source; implementations
    return None in that case.

    :arg key: Connection specific cache key.
    :raises NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()
@abc.abstractmethod
def getChangesDependingOn(self, change, projects, tenant):
"""Return changes which depend on changes at the supplied URIs.