Merge "Move relevant change expansion to change cache cleanup"
This commit is contained in:
commit
43ccb74319
|
@ -1253,7 +1253,8 @@ class TestScheduler(ZuulTestCase):
|
|||
# already (without approvals), we need to clear the cache
|
||||
# first.
|
||||
for connection in self.scheds.first.connections.connections.values():
|
||||
connection.maintainCache([], max_age=0)
|
||||
if hasattr(connection, '_change_cache'):
|
||||
connection.maintainCache([], max_age=0)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A.addApproval('Approved', 1)
|
||||
|
@ -1365,7 +1366,7 @@ class TestScheduler(ZuulTestCase):
|
|||
# cache keys of the correct type, since we don't do run-time
|
||||
# validation.
|
||||
relevant = sched._gatherConnectionCacheKeys()
|
||||
self.assertEqual(len(relevant), 3)
|
||||
self.assertEqual(len(relevant), 2)
|
||||
for k in relevant:
|
||||
if not isinstance(k, ChangeKey):
|
||||
raise RuntimeError("Cache key %s is not a ChangeKey" % repr(k))
|
||||
|
@ -1381,9 +1382,39 @@ class TestScheduler(ZuulTestCase):
|
|||
# Test that outdated but still relevant changes are not cleaned up
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache(
|
||||
[c.cache_stat.key for c in _getCachedChanges()], max_age=0)
|
||||
set([c.cache_stat.key for c in _getCachedChanges()]),
|
||||
max_age=0)
|
||||
self.assertEqual(len(_getCachedChanges()), 3)
|
||||
|
||||
change1 = None
|
||||
change2 = None
|
||||
for c in _getCachedChanges():
|
||||
if c.cache_stat.key.stable_id == '1':
|
||||
change1 = c
|
||||
if c.cache_stat.key.stable_id == '2':
|
||||
change2 = c
|
||||
# Make change1 eligible for cleanup, but not change2
|
||||
change1.cache_stat = zuul.model.CacheStat(change1.cache_stat.key,
|
||||
change1.cache_stat.uuid,
|
||||
change1.cache_stat.version,
|
||||
0.0)
|
||||
# We should not delete change1 since it's needed by change2
|
||||
# which we want to keep.
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=7200)
|
||||
self.assertEqual(len(_getCachedChanges()), 3)
|
||||
|
||||
# Make both changes eligible for deletion
|
||||
change2.cache_stat = zuul.model.CacheStat(change2.cache_stat.key,
|
||||
change2.cache_stat.uuid,
|
||||
change2.cache_stat.version,
|
||||
0.0)
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=7200)
|
||||
# The master branch change remains
|
||||
self.assertEqual(len(_getCachedChanges()), 1)
|
||||
|
||||
# Test that we can remove changes once max_age has expired
|
||||
for connection in sched.connections.connections.values():
|
||||
connection.maintainCache([], max_age=0)
|
||||
self.assertEqual(len(_getCachedChanges()), 0)
|
||||
|
|
|
@ -1268,25 +1268,39 @@ class DummyChange:
|
|||
def deserialize(self, data):
|
||||
self.__dict__.update(data)
|
||||
|
||||
def getRelatedChanges(self, sched, relevant):
|
||||
return
|
||||
|
||||
|
||||
class DummyChangeCache(AbstractChangeCache):
|
||||
|
||||
CHANGE_TYPE_MAP = {
|
||||
"DummyChange": DummyChange,
|
||||
}
|
||||
|
||||
|
||||
class DummySource:
|
||||
|
||||
def getProject(self, project_name):
|
||||
return project_name
|
||||
|
||||
def getChangeByKey(self, key):
|
||||
return DummyChange('project')
|
||||
|
||||
|
||||
class DummyConnections:
|
||||
def getSource(self, name):
|
||||
return DummySource()
|
||||
|
||||
|
||||
class DummyScheduler:
|
||||
def __init__(self):
|
||||
self.connections = DummyConnections()
|
||||
|
||||
|
||||
class DummyConnection:
|
||||
|
||||
def __init__(self):
|
||||
self.connection_name = "DummyConnection"
|
||||
self.source = DummySource()
|
||||
self.sched = DummyScheduler()
|
||||
|
||||
|
||||
class TestChangeCache(ZooKeeperBaseTestCase):
|
||||
|
|
|
@ -1753,7 +1753,7 @@ class Scheduler(threading.Thread):
|
|||
relevant_changes = pipeline.manager.resolveChangeKeys(
|
||||
change_keys)
|
||||
for change in relevant_changes:
|
||||
change.getRelatedChanges(self, relevant)
|
||||
relevant.add(change.cache_stat.key)
|
||||
return relevant
|
||||
|
||||
def maintainConnectionCache(self):
|
||||
|
|
|
@ -206,9 +206,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
|
|||
return key, data['data_uuid']
|
||||
|
||||
def prune(self, relevant, max_age=3600): # 1h
|
||||
# Relevant is the list of changes directly in a pipeline.
|
||||
# This method will take care of expanding that out to each
|
||||
# change's network of related changes.
|
||||
self.log.debug("Pruning cache")
|
||||
cutoff_time = time.time() - max_age
|
||||
outdated_versions = dict()
|
||||
to_keep = set(relevant)
|
||||
sched = self.connection.sched
|
||||
for c in list(self._change_cache.values()):
|
||||
# Assign to a local variable so all 3 values we use are
|
||||
# consistent in case the cache_stat is updated during this
|
||||
|
@ -216,11 +221,20 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
|
|||
cache_stat = c.cache_stat
|
||||
if cache_stat.last_modified >= cutoff_time:
|
||||
# This entry isn't old enough to delete yet
|
||||
to_keep.add(cache_stat.key)
|
||||
continue
|
||||
# Save the version we examined so we can make sure to only
|
||||
# delete that version.
|
||||
outdated_versions[cache_stat.key] = cache_stat.version
|
||||
to_prune = set(outdated_versions.keys()) - set(relevant)
|
||||
# Changes we want to keep may have localized networks; keep
|
||||
# them together even if one member hasn't been updated in a
|
||||
# while. Only when the entire network hasn't been modified in
|
||||
# max_age will any change in it be removed.
|
||||
for key in to_keep.copy():
|
||||
source = sched.connections.getSource(key.connection_name)
|
||||
change = source.getChangeByKey(key)
|
||||
change.getRelatedChanges(sched, to_keep)
|
||||
to_prune = set(outdated_versions.keys()) - to_keep
|
||||
for key in to_prune:
|
||||
self.delete(key, outdated_versions[key])
|
||||
self.log.debug("Done pruning cache")
|
||||
|
|
Loading…
Reference in New Issue