Merge "Stop using submitted-together for submitWholeTopic"

This commit is contained in:
Zuul
2024-06-13 23:28:39 +00:00
committed by Gerrit Code Review
11 changed files with 223 additions and 193 deletions

View File

@@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components
import zuul.lib.tracing as tracing
from zuul.model import (
Change, PipelineState, PipelineChangeList,
filter_severity, EnqueueEvent
filter_severity, EnqueueEvent, QueryCache
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.exceptions import LockException
@@ -235,6 +235,7 @@ class PipelineManager(metaclass=ABCMeta):
def resolveChangeKeys(self, change_keys):
resolved_changes = []
query_cache = QueryCache()
for key in change_keys:
change = self._change_cache.get(key.reference)
if change is None:
@@ -251,7 +252,8 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
self.updateCommitDependencies(change, event=None)
self.updateCommitDependencies(
query_cache, change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -308,8 +310,9 @@ class PipelineManager(metaclass=ABCMeta):
for dep_change_ref in change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if change.cache_stat.key.isSameChange(dep_change_key):
return True
for change_key in self.pipeline.change_list.getChangeKeys():
if change_key.isSameChange(dep_change_key):
return True
return False
def isChangeAlreadyInQueue(self, change, change_queue, item=None):
@@ -329,6 +332,7 @@ class PipelineManager(metaclass=ABCMeta):
if not isinstance(change, model.Change):
return
query_cache = QueryCache()
to_refresh = set()
for item in self.pipeline.getAllItems():
for item_change in item.changes:
@@ -342,7 +346,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item_change)
for existing_change in to_refresh:
self.updateCommitDependencies(existing_change, event)
self.updateCommitDependencies(query_cache, existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -845,7 +849,7 @@ class PipelineManager(metaclass=ABCMeta):
) - set(cycle)
def getDependencyGraph(self, change, dependency_graph, event,
update_deps=False,
update_deps=False, query_cache=None,
history=None, quiet=False, indent=''):
log = get_annotated_logger(self.log, event)
if not quiet:
@@ -854,14 +858,13 @@ class PipelineManager(metaclass=ABCMeta):
return
if not isinstance(change, model.Change):
return
if not change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
return
if history is None:
history = set()
if query_cache is None:
query_cache = QueryCache()
history.add(change)
if update_deps:
self.updateCommitDependencies(change, event)
self.updateCommitDependencies(query_cache, change, event)
for needed_change in self.resolveChangeReferences(
change.getNeedsChanges(
self.useDependenciesByTopic(change.project))):
@@ -888,9 +891,10 @@ class PipelineManager(metaclass=ABCMeta):
"change %s", indent, needed_change, change)
node.append(needed_change)
if needed_change not in history:
self.getDependencyGraph(needed_change, dependency_graph,
event, update_deps, history,
quiet, indent + ' ')
self.getDependencyGraph(needed_change,
dependency_graph, event,
update_deps, query_cache,
history, quiet, indent + ' ')
def getQueueConfig(self, project):
layout = self.pipeline.tenant.layout
@@ -917,6 +921,10 @@ class PipelineManager(metaclass=ABCMeta):
return queue_config.allow_circular_dependencies
def useDependenciesByTopic(self, project):
source = self.sched.connections.getSource(project.connection_name)
if source.useDependenciesByTopic():
return True
queue_config = self.getQueueConfig(project)
if queue_config is None:
return False
@@ -991,7 +999,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name
].put_supercede(event)
def updateCommitDependencies(self, change, event):
def updateCommitDependencies(self, query_cache, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
@@ -1042,12 +1050,17 @@ class PipelineManager(metaclass=ABCMeta):
change.project.connection_name)
if must_update_topic_deps:
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes = []
for dep in source.getChangesByTopic(change.topic):
if dep and (not dep.is_merged):
new_topic_needs_changes_keys = []
query_cache_key = (change.project.connection_name, change.topic)
changes_by_topic = query_cache.topic_queries.get(query_cache_key)
if changes_by_topic is None:
changes_by_topic = source.getChangesByTopic(change.topic)
query_cache.topic_queries[query_cache_key] = changes_by_topic
for dep in changes_by_topic:
if dep and (not dep.is_merged) and dep is not change:
log.debug(" Adding dependency: %s", dep)
new_topic_needs_changes.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes
new_topic_needs_changes_keys.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys
if update_attrs:
source.setChangeAttributes(change, **update_attrs)