Merge "Expand the query cache scope to encompass multiple events"

This commit is contained in:
Zuul
2024-06-14 00:58:32 +00:00
committed by Gerrit Code Review
3 changed files with 7 additions and 6 deletions

View File

@@ -3576,11 +3576,11 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
# This query happens once for each event.
# This query happens once for each event (but is cached).
# * A+B+C: 3x scheduler, 0x pipeline
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
'q=status%3Aopen%20topic%3A%22test-topic%22')
self.assertEqual(3, counters[('changes', qstring)])
self.assertEqual(1, counters[('changes', qstring)])
self.assertHistory([
dict(name="project-job", changes="3,1 2,1 1,1"),
dict(name="project1-job", changes="3,1 2,1 1,1"),

View File

@@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase):
orig = zuul.scheduler.Scheduler._forward_trigger_event
def patched_forward(obj, *args, **kw):
if args[1].name == 'tenant-one':
if args[2].name == 'tenant-one':
raise Exception("test")
return orig(obj, *args, **kw)

View File

@@ -2429,6 +2429,7 @@ class Scheduler(threading.Thread):
self.log.debug("Finished connection cache maintenance")
def process_tenant_trigger_queue(self, tenant):
query_cache = QueryCache()
try:
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
@@ -2466,7 +2467,8 @@ class Scheduler(threading.Thread):
links=[
trace.Link(trigger_span.get_span_context())
]):
self._forward_trigger_event(event, tenant)
self._forward_trigger_event(query_cache,
event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
@@ -2477,7 +2479,7 @@ class Scheduler(threading.Thread):
self.log.debug("Skipping locked trigger event queue in tenant %s",
tenant.name)
def _forward_trigger_event(self, event, tenant):
def _forward_trigger_event(self, query_cache, event, tenant):
log = get_annotated_logger(self.log, event.zuul_event_id)
trusted, project = tenant.getProject(event.canonical_project_name)
@@ -2565,7 +2567,6 @@ class Scheduler(threading.Thread):
span.set_attribute("reconfigure_tenant", reconfigure_tenant)
event.span_context = tracing.getSpanContext(span)
query_cache = QueryCache()
for pipeline in tenant.layout.pipelines.values():
# For most kinds of dependencies, it's sufficient to check
# if this change is already in the pipeline, because the