diff --git a/tests/fakegerrit.py b/tests/fakegerrit.py index 26d4eb9854..c573fbe777 100644 --- a/tests/fakegerrit.py +++ b/tests/fakegerrit.py @@ -728,6 +728,7 @@ class GerritWebServer(object): def do_POST(self): path = self.path self.log.debug("Got POST %s", path) + fake_gerrit.api_calls.append(('POST', path)) data = self.rfile.read(int(self.headers['Content-Length'])) data = json.loads(data.decode('utf-8')) @@ -749,6 +750,7 @@ class GerritWebServer(object): def do_GET(self): path = self.path self.log.debug("Got GET %s", path) + fake_gerrit.api_calls.append(('GET', path)) m = self.change_re.match(path) if m: @@ -1015,6 +1017,7 @@ class FakeGerritConnection(gerritconnection.GerritConnection): self.change_number = 0 self.changes = changes_db self.queries = [] + self.api_calls = [] self.upstream_root = upstream_root self.fake_checkers = [] self._poller_event = poller_event @@ -1136,6 +1139,8 @@ class FakeGerritConnection(gerritconnection.GerritConnection): for dep in change.data.get('dependsOn', []): dep_change = self.changes.get(int(dep['number'])) r = dep_change.queryHTTP(internal=True) + if r['status'] == 'MERGED': + continue if r not in results: results.append(r) if len(results) == 1: diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 7583a37616..0fea420d40 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this software. If not, see . +from collections import Counter import re import textwrap import json @@ -3443,6 +3444,72 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(A.data["status"], "MERGED") self.assertEqual(B.data["status"], "MERGED") + def test_submitted_together_storm(self): + # Test that if many changes are uploaded with the same topic, + # we handle queries efficiently. + self.fake_gerrit._fake_submit_whole_topic = True + self.waitUntilSettled() + A = self.fake_gerrit.addFakeChange('org/project', "master", "A", + topic='test-topic') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + B = self.fake_gerrit.addFakeChange('org/project1', "master", "B", + topic='test-topic') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + C = self.fake_gerrit.addFakeChange('org/project2', "master", "C", + topic='test-topic') + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Output all the queries seen for debugging + for q in self.fake_gerrit.api_calls: + self.log.debug("Query: %s", q) + + gets = [q[1] for q in self.fake_gerrit.api_calls if q[0] == 'GET'] + counters = Counter() + for q in gets: + parts = q.split('/')[2:] + if len(parts) > 2 and parts[2] == 'revisions': + parts.pop(3) + if 'q=message' in parts[1]: + parts[1] = 'message' + counters[tuple(parts)] += 1 + # Ensure that we don't run these queries more than once for each change + qstring = ('o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&' + 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&' + 'o=DETAILED_LABELS&o=ALL_REVISIONS') + self.assertEqual(1, counters[('changes', f'1?{qstring}')]) + self.assertEqual(1, counters[('changes', f'2?{qstring}')]) + self.assertEqual(1, counters[('changes', f'3?{qstring}')]) + self.assertEqual(1, counters[('changes', '1', 'revisions', 'related')]) + self.assertEqual(1, counters[('changes', '2', 'revisions', 'related')]) + self.assertEqual(1, counters[('changes', '3', 'revisions', 'related')]) + self.assertEqual(1, counters[ + ('changes', '1', 'revisions', 'files?parent=1')]) + self.assertEqual(1, counters[ + ('changes', '2', 'revisions', 'files?parent=1')]) + self.assertEqual(1, counters[ + ('changes', '3', 'revisions', 'files?parent=1')]) + self.assertEqual(3, counters[('changes', 'message')]) + # These queries need to run more often + self.assertEqual(3, counters[('changes', '1', 'submitted_together')]) + self.assertEqual(2, counters[('changes', '2', 'submitted_together')]) + self.assertEqual(1, counters[('changes', '3', 'submitted_together')]) + self.assertHistory([ + dict(name="project-job", changes="1,1"), + + dict(name="project-job", changes="1,1 2,1"), + dict(name="project1-job", changes="1,1 2,1"), + dict(name="project-vars-job", changes="1,1 2,1"), + + dict(name="project-job", changes="2,1 1,1 3,1"), + dict(name="project1-job", changes="2,1 1,1 3,1"), + dict(name="project-vars-job", changes="2,1 1,1 3,1"), + dict(name="project2-job", changes="2,1 1,1 3,1"), + ], ordered=False) + def test_submitted_together_git(self): self.fake_gerrit._fake_submit_whole_topic = True diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index e006a1f30f..10dde02772 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -14,8 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import copy import datetime +import enum import itertools import json import logging @@ -170,6 +172,28 @@ class GerritChangeData(object): change['_revision_number'])) +class QueryHistory: + class Query(enum.Enum): + SEEN = 1 # Not a real query, just that we've seen the change + CHANGE = 2 # The main change query + SUBMITTED_TOGETHER = 3 # The submitted-together query + + def __init__(self): + self.queries = collections.defaultdict(lambda: dict()) + + def getByKey(self, query, change_key): + if not isinstance(change_key, ChangeKey): + raise Exception("Must supply a ChangeKey") + key = (change_key.stable_id, change_key.revision) + return self.queries[query].get(key) + + def add(self, query, change): + if not isinstance(change, GerritChange): + raise Exception("Must supply a GerritChange") + key = (change.number, change.patchset) + self.queries[query][key] = change + + class GerritEventConnector(threading.Thread): """Move events from Gerrit to the scheduler.""" @@ -658,10 +682,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): elif change_key.change_type == 'Ref': return self._getRef(change_key, refresh=refresh, event=event) + def _checkMaxDependencies(self, change, history): + if change and history: + history.add(history.Query.SEEN, change) + if (self.max_dependencies is not None and + len(history.queries[history.Query.SEEN]) > + self.max_dependencies): + raise GerritEventProcessingException( + f"Change {change} has too many dependencies") + def _getChange(self, change_key, refresh=False, history=None, event=None, allow_key_update=False): # Ensure number and patchset are str change = self._change_cache.get(change_key) + self._checkMaxDependencies(change, history) if change and not refresh: return change if not change: @@ -671,6 +705,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): change = GerritChange(None) change.number = change_key.stable_id change.patchset = change_key.revision + self._checkMaxDependencies(change, history) return self._updateChange(change_key, change, event, history, allow_key_update) @@ -830,17 +865,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): # drop history, we need to resolve the patchset on events with # no patchsets before adding the entry to the change cache. if history and change.number and change.patchset: - for history_change in history: - if (history_change.number == change.number and - history_change.patchset == change.patchset): - log.debug("Change %s is in history", change) - return history_change - - if (self.max_dependencies is not None and - history and - len(history) > self.max_dependencies): - raise GerritEventProcessingException( - f"Change {change} has too many dependencies") + history_change = history.getByKey(history.Query.CHANGE, key) + if history_change: + log.debug("Change %s is in history", change) + return history_change log.info("Updating %s", change) data = self.queryChange(change.number, event=event) @@ -853,7 +881,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): # dependent changes (recursively calling this method). if not change.is_merged: extra = self._updateChangeDependencies( - log, change, data, event, history) + log, key, change, data, event, history) else: extra = {} @@ -866,10 +894,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return change - def _updateChangeDependencies(self, log, change, data, event, history): + def _updateChangeDependencies(self, log, key, change, data, event, + history): if history is None: - history = [] - history.append(change) + history = QueryHistory() + history.add(history.Query.CHANGE, change) needs_changes = set() git_needs_changes = [] @@ -932,9 +961,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): # reference the latest patchset of its Depends-On (this # change). In case the dep is already in history we already # refreshed this change so refresh is not needed in this case. - refresh = (dep_num, dep_ps) not in history dep_key = ChangeKey(self.connection_name, None, 'GerritChange', str(dep_num), str(dep_ps)) + refresh = not history.getByKey(history.Query.CHANGE, dep_key) dep = self._getChange( dep_key, refresh=refresh, history=history, event=event) @@ -953,17 +982,38 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): log.debug("Updating %s: Getting submitted-together " "change %s,%s", change, dep_num, dep_ps) - # Because a submitted-together change may be a cross-repo - # dependency, cause that change to refresh so that it will - # reference the latest patchset of its Depends-On (this - # change). In case the dep is already in history we already - # refreshed this change so refresh is not needed in this case. - refresh = (dep_num, dep_ps) not in history + # The query above will have returned a set of changes + # that are submitted together along with this one. + # That set includes: + # * Any git-dependent change if we're not being cherry-picked + # * Any change with the same topic if submitWholeTopic is set + + # The first is a one-way dependency, the second is + # simultaneous. We are unable to distinguish the two + # based only on the results of the submitted-together + # query. Therefore, while we know that we need to add + # the dep to our dependency list, we don't know + # whether we need to add ourselves to the dep list. + + # In order to find that out, we will need to run the + # submitted-together query for each dep as well. But + # if we've already queried the dep, we don't need to + # do it again, and if this change already appears in + # the dep's dependencies, we also don't need to query + # again. dep_key = ChangeKey(self.connection_name, None, 'GerritChange', str(dep_num), str(dep_ps)) dep = self._getChange( - dep_key, refresh=refresh, history=history, + dep_key, refresh=False, history=history, event=event) + refresh = True + if (history.getByKey(history.Query.CHANGE, dep_key) or + history.getByKey(history.Query.SUBMITTED_TOGETHER, + dep_key)): + refresh = False + if (key in dep.compat_needs_changes and + key in dep.compat_needed_by_changes): + refresh = False # Gerrit changes to be submitted together do not # necessarily get posted with dependency cycles using # git trees and depends-on. However, they are @@ -979,6 +1029,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): and dep not in needed_by_changes): compat_needed_by_changes.append(dep_key.reference) needed_by_changes.add(dep_key.reference) + if refresh: + # We may need to update the deps dependencies (as + # explained above). + history.add(history.Query.SUBMITTED_TOGETHER, dep) + self.updateSubmittedTogether(log, dep, history, event) except GerritEventProcessingException: raise except Exception: @@ -992,6 +1047,51 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): compat_needed_by_changes=compat_needed_by_changes, ) + def updateSubmittedTogether(self, log, change, history, event): + # This method is very similar to the last part of + # _updateChangeDependencies, but it updates the other + # direction and does so without performing a full change + # query. + extra = { + 'compat_needs_changes': change.compat_needs_changes[:], + 'compat_needed_by_changes': change.compat_needed_by_changes[:], + } + update = False + for (dep_num, dep_ps) in self._getSubmittedTogether(change, event): + try: + log.debug("Updating %s: Getting reverse submitted-together " + "change %s,%s", + change, dep_num, dep_ps) + dep_key = ChangeKey(self.connection_name, None, + 'GerritChange', str(dep_num), str(dep_ps)) + dep = self._getChange( + dep_key, refresh=False, history=history, + event=event) + if (dep.open and + dep_key.reference not in + extra['compat_needs_changes']): + extra['compat_needs_changes'].append(dep_key.reference) + update = True + if (dep.open and + dep.is_current_patchset and + dep_key.reference not in + extra['compat_needed_by_changes']): + extra['compat_needed_by_changes'].append(dep_key.reference) + update = True + except GerritEventProcessingException: + raise + except Exception: + log.exception("Failed to get commit-needed change %s,%s", + dep_num, dep_ps) + if update: + # Actually update the dep in the change cache. + def _update_change(c): + for k, v in extra.items(): + setattr(c, k, v) + self._change_cache.updateChangeWithRetry( + change.cache_stat.key, change, _update_change, + allow_key_update=False) + def isMerged(self, change, head=None): self.log.debug("Checking if change %s is merged" % change) if not change.number: