diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index f7e509b34a..8502940c70 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -41,7 +41,7 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.driver.git.gitwatcher import GitWatcher from zuul.lib.logutil import get_annotated_logger -from zuul.model import Ref, Tag, Branch, Project +from zuul.model import Ref, Tag, Branch, Project, CacheStat from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection # HTTP timeout in seconds @@ -798,6 +798,8 @@ class GerritConnection(BaseConnection): change = GerritChange(None) change.number = number change.patchset = patchset + change.cache_stat = CacheStat((change.number, change.patchset), + None, None) self._change_cache.setdefault(change.number, {}) self._change_cache[change.number][change.patchset] = change try: @@ -902,8 +904,8 @@ class GerritConnection(BaseConnection): # already merged. So even if it is "ABANDONED", we should not # ignore it. if (not dep.is_merged) and dep not in needs_changes: - git_needs_changes.append(dep) - needs_changes.add(dep) + git_needs_changes.append(dep.cache_key) + needs_changes.add(dep.cache_key) change.git_needs_changes = git_needs_changes compat_needs_changes = [] @@ -914,8 +916,8 @@ class GerritConnection(BaseConnection): dep = self._getChange(dep_num, dep_ps, history=history, event=event) if dep.open and dep not in needs_changes: - compat_needs_changes.append(dep) - needs_changes.add(dep) + compat_needs_changes.append(dep.cache_key) + needs_changes.add(dep.cache_key) change.compat_needs_changes = compat_needs_changes needed_by_changes = set() @@ -928,8 +930,8 @@ class GerritConnection(BaseConnection): event=event) if (dep.open and dep.is_current_patchset and dep not in needed_by_changes): - git_needed_by_changes.append(dep) - needed_by_changes.add(dep) + git_needed_by_changes.append(dep.cache_key) + needed_by_changes.add(dep.cache_key) except Exception: log.exception("Failed to get git-needed change %s,%s", dep_num, dep_ps) @@ -952,13 +954,20 @@ class GerritConnection(BaseConnection): event=event) if (dep.open and dep.is_current_patchset and dep not in needed_by_changes): - compat_needed_by_changes.append(dep) - needed_by_changes.add(dep) + compat_needed_by_changes.append(dep.cache_key) + needed_by_changes.add(dep.cache_key) except Exception: log.exception("Failed to get commit-needed change %s,%s", dep_num, dep_ps) change.compat_needed_by_changes = compat_needed_by_changes + def getChangeByKey(self, key): + try: + number, patchset = key + return self._change_cache[number][patchset] + except (KeyError, ValueError): + return None + def isMerged(self, change, head=None): self.log.debug("Checking if change %s is merged" % change) if not change.number: diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index 50f71d9ed5..7c18ef8710 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -81,6 +81,9 @@ class GerritSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): changes = [] if not change.uris: diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 598bfc0345..7a8bac6724 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -44,7 +44,7 @@ from zuul.connection import CachedBranchConnection from zuul.driver.github.graphql import GraphQLClient from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger -from zuul.model import Ref, Branch, Tag, Project +from zuul.model import Ref, Branch, Tag, Project, CacheStat from zuul.exceptions import MergeFailure from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent from zuul.model import DequeueEvent @@ -1307,6 +1307,7 @@ class GithubConnection(CachedBranchConnection): if lock.acquire(blocking=False): try: self._updateChange(change, event) + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change if self.sched: @@ -1331,6 +1332,9 @@ class GithubConnection(CachedBranchConnection): raise return change + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): changes = [] if not change.uris: diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py index 3028bb52aa..1b23401ff1 100644 --- a/zuul/driver/github/githubsource.py +++ b/zuul/driver/github/githubsource.py @@ -95,6 +95,9 @@ class GithubSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn(change, projects, tenant) diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 35f1f282ae..b6e6f6e50d 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -32,7 +32,7 @@ from zuul.connection import CachedBranchConnection from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger from zuul.exceptions import MergeFailure -from zuul.model import Branch, Project, Ref, Tag +from zuul.model import Branch, CacheStat, Project, Ref, Tag from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest from zuul.zk.event_queues import ConnectionEventQueue @@ -535,6 +535,7 @@ class GitlabConnection(CachedBranchConnection): change.patchset = patch_number change.url = url or self.getMRUrl(project.name, number) change.uris = [change.url.split('://', 1)[-1]] # remove scheme + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change try: log.debug("Getting change mr#%s from project %s" % ( @@ -614,6 +615,9 @@ class GitlabConnection(CachedBranchConnection): "Set approval: %s on MR %s#%s (%s)", approve, project_name, number, patchset) + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): """ Reverse lookup of MR depending on this one """ diff --git a/zuul/driver/gitlab/gitlabsource.py b/zuul/driver/gitlab/gitlabsource.py index 9296c641c9..fbd582367f 100644 --- a/zuul/driver/gitlab/gitlabsource.py +++ b/zuul/driver/gitlab/gitlabsource.py @@ -82,6 +82,9 @@ class GitlabSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn( change, projects, tenant) diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index e65a329b89..4c7e762073 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ b/zuul/driver/pagure/pagureconnection.py @@ -26,7 +26,7 @@ import voluptuous as v from zuul.connection import BaseConnection from zuul.lib.logutil import get_annotated_logger from zuul.web.handler import BaseWebController -from zuul.model import Ref, Branch, Tag +from zuul.model import Ref, Branch, Tag, CacheStat from zuul.lib import dependson from zuul.zk.event_queues import ConnectionEventQueue @@ -626,6 +626,7 @@ class PagureConnection(BaseConnection): change.uris = [ '%s/%s/pull/%s' % (self.baseurl, project, number), ] + change.cache_stat = CacheStat(key, None, None) self._change_cache[key] = change try: self.log.debug("Getting change pr#%s from project %s" % ( @@ -762,6 +763,9 @@ class PagureConnection(BaseConnection): number, project, flag.get('status'))) return flag.get('status') + def getChangeByKey(self, key): + return self._change_cache.get(key) + def getChangesDependingOn(self, change, projects, tenant): """ Reverse lookup of PR depending on this one """ diff --git a/zuul/driver/pagure/paguresource.py b/zuul/driver/pagure/paguresource.py index 78a62b911f..4202df070d 100644 --- a/zuul/driver/pagure/paguresource.py +++ b/zuul/driver/pagure/paguresource.py @@ -88,6 +88,9 @@ class PagureSource(BaseSource): event=event) return change + def getChangeByKey(self, key): + return self.connection.getChangeByKey(key) + def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn( change, projects, tenant) diff --git a/zuul/driver/zuul/__init__.py b/zuul/driver/zuul/__init__.py index afbee1bd0c..390b65b79c 100644 --- a/zuul/driver/zuul/__init__.py +++ b/zuul/driver/zuul/__init__.py @@ -111,7 +111,8 @@ class ZuulDriver(Driver, TriggerInterface): # This is very inefficient, especially on systems with large # numbers of github installations. This can be improved later # with persistent storage of dependency information. - needed_by_changes = set(change.needed_by_changes) + needed_by_changes = set( + pipeline.manager.resolveChangeKeys(change.needed_by_changes)) for source in self.sched.connections.getSources(): log.debug(" Checking source: %s", source) needed_by_changes.update( diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 70174079a6..83aedbe9d2 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. import collections +import contextlib import logging import textwrap import time @@ -61,6 +62,9 @@ class PipelineManager(metaclass=ABCMeta): # Cached dynamic layouts (layout uuid -> layout) self._layout_cache = {} self.sql = self.sched.sql + # A small local cache to avoid hitting the ZK-based connection + # change cache for multiple hits in the same pipeline run. + self._change_cache = {} def __str__(self): return "<%s %s>" % (self.__class__.__name__, self.pipeline.name) @@ -150,6 +154,44 @@ class PipelineManager(metaclass=ABCMeta): i.live] return items.index(item) + def resolveChangeKeys(self, change_keys): + resolved_changes = [] + for key in change_keys: + change = self._change_cache.get(key) + if change is None: + connection_name, connection_key = key + source = self.sched.connections.getSource(connection_name) + change = source.getChangeByKey(connection_key) + self._change_cache[change.cache_key] = change + resolved_changes.append(change) + return resolved_changes + + def _maintainCache(self): + active_layout_uuids = set() + referenced_change_keys = set() + for item in self.pipeline.getAllItems(): + if item.layout_uuid: + active_layout_uuids.add(item.layout_uuid) + + if isinstance(item.change, model.Change): + referenced_change_keys.update(item.change.needs_changes) + referenced_change_keys.update(item.change.needed_by_changes) + + # Clean up unused layouts in the cache + unused_layouts = set(self._layout_cache.keys()) - active_layout_uuids + if unused_layouts: + self.log.debug("Removing unused layouts %s from cache", + unused_layouts) + for uid in unused_layouts: + with contextlib.suppress(KeyError): + del self._layout_cache[uid] + + # Clean up change cache + unused_keys = set(self._change_cache.keys()) - referenced_change_keys + for key in unused_keys: + with contextlib.suppress(KeyError): + del self._change_cache[key] + def isChangeAlreadyInPipeline(self, change): # Checks live items in the pipeline for item in self.pipeline.getAllItems(): @@ -623,7 +665,7 @@ class PipelineManager(metaclass=ABCMeta): change.project.connection_name) source.setChangeAttributes( change, - commit_needs_changes=[d for d in dependencies], + commit_needs_changes=[d.cache_key for d in dependencies], refresh_deps=False) def provisionNodes(self, item): @@ -1350,17 +1392,7 @@ class PipelineManager(metaclass=ABCMeta): self.log.debug("Queue %s status is now:\n %s" % (queue.name, status)) - # Cleanup unused layouts in the cache - active_layout_uuids = {i.layout_uuid - for i in self.pipeline.getAllItems() - if i.layout_uuid} - unused_layouts = set(self._layout_cache.keys()) - active_layout_uuids - if unused_layouts: - self.log.debug("Removing unused layouts %s from cache", - unused_layouts) - for uid in unused_layouts: - del self._layout_cache[uid] - + self._maintainCache() self.log.debug("Finished queue processor: %s (changed: %s)" % (self.pipeline.name, changed)) return changed diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 7b6667fef4..750a272090 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -85,8 +85,8 @@ class DependentPipelineManager(SharedQueuePipelineManager): for project, _ in change_queue.project_branches: sources.add(project.source) - seen = set(change.needed_by_changes) - needed_by_changes = change.needed_by_changes[:] + needed_by_changes = self.resolveChangeKeys(change.needed_by_changes) + seen = set(needed_by_changes) for source in sources: log.debug(" Checking source: %s", source) projects = [project_branch[0] @@ -157,7 +157,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): # a git level dependency, we need to enqueue it before the current # change. if (needed_change not in history or - needed_change in change.git_needs_changes): + needed_change.cache_key in change.git_needs_changes): r = self.addChange(needed_change, event, quiet=quiet, ignore_requirements=ignore_requirements, change_queue=change_queue, history=history, @@ -185,7 +185,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): changes_needed = [] # Ignore supplied change_queue with self.getChangeQueue(change, event) as change_queue: - for needed_change in change.needs_changes: + for needed_change in self.resolveChangeKeys(change.needs_changes): log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: @@ -234,7 +234,7 @@ class DependentPipelineManager(SharedQueuePipelineManager): if not item.change.needs_changes: return None failing_items = set() - for needed_change in item.change.needs_changes: + for needed_change in self.resolveChangeKeys(item.change.needs_changes): needed_item = self.getItemForChange(needed_change) if not needed_item: continue diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 7686d5a7d4..61aede3c68 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -86,7 +86,7 @@ class IndependentPipelineManager(PipelineManager): log.debug(" No changes needed") return True changes_needed = [] - for needed_change in change.needs_changes: + for needed_change in self.resolveChangeKeys(change.needs_changes): log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: diff --git a/zuul/model.py b/zuul/model.py index 73ad5c0624..8fd57fbdec 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -14,7 +14,7 @@ # under the License. import abc -from collections import OrderedDict, defaultdict, UserDict +from collections import OrderedDict, defaultdict, namedtuple, UserDict import copy import json import logging @@ -3538,6 +3538,10 @@ class Bundle: return any(i.change.updatesConfig(tenant) for i in self.items) +# Cache info of a ref +CacheStat = namedtuple("CacheStat", ["key", "uuid", "version"]) + + class Ref(object): """An existing state of a Project.""" @@ -3547,6 +3551,13 @@ class Ref(object): self.oldrev = None self.newrev = None self.files = [] + # Cache info about this ref: + # CacheStat(cache key, uuid, version) + self.cache_stat = None + + @property + def cache_key(self): + return (self.project.connection_name, self.cache_stat.key) def _id(self): return self.newrev diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py index 014dc93432..07a5960f7e 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -223,8 +223,10 @@ class BaseReporter(object, metaclass=abc.ABCMeta): return msg def _formatItemReportOtherBundleItems(self, item): + related_changes = item.pipeline.manager.resolveChangeKeys( + item.change.needs_changes) return "Related changes:\n{}".format("\n".join( - c.url for c in item.change.needs_changes if c is not item.change)) + c.url for c in related_changes if c is not item.change)) def _getItemReportJobsFields(self, item): # Extract the report elements from an item diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 0afc3555aa..54540be552 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2190,7 +2190,9 @@ class Scheduler(threading.Thread): for other_change in source.getCachedChanges(): if other_change.commit_needs_changes is None: continue - for dep in other_change.commit_needs_changes: + for connection_name, key in other_change.commit_needs_changes: + dep_source = self.connections.getSource(connection_name) + dep = dep_source.getChangeByKey(key) if change.isUpdateOf(dep): source.setChangeAttributes( other_change, refresh_deps=True) diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index d3a11be6ca..b9f73373fb 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -76,6 +76,14 @@ class BaseSource(object, metaclass=abc.ABCMeta): """ + def getChangeByKey(self, key): + """Get the change corresponding to the supplied cache key. + + The key may not correspond to this source. Return None if it + doesn't. + """ + raise NotImplementedError + @abc.abstractmethod def getChangesDependingOn(self, change, projects, tenant): """Return changes which depend on changes at the supplied URIs.