Fix trigger cache maintenance
Each pipeline was maintaining the trigger cache only based on its own current usage, which means that pipelines were clearing the cache of changes currently being used by other pipelines. This considers all changes in use in all pipelines when maintaining the cache. Change-Id: I3ab14c69acd80ecc613b63628c837511594744d0 Reviewed-on: https://review.openstack.org/36699 Reviewed-by: Clark Boylan <clark.boylan@gmail.com> Approved: James E. Blair <corvus@inaugust.com> Reviewed-by: Jeremy Stanley <fungi@yuggoth.org> Tested-by: Jenkins
This commit is contained in:
parent
64ed6f2700
commit
0e933c5a4c
|
@ -1394,6 +1394,52 @@ class TestScheduler(testtools.TestCase):
|
|||
self.assertEqual(B.reported, 2)
|
||||
self.assertEqual(C.reported, 2)
|
||||
|
||||
def test_trigger_cache(self):
|
||||
"Test that the trigger cache operates correctly"
|
||||
self.worker.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
X = self.fake_gerrit.addFakeChange('org/project', 'master', 'X')
|
||||
A.addApproval('CRVW', 2)
|
||||
B.addApproval('CRVW', 2)
|
||||
|
||||
M1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'M1')
|
||||
M1.setMerged()
|
||||
|
||||
B.setDependsOn(A, 1)
|
||||
A.setDependsOn(M1, 1)
|
||||
|
||||
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
|
||||
self.fake_gerrit.addEvent(X.getPatchsetCreatedEvent(1))
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
for build in self.builds:
|
||||
if build.parameters['ZUUL_PIPELINE'] == 'check':
|
||||
build.release()
|
||||
self.waitUntilSettled()
|
||||
for build in self.builds:
|
||||
if build.parameters['ZUUL_PIPELINE'] == 'check':
|
||||
build.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.log.debug("len %s " % self.sched.trigger._change_cache.keys())
|
||||
# there should still be changes in the cache
|
||||
self.assertNotEqual(len(self.sched.trigger._change_cache.keys()), 0)
|
||||
|
||||
self.worker.hold_jobs_in_build = False
|
||||
self.worker.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
self.assertEqual(A.queried, 2) # Initial and isMerged
|
||||
self.assertEqual(B.queried, 3) # Initial A, refresh from B, isMerged
|
||||
|
||||
def test_can_merge(self):
|
||||
"Test whether a change is ready to merge"
|
||||
# TODO: move to test_gerrit (this is a unit test!)
|
||||
|
|
|
@ -76,6 +76,7 @@ class Scheduler(threading.Thread):
|
|||
self.launcher = None
|
||||
self.trigger = None
|
||||
self.config = None
|
||||
self._maintain_trigger_cache = False
|
||||
|
||||
self.trigger_event_queue = Queue.Queue()
|
||||
self.result_event_queue = Queue.Queue()
|
||||
|
@ -499,9 +500,23 @@ class Scheduler(threading.Thread):
|
|||
else:
|
||||
if not self.result_event_queue.empty():
|
||||
self.wake_event.set()
|
||||
|
||||
if self._maintain_trigger_cache:
|
||||
self.maintainTriggerCache()
|
||||
self._maintain_trigger_cache = False
|
||||
|
||||
except:
|
||||
self.log.exception("Exception in run handler:")
|
||||
|
||||
def maintainTriggerCache(self):
|
||||
relevant = set()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
for item in pipeline.getAllItems():
|
||||
relevant.add(item.change)
|
||||
relevant.update(item.change.getRelatedChanges())
|
||||
self.log.debug("Trigger cache size: %s" % len(relevant))
|
||||
self.trigger.maintainCache(relevant)
|
||||
|
||||
def process_event_queue(self):
|
||||
self.log.debug("Fetching trigger event")
|
||||
event = self.trigger_event_queue.get()
|
||||
|
@ -794,14 +809,7 @@ class BasePipelineManager(object):
|
|||
(item.change.is_reportable and not item.reported)):
|
||||
self.log.debug("Adding %s as a severed head" % item.change)
|
||||
change_queue.addSeveredHead(item)
|
||||
self.maintainTriggerCache()
|
||||
|
||||
def maintainTriggerCache(self):
|
||||
relevant = set()
|
||||
for item in self.pipeline.getAllItems():
|
||||
relevant.add(item.change)
|
||||
relevant.update(item.change.getRelatedChanges())
|
||||
self.sched.trigger.maintainCache(relevant)
|
||||
self.sched._maintain_trigger_cache = True
|
||||
|
||||
def removeChange(self, change):
|
||||
# Remove a change from the queue, probably because it has been
|
||||
|
|
Loading…
Reference in New Issue