Expand the query cache scope to encompass multiple events
The query cache added to the scheduler was previously scoped to a single event (but across all pipelines for that event). Expand the scope so that it encompasses processing all of the pending tenant trigger events. This reduces the number of queries in the case of a fast cycle storm: Old: queries = 5*count New: queries = 4*count + 1 100 changes gives 401 queries. The case where a user uploads or updates changes one at a time is unchanged. Change-Id: I9c1c086a05c13a7a7c10d3318ea334fd7e3c8cd5
This commit is contained in:
@@ -3576,11 +3576,11 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
|
||||
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
|
||||
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
|
||||
# This query happens once for each event.
|
||||
# This query happens once for each event (but is cached).
|
||||
# * A+B+C: 3x scheduler, 0x pipeline
|
||||
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
|
||||
'q=status%3Aopen%20topic%3A%22test-topic%22')
|
||||
self.assertEqual(3, counters[('changes', qstring)])
|
||||
self.assertEqual(1, counters[('changes', qstring)])
|
||||
self.assertHistory([
|
||||
dict(name="project-job", changes="3,1 2,1 1,1"),
|
||||
dict(name="project1-job", changes="3,1 2,1 1,1"),
|
||||
|
||||
@@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase):
|
||||
orig = zuul.scheduler.Scheduler._forward_trigger_event
|
||||
|
||||
def patched_forward(obj, *args, **kw):
|
||||
if args[1].name == 'tenant-one':
|
||||
if args[2].name == 'tenant-one':
|
||||
raise Exception("test")
|
||||
return orig(obj, *args, **kw)
|
||||
|
||||
|
||||
@@ -2407,6 +2407,7 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Finished connection cache maintenance")
|
||||
|
||||
def process_tenant_trigger_queue(self, tenant):
|
||||
query_cache = QueryCache()
|
||||
try:
|
||||
with trigger_queue_lock(
|
||||
self.zk_client, tenant.name, blocking=False
|
||||
@@ -2444,7 +2445,8 @@ class Scheduler(threading.Thread):
|
||||
links=[
|
||||
trace.Link(trigger_span.get_span_context())
|
||||
]):
|
||||
self._forward_trigger_event(event, tenant)
|
||||
self._forward_trigger_event(query_cache,
|
||||
event, tenant)
|
||||
except Exception:
|
||||
log.exception("Unable to forward event %s "
|
||||
"to tenant %s", event, tenant.name)
|
||||
@@ -2455,7 +2457,7 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Skipping locked trigger event queue in tenant %s",
|
||||
tenant.name)
|
||||
|
||||
def _forward_trigger_event(self, event, tenant):
|
||||
def _forward_trigger_event(self, query_cache, event, tenant):
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
trusted, project = tenant.getProject(event.canonical_project_name)
|
||||
|
||||
@@ -2543,7 +2545,6 @@ class Scheduler(threading.Thread):
|
||||
span.set_attribute("reconfigure_tenant", reconfigure_tenant)
|
||||
event.span_context = tracing.getSpanContext(span)
|
||||
|
||||
query_cache = QueryCache()
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
# For most kinds of dependencies, it's sufficient to check
|
||||
# if this change is already in the pipeline, because the
|
||||
|
||||
Reference in New Issue
Block a user