diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 802ff570a5..5138bafd3a 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -1573,10 +1573,11 @@ class TestGerritCircularDependencies(ZuulTestCase): C.addApproval("Code-Review", 2) D.addApproval("Code-Review", 2) - self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(C.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(D.addApproval("Approved", 1)) + # Add the approvals first, then add the events to ensure we + # are not racing gerrit approval changes. + events = [c.addApproval("Approved", 1) for c in [A, B, C, D]] + for event in events: + self.fake_gerrit.addEvent(event) self.waitUntilSettled() self.assertEqual(A.data["status"], "NEW") @@ -3499,15 +3500,10 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(0, counters[('changes', '1', 'submitted_together')]) self.assertEqual(0, counters[('changes', '2', 'submitted_together')]) self.assertEqual(0, counters[('changes', '3', 'submitted_together')]) - # This query happens once for each event in the scheduler, - # then once for each change in the pipeline if there's more - # than one (cache is in play here). - # * A: 1x scheduler, 0x pipeline - # * A+B: 1x scheduler, 1x pipeline - # * A+B+C: 1x scheduler, 1x pipeline + # This query happens once for each event in the scheduler. qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&' 'q=status%3Aopen%20topic%3A%22test-topic%22') - self.assertEqual(5, counters[('changes', qstring)]) + self.assertEqual(3, counters[('changes', qstring)]) self.assertHistory([ dict(name="project-job", changes="1,1"), @@ -3653,13 +3649,13 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(len(C.patchsets[-1]["approvals"]), 1) self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified") self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1") - self.assertEqual(A.queried, 3) - self.assertEqual(B.queried, 3) - self.assertEqual(C.queried, 3) - self.assertEqual(D.queried, 3) - self.assertEqual(E.queried, 3) - self.assertEqual(F.queried, 3) - self.assertEqual(G.queried, 3) + self.assertEqual(A.queried, 2) + self.assertEqual(B.queried, 2) + self.assertEqual(C.queried, 2) + self.assertEqual(D.queried, 2) + self.assertEqual(E.queried, 2) + self.assertEqual(F.queried, 2) + self.assertEqual(G.queried, 2) self.assertHistory([ dict(name="project1-job", result="SUCCESS", changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1", diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 95563b0cdf..f9034a1a15 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase): orig = zuul.scheduler.Scheduler._forward_trigger_event def patched_forward(obj, *args, **kw): - if args[2].name == 'tenant-one': + if args[1].name == 'tenant-one': raise Exception("test") return orig(obj, *args, **kw) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 03fab9884b..4c042b5c98 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components import zuul.lib.tracing as tracing from zuul.model import ( Change, PipelineState, PipelineChangeList, - filter_severity, EnqueueEvent, QueryCache + filter_severity, EnqueueEvent ) from zuul.zk.change_cache import ChangeKey from zuul.zk.exceptions import LockException @@ -235,7 +235,6 @@ class PipelineManager(metaclass=ABCMeta): def resolveChangeKeys(self, change_keys): resolved_changes = [] - query_cache = QueryCache() for key in change_keys: change = self._change_cache.get(key.reference) if change is None: @@ -252,8 +251,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies( - query_cache, change, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -332,7 +330,7 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return - query_cache = QueryCache() + self.sched.query_cache.clearIfOlderThan(event) to_refresh = set() for item in self.pipeline.getAllItems(): for item_change in item.changes: @@ -346,7 +344,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item_change) for existing_change in to_refresh: - self.updateCommitDependencies(query_cache, existing_change, event) + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -849,7 +847,7 @@ class PipelineManager(metaclass=ABCMeta): ) - set(cycle) def getDependencyGraph(self, change, dependency_graph, event, - update_deps=False, query_cache=None, + update_deps=False, history=None, quiet=False, indent=''): log = get_annotated_logger(self.log, event) if not quiet: @@ -860,11 +858,10 @@ class PipelineManager(metaclass=ABCMeta): return if history is None: history = set() - if query_cache is None: - query_cache = QueryCache() + self.sched.query_cache.clearIfOlderThan(event) history.add(change) if update_deps: - self.updateCommitDependencies(query_cache, change, event) + self.updateCommitDependencies(change, event) for needed_change in self.resolveChangeReferences( change.getNeedsChanges( self.useDependenciesByTopic(change.project))): @@ -893,7 +890,7 @@ class PipelineManager(metaclass=ABCMeta): if needed_change not in history: self.getDependencyGraph(needed_change, dependency_graph, event, - update_deps, query_cache, + update_deps, history, quiet, indent + ' ') def getQueueConfig(self, project): @@ -999,8 +996,10 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name ].put_supercede(event) - def updateCommitDependencies(self, query_cache, change, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) + query_cache = self.sched.query_cache + query_cache.clearIfOlderThan(event) must_update_commit_deps = ( not hasattr(event, "zuul_event_ltime") diff --git a/zuul/model.py b/zuul/model.py index 891663adc6..26eed4fb75 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -151,9 +151,23 @@ def filter_severity(error_list, errors=True, warnings=True): class QueryCache: """Cache query information while processing dependencies""" - def __init__(self): + def __init__(self, zk_client): + self.zk_client = zk_client + self.ltime = 0 + self.clear(0) + + def clear(self, ltime): + self.ltime = ltime self.topic_queries = {} + def clearIfOlderThan(self, event): + if not hasattr(event, "zuul_event_ltime"): + return + ltime = event.zuul_event_ltime + if ltime > self.ltime: + ltime = self.zk_client.getCurrentLtime() + self.clear(ltime) + class ZuulMark: # The yaml mark class differs between the C and python versions. diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 00c4a8e8a4..db84064a93 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -270,6 +270,7 @@ class Scheduler(threading.Thread): self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client.connect() + self.query_cache = QueryCache(self.zk_client) self.system = ZuulSystem(self.zk_client) self.zuul_version = get_version_string() @@ -2407,7 +2408,6 @@ class Scheduler(threading.Thread): self.log.debug("Finished connection cache maintenance") def process_tenant_trigger_queue(self, tenant): - query_cache = QueryCache() try: with trigger_queue_lock( self.zk_client, tenant.name, blocking=False @@ -2445,8 +2445,7 @@ class Scheduler(threading.Thread): links=[ trace.Link(trigger_span.get_span_context()) ]): - self._forward_trigger_event(query_cache, - event, tenant) + self._forward_trigger_event(event, tenant) except Exception: log.exception("Unable to forward event %s " "to tenant %s", event, tenant.name) @@ -2457,7 +2456,7 @@ class Scheduler(threading.Thread): self.log.debug("Skipping locked trigger event queue in tenant %s", tenant.name) - def _forward_trigger_event(self, query_cache, event, tenant): + def _forward_trigger_event(self, event, tenant): log = get_annotated_logger(self.log, event.zuul_event_id) trusted, project = tenant.getProject(event.canonical_project_name) @@ -2562,8 +2561,7 @@ class Scheduler(threading.Thread): # manager, but the result of the work goes into the change # cache, so it's not wasted; it's just less parallelized. if isinstance(change, Change): - pipeline.manager.updateCommitDependencies(query_cache, - change, event) + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) or pipeline.manager.isChangeRelevantToPipeline(change)