diff --git a/doc/source/drivers/gerrit.rst b/doc/source/drivers/gerrit.rst index a354a228ce..ecdfd6247a 100644 --- a/doc/source/drivers/gerrit.rst +++ b/doc/source/drivers/gerrit.rst @@ -44,6 +44,12 @@ configuration is to configure both SSH and HTTP access. The section below describes commond configuration settings. Specific settings for different connection methods follow. +.. note:: + + If Gerrit is upgraded, or the value of ``change.submitWholeTopic`` + is changed while Zuul is running, all running Zuul schedulers + should be restarted in order to see the change. + Connection Configuration ------------------------ diff --git a/releasenotes/notes/submit-whole-topic-check-51c84d7226351807.yaml b/releasenotes/notes/submit-whole-topic-check-51c84d7226351807.yaml new file mode 100644 index 0000000000..921b6e62cd --- /dev/null +++ b/releasenotes/notes/submit-whole-topic-check-51c84d7226351807.yaml @@ -0,0 +1,7 @@ +--- +other: + - | + Zuul now checks the submitWholeTopic setting in Gerrit when it + starts. If this setting is changed, or Gerrit is upgraded, all + running Zuul schedulers should be restarted in order to see the + change. 
diff --git a/tests/fakegerrit.py b/tests/fakegerrit.py index 6be447e394..ee3b9952ea 100644 --- a/tests/fakegerrit.py +++ b/tests/fakegerrit.py @@ -723,6 +723,7 @@ class GerritWebServer(object): r'\?parent=1') change_search_re = re.compile(r'/a/changes/\?n=500.*&q=(.*)') version_re = re.compile(r'/a/config/server/version') + info_re = re.compile(r'/a/config/server/info') head_re = re.compile(r'/a/projects/(.*)/HEAD') def do_POST(self): @@ -776,6 +777,9 @@ class GerritWebServer(object): m = self.version_re.match(path) if m: return self.version() + m = self.info_re.match(path) + if m: + return self.info() m = self.head_re.match(path) if m: return self.head(m.group(1)) @@ -954,6 +958,40 @@ class GerritWebServer(object): self.send_data('3.0.0-some-stuff') self.end_headers() + def info(self): + # This is not complete; see the Gerrit REST API documentation + # ("Get Server Info") for additional fields. + data = { + "accounts": { + "visibility": "ALL", + "default_display_name": "FULL_NAME" + }, + "change": { + "allow_blame": True, + "disable_private_changes": True, + "update_delay": 300, + "mergeability_computation_behavior": + "API_REF_UPDATED_AND_CHANGE_REINDEX", + "enable_robot_comments": True, + "conflicts_predicate_enabled": True + }, + "gerrit": { + "all_projects": "All-Projects", + "all_users": "All-Users", + "doc_search": True, + }, + "note_db_enabled": True, + "sshd": {}, + "suggest": {"from": 0}, + "user": {"anonymous_coward_name": "Name of user not set"}, + "receive": {"enable_signed_push": False}, + "submit_requirement_dashboard_columns": [] + } + if fake_gerrit._fake_submit_whole_topic: + data['change']['submit_whole_topic'] = True + self.send_data(data) + self.end_headers() + def head(self, project): project = urllib.parse.unquote(project) head = fake_gerrit._fake_project_default_branch.get( diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 0c47aea8c7..9c67e45ba5 100644 --- a/tests/unit/test_circular_dependencies.py +++ 
b/tests/unit/test_circular_dependencies.py @@ -3454,6 +3454,7 @@ class TestGerritCircularDependencies(ZuulTestCase): topic='test-topic') self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() + B = self.fake_gerrit.addFakeChange('org/project1', "master", "B", topic='test-topic') self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) @@ -3494,10 +3495,19 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(1, counters[ ('changes', '3', 'revisions', 'files?parent=1')]) self.assertEqual(3, counters[('changes', 'message')]) - # These queries need to run more often - self.assertEqual(3, counters[('changes', '1', 'submitted_together')]) - self.assertEqual(2, counters[('changes', '2', 'submitted_together')]) - self.assertEqual(1, counters[('changes', '3', 'submitted_together')]) + # These queries are no longer used + self.assertEqual(0, counters[('changes', '1', 'submitted_together')]) + self.assertEqual(0, counters[('changes', '2', 'submitted_together')]) + self.assertEqual(0, counters[('changes', '3', 'submitted_together')]) + # This query happens once for each event in the scheduler, + # then once for each change in the pipeline if there's more + # than one (cache is in play here). + # * A: 1x scheduler, 0x pipeline + # * A+B: 1x scheduler, 1x pipeline + # * A+B+C: 1x scheduler, 1x pipeline + qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&' + 'q=status%3Aopen%20topic%3A%22test-topic%22') + self.assertEqual(5, counters[('changes', qstring)]) self.assertHistory([ dict(name="project-job", changes="1,1"), @@ -3511,6 +3521,73 @@ class TestGerritCircularDependencies(ZuulTestCase): dict(name="project2-job", changes="2,1 1,1 3,1"), ], ordered=False) + @gerrit_config(submit_whole_topic=True) + def test_submitted_together_storm_fast(self): + # Test that if many changes are uploaded with the same topic, + # we handle queries efficiently. + + # This mimics the changes being uploaded in rapid succession. 
+ self.waitUntilSettled() + with self.scheds.first.sched.run_handler_lock: + A = self.fake_gerrit.addFakeChange('org/project', "master", "A", + topic='test-topic') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + B = self.fake_gerrit.addFakeChange('org/project1', "master", "B", + topic='test-topic') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + + C = self.fake_gerrit.addFakeChange('org/project2', "master", "C", + topic='test-topic') + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Output all the queries seen for debugging + for q in self.fake_gerrit.api_calls: + self.log.debug("Query: %s", q) + + gets = [q[1] for q in self.fake_gerrit.api_calls if q[0] == 'GET'] + counters = Counter() + for q in gets: + parts = q.split('/')[2:] + if len(parts) > 2 and parts[2] == 'revisions': + parts.pop(3) + if 'q=message' in parts[1]: + parts[1] = 'message' + counters[tuple(parts)] += 1 + # Ensure that we don't run these queries more than once for each change + qstring = ('o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&' + 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&' + 'o=DETAILED_LABELS&o=ALL_REVISIONS') + self.assertEqual(1, counters[('changes', f'1?{qstring}')]) + self.assertEqual(1, counters[('changes', f'2?{qstring}')]) + self.assertEqual(1, counters[('changes', f'3?{qstring}')]) + self.assertEqual(1, counters[('changes', '1', 'revisions', 'related')]) + self.assertEqual(1, counters[('changes', '2', 'revisions', 'related')]) + self.assertEqual(1, counters[('changes', '3', 'revisions', 'related')]) + self.assertEqual(1, counters[ + ('changes', '1', 'revisions', 'files?parent=1')]) + self.assertEqual(1, counters[ + ('changes', '2', 'revisions', 'files?parent=1')]) + self.assertEqual(1, counters[ + ('changes', '3', 'revisions', 'files?parent=1')]) + self.assertEqual(3, counters[('changes', 'message')]) + # These queries are no longer used + self.assertEqual(0, counters[('changes', '1', 'submitted_together')]) + 
self.assertEqual(0, counters[('changes', '2', 'submitted_together')]) + self.assertEqual(0, counters[('changes', '3', 'submitted_together')]) + # This query happens once for each event. + # * A+B+C: 3x scheduler, 0x pipeline + qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&' + 'q=status%3Aopen%20topic%3A%22test-topic%22') + self.assertEqual(3, counters[('changes', qstring)]) + self.assertHistory([ + dict(name="project-job", changes="3,1 2,1 1,1"), + dict(name="project1-job", changes="3,1 2,1 1,1"), + dict(name="project-vars-job", changes="3,1 2,1 1,1"), + dict(name="project2-job", changes="3,1 2,1 1,1"), + ], ordered=False) + @gerrit_config(submit_whole_topic=True) def test_submitted_together_git(self): A = self.fake_gerrit.addFakeChange('org/project1', "master", "A") @@ -3576,13 +3653,13 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(len(C.patchsets[-1]["approvals"]), 1) self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified") self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1") - self.assertEqual(A.queried, 8) - self.assertEqual(B.queried, 8) - self.assertEqual(C.queried, 8) - self.assertEqual(D.queried, 8) - self.assertEqual(E.queried, 8) - self.assertEqual(F.queried, 8) - self.assertEqual(G.queried, 8) + self.assertEqual(A.queried, 3) + self.assertEqual(B.queried, 3) + self.assertEqual(C.queried, 3) + self.assertEqual(D.queried, 3) + self.assertEqual(E.queried, 3) + self.assertEqual(F.queried, 3) + self.assertEqual(G.queried, 3) self.assertHistory([ dict(name="project1-job", result="SUCCESS", changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1", diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index e2b1aff58e..30e897df7c 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -858,27 +858,20 @@ class TestGerritFake(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') data = self._get_tuple(1) self.assertEqual(data, []) - ret = 
self.fake_gerrit._getSubmittedTogether(A, None) - self.assertEqual(ret, []) # A dependent series (B->A) B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') B.setDependsOn(A, 1) data = self._get_tuple(2) self.assertEqual(data, [(1, 1), (2, 1)]) - # The Gerrit connection method filters out the queried change - ret = self.fake_gerrit._getSubmittedTogether(B, None) - self.assertEqual(ret, [(1, 1)]) # A topic cycle - C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1', - topic='test-topic') + self.fake_gerrit.addFakeChange('org/project', 'master', 'C1', + topic='test-topic') self.fake_gerrit.addFakeChange('org/project', 'master', 'C2', topic='test-topic') data = self._get_tuple(3) self.assertEqual(data, []) - ret = self.fake_gerrit._getSubmittedTogether(C1, None) - self.assertEqual(ret, []) @gerrit_config(submit_whole_topic=True) def test_submitted_together_whole_topic(self): @@ -890,8 +883,6 @@ class TestGerritFake(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') data = self._get_tuple(1) self.assertEqual(data, []) - ret = self.fake_gerrit._getSubmittedTogether(A, None) - self.assertEqual(ret, []) # A dependent series (B->A) B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') @@ -899,19 +890,15 @@ class TestGerritFake(ZuulTestCase): data = self._get_tuple(2) self.assertEqual(data, [(1, 1), (2, 1)]) # The Gerrit connection method filters out the queried change - ret = self.fake_gerrit._getSubmittedTogether(B, None) - self.assertEqual(ret, [(1, 1)]) # A topic cycle - C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1', - topic='test-topic') + self.fake_gerrit.addFakeChange('org/project', 'master', 'C1', + topic='test-topic') self.fake_gerrit.addFakeChange('org/project', 'master', 'C2', topic='test-topic') data = self._get_tuple(3) self.assertEqual(data, [(3, 1), (4, 1)]) # The Gerrit connection method filters out the queried change - ret = self.fake_gerrit._getSubmittedTogether(C1, None) 
- self.assertEqual(ret, [(4, 1)]) # Test also the query used by the GerritConnection: ret = self.fake_gerrit._simpleQuery('status:open topic:test-topic') @@ -1213,7 +1200,8 @@ class TestGerritMaxDeps(ZuulTestCase): # This is not C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') - C.setDependsOn(B, 1) + C.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % ( + C.subject, A.data['id'], B.data['id']) self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) self.waitUntilSettled() diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 10dde02772..47cc2b6230 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -176,7 +176,6 @@ class QueryHistory: class Query(enum.Enum): SEEN = 1 # Not a real query, just that we've seen the change CHANGE = 2 # The main change query - SUBMITTED_TOGETHER = 3 # The submitted-together query def __init__(self): self.queries = collections.defaultdict(lambda: dict()) @@ -519,6 +518,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): self.watched_checkers = [] self.project_checker_map = {} self.version = (0, 0, 0) + self.submit_whole_topic = None self.ssh_timeout = SSH_TIMEOUT self.baseurl = self.connection_config.get( @@ -818,32 +818,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): records.append(result) return [(x.number, x.current_patchset) for x in records] - def _getSubmittedTogether(self, change, event): - if not self.session: - return [] - # We could probably ask for everything in one query, but it - # might be extremely large, so just get the change ids here - # and then query the individual changes. 
- log = get_annotated_logger(self.log, event) - log.debug("Updating %s: Looking for changes submitted together", - change) - ret = [] - try: - data = self.get(f'changes/{change.number}/submitted_together') - except Exception: - log.error("Unable to find changes submitted together for %s", - change) - return ret - for c in data: - dep_change = c['_number'] - dep_ps = c['revisions'][c['current_revision']]['_number'] - if str(dep_change) == str(change.number): - continue - log.debug("Updating %s: Found change %s,%s submitted together", - change, dep_change, dep_ps) - ret.append((dep_change, dep_ps)) - return ret - def _updateChange(self, key, change, event, history, allow_key_update=False): log = get_annotated_logger(self.log, event) @@ -977,69 +951,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): log.exception("Failed to get commit-needed change %s,%s", dep_num, dep_ps) - for (dep_num, dep_ps) in self._getSubmittedTogether(change, event): - try: - log.debug("Updating %s: Getting submitted-together " - "change %s,%s", - change, dep_num, dep_ps) - # The query above will have returned a set of changes - # that are submitted together along with this one. - # That set includes: - # * Any git-dependent change if we're not being cherry-picked - # * Any change with the same topic if submitWholeTopic is set - - # The first is a one-way dependency, the second is - # simultaneous. We are unable to distinguish the two - # based only on the results of the submitted-together - # query. Therefore, while we know that we need to add - # the dep to our dependency list, we don't know - # whether we need to add ourselves to the dep list. - - # In order to find that out, we will need to run the - # submitted-together query for each dep as well. But - # if we've already queried the dep, we don't need to - # do it again, and if this change already appears in - # the dep's dependencies, we also don't need to query - # again. 
- dep_key = ChangeKey(self.connection_name, None, - 'GerritChange', str(dep_num), str(dep_ps)) - dep = self._getChange( - dep_key, refresh=False, history=history, - event=event) - refresh = True - if (history.getByKey(history.Query.CHANGE, dep_key) or - history.getByKey(history.Query.SUBMITTED_TOGETHER, - dep_key)): - refresh = False - if (key in dep.compat_needs_changes and - key in dep.compat_needed_by_changes): - refresh = False - # Gerrit changes to be submitted together do not - # necessarily get posted with dependency cycles using - # git trees and depends-on. However, they are - # functionally equivalent to a stack of changes with - # cycles using those methods. Here we set - # needs_changes and needed_by_changes as if there were - # a cycle. This ensures Zuul's cycle handling manages - # the submitted together changes properly. - if dep.open and dep not in needs_changes: - compat_needs_changes.append(dep_key.reference) - needs_changes.add(dep_key.reference) - if (dep.open and dep.is_current_patchset - and dep not in needed_by_changes): - compat_needed_by_changes.append(dep_key.reference) - needed_by_changes.add(dep_key.reference) - if refresh: - # We may need to update the deps dependencies (as - # explained above). - history.add(history.Query.SUBMITTED_TOGETHER, dep) - self.updateSubmittedTogether(log, dep, history, event) - except GerritEventProcessingException: - raise - except Exception: - log.exception("Failed to get commit-needed change %s,%s", - dep_num, dep_ps) - return dict( git_needs_changes=git_needs_changes, compat_needs_changes=compat_needs_changes, @@ -1047,51 +958,6 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): compat_needed_by_changes=compat_needed_by_changes, ) - def updateSubmittedTogether(self, log, change, history, event): - # This method is very similar to the last part of - # _updateChangeDependencies, but it updates the other - # direction and does so without performing a full change - # query. 
- extra = { - 'compat_needs_changes': change.compat_needs_changes[:], - 'compat_needed_by_changes': change.compat_needed_by_changes[:], - } - update = False - for (dep_num, dep_ps) in self._getSubmittedTogether(change, event): - try: - log.debug("Updating %s: Getting reverse submitted-together " - "change %s,%s", - change, dep_num, dep_ps) - dep_key = ChangeKey(self.connection_name, None, - 'GerritChange', str(dep_num), str(dep_ps)) - dep = self._getChange( - dep_key, refresh=False, history=history, - event=event) - if (dep.open and - dep_key.reference not in - extra['compat_needs_changes']): - extra['compat_needs_changes'].append(dep_key.reference) - update = True - if (dep.open and - dep.is_current_patchset and - dep_key.reference not in - extra['compat_needed_by_changes']): - extra['compat_needed_by_changes'].append(dep_key.reference) - update = True - except GerritEventProcessingException: - raise - except Exception: - log.exception("Failed to get commit-needed change %s,%s", - dep_num, dep_ps) - if update: - # Actually update the dep in the change cache. 
- def _update_change(c): - for k, v in extra.items(): - setattr(c, k, v) - self._change_cache.updateChangeWithRetry( - change.cache_stat.key, change, _update_change, - allow_key_update=False) - def isMerged(self, change, head=None): self.log.debug("Checking if change %s is merged" % change) if not change.number: @@ -1771,6 +1637,12 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): self.log.info("Remote version is: %s (parsed as %s)" % (version, self.version)) + def _getRemoteInfo(self): + info = self.get('config/server/info') + change_info = info.get('change', {}) + self.submit_whole_topic = change_info.get('submit_whole_topic', False) + self.log.info("Remote submitWholeTopic: %s", self.submit_whole_topic) + def refWatcherCallback(self, data): event = { 'type': 'ref-updated', @@ -1785,11 +1657,15 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): def onLoad(self, zk_client, component_registry): self.log.debug("Starting Gerrit Connection/Watchers") - try: - if self.session: + if self.session: + try: self._getRemoteVersion() - except Exception: - self.log.exception("Unable to determine remote Gerrit version") + except Exception: + self.log.exception("Unable to determine remote Gerrit version") + try: + self._getRemoteInfo() + except Exception: + self.log.exception("Unable to fetch remote Gerrit info") # Set the project branch cache to read only if no scheduler is # provided to prevent fetching the branches from the connection. 
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index c2d92c9336..16a49988f8 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -154,6 +154,9 @@ class GerritSource(BaseSource): changes.append(change) return changes + def useDependenciesByTopic(self): + return bool(self.connection.submit_whole_topic) + def getChangesByTopic(self, topic, changes=None, history=None): if not topic: return [] diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 9c6227cc1b..03fab9884b 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components import zuul.lib.tracing as tracing from zuul.model import ( Change, PipelineState, PipelineChangeList, - filter_severity, EnqueueEvent + filter_severity, EnqueueEvent, QueryCache ) from zuul.zk.change_cache import ChangeKey from zuul.zk.exceptions import LockException @@ -235,6 +235,7 @@ class PipelineManager(metaclass=ABCMeta): def resolveChangeKeys(self, change_keys): resolved_changes = [] + query_cache = QueryCache() for key in change_keys: change = self._change_cache.get(key.reference) if change is None: @@ -251,7 +252,8 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies(change, event=None) + self.updateCommitDependencies( + query_cache, change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -308,8 +310,9 @@ class PipelineManager(metaclass=ABCMeta): for dep_change_ref in change.getNeedsChanges( self.useDependenciesByTopic(change.project)): dep_change_key = ChangeKey.fromReference(dep_change_ref) - if change.cache_stat.key.isSameChange(dep_change_key): - return True + for change_key in self.pipeline.change_list.getChangeKeys(): + if 
change_key.isSameChange(dep_change_key): + return True return False def isChangeAlreadyInQueue(self, change, change_queue, item=None): @@ -329,6 +332,7 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return + query_cache = QueryCache() to_refresh = set() for item in self.pipeline.getAllItems(): for item_change in item.changes: @@ -342,7 +346,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item_change) for existing_change in to_refresh: - self.updateCommitDependencies(existing_change, event) + self.updateCommitDependencies(query_cache, existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -845,7 +849,7 @@ class PipelineManager(metaclass=ABCMeta): ) - set(cycle) def getDependencyGraph(self, change, dependency_graph, event, - update_deps=False, + update_deps=False, query_cache=None, history=None, quiet=False, indent=''): log = get_annotated_logger(self.log, event) if not quiet: @@ -854,14 +858,13 @@ class PipelineManager(metaclass=ABCMeta): return if not isinstance(change, model.Change): return - if not change.getNeedsChanges( - self.useDependenciesByTopic(change.project)): - return if history is None: history = set() + if query_cache is None: + query_cache = QueryCache() history.add(change) if update_deps: - self.updateCommitDependencies(change, event) + self.updateCommitDependencies(query_cache, change, event) for needed_change in self.resolveChangeReferences( change.getNeedsChanges( self.useDependenciesByTopic(change.project))): @@ -888,9 +891,10 @@ class PipelineManager(metaclass=ABCMeta): "change %s", indent, needed_change, change) node.append(needed_change) if needed_change not in history: - self.getDependencyGraph(needed_change, dependency_graph, - event, update_deps, history, - quiet, indent + ' ') + self.getDependencyGraph(needed_change, + dependency_graph, event, + update_deps, query_cache, + history, quiet, indent + ' ') def getQueueConfig(self, project): layout = 
self.pipeline.tenant.layout @@ -917,6 +921,10 @@ class PipelineManager(metaclass=ABCMeta): return queue_config.allow_circular_dependencies def useDependenciesByTopic(self, project): + source = self.sched.connections.getSource(project.connection_name) + if source.useDependenciesByTopic(): + return True + queue_config = self.getQueueConfig(project) if queue_config is None: return False @@ -991,7 +999,7 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name ].put_supercede(event) - def updateCommitDependencies(self, change, event): + def updateCommitDependencies(self, query_cache, change, event): log = get_annotated_logger(self.log, event) must_update_commit_deps = ( @@ -1042,12 +1050,17 @@ class PipelineManager(metaclass=ABCMeta): change.project.connection_name) if must_update_topic_deps: log.debug(" Updating topic dependencies for %s", change) - new_topic_needs_changes = [] - for dep in source.getChangesByTopic(change.topic): - if dep and (not dep.is_merged): + new_topic_needs_changes_keys = [] + query_cache_key = (change.project.connection_name, change.topic) + changes_by_topic = query_cache.topic_queries.get(query_cache_key) + if changes_by_topic is None: + changes_by_topic = source.getChangesByTopic(change.topic) + query_cache.topic_queries[query_cache_key] = changes_by_topic + for dep in changes_by_topic: + if dep and (not dep.is_merged) and dep is not change: log.debug(" Adding dependency: %s", dep) - new_topic_needs_changes.append(dep.cache_key) - update_attrs['topic_needs_changes'] = new_topic_needs_changes + new_topic_needs_changes_keys.append(dep.cache_key) + update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys if update_attrs: source.setChangeAttributes(change, **update_attrs) diff --git a/zuul/model.py b/zuul/model.py index a2b1c7bc10..891663adc6 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -148,6 +148,13 @@ def filter_severity(error_list, errors=True, warnings=True): )] +class QueryCache: + """Cache 
query information while processing dependencies""" + + def __init__(self): + self.topic_queries = {} + + class ZuulMark: # The yaml mark class differs between the C and python versions. # The C version does not provide a snippet, and also appears to diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 82d6f587a9..908ae8341d 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -59,21 +59,22 @@ from zuul.model import ( BuildStatusEvent, Change, ChangeManagementEvent, - PipelinePostConfigEvent, - SemaphoreReleaseEvent, - PipelineSemaphoreReleaseEvent, DequeueEvent, EnqueueEvent, FilesChangesCompletedEvent, HoldRequest, MergeCompletedEvent, NodesProvisionedEvent, + PipelinePostConfigEvent, + PipelineSemaphoreReleaseEvent, PromoteEvent, + QueryCache, ReconfigureEvent, - TenantReconfigureEvent, - UnparsedAbideConfig, + SemaphoreReleaseEvent, SupercedeEvent, SystemAttributes, + TenantReconfigureEvent, + UnparsedAbideConfig, STATE_FAILED, SEVERITY_WARNING, ) @@ -2542,6 +2543,7 @@ class Scheduler(threading.Thread): span.set_attribute("reconfigure_tenant", reconfigure_tenant) event.span_context = tracing.getSpanContext(span) + query_cache = QueryCache() for pipeline in tenant.layout.pipelines.values(): # For most kinds of dependencies, it's sufficient to check # if this change is already in the pipeline, because the @@ -2559,7 +2561,8 @@ class Scheduler(threading.Thread): # manager, but the result of the work goes into the change # cache, so it's not wasted; it's just less parallelized. 
if isinstance(change, Change): - pipeline.manager.updateCommitDependencies(change, event) + pipeline.manager.updateCommitDependencies(query_cache, + change, event) if ( pipeline.manager.eventMatches(event, change) or pipeline.manager.isChangeRelevantToPipeline(change) diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index 2d6da5c978..49e92bc6b4 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -127,6 +127,18 @@ class BaseSource(object, metaclass=abc.ABCMeta): search scope. """ + def useDependenciesByTopic(self): + """Return whether the source uses topic dependencies + + If a source treats changes in a topic as a dependency cycle, + this will return True. + + This is only implemented by the Gerrit driver, however if + other systems have a similar "topic" functionality, it could + be added to other drivers. + """ + return False + def getChangesByTopic(self, topic): """Return changes in the same topic.