diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py index 3302f7b52..6351a3cd9 100644 --- a/nodepool/driver/aws/adapter.py +++ b/nodepool/driver/aws/adapter.py @@ -26,7 +26,11 @@ import queue import time import urllib.parse -from nodepool.driver.utils import QuotaInformation, RateLimiter +from nodepool.driver.utils import ( + QuotaInformation, + LazyExecutorTTLCache, + RateLimiter, +) from nodepool.driver import statemachine from nodepool import exceptions @@ -286,6 +290,33 @@ class AwsAdapter(statemachine.Adapter): self.s3 = self.aws.resource('s3') self.s3_client = self.aws.client('s3') self.aws_quotas = self.aws.client("service-quotas") + + workers = 10 + self.log.info("Create executor with max workers=%s", workers) + self.api_executor = ThreadPoolExecutor( + thread_name_prefix=f'aws-api-{provider_config.name}', + max_workers=workers) + + # Use a lazy TTL cache for these. This uses the TPE to + # asynchronously update the cached values, meanwhile returning + # the previous cached data if available. This means every + # call after the first one is instantaneous. + self._listInstances = LazyExecutorTTLCache( + CACHE_TTL, self.api_executor)( + self._listInstances) + self._listVolumes = LazyExecutorTTLCache( + CACHE_TTL, self.api_executor)( + self._listVolumes) + self._listAmis = LazyExecutorTTLCache( + CACHE_TTL, self.api_executor)( + self._listAmis) + self._listSnapshots = LazyExecutorTTLCache( + CACHE_TTL, self.api_executor)( + self._listSnapshots) + self._listObjects = LazyExecutorTTLCache( + CACHE_TTL, self.api_executor)( + self._listObjects) + # In listResources, we reconcile AMIs which appear to be # imports but have no nodepool tags, however it's possible # that these aren't nodepool images. If we determine that's @@ -296,6 +327,7 @@ class AwsAdapter(statemachine.Adapter): def stop(self): self.create_executor.shutdown() + self.api_executor.shutdown() self._running = False def getCreateStateMachine(self, hostname, label, image_external_id, @@ -945,7 +977,6 @@ class AwsAdapter(statemachine.Adapter): return instance return None - @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) def _listInstances(self): with self.non_mutating_rate_limiter( self.log.debug, "Listed instances"): @@ -956,7 +987,6 @@ class AwsAdapter(statemachine.Adapter): instances.extend(res['Instances']) return instances - @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) def _listVolumes(self): with self.non_mutating_rate_limiter: paginator = self.ec2_client.get_paginator('describe_volumes') @@ -965,7 +995,6 @@ class AwsAdapter(statemachine.Adapter): volumes.extend(page['Volumes']) return volumes - @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) def _listAmis(self): # Note: this is overridden in tests due to the filter with self.non_mutating_rate_limiter: @@ -975,7 +1004,6 @@ class AwsAdapter(statemachine.Adapter): images.extend(page['Images']) return images - @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) def _listSnapshots(self): # Note: this is overridden in tests due to the filter with self.non_mutating_rate_limiter: @@ -985,7 +1013,6 @@ class AwsAdapter(statemachine.Adapter): snapshots.extend(page['Snapshots']) return snapshots - @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL) def _listObjects(self): bucket_name = self.provider.object_storage.get('bucket-name') if not bucket_name: diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 3233bf945..713f5a7b2 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -289,6 +289,8 @@ class BaseTestCase(testtools.TestCase): continue if t.name.startswith("openstack-api"): continue + if t.name.startswith("aws-api"): + continue if t.name.startswith("keyscan-"): continue if t.name.startswith("start-"):