Merge "Cache is held and managed by connections"
This commit is contained in:
commit
87a801fb1d
@ -693,8 +693,8 @@ class TestScheduler(ZuulTestCase):
|
||||
# triggering events. Since it will have the changes cached
|
||||
# already (without approvals), we need to clear the cache
|
||||
# first.
|
||||
source = self.sched.layout.pipelines['gate'].source
|
||||
source.maintainCache([])
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache([])
|
||||
|
||||
self.worker.hold_jobs_in_build = True
|
||||
A.addApproval('APRV', 1)
|
||||
@ -791,7 +791,6 @@ class TestScheduler(ZuulTestCase):
|
||||
A.addApproval('APRV', 1)
|
||||
a = source._getChange(1, 2, refresh=True)
|
||||
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
|
||||
source.maintainCache([])
|
||||
|
||||
def test_build_configuration(self):
|
||||
"Test that zuul merges the right commits for testing"
|
||||
@ -2609,6 +2608,53 @@ class TestScheduler(ZuulTestCase):
|
||||
# Ensure the removed job was not included in the report.
|
||||
self.assertNotIn('project1-project2-integration', A.messages[0])
|
||||
|
||||
def test_double_live_reconfiguration_shared_queue(self):
|
||||
# This was a real-world regression. A change is added to
|
||||
# gate; a reconfigure happens, a second change which depends
|
||||
# on the first is added, and a second reconfiguration happens.
|
||||
# Ensure that both changes merge.
|
||||
|
||||
# A failure may indicate incorrect caching or cleaning up of
|
||||
# references during a reconfiguration.
|
||||
self.worker.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||
B.setDependsOn(A, 1)
|
||||
A.addApproval('CRVW', 2)
|
||||
B.addApproval('CRVW', 2)
|
||||
|
||||
# Add the parent change.
|
||||
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
|
||||
self.waitUntilSettled()
|
||||
self.worker.release('.*-merge')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Reconfigure (with only one change in the pipeline).
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Add the child change.
|
||||
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
|
||||
self.waitUntilSettled()
|
||||
self.worker.release('.*-merge')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Reconfigure (with both in the pipeline).
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.worker.hold_jobs_in_build = False
|
||||
self.worker.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.history), 8)
|
||||
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
self.assertEqual(B.reported, 2)
|
||||
|
||||
def test_live_reconfiguration_del_project(self):
|
||||
# Test project deletion from layout
|
||||
# while changes are enqueued
|
||||
@ -3675,8 +3721,8 @@ For CI problems and help debugging, contact ci@example.org"""
|
||||
self.assertEqual(A.data['status'], 'NEW')
|
||||
self.assertEqual(B.data['status'], 'NEW')
|
||||
|
||||
source = self.sched.layout.pipelines['gate'].source
|
||||
source.maintainCache([])
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache([])
|
||||
|
||||
self.worker.hold_jobs_in_build = True
|
||||
B.addApproval('APRV', 1)
|
||||
|
@ -62,3 +62,10 @@ class BaseConnection(object):
|
||||
|
||||
def registerUse(self, what, instance):
|
||||
self.attached_to[what].append(instance)
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
"""Make cache contain relevant changes.
|
||||
|
||||
This lets the user supply a list of change objects that are
|
||||
still in use. Anything in our cache that isn't in the supplied
|
||||
list should be safe to remove from the cache."""
|
||||
|
@ -848,7 +848,7 @@ class Scheduler(threading.Thread):
|
||||
"Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
self.layout = layout
|
||||
self.maintainTriggerCache()
|
||||
self.maintainConnectionCache()
|
||||
for trigger in self.triggers.values():
|
||||
trigger.postConfig()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
@ -978,16 +978,18 @@ class Scheduler(threading.Thread):
|
||||
finally:
|
||||
self.run_handler_lock.release()
|
||||
|
||||
def maintainTriggerCache(self):
|
||||
def maintainConnectionCache(self):
|
||||
relevant = set()
|
||||
for pipeline in self.layout.pipelines.values():
|
||||
self.log.debug("Start maintain trigger cache for: %s" % pipeline)
|
||||
self.log.debug("Gather relevant cache items for: %s" % pipeline)
|
||||
for item in pipeline.getAllItems():
|
||||
relevant.add(item.change)
|
||||
relevant.update(item.change.getRelatedChanges())
|
||||
pipeline.source.maintainCache(relevant)
|
||||
self.log.debug("End maintain trigger cache for: %s" % pipeline)
|
||||
self.log.debug("Trigger cache size: %s" % len(relevant))
|
||||
for connection in self.connections.values():
|
||||
connection.maintainCache(relevant)
|
||||
self.log.debug(
|
||||
"End maintain connection cache for: %s" % connection)
|
||||
self.log.debug("Connection cache size: %s" % len(relevant))
|
||||
|
||||
def process_event_queue(self):
|
||||
self.log.debug("Fetching trigger event")
|
||||
|
@ -49,13 +49,6 @@ class BaseSource(object):
|
||||
def canMerge(self, change, allow_needs):
|
||||
"""Determine if change can merge."""
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
"""Make cache contain relevant changes.
|
||||
|
||||
This lets the user supply a list of change objects that are
|
||||
still in use. Anything in our cache that isn't in the supplied
|
||||
list should be safe to remove from the cache."""
|
||||
|
||||
def postConfig(self):
|
||||
"""Called after configuration has been processed."""
|
||||
|
||||
|
@ -319,6 +319,3 @@ class GerritSource(BaseSource):
|
||||
|
||||
def _getGitwebUrl(self, project, sha=None):
|
||||
return self.connection.getGitwebUrl(project, sha)
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
self.connection.maintainCache(relevant)
|
||||
|
Loading…
Reference in New Issue
Block a user