Remove unused AWS implementation

These files have been replaced by adapter.py.

Change-Id: Ibbf0577f06a633183ce336ad5b322e10123bcdc0
changes/57/834057/2
James E. Blair 11 months ago
parent 348fd17e73
commit 8806e83383

@ -1,222 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import math
import time
import botocore.exceptions
from nodepool import exceptions
from nodepool import zk
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler
from nodepool.nodeutils import nodescan
class AwsInstanceLauncher(NodeLauncher):
def __init__(self, handler, node, provider_config, provider_label):
super().__init__(handler, node, provider_config)
self.provider_name = provider_config.name
self.retries = provider_config.launch_retries
self.pool = provider_config.pools[provider_label.pool.name]
self.boot_timeout = provider_config.boot_timeout
self.label = provider_label
def launch(self):
self.log.debug("Starting %s instance" % self.node.type)
attempts = 1
while attempts <= self.retries:
try:
instance = self.handler.manager.createInstance(self.label)
break
except Exception:
if attempts <= self.retries:
self.log.exception(
"Launch attempt %d/%d failed for node %s:",
attempts, self.retries, self.node.id)
if attempts == self.retries:
raise
attempts += 1
time.sleep(1)
instance_id = instance.id
self.node.external_id = instance_id
self.zk.storeNode(self.node)
boot_start = time.monotonic()
state = None
while time.monotonic() - boot_start < self.boot_timeout:
try:
state = instance.state.get('Name')
except botocore.exceptions.ClientError:
# This can happen if we try to get the instance too quickly.
time.sleep(0.5)
continue
self.log.debug("Instance %s is %s" % (instance_id, state))
if state == 'running':
instance.create_tags(
Tags=[
{
'Key': 'nodepool_id',
'Value': str(self.node.id)
},
{
'Key': 'nodepool_pool',
'Value': str(self.pool.name)
},
{
'Key': 'nodepool_provider',
'Value': str(self.provider_name)
}
]
)
break
time.sleep(0.5)
instance.reload()
if state is None or state != 'running':
raise exceptions.LaunchStatusException(
"Instance %s failed to start: %s" % (instance_id, state))
server_ip = instance.public_ip_address or instance.private_ip_address
if not server_ip:
raise exceptions.LaunchStatusException(
"Instance %s doesn't have a public ip" % instance_id)
self.node.connection_port = self.label.cloud_image.connection_port
self.node.connection_type = self.label.cloud_image.connection_type
keys = []
if self.pool.host_key_checking:
try:
if (self.node.connection_type == 'ssh' or
self.node.connection_type == 'network_cli'):
gather_hostkeys = True
else:
gather_hostkeys = False
keys = nodescan(server_ip, port=self.node.connection_port,
timeout=180, gather_hostkeys=gather_hostkeys)
except Exception:
raise exceptions.LaunchKeyscanException(
"Can't scan instance %s key" % instance_id)
self.log.info("Instance %s ready" % instance_id)
self.node.state = zk.READY
self.node.external_id = instance_id
self.node.hostname = server_ip
self.node.interface_ip = server_ip
self.node.public_ipv4 = server_ip
self.node.host_keys = keys
self.node.username = self.label.cloud_image.username
self.node.python_path = self.label.cloud_image.python_path
self.node.shell_type = self.label.cloud_image.shell_type
self.zk.storeNode(self.node)
self.log.info("Instance %s is ready", instance_id)
class AwsNodeRequestHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.aws."
"AwsNodeRequestHandler")
def __init__(self, pw, request):
super().__init__(pw, request)
self._threads = []
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.is_alive():
count += 1
return count
def imagesAvailable(self):
'''
Determines if the requested images are available for this provider.
:returns: True if it is available, False otherwise.
'''
if self.provider.manage_images:
for label in self.request.node_types:
if self.pool.labels[label].cloud_image:
if not self.manager.labelReady(self.pool.labels[label]):
return False
return True
def hasRemainingQuota(self, ntype):
'''
Apply max_servers check, ignoring other quotas.
:returns: True if we have room, False otherwise.
'''
needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1)
n_running = self.manager.countNodes(self.pool.name)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers - n_running,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasRemainingQuota({},{}) = {}".format(
self.pool, ntype, pool_quota))
return pool_quota.non_negative()
def hasProviderQuota(self, node_types):
'''
Apply max_servers check to a whole request
:returns: True if we have room, False otherwise.
'''
needed_quota = QuotaInformation(
cores=1,
instances=len(node_types),
ram=1,
default=1)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasProviderQuota({},{}) = {}".format(
self.pool, node_types, pool_quota))
return pool_quota.non_negative()
def launchesComplete(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False
return True
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = AwsInstanceLauncher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)

@ -1,263 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import math
import boto3
import botocore.exceptions
import nodepool.exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.aws.handler import AwsNodeRequestHandler
class AwsInstance:
def __init__(self, name, metadatas, provider):
self.id = name
self.name = name
self.metadata = {}
if metadatas:
for metadata in metadatas:
if metadata["Key"] == "nodepool_id":
self.metadata['nodepool_node_id'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_pool":
self.metadata['nodepool_pool_name'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_provider":
self.metadata['nodepool_provider_name'] = metadata["Value"]
continue
def get(self, name, default=None):
return getattr(self, name, default)
class AwsProvider(Provider, QuotaSupport):
log = logging.getLogger("nodepool.driver.aws.AwsProvider")
def __init__(self, provider, *args):
self.provider = provider
self.ec2 = None
def getRequestHandler(self, poolworker, request):
return AwsNodeRequestHandler(poolworker, request)
def start(self, zk_conn):
self._zk = zk_conn
if self.ec2 is not None:
return True
self.log.debug("Starting")
self.aws = boto3.Session(
region_name=self.provider.region_name,
profile_name=self.provider.profile_name)
self.ec2 = self.aws.resource('ec2')
self.ec2_client = self.aws.client("ec2")
def stop(self):
self.log.debug("Stopping")
def listNodes(self):
servers = []
for instance in self.ec2.instances.all():
if instance.state["Name"].lower() == "terminated":
continue
ours = False
if instance.tags:
for tag in instance.tags:
if (tag["Key"] == 'nodepool_provider'
and tag["Value"] == self.provider.name):
ours = True
break
if not ours:
continue
servers.append(AwsInstance(
instance.id, instance.tags, self.provider))
return servers
def countNodes(self, pool=None):
n = 0
for instance in self.listNodes():
if pool is not None:
if 'nodepool_pool_name' not in instance.metadata:
continue
if pool != instance.metadata['nodepool_pool_name']:
continue
n += 1
return n
def getLatestImageIdByFilters(self, image_filters):
res = self.ec2_client.describe_images(
Filters=image_filters
).get("Images")
images = sorted(
res,
key=lambda k: k["CreationDate"],
reverse=True
)
if not images:
msg = "No cloud-image (AMI) matches supplied image filters"
raise Exception(msg)
else:
return images[0].get("ImageId")
def getImageId(self, cloud_image):
image_id = cloud_image.image_id
image_filters = cloud_image.image_filters
if image_filters is not None:
if image_id is not None:
msg = "image-id and image-filters cannot by used together"
raise Exception(msg)
else:
return self.getLatestImageIdByFilters(image_filters)
return image_id
def getImage(self, cloud_image):
return self.ec2.Image(self.getImageId(cloud_image))
def labelReady(self, label):
if not label.cloud_image:
msg = "A cloud-image (AMI) must be supplied with the AWS driver."
raise Exception(msg)
image = self.getImage(label.cloud_image)
# Image loading is deferred, check if it's really there
if image.state != 'available':
self.log.warning(
"Provider %s is configured to use %s as the AMI for"
" label %s and that AMI is there but unavailable in the"
" cloud." % (self.provider.name,
label.cloud_image.external_name,
label.name))
return False
return True
def join(self):
return True
def cleanupLeakedResources(self):
# TODO: remove leaked resources if any
pass
def startNodeCleanup(self, node):
t = NodeDeleter(self._zk, self, node)
t.start()
return t
def cleanupNode(self, server_id):
if self.ec2 is None:
return False
instance = self.ec2.Instance(server_id)
try:
instance.terminate()
except botocore.exceptions.ClientError as e:
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
if error_code == "InvalidInstanceID.NotFound":
raise nodepool.exceptions.NotFound()
raise e
def waitForNodeCleanup(self, server_id):
# TODO: track instance deletion
return True
def createInstance(self, label):
image_id = self.getImageId(label.cloud_image)
tags = label.tags
if not [tag for tag in label.tags if tag["Key"] == "Name"]:
tags.append(
{"Key": "Name", "Value": str(label.name)}
)
args = dict(
ImageId=image_id,
MinCount=1,
MaxCount=1,
KeyName=label.key_name,
EbsOptimized=label.ebs_optimized,
InstanceType=label.instance_type,
NetworkInterfaces=[{
'AssociatePublicIpAddress': label.pool.public_ip,
'DeviceIndex': 0}],
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': tags
}]
)
if label.pool.security_group_id:
args['NetworkInterfaces'][0]['Groups'] = [
label.pool.security_group_id
]
if label.pool.subnet_id:
args['NetworkInterfaces'][0]['SubnetId'] = label.pool.subnet_id
if label.userdata:
args['UserData'] = label.userdata
if label.iam_instance_profile:
if 'name' in label.iam_instance_profile:
args['IamInstanceProfile'] = {
'Name': label.iam_instance_profile['name']
}
elif 'arn' in label.iam_instance_profile:
args['IamInstanceProfile'] = {
'Arn': label.iam_instance_profile['arn']
}
# Default block device mapping parameters are embedded in AMIs.
# We might need to supply our own mapping before lauching the instance.
# We basically want to make sure DeleteOnTermination is true and be
# able to set the volume type and size.
image = self.getImage(label.cloud_image)
# TODO: Flavors can also influence whether or not the VM spawns with a
# volume -- we basically need to ensure DeleteOnTermination is true
if hasattr(image, 'block_device_mappings'):
bdm = image.block_device_mappings
mapping = bdm[0]
if 'Ebs' in mapping:
mapping['Ebs']['DeleteOnTermination'] = True
if label.volume_size:
mapping['Ebs']['VolumeSize'] = label.volume_size
if label.volume_type:
mapping['Ebs']['VolumeType'] = label.volume_type
# If the AMI is a snapshot, we cannot supply an "encrypted"
# parameter
if 'Encrypted' in mapping['Ebs']:
del mapping['Ebs']['Encrypted']
args['BlockDeviceMappings'] = [mapping]
instances = self.ec2.create_instances(**args)
return self.ec2.Instance(instances[0].id)
def getProviderLimits(self):
# TODO: query the api to get real limits
return QuotaInformation(
cores=math.inf,
instances=math.inf,
ram=math.inf,
default=math.inf)
def quotaNeededByLabel(self, ntype, pool):
# TODO: return real quota information about a label
return QuotaInformation(cores=0, instances=1, ram=0, default=1)
def unmanagedQuotaUsed(self):
# TODO: return real quota information about quota
return QuotaInformation()
Loading…
Cancel
Save