Use per-instance LRU caches on statemachine adapters
Several drivers used the lru_cache decorator which creates a global cache rather than an instance-specific cache. This could cause us to leak memory over time as adapters are replaced (this happens when the config for a provider changes and the driver is reloaded). To correct this, create the cache in the constructor and wrap the methods there. The Azure driver wasn't subject to the memory leak because it had a 24 hour TTL, but the intent of that method is the same as the others (images don't change), so let's use the same approach for all the drivers. Change-Id: Ifb0f2706c9ff6daf9ad84213d5fa4a52e123c39f
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import cachetools.func
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
@@ -212,6 +213,14 @@ class AwsAdapter(statemachine.Adapter):
|
||||
IMAGE_UPLOAD_SLEEP = 30
|
||||
|
||||
def __init__(self, provider_config):
|
||||
# Wrap these instance methods with a per-instance LRU cache so
|
||||
# that we don't leak memory over time when the adapter is
|
||||
# occasionally replaced.
|
||||
self._getInstanceType = functools.lru_cache(maxsize=None)(
|
||||
self._getInstanceType)
|
||||
self._getImage = functools.lru_cache(maxsize=None)(
|
||||
self._getImage)
|
||||
|
||||
self.log = logging.getLogger(
|
||||
f"nodepool.AwsAdapter.{provider_config.name}")
|
||||
self.provider = provider_config
|
||||
@@ -623,7 +632,7 @@ class AwsAdapter(statemachine.Adapter):
|
||||
args[code] = cores
|
||||
return QuotaInformation(**args)
|
||||
|
||||
@cachetools.func.lru_cache(maxsize=None)
|
||||
# This method is wrapped with an LRU cache in the constructor.
|
||||
def _getInstanceType(self, instance_type):
|
||||
with self.non_mutating_rate_limiter:
|
||||
self.log.debug(
|
||||
@@ -718,7 +727,7 @@ class AwsAdapter(statemachine.Adapter):
|
||||
|
||||
return image_id
|
||||
|
||||
@cachetools.func.lru_cache(maxsize=None)
|
||||
# This method is wrapped with an LRU cache in the constructor.
|
||||
def _getImage(self, image_id):
|
||||
with self.non_mutating_rate_limiter:
|
||||
return self.ec2.Image(image_id)
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
@@ -310,6 +311,12 @@ class AzureAdapter(statemachine.Adapter):
|
||||
log = logging.getLogger("nodepool.driver.azure.AzureAdapter")
|
||||
|
||||
def __init__(self, provider_config):
|
||||
# Wrap these instance methods with a per-instance LRU cache so
|
||||
# that we don't leak memory over time when the adapter is
|
||||
# occasionally replaced.
|
||||
self._getImage = functools.lru_cache(maxsize=None)(
|
||||
self._getImage)
|
||||
|
||||
self.provider = provider_config
|
||||
self.resource_group = self.provider.resource_group
|
||||
self.resource_group_location = self.provider.resource_group_location
|
||||
@@ -554,7 +561,7 @@ class AzureAdapter(statemachine.Adapter):
|
||||
self.skus[key] = sku
|
||||
self.log.debug("Done querying compute SKUs")
|
||||
|
||||
@cachetools.func.ttl_cache(maxsize=0, ttl=(24 * 60 * 60))
|
||||
# This method is wrapped with an LRU cache in the constructor.
|
||||
def _getImage(self, image_name):
|
||||
with self.rate_limiter:
|
||||
return self.azul.images.get(self.resource_group, image_name)
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import cachetools.func
|
||||
import functools
|
||||
import logging
|
||||
import math
|
||||
|
||||
@@ -144,6 +145,14 @@ class GceAdapter(statemachine.Adapter):
|
||||
log = logging.getLogger("nodepool.GceAdapter")
|
||||
|
||||
def __init__(self, provider_config):
|
||||
# Wrap these instance methods with a per-instance LRU cache so
|
||||
# that we don't leak memory over time when the adapter is
|
||||
# occasionally replaced.
|
||||
self._getMachineType = functools.lru_cache(maxsize=None)(
|
||||
self._getMachineType)
|
||||
self._getImageId = functools.lru_cache(maxsize=None)(
|
||||
self._getImageId)
|
||||
|
||||
self.provider = provider_config
|
||||
self.compute = googleapiclient.discovery.build('compute', 'v1')
|
||||
self.rate_limiter = RateLimiter(self.provider.name,
|
||||
@@ -252,7 +261,7 @@ class GceAdapter(statemachine.Adapter):
|
||||
result = q.execute()
|
||||
return result.get('items', [])
|
||||
|
||||
@cachetools.func.lru_cache(maxsize=None)
|
||||
# This method is wrapped with an LRU cache in the constructor.
|
||||
def _getImageId(self, cloud_image):
|
||||
image_id = cloud_image.image_id
|
||||
|
||||
@@ -269,7 +278,7 @@ class GceAdapter(statemachine.Adapter):
|
||||
|
||||
return image_id
|
||||
|
||||
@cachetools.func.lru_cache(maxsize=None)
|
||||
# This method is wrapped with an LRU cache in the constructor.
|
||||
def _getMachineType(self, machine_type):
|
||||
q = self.compute.machineTypes().get(
|
||||
project=self.provider.project,
|
||||
|
||||
Reference in New Issue
Block a user