Add queue.dependencies-by-topic

This adds a pipeline queue setting to emulate the Gerrit behavior
of submitWholeTopic without needing to enable it site-wide in Gerrit.

Change-Id: Icb33a1e87d15229e6fb3aa1e4b1ad14a60623a29
This commit is contained in:
James E. Blair 2022-03-07 13:32:37 -08:00
parent 249ccc403b
commit e16fcc80f8
11 changed files with 248 additions and 15 deletions

View File

@ -77,3 +77,29 @@ Here is an example ``queue`` configuration.
dependency cycle, then the gating system for a
project may be broken and may require an
intervention to correct.
.. attr:: dependencies-by-topic
:default: false
Determines whether Zuul should query the code review system for
changes under the same topic and treat those as a set of
circular dependencies.
Note that the Gerrit code review system supports a setting
called ``change.submitWholeTopic``, which, when set, will cause
all changes under the same topic to be merged simultaneously.
Zuul automatically observes this setting and treats all changes
to be submitted together as circular dependencies. If this
setting is enabled in gerrit, do not enable
``dependencies-by-topic`` in associated Zuul queues.
Because ``change.submitWholeTopic`` is applied system-wide in
Gerrit, some Zuul users may wish to emulate the behavior for
some projects without enabling it for all of Gerrit. In this
case, setting ``dependencies-by-topic`` will cause Zuul to
approxiamate the Gerrit behavior only for changes enqueued into
queues where this is set.
This setting requires :attr:`queue.allow-circular-dependencies`
to also be set. All of the caveats noted there continue to
apply.

View File

@ -0,0 +1,6 @@
---
features:
- |
The :attr:`queue.dependencies-by-topic` setting is added for Zuul
users who wish to emulate the Gerrit ``submitWholeTopic`` behavior
for some projects without enabling it site-wide in Gerrit.

View File

@ -733,11 +733,13 @@ class FakeGerritChange(object):
'username': 'owner_jane'},
'project': self.project,
'subject': self.subject,
'topic': 'master',
'url': 'https://hostname/459'},
'comment': message,
'patchSet': self.patchsets[-1],
'type': 'comment-added'}
if 'topic' in self.data:
event['change']['topic'] = self.data['topic']
self.data['submitRecords'] = self.getSubmitRecords()
return json.loads(json.dumps(event))
@ -886,6 +888,8 @@ class FakeGerritChange(object):
}
if 'parents' in self.data:
data['parents'] = self.data['parents']
if 'topic' in self.data:
data['topic'] = self.data['topic']
return json.loads(json.dumps(data))
def queryRevisionHTTP(self, revision):

View File

@ -0,0 +1,70 @@
- queue:
name: integrated
allow-circular-dependencies: true
dependencies-by-topic: true
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
require:
gerrit:
approval:
- Approved: 1
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- job:
name: base
parent: null
run: playbooks/run.yaml
- job:
name: test-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- test-job
gate:
jobs:
- test-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- test-job
gate:
jobs:
- test-job

View File

@ -1470,6 +1470,45 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.data["status"], "NEW")
self.assertIn("does not share a change queue", B.messages[-1])
@simple_layout('layouts/deps-by-topic.yaml')
def test_deps_by_topic(self):
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
topic='test-topic')
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
topic='test-topic')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(A.patchsets[-1]["approvals"]), 1)
self.assertEqual(A.patchsets[-1]["approvals"][0]["type"], "Verified")
self.assertEqual(A.patchsets[-1]["approvals"][0]["value"], "1")
self.assertEqual(len(B.patchsets[-1]["approvals"]), 1)
self.assertEqual(B.patchsets[-1]["approvals"][0]["type"], "Verified")
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
# We're about to add approvals to changes without adding the
# triggering events to Zuul, so that we can be sure that it is
# enqueing the changes based on dependencies, not because of
# triggering events. Since it will have the changes cached
# already (without approvals), we need to clear the cache
# first.
for connection in self.scheds.first.connections.connections.values():
connection.maintainCache([], max_age=0)
A.addApproval("Code-Review", 2)
B.addApproval("Code-Review", 2)
A.addApproval("Approved", 1)
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
self.waitUntilSettled()
self.assertEqual(A.reported, 3)
self.assertEqual(B.reported, 3)
self.assertEqual(A.data["status"], "MERGED")
self.assertEqual(B.data["status"], "MERGED")
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"

View File

@ -1379,6 +1379,7 @@ class QueueParser:
queue = {vs.Required('name'): str,
'per-branch': bool,
'allow-circular-dependencies': bool,
'dependencies-by-topic': bool,
'_source_context': model.SourceContext,
'_start_mark': model.ZuulMark,
}
@ -1390,7 +1391,12 @@ class QueueParser:
conf['name'],
conf.get('per-branch', False),
conf.get('allow-circular-dependencies', False),
conf.get('dependencies-by-topic', False),
)
if (queue.dependencies_by_topic and not
queue.allow_circular_dependencies):
raise Exception("The 'allow-circular-dependencies' setting must be"
"enabled in order to use dependencies-by-topic")
queue.source_context = conf.get('_source_context')
queue.start_mark = conf.get('_start_mark')
queue.freeze()

View File

@ -114,6 +114,7 @@ class GerritChange(Change):
self.wip = data.get('wip', False)
self.owner = data['owner'].get('username')
self.message = data['commitMessage']
self.topic = data.get('topic')
self.missing_labels = set()
for sr in data.get('submitRecords', []):
@ -189,6 +190,7 @@ class GerritChange(Change):
self.wip = data.get('work_in_progress', False)
self.owner = data['owner'].get('username')
self.message = current_revision['commit']['message']
self.topic = data.get('topic')
class GerritTriggerEvent(TriggerEvent):

View File

@ -143,6 +143,36 @@ class GerritSource(BaseSource):
changes.append(change)
return changes
def getChangesByTopic(self, topic, changes=None):
if not topic:
return []
if changes is None:
changes = {}
query = 'status:open topic:%s' % topic
results = self.connection.simpleQuery(query)
for result in results:
change_key = ChangeKey(self.connection.connection_name, None,
'GerritChange',
str(result.number),
str(result.current_patchset))
if change_key in changes:
continue
change = self.connection._getChange(change_key)
changes[change_key] = change
for change in changes.values():
for git_key in change.git_needs_changes:
if git_key in changes:
continue
git_change = self.getChange(git_key)
if not git_change.topic or git_change.topic == topic:
continue
self.getChangesByTopic(git_change.topic, changes)
return list(changes.values())
def getCachedChanges(self):
yield from self.connection._change_cache

View File

@ -627,7 +627,7 @@ class PipelineManager(metaclass=ABCMeta):
return scc
return []
def canProcessCycle(self, project):
def getQueueConfig(self, project):
layout = self.pipeline.tenant.layout
pipeline_queue_name = None
project_queue_name = None
@ -651,13 +651,23 @@ class PipelineManager(metaclass=ABCMeta):
# project while project has precedence.
queue_name = project_queue_name or pipeline_queue_name
if queue_name is None:
return None
return layout.queues.get(queue_name)
def canProcessCycle(self, project):
queue_config = self.getQueueConfig(project)
if queue_config is None:
return False
queue_config = layout.queues.get(queue_name)
return (
queue_config is not None and
queue_config.allow_circular_dependencies
)
return queue_config.allow_circular_dependencies
def useDependenciesByTopic(self, project):
queue_config = self.getQueueConfig(project)
if queue_config is None:
return False
return queue_config.dependencies_by_topic
def canMergeCycle(self, bundle):
"""Check if the cycle still fulfills the pipeline's ready criteria."""
@ -809,12 +819,24 @@ class PipelineManager(metaclass=ABCMeta):
log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
new_commit_needs_changes = [d.cache_key for d in dependencies]
update_attrs = dict(commit_needs_changes=new_commit_needs_changes)
# Ask the source for any tenant-specific changes (this allows
# drivers to implement their own way of collecting deps):
source = self.sched.connections.getSource(
change.project.connection_name)
if self.useDependenciesByTopic(change.project):
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes = []
for dep in source.getChangesByTopic(change.topic):
if dep and (not dep.is_merged):
log.debug(" Adding dependency: %s", dep)
new_topic_needs_changes.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes
if change.commit_needs_changes != new_commit_needs_changes:
source = self.sched.connections.getSource(
change.project.connection_name)
source.setChangeAttributes(
change,
commit_needs_changes=new_commit_needs_changes)
source.setChangeAttributes(change, **update_attrs)
def provisionNodes(self, item):
log = item.annotateLogger(self.log)

View File

@ -5399,12 +5399,18 @@ class Change(Branch):
# to Depends-On headers (all drivers):
self.commit_needs_changes = None
# Needed changes by topic (all
# drivers in theory, but Gerrit only in practice for
# emulate-submit-whole-topic):
self.topic_needs_changes = None
self.is_current_patchset = True
self.can_merge = False
self.is_merged = False
self.failed_to_merge = False
self.open = None
self.owner = None
self.topic = None
# This may be the commit message, or it may be a cover message
# in the case of a PR. Either way, it's the place where we
@ -5428,6 +5434,7 @@ class Change(Branch):
None if data.get("commit_needs_changes") is None
else data.get("commit_needs_changes", [])
)
self.topic_needs_changes = data.get("topic_needs_changes")
self.is_current_patchset = data.get("is_current_patchset", True)
self.can_merge = data.get("can_merge", False)
self.is_merged = data.get("is_merged", False)
@ -5448,6 +5455,7 @@ class Change(Branch):
"compat_needs_changes": self.compat_needs_changes,
"compat_needed_by_changes": self.git_needed_by_changes,
"commit_needs_changes": self.commit_needs_changes,
"topic_needs_changes": self.topic_needs_changes,
"is_current_patchset": self.is_current_patchset,
"can_merge": self.can_merge,
"is_merged": self.is_merged,
@ -5478,8 +5486,9 @@ class Change(Branch):
@property
def needs_changes(self):
commit_needs_changes = self.commit_needs_changes or []
topic_needs_changes = self.topic_needs_changes or []
return (self.git_needs_changes + self.compat_needs_changes +
commit_needs_changes)
commit_needs_changes + topic_needs_changes)
@property
def needed_by_changes(self):
@ -7265,11 +7274,13 @@ class Semaphore(ConfigObject):
class Queue(ConfigObject):
def __init__(self, name, per_branch=False,
allow_circular_dependencies=False):
allow_circular_dependencies=False,
dependencies_by_topic=False):
super().__init__()
self.name = name
self.per_branch = per_branch
self.allow_circular_dependencies = allow_circular_dependencies
self.dependencies_by_topic = dependencies_by_topic
def __ne__(self, other):
return not self.__eq__(other)
@ -7281,7 +7292,9 @@ class Queue(ConfigObject):
self.name == other.name and
self.per_branch == other.per_branch and
self.allow_circular_dependencies ==
other.allow_circular_dependencies
other.allow_circular_dependencies and
self.dependencies_by_topic ==
other.dependencies_by_topic
)

View File

@ -126,8 +126,23 @@ class BaseSource(object, metaclass=abc.ABCMeta):
search scope.
"""
def getChangesByTopic(self, topic):
"""Return changes in the same topic.
This should return changes under the same topic, as well as
changes under the same topic of any git-dependent changes,
recursively.
This is only implemented by the Gerrit driver, however if
other systems have a similar "topic" functionality, it could
be added to other drivers.
"""
return []
@abc.abstractmethod
def getProjectOpenChanges(self, project):
"""Get the open changes for a project."""
@abc.abstractmethod