Merge "Make the query cache continuous"
This commit is contained in:
commit
d2f1ac2067
@ -1573,10 +1573,11 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
C.addApproval("Code-Review", 2)
|
||||
D.addApproval("Code-Review", 2)
|
||||
|
||||
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
|
||||
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
|
||||
self.fake_gerrit.addEvent(C.addApproval("Approved", 1))
|
||||
self.fake_gerrit.addEvent(D.addApproval("Approved", 1))
|
||||
# Add the approvals first, then add the events to ensure we
|
||||
# are not racing gerrit approval changes.
|
||||
events = [c.addApproval("Approved", 1) for c in [A, B, C, D]]
|
||||
for event in events:
|
||||
self.fake_gerrit.addEvent(event)
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(A.data["status"], "NEW")
|
||||
@ -3499,15 +3500,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(0, counters[('changes', '1', 'submitted_together')])
|
||||
self.assertEqual(0, counters[('changes', '2', 'submitted_together')])
|
||||
self.assertEqual(0, counters[('changes', '3', 'submitted_together')])
|
||||
# This query happens once for each event in the scheduler,
|
||||
# then once for each change in the pipeline if there's more
|
||||
# than one (cache is in play here).
|
||||
# * A: 1x scheduler, 0x pipeline
|
||||
# * A+B: 1x scheduler, 1x pipeline
|
||||
# * A+B+C: 1x scheduler, 1x pipeline
|
||||
# This query happens once for each event in the scheduler.
|
||||
qstring = ('?n=500&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
|
||||
'q=status%3Aopen%20topic%3A%22test-topic%22')
|
||||
self.assertEqual(5, counters[('changes', qstring)])
|
||||
self.assertEqual(3, counters[('changes', qstring)])
|
||||
self.assertHistory([
|
||||
dict(name="project-job", changes="1,1"),
|
||||
|
||||
@ -3653,13 +3649,13 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(len(C.patchsets[-1]["approvals"]), 1)
|
||||
self.assertEqual(C.patchsets[-1]["approvals"][0]["type"], "Verified")
|
||||
self.assertEqual(C.patchsets[-1]["approvals"][0]["value"], "1")
|
||||
self.assertEqual(A.queried, 3)
|
||||
self.assertEqual(B.queried, 3)
|
||||
self.assertEqual(C.queried, 3)
|
||||
self.assertEqual(D.queried, 3)
|
||||
self.assertEqual(E.queried, 3)
|
||||
self.assertEqual(F.queried, 3)
|
||||
self.assertEqual(G.queried, 3)
|
||||
self.assertEqual(A.queried, 2)
|
||||
self.assertEqual(B.queried, 2)
|
||||
self.assertEqual(C.queried, 2)
|
||||
self.assertEqual(D.queried, 2)
|
||||
self.assertEqual(E.queried, 2)
|
||||
self.assertEqual(F.queried, 2)
|
||||
self.assertEqual(G.queried, 2)
|
||||
self.assertHistory([
|
||||
dict(name="project1-job", result="SUCCESS",
|
||||
changes="1,1 2,1 3,1 4,1 5,1 6,1 7,1",
|
||||
|
@ -9429,7 +9429,7 @@ class TestEventProcessing(ZuulTestCase):
|
||||
orig = zuul.scheduler.Scheduler._forward_trigger_event
|
||||
|
||||
def patched_forward(obj, *args, **kw):
|
||||
if args[2].name == 'tenant-one':
|
||||
if args[1].name == 'tenant-one':
|
||||
raise Exception("test")
|
||||
return orig(obj, *args, **kw)
|
||||
|
||||
|
@ -28,7 +28,7 @@ from zuul.lib.tarjan import strongly_connected_components
|
||||
import zuul.lib.tracing as tracing
|
||||
from zuul.model import (
|
||||
Change, PipelineState, PipelineChangeList,
|
||||
filter_severity, EnqueueEvent, QueryCache
|
||||
filter_severity, EnqueueEvent
|
||||
)
|
||||
from zuul.zk.change_cache import ChangeKey
|
||||
from zuul.zk.exceptions import LockException
|
||||
@ -235,7 +235,6 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
|
||||
def resolveChangeKeys(self, change_keys):
|
||||
resolved_changes = []
|
||||
query_cache = QueryCache()
|
||||
for key in change_keys:
|
||||
change = self._change_cache.get(key.reference)
|
||||
if change is None:
|
||||
@ -252,8 +251,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
and self.useDependenciesByTopic(change.project))
|
||||
if (update_commit_dependencies
|
||||
or update_topic_dependencies):
|
||||
self.updateCommitDependencies(
|
||||
query_cache, change, event=None)
|
||||
self.updateCommitDependencies(change, event=None)
|
||||
self._change_cache[change.cache_key] = change
|
||||
resolved_changes.append(change)
|
||||
return resolved_changes
|
||||
@ -332,7 +330,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
if not isinstance(change, model.Change):
|
||||
return
|
||||
|
||||
query_cache = QueryCache()
|
||||
self.sched.query_cache.clearIfOlderThan(event)
|
||||
to_refresh = set()
|
||||
for item in self.pipeline.getAllItems():
|
||||
for item_change in item.changes:
|
||||
@ -346,7 +344,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
to_refresh.add(item_change)
|
||||
|
||||
for existing_change in to_refresh:
|
||||
self.updateCommitDependencies(query_cache, existing_change, event)
|
||||
self.updateCommitDependencies(existing_change, event)
|
||||
|
||||
def reportEnqueue(self, item):
|
||||
if not self.pipeline.state.disabled:
|
||||
@ -849,7 +847,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
) - set(cycle)
|
||||
|
||||
def getDependencyGraph(self, change, dependency_graph, event,
|
||||
update_deps=False, query_cache=None,
|
||||
update_deps=False,
|
||||
history=None, quiet=False, indent=''):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
if not quiet:
|
||||
@ -860,11 +858,10 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
return
|
||||
if history is None:
|
||||
history = set()
|
||||
if query_cache is None:
|
||||
query_cache = QueryCache()
|
||||
self.sched.query_cache.clearIfOlderThan(event)
|
||||
history.add(change)
|
||||
if update_deps:
|
||||
self.updateCommitDependencies(query_cache, change, event)
|
||||
self.updateCommitDependencies(change, event)
|
||||
for needed_change in self.resolveChangeReferences(
|
||||
change.getNeedsChanges(
|
||||
self.useDependenciesByTopic(change.project))):
|
||||
@ -893,7 +890,7 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
if needed_change not in history:
|
||||
self.getDependencyGraph(needed_change,
|
||||
dependency_graph, event,
|
||||
update_deps, query_cache,
|
||||
update_deps,
|
||||
history, quiet, indent + ' ')
|
||||
|
||||
def getQueueConfig(self, project):
|
||||
@ -999,8 +996,10 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
self.pipeline.tenant.name][other_pipeline.name
|
||||
].put_supercede(event)
|
||||
|
||||
def updateCommitDependencies(self, query_cache, change, event):
|
||||
def updateCommitDependencies(self, change, event):
|
||||
log = get_annotated_logger(self.log, event)
|
||||
query_cache = self.sched.query_cache
|
||||
query_cache.clearIfOlderThan(event)
|
||||
|
||||
must_update_commit_deps = (
|
||||
not hasattr(event, "zuul_event_ltime")
|
||||
|
@ -151,9 +151,23 @@ def filter_severity(error_list, errors=True, warnings=True):
|
||||
class QueryCache:
|
||||
"""Cache query information while processing dependencies"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, zk_client):
|
||||
self.zk_client = zk_client
|
||||
self.ltime = 0
|
||||
self.clear(0)
|
||||
|
||||
def clear(self, ltime):
|
||||
self.ltime = ltime
|
||||
self.topic_queries = {}
|
||||
|
||||
def clearIfOlderThan(self, event):
|
||||
if not hasattr(event, "zuul_event_ltime"):
|
||||
return
|
||||
ltime = event.zuul_event_ltime
|
||||
if ltime > self.ltime:
|
||||
ltime = self.zk_client.getCurrentLtime()
|
||||
self.clear(ltime)
|
||||
|
||||
|
||||
class ZuulMark:
|
||||
# The yaml mark class differs between the C and python versions.
|
||||
|
@ -274,6 +274,7 @@ class Scheduler(threading.Thread):
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
self.query_cache = QueryCache(self.zk_client)
|
||||
self.system = ZuulSystem(self.zk_client)
|
||||
|
||||
self.zuul_version = get_version_string()
|
||||
@ -2429,7 +2430,6 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Finished connection cache maintenance")
|
||||
|
||||
def process_tenant_trigger_queue(self, tenant):
|
||||
query_cache = QueryCache()
|
||||
try:
|
||||
with trigger_queue_lock(
|
||||
self.zk_client, tenant.name, blocking=False
|
||||
@ -2467,8 +2467,7 @@ class Scheduler(threading.Thread):
|
||||
links=[
|
||||
trace.Link(trigger_span.get_span_context())
|
||||
]):
|
||||
self._forward_trigger_event(query_cache,
|
||||
event, tenant)
|
||||
self._forward_trigger_event(event, tenant)
|
||||
except Exception:
|
||||
log.exception("Unable to forward event %s "
|
||||
"to tenant %s", event, tenant.name)
|
||||
@ -2479,7 +2478,7 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Skipping locked trigger event queue in tenant %s",
|
||||
tenant.name)
|
||||
|
||||
def _forward_trigger_event(self, query_cache, event, tenant):
|
||||
def _forward_trigger_event(self, event, tenant):
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
trusted, project = tenant.getProject(event.canonical_project_name)
|
||||
|
||||
@ -2584,8 +2583,7 @@ class Scheduler(threading.Thread):
|
||||
# manager, but the result of the work goes into the change
|
||||
# cache, so it's not wasted; it's just less parallelized.
|
||||
if isinstance(change, Change):
|
||||
pipeline.manager.updateCommitDependencies(query_cache,
|
||||
change, event)
|
||||
pipeline.manager.updateCommitDependencies(change, event)
|
||||
if (
|
||||
pipeline.manager.eventMatches(event, change)
|
||||
or pipeline.manager.isChangeRelevantToPipeline(change)
|
||||
|
Loading…
Reference in New Issue
Block a user