From 8806e833839ad7dd2b3123bb3be09a7e58c54eca Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 16 Mar 2022 11:10:30 -0700 Subject: [PATCH] Remove unused AWS implementation These files have been replaced by adapter.py. Change-Id: Ibbf0577f06a633183ce336ad5b322e10123bcdc0 --- nodepool/driver/aws/handler.py | 222 --------------------------- nodepool/driver/aws/provider.py | 263 -------------------------------- 2 files changed, 485 deletions(-) delete mode 100644 nodepool/driver/aws/handler.py delete mode 100644 nodepool/driver/aws/provider.py diff --git a/nodepool/driver/aws/handler.py b/nodepool/driver/aws/handler.py deleted file mode 100644 index c8e18ab5d..000000000 --- a/nodepool/driver/aws/handler.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright 2018 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import math -import time - -import botocore.exceptions - -from nodepool import exceptions -from nodepool import zk -from nodepool.driver.utils import NodeLauncher, QuotaInformation -from nodepool.driver import NodeRequestHandler -from nodepool.nodeutils import nodescan - - -class AwsInstanceLauncher(NodeLauncher): - def __init__(self, handler, node, provider_config, provider_label): - super().__init__(handler, node, provider_config) - self.provider_name = provider_config.name - self.retries = provider_config.launch_retries - self.pool = provider_config.pools[provider_label.pool.name] - self.boot_timeout = provider_config.boot_timeout - self.label = provider_label - - def launch(self): - self.log.debug("Starting %s instance" % self.node.type) - attempts = 1 - while attempts <= self.retries: - try: - instance = self.handler.manager.createInstance(self.label) - break - except Exception: - if attempts <= self.retries: - self.log.exception( - "Launch attempt %d/%d failed for node %s:", - attempts, self.retries, self.node.id) - if attempts == self.retries: - raise - attempts += 1 - time.sleep(1) - - instance_id = instance.id - self.node.external_id = instance_id - self.zk.storeNode(self.node) - - boot_start = time.monotonic() - state = None - while time.monotonic() - boot_start < self.boot_timeout: - try: - state = instance.state.get('Name') - except botocore.exceptions.ClientError: - # This can happen if we try to get the instance too quickly. - time.sleep(0.5) - continue - self.log.debug("Instance %s is %s" % (instance_id, state)) - if state == 'running': - instance.create_tags( - Tags=[ - { - 'Key': 'nodepool_id', - 'Value': str(self.node.id) - }, - { - 'Key': 'nodepool_pool', - 'Value': str(self.pool.name) - }, - { - 'Key': 'nodepool_provider', - 'Value': str(self.provider_name) - } - ] - ) - break - time.sleep(0.5) - instance.reload() - if state is None or state != 'running': - raise exceptions.LaunchStatusException( - "Instance %s failed to start: %s" % (instance_id, state)) - - server_ip = instance.public_ip_address or instance.private_ip_address - if not server_ip: - raise exceptions.LaunchStatusException( - "Instance %s doesn't have a public ip" % instance_id) - - self.node.connection_port = self.label.cloud_image.connection_port - self.node.connection_type = self.label.cloud_image.connection_type - keys = [] - if self.pool.host_key_checking: - try: - if (self.node.connection_type == 'ssh' or - self.node.connection_type == 'network_cli'): - gather_hostkeys = True - else: - gather_hostkeys = False - keys = nodescan(server_ip, port=self.node.connection_port, - timeout=180, gather_hostkeys=gather_hostkeys) - except Exception: - raise exceptions.LaunchKeyscanException( - "Can't scan instance %s key" % instance_id) - - self.log.info("Instance %s ready" % instance_id) - self.node.state = zk.READY - self.node.external_id = instance_id - self.node.hostname = server_ip - self.node.interface_ip = server_ip - self.node.public_ipv4 = server_ip - self.node.host_keys = keys - self.node.username = self.label.cloud_image.username - self.node.python_path = self.label.cloud_image.python_path - self.node.shell_type = self.label.cloud_image.shell_type - self.zk.storeNode(self.node) - self.log.info("Instance %s is ready", instance_id) - - -class AwsNodeRequestHandler(NodeRequestHandler): - log = logging.getLogger("nodepool.driver.aws." - "AwsNodeRequestHandler") - - def __init__(self, pw, request): - super().__init__(pw, request) - self._threads = [] - - @property - def alive_thread_count(self): - count = 0 - for t in self._threads: - if t.is_alive(): - count += 1 - return count - - def imagesAvailable(self): - ''' - Determines if the requested images are available for this provider. - - :returns: True if it is available, False otherwise. - ''' - if self.provider.manage_images: - for label in self.request.node_types: - if self.pool.labels[label].cloud_image: - if not self.manager.labelReady(self.pool.labels[label]): - return False - return True - - def hasRemainingQuota(self, ntype): - ''' - Apply max_servers check, ignoring other quotas. - - :returns: True if we have room, False otherwise. - ''' - needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1) - n_running = self.manager.countNodes(self.pool.name) - pool_quota = QuotaInformation( - cores=math.inf, - instances=self.pool.max_servers - n_running, - ram=math.inf, - default=math.inf) - pool_quota.subtract(needed_quota) - self.log.debug("hasRemainingQuota({},{}) = {}".format( - self.pool, ntype, pool_quota)) - return pool_quota.non_negative() - - def hasProviderQuota(self, node_types): - ''' - Apply max_servers check to a whole request - - :returns: True if we have room, False otherwise. - ''' - needed_quota = QuotaInformation( - cores=1, - instances=len(node_types), - ram=1, - default=1) - pool_quota = QuotaInformation( - cores=math.inf, - instances=self.pool.max_servers, - ram=math.inf, - default=math.inf) - pool_quota.subtract(needed_quota) - self.log.debug("hasProviderQuota({},{}) = {}".format( - self.pool, node_types, pool_quota)) - return pool_quota.non_negative() - - def launchesComplete(self): - ''' - Check if all launch requests have completed. - - When all of the Node objects have reached a final state (READY or - FAILED), we'll know all threads have finished the launch process. - ''' - if not self._threads: - return True - - # Give the NodeLaunch threads time to finish. - if self.alive_thread_count: - return False - - node_states = [node.state for node in self.nodeset] - - # NOTE: It very important that NodeLauncher always sets one of - # these states, no matter what. - if not all(s in (zk.READY, zk.FAILED) for s in node_states): - return False - - return True - - def launch(self, node): - label = self.pool.labels[node.type[0]] - thd = AwsInstanceLauncher(self, node, self.provider, label) - thd.start() - self._threads.append(thd) diff --git a/nodepool/driver/aws/provider.py b/nodepool/driver/aws/provider.py deleted file mode 100644 index a2305c458..000000000 --- a/nodepool/driver/aws/provider.py +++ /dev/null @@ -1,263 +0,0 @@ -# Copyright 2018 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import math - -import boto3 -import botocore.exceptions - -import nodepool.exceptions -from nodepool.driver import Provider -from nodepool.driver.utils import NodeDeleter -from nodepool.driver.utils import QuotaInformation, QuotaSupport -from nodepool.driver.aws.handler import AwsNodeRequestHandler - - -class AwsInstance: - def __init__(self, name, metadatas, provider): - self.id = name - self.name = name - self.metadata = {} - if metadatas: - for metadata in metadatas: - if metadata["Key"] == "nodepool_id": - self.metadata['nodepool_node_id'] = metadata["Value"] - continue - if metadata["Key"] == "nodepool_pool": - self.metadata['nodepool_pool_name'] = metadata["Value"] - continue - if metadata["Key"] == "nodepool_provider": - self.metadata['nodepool_provider_name'] = metadata["Value"] - continue - - def get(self, name, default=None): - return getattr(self, name, default) - - -class AwsProvider(Provider, QuotaSupport): - log = logging.getLogger("nodepool.driver.aws.AwsProvider") - - def __init__(self, provider, *args): - self.provider = provider - self.ec2 = None - - def getRequestHandler(self, poolworker, request): - return AwsNodeRequestHandler(poolworker, request) - - def start(self, zk_conn): - self._zk = zk_conn - if self.ec2 is not None: - return True - self.log.debug("Starting") - self.aws = boto3.Session( - region_name=self.provider.region_name, - profile_name=self.provider.profile_name) - self.ec2 = self.aws.resource('ec2') - self.ec2_client = self.aws.client("ec2") - - def stop(self): - self.log.debug("Stopping") - - def listNodes(self): - servers = [] - - for instance in self.ec2.instances.all(): - if instance.state["Name"].lower() == "terminated": - continue - ours = False - if instance.tags: - for tag in instance.tags: - if (tag["Key"] == 'nodepool_provider' - and tag["Value"] == self.provider.name): - ours = True - break - if not ours: - continue - servers.append(AwsInstance( - instance.id, instance.tags, self.provider)) - return servers - - def countNodes(self, pool=None): - n = 0 - for instance in self.listNodes(): - if pool is not None: - if 'nodepool_pool_name' not in instance.metadata: - continue - if pool != instance.metadata['nodepool_pool_name']: - continue - n += 1 - return n - - def getLatestImageIdByFilters(self, image_filters): - res = self.ec2_client.describe_images( - Filters=image_filters - ).get("Images") - - images = sorted( - res, - key=lambda k: k["CreationDate"], - reverse=True - ) - - if not images: - msg = "No cloud-image (AMI) matches supplied image filters" - raise Exception(msg) - else: - return images[0].get("ImageId") - - def getImageId(self, cloud_image): - image_id = cloud_image.image_id - image_filters = cloud_image.image_filters - - if image_filters is not None: - if image_id is not None: - msg = "image-id and image-filters cannot by used together" - raise Exception(msg) - else: - return self.getLatestImageIdByFilters(image_filters) - - return image_id - - def getImage(self, cloud_image): - return self.ec2.Image(self.getImageId(cloud_image)) - - def labelReady(self, label): - if not label.cloud_image: - msg = "A cloud-image (AMI) must be supplied with the AWS driver." - raise Exception(msg) - - image = self.getImage(label.cloud_image) - # Image loading is deferred, check if it's really there - if image.state != 'available': - self.log.warning( - "Provider %s is configured to use %s as the AMI for" - " label %s and that AMI is there but unavailable in the" - " cloud." % (self.provider.name, - label.cloud_image.external_name, - label.name)) - return False - return True - - def join(self): - return True - - def cleanupLeakedResources(self): - # TODO: remove leaked resources if any - pass - - def startNodeCleanup(self, node): - t = NodeDeleter(self._zk, self, node) - t.start() - return t - - def cleanupNode(self, server_id): - if self.ec2 is None: - return False - instance = self.ec2.Instance(server_id) - try: - instance.terminate() - except botocore.exceptions.ClientError as e: - error_code = e.response.get('Error', {}).get('Code', 'Unknown') - if error_code == "InvalidInstanceID.NotFound": - raise nodepool.exceptions.NotFound() - raise e - - def waitForNodeCleanup(self, server_id): - # TODO: track instance deletion - return True - - def createInstance(self, label): - image_id = self.getImageId(label.cloud_image) - tags = label.tags - if not [tag for tag in label.tags if tag["Key"] == "Name"]: - tags.append( - {"Key": "Name", "Value": str(label.name)} - ) - args = dict( - ImageId=image_id, - MinCount=1, - MaxCount=1, - KeyName=label.key_name, - EbsOptimized=label.ebs_optimized, - InstanceType=label.instance_type, - NetworkInterfaces=[{ - 'AssociatePublicIpAddress': label.pool.public_ip, - 'DeviceIndex': 0}], - TagSpecifications=[{ - 'ResourceType': 'instance', - 'Tags': tags - }] - ) - - if label.pool.security_group_id: - args['NetworkInterfaces'][0]['Groups'] = [ - label.pool.security_group_id - ] - if label.pool.subnet_id: - args['NetworkInterfaces'][0]['SubnetId'] = label.pool.subnet_id - - if label.userdata: - args['UserData'] = label.userdata - - if label.iam_instance_profile: - if 'name' in label.iam_instance_profile: - args['IamInstanceProfile'] = { - 'Name': label.iam_instance_profile['name'] - } - elif 'arn' in label.iam_instance_profile: - args['IamInstanceProfile'] = { - 'Arn': label.iam_instance_profile['arn'] - } - - # Default block device mapping parameters are embedded in AMIs. - # We might need to supply our own mapping before lauching the instance. - # We basically want to make sure DeleteOnTermination is true and be - # able to set the volume type and size. - image = self.getImage(label.cloud_image) - # TODO: Flavors can also influence whether or not the VM spawns with a - # volume -- we basically need to ensure DeleteOnTermination is true - if hasattr(image, 'block_device_mappings'): - bdm = image.block_device_mappings - mapping = bdm[0] - if 'Ebs' in mapping: - mapping['Ebs']['DeleteOnTermination'] = True - if label.volume_size: - mapping['Ebs']['VolumeSize'] = label.volume_size - if label.volume_type: - mapping['Ebs']['VolumeType'] = label.volume_type - # If the AMI is a snapshot, we cannot supply an "encrypted" - # parameter - if 'Encrypted' in mapping['Ebs']: - del mapping['Ebs']['Encrypted'] - args['BlockDeviceMappings'] = [mapping] - - instances = self.ec2.create_instances(**args) - return self.ec2.Instance(instances[0].id) - - def getProviderLimits(self): - # TODO: query the api to get real limits - return QuotaInformation( - cores=math.inf, - instances=math.inf, - ram=math.inf, - default=math.inf) - - def quotaNeededByLabel(self, ntype, pool): - # TODO: return real quota information about a label - return QuotaInformation(cores=0, instances=1, ram=0, default=1) - - def unmanagedQuotaUsed(self): - # TODO: return real quota information about quota - return QuotaInformation()