From 7fc68db6b44efcc8a4fc72ff9d092156fb0d7524 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 16 May 2024 15:15:07 -0700 Subject: [PATCH] Make the query cache continuous This moves the query cache into the scheduler and persists it permanently (ie, across pipeline runs) except that each time we encounter an event newer than the last time we cleared the cache, we will clear it again. This means we should start with an empty cache each time a new event arrives, but if a bunch of events arrive in succession, we will use the same cached values. This improves performance in topic dependencies in the slow case (ie, developers adding changes one at a time). The number of queries in this case will be: queries = 5*count 100 changes gives 500 queries This does not alter the performance of the fast case (when all changes in a topic appear at once). That remains: queries = 4*count +1 100 changes gives 401 queries Change-Id: Iede8dc3d716279a07310726c647d8646e52ecc98 --- tests/unit/test_circular_dependencies.py | 32 +++++++++++------------- tests/unit/test_scheduler.py | 2 +- zuul/manager/__init__.py | 23 ++++++++--------- zuul/model.py | 16 +++++++++++- zuul/scheduler.py | 10 +++----- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 802ff570a5..5138bafd3a 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -1573,10 +1573,11 @@ class TestGerritCircularDependencies(ZuulTestCase): C.addApproval("Code-Review", 2) D.addApproval("Code-Review", 2) - self.fake_gerrit.addEvent(A.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(C.addApproval("Approved", 1)) - self.fake_gerrit.addEvent(D.addApproval("Approved", 1)) + # Add the approvals first, then add the events to ensure we + # are not racing gerrit approval changes. + events = [c.addApproval("Approved", 1) for c in [A, B, C, D]] + for event in events: + self.fake_gerrit.addEvent(event) self.waitUntilSettled() self.assertEqual(A.data["status"], "NEW") @@ -3499,15 +3500,10 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(0, counters[('changes', '1', 'submitted_together')]) self.assertEqual(0, counters[('changes', '2', 'submitted_together')]) self.assertEqual(0, counters[('changes', '3', 'submitted_together')]) - # This query happens once for each event in the scheduler, - # then once for each change in the pipeline if there's more - # than one (cache is in play here). - # * A: 1x scheduler, 0x pipeline - # * A+B: 1x scheduler, 1x pipeline - # * A+B+C: 1x scheduler, 1x pipeline + # This query happens once for each event in the scheduler. qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&' 'q=status%3Aopen%20topic%3A%22test-topic%22') - self.assertEqual(5, counters[('changes', qstring)]) + self.assertEqual(3, counters[('changes', qstring)]) self.assertHistory([ dict(name="project-job", changes="1,1"), @@ -3653,13 +3649,13 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(len(C.patchsets[-1]["approvals"]), 1) self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified") self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1") - self.assertEqual(A.queried, 3) - self.assertEqual(B.queried, 3) - self.assertEqual(C.queried, 3) - self.assertEqual(D.queried, 3) - self.assertEqual(E.queried, 3) - self.assertEqual(F.queried, 3) - self.assertEqual(G.queried, 3) + self.assertEqual(A.queried, 2) + self.assertEqual(B.queried, 2) + self.assertEqual(C.queried, 2) + self.assertEqual(D.queried, 2) + self.assertEqual(E.queried, 2) + self.assertEqual(F.queried, 2) + self.assertEqual(G.queried, 2) self.assertHistory([ dict(name="project1-job", result="SUCCESS", changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1", diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 95563b0cdf..f9034a1a15 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase): orig = zuul.scheduler.Scheduler._forward_trigger_event def patched_forward(obj, *args, **kw): - if args[2].name == 'tenant-one': + if args[1].name == 'tenant-one': raise Exception("test") return orig(obj, *args, **kw) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 03fab9884b..4c042b5c98 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components import zuul.lib.tracing as tracing from zuul.model import ( Change, PipelineState, PipelineChangeList, - filter_severity, EnqueueEvent, QueryCache + filter_severity, EnqueueEvent ) from zuul.zk.change_cache import ChangeKey from zuul.zk.exceptions import LockException @@ -235,7 +235,6 @@ class PipelineManager(metaclass=ABCMeta): def resolveChangeKeys(self, change_keys): resolved_changes = [] - query_cache = QueryCache() for key in change_keys: change = self._change_cache.get(key.reference) if change is None: @@ -252,8 +251,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies( - query_cache, change, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -332,7 +330,7 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return - query_cache = QueryCache() + self.sched.query_cache.clearIfOlderThan(event) to_refresh = set() for item in self.pipeline.getAllItems(): for item_change in item.changes: @@ -346,7 +344,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item_change) for existing_change in to_refresh: - self.updateCommitDependencies(query_cache, existing_change, event) + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -849,7 +847,7 @@ class PipelineManager(metaclass=ABCMeta): ) - set(cycle) def getDependencyGraph(self, change, dependency_graph, event, - update_deps=False, query_cache=None, + update_deps=False, history=None, quiet=False, indent=''): log = get_annotated_logger(self.log, event) if not quiet: @@ -860,11 +858,10 @@ class PipelineManager(metaclass=ABCMeta): return if history is None: history = set() - if query_cache is None: - query_cache = QueryCache() + self.sched.query_cache.clearIfOlderThan(event) history.add(change) if update_deps: - self.updateCommitDependencies(query_cache, change, event) + self.updateCommitDependencies(change, event) for needed_change in self.resolveChangeReferences( change.getNeedsChanges( self.useDependenciesByTopic(change.project))): @@ -893,7 +890,7 @@ class PipelineManager(metaclass=ABCMeta): if needed_change not in history: self.getDependencyGraph(needed_change, dependency_graph, event, - update_deps, query_cache, + update_deps, history, quiet, indent + ' ') def getQueueConfig(self, project): @@ -999,8 +996,10 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name ].put_supercede(event) - def updateCommitDependencies(self, query_cache, change, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) + query_cache = self.sched.query_cache + query_cache.clearIfOlderThan(event) must_update_commit_deps = ( not hasattr(event, "zuul_event_ltime") diff --git a/zuul/model.py b/zuul/model.py index 891663adc6..26eed4fb75 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -151,9 +151,23 @@ def filter_severity(error_list, errors=True, warnings=True): class QueryCache: """Cache query information while processing dependencies""" - def __init__(self): + def __init__(self, zk_client): + self.zk_client = zk_client + self.ltime = 0 + self.clear(0) + + def clear(self, ltime): + self.ltime = ltime self.topic_queries = {} + def clearIfOlderThan(self, event): + if not hasattr(event, "zuul_event_ltime"): + return + ltime = event.zuul_event_ltime + if ltime > self.ltime: + ltime = self.zk_client.getCurrentLtime() + self.clear(ltime) + class ZuulMark: # The yaml mark class differs between the C and python versions. diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 00c4a8e8a4..db84064a93 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -270,6 +270,7 @@ class Scheduler(threading.Thread): self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client.connect() + self.query_cache = QueryCache(self.zk_client) self.system = ZuulSystem(self.zk_client) self.zuul_version = get_version_string() @@ -2407,7 +2408,6 @@ class Scheduler(threading.Thread): self.log.debug("Finished connection cache maintenance") def process_tenant_trigger_queue(self, tenant): - query_cache = QueryCache() try: with trigger_queue_lock( self.zk_client, tenant.name, blocking=False @@ -2445,8 +2445,7 @@ class Scheduler(threading.Thread): links=[ trace.Link(trigger_span.get_span_context()) ]): - self._forward_trigger_event(query_cache, - event, tenant) + self._forward_trigger_event(event, tenant) except Exception: log.exception("Unable to forward event %s " "to tenant %s", event, tenant.name) @@ -2457,7 +2456,7 @@ class Scheduler(threading.Thread): self.log.debug("Skipping locked trigger event queue in tenant %s", tenant.name) - def _forward_trigger_event(self, query_cache, event, tenant): + def _forward_trigger_event(self, event, tenant): log = get_annotated_logger(self.log, event.zuul_event_id) trusted, project = tenant.getProject(event.canonical_project_name) @@ -2562,8 +2561,7 @@ class Scheduler(threading.Thread): # manager, but the result of the work goes into the change # cache, so it's not wasted; it's just less parallelized. if isinstance(change, Change): - pipeline.manager.updateCommitDependencies(query_cache, - change, event) + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) or pipeline.manager.isChangeRelevantToPipeline(change)