diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index 584ba1a981..09882ec814 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -227,3 +227,9 @@ Version 31 ---------- :Prior Zuul version: 11.0.1 :Description: Upgrade sharded zkobject format. + +Version 32 +---------- +:Prior Zuul version: 11.1.0 +:Description: Add topic query timestamp. + Affects schedulers. diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 0fa151c37e..0d6d4f6b26 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -15,11 +15,14 @@ # along with this software. If not, see . from collections import Counter +import fixtures import re import textwrap +import threading import json from zuul.model import PromoteEvent +import zuul.scheduler from tests.base import ( iterate_timeout, @@ -4212,6 +4215,65 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(B.data["status"], "MERGED") self.assertEqual(X.data["status"], "ABANDONED") + @gerrit_config(submit_whole_topic=True) + def test_abandoned_change_refresh_changes(self): + # Test that we can re-enqueue a topic cycle after abandoning a + # change (out of band). This adds extra events to refresh all + # of the changes while the re-enqueue is in progress in order + # to trigger a race condition. + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic', + files={'conflict': 'B'}) + X = self.fake_gerrit.addFakeChange('org/project2', "master", "X", + topic='test-topic', + files={'conflict': 'X'}) + + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + X.addApproval("Code-Review", 2) + X.addApproval("Approved", 1) + B.addApproval("Approved", 1) + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled() + + orig_forward = zuul.scheduler.Scheduler._forward_trigger_event + stop_event = threading.Event() + go_event = threading.Event() + + def patched_forward(obj, *args, **kw): + stop_event.set() + go_event.wait() + return orig_forward(obj, *args, **kw) + + self.useFixture(fixtures.MonkeyPatch( + 'zuul.scheduler.Scheduler._forward_trigger_event', + patched_forward)) + + X.setAbandoned() + X.addApproval("Approved", -1) + self.waitUntilSettled("abandoned") + + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + stop_event.wait() + stop_event.clear() + # The scheduler is waiting to forward the approved event; send + # another event that refreshes the cache: + self.fake_gerrit.addEvent(A.getChangeCommentEvent(1, 'testcomment')) + self.fake_gerrit.addEvent(B.getChangeCommentEvent(1, 'testcomment')) + self.fake_gerrit.addEvent(X.getChangeCommentEvent(1, 'testcomment')) + for x in iterate_timeout(30, 'events to be submitted'): + if len(self.scheds.first.sched.trigger_events['tenant-one']) == 4: + break + go_event.set() + stop_event.wait() + self.waitUntilSettled("reapproved") + + self.assertEqual(A.data["status"], "MERGED") + self.assertEqual(B.data["status"], "MERGED") + self.assertEqual(X.data["status"], "ABANDONED") + class TestGithubCircularDependencies(ZuulTestCase): config_file = "zuul-gerrit-github.conf" diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 08e8a68686..6f840a08e0 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -22,6 +22,7 @@ from zuul.zk.components import ( from tests.base import ( BaseTestCase, ZuulTestCase, + gerrit_config, simple_layout, iterate_timeout, model_version, @@ -64,6 +65,54 @@ class TestModelUpgrade(ZuulTestCase): break +class TestModelUpgradeGerritCircularDependencies(ZuulTestCase): + config_file = "zuul-gerrit-github.conf" + tenant_config_file = "config/circular-dependencies/main.yaml" + + @model_version(31) + @gerrit_config(submit_whole_topic=True) + def test_model_31_32(self): + self.executor_server.hold_jobs_in_build = True + + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + B.addApproval("Approved", 1) + + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled() + + first = self.scheds.first + second = self.createScheduler() + second.start() + self.assertEqual(len(self.scheds), 2) + for _ in iterate_timeout(10, "until priming is complete"): + state_one = first.sched.local_layout_state.get("tenant-one") + if state_one: + break + + for _ in iterate_timeout( + 10, "all schedulers to have the same layout state"): + if (second.sched.local_layout_state.get( + "tenant-one") == state_one): + break + + self.model_test_component_info.model_api = 32 + with first.sched.layout_update_lock, first.sched.run_handler_lock: + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled(matcher=[second]) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + self.assertEqual(A.data["status"], "MERGED") + self.assertEqual(B.data["status"], "MERGED") + + class TestGithubModelUpgrade(ZuulTestCase): config_file = "zuul-gerrit-github.conf" scheduler_count = 1 diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index e293b40808..bb02182fab 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -29,10 +29,12 @@ import zuul.lib.tracing as tracing from zuul.model import ( Change, PipelineState, PipelineChangeList, filter_severity, EnqueueEvent, FalseWithReason, + QueryCacheEntry ) from zuul.zk.change_cache import ChangeKey from zuul.zk.exceptions import LockException from zuul.zk.locks import pipeline_lock +from zuul.zk.components import COMPONENT_REGISTRY from opentelemetry import trace @@ -1054,11 +1056,23 @@ class PipelineManager(metaclass=ABCMeta): or change.cache_stat.mzxid <= event.zuul_event_ltime ) + if hasattr(event, "zuul_event_ltime"): + if COMPONENT_REGISTRY.model_api < 32: + topic_out_of_date =\ + change.cache_stat.mzxid <= event.zuul_event_ltime + else: + topic_out_of_date =\ + change.topic_query_ltime <= event.zuul_event_ltime + else: + # The value is unused and doesn't matter because of the + # clause below, but True is the safer value. + topic_out_of_date = True + must_update_topic_deps = ( self.useDependenciesByTopic(change.project) and ( not hasattr(event, "zuul_event_ltime") or change.topic_needs_changes is None - or change.cache_stat.mzxid <= event.zuul_event_ltime + or topic_out_of_date ) ) @@ -1098,15 +1112,19 @@ class PipelineManager(metaclass=ABCMeta): log.debug(" Updating topic dependencies for %s", change) new_topic_needs_changes_keys = [] query_cache_key = (change.project.connection_name, change.topic) - changes_by_topic = query_cache.topic_queries.get(query_cache_key) - if changes_by_topic is None: + cache_entry = query_cache.topic_queries.get(query_cache_key) + if cache_entry is None: changes_by_topic = source.getChangesByTopic(change.topic) - query_cache.topic_queries[query_cache_key] = changes_by_topic - for dep in changes_by_topic: + cache_entry = QueryCacheEntry( + self.sched.zk_client.getCurrentLtime(), + changes_by_topic) + query_cache.topic_queries[query_cache_key] = cache_entry + for dep in cache_entry.results: if dep and (not dep.is_merged) and dep is not change: log.debug(" Adding dependency: %s", dep) new_topic_needs_changes_keys.append(dep.cache_key) update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys + update_attrs['topic_query_ltime'] = cache_entry.ltime if update_attrs: source.setChangeAttributes(change, **update_attrs) diff --git a/zuul/model.py b/zuul/model.py index 2a0f625934..832260dcf4 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -147,6 +147,12 @@ def filter_severity(error_list, errors=True, warnings=True): )] +class QueryCacheEntry: + def __init__(self, ltime, results): + self.ltime = ltime + self.results = results + + class QueryCache: """Cache query information while processing dependencies""" @@ -7302,6 +7308,7 @@ class Change(Branch): # drivers in theory, but Gerrit only in practice for # emulate-submit-whole-topic): self.topic_needs_changes = None + self.topic_query_ltime = 0 self.is_current_patchset = True self.can_merge = False @@ -7337,6 +7344,7 @@ class Change(Branch): else data.get("commit_needs_changes", []) ) self.topic_needs_changes = data.get("topic_needs_changes") + self.topic_query_ltime = data.get("topic_query_ltime", 0) self.is_current_patchset = data.get("is_current_patchset", True) self.can_merge = data.get("can_merge", False) self.is_merged = data.get("is_merged", False) @@ -7360,6 +7368,7 @@ class Change(Branch): "compat_needed_by_changes": self.git_needed_by_changes, "commit_needs_changes": self.commit_needs_changes, "topic_needs_changes": self.topic_needs_changes, + "topic_query_ltime": self.topic_query_ltime, "is_current_patchset": self.is_current_patchset, "can_merge": self.can_merge, "is_merged": self.is_merged, diff --git a/zuul/model_api.py b/zuul/model_api.py index 952a05f21f..f3f789d50d 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 31 +MODEL_API = 32