Stop using submitted-together for submitWholeTopic

During development of the recent change to reduce the number of
queries sent to Gerrit during circular dependency event storms,
it was noted that the emulated submit-whole-topic behavior in Zuul
need not be subject to the same behavior, because it only acts after
a change is enqueued into a pipeline.  When a change is enqueued,
it can query Gerrit once to obtain all changes in the topic, and then
adds them to the cycle at once.  It does not do that today -- in fact,
it will query gerrit once for each change in the cycle just like the
gerrit driver, however, we can cache the results of that query so
that when dealing with a cycle, we only perform the query once.

Rather than handling submitWholeTopic changes directly in the gerrit
driver as we do now, let's remove that code and just use the
"emulated" path for both cases.  We will automatically enable the
"emulated" path if the server has submitWholeTopic enabled, so the
user-visible functionality is the same.  Moreover, this fits better
with our desire to handle dependencies in the pipeline manager as
much as possible.

This means that if a user uploads 100 changes, we will query
gerrit 4 times for each change; the four queries being:

* The change info itself
* Related changes (git parent/child)
* Files list
* Reverve commit dependencies (Depends-On that point to this change)

And that is it until a change is enqueued.  Since there is a built-in
delay in the Gerrit driver, at least 10 seconds should elapse between
the first change in a cycle being uploaded and Zuul enqueing that change
into a pipeline.  Assuming that all the changes are able to be uploaded
within that window (or if the topic is being created by updating change
topics), then only one more query should need to be performed: to get
the list of changes in the topic on enqueue.

In this fast case, the total queries are:

  queries = 5*count

  100 changes gives 500 queries

If changes are updated outside of that 10 second window, more queries
will happen as items are removed from the pipeline and re-added as their
dependency cycle cahnges, but that is no different than today, and that
is action on a human timescale, and less likely to impact Zuul's
performance.  However, extra queries may be performed due to the
following:

When the scheduler forwards a change event to pipelines, it updates the
change's dependencies first in order to decide if it is relevant to
changes already in the pipeline.  That will cause a topic query to be
performed.  Then, once the pipeline manager runs, it will update the
dependencies of all the changes in the queue item, performing the query
again; but that query will be cached for the rest of the cycle.  This
means that when changes are added slowly to the pipeline, we will perform
two queries for each change, one when forwarding the event to the pipeline,
and one for the cycle in the pipeline.

That means the total queries are:

  queries = 4*count + 2*count - 1

  100 changes gives 599 queries

This change retains the implementation and testing of the submitted-together
fake gerrit API endpoint, even though it is no longer used, for completeness
in case we find we want to use it again in the future.

One of the tests for max-dependencies inside the gerrit driver is updated
because without using the submitted-together endpoint, the driver no
longer recursively follows all git dependencies, so a series of depends-on
footers is used to achieve the same effect.  Keep in mind that there is
a separate pipeline max-dependencies option which will still protect
pipelines from having too many dependencies, no matter the source.

The check to exit early from processing the dependency graph is removed
because it behaves erroneously in the case where a change is enqueued into
a pipeline with no dependencies and then another change is added to its
topic.  This bug was likely masked by extra queries and updates performed
in other circumstances.  It is now covered by tests.

The isChangeRelevantToPipeline method is also corrected.  It only effectively
checked direct dependencies; the topic checking was erroneous and actually
checked whether the change being added was its own dependency (oddly: yes!
in the case of emulated topic dependencies, which is also corrected in this
change).  It now correctly checks whether dependencies are in the pipeline.

Change-Id: I20c7a8f6f1b8a869e163a1524d96fe53ef20a291
This commit is contained in:
James E. Blair 2024-05-16 10:29:08 -07:00
parent c224c36b12
commit 81fe0a50d1
11 changed files with 223 additions and 193 deletions

View File

@ -44,6 +44,12 @@ configuration is to configure both SSH and HTTP access.
The section below describes commond configuration settings. Specific
settings for different connection methods follow.
.. note::
If Gerrit is upgraded, or the value of ``change.submitWholeTopic``
is changed while Zuul is running, all running Zuul schedulers
should be restarted in order to see the change.
Connection Configuration
------------------------

View File

@ -0,0 +1,7 @@
---
other:
- |
Zuul now checks the submitWholeTopic setting in Gerrit when it
starts. If this setting is changed, or Gerrit is upgraded, all
running Zuul schedulers should be restarted in order to see the
change.

View File

@ -723,6 +723,7 @@ class GerritWebServer(object):
r'\?parent=1')
change_search_re = re.compile(r'/a/changes/\?n=500.*&q=(.*)')
version_re = re.compile(r'/a/config/server/version')
info_re = re.compile(r'/a/config/server/info')
head_re = re.compile(r'/a/projects/(.*)/HEAD')
def do_POST(self):
@ -776,6 +777,9 @@ class GerritWebServer(object):
m = self.version_re.match(path)
if m:
return self.version()
m = self.info_re.match(path)
if m:
return self.info()
m = self.head_re.match(path)
if m:
return self.head(m.group(1))
@ -954,6 +958,40 @@ class GerritWebServer(object):
self.send_data('3.0.0-some-stuff')
self.end_headers()
def info(self):
# This is not complete; see documentation for
# additional fields.
data = {
"accounts": {
"visibility": "ALL",
"default_display_name": "FULL_NAME"
},
"change": {
"allow_blame": True,
"disable_private_changes": True,
"update_delay": 300,
"mergeability_computation_behavior":
"API_REF_UPDATED_AND_CHANGE_REINDEX",
"enable_robot_comments": True,
"conflicts_predicate_enabled": True
},
"gerrit": {
"all_projects": "All-Projects",
"all_users": "All-Users",
"doc_search": True,
},
"note_db_enabled": True,
"sshd": {},
"suggest": {"from": 0},
"user": {"anonymous_coward_name": "Name of user not set"},
"receive": {"enable_signed_push": False},
"submit_requirement_dashboard_columns": []
}
if fake_gerrit._fake_submit_whole_topic:
data['change']['submit_whole_topic'] = True
self.send_data(data)
self.end_headers()
def head(self, project):
project = urllib.parse.unquote(project)
head = fake_gerrit._fake_project_default_branch.get(

View File

@ -3454,6 +3454,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
topic='test-topic')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project1', "master", "B",
topic='test-topic')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
@ -3494,10 +3495,19 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(1, counters[
('changes', '3', 'revisions', 'files?parent=1')])
self.assertEqual(3, counters[('changes', 'message')])
# These queries need to run more often
self.assertEqual(3, counters[('changes', '1', 'submitted_together')])
self.assertEqual(2, counters[('changes', '2', 'submitted_together')])
self.assertEqual(1, counters[('changes', '3', 'submitted_together')])
# These queries are no longer used
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
# This query happens once for each event in the scheduler,
# then once for each change in the pipeline if there's more
# than one (cache is in play here).
# * A: 1x scheduler, 0x pipeline
# * A+B: 1x scheduler, 1x pipeline
# * A+B+C: 1x scheduler, 1x pipeline
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
'q=status%3Aopen%20topic%3A%22test-topic%22')
self.assertEqual(5, counters[('changes', qstring)])
self.assertHistory([
dict(name="project-job", changes="1,1"),
@ -3511,6 +3521,73 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project2-job", changes="2,1 1,1 3,1"),
], ordered=False)
@gerrit_config(submit_whole_topic=True)
def test_submitted_together_storm_fast(self):
# Test that if many changes are uploaded with the same topic,
# we handle queries efficiently.
# This mimics the changes being uploaded in rapid succession.
self.waitUntilSettled()
with self.scheds.first.sched.run_handler_lock:
A = self.fake_gerrit.addFakeChange('org/project', "master", "A",
topic='test-topic')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
B = self.fake_gerrit.addFakeChange('org/project1', "master", "B",
topic='test-topic')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
C = self.fake_gerrit.addFakeChange('org/project2', "master", "C",
topic='test-topic')
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Output all the queries seen for debugging
for q in self.fake_gerrit.api_calls:
self.log.debug("Query: %s", q)
gets = [q[1] for q in self.fake_gerrit.api_calls if q[0] == 'GET']
counters = Counter()
for q in gets:
parts = q.split('/')[2:]
if len(parts) > 2 and parts[2] == 'revisions':
parts.pop(3)
if 'q=message' in parts[1]:
parts[1] = 'message'
counters[tuple(parts)] += 1
# Ensure that we don't run these queries more than once for each change
qstring = ('o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
'o=DETAILED_LABELS&o=ALL_REVISIONS')
self.assertEqual(1, counters[('changes', f'1?{qstring}')])
self.assertEqual(1, counters[('changes', f'2?{qstring}')])
self.assertEqual(1, counters[('changes', f'3?{qstring}')])
self.assertEqual(1, counters[('changes', '1', 'revisions', 'related')])
self.assertEqual(1, counters[('changes', '2', 'revisions', 'related')])
self.assertEqual(1, counters[('changes', '3', 'revisions', 'related')])
self.assertEqual(1, counters[
('changes', '1', 'revisions', 'files?parent=1')])
self.assertEqual(1, counters[
('changes', '2', 'revisions', 'files?parent=1')])
self.assertEqual(1, counters[
('changes', '3', 'revisions', 'files?parent=1')])
self.assertEqual(3, counters[('changes', 'message')])
# These queries are no longer used
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
# This query happens once for each event.
# * A+B+C: 3x scheduler, 0x pipeline
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
'q=status%3Aopen%20topic%3A%22test-topic%22')
self.assertEqual(3, counters[('changes', qstring)])
self.assertHistory([
dict(name="project-job", changes="3,1 2,1 1,1"),
dict(name="project1-job", changes="3,1 2,1 1,1"),
dict(name="project-vars-job", changes="3,1 2,1 1,1"),
dict(name="project2-job", changes="3,1 2,1 1,1"),
], ordered=False)
@gerrit_config(submit_whole_topic=True)
def test_submitted_together_git(self):
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A")
@ -3576,13 +3653,13 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(len(C.patchsets[-1]["approvals"]), 1)
self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified")
self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1")
self.assertEqual(A.queried, 8)
self.assertEqual(B.queried, 8)
self.assertEqual(C.queried, 8)
self.assertEqual(D.queried, 8)
self.assertEqual(E.queried, 8)
self.assertEqual(F.queried, 8)
self.assertEqual(G.queried, 8)
self.assertEqual(A.queried, 3)
self.assertEqual(B.queried, 3)
self.assertEqual(C.queried, 3)
self.assertEqual(D.queried, 3)
self.assertEqual(E.queried, 3)
self.assertEqual(F.queried, 3)
self.assertEqual(G.queried, 3)
self.assertHistory([
dict(name="project1-job", result="SUCCESS",
changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1",

View File

@ -858,27 +858,20 @@ class TestGerritFake(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
data = self._get_tuple(1)
self.assertEqual(data, [])
ret = self.fake_gerrit._getSubmittedTogether(A, None)
self.assertEqual(ret, [])
# A dependent series (B->A)
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
B.setDependsOn(A, 1)
data = self._get_tuple(2)
self.assertEqual(data, [(1, 1), (2, 1)])
# The Gerrit connection method filters out the queried change
ret = self.fake_gerrit._getSubmittedTogether(B, None)
self.assertEqual(ret, [(1, 1)])
# A topic cycle
C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
topic='test-topic')
self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
topic='test-topic')
self.fake_gerrit.addFakeChange('org/project', 'master', 'C2',
topic='test-topic')
data = self._get_tuple(3)
self.assertEqual(data, [])
ret = self.fake_gerrit._getSubmittedTogether(C1, None)
self.assertEqual(ret, [])
@gerrit_config(submit_whole_topic=True)
def test_submitted_together_whole_topic(self):
@ -890,8 +883,6 @@ class TestGerritFake(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
data = self._get_tuple(1)
self.assertEqual(data, [])
ret = self.fake_gerrit._getSubmittedTogether(A, None)
self.assertEqual(ret, [])
# A dependent series (B->A)
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@ -899,19 +890,15 @@ class TestGerritFake(ZuulTestCase):
data = self._get_tuple(2)
self.assertEqual(data, [(1, 1), (2, 1)])
# The Gerrit connection method filters out the queried change
ret = self.fake_gerrit._getSubmittedTogether(B, None)
self.assertEqual(ret, [(1, 1)])
# A topic cycle
C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
topic='test-topic')
self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
topic='test-topic')
self.fake_gerrit.addFakeChange('org/project', 'master', 'C2',
topic='test-topic')
data = self._get_tuple(3)
self.assertEqual(data, [(3, 1), (4, 1)])
# The Gerrit connection method filters out the queried change
ret = self.fake_gerrit._getSubmittedTogether(C1, None)
self.assertEqual(ret, [(4, 1)])
# Test also the query used by the GerritConnection:
ret = self.fake_gerrit._simpleQuery('status:open topic:test-topic')
@ -1213,7 +1200,8 @@ class TestGerritMaxDeps(ZuulTestCase):
# This is not
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.setDependsOn(B, 1)
C.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % (
C.subject, A.data['id'], B.data['id'])
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()

View File

@ -176,7 +176,6 @@ class QueryHistory:
class Query(enum.Enum):
SEEN = 1 # Not a real query, just that we've seen the change
CHANGE = 2 # The main change query
SUBMITTED_TOGETHER = 3 # The submitted-together query
def __init__(self):
self.queries = collections.defaultdict(lambda: dict())
@ -519,6 +518,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.watched_checkers = []
self.project_checker_map = {}
self.version = (0, 0, 0)
self.submit_whole_topic = None
self.ssh_timeout = SSH_TIMEOUT
self.baseurl = self.connection_config.get(
@ -818,32 +818,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
records.append(result)
return [(x.number, x.current_patchset) for x in records]
def _getSubmittedTogether(self, change, event):
if not self.session:
return []
# We could probably ask for everything in one query, but it
# might be extremely large, so just get the change ids here
# and then query the individual changes.
log = get_annotated_logger(self.log, event)
log.debug("Updating %s: Looking for changes submitted together",
change)
ret = []
try:
data = self.get(f'changes/{change.number}/submitted_together')
except Exception:
log.error("Unable to find changes submitted together for %s",
change)
return ret
for c in data:
dep_change = c['_number']
dep_ps = c['revisions'][c['current_revision']]['_number']
if str(dep_change) == str(change.number):
continue
log.debug("Updating %s: Found change %s,%s submitted together",
change, dep_change, dep_ps)
ret.append((dep_change, dep_ps))
return ret
def _updateChange(self, key, change, event, history,
allow_key_update=False):
log = get_annotated_logger(self.log, event)
@ -977,69 +951,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.exception("Failed to get commit-needed change %s,%s",
dep_num, dep_ps)
for (dep_num, dep_ps) in self._getSubmittedTogether(change, event):
try:
log.debug("Updating %s: Getting submitted-together "
"change %s,%s",
change, dep_num, dep_ps)
# The query above will have returned a set of changes
# that are submitted together along with this one.
# That set includes:
# * Any git-dependent change if we're not being cherry-picked
# * Any change with the same topic if submitWholeTopic is set
# The first is a one-way dependency, the second is
# simultaneous. We are unable to distinguish the two
# based only on the results of the submitted-together
# query. Therefore, while we know that we need to add
# the dep to our dependency list, we don't know
# whether we need to add ourselves to the dep list.
# In order to find that out, we will need to run the
# submitted-together query for each dep as well. But
# if we've already queried the dep, we don't need to
# do it again, and if this change already appears in
# the dep's dependencies, we also don't need to query
# again.
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(
dep_key, refresh=False, history=history,
event=event)
refresh = True
if (history.getByKey(history.Query.CHANGE, dep_key) or
history.getByKey(history.Query.SUBMITTED_TOGETHER,
dep_key)):
refresh = False
if (key in dep.compat_needs_changes and
key in dep.compat_needed_by_changes):
refresh = False
# Gerrit changes to be submitted together do not
# necessarily get posted with dependency cycles using
# git trees and depends-on. However, they are
# functionally equivalent to a stack of changes with
# cycles using those methods. Here we set
# needs_changes and needed_by_changes as if there were
# a cycle. This ensures Zuul's cycle handling manages
# the submitted together changes properly.
if dep.open and dep not in needs_changes:
compat_needs_changes.append(dep_key.reference)
needs_changes.add(dep_key.reference)
if (dep.open and dep.is_current_patchset
and dep not in needed_by_changes):
compat_needed_by_changes.append(dep_key.reference)
needed_by_changes.add(dep_key.reference)
if refresh:
# We may need to update the deps dependencies (as
# explained above).
history.add(history.Query.SUBMITTED_TOGETHER, dep)
self.updateSubmittedTogether(log, dep, history, event)
except GerritEventProcessingException:
raise
except Exception:
log.exception("Failed to get commit-needed change %s,%s",
dep_num, dep_ps)
return dict(
git_needs_changes=git_needs_changes,
compat_needs_changes=compat_needs_changes,
@ -1047,51 +958,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
compat_needed_by_changes=compat_needed_by_changes,
)
def updateSubmittedTogether(self, log, change, history, event):
# This method is very similar to the last part of
# _updateChangeDependencies, but it updates the other
# direction and does so without performing a full change
# query.
extra = {
'compat_needs_changes': change.compat_needs_changes[:],
'compat_needed_by_changes': change.compat_needed_by_changes[:],
}
update = False
for (dep_num, dep_ps) in self._getSubmittedTogether(change, event):
try:
log.debug("Updating %s: Getting reverse submitted-together "
"change %s,%s",
change, dep_num, dep_ps)
dep_key = ChangeKey(self.connection_name, None,
'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(
dep_key, refresh=False, history=history,
event=event)
if (dep.open and
dep_key.reference not in
extra['compat_needs_changes']):
extra['compat_needs_changes'].append(dep_key.reference)
update = True
if (dep.open and
dep.is_current_patchset and
dep_key.reference not in
extra['compat_needed_by_changes']):
extra['compat_needed_by_changes'].append(dep_key.reference)
update = True
except GerritEventProcessingException:
raise
except Exception:
log.exception("Failed to get commit-needed change %s,%s",
dep_num, dep_ps)
if update:
# Actually update the dep in the change cache.
def _update_change(c):
for k, v in extra.items():
setattr(c, k, v)
self._change_cache.updateChangeWithRetry(
change.cache_stat.key, change, _update_change,
allow_key_update=False)
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
@ -1771,6 +1637,12 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.log.info("Remote version is: %s (parsed as %s)" %
(version, self.version))
def _getRemoteInfo(self):
info = self.get('config/server/info')
change_info = info.get('change', {})
self.submit_whole_topic = change_info.get('submit_whole_topic', False)
self.log.info("Remote submitWholeTopic: %s", self.submit_whole_topic)
def refWatcherCallback(self, data):
event = {
'type': 'ref-updated',
@ -1785,11 +1657,15 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def onLoad(self, zk_client, component_registry):
self.log.debug("Starting Gerrit Connection/Watchers")
try:
if self.session:
if self.session:
try:
self._getRemoteVersion()
except Exception:
self.log.exception("Unable to determine remote Gerrit version")
except Exception:
self.log.exception("Unable to determine remote Gerrit version")
try:
self._getRemoteInfo()
except Exception:
self.log.exception("Unable to fetch remote Gerrit info")
# Set the project branch cache to read only if no scheduler is
# provided to prevent fetching the branches from the connection.

View File

@ -154,6 +154,9 @@ class GerritSource(BaseSource):
changes.append(change)
return changes
def useDependenciesByTopic(self):
return bool(self.connection.submit_whole_topic)
def getChangesByTopic(self, topic, changes=None, history=None):
if not topic:
return []

View File

@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components
import zuul.lib.tracing as tracing
from zuul.model import (
Change, PipelineState, PipelineChangeList,
filter_severity, EnqueueEvent
filter_severity, EnqueueEvent, QueryCache
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.exceptions import LockException
@ -235,6 +235,7 @@ class PipelineManager(metaclass=ABCMeta):
def resolveChangeKeys(self, change_keys):
resolved_changes = []
query_cache = QueryCache()
for key in change_keys:
change = self._change_cache.get(key.reference)
if change is None:
@ -251,7 +252,8 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
self.updateCommitDependencies(change, event=None)
self.updateCommitDependencies(
query_cache, change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@ -308,8 +310,9 @@ class PipelineManager(metaclass=ABCMeta):
for dep_change_ref in change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if change.cache_stat.key.isSameChange(dep_change_key):
return True
for change_key in self.pipeline.change_list.getChangeKeys():
if change_key.isSameChange(dep_change_key):
return True
return False
def isChangeAlreadyInQueue(self, change, change_queue, item=None):
@ -329,6 +332,7 @@ class PipelineManager(metaclass=ABCMeta):
if not isinstance(change, model.Change):
return
query_cache = QueryCache()
to_refresh = set()
for item in self.pipeline.getAllItems():
for item_change in item.changes:
@ -342,7 +346,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item_change)
for existing_change in to_refresh:
self.updateCommitDependencies(existing_change, event)
self.updateCommitDependencies(query_cache, existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@ -845,7 +849,7 @@ class PipelineManager(metaclass=ABCMeta):
) - set(cycle)
def getDependencyGraph(self, change, dependency_graph, event,
update_deps=False,
update_deps=False, query_cache=None,
history=None, quiet=False, indent=''):
log = get_annotated_logger(self.log, event)
if not quiet:
@ -854,14 +858,13 @@ class PipelineManager(metaclass=ABCMeta):
return
if not isinstance(change, model.Change):
return
if not change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
return
if history is None:
history = set()
if query_cache is None:
query_cache = QueryCache()
history.add(change)
if update_deps:
self.updateCommitDependencies(change, event)
self.updateCommitDependencies(query_cache, change, event)
for needed_change in self.resolveChangeReferences(
change.getNeedsChanges(
self.useDependenciesByTopic(change.project))):
@ -888,9 +891,10 @@ class PipelineManager(metaclass=ABCMeta):
"change %s", indent, needed_change, change)
node.append(needed_change)
if needed_change not in history:
self.getDependencyGraph(needed_change, dependency_graph,
event, update_deps, history,
quiet, indent + ' ')
self.getDependencyGraph(needed_change,
dependency_graph, event,
update_deps, query_cache,
history, quiet, indent + ' ')
def getQueueConfig(self, project):
layout = self.pipeline.tenant.layout
@ -917,6 +921,10 @@ class PipelineManager(metaclass=ABCMeta):
return queue_config.allow_circular_dependencies
def useDependenciesByTopic(self, project):
source = self.sched.connections.getSource(project.connection_name)
if source.useDependenciesByTopic():
return True
queue_config = self.getQueueConfig(project)
if queue_config is None:
return False
@ -991,7 +999,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name
].put_supercede(event)
def updateCommitDependencies(self, change, event):
def updateCommitDependencies(self, query_cache, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
@ -1042,12 +1050,17 @@ class PipelineManager(metaclass=ABCMeta):
change.project.connection_name)
if must_update_topic_deps:
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes = []
for dep in source.getChangesByTopic(change.topic):
if dep and (not dep.is_merged):
new_topic_needs_changes_keys = []
query_cache_key = (change.project.connection_name, change.topic)
changes_by_topic = query_cache.topic_queries.get(query_cache_key)
if changes_by_topic is None:
changes_by_topic = source.getChangesByTopic(change.topic)
query_cache.topic_queries[query_cache_key] = changes_by_topic
for dep in changes_by_topic:
if dep and (not dep.is_merged) and dep is not change:
log.debug(" Adding dependency: %s", dep)
new_topic_needs_changes.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes
new_topic_needs_changes_keys.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys
if update_attrs:
source.setChangeAttributes(change, **update_attrs)

View File

@ -148,6 +148,13 @@ def filter_severity(error_list, errors=True, warnings=True):
)]
class QueryCache:
"""Cache query information while processing dependencies"""
def __init__(self):
self.topic_queries = {}
class ZuulMark:
# The yaml mark class differs between the C and python versions.
# The C version does not provide a snippet, and also appears to

View File

@ -59,21 +59,22 @@ from zuul.model import (
BuildStatusEvent,
Change,
ChangeManagementEvent,
PipelinePostConfigEvent,
SemaphoreReleaseEvent,
PipelineSemaphoreReleaseEvent,
DequeueEvent,
EnqueueEvent,
FilesChangesCompletedEvent,
HoldRequest,
MergeCompletedEvent,
NodesProvisionedEvent,
PipelinePostConfigEvent,
PipelineSemaphoreReleaseEvent,
PromoteEvent,
QueryCache,
ReconfigureEvent,
TenantReconfigureEvent,
UnparsedAbideConfig,
SemaphoreReleaseEvent,
SupercedeEvent,
SystemAttributes,
TenantReconfigureEvent,
UnparsedAbideConfig,
STATE_FAILED,
SEVERITY_WARNING,
)
@ -2542,6 +2543,7 @@ class Scheduler(threading.Thread):
span.set_attribute("reconfigure_tenant", reconfigure_tenant)
event.span_context = tracing.getSpanContext(span)
query_cache = QueryCache()
for pipeline in tenant.layout.pipelines.values():
# For most kinds of dependencies, it's sufficient to check
# if this change is already in the pipeline, because the
@ -2559,7 +2561,8 @@ class Scheduler(threading.Thread):
# manager, but the result of the work goes into the change
# cache, so it's not wasted; it's just less parallelized.
if isinstance(change, Change):
pipeline.manager.updateCommitDependencies(change, event)
pipeline.manager.updateCommitDependencies(query_cache,
change, event)
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isChangeRelevantToPipeline(change)

View File

@ -127,6 +127,18 @@ class BaseSource(object, metaclass=abc.ABCMeta):
search scope.
"""
def useDependenciesByTopic(self):
"""Return whether the source uses topic dependencies
If a source treats changes in a topic as a dependency cycle,
this will return True.
This is only implemented by the Gerrit driver, however if
other systems have a similar "topic" functionality, it could
be added to other drivers.
"""
return False
def getChangesByTopic(self, topic):
"""Return changes in the same topic.