From 475e11cd8191334b6c1065e1119dda1706d9a711 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 14 Oct 2024 16:44:55 -0700 Subject: [PATCH] Fix race with topic changes When a gerrit event is submitted, we update the change cache first, then set the event ltime in the scheduler right before we forward the event. That serves to make the cache ltime always less than the event ltime, which means we should always update topic dependencies in the pipeline (there is a check for this when forwarding events to pipelines). However, there could be a race with two schedulers and a lot of events: [1] iterate over trigger event 1 for change A, set its ltime [2] receive new event 2 for change A, update cache [1] forward trigger event 1 to pipelines [1] tell pipelines to update change dependencies [1] pipeline sees that cache is newer than event 1 and skips update This adds a test which exercises this race (via a fairly contrived, but plausible, example). The race is corrected by storing an extra timestamp: the last time that we performed a topic query for a change. This will ensure that even if the driver has updated the change cache for a change with a topic-unaware query after our trigger event, we will know that the last topic-aware query was performed earlier than that and needs to be updated. (Note, we also store the timestamp as a bit of metadata in the local in-memory query cache -- this is separate from the above and is simply used to ensure that the correct timestamp value is copied along with the results if the query is run more than once in a single pipeline processing run.) 
Change-Id: I11761046c24a922939594e81d4312d90f976f9ed --- doc/source/developer/model-changelog.rst | 6 +++ tests/unit/test_circular_dependencies.py | 62 ++++++++++++++++++++++++ tests/unit/test_model_upgrade.py | 49 +++++++++++++++++++ zuul/manager/__init__.py | 28 +++++++++-- zuul/model.py | 9 ++++ zuul/model_api.py | 2 +- 6 files changed, 150 insertions(+), 6 deletions(-) diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index 584ba1a981..09882ec814 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -227,3 +227,9 @@ Version 31 ---------- :Prior Zuul version: 11.0.1 :Description: Upgrade sharded zkobject format. + +Version 32 +---------- +:Prior Zuul version: 11.1.0 +:Description: Add topic query timestamp. + Affects schedulers. diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 0fa151c37e..0d6d4f6b26 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -15,11 +15,14 @@ # along with this software. If not, see . from collections import Counter +import fixtures import re import textwrap +import threading import json from zuul.model import PromoteEvent +import zuul.scheduler from tests.base import ( iterate_timeout, @@ -4212,6 +4215,65 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(B.data["status"], "MERGED") self.assertEqual(X.data["status"], "ABANDONED") + @gerrit_config(submit_whole_topic=True) + def test_abandoned_change_refresh_changes(self): + # Test that we can re-enqueue a topic cycle after abandoning a + # change (out of band). This adds extra events to refresh all + # of the changes while the re-enqueue is in progress in order + # to trigger a race condition. 
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic', + files={'conflict': 'B'}) + X = self.fake_gerrit.addFakeChange('org/project2', "master", "X", + topic='test-topic', + files={'conflict': 'X'}) + + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + X.addApproval("Code-Review", 2) + X.addApproval("Approved", 1) + B.addApproval("Approved", 1) + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled() + + orig_forward = zuul.scheduler.Scheduler._forward_trigger_event + stop_event = threading.Event() + go_event = threading.Event() + + def patched_forward(obj, *args, **kw): + stop_event.set() + go_event.wait() + return orig_forward(obj, *args, **kw) + + self.useFixture(fixtures.MonkeyPatch( + 'zuul.scheduler.Scheduler._forward_trigger_event', + patched_forward)) + + X.setAbandoned() + X.addApproval("Approved", -1) + self.waitUntilSettled("abandoned") + + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + stop_event.wait() + stop_event.clear() + # The scheduler is waiting to forward the approved event; send + # another event that refreshes the cache: + self.fake_gerrit.addEvent(A.getChangeCommentEvent(1, 'testcomment')) + self.fake_gerrit.addEvent(B.getChangeCommentEvent(1, 'testcomment')) + self.fake_gerrit.addEvent(X.getChangeCommentEvent(1, 'testcomment')) + for x in iterate_timeout(30, 'events to be submitted'): + if len(self.scheds.first.sched.trigger_events['tenant-one']) == 4: + break + go_event.set() + stop_event.wait() + self.waitUntilSettled("reapproved") + + self.assertEqual(A.data["status"], "MERGED") + self.assertEqual(B.data["status"], "MERGED") + self.assertEqual(X.data["status"], "ABANDONED") + class TestGithubCircularDependencies(ZuulTestCase): config_file = "zuul-gerrit-github.conf" diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 
08e8a68686..6f840a08e0 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -22,6 +22,7 @@ from zuul.zk.components import ( from tests.base import ( BaseTestCase, ZuulTestCase, + gerrit_config, simple_layout, iterate_timeout, model_version, @@ -64,6 +65,54 @@ class TestModelUpgrade(ZuulTestCase): break +class TestModelUpgradeGerritCircularDependencies(ZuulTestCase): + config_file = "zuul-gerrit-github.conf" + tenant_config_file = "config/circular-dependencies/main.yaml" + + @model_version(31) + @gerrit_config(submit_whole_topic=True) + def test_model_31_32(self): + self.executor_server.hold_jobs_in_build = True + + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + B.addApproval("Approved", 1) + + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled() + + first = self.scheds.first + second = self.createScheduler() + second.start() + self.assertEqual(len(self.scheds), 2) + for _ in iterate_timeout(10, "until priming is complete"): + state_one = first.sched.local_layout_state.get("tenant-one") + if state_one: + break + + for _ in iterate_timeout( + 10, "all schedulers to have the same layout state"): + if (second.sched.local_layout_state.get( + "tenant-one") == state_one): + break + + self.model_test_component_info.model_api = 32 + with first.sched.layout_update_lock, first.sched.run_handler_lock: + self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) + self.waitUntilSettled(matcher=[second]) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + self.assertEqual(A.data["status"], "MERGED") + self.assertEqual(B.data["status"], "MERGED") + + class TestGithubModelUpgrade(ZuulTestCase): config_file = "zuul-gerrit-github.conf" scheduler_count = 1 diff 
--git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 97ad9907a8..fdb004cdfa 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -29,10 +29,12 @@ import zuul.lib.tracing as tracing from zuul.model import ( Change, PipelineState, PipelineChangeList, filter_severity, EnqueueEvent, FalseWithReason, + QueryCacheEntry ) from zuul.zk.change_cache import ChangeKey from zuul.zk.exceptions import LockException from zuul.zk.locks import pipeline_lock +from zuul.zk.components import COMPONENT_REGISTRY from opentelemetry import trace @@ -1054,11 +1056,23 @@ class PipelineManager(metaclass=ABCMeta): or change.cache_stat.mzxid <= event.zuul_event_ltime ) + if hasattr(event, "zuul_event_ltime"): + if COMPONENT_REGISTRY.model_api < 32: + topic_out_of_date =\ + change.cache_stat.mzxid <= event.zuul_event_ltime + else: + topic_out_of_date =\ + change.topic_query_ltime <= event.zuul_event_ltime + else: + # The value is unused and doesn't matter because of the + # clause below, but True is the safer value. 
+ topic_out_of_date = True + must_update_topic_deps = ( self.useDependenciesByTopic(change.project) and ( not hasattr(event, "zuul_event_ltime") or change.topic_needs_changes is None - or change.cache_stat.mzxid <= event.zuul_event_ltime + or topic_out_of_date ) ) @@ -1098,15 +1112,19 @@ class PipelineManager(metaclass=ABCMeta): log.debug(" Updating topic dependencies for %s", change) new_topic_needs_changes_keys = [] query_cache_key = (change.project.connection_name, change.topic) - changes_by_topic = query_cache.topic_queries.get(query_cache_key) - if changes_by_topic is None: + cache_entry = query_cache.topic_queries.get(query_cache_key) + if cache_entry is None: changes_by_topic = source.getChangesByTopic(change.topic) - query_cache.topic_queries[query_cache_key] = changes_by_topic - for dep in changes_by_topic: + cache_entry = QueryCacheEntry( + self.sched.zk_client.getCurrentLtime(), + changes_by_topic) + query_cache.topic_queries[query_cache_key] = cache_entry + for dep in cache_entry.results: if dep and (not dep.is_merged) and dep is not change: log.debug(" Adding dependency: %s", dep) new_topic_needs_changes_keys.append(dep.cache_key) update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys + update_attrs['topic_query_ltime'] = cache_entry.ltime if update_attrs: source.setChangeAttributes(change, **update_attrs) diff --git a/zuul/model.py b/zuul/model.py index 9dbad38d77..236ed9a74e 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -147,6 +147,12 @@ def filter_severity(error_list, errors=True, warnings=True): )] +class QueryCacheEntry: + def __init__(self, ltime, results): + self.ltime = ltime + self.results = results + + class QueryCache: """Cache query information while processing dependencies""" @@ -7226,6 +7232,7 @@ class Change(Branch): # drivers in theory, but Gerrit only in practice for # emulate-submit-whole-topic): self.topic_needs_changes = None + self.topic_query_ltime = 0 self.is_current_patchset = True self.can_merge = False @@ 
-7261,6 +7268,7 @@ class Change(Branch): else data.get("commit_needs_changes", []) ) self.topic_needs_changes = data.get("topic_needs_changes") + self.topic_query_ltime = data.get("topic_query_ltime", 0) self.is_current_patchset = data.get("is_current_patchset", True) self.can_merge = data.get("can_merge", False) self.is_merged = data.get("is_merged", False) @@ -7284,6 +7292,7 @@ class Change(Branch): "compat_needed_by_changes": self.git_needed_by_changes, "commit_needs_changes": self.commit_needs_changes, "topic_needs_changes": self.topic_needs_changes, + "topic_query_ltime": self.topic_query_ltime, "is_current_patchset": self.is_current_patchset, "can_merge": self.can_merge, "is_merged": self.is_merged, diff --git a/zuul/model_api.py b/zuul/model_api.py index 952a05f21f..f3f789d50d 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 31 +MODEL_API = 32