From 60224efc0615c533d52259a3c654d7fd4744e841 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 16 May 2024 14:48:25 -0700 Subject: [PATCH] Expand the query cache scope to encompass multiple events The query cache added to the scheduler was previously scoped to a single event (but across all pipelines for that event). Expand the scope so that it encompasses processing all of the pending tenant trigger events. This reduces the number of queries in the case of a fast cycle storm: Old: queries = 5*count New: queries = 4*count + 1 100 changes gives 401 queries. The case where a user uploads or updates changes one at a time is unchanged. Change-Id: I9c1c086a05c13a7a7c10d3318ea334fd7e3c8cd5 --- tests/unit/test_circular_dependencies.py | 4 ++-- tests/unit/test_scheduler.py | 2 +- zuul/scheduler.py | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index 9c67e45ba5..802ff570a5 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -3576,11 +3576,11 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(0, counters[('changes', '1', 'submitted_together')]) self.assertEqual(0, counters[('changes', '2', 'submitted_together')]) self.assertEqual(0, counters[('changes', '3', 'submitted_together')]) - # This query happens once for each event. + # This query happens once for each event (but is cached). # * A+B+C: 3x scheduler, 0x pipeline qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&' 'q=status%3Aopen%20topic%3A%22test-topic%22') - self.assertEqual(3, counters[('changes', qstring)]) + self.assertEqual(1, counters[('changes', qstring)]) self.assertHistory([ dict(name="project-job", changes="3,1 2,1 1,1"), dict(name="project1-job", changes="3,1 2,1 1,1"), diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index f9034a1a15..95563b0cdf 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase): orig = zuul.scheduler.Scheduler._forward_trigger_event def patched_forward(obj, *args, **kw): - if args[1].name == 'tenant-one': + if args[2].name == 'tenant-one': raise Exception("test") return orig(obj, *args, **kw) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 908ae8341d..00c4a8e8a4 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2407,6 +2407,7 @@ class Scheduler(threading.Thread): self.log.debug("Finished connection cache maintenance") def process_tenant_trigger_queue(self, tenant): + query_cache = QueryCache() try: with trigger_queue_lock( self.zk_client, tenant.name, blocking=False @@ -2444,7 +2445,8 @@ class Scheduler(threading.Thread): links=[ trace.Link(trigger_span.get_span_context()) ]): - self._forward_trigger_event(event, tenant) + self._forward_trigger_event(query_cache, + event, tenant) except Exception: log.exception("Unable to forward event %s " "to tenant %s", event, tenant.name) @@ -2455,7 +2457,7 @@ class Scheduler(threading.Thread): self.log.debug("Skipping locked trigger event queue in tenant %s", tenant.name) - def _forward_trigger_event(self, event, tenant): + def _forward_trigger_event(self, query_cache, event, tenant): log = get_annotated_logger(self.log, event.zuul_event_id) trusted, project = tenant.getProject(event.canonical_project_name) @@ -2543,7 +2545,6 @@ class Scheduler(threading.Thread): span.set_attribute("reconfigure_tenant", reconfigure_tenant) event.span_context = tracing.getSpanContext(span) - query_cache = QueryCache() for pipeline in tenant.layout.pipelines.values(): # For most kinds of dependencies, it's sufficient to check # if this change is already in the pipeline, because the