Fix race with topic changes

When a gerrit event is submitted, we update the change cache first,
then set the event ltime in the scheduler right before we forward the
event.  That serves to make the cache ltime always less than the event
ltime, which means we should always update topic dependencies in the
pipeline (there is a check for this when forwarding events to
pipelines).

However, there could be a race with two schedulers and a lot of events:

[1] iterate over trigger event 1 for change A, set its ltime
[2] receive new event 2 for change A, update cache
[1] forward trigger event 1 to pipelines
[1] tell pipelines to update change dependencies
[1] pipeline sees that cache is newer than event 1 and skips update

This adds a test which exercises this race (via a fairly contrived,
but plausible, example).

The race is corrected by storing an extra timestamp: the last time that
we performed a topic query for a change.  This will ensure that even if
the driver has updated the change cache for a change with a topic-unaware
query after our trigger event, we will know that the last topic-aware
query was performed earlier than that and needs to be updated.

(Note, we also store the timestamp as a bit of metadata in the local
in-memory query cache -- this is separate than the above and is simply
used to ensure that the correct timestamp value is copied along with
the results if the query is run more than once in a single pipeline
processing run.)

Change-Id: I11761046c24a922939594e81d4312d90f976f9ed
This commit is contained in:
James E. Blair 2024-10-14 16:44:55 -07:00
parent 783a4c084a
commit 475e11cd81
6 changed files with 150 additions and 6 deletions

View File

@ -227,3 +227,9 @@ Version 31
----------
:Prior Zuul version: 11.0.1
:Description: Upgrade sharded zkobject format.
Version 32
----------
:Prior Zuul version: 11.1.0
:Description: Add topic query timestamp.
Affects schedulers.

View File

@ -15,11 +15,14 @@
# along with this software. If not, see <http://www.gnu.org/licenses/>.
from collections import Counter
import fixtures
import re
import textwrap
import threading
import json
from zuul.model import PromoteEvent
import zuul.scheduler
from tests.base import (
iterate_timeout,
@ -4212,6 +4215,65 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.data["status"], "MERGED")
self.assertEqual(X.data["status"], "ABANDONED")
@gerrit_config(submit_whole_topic=True)
def test_abandoned_change_refresh_changes(self):
# Test that we can re-enqueue a topic cycle after abandoning a
# change (out of band). This adds extra events to refresh all
# of the changes while the re-enqueue is in progress in order
# to trigger a race condition.
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
topic='test-topic')
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
topic='test-topic',
files={'conflict': 'B'})
X = self.fake_gerrit.addFakeChange('org/project2', "master", "X",
topic='test-topic',
files={'conflict': 'X'})
A.addApproval("Code-Review", 2)
B.addApproval("Code-Review", 2)
X.addApproval("Code-Review", 2)
X.addApproval("Approved", 1)
B.addApproval("Approved", 1)
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
self.waitUntilSettled()
orig_forward = zuul.scheduler.Scheduler._forward_trigger_event
stop_event = threading.Event()
go_event = threading.Event()
def patched_forward(obj, *args, **kw):
stop_event.set()
go_event.wait()
return orig_forward(obj, *args, **kw)
self.useFixture(fixtures.MonkeyPatch(
'zuul.scheduler.Scheduler._forward_trigger_event',
patched_forward))
X.setAbandoned()
X.addApproval("Approved", -1)
self.waitUntilSettled("abandoned")
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
stop_event.wait()
stop_event.clear()
# The scheduler is waiting to forward the approved event; send
# another event that refreshes the cache:
self.fake_gerrit.addEvent(A.getChangeCommentEvent(1, 'testcomment'))
self.fake_gerrit.addEvent(B.getChangeCommentEvent(1, 'testcomment'))
self.fake_gerrit.addEvent(X.getChangeCommentEvent(1, 'testcomment'))
for x in iterate_timeout(30, 'events to be submitted'):
if len(self.scheds.first.sched.trigger_events['tenant-one']) == 4:
break
go_event.set()
stop_event.wait()
self.waitUntilSettled("reapproved")
self.assertEqual(A.data["status"], "MERGED")
self.assertEqual(B.data["status"], "MERGED")
self.assertEqual(X.data["status"], "ABANDONED")
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"

View File

@ -22,6 +22,7 @@ from zuul.zk.components import (
from tests.base import (
BaseTestCase,
ZuulTestCase,
gerrit_config,
simple_layout,
iterate_timeout,
model_version,
@ -64,6 +65,54 @@ class TestModelUpgrade(ZuulTestCase):
break
class TestModelUpgradeGerritCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
tenant_config_file = "config/circular-dependencies/main.yaml"
@model_version(31)
@gerrit_config(submit_whole_topic=True)
def test_model_31_32(self):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
topic='test-topic')
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
topic='test-topic')
A.addApproval("Code-Review", 2)
B.addApproval("Code-Review", 2)
B.addApproval("Approved", 1)
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
self.waitUntilSettled()
first = self.scheds.first
second = self.createScheduler()
second.start()
self.assertEqual(len(self.scheds), 2)
for _ in iterate_timeout(10, "until priming is complete"):
state_one = first.sched.local_layout_state.get("tenant-one")
if state_one:
break
for _ in iterate_timeout(
10, "all schedulers to have the same layout state"):
if (second.sched.local_layout_state.get(
"tenant-one") == state_one):
break
self.model_test_component_info.model_api = 32
with first.sched.layout_update_lock, first.sched.run_handler_lock:
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
self.waitUntilSettled(matcher=[second])
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(A.data["status"], "MERGED")
self.assertEqual(B.data["status"], "MERGED")
class TestGithubModelUpgrade(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
scheduler_count = 1

View File

@ -29,10 +29,12 @@ import zuul.lib.tracing as tracing
from zuul.model import (
Change, PipelineState, PipelineChangeList,
filter_severity, EnqueueEvent, FalseWithReason,
QueryCacheEntry
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.exceptions import LockException
from zuul.zk.locks import pipeline_lock
from zuul.zk.components import COMPONENT_REGISTRY
from opentelemetry import trace
@ -1054,11 +1056,23 @@ class PipelineManager(metaclass=ABCMeta):
or change.cache_stat.mzxid <= event.zuul_event_ltime
)
if hasattr(event, "zuul_event_ltime"):
if COMPONENT_REGISTRY.model_api < 32:
topic_out_of_date =\
change.cache_stat.mzxid <= event.zuul_event_ltime
else:
topic_out_of_date =\
change.topic_query_ltime <= event.zuul_event_ltime
else:
# The value is unused and doesn't matter because of the
# clause below, but True is the safer value.
topic_out_of_date = True
must_update_topic_deps = (
self.useDependenciesByTopic(change.project) and (
not hasattr(event, "zuul_event_ltime")
or change.topic_needs_changes is None
or change.cache_stat.mzxid <= event.zuul_event_ltime
or topic_out_of_date
)
)
@ -1098,15 +1112,19 @@ class PipelineManager(metaclass=ABCMeta):
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes_keys = []
query_cache_key = (change.project.connection_name, change.topic)
changes_by_topic = query_cache.topic_queries.get(query_cache_key)
if changes_by_topic is None:
cache_entry = query_cache.topic_queries.get(query_cache_key)
if cache_entry is None:
changes_by_topic = source.getChangesByTopic(change.topic)
query_cache.topic_queries[query_cache_key] = changes_by_topic
for dep in changes_by_topic:
cache_entry = QueryCacheEntry(
self.sched.zk_client.getCurrentLtime(),
changes_by_topic)
query_cache.topic_queries[query_cache_key] = cache_entry
for dep in cache_entry.results:
if dep and (not dep.is_merged) and dep is not change:
log.debug(" Adding dependency: %s", dep)
new_topic_needs_changes_keys.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys
update_attrs['topic_query_ltime'] = cache_entry.ltime
if update_attrs:
source.setChangeAttributes(change, **update_attrs)

View File

@ -147,6 +147,12 @@ def filter_severity(error_list, errors=True, warnings=True):
)]
class QueryCacheEntry:
def __init__(self, ltime, results):
self.ltime = ltime
self.results = results
class QueryCache:
"""Cache query information while processing dependencies"""
@ -7226,6 +7232,7 @@ class Change(Branch):
# drivers in theory, but Gerrit only in practice for
# emulate-submit-whole-topic):
self.topic_needs_changes = None
self.topic_query_ltime = 0
self.is_current_patchset = True
self.can_merge = False
@ -7261,6 +7268,7 @@ class Change(Branch):
else data.get("commit_needs_changes", [])
)
self.topic_needs_changes = data.get("topic_needs_changes")
self.topic_query_ltime = data.get("topic_query_ltime", 0)
self.is_current_patchset = data.get("is_current_patchset", True)
self.can_merge = data.get("can_merge", False)
self.is_merged = data.get("is_merged", False)
@ -7284,6 +7292,7 @@ class Change(Branch):
"compat_needed_by_changes": self.git_needed_by_changes,
"commit_needs_changes": self.commit_needs_changes,
"topic_needs_changes": self.topic_needs_changes,
"topic_query_ltime": self.topic_query_ltime,
"is_current_patchset": self.is_current_patchset,
"can_merge": self.can_merge,
"is_merged": self.is_merged,

View File

@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
MODEL_API = 31
MODEL_API = 32