From 88f84bc5d59814533bd5896881516941ce54cc57 Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Tue, 24 Aug 2021 14:49:50 +0200 Subject: [PATCH] Reference change dependencies by key In order to cache changes in Zookeeper we need to make change objects JSON serializable. This means that we can no longer reference other change objects directly. Instead we will use a cache key consisting of the connection name and a connection-specific cache key. Those cache keys can be resolved by getting the source instance using the connection name and then retrieving the change instance via the new `getChangeByKey()` interface. The pipeline manager provides a helper method for resolving a list of cache keys. Cache keys that were resolved once are also cached by the manager as long as the reference is needed by any change in the pipeline. The cache will be cleaned up at the end of a run of the queue processor. Until we can run multiple schedulers the change cache in the pipeline manager will avoid hitting Zookeeper every time we resolve a cache key. Later on when we have the pipeline state in Zookeeper we probably want to clear the change cache in the pipeline manager at the end of the queue processor. This way we make sure the change is recent enough when we start processing a pipeline. 
Change-Id: I09845d65864edc0e54af5f24d3c7be8fe2f7a919 --- zuul/driver/gerrit/gerritconnection.py | 27 ++++++++----- zuul/driver/gerrit/gerritsource.py | 3 ++ zuul/driver/github/githubconnection.py | 6 ++- zuul/driver/github/githubsource.py | 3 ++ zuul/driver/gitlab/gitlabconnection.py | 6 ++- zuul/driver/gitlab/gitlabsource.py | 3 ++ zuul/driver/pagure/pagureconnection.py | 6 ++- zuul/driver/pagure/paguresource.py | 3 ++ zuul/driver/zuul/__init__.py | 3 +- zuul/manager/__init__.py | 56 ++++++++++++++++++++------ zuul/manager/dependent.py | 10 ++--- zuul/manager/independent.py | 2 +- zuul/model.py | 13 +++++- zuul/reporter/__init__.py | 4 +- zuul/scheduler.py | 4 +- zuul/source/__init__.py | 8 ++++ 16 files changed, 123 insertions(+), 34 deletions(-) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index f7e509b34a..8502940c70 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -41,7 +41,7 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.driver.git.gitwatcher import GitWatcher from zuul.lib.logutil import get_annotated_logger -from zuul.model import Ref, Tag, Branch, Project +from zuul.model import Ref, Tag, Branch, Project, CacheStat from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection # HTTP timeout in seconds @@ -798,6 +798,8 @@ class GerritConnection(BaseConnection): change = GerritChange(None) change.number = number change.patchset = patchset + change.cache_stat = CacheStat((change.number, change.patchset), + None, None) self._change_cache.setdefault(change.number, {}) self._change_cache[change.number][change.patchset] = change try: @@ -902,8 +904,8 @@ class GerritConnection(BaseConnection): # already merged. So even if it is "ABANDONED", we should not # ignore it. 
if (not dep.is_merged) and dep not in needs_changes: - git_needs_changes.append(dep) - needs_changes.add(dep) + git_needs_changes.append(dep.cache_key) + needs_changes.add(dep.cache_key) change.git_needs_changes = git_needs_changes compat_needs_changes = [] @@ -914,8 +916,8 @@ class GerritConnection(BaseConnection): dep = self._getChange(dep_num, dep_ps, history=history, event=event) if dep.open and dep not in needs_changes: - compat_needs_changes.append(dep) - needs_changes.add(dep) + compat_needs_changes.append(dep.cache_key) + needs_changes.add(dep.cache_key) change.compat_needs_changes = compat_needs_changes needed_by_changes = set() @@ -928,8 +930,8 @@ class GerritConnection(BaseConnection): event=event) if (dep.open and dep.is_current_patchset and dep not in needed_by_changes): - git_needed_by_changes.append(dep) - needed_by_changes.add(dep) + git_needed_by_changes.append(dep.cache_key) + needed_by_changes.add(dep.cache_key) except Exception: log.exception("Failed to get git-needed change %s,%s", dep_num, dep_ps) @@ -952,13 +954,20 @@ class GerritConnection(BaseConnection): event=event) if (dep.open and dep.is_current_patchset and dep not in needed_by_changes): - compat_needed_by_changes.append(dep) - needed_by_changes.add(dep) + compat_needed_by_changes.append(dep.cache_key) + needed_by_changes.add(dep.cache_key) except Exception: log.exception("Failed to get commit-needed change %s,%s", dep_num, dep_ps) change.compat_needed_by_changes = compat_needed_by_changes + def getChangeByKey(self, key): + try: + number, patchset = key + return self._change_cache[number][patchset] + except (KeyError, ValueError): + return None + def isMerged(self, change, head=None): self.log.debug("Checking if change %s is merged" % change) if not change.number: diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index 50f71d9ed5..7c18ef8710 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -81,6 +81,9 @@ 
class GerritSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): changes = [] if not change.uris: diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 598bfc0345..7a8bac6724 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -44,7 +44,7 @@ from zuul.connection import CachedBranchConnection from zuul.driver.github.graphql import GraphQLClient from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger -from zuul.model import Ref, Branch, Tag, Project +from zuul.model import Ref, Branch, Tag, Project, CacheStat from zuul.exceptions import MergeFailure from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent from zuul.model import DequeueEvent @@ -1307,6 +1307,7 @@ class GithubConnection(CachedBranchConnection): if lock.acquire(blocking=False): try: self._updateChange(change, event) + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change if self.sched: @@ -1331,6 +1332,9 @@ class GithubConnection(CachedBranchConnection): raise return change + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): changes = [] if not change.uris: diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py index 3028bb52aa..1b23401ff1 100644 --- a/zuul/driver/github/githubsource.py +++ b/zuul/driver/github/githubsource.py @@ -95,6 +95,9 @@ class GithubSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn(change, projects, tenant) diff --git a/zuul/driver/gitlab/gitlabconnection.py 
b/zuul/driver/gitlab/gitlabconnection.py index 35f1f282ae..b6e6f6e50d 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -32,7 +32,7 @@ from zuul.connection import CachedBranchConnection from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger from zuul.exceptions import MergeFailure -from zuul.model import Branch, Project, Ref, Tag +from zuul.model import Branch, CacheStat, Project, Ref, Tag from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest from zuul.zk.event_queues import ConnectionEventQueue @@ -535,6 +535,7 @@ class GitlabConnection(CachedBranchConnection): change.patchset = patch_number change.url = url or self.getMRUrl(project.name, number) change.uris = [change.url.split('://', 1)[-1]] # remove scheme + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change try: log.debug("Getting change mr#%s from project %s" % ( @@ -614,6 +615,9 @@ class GitlabConnection(CachedBranchConnection): "Set approval: %s on MR %s#%s (%s)", approve, project_name, number, patchset) + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): """ Reverse lookup of MR depending on this one """ diff --git a/zuul/driver/gitlab/gitlabsource.py b/zuul/driver/gitlab/gitlabsource.py index 9296c641c9..fbd582367f 100644 --- a/zuul/driver/gitlab/gitlabsource.py +++ b/zuul/driver/gitlab/gitlabsource.py @@ -82,6 +82,9 @@ class GitlabSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn( change, projects, tenant) diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index e65a329b89..4c7e762073 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ 
b/zuul/driver/pagure/pagureconnection.py @@ -26,7 +26,7 @@ import voluptuous as v from zuul.connection import BaseConnection from zuul.lib.logutil import get_annotated_logger from zuul.web.handler import BaseWebController -from zuul.model import Ref, Branch, Tag +from zuul.model import Ref, Branch, Tag, CacheStat from zuul.lib import dependson from zuul.zk.event_queues import ConnectionEventQueue @@ -626,6 +626,7 @@ class PagureConnection(BaseConnection): change.uris = [ '%s/%s/pull/%s' % (self.baseurl, project, number), ] + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change try: self.log.debug("Getting change pr#%s from project %s" % ( @@ -762,6 +763,9 @@ class PagureConnection(BaseConnection): number, project, flag.get('status'))) return flag.get('status') + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): """ Reverse lookup of PR depending on this one """ diff --git a/zuul/driver/pagure/paguresource.py b/zuul/driver/pagure/paguresource.py index 78a62b911f..4202df070d 100644 --- a/zuul/driver/pagure/paguresource.py +++ b/zuul/driver/pagure/paguresource.py @@ -88,6 +88,9 @@ class PagureSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn( change, projects, tenant) diff --git a/zuul/driver/zuul/__init__.py b/zuul/driver/zuul/__init__.py index afbee1bd0c..390b65b79c 100644 --- a/zuul/driver/zuul/__init__.py +++ b/zuul/driver/zuul/__init__.py @@ -111,7 +111,8 @@ class ZuulDriver(Driver, TriggerInterface): # This is very inefficient, especially on systems with large # numbers of github installations. This can be improved later # with persistent storage of dependency information. 
- needed_by_changes = set(change.needed_by_changes) + needed_by_changes = set( + pipeline.manager.resolveChangeKeys(change.needed_by_changes)) for source in self.sched.connections.getSources(): log.debug(" Checking source: %s", source) needed_by_changes.update( diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 70174079a6..83aedbe9d2 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. import collections +import contextlib import logging import textwrap import time @@ -61,6 +62,9 @@ class PipelineManager(metaclass=ABCMeta): # Cached dynamic layouts (layout uuid -> layout) self._layout_cache = {} self.sql = self.sched.sql + # A small local cache to avoid hitting the ZK-based connection + # change cache for multiple hits in the same pipeline run. + self._change_cache = {} def __str__(self): return "<%s %s>" % (self.__class__.__name__, self.pipeline.name) @@ -150,6 +154,44 @@ class PipelineManager(metaclass=ABCMeta): i.live] return items.index(item) + def resolveChangeKeys(self, change_keys): + resolved_changes = [] + for key in change_keys: + change = self._change_cache.get(key) + if change is None: + connection_name, connection_key = key + source = self.sched.connections.getSource(connection_name) + change = source.getChangeByKey(connection_key) + self._change_cache[change.cache_key] = change + resolved_changes.append(change) + return resolved_changes + + def _maintainCache(self): + active_layout_uuids = set() + referenced_change_keys = set() + for item in self.pipeline.getAllItems(): + if item.layout_uuid: + active_layout_uuids.add(item.layout_uuid) + + if isinstance(item.change, model.Change): + referenced_change_keys.update(item.change.needs_changes) + referenced_change_keys.update(item.change.needed_by_changes) + + # Clean up unused layouts in the cache + unused_layouts = set(self._layout_cache.keys()) - 
active_layout_uuids + if unused_layouts: + self.log.debug("Removing unused layouts %s from cache", + unused_layouts) + for uid in unused_layouts: + with contextlib.suppress(KeyError): + del self._layout_cache[uid] + + # Clean up change cache + unused_keys = set(self._change_cache.keys()) - referenced_change_keys + for key in unused_keys: + with contextlib.suppress(KeyError): + del self._change_cache[key] + def isChangeAlreadyInPipeline(self, change): # Checks live items in the pipeline for item in self.pipeline.getAllItems(): @@ -623,7 +665,7 @@ class PipelineManager(metaclass=ABCMeta): change.project.connection_name) source.setChangeAttributes( change, - commit_needs_changes=[d for d in dependencies], + commit_needs_changes=[d.cache_key for d in dependencies], refresh_deps=False) def provisionNodes(self, item): @@ -1350,17 +1392,7 @@ class PipelineManager(metaclass=ABCMeta): self.log.debug("Queue %s status is now:\n %s" % (queue.name, status)) - # Cleanup unused layouts in the cache - active_layout_uuids = {i.layout_uuid - for i in self.pipeline.getAllItems() - if i.layout_uuid} - unused_layouts = set(self._layout_cache.keys()) - active_layout_uuids - if unused_layouts: - self.log.debug("Removing unused layouts %s from cache", - unused_layouts) - for uid in unused_layouts: - del self._layout_cache[uid] - + self._maintainCache() self.log.debug("Finished queue processor: %s (changed: %s)" % (self.pipeline.name, changed)) return changed diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 7b6667fef4..750a272090 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -85,8 +85,8 @@ class DependentPipelineManager(SharedQueuePipelineManager): for project, _ in change_queue.project_branches: sources.add(project.source) - seen = set(change.needed_by_changes) - needed_by_changes = change.needed_by_changes[:] + needed_by_changes = self.resolveChangeKeys(change.needed_by_changes) + seen = set(needed_by_changes) for source in sources: 
log.debug(" Checking source: %s", source) projects = [project_branch[0] @@ -157,7 +157,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): # a git level dependency, we need to enqueue it before the current # change. if (needed_change not in history or - needed_change in change.git_needs_changes): + needed_change.cache_key in change.git_needs_changes): r = self.addChange(needed_change, event, quiet=quiet, ignore_requirements=ignore_requirements, change_queue=change_queue, history=history, @@ -185,7 +185,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): changes_needed = [] # Ignore supplied change_queue with self.getChangeQueue(change, event) as change_queue: - for needed_change in change.needs_changes: + for needed_change in self.resolveChangeKeys(change.needs_changes): log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: @@ -234,7 +234,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): if not item.change.needs_changes: return None failing_items = set() - for needed_change in item.change.needs_changes: + for needed_change in self.resolveChangeKeys(item.change.needs_changes): needed_item = self.getItemForChange(needed_change) if not needed_item: continue diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 7686d5a7d4..61aede3c68 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -86,7 +86,7 @@ class IndependentPipelineManager(PipelineManager): log.debug(" No changes needed") return True changes_needed = [] - for needed_change in change.needs_changes: + for needed_change in self.resolveChangeKeys(change.needs_changes): log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: diff --git a/zuul/model.py b/zuul/model.py index 73ad5c0624..8fd57fbdec 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -14,7 +14,7 @@ # under the License. 
import abc -from collections import OrderedDict, defaultdict, UserDict +from collections import OrderedDict, defaultdict, namedtuple, UserDict import copy import json import logging @@ -3538,6 +3538,10 @@ class Bundle: return any(i.change.updatesConfig(tenant) for i in self.items) +# Cache info of a ref +CacheStat = namedtuple("CacheStat", ["key", "uuid", "version"]) + + class Ref(object): """An existing state of a Project.""" @@ -3547,6 +3551,13 @@ class Ref(object): self.oldrev = None self.newrev = None self.files = [] + # Cache info about this ref: + # CacheStat(cache key, uuid, version) + self.cache_stat = None + + @property + def cache_key(self): + return (self.project.connection_name, self.cache_stat.key) def _id(self): return self.newrev diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py index 014dc93432..07a5960f7e 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -223,8 +223,10 @@ class BaseReporter(object, metaclass=abc.ABCMeta): return msg def _formatItemReportOtherBundleItems(self, item): + related_changes = item.pipeline.manager.resolveChangeKeys( + item.change.needs_changes) return "Related changes:\n{}".format("\n".join( - c.url for c in item.change.needs_changes if c is not item.change)) + c.url for c in related_changes if c is not item.change)) def _getItemReportJobsFields(self, item): # Extract the report elements from an item diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 0afc3555aa..54540be552 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2190,7 +2190,9 @@ class Scheduler(threading.Thread): for other_change in source.getCachedChanges(): if other_change.commit_needs_changes is None: continue - for dep in other_change.commit_needs_changes: + for connection_name, key in other_change.commit_needs_changes: + dep_source = self.connections.getSource(connection_name) + dep = dep_source.getChangeByKey(key) if change.isUpdateOf(dep): source.setChangeAttributes( other_change, refresh_deps=True) 
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index d3a11be6ca..b9f73373fb 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -76,6 +76,14 @@ class BaseSource(object, metaclass=abc.ABCMeta): """ + def getChangeByKey(self, key): + """Get the change corresponding to the supplied cache key. + + The key may not correspond to this source. Return None if it + doesn't. + """ + raise NotImplementedError + @abc.abstractmethod def getChangesDependingOn(self, change, projects, tenant): """Return changes which depend on changes at the supplied URIs.