Revert "Use weakref for change cache"

This reverts commit b9704302bd.

This is strongly suspected of causing a memory leak.

Change-Id: I0ebf9cee304277909a0b80420ac7ba659a437b29
This commit is contained in:
James E. Blair 2017-10-18 09:39:18 -07:00
parent 11bd5ee86c
commit b0a95abc92
5 changed files with 61 additions and 17 deletions

View File

@ -875,8 +875,7 @@ class TestScheduler(ZuulTestCase):
# already (without approvals), we need to clear the cache
# first.
for connection in self.connections.connections.values():
if hasattr(connection, '_change_cache'):
connection._change_cache.clear()
connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
A.addApproval('Approved', 1)
@ -946,8 +945,7 @@ class TestScheduler(ZuulTestCase):
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())),
0)
self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -3933,8 +3931,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(B.data['status'], 'NEW')
for connection in self.connections.connections.values():
if hasattr(connection, '_change_cache'):
connection._change_cache.clear()
connection.maintainCache([])
self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1)

View File

@ -68,6 +68,13 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
def registerScheduler(self, sched):
self.sched = sched
def maintainCache(self, relevant):
"""Make cache contain relevant changes.
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
def registerWebapp(self, webapp):
self.webapp = webapp

View File

@ -23,10 +23,9 @@ import logging
import pprint
import shlex
import queue
import weakref
import voluptuous as v
from typing import Dict, List
import voluptuous as v
from zuul.connection import BaseConnection
from zuul.model import Ref, Tag, Branch, Project
@ -143,8 +142,7 @@ class GerritEventConnector(threading.Thread):
# cache as it may be a dependency
if event.change_number:
refresh = True
if ((event.change_number, event.patch_number) not in
self.connection._change_cache):
if event.change_number not in self.connection._change_cache:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@ -302,7 +300,7 @@ class GerritConnection(BaseConnection):
self.baseurl = self.connection_config.get('baseurl',
'https://%s' % self.server)
self._change_cache = weakref.WeakValueDictionary()
self._change_cache = {}
self.projects = {}
self.gerrit_event_connector = None
self.source = driver.getSource(self)
@ -313,6 +311,22 @@ class GerritConnection(BaseConnection):
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
def maintainCache(self, relevant):
# This lets the user supply a list of change objects that are
# still in use. Anything in our cache that isn't in the supplied
# list should be safe to remove from the cache.
remove = {}
for change_number, patchsets in self._change_cache.items():
for patchset, change in patchsets.items():
if change not in relevant:
remove.setdefault(change_number, [])
remove[change_number].append(patchset)
for change_number, patchsets in remove.items():
for patchset in patchsets:
del self._change_cache[change_number][patchset]
if not self._change_cache[change_number]:
del self._change_cache[change_number]
def getChange(self, event, refresh=False):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number,
@ -357,19 +371,22 @@ class GerritConnection(BaseConnection):
return change
def _getChange(self, number, patchset, refresh=False, history=None):
change = self._change_cache.get((number, patchset))
change = self._change_cache.get(number, {}).get(patchset)
if change and not refresh:
return change
if not change:
change = GerritChange(None)
change.number = number
change.patchset = patchset
self._change_cache[(change.number, change.patchset)] = change
self._change_cache.setdefault(change.number, {})
self._change_cache[change.number][change.patchset] = change
try:
self._updateChange(change, history)
except Exception:
if self._change_cache.get((change.number, change.patchset)):
del self._change_cache[(change.number, change.patchset)]
if self._change_cache.get(change.number, {}).get(change.patchset):
del self._change_cache[change.number][change.patchset]
if not self._change_cache[change.number]:
del self._change_cache[change.number]
raise
return change

View File

@ -21,7 +21,6 @@ import queue
import threading
import time
import re
import weakref
import cachecontrol
from cachecontrol.cache import DictCache
@ -395,7 +394,7 @@ class GithubConnection(BaseConnection):
def __init__(self, driver, connection_name, connection_config):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
self._change_cache = weakref.WeakValueDictionary()
self._change_cache = {}
self._project_branch_cache = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
@ -568,6 +567,11 @@ class GithubConnection(BaseConnection):
# authenticated, if not then anonymous is the best we have.
return self._github
def maintainCache(self, relevant):
for key, change in self._change_cache.items():
if change not in relevant:
del self._change_cache[key]
def getChange(self, event, refresh=False):
"""Get the change representing an event."""

View File

@ -595,6 +595,8 @@ class Scheduler(threading.Thread):
self._reenqueueTenant(old_tenant, tenant)
# TODOv3(jeblair): update for tenants
# self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
@ -726,6 +728,23 @@ class Scheduler(threading.Thread):
finally:
self.run_handler_lock.release()
def maintainConnectionCache(self):
# TODOv3(jeblair): update for tenants
relevant = set()
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s" %
pipeline)
for item in pipeline.getAllItems():
relevant.add(item.change)
relevant.update(item.change.getRelatedChanges())
for connection in self.connections.values():
connection.maintainCache(relevant)
self.log.debug(
"End maintain connection cache for: %s" % connection)
self.log.debug("Connection cache size: %s" % len(relevant))
def process_event_queue(self):
self.log.debug("Fetching trigger event")
event = self.trigger_event_queue.get()