Move relevant change expansion to change cache cleanup
The following is possible:

* Change 1 is updated in gerrit
* Change 2 which Depends-On change 1 is updated
* Change 3 which Depends-On change 2 is updated
* A long time passes
* Changes 2 and 3 are updated again
* A short time passes
* Change 1 is pruned from the cache because it hasn't been updated in
  2 hours.  Changes 2 and 3 remain since they were recently updated.
* Change 3 is updated
* The driver sees that 3 depends on 2 and looks up 2
* The driver finds 2 in the cache and stops (it does not update 2 and
  therefore will not re-add 1 to the cache)
* Change 3 is added to a pipeline
* Pipeline processing fails because it cannot resolve change 1

To correct this, once we have decided which changes are too old and
should be removed, and then reduced that set by the set of changes in
the pipeline, find the changes related to those changes and further
reduce the set to prune.

In other words, move the related change expansion from outside the
cache prune method to inside, so we expand the network of changes
inside the cache, not just the network of changes in the pipeline.

Change-Id: I9a029bc92cf2eecaff7df3598a6c6993d85978a8
This commit is contained in:
parent
d4e9b66fa0
commit
3bc6dfea15
|
@ -1252,7 +1252,8 @@ class TestScheduler(ZuulTestCase):
|
|||
# already (without approvals), we need to clear the cache
|
||||
# first.
|
||||
for connection in self.scheds.first.connections.connections.values():
|
||||
connection.maintainCache([], max_age=0)
|
||||
if hasattr(connection, '_change_cache'):
|
||||
connection.maintainCache([], max_age=0)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A.addApproval('Approved', 1)
|
||||
|
@ -1364,7 +1365,7 @@ class TestScheduler(ZuulTestCase):
|
|||
# cache keys of the correct type, since we don't do run-time
|
||||
# validation.
|
||||
relevant = sched._gatherConnectionCacheKeys()
|
||||
self.assertEqual(len(relevant), 3)
|
||||
self.assertEqual(len(relevant), 2)
|
||||
for k in relevant:
|
||||
if not isinstance(k, ChangeKey):
|
||||
raise RuntimeError("Cache key %s is not a ChangeKey" % repr(k))
|
||||
|
@ -1380,9 +1381,39 @@ class TestScheduler(ZuulTestCase):
|
|||
# Test that outdated but still relevant changes are not cleaned up
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache(
|
||||
[c.cache_stat.key for c in _getCachedChanges()], max_age=0)
|
||||
set([c.cache_stat.key for c in _getCachedChanges()]),
|
||||
max_age=0)
|
||||
self.assertEqual(len(_getCachedChanges()), 3)
|
||||
|
||||
change1 = None
|
||||
change2 = None
|
||||
for c in _getCachedChanges():
|
||||
if c.cache_stat.key.stable_id == '1':
|
||||
change1 = c
|
||||
if c.cache_stat.key.stable_id == '2':
|
||||
change2 = c
|
||||
# Make change1 eligible for cleanup, but not change2
|
||||
change1.cache_stat = zuul.model.CacheStat(change1.cache_stat.key,
|
||||
change1.cache_stat.uuid,
|
||||
change1.cache_stat.version,
|
||||
0.0)
|
||||
# We should not delete change1 since it's needed by change2
|
||||
# which we want to keep.
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=7200)
|
||||
self.assertEqual(len(_getCachedChanges()), 3)
|
||||
|
||||
# Make both changes eligible for deletion
|
||||
change2.cache_stat = zuul.model.CacheStat(change2.cache_stat.key,
|
||||
change2.cache_stat.uuid,
|
||||
change2.cache_stat.version,
|
||||
0.0)
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=7200)
|
||||
# The master branch change remains
|
||||
self.assertEqual(len(_getCachedChanges()), 1)
|
||||
|
||||
# Test that we can remove changes once max_age has expired
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=0)
|
||||
self.assertEqual(len(_getCachedChanges()), 0)
|
||||
|
|
|
@ -1268,25 +1268,39 @@ class DummyChange:
|
|||
def deserialize(self, data):
|
||||
self.__dict__.update(data)
|
||||
|
||||
def getRelatedChanges(self, sched, relevant):
|
||||
return
|
||||
|
||||
|
||||
class DummyChangeCache(AbstractChangeCache):
|
||||
|
||||
CHANGE_TYPE_MAP = {
|
||||
"DummyChange": DummyChange,
|
||||
}
|
||||
|
||||
|
||||
class DummySource:
|
||||
|
||||
def getProject(self, project_name):
|
||||
return project_name
|
||||
|
||||
def getChangeByKey(self, key):
|
||||
return DummyChange('project')
|
||||
|
||||
|
||||
class DummyConnections:
|
||||
def getSource(self, name):
|
||||
return DummySource()
|
||||
|
||||
|
||||
class DummyScheduler:
|
||||
def __init__(self):
|
||||
self.connections = DummyConnections()
|
||||
|
||||
|
||||
class DummyConnection:
|
||||
|
||||
def __init__(self):
|
||||
self.connection_name = "DummyConnection"
|
||||
self.source = DummySource()
|
||||
self.sched = DummyScheduler()
|
||||
|
||||
|
||||
class TestChangeCache(ZooKeeperBaseTestCase):
|
||||
|
|
|
@ -1758,7 +1758,7 @@ class Scheduler(threading.Thread):
|
|||
relevant_changes = pipeline.manager.resolveChangeKeys(
|
||||
change_keys)
|
||||
for change in relevant_changes:
|
||||
change.getRelatedChanges(self, relevant)
|
||||
relevant.add(change.cache_stat.key)
|
||||
return relevant
|
||||
|
||||
def maintainConnectionCache(self):
|
||||
|
|
|
@ -206,9 +206,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
|
|||
return key, data['data_uuid']
|
||||
|
||||
def prune(self, relevant, max_age=3600): # 1h
|
||||
# Relevant is the list of changes directly in a pipeline.
|
||||
# This method will take care of expanding that out to each
|
||||
# change's network of related changes.
|
||||
self.log.debug("Pruning cache")
|
||||
cutoff_time = time.time() - max_age
|
||||
outdated_versions = dict()
|
||||
to_keep = set(relevant)
|
||||
sched = self.connection.sched
|
||||
for c in list(self._change_cache.values()):
|
||||
# Assign to a local variable so all 3 values we use are
|
||||
# consistent in case the cache_stat is updated during this
|
||||
|
@ -216,11 +221,20 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
|
|||
cache_stat = c.cache_stat
|
||||
if cache_stat.last_modified >= cutoff_time:
|
||||
# This entry isn't old enough to delete yet
|
||||
to_keep.add(cache_stat.key)
|
||||
continue
|
||||
# Save the version we examined so we can make sure to only
|
||||
# delete that version.
|
||||
outdated_versions[cache_stat.key] = cache_stat.version
|
||||
to_prune = set(outdated_versions.keys()) - set(relevant)
|
||||
# Changes we want to keep may have localized networks; keep
|
||||
# them together even if one member hasn't been updated in a
|
||||
# while. Only when the entire network hasn't been modified in
|
||||
# max_age will any change in it be removed.
|
||||
for key in to_keep.copy():
|
||||
source = sched.connections.getSource(key.connection_name)
|
||||
change = source.getChangeByKey(key)
|
||||
change.getRelatedChanges(sched, to_keep)
|
||||
to_prune = set(outdated_versions.keys()) - to_keep
|
||||
for key in to_prune:
|
||||
self.delete(key, outdated_versions[key])
|
||||
self.log.debug("Done pruning cache")
|
||||
|
|
Loading…
Reference in New Issue