From 46e130fe1aba2f02ed19148796eddff3c067e34e Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 11 Apr 2022 10:14:20 -0700 Subject: [PATCH] Add more debug info to AWS driver These changes are all in service of being able to better understand AWS driver log messages: * Use annotated loggers in the statemachine provider framework so that we see the request, node, and provider information * Have the statemachine framework pass annotated loggers to the state machines themselves so that the above information is available for log messages on individual API calls * Add optional performance information to the rate limit handler (delay and API call duration) * Add some additional log entries to the AWS adapter Also: * Suppress boto logging by default in unit tests (it is verbose and usually not helpful) * Add coverage of node deletion in the AWS driver tests Change-Id: I0e6b4ad72d1af7f776da73c5dd2a50b40f60e4a2 --- nodepool/driver/aws/adapter.py | 47 +++++++++++++++----------- nodepool/driver/azure/adapter.py | 4 +-- nodepool/driver/example/adapter.py | 4 +-- nodepool/driver/ibmvpc/adapter.py | 4 +-- nodepool/driver/metastatic/adapter.py | 4 +-- nodepool/driver/statemachine.py | 35 ++++++++++++++----- nodepool/driver/utils.py | 46 +++++++++++++++++++++++-- nodepool/tests/__init__.py | 6 ++++ nodepool/tests/unit/test_driver_aws.py | 4 +++ 9 files changed, 115 insertions(+), 39 deletions(-) diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py index 8a6023cd0..73997ffd5 100644 --- a/nodepool/driver/aws/adapter.py +++ b/nodepool/driver/aws/adapter.py @@ -76,7 +76,8 @@ class AwsDeleteStateMachine(statemachine.StateMachine): DISK_DELETING = 'deleting disk' COMPLETE = 'complete' - def __init__(self, adapter, external_id): + def __init__(self, adapter, external_id, log): + self.log = log super().__init__() self.adapter = adapter self.external_id = external_id @@ -84,7 +85,7 @@ class AwsDeleteStateMachine(statemachine.StateMachine): def advance(self): if self.state == self.START: self.instance = self.adapter._deleteInstance( - self.external_id) + self.external_id, self.log) self.state = self.VM_DELETING if self.state == self.VM_DELETING: @@ -102,7 +103,8 @@ class AwsCreateStateMachine(statemachine.StateMachine): COMPLETE = 'complete' def __init__(self, adapter, hostname, label, image_external_id, - metadata, retries): + metadata, retries, log): + self.log = log super().__init__() self.adapter = adapter self.retries = retries @@ -125,7 +127,7 @@ class AwsCreateStateMachine(statemachine.StateMachine): self.instance = self.adapter._createInstance( self.label, self.image_external_id, - self.tags, self.hostname) + self.tags, self.hostname, self.log) self.state = self.INSTANCE_CREATING if self.state == self.INSTANCE_CREATING: @@ -140,7 +142,7 @@ class AwsCreateStateMachine(statemachine.StateMachine): raise Exception("Too many retries") self.attempts += 1 self.instance = self.adapter._deleteInstance( - self.external_id) + self.external_id, self.log) self.state = self.INSTANCE_RETRY else: return @@ -157,11 +159,11 @@ class AwsCreateStateMachine(statemachine.StateMachine): class AwsAdapter(statemachine.Adapter): - log = logging.getLogger("nodepool.driver.aws.AwsAdapter") - IMAGE_UPLOAD_SLEEP = 30 def __init__(self, provider_config): + self.log = logging.getLogger( + f"nodepool.AwsAdapter.{provider_config.name}") self.provider = provider_config # The standard rate limit, this might be 1 request per second self.rate_limiter = RateLimiter(self.provider.name, @@ -189,12 +191,12 @@ class AwsAdapter(statemachine.Adapter): self.not_our_snapshots = set() def getCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries): + image_external_id, metadata, retries, log): return AwsCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries) + image_external_id, metadata, retries, log) - def getDeleteStateMachine(self, external_id): - return AwsDeleteStateMachine(self, external_id) + def getDeleteStateMachine(self, external_id, log): + return AwsDeleteStateMachine(self, external_id, log) def listResources(self): self._tagAmis() @@ -249,6 +251,7 @@ class AwsAdapter(statemachine.Adapter): def getQuotaLimits(self): with self.non_mutating_rate_limiter: + self.log.debug("Getting quota limits") response = self.aws_quotas.get_service_quota( ServiceCode='ec2', QuotaCode='L-1216C47A' @@ -432,7 +435,8 @@ class AwsAdapter(statemachine.Adapter): @cachetools.func.ttl_cache(maxsize=1, ttl=10) def _listInstances(self): - with self.non_mutating_rate_limiter: + with self.non_mutating_rate_limiter( + self.log.debug, "Listed instances"): return self.ec2.instances.all() @cachetools.func.ttl_cache(maxsize=1, ttl=10) @@ -505,7 +509,7 @@ class AwsAdapter(statemachine.Adapter): return self.ec2.Image(image_id) def _createInstance(self, label, image_external_id, - tags, hostname): + tags, hostname, log): if image_external_id: image_id = image_external_id else: @@ -579,20 +583,23 @@ class AwsAdapter(statemachine.Adapter): del mapping['Ebs']['Encrypted'] args['BlockDeviceMappings'] = [mapping] - with self.rate_limiter: - self.log.debug(f"Creating VM {hostname}") + with self.rate_limiter(log.debug, "Created instance"): + log.debug(f"Creating VM {hostname}") instances = self.ec2.create_instances(**args) + log.debug(f"Created VM {hostname} as instance {instances[0].id}") return self.ec2.Instance(instances[0].id) - def _deleteInstance(self, external_id): + def _deleteInstance(self, external_id, log=None): + if log is None: + log = self.log for instance in self._listInstances(): if instance.id == external_id: break else: - self.log.warning(f"Instance not found when deleting {external_id}") + log.warning(f"Instance not found when deleting {external_id}") return None - with self.rate_limiter: - self.log.debug(f"Deleting instance {external_id}") + with self.rate_limiter(log.debug, "Deleted instance"): + log.debug(f"Deleting instance {external_id}") instance.terminate() return instance @@ -603,7 +610,7 @@ class AwsAdapter(statemachine.Adapter): else: self.log.warning(f"Volume not found when deleting {external_id}") return None - with self.rate_limiter: + with self.rate_limiter(self.log.debug, "Deleted volume"): self.log.debug(f"Deleting volume {external_id}") volume.delete() return volume diff --git a/nodepool/driver/azure/adapter.py b/nodepool/driver/azure/adapter.py index efe4ff5b6..e50171ebb 100644 --- a/nodepool/driver/azure/adapter.py +++ b/nodepool/driver/azure/adapter.py @@ -329,11 +329,11 @@ class AzureAdapter(statemachine.Adapter): self._getSKUs() def getCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries): + image_external_id, metadata, retries, log): return AzureCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) - def getDeleteStateMachine(self, external_id): + def getDeleteStateMachine(self, external_id, log): return AzureDeleteStateMachine(self, external_id) def listResources(self): diff --git a/nodepool/driver/example/adapter.py b/nodepool/driver/example/adapter.py index 6804c11bc..1c40987c1 100644 --- a/nodepool/driver/example/adapter.py +++ b/nodepool/driver/example/adapter.py @@ -93,10 +93,10 @@ class Adapter(statemachine.Adapter): provider_config.rate_limit) self.cloud = object() - def getCreateStateMachine(self, hostname, label, metadata, retries): + def getCreateStateMachine(self, hostname, label, metadata, retries, log): return CreateStateMachine(self, hostname, label, metadata, retries) - def getDeleteStateMachine(self, external_id): + def getDeleteStateMachine(self, external_id, log): return DeleteStateMachine(self, external_id) def cleanupLeakedResources(self, known_nodes, metadata): diff --git a/nodepool/driver/ibmvpc/adapter.py b/nodepool/driver/ibmvpc/adapter.py index 52b6ff640..5e60e4ea8 100644 --- a/nodepool/driver/ibmvpc/adapter.py +++ b/nodepool/driver/ibmvpc/adapter.py @@ -379,11 +379,11 @@ class IBMVPCAdapter(statemachine.Adapter): return authenticator def getCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries): + image_external_id, metadata, retries, log): return IBMVPCCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) - def getDeleteStateMachine(self, external_id): + def getDeleteStateMachine(self, external_id, log): return IBMVPCDeleteStateMachine(self, external_id) def listResources(self): diff --git a/nodepool/driver/metastatic/adapter.py b/nodepool/driver/metastatic/adapter.py index 2e87b73b3..a24f0b661 100644 --- a/nodepool/driver/metastatic/adapter.py +++ b/nodepool/driver/metastatic/adapter.py @@ -270,12 +270,12 @@ class MetastaticAdapter(statemachine.Adapter): return self._provider._zk def getCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries): + image_external_id, metadata, retries, log): return MetastaticCreateStateMachine(self, hostname, label, image_external_id, metadata, retries) - def getDeleteStateMachine(self, external_id): + def getDeleteStateMachine(self, external_id, log): return MetastaticDeleteStateMachine(self, external_id) def listResources(self): diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py index 75f0fffd5..cd4bd92b4 100644 --- a/nodepool/driver/statemachine.py +++ b/nodepool/driver/statemachine.py @@ -69,7 +69,8 @@ class StateMachineNodeLauncher(stats.StatsReporter): def __init__(self, handler, node, provider_config): super().__init__() # Based on utils.NodeLauncher - logger = logging.getLogger("nodepool.StateMachineNodeLauncher") + logger = logging.getLogger( + f"nodepool.StateMachineNodeLauncher.{provider_config.name}") request = handler.request self.log = get_annotated_logger(logger, event_id=request.event_id, @@ -131,7 +132,7 @@ class StateMachineNodeLauncher(stats.StatsReporter): 'nodepool_pool_name': self.handler.pool.name, 'nodepool_provider_name': self.manager.provider.name} self.state_machine = self.manager.adapter.getCreateStateMachine( - hostname, label, image_external_id, metadata, retries) + hostname, label, image_external_id, metadata, retries, self.log) def updateNodeFromInstance(self, instance): if instance is None: @@ -267,7 +268,11 @@ class StateMachineNodeDeleter: def __init__(self, zk, provider_manager, node): # Based on utils.NodeDeleter - self.log = logging.getLogger("nodepool.StateMachineNodeDeleter") + logger = logging.getLogger( + "nodepool.StateMachineNodeDeleter." + f"{provider_manager.provider.name}") + self.log = get_annotated_logger(logger, + node_id=node.id) self.manager = provider_manager self.zk = zk # Note: the node is locked @@ -275,7 +280,7 @@ class StateMachineNodeDeleter: # Local additions: self.start_time = time.monotonic() self.state_machine = self.manager.adapter.getDeleteStateMachine( - node.external_id) + node.external_id, self.log) @property def complete(self): @@ -449,12 +454,12 @@ class StateMachineProvider(Provider, QuotaSupport): """The Provider implementation for the StateMachineManager driver framework""" - log = logging.getLogger("nodepool.driver.statemachine." - "StateMachineProvider") MINIMUM_SLEEP = 1 MAXIMUM_SLEEP = 10 def __init__(self, adapter, provider): + self.log = logging.getLogger( + f"nodepool.StateMachineProvider.{provider.name}") super().__init__() self.provider = provider self.adapter = adapter @@ -500,7 +505,11 @@ class StateMachineProvider(Provider, QuotaSupport): while self.running: to_remove = [] loop_start = time.monotonic() - for sm in self.deleters + self.launchers: + state_machines = self.deleters + self.launchers + if state_machines: + self.log.debug("Running %s state machines", + len(state_machines)) + for sm in state_machines: try: sm.runStateMachine() if sm.complete: @@ -514,6 +523,9 @@ class StateMachineProvider(Provider, QuotaSupport): if sm in self.launchers: self.launchers.remove(sm) loop_end = time.monotonic() + if state_machines: + self.log.debug("Ran %s state machines in %s seconds", + len(state_machines), loop_end - loop_start) if self.launchers or self.deleters: time.sleep(max(0, self.MAXIMUM_SLEEP - (loop_end - loop_start))) @@ -810,7 +822,8 @@ class Adapter: pass def getCreateStateMachine(self, hostname, label, - image_external_id, metadata, retries): + image_external_id, metadata, retries, + log): """Return a state machine suitable for creating an instance This method should return a new state machine object @@ -828,13 +841,15 @@ class Adapter: returned from `listInstances`. :param retries int: The number of attempts which should be made to launch the node. + :param log Logger: A logger instance for emitting annotated + logs related to the request. :returns: A :py:class:`StateMachine` object. """ raise NotImplementedError() - def getDeleteStateMachine(self, external_id): + def getDeleteStateMachine(self, external_id, log): """Return a state machine suitable for deleting an instance This method should return a new state machine object @@ -842,6 +857,8 @@ class Adapter: :param str external_id: The external_id of the instance, as supplied by a creation StateMachine or an Instance. + :param log Logger: A logger instance for emitting annotated + logs related to the request. """ raise NotImplementedError() diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py index 2d1cd8a3f..3af156112 100644 --- a/nodepool/driver/utils.py +++ b/nodepool/driver/utils.py @@ -376,6 +376,25 @@ class QuotaSupport: return used_quota +class RateLimitInstance: + def __init__(self, limiter, logger, msg): + self.limiter = limiter + self.logger = logger + self.msg = msg + + def __enter__(self): + self.delay = self.limiter._enter() + self.start_time = time.monotonic() + + def __exit__(self, etype, value, tb): + end_time = time.monotonic() + self.limiter._exit(etype, value, tb) + self.logger("%s in %ss after %ss delay", + self.msg, + end_time - self.start_time, + self.delay) + + class RateLimiter: """A Rate limiter @@ -389,6 +408,16 @@ class RateLimiter: rate_limiter = RateLimiter('provider', 1.0) with rate_limiter: api_call() + + You can optionally use the limiter as a callable in which case it + will log a supplied message with timing information. + + .. code:: python + + rate_limiter = RateLimiter('provider', 1.0) + with rate_limiter(log.debug, "an API call"): + api_call() + """ def __init__(self, name, rate_limit): @@ -397,14 +426,27 @@ class RateLimiter: self.delta = 1.0 / rate_limit self.last_ts = None + def __call__(self, logmethod, msg): + return RateLimitInstance(self, logmethod, msg) + def __enter__(self): + self._enter() + + def _enter(self): + total_delay = 0.0 if self.last_ts is None: - return + return total_delay while True: delta = time.monotonic() - self.last_ts if delta >= self.delta: break - time.sleep(self.delta - delta) + delay = self.delta - delta + time.sleep(delay) + total_delay += delay + return total_delay def __exit__(self, etype, value, tb): + self._exit(etype, value, tb) + + def _exit(self, etype, value, tb): self.last_ts = time.monotonic() diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 6a735781a..56edb03db 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -192,6 +192,12 @@ class BaseTestCase(testtools.TestCase): l = logging.getLogger('stevedore') l.setLevel(logging.INFO) l.propagate = False + l = logging.getLogger('botocore') + l.setLevel(logging.INFO) + l.propagate = False + l = logging.getLogger('boto3') + l.setLevel(logging.INFO) + l.propagate = False self.useFixture(fixtures.NestedTempfile()) self.subprocesses = [] diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py index 334c920df..7132301ab 100644 --- a/nodepool/tests/unit/test_driver_aws.py +++ b/nodepool/tests/unit/test_driver_aws.py @@ -194,6 +194,10 @@ class TestDriverAws(tests.DBTestCase): response = instance.describe_attribute(Attribute='ebsOptimized') self.assertFalse(response['EbsOptimized']['Value']) + node.state = zk.USED + self.zk.storeNode(node) + self.waitForNodeDeletion(node) + def test_aws_by_filters(self): req = self.requestNode('aws/aws.yaml', 'ubuntu1404-by-filters') node = self.assertSuccess(req)