Cache is held and managed by connections
Add reconfigure test case. This test previously fails currently due to a regression introduced with the connections changes. Because multiple sources share a connection, a pipeline that does not hold and therefore require any changes in the cache may clear a connections cache before a pipeline that does need said change has an opportunity to add it to the relevant list. Allow connections to manage their cache directly rather than the source doing it vicariously ignorant of other pipelines/sources. Collect the relevant changes from all pipelines and ask any connections holding a cache for that item to keep it on reconfiguration. Co-Authored-By: James E. Blair <jeblair@linux.vnet.ibm.com> Change-Id: I2bf8ba6b9deda58114db9e9b96985a2a0e2a69cb
This commit is contained in:
parent
208eb6618a
commit
4bd7da32fa
|
@ -693,8 +693,8 @@ class TestScheduler(ZuulTestCase):
|
|||
# triggering events. Since it will have the changes cached
|
||||
# already (without approvals), we need to clear the cache
|
||||
# first.
|
||||
source = self.sched.layout.pipelines['gate'].source
|
||||
source.maintainCache([])
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache([])
|
||||
|
||||
self.worker.hold_jobs_in_build = True
|
||||
A.addApproval('APRV', 1)
|
||||
|
@ -791,7 +791,6 @@ class TestScheduler(ZuulTestCase):
|
|||
A.addApproval('APRV', 1)
|
||||
a = source._getChange(1, 2, refresh=True)
|
||||
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
|
||||
source.maintainCache([])
|
||||
|
||||
def test_build_configuration(self):
|
||||
"Test that zuul merges the right commits for testing"
|
||||
|
@ -2609,6 +2608,53 @@ class TestScheduler(ZuulTestCase):
|
|||
# Ensure the removed job was not included in the report.
|
||||
self.assertNotIn('project1-project2-integration', A.messages[0])
|
||||
|
||||
def test_double_live_reconfiguration_shared_queue(self):
|
||||
# This was a real-world regression. A change is added to
|
||||
# gate; a reconfigure happens, a second change which depends
|
||||
# on the first is added, and a second reconfiguration happens.
|
||||
# Ensure that both changes merge.
|
||||
|
||||
# A failure may indicate incorrect caching or cleaning up of
|
||||
# references during a reconfiguration.
|
||||
self.worker.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||
B.setDependsOn(A, 1)
|
||||
A.addApproval('CRVW', 2)
|
||||
B.addApproval('CRVW', 2)
|
||||
|
||||
# Add the parent change.
|
||||
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
|
||||
self.waitUntilSettled()
|
||||
self.worker.release('.*-merge')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Reconfigure (with only one change in the pipeline).
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Add the child change.
|
||||
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
|
||||
self.waitUntilSettled()
|
||||
self.worker.release('.*-merge')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Reconfigure (with both in the pipeline).
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.worker.hold_jobs_in_build = False
|
||||
self.worker.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.history), 8)
|
||||
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
self.assertEqual(B.reported, 2)
|
||||
|
||||
def test_live_reconfiguration_del_project(self):
|
||||
# Test project deletion from layout
|
||||
# while changes are enqueued
|
||||
|
@ -3656,8 +3702,8 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.assertEqual(A.data['status'], 'NEW')
|
||||
self.assertEqual(B.data['status'], 'NEW')
|
||||
|
||||
source = self.sched.layout.pipelines['gate'].source
|
||||
source.maintainCache([])
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache([])
|
||||
|
||||
self.worker.hold_jobs_in_build = True
|
||||
B.addApproval('APRV', 1)
|
||||
|
|
|
@ -62,3 +62,10 @@ class BaseConnection(object):
|
|||
|
||||
def registerUse(self, what, instance):
|
||||
self.attached_to[what].append(instance)
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
"""Make cache contain relevant changes.
|
||||
|
||||
This lets the user supply a list of change objects that are
|
||||
still in use. Anything in our cache that isn't in the supplied
|
||||
list should be safe to remove from the cache."""
|
||||
|
|
|
@ -841,7 +841,7 @@ class Scheduler(threading.Thread):
|
|||
"Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
self.layout = layout
|
||||
self.maintainTriggerCache()
|
||||
self.maintainConnectionCache()
|
||||
for trigger in self.triggers.values():
|
||||
trigger.postConfig()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
|
@ -971,16 +971,18 @@ class Scheduler(threading.Thread):
|
|||
finally:
|
||||
self.run_handler_lock.release()
|
||||
|
||||
def maintainTriggerCache(self):
|
||||
def maintainConnectionCache(self):
|
||||
relevant = set()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
self.log.debug("Start maintain trigger cache for: %s" % pipeline)
|
||||
self.log.debug("Gather relevant cache items for: %s" % pipeline)
|
||||
for item in pipeline.getAllItems():
|
||||
relevant.add(item.change)
|
||||
relevant.update(item.change.getRelatedChanges())
|
||||
pipeline.source.maintainCache(relevant)
|
||||
self.log.debug("End maintain trigger cache for: %s" % pipeline)
|
||||
self.log.debug("Trigger cache size: %s" % len(relevant))
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache(relevant)
|
||||
self.log.debug(
|
||||
"End maintain connection cache for: %s" % connection)
|
||||
self.log.debug("Connection cache size: %s" % len(relevant))
|
||||
|
||||
def process_event_queue(self):
|
||||
self.log.debug("Fetching trigger event")
|
||||
|
|
|
@ -49,13 +49,6 @@ class BaseSource(object):
|
|||
def canMerge(self, change, allow_needs):
|
||||
"""Determine if change can merge."""
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
"""Make cache contain relevant changes.
|
||||
|
||||
This lets the user supply a list of change objects that are
|
||||
still in use. Anything in our cache that isn't in the supplied
|
||||
list should be safe to remove from the cache."""
|
||||
|
||||
def postConfig(self):
|
||||
"""Called after configuration has been processed."""
|
||||
|
||||
|
|
|
@ -319,6 +319,3 @@ class GerritSource(BaseSource):
|
|||
|
||||
def _getGitwebUrl(self, project, sha=None):
|
||||
return self.connection.getGitwebUrl(project, sha)
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
self.connection.maintainCache(relevant)
|
||||
|
|
Loading…
Reference in New Issue