Merge "Fix race with topic changes"
This commit is contained in:
commit
14db262e16
@ -227,3 +227,9 @@ Version 31
|
||||
----------
|
||||
:Prior Zuul version: 11.0.1
|
||||
:Description: Upgrade sharded zkobject format.
|
||||
|
||||
Version 32
|
||||
----------
|
||||
:Prior Zuul version: 11.1.0
|
||||
:Description: Add topic query timestamp.
|
||||
Affects schedulers.
|
||||
|
@ -15,11 +15,14 @@
|
||||
# along with this software. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from collections import Counter
|
||||
import fixtures
|
||||
import re
|
||||
import textwrap
|
||||
import threading
|
||||
import json
|
||||
|
||||
from zuul.model import PromoteEvent
|
||||
import zuul.scheduler
|
||||
|
||||
from tests.base import (
|
||||
iterate_timeout,
|
||||
@ -4212,6 +4215,65 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(B.data["status"], "MERGED")
|
||||
self.assertEqual(X.data["status"], "ABANDONED")
|
||||
|
||||
@gerrit_config(submit_whole_topic=True)
|
||||
def test_abandoned_change_refresh_changes(self):
|
||||
# Test that we can re-enqueue a topic cycle after abandoning a
|
||||
# change (out of band). This adds extra events to refresh all
|
||||
# of the changes while the re-enqueue is in progress in order
|
||||
# to trigger a race condition.
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
|
||||
topic='test-topic')
|
||||
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
|
||||
topic='test-topic',
|
||||
files={'conflict': 'B'})
|
||||
X = self.fake_gerrit.addFakeChange('org/project2', "master", "X",
|
||||
topic='test-topic',
|
||||
files={'conflict': 'X'})
|
||||
|
||||
A.addApproval("Code-Review", 2)
|
||||
B.addApproval("Code-Review", 2)
|
||||
X.addApproval("Code-Review", 2)
|
||||
X.addApproval("Approved", 1)
|
||||
B.addApproval("Approved", 1)
|
||||
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
orig_forward = zuul.scheduler.Scheduler._forward_trigger_event
|
||||
stop_event = threading.Event()
|
||||
go_event = threading.Event()
|
||||
|
||||
def patched_forward(obj, *args, **kw):
|
||||
stop_event.set()
|
||||
go_event.wait()
|
||||
return orig_forward(obj, *args, **kw)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'zuul.scheduler.Scheduler._forward_trigger_event',
|
||||
patched_forward))
|
||||
|
||||
X.setAbandoned()
|
||||
X.addApproval("Approved", -1)
|
||||
self.waitUntilSettled("abandoned")
|
||||
|
||||
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
|
||||
stop_event.wait()
|
||||
stop_event.clear()
|
||||
# The scheduler is waiting to forward the approved event; send
|
||||
# another event that refreshes the cache:
|
||||
self.fake_gerrit.addEvent(A.getChangeCommentEvent(1, 'testcomment'))
|
||||
self.fake_gerrit.addEvent(B.getChangeCommentEvent(1, 'testcomment'))
|
||||
self.fake_gerrit.addEvent(X.getChangeCommentEvent(1, 'testcomment'))
|
||||
for x in iterate_timeout(30, 'events to be submitted'):
|
||||
if len(self.scheds.first.sched.trigger_events['tenant-one']) == 4:
|
||||
break
|
||||
go_event.set()
|
||||
stop_event.wait()
|
||||
self.waitUntilSettled("reapproved")
|
||||
|
||||
self.assertEqual(A.data["status"], "MERGED")
|
||||
self.assertEqual(B.data["status"], "MERGED")
|
||||
self.assertEqual(X.data["status"], "ABANDONED")
|
||||
|
||||
|
||||
class TestGithubCircularDependencies(ZuulTestCase):
|
||||
config_file = "zuul-gerrit-github.conf"
|
||||
|
@ -22,6 +22,7 @@ from zuul.zk.components import (
|
||||
from tests.base import (
|
||||
BaseTestCase,
|
||||
ZuulTestCase,
|
||||
gerrit_config,
|
||||
simple_layout,
|
||||
iterate_timeout,
|
||||
model_version,
|
||||
@ -64,6 +65,54 @@ class TestModelUpgrade(ZuulTestCase):
|
||||
break
|
||||
|
||||
|
||||
class TestModelUpgradeGerritCircularDependencies(ZuulTestCase):
|
||||
config_file = "zuul-gerrit-github.conf"
|
||||
tenant_config_file = "config/circular-dependencies/main.yaml"
|
||||
|
||||
@model_version(31)
|
||||
@gerrit_config(submit_whole_topic=True)
|
||||
def test_model_31_32(self):
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
|
||||
topic='test-topic')
|
||||
B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
|
||||
topic='test-topic')
|
||||
|
||||
A.addApproval("Code-Review", 2)
|
||||
B.addApproval("Code-Review", 2)
|
||||
B.addApproval("Approved", 1)
|
||||
|
||||
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
first = self.scheds.first
|
||||
second = self.createScheduler()
|
||||
second.start()
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
for _ in iterate_timeout(10, "until priming is complete"):
|
||||
state_one = first.sched.local_layout_state.get("tenant-one")
|
||||
if state_one:
|
||||
break
|
||||
|
||||
for _ in iterate_timeout(
|
||||
10, "all schedulers to have the same layout state"):
|
||||
if (second.sched.local_layout_state.get(
|
||||
"tenant-one") == state_one):
|
||||
break
|
||||
|
||||
self.model_test_component_info.model_api = 32
|
||||
with first.sched.layout_update_lock, first.sched.run_handler_lock:
|
||||
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
|
||||
self.waitUntilSettled(matcher=[second])
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(A.data["status"], "MERGED")
|
||||
self.assertEqual(B.data["status"], "MERGED")
|
||||
|
||||
|
||||
class TestGithubModelUpgrade(ZuulTestCase):
|
||||
config_file = "zuul-gerrit-github.conf"
|
||||
scheduler_count = 1
|
||||
|
@ -29,10 +29,12 @@ import zuul.lib.tracing as tracing
|
||||
from zuul.model import (
|
||||
Change, PipelineState, PipelineChangeList,
|
||||
filter_severity, EnqueueEvent, FalseWithReason,
|
||||
QueryCacheEntry
|
||||
)
|
||||
from zuul.zk.change_cache import ChangeKey
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.locks import pipeline_lock
|
||||
from zuul.zk.components import COMPONENT_REGISTRY
|
||||
|
||||
from opentelemetry import trace
|
||||
|
||||
@ -1054,11 +1056,23 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
or change.cache_stat.mzxid <= event.zuul_event_ltime
|
||||
)
|
||||
|
||||
if hasattr(event, "zuul_event_ltime"):
|
||||
if COMPONENT_REGISTRY.model_api < 32:
|
||||
topic_out_of_date =\
|
||||
change.cache_stat.mzxid <= event.zuul_event_ltime
|
||||
else:
|
||||
topic_out_of_date =\
|
||||
change.topic_query_ltime <= event.zuul_event_ltime
|
||||
else:
|
||||
# The value is unused and doesn't matter because of the
|
||||
# clause below, but True is the safer value.
|
||||
topic_out_of_date = True
|
||||
|
||||
must_update_topic_deps = (
|
||||
self.useDependenciesByTopic(change.project) and (
|
||||
not hasattr(event, "zuul_event_ltime")
|
||||
or change.topic_needs_changes is None
|
||||
or change.cache_stat.mzxid <= event.zuul_event_ltime
|
||||
or topic_out_of_date
|
||||
)
|
||||
)
|
||||
|
||||
@ -1098,15 +1112,19 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
log.debug(" Updating topic dependencies for %s", change)
|
||||
new_topic_needs_changes_keys = []
|
||||
query_cache_key = (change.project.connection_name, change.topic)
|
||||
changes_by_topic = query_cache.topic_queries.get(query_cache_key)
|
||||
if changes_by_topic is None:
|
||||
cache_entry = query_cache.topic_queries.get(query_cache_key)
|
||||
if cache_entry is None:
|
||||
changes_by_topic = source.getChangesByTopic(change.topic)
|
||||
query_cache.topic_queries[query_cache_key] = changes_by_topic
|
||||
for dep in changes_by_topic:
|
||||
cache_entry = QueryCacheEntry(
|
||||
self.sched.zk_client.getCurrentLtime(),
|
||||
changes_by_topic)
|
||||
query_cache.topic_queries[query_cache_key] = cache_entry
|
||||
for dep in cache_entry.results:
|
||||
if dep and (not dep.is_merged) and dep is not change:
|
||||
log.debug(" Adding dependency: %s", dep)
|
||||
new_topic_needs_changes_keys.append(dep.cache_key)
|
||||
update_attrs['topic_needs_changes'] = new_topic_needs_changes_keys
|
||||
update_attrs['topic_query_ltime'] = cache_entry.ltime
|
||||
|
||||
if update_attrs:
|
||||
source.setChangeAttributes(change, **update_attrs)
|
||||
|
@ -147,6 +147,12 @@ def filter_severity(error_list, errors=True, warnings=True):
|
||||
)]
|
||||
|
||||
|
||||
class QueryCacheEntry:
|
||||
def __init__(self, ltime, results):
|
||||
self.ltime = ltime
|
||||
self.results = results
|
||||
|
||||
|
||||
class QueryCache:
|
||||
"""Cache query information while processing dependencies"""
|
||||
|
||||
@ -7302,6 +7308,7 @@ class Change(Branch):
|
||||
# drivers in theory, but Gerrit only in practice for
|
||||
# emulate-submit-whole-topic):
|
||||
self.topic_needs_changes = None
|
||||
self.topic_query_ltime = 0
|
||||
|
||||
self.is_current_patchset = True
|
||||
self.can_merge = False
|
||||
@ -7337,6 +7344,7 @@ class Change(Branch):
|
||||
else data.get("commit_needs_changes", [])
|
||||
)
|
||||
self.topic_needs_changes = data.get("topic_needs_changes")
|
||||
self.topic_query_ltime = data.get("topic_query_ltime", 0)
|
||||
self.is_current_patchset = data.get("is_current_patchset", True)
|
||||
self.can_merge = data.get("can_merge", False)
|
||||
self.is_merged = data.get("is_merged", False)
|
||||
@ -7360,6 +7368,7 @@ class Change(Branch):
|
||||
"compat_needed_by_changes": self.git_needed_by_changes,
|
||||
"commit_needs_changes": self.commit_needs_changes,
|
||||
"topic_needs_changes": self.topic_needs_changes,
|
||||
"topic_query_ltime": self.topic_query_ltime,
|
||||
"is_current_patchset": self.is_current_patchset,
|
||||
"can_merge": self.can_merge,
|
||||
"is_merged": self.is_merged,
|
||||
|
@ -14,4 +14,4 @@
|
||||
|
||||
# When making ZK schema changes, increment this and add a record to
|
||||
# doc/source/developer/model-changelog.rst
|
||||
MODEL_API = 31
|
||||
MODEL_API = 32
|
||||
|
Loading…
x
Reference in New Issue
Block a user