# Copyright (C) 2018 Red Hat
# Copyright 2022-2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import fixtures
import logging
import urllib.parse
import boto3
import botocore.exceptions
from moto import mock_aws
import testtools
from nodepool import config as nodepool_config
from nodepool import tests
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout
import nodepool.driver.statemachine
from nodepool.driver.statemachine import StateMachineProvider
import nodepool.driver.aws.adapter
from nodepool.driver.aws.adapter import AwsInstance, AwsAdapter
from nodepool.tests.unit.fake_aws import FakeAws
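# A bare object used as a stand-in where only attribute assignment is
# needed (e.g., the fake provider in test_aws_ipv6 below).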
class Dummy:
pass
class FakeAwsAdapter(AwsAdapter):
# Patch/override adapter methods to aid unit tests
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
        # Note: boto3 doesn't handle ipv6 addresses correctly when in
        # fake mode, so we need to intercept the run_instances call and
        # validate the args we supply.
def _fake_run_instances(*args, **kwargs):
self.__testcase.run_instance_calls.append(kwargs)
return self.ec2_client.run_instances_orig(*args, **kwargs)
# The ImdsSupport parameter isn't handled by moto
def _fake_register_image(*args, **kwargs):
self.__testcase.register_image_calls.append(kwargs)
return self.ec2_client.register_image_orig(*args, **kwargs)
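        # Use the FakeAws paginators where implemented; fall back to the
        # real (moto-mocked) client paginator otherwise.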
def _fake_get_paginator(*args, **kwargs):
try:
return self.__testcase.fake_aws.get_paginator(*args, **kwargs)
except NotImplementedError:
return self.ec2_client.get_paginator_orig(*args, **kwargs)
self.ec2_client.run_instances_orig = self.ec2_client.run_instances
self.ec2_client.run_instances = _fake_run_instances
self.ec2_client.register_image_orig = self.ec2_client.register_image
self.ec2_client.register_image = _fake_register_image
self.ec2_client.import_snapshot = \
self.__testcase.fake_aws.import_snapshot
self.ec2_client.import_image = \
self.__testcase.fake_aws.import_image
self.ec2_client.get_paginator_orig = self.ec2_client.get_paginator
self.ec2_client.get_paginator = _fake_get_paginator
# moto does not mock service-quotas, so we do it ourselves:
def _fake_get_service_quota(ServiceCode, QuotaCode, *args, **kwargs):
if ServiceCode == 'ec2':
qdict = self.__ec2_quotas
elif ServiceCode == 'ebs':
qdict = self.__ebs_quotas
else:
                raise NotImplementedError(
                    f"Service code {ServiceCode} not implemented")
return {'Quota': {'Value': qdict.get(QuotaCode)}}
self.aws_quotas.get_service_quota = _fake_get_service_quota
def _fake_list_service_quotas(ServiceCode, *args, **kwargs):
if ServiceCode == 'ec2':
qdict = self.__ec2_quotas
elif ServiceCode == 'ebs':
qdict = self.__ebs_quotas
else:
                raise NotImplementedError(
                    f"Service code {ServiceCode} not implemented")
quotas = []
for code, value in qdict.items():
quotas.append(
{'Value': value, 'QuotaCode': code}
)
return {'Quotas': quotas}
self.aws_quotas.list_service_quotas = _fake_list_service_quotas
def ec2_quotas(quotas):
"""Specify a set of AWS EC2 quota values for use by a test method.
:arg dict quotas: The quota dictionary.
"""
def decorator(test):
test.__aws_ec2_quotas__ = quotas
return test
return decorator
def ebs_quotas(quotas):
"""Specify a set of AWS EBS quota values for use by a test method.
:arg dict quotas: The quota dictionary.
"""
def decorator(test):
test.__aws_ebs_quotas__ = quotas
return test
return decorator
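# Tests use these decorators to override the default quotas patched in
# by TestDriverAws.patchAdapter, e.g.:
#
#   @ec2_quotas({'L-1216C47A': 2})
#   def test_something(self):
#       ...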
class TestDriverAws(tests.DBTestCase):
log = logging.getLogger("nodepool.TestDriverAws")
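    # A single moto mock covering all AWS services; started and stopped
    # per test in setUp/tearDown.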
mock_aws = mock_aws()
def setUp(self):
super().setUp()
StateMachineProvider.MINIMUM_SLEEP = 0.1
StateMachineProvider.MAXIMUM_SLEEP = 1
AwsAdapter.IMAGE_UPLOAD_SLEEP = 1
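        # Use the fake nodescan so no real SSH connections are made; the
        # fake scan reports 'ssh-rsa FAKEKEY' as the host key (see the
        # assertions below).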
self.useFixture(fixtures.MonkeyPatch(
'nodepool.driver.statemachine.NodescanRequest.FAKE', True))
aws_id = 'AK000000000000000000'
aws_key = '0123456789abcdef0123456789abcdef0123456789abcdef'
self.useFixture(
fixtures.EnvironmentVariable('AWS_ACCESS_KEY_ID', aws_id))
self.useFixture(
fixtures.EnvironmentVariable('AWS_SECRET_ACCESS_KEY', aws_key))
self.fake_aws = FakeAws()
self.mock_aws.start()
self.ec2 = boto3.resource('ec2', region_name='us-west-2')
self.ec2_client = boto3.client('ec2', region_name='us-west-2')
self.s3 = boto3.resource('s3', region_name='us-west-2')
self.s3_client = boto3.client('s3', region_name='us-west-2')
self.iam = boto3.resource('iam', region_name='us-west-2')
self.s3.create_bucket(
Bucket='nodepool',
CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
# A list of args to method calls for validation
self.run_instance_calls = []
self.register_image_calls = []
        # The fake VPC below uses the TEST-NET-3 range (203.0.113.0/24).
ipv6 = False
if ipv6:
# This is currently unused, but if moto gains IPv6 support
# on instance creation, this may be useful.
self.vpc = self.ec2_client.create_vpc(
CidrBlock='203.0.113.0/24',
AmazonProvidedIpv6CidrBlock=True)
ipv6_cidr = self.vpc['Vpc'][
'Ipv6CidrBlockAssociationSet'][0]['Ipv6CidrBlock']
ipv6_cidr = ipv6_cidr.split('/')[0] + '/64'
self.subnet = self.ec2_client.create_subnet(
CidrBlock='203.0.113.128/25',
Ipv6CidrBlock=ipv6_cidr,
VpcId=self.vpc['Vpc']['VpcId'])
self.subnet_id = self.subnet['Subnet']['SubnetId']
else:
self.vpc = self.ec2_client.create_vpc(CidrBlock='203.0.113.0/24')
self.subnet = self.ec2_client.create_subnet(
CidrBlock='203.0.113.128/25', VpcId=self.vpc['Vpc']['VpcId'])
self.subnet_id = self.subnet['Subnet']['SubnetId']
profile = self.iam.create_instance_profile(
InstanceProfileName='not-a-real-profile')
self.instance_profile_name = profile.name
self.instance_profile_arn = profile.arn
self.security_group = self.ec2_client.create_security_group(
GroupName='zuul-nodes', VpcId=self.vpc['Vpc']['VpcId'],
Description='Zuul Nodes')
self.security_group_id = self.security_group['GroupId']
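        # Pass any quota values declared on the test method via the
        # ec2_quotas/ebs_quotas decorators through to the fake adapter.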
test_name = self.id().split('.')[-1]
test = getattr(self, test_name)
self.patchAdapter(ec2_quotas=getattr(test, '__aws_ec2_quotas__', None),
ebs_quotas=getattr(test, '__aws_ebs_quotas__', None))
def tearDown(self):
self.mock_aws.stop()
super().tearDown()
def setup_config(self, *args, **kw):
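        # Inject the AWS resources created in setUp into the test
        # config.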
kw['subnet_id'] = self.subnet_id
kw['security_group_id'] = self.security_group_id
kw['instance_profile_name'] = self.instance_profile_name
kw['instance_profile_arn'] = self.instance_profile_arn
return super().setup_config(*args, **kw)
def patchAdapter(self, ec2_quotas=None, ebs_quotas=None):
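        # Default quotas are high enough that tests which don't care
        # about quota behavior are not limited by them.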
default_ec2_quotas = {
'L-1216C47A': 100,
'L-43DA4232': 100,
'L-34B43A08': 100,
}
default_ebs_quotas = {
'L-D18FCD1D': 100.0,
'L-7A658B76': 100.0,
}
if ec2_quotas is None:
ec2_quotas = default_ec2_quotas
if ebs_quotas is None:
ebs_quotas = default_ebs_quotas
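        # The FakeAwsAdapter __testcase/__ec2_quotas/__ebs_quotas
        # attributes are name-mangled, so patch them via their mangled
        # names.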
self.patch(nodepool.driver.aws.adapter, 'AwsAdapter', FakeAwsAdapter)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__testcase', self)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__ec2_quotas', ec2_quotas)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__ebs_quotas', ebs_quotas)
def requestNode(self, config_path, label):
# A helper method to perform a single node request
configfile = self.setup_config(config_path)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.tenant_name = 'tenant-1'
req.node_types.append(label)
self.zk.storeNodeRequest(req)
self.log.debug("Waiting for request %s", req.id)
return self.waitForNodeRequest(req)
def assertSuccess(self, req):
# Assert values common to most requests
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
return node
def test_aws_multiple(self):
# Test creating multiple instances at once. This is most
# useful to run manually during development to observe
# behavior.
configfile = self.setup_config('aws/aws-multiple.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
reqs = []
for x in range(4):
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('ubuntu1404')
self.zk.storeNodeRequest(req)
reqs.append(req)
nodes = []
for req in reqs:
self.log.debug("Waiting for request %s", req.id)
req = self.waitForNodeRequest(req)
nodes.append(self.assertSuccess(req))
for node in nodes:
node.state = zk.USED
self.zk.storeNode(node)
for node in nodes:
self.waitForNodeDeletion(node)
@ec2_quotas({
'L-1216C47A': 2,
'L-43DA4232': 448,
'L-34B43A08': 2
})
def test_aws_multi_quota(self):
# Test multiple instance type quotas (standard and high-mem)
configfile = self.setup_config('aws/aws-quota.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
# Create a high-memory node request.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('high')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create a second high-memory node request; this should be
# over quota so it won't be fulfilled.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('high')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# second request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first node so that the second can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
# Make sure the second high node exists now.
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
# Create a standard node request which should succeed even
# though we're at quota for high-mem (but not standard).
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.node_types.append('standard')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for request %s", req3.id)
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
@ec2_quotas({
'L-43DA4232': 448,
'L-1216C47A': 200,
'L-34B43A08': 200
})
def test_aws_multi_quota_spot(self):
# Test multiple instance type quotas (standard, high-mem and spot)
configfile = self.setup_config('aws/aws-quota.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)
# Create a spot node request which should succeed.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('spot')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create an on-demand node request which should succeed.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('on-demand')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
# Create another spot node request which should be paused.
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.node_types.append('spot')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for request %s", req3.id)
req3 = self.waitForNodeRequest(req3, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# third request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first spot node so that the third can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
        # Make sure the second spot node exists now.
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
@ec2_quotas({
'L-1216C47A': 1000,
'L-43DA4232': 1000,
})
def test_aws_multi_pool_limits(self):
# Test multiple instance type quotas (standard and high-mem)
# with pool resource limits
configfile = self.setup_config('aws/aws-limits.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
# Create a standard node request.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('standard')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create a second standard node request; this should be
# over max-cores so it won't be fulfilled.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('standard')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# second request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first node so that the second can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
# Make sure the second standard node exists now.
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
@ec2_quotas({
'L-1216C47A': 1000,
'L-43DA4232': 1000,
})
def test_aws_multi_tenant_limits(self):
# Test multiple instance type quotas (standard and high-mem)
# with tenant resource limits
configfile = self.setup_config('aws/aws-limits.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
# Create a high node request.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.tenant_name = 'tenant-1'
req1.node_types.append('high')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
self.assertSuccess(req1)
        # Create a second high node request; this should be over the
        # tenant resource limit so it won't be fulfilled.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.tenant_name = 'tenant-1'
req2.node_types.append('high')
self.zk.storeNodeRequest(req2)
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
        # Create a standard node request which should succeed even
        # though we're at the tenant limit for high-mem (but not
        # standard).
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.tenant_name = 'tenant-1'
req3.node_types.append('standard')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for request %s", req3.id)
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
# Assert that the second request is still being deferred
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
@ec2_quotas({
'L-1216C47A': 200, # instance
})
@ebs_quotas({
'L-D18FCD1D': 1.0, # gp2 storage (TB)
'L-7A658B76': 1.0, # gp3 storage (TB)
})
def test_aws_volume_quota(self):
# Test volume quotas
# Moto doesn't correctly pass through iops when creating
# instances, so we can't test volume types that require iops.
# Therefore in this test we only cover storage quotas.
configfile = self.setup_config('aws/aws-volume-quota.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
        # Create a gp2 node request.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('volume-gp2')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create a second gp2 node request; this should be
# over quota so it won't be fulfilled.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('volume-gp2')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# second request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first node so that the second can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
        # Make sure the second gp2 node exists now.
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
# Create a gp3 node request which should succeed even
# though we're at quota for gp2 (but not gp3).
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.node_types.append('volume-gp3')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for request %s", req3.id)
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
def test_aws_node(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertIsNotNone(node.public_ipv4)
self.assertIsNotNone(node.private_ipv4)
self.assertIsNone(node.public_ipv6)
self.assertIsNotNone(node.interface_ip)
self.assertEqual(node.public_ipv4, node.interface_ip)
self.assertTrue(node.private_ipv4.startswith('203.0.113.'))
self.assertFalse(node.public_ipv4.startswith('203.0.113.'))
self.assertEqual(node.python_path, 'auto')
self.assertEqual(node.cloud, 'AWS')
self.assertEqual(node.region, 'us-west-2')
# Like us-west-2x where x is random
        self.assertEqual(len(node.az), len('us-west-2x'))
instance = self.ec2.Instance(node.external_id)
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertFalse(response['EbsOptimized']['Value'])
        self.assertNotIn('MetadataOptions', self.run_instance_calls[0])
node.state = zk.USED
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
def test_aws_by_filters(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-by-filters')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-by-filters')
def test_aws_by_capitalized_filters(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-by-capitalized-filters')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-by-capitalized-filters')
def test_aws_bad_ami_name(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-bad-ami-name')
self.assertEqual(req.state, zk.FAILED)
self.assertEqual(req.nodes, [])
def test_aws_bad_config(self):
# This fails config schema validation
with testtools.ExpectedException(ValueError,
".*?could not be validated.*?"):
self.setup_config('aws/bad-config-images.yaml')
def test_aws_non_host_key_checking(self):
req = self.requestNode('aws/non-host-key-checking.yaml',
'ubuntu1404-non-host-key-checking')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, [])
def test_aws_userdata(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-userdata')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
response = instance.describe_attribute(
Attribute='userData')
self.assertIn('UserData', response)
userdata = base64.b64decode(
response['UserData']['Value']).decode()
self.assertEqual('fake-user-data', userdata)
def test_aws_iam_instance_profile_name(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-iam-instance-profile-name')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
def test_aws_iam_instance_profile_arn(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-iam-instance-profile-arn')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
def test_aws_private_ip(self):
req = self.requestNode('aws/private-ip.yaml', 'ubuntu1404-private-ip')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertIsNone(node.public_ipv4)
self.assertIsNotNone(node.private_ipv4)
self.assertIsNone(node.public_ipv6)
self.assertIsNotNone(node.interface_ip)
self.assertEqual(node.private_ipv4, node.interface_ip)
self.assertTrue(node.private_ipv4.startswith('203.0.113.'))
def test_aws_ipv6(self):
req = self.requestNode('aws/ipv6.yaml', 'ubuntu1404-ipv6')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertIsNotNone(node.public_ipv4)
self.assertIsNotNone(node.private_ipv4)
# Not supported by moto
# self.assertIsNotNone(node.public_ipv6)
self.assertIsNotNone(node.interface_ip)
self.assertEqual(node.public_ipv4, node.interface_ip)
self.assertTrue(node.private_ipv4.startswith('203.0.113.'))
# Moto doesn't support ipv6 assignment on creation, so we can
# only unit test the parts.
# Make sure we make the call to AWS as expected
self.assertEqual(
self.run_instance_calls[0]['NetworkInterfaces']
[0]['Ipv6AddressCount'], 1)
        # This is similar to what we would get back from AWS; verify
        # that the statemachine instance object has the parameters set
        # correctly.
instance = {}
instance['InstanceId'] = 'test'
instance['Tags'] = []
instance['PrivateIpAddress'] = '10.0.0.1'
instance['PublicIpAddress'] = '1.2.3.4'
instance['Placement'] = {'AvailabilityZone': 'us-west-2b'}
iface = {'Ipv6Addresses': [{'Ipv6Address': 'fe80::dead:beef'}]}
instance['NetworkInterfaces'] = [iface]
provider = Dummy()
provider.region_name = 'us-west-2'
awsi = AwsInstance(provider, instance, None)
self.assertEqual(awsi.public_ipv4, '1.2.3.4')
self.assertEqual(awsi.private_ipv4, '10.0.0.1')
self.assertEqual(awsi.public_ipv6, 'fe80::dead:beef')
self.assertIsNone(awsi.private_ipv6)
self.assertEqual(awsi.public_ipv4, awsi.interface_ip)
def test_aws_tags(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-with-tags')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
self.assertNotIn({"Key": "Name", "Value": "ignored-name"}, tag_list)
self.assertIn(
{"Key": "dynamic-tenant", "Value": "Tenant is tenant-1"}, tag_list)
def test_aws_min_ready(self):
# Test dynamic tag formatting without a real node request
configfile = self.setup_config('aws/aws-min-ready.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
node = self.waitForNodes('ubuntu1404-with-tags')[0]
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
self.assertNotIn({"Key": "Name", "Value": "ignored-name"}, tag_list)
self.assertIn(
{"Key": "dynamic-tenant", "Value": "Tenant is None"}, tag_list)
def test_aws_shell_type(self):
req = self.requestNode('aws/shell-type.yaml',
'ubuntu1404-with-shell-type')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-with-shell-type')
self.assertEqual(node.shell_type, 'csh')
def test_aws_config(self):
configfile = self.setup_config('aws/config.yaml')
config = nodepool_config.loadConfig(configfile)
self.assertIn('ec2-us-west-2', config.providers)
config2 = nodepool_config.loadConfig(configfile)
self.assertEqual(config, config2)
def test_aws_ebs_optimized(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-ebs-optimized')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertTrue(response['EbsOptimized']['Value'])
def test_aws_imdsv2(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-imdsv2')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpTokens'],
'required')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpEndpoint'],
'enabled')
def test_aws_invalid_instance_type(self):
req = self.requestNode('aws/aws-invalid.yaml', 'ubuntu-invalid')
self.assertEqual(req.state, zk.FAILED)
self.assertEqual(req.nodes, [])
# Make sure other instance types are not affected
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.tenant_name = 'tenant-1'
req.node_types.append('ubuntu')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertEqual(len(req.nodes), 1)
def test_aws_diskimage_snapshot(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'another_user')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
        self.assertNotIn('ImdsSupport', self.register_image_calls[0])
        self.assertIn({'Key': 'diskimage_metadata', 'Value': 'diskimage'},
                      ec2_image.tags)
        self.assertIn({'Key': 'provider_metadata', 'Value': 'provider'},
                      ec2_image.tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.username, 'another_user')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage-import-image.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'zuul')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
        self.assertIn({'Key': 'diskimage_metadata', 'Value': 'diskimage'},
                      ec2_image.tags)
        self.assertIn({'Key': 'provider_metadata', 'Value': 'provider'},
                      ec2_image.tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_snapshot_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage-imdsv2-snapshot.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'another_user')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertEqual(
self.register_image_calls[0]['ImdsSupport'], 'v2.0')
        self.assertIn({'Key': 'diskimage_metadata', 'Value': 'diskimage'},
                      ec2_image.tags)
        self.assertIn({'Key': 'provider_metadata', 'Value': 'provider'},
                      ec2_image.tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.username, 'another_user')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage-imdsv2-image.yaml')
with testtools.ExpectedException(Exception, "IMDSv2 requires"):
self.useBuilder(configfile)
def test_aws_diskimage_removal(self):
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
self.waitForImage('ec2-us-west-2', 'fake-image')
self.replace_config(configfile, 'aws/config.yaml')
self.waitForImageDeletion('ec2-us-west-2', 'fake-image')
self.waitForBuildDeletion('fake-image', '0000000001')
def test_aws_resource_cleanup(self):
# This tests everything except the image imports
# Start by setting up leaked resources
instance_tags = [
{'Key': 'nodepool_node_id', 'Value': '0000000042'},
{'Key': 'nodepool_pool_name', 'Value': 'main'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
]
s3_tags = {
'nodepool_build_id': '0000000042',
'nodepool_upload_id': '0000000042',
'nodepool_provider_name': 'ec2-us-west-2',
}
reservation = self.ec2_client.run_instances(
ImageId="ami-12c6146b", MinCount=1, MaxCount=1,
BlockDeviceMappings=[{
'DeviceName': '/dev/sda1',
'Ebs': {
'VolumeSize': 80,
'DeleteOnTermination': False
}
}],
TagSpecifications=[{
'ResourceType': 'instance',
'Tags': instance_tags
}, {
'ResourceType': 'volume',
'Tags': instance_tags
}]
)
instance_id = reservation['Instances'][0]['InstanceId']
bucket = self.s3.Bucket('nodepool')
bucket.put_object(Body=b'hi',
Key='testimage',
Tagging=urllib.parse.urlencode(s3_tags))
obj = self.s3.Object('nodepool', 'testimage')
# This effectively asserts the object exists
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
instance = self.ec2.Instance(instance_id)
self.assertEqual(instance.state['Name'], 'running')
volume_id = list(instance.volumes.all())[0].id
volume = self.ec2.Volume(volume_id)
self.assertEqual(volume.state, 'in-use')
# Now that the leaked resources exist, start the provider and
# wait for it to clean them.
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
for _ in iterate_timeout(30, Exception, 'instance deletion'):
instance = self.ec2.Instance(instance_id)
if instance.state['Name'] == 'terminated':
break
for _ in iterate_timeout(30, Exception, 'volume deletion'):
volume = self.ec2.Volume(volume_id)
try:
if volume.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
for _ in iterate_timeout(30, Exception, 'object deletion'):
obj = self.s3.Object('nodepool', 'testimage')
try:
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
except self.s3_client.exceptions.NoSuchKey:
break
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.instances',
value='1', kind='c')
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.volumes',
value='1', kind='c')
self.assertReportedStat(
'nodepool.provider.ec2-us-west-2.leaked.objects',
value='1', kind='c')
def test_aws_resource_cleanup_import_snapshot(self):
# This tests the import_snapshot path
# Create a valid, non-leaked image to test id collisions, and
# that it is not deleted.
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
current_image = self.waitForImage('ec2-us-west-2', 'fake-image')
# Assert the image exists
self.ec2.Image(current_image.external_id).state
# Start by setting up leaked resources
# Use low numbers to intentionally collide with the current
# image to ensure we test the non-uniqueness of upload ids.
image_tags = [
{'Key': 'nodepool_build_id', 'Value': '0000000001'},
{'Key': 'nodepool_upload_id', 'Value': '0000000001'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
]
task = self.fake_aws.import_snapshot(
DiskContainer={
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'nodepool',
'S3Key': 'testfile',
}
},
TagSpecifications=[{
'ResourceType': 'import-snapshot-task',
'Tags': image_tags,
}])
snapshot_id = self.fake_aws.finish_import_snapshot(task)
register_response = self.ec2_client.register_image(
Architecture='amd64',
BlockDeviceMappings=[
{
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': snapshot_id,
'VolumeSize': 20,
'VolumeType': 'gp2',
},
},
],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
Name='testimage',
)
image_id = register_response['ImageId']
ami = self.ec2.Image(image_id)
new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
self.fake_aws.change_snapshot_id(task, new_snapshot_id)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
# adapter.
image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available')
snap = self.ec2.Snapshot(snapshot_id)
self.assertEqual(snap.state, 'completed')
# Now that the leaked resources exist, start the provider and
# wait for it to clean them.
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
for _ in iterate_timeout(30, Exception, 'ami deletion'):
image = self.ec2.Image(image_id)
try:
# If this has a value the image was not deleted
if image.state == 'available':
# Definitely not deleted yet
continue
except AttributeError:
                # Per the AWS API, a recently deleted image is empty and
                # looking at its state raises an AttributeError; see
                # https://github.com/boto/boto3/issues/2531. The image
                # was deleted, so we break out of the loop here.
break
for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
snap = self.ec2.Snapshot(new_snapshot_id)
try:
if snap.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
# Assert the non-leaked image still exists
self.ec2.Image(current_image.external_id).state
def test_aws_resource_cleanup_import_image(self):
# This tests the import_image path
# Create a valid, non-leaked image to test id collisions, and
# that it is not deleted.
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
current_image = self.waitForImage('ec2-us-west-2', 'fake-image')
# Assert the image exists
self.ec2.Image(current_image.external_id).state
# Start by setting up leaked resources
image_tags = [
{'Key': 'nodepool_build_id', 'Value': '0000000001'},
{'Key': 'nodepool_upload_id', 'Value': '0000000001'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
]
# The image import path:
task = self.fake_aws.import_image(
DiskContainers=[{
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'nodepool',
'S3Key': 'testfile',
}
}],
TagSpecifications=[{
'ResourceType': 'import-image-task',
'Tags': image_tags,
}])
image_id, snapshot_id = self.fake_aws.finish_import_image(task)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
# adapter.
image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available')
snap = self.ec2.Snapshot(snapshot_id)
self.assertEqual(snap.state, 'completed')
# Now that the leaked resources exist, start the provider and
# wait for it to clean them.
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
for _ in iterate_timeout(30, Exception, 'ami deletion'):
image = self.ec2.Image(image_id)
try:
# If this has a value the image was not deleted
if image.state == 'available':
# Definitely not deleted yet
continue
except AttributeError:
                # Per the AWS API, a recently deleted image is empty and
                # looking at its state raises an AttributeError; see
                # https://github.com/boto/boto3/issues/2531. The image
                # was deleted, so we break out of the loop here.
break
for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
snap = self.ec2.Snapshot(snapshot_id)
try:
if snap.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
# Assert the non-leaked image still exists
self.ec2.Image(current_image.external_id).state
def test_aws_get_import_image_task(self):
# A unit test of the unusual error handling for missing tasks
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
adapter = pool.getProviderManager('ec2-us-west-2').adapter
self.assertIsNone(adapter._getImportImageTask("fake-id"))
def test_aws_provisioning_spot_instances(self):
        # Test creating a spot instance instead of an on-demand one.
req = self.requestNode('aws/aws-spot.yaml', 'ubuntu1404-spot')
node = self.assertSuccess(req)
instance = self.ec2.Instance(node.external_id)
self.assertEqual(instance.instance_lifecycle, 'spot')
# moto doesn't provide the spot_instance_request_id
# self.assertIsNotNone(instance.spot_instance_request_id)