Make the query cache continuous

This moves the query cache into the scheduler and persists it
permanently (i.e., across pipeline runs) except that each time we
encounter an event newer than the last time we cleared the cache,
we will clear it again.

This means we should start with an empty cache each time a new
event arrives, but if a bunch of events arrive in succession, we
will use the same cached values.

This improves performance in topic dependencies in the slow case
(i.e., developers adding changes one at a time).  The number of queries
in this case will be:

  queries = 5*count
  100 changes gives 500 queries

This does not alter the performance of the fast case (when all
changes in a topic appear at once).  That remains:

  queries = 4*count + 1
  100 changes gives 401 queries

Change-Id: Iede8dc3d716279a07310726c647d8646e52ecc98
This commit is contained in:
James E. Blair
2024-05-16 15:15:07 -07:00
parent 60224efc06
commit 7fc68db6b4
5 changed files with 45 additions and 38 deletions

View File

@@ -1573,10 +1573,11 @@ class TestGerritCircularDependencies(ZuulTestCase):
C.addApproval("Code-Review", 2)
D.addApproval("Code-Review", 2)
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
self.fake_gerrit.addEvent(C.addApproval("Approved", 1))
self.fake_gerrit.addEvent(D.addApproval("Approved", 1))
# Add the approvals first, then add the events to ensure we
# are not racing gerrit approval changes.
events = [c.addApproval("Approved", 1) for c in [A, B, C, D]]
for event in events:
self.fake_gerrit.addEvent(event)
self.waitUntilSettled()
self.assertEqual(A.data["status"], "NEW")
@@ -3499,15 +3500,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
# This query happens once for each event in the scheduler,
# then once for each change in the pipeline if there's more
# than one (cache is in play here).
# * A: 1x scheduler, 0x pipeline
# * A+B: 1x scheduler, 1x pipeline
# * A+B+C: 1x scheduler, 1x pipeline
# This query happens once for each event in the scheduler.
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
'q=status%3Aopen%20topic%3A%22test-topic%22')
self.assertEqual(5, counters[('changes', qstring)])
self.assertEqual(3, counters[('changes', qstring)])
self.assertHistory([
dict(name="project-job", changes="1,1"),
@@ -3653,13 +3649,13 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(len(C.patchsets[-1]["approvals"]), 1)
self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified")
self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1")
self.assertEqual(A.queried, 3)
self.assertEqual(B.queried, 3)
self.assertEqual(C.queried, 3)
self.assertEqual(D.queried, 3)
self.assertEqual(E.queried, 3)
self.assertEqual(F.queried, 3)
self.assertEqual(G.queried, 3)
self.assertEqual(A.queried, 2)
self.assertEqual(B.queried, 2)
self.assertEqual(C.queried, 2)
self.assertEqual(D.queried, 2)
self.assertEqual(E.queried, 2)
self.assertEqual(F.queried, 2)
self.assertEqual(G.queried, 2)
self.assertHistory([
dict(name="project1-job", result="SUCCESS",
changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1",

View File

@@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase):
orig = zuul.scheduler.Scheduler._forward_trigger_event
def patched_forward(obj, *args, **kw):
if args[2].name == 'tenant-one':
if args[1].name == 'tenant-one':
raise Exception("test")
return orig(obj, *args, **kw)

View File

@@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components
import zuul.lib.tracing as tracing
from zuul.model import (
Change, PipelineState, PipelineChangeList,
filter_severity, EnqueueEvent, QueryCache
filter_severity, EnqueueEvent
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.exceptions import LockException
@@ -235,7 +235,6 @@ class PipelineManager(metaclass=ABCMeta):
def resolveChangeKeys(self, change_keys):
resolved_changes = []
query_cache = QueryCache()
for key in change_keys:
change = self._change_cache.get(key.reference)
if change is None:
@@ -252,8 +251,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
self.updateCommitDependencies(
query_cache, change, event=None)
self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -332,7 +330,7 @@ class PipelineManager(metaclass=ABCMeta):
if not isinstance(change, model.Change):
return
query_cache = QueryCache()
self.sched.query_cache.clearIfOlderThan(event)
to_refresh = set()
for item in self.pipeline.getAllItems():
for item_change in item.changes:
@@ -346,7 +344,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item_change)
for existing_change in to_refresh:
self.updateCommitDependencies(query_cache, existing_change, event)
self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -849,7 +847,7 @@ class PipelineManager(metaclass=ABCMeta):
) - set(cycle)
def getDependencyGraph(self, change, dependency_graph, event,
update_deps=False, query_cache=None,
update_deps=False,
history=None, quiet=False, indent=''):
log = get_annotated_logger(self.log, event)
if not quiet:
@@ -860,11 +858,10 @@ class PipelineManager(metaclass=ABCMeta):
return
if history is None:
history = set()
if query_cache is None:
query_cache = QueryCache()
self.sched.query_cache.clearIfOlderThan(event)
history.add(change)
if update_deps:
self.updateCommitDependencies(query_cache, change, event)
self.updateCommitDependencies(change, event)
for needed_change in self.resolveChangeReferences(
change.getNeedsChanges(
self.useDependenciesByTopic(change.project))):
@@ -893,7 +890,7 @@ class PipelineManager(metaclass=ABCMeta):
if needed_change not in history:
self.getDependencyGraph(needed_change,
dependency_graph, event,
update_deps, query_cache,
update_deps,
history, quiet, indent + ' ')
def getQueueConfig(self, project):
@@ -999,8 +996,10 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name
].put_supercede(event)
def updateCommitDependencies(self, query_cache, change, event):
def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
query_cache = self.sched.query_cache
query_cache.clearIfOlderThan(event)
must_update_commit_deps = (
not hasattr(event, "zuul_event_ltime")

View File

@@ -151,9 +151,23 @@ def filter_severity(error_list, errors=True, warnings=True):
class QueryCache:
"""Cache query information while processing dependencies"""
def __init__(self):
def __init__(self, zk_client):
self.zk_client = zk_client
self.ltime = 0
self.clear(0)
def clear(self, ltime):
self.ltime = ltime
self.topic_queries = {}
def clearIfOlderThan(self, event):
if not hasattr(event, "zuul_event_ltime"):
return
ltime = event.zuul_event_ltime
if ltime > self.ltime:
ltime = self.zk_client.getCurrentLtime()
self.clear(ltime)
class ZuulMark:
# The yaml mark class differs between the C and python versions.

View File

@@ -270,6 +270,7 @@ class Scheduler(threading.Thread):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.query_cache = QueryCache(self.zk_client)
self.system = ZuulSystem(self.zk_client)
self.zuul_version = get_version_string()
@@ -2407,7 +2408,6 @@ class Scheduler(threading.Thread):
self.log.debug("Finished connection cache maintenance")
def process_tenant_trigger_queue(self, tenant):
query_cache = QueryCache()
try:
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
@@ -2445,8 +2445,7 @@ class Scheduler(threading.Thread):
links=[
trace.Link(trigger_span.get_span_context())
]):
self._forward_trigger_event(query_cache,
event, tenant)
self._forward_trigger_event(event, tenant)
except Exception:
log.exception("Unable to forward event %s "
"to tenant %s", event, tenant.name)
@@ -2457,7 +2456,7 @@ class Scheduler(threading.Thread):
self.log.debug("Skipping locked trigger event queue in tenant %s",
tenant.name)
def _forward_trigger_event(self, query_cache, event, tenant):
def _forward_trigger_event(self, event, tenant):
log = get_annotated_logger(self.log, event.zuul_event_id)
trusted, project = tenant.getProject(event.canonical_project_name)
@@ -2562,8 +2561,7 @@ class Scheduler(threading.Thread):
# manager, but the result of the work goes into the change
# cache, so it's not wasted; it's just less parallelized.
if isinstance(change, Change):
pipeline.manager.updateCommitDependencies(query_cache,
change, event)
pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isChangeRelevantToPipeline(change)