From b0a95abc928baa9cc92f903785ac733fb6114ac7 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 18 Oct 2017 09:39:18 -0700 Subject: [PATCH] Revert "Use weakref for change cache" This reverts commit b9704302bdb6857026319ac80df7b490a2282f89. This is strongly suspected of causing a memory leak. Change-Id: I0ebf9cee304277909a0b80420ac7ba659a437b29 --- tests/unit/test_scheduler.py | 9 +++---- zuul/connection/__init__.py | 7 ++++++ zuul/driver/gerrit/gerritconnection.py | 35 +++++++++++++++++++------- zuul/driver/github/githubconnection.py | 8 ++++-- zuul/scheduler.py | 19 ++++++++++++++ 5 files changed, 61 insertions(+), 17 deletions(-) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 90904213f3..6efc43fdfb 100755 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -875,8 +875,7 @@ class TestScheduler(ZuulTestCase): # already (without approvals), we need to clear the cache # first. for connection in self.connections.connections.values(): - if hasattr(connection, '_change_cache'): - connection._change_cache.clear() + connection.maintainCache([]) self.executor_server.hold_jobs_in_build = True A.addApproval('Approved', 1) @@ -946,8 +945,7 @@ class TestScheduler(ZuulTestCase): self.log.debug("len %s" % self.fake_gerrit._change_cache.keys()) # there should still be changes in the cache - self.assertNotEqual(len(list(self.fake_gerrit._change_cache.keys())), - 0) + self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -3933,8 +3931,7 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(B.data['status'], 'NEW') for connection in self.connections.connections.values(): - if hasattr(connection, '_change_cache'): - connection._change_cache.clear() + connection.maintainCache([]) self.executor_server.hold_jobs_in_build = True B.addApproval('Approved', 1) diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py index ca10f21468..b44fa462ac 100644 --- a/zuul/connection/__init__.py +++ b/zuul/connection/__init__.py @@ -68,6 +68,13 @@ class BaseConnection(object, metaclass=abc.ABCMeta): def registerScheduler(self, sched): self.sched = sched + def maintainCache(self, relevant): + """Make cache contain relevant changes. + + This lets the user supply a list of change objects that are + still in use. Anything in our cache that isn't in the supplied + list should be safe to remove from the cache.""" + def registerWebapp(self, webapp): self.webapp = webapp diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 343c305916..83871e3edb 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -23,10 +23,9 @@ import logging import pprint import shlex import queue -import weakref +import voluptuous as v from typing import Dict, List -import voluptuous as v from zuul.connection import BaseConnection from zuul.model import Ref, Tag, Branch, Project @@ -143,8 +142,7 @@ class GerritEventConnector(threading.Thread): # cache as it may be a dependency if event.change_number: refresh = True - if ((event.change_number, event.patch_number) not in - self.connection._change_cache): + if event.change_number not in self.connection._change_cache: refresh = False for tenant in self.connection.sched.abide.tenants.values(): # TODO(fungi): it would be better to have some simple means @@ -302,7 +300,7 @@ class GerritConnection(BaseConnection): self.baseurl = self.connection_config.get('baseurl', 'https://%s' % self.server) - self._change_cache = weakref.WeakValueDictionary() + self._change_cache = {} self.projects = {} self.gerrit_event_connector = None self.source = driver.getSource(self) @@ -313,6 +311,22 @@ class GerritConnection(BaseConnection): def addProject(self, project: Project) -> None: self.projects[project.name] = project + def maintainCache(self, relevant): + # This lets the user supply a list of change objects that are + # still in use. Anything in our cache that isn't in the supplied + # list should be safe to remove from the cache. + remove = {} + for change_number, patchsets in self._change_cache.items(): + for patchset, change in patchsets.items(): + if change not in relevant: + remove.setdefault(change_number, []) + remove[change_number].append(patchset) + for change_number, patchsets in remove.items(): + for patchset in patchsets: + del self._change_cache[change_number][patchset] + if not self._change_cache[change_number]: + del self._change_cache[change_number] + def getChange(self, event, refresh=False): if event.change_number: change = self._getChange(event.change_number, event.patch_number, @@ -357,19 +371,22 @@ class GerritConnection(BaseConnection): return change def _getChange(self, number, patchset, refresh=False, history=None): - change = self._change_cache.get((number, patchset)) + change = self._change_cache.get(number, {}).get(patchset) if change and not refresh: return change if not change: change = GerritChange(None) change.number = number change.patchset = patchset - self._change_cache[(change.number, change.patchset)] = change + self._change_cache.setdefault(change.number, {}) + self._change_cache[change.number][change.patchset] = change try: self._updateChange(change, history) except Exception: - if self._change_cache.get((change.number, change.patchset)): - del self._change_cache[(change.number, change.patchset)] + if self._change_cache.get(change.number, {}).get(change.patchset): + del self._change_cache[change.number][change.patchset] + if not self._change_cache[change.number]: + del self._change_cache[change.number] raise return change diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 46c8ee52ce..3d0eb37951 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -21,7 +21,6 @@ import queue import threading import time import re -import weakref import cachecontrol from cachecontrol.cache import DictCache @@ -395,7 +394,7 @@ class GithubConnection(BaseConnection): def __init__(self, driver, connection_name, connection_config): super(GithubConnection, self).__init__( driver, connection_name, connection_config) - self._change_cache = weakref.WeakValueDictionary() + self._change_cache = {} self._project_branch_cache = {} self.projects = {} self.git_ssh_key = self.connection_config.get('sshkey') @@ -568,6 +567,11 @@ class GithubConnection(BaseConnection): # authenticated, if not then anonymous is the best we have. return self._github + def maintainCache(self, relevant): + for key, change in self._change_cache.items(): + if change not in relevant: + del self._change_cache[key] + def getChange(self, event, refresh=False): """Get the change representing an event.""" diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 026763b6b1..e5924f834e 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -595,6 +595,8 @@ class Scheduler(threading.Thread): self._reenqueueTenant(old_tenant, tenant) + # TODOv3(jeblair): update for tenants + # self.maintainConnectionCache() self.connections.reconfigureDrivers(tenant) # TODOv3(jeblair): remove postconfig calls? @@ -726,6 +728,23 @@ class Scheduler(threading.Thread): finally: self.run_handler_lock.release() + def maintainConnectionCache(self): + # TODOv3(jeblair): update for tenants + relevant = set() + for tenant in self.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + self.log.debug("Gather relevant cache items for: %s" % + pipeline) + + for item in pipeline.getAllItems(): + relevant.add(item.change) + relevant.update(item.change.getRelatedChanges()) + for connection in self.connections.values(): + connection.maintainCache(relevant) + self.log.debug( + "End maintain connection cache for: %s" % connection) + self.log.debug("Connection cache size: %s" % len(relevant)) + def process_event_queue(self): self.log.debug("Fetching trigger event") event = self.trigger_event_queue.get()