Use thread pool executor for AWS API requests

So far we've cached most of the AWS API listings (instances, volumes,
AMIs, snapshots, objects) but with refreshes happening synchronously.

Since some of those methods are used as part of other methods during
request handling we make them asynchronous.

Change-Id: I22403699ebb39f3e4dcce778efaeb09328acd932
This commit is contained in:
Simon Westphahl
2023-10-12 11:13:16 +02:00
committed by James E. Blair
parent ee5cd42292
commit 3c71fc9f4b
2 changed files with 35 additions and 6 deletions

View File

@@ -26,7 +26,11 @@ import queue
import time
import urllib.parse
from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver.utils import (
QuotaInformation,
LazyExecutorTTLCache,
RateLimiter,
)
from nodepool.driver import statemachine
from nodepool import exceptions
@@ -286,6 +290,33 @@ class AwsAdapter(statemachine.Adapter):
self.s3 = self.aws.resource('s3')
self.s3_client = self.aws.client('s3')
self.aws_quotas = self.aws.client("service-quotas")
workers = 10
self.log.info("Create executor with max workers=%s", workers)
self.api_executor = ThreadPoolExecutor(
thread_name_prefix=f'aws-api-{provider_config.name}',
max_workers=workers)
# Use a lazy TTL cache for these. This uses the TPE to
# asynchronously update the cached values, meanwhile returning
# the previous cached data if available. This means every
# call after the first one is instantaneous.
self._listInstances = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listInstances)
self._listVolumes = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listVolumes)
self._listAmis = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listAmis)
self._listSnapshots = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listSnapshots)
self._listObjects = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listObjects)
# In listResources, we reconcile AMIs which appear to be
# imports but have no nodepool tags, however it's possible
# that these aren't nodepool images. If we determine that's
@@ -296,6 +327,7 @@ class AwsAdapter(statemachine.Adapter):
def stop(self):
self.create_executor.shutdown()
self.api_executor.shutdown()
self._running = False
def getCreateStateMachine(self, hostname, label, image_external_id,
@@ -945,7 +977,6 @@ class AwsAdapter(statemachine.Adapter):
return instance
return None
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listInstances(self):
with self.non_mutating_rate_limiter(
self.log.debug, "Listed instances"):
@@ -956,7 +987,6 @@ class AwsAdapter(statemachine.Adapter):
instances.extend(res['Instances'])
return instances
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listVolumes(self):
with self.non_mutating_rate_limiter:
paginator = self.ec2_client.get_paginator('describe_volumes')
@@ -965,7 +995,6 @@ class AwsAdapter(statemachine.Adapter):
volumes.extend(page['Volumes'])
return volumes
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listAmis(self):
# Note: this is overridden in tests due to the filter
with self.non_mutating_rate_limiter:
@@ -975,7 +1004,6 @@ class AwsAdapter(statemachine.Adapter):
images.extend(page['Images'])
return images
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listSnapshots(self):
# Note: this is overridden in tests due to the filter
with self.non_mutating_rate_limiter:
@@ -985,7 +1013,6 @@ class AwsAdapter(statemachine.Adapter):
snapshots.extend(page['Snapshots'])
return snapshots
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
def _listObjects(self):
bucket_name = self.provider.object_storage.get('bucket-name')
if not bucket_name:

View File

@@ -289,6 +289,8 @@ class BaseTestCase(testtools.TestCase):
continue
if t.name.startswith("openstack-api"):
continue
if t.name.startswith("aws-api"):
continue
if t.name.startswith("keyscan-"):
continue
if t.name.startswith("start-"):