Files
nodepool/nodepool/tests/unit/test_driver_aws.py
James E. Blair 242f9cc3e6 Handle early AWS spot instance reclamations
If an AWS spot instance is used as a metastatic backing node, an
unexpected series of events can occur:

* aws driver creates backing node instance
* aws driver scans ssh keys and stores them on backing node
* aws reclaims spot instance
* aws re-uses IP from backing node
* metastatic driver creates node
* metastatic driver scans ssh keys and stores them on node

Zuul would then use the wrong node (whether that succeeds depends
on what else has happened to the node in the interim).

To avoid this situation, we implement this change:
* After scanning the metastatic node ssh keys, we compare them to
  the backing node ssh keys and if they differ, trigger an error
  in the metastatic node and mark the backing node as failed.

In case the node is reclaimed one step early in the above sequence,
we implement this change:
* After completing the nodescan, the aws driver will double check
  that the instance is still running; if not, it will trigger an
  error.

The above is still subject to a small race if the nodescan time
takes less than the cache interval of the instance list, and the
node is reclaimed after the nodescan and within the cache interval
(currently 10 seconds).  In the unlikely event that does happen,
then the metastatic key check should still catch the issue as long
as the replacement node also does not boot within those 10 seconds.
(Technically possible, but the combination of all of these things
should be very unlikely in practice.)

Change-Id: I9ce1f6df04e9c49deceda99c8e4024dd98ea88f9
2024-11-05 13:24:07 -08:00

1643 lines
65 KiB
Python

# Copyright (C) 2018 Red Hat
# Copyright 2022-2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import fixtures
import logging
import urllib.parse
import boto3
import botocore.exceptions
from moto import mock_aws
import testtools
from nodepool import config as nodepool_config
from nodepool import tests
import nodepool.status
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout
import nodepool.driver.statemachine
from nodepool.driver.statemachine import StateMachineProvider
import nodepool.driver.aws.adapter
from nodepool.driver.aws.adapter import AwsInstance, AwsAdapter
from nodepool.tests.unit.fake_aws import FakeAws
class Dummy:
    """An empty attribute holder used to build stand-in objects in tests."""
    pass
class FakeAwsAdapter(AwsAdapter):
    # Patch/override adapter methods to aid unit tests
    def __init__(self, *args, **kw):
        """Wrap the real adapter and intercept selected boto3 calls.

        The test case instance and the quota dictionaries are injected
        by TestDriverAws.patchAdapter via the name-mangled class
        attributes _FakeAwsAdapter__testcase, _FakeAwsAdapter__ec2_quotas
        and _FakeAwsAdapter__ebs_quotas.
        """
        super().__init__(*args, **kw)

        # Note: boto3 doesn't handle ipv6 addresses correctly
        # when in fake mode so we need to intercept the
        # run_instances call and validate the args we supply.
        def _fake_run_instances(*args, **kwargs):
            self.__testcase.run_instances_calls.append(kwargs)
            if self.__testcase.run_instances_exception:
                raise self.__testcase.run_instances_exception
            return self.ec2_client.run_instances_orig(*args, **kwargs)

        # Note: boto3 doesn't handle all features correctly (e.g.
        # instance-requirements, volume attributes) when creating
        # fleet in fake mode, we need to intercept the create_fleet
        # call and validate the args we supply. Results are also
        # intercepted for validate instance attributes
        def _fake_create_fleet(*args, **kwargs):
            self.__testcase.create_fleet_calls.append(kwargs)
            if self.__testcase.create_fleet_exception:
                raise self.__testcase.create_fleet_exception
            result = self.ec2_client.create_fleet_orig(*args, **kwargs)
            self.__testcase.create_fleet_results.append(result)
            return result

        # Allow the test case to inject a host-allocation failure.
        def _fake_allocate_hosts(*args, **kwargs):
            if self.__testcase.allocate_hosts_exception:
                raise self.__testcase.allocate_hosts_exception
            return self.ec2_client.allocate_hosts_orig(*args, **kwargs)

        # The ImdsSupport parameter isn't handled by moto
        def _fake_register_image(*args, **kwargs):
            self.__testcase.register_image_calls.append(kwargs)
            return self.ec2_client.register_image_orig(*args, **kwargs)

        # Prefer the FakeAws paginator; fall back to the real client
        # for operations FakeAws does not implement.
        def _fake_get_paginator(*args, **kwargs):
            try:
                return self.__testcase.fake_aws.get_paginator(*args, **kwargs)
            except NotImplementedError:
                return self.ec2_client.get_paginator_orig(*args, **kwargs)

        self.ec2_client.run_instances_orig = self.ec2_client.run_instances
        self.ec2_client.run_instances = _fake_run_instances
        self.ec2_client.create_fleet_orig = self.ec2_client.create_fleet
        self.ec2_client.create_fleet = _fake_create_fleet
        self.ec2_client.allocate_hosts_orig = self.ec2_client.allocate_hosts
        self.ec2_client.allocate_hosts = _fake_allocate_hosts
        self.ec2_client.register_image_orig = self.ec2_client.register_image
        self.ec2_client.register_image = _fake_register_image
        self.ec2_client.import_snapshot = \
            self.__testcase.fake_aws.import_snapshot
        self.ec2_client.import_image = \
            self.__testcase.fake_aws.import_image
        self.ec2_client.get_paginator_orig = self.ec2_client.get_paginator
        self.ec2_client.get_paginator = _fake_get_paginator

        # moto does not mock service-quotas, so we do it ourselves:
        def _fake_get_service_quota(ServiceCode, QuotaCode, *args, **kwargs):
            if ServiceCode == 'ec2':
                qdict = self.__ec2_quotas
            elif ServiceCode == 'ebs':
                qdict = self.__ebs_quotas
            else:
                raise NotImplementedError(
                    f"Quota code {ServiceCode} not implemented")
            return {'Quota': {'Value': qdict.get(QuotaCode)}}
        self.aws_quotas.get_service_quota = _fake_get_service_quota

        def _fake_list_service_quotas(ServiceCode, *args, **kwargs):
            if ServiceCode == 'ec2':
                qdict = self.__ec2_quotas
            elif ServiceCode == 'ebs':
                qdict = self.__ebs_quotas
            else:
                raise NotImplementedError(
                    f"Quota code {ServiceCode} not implemented")
            quotas = []
            for code, value in qdict.items():
                quotas.append(
                    {'Value': value, 'QuotaCode': code}
                )
            return {'Quotas': quotas}
        self.aws_quotas.list_service_quotas = _fake_list_service_quotas
def ec2_quotas(quotas):
    """Specify a set of AWS EC2 quota values for use by a test method.

    :arg dict quotas: The quota dictionary.
    """
    def wrap(test_method):
        setattr(test_method, '__aws_ec2_quotas__', quotas)
        return test_method
    return wrap
def ebs_quotas(quotas):
    """Specify a set of AWS EBS quota values for use by a test method.

    :arg dict quotas: The quota dictionary.
    """
    def wrap(test_method):
        setattr(test_method, '__aws_ebs_quotas__', quotas)
        return test_method
    return wrap
class TestDriverAws(tests.DBTestCase):
log = logging.getLogger("nodepool.TestDriverAws")
mock_aws = mock_aws()
    def setUp(self):
        """Start moto's fake AWS, create the base cloud resources the
        tests need, and patch the AWS adapter for the test about to run.
        """
        super().setUp()
        StateMachineProvider.MINIMUM_SLEEP = 0.1
        StateMachineProvider.MAXIMUM_SLEEP = 1
        AwsAdapter.IMAGE_UPLOAD_SLEEP = 1

        # Avoid performing a real SSH nodescan during tests.
        self.useFixture(fixtures.MonkeyPatch(
            'nodepool.driver.statemachine.NodescanRequest.FAKE', True))

        # Fake credentials for moto; no real AWS account is contacted.
        aws_id = 'AK000000000000000000'
        aws_key = '0123456789abcdef0123456789abcdef0123456789abcdef'
        self.useFixture(
            fixtures.EnvironmentVariable('AWS_ACCESS_KEY_ID', aws_id))
        self.useFixture(
            fixtures.EnvironmentVariable('AWS_SECRET_ACCESS_KEY', aws_key))

        self.fake_aws = FakeAws()
        self.mock_aws.start()
        self.addCleanup(self.mock_aws.stop)

        self.ec2 = boto3.resource('ec2', region_name='us-west-2')
        self.ec2_client = boto3.client('ec2', region_name='us-west-2')
        self.s3 = boto3.resource('s3', region_name='us-west-2')
        self.s3_client = boto3.client('s3', region_name='us-west-2')
        self.iam = boto3.resource('iam', region_name='us-west-2')
        self.s3.create_bucket(
            Bucket='nodepool',
            CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})

        # A list of args to method calls for validation
        self.run_instances_calls = []
        self.run_instances_exception = None
        self.create_fleet_calls = []
        self.create_fleet_results = []
        self.create_fleet_exception = None
        self.allocate_hosts_exception = None
        self.register_image_calls = []

        # TEST-NET-3
        ipv6 = False
        if ipv6:
            # This is currently unused, but if moto gains IPv6 support
            # on instance creation, this may be useful.
            self.vpc = self.ec2_client.create_vpc(
                CidrBlock='203.0.113.0/24',
                AmazonProvidedIpv6CidrBlock=True)
            ipv6_cidr = self.vpc['Vpc'][
                'Ipv6CidrBlockAssociationSet'][0]['Ipv6CidrBlock']
            ipv6_cidr = ipv6_cidr.split('/')[0] + '/64'
            self.subnet = self.ec2_client.create_subnet(
                CidrBlock='203.0.113.128/25',
                Ipv6CidrBlock=ipv6_cidr,
                VpcId=self.vpc['Vpc']['VpcId'])
            self.subnet_id = self.subnet['Subnet']['SubnetId']
        else:
            self.vpc = self.ec2_client.create_vpc(CidrBlock='203.0.113.0/24')
            self.subnet = self.ec2_client.create_subnet(
                CidrBlock='203.0.113.128/25', VpcId=self.vpc['Vpc']['VpcId'])
            self.subnet_id = self.subnet['Subnet']['SubnetId']

        profile = self.iam.create_instance_profile(
            InstanceProfileName='not-a-real-profile')
        self.instance_profile_name = profile.name
        self.instance_profile_arn = profile.arn

        self.security_group = self.ec2_client.create_security_group(
            GroupName='zuul-nodes', VpcId=self.vpc['Vpc']['VpcId'],
            Description='Zuul Nodes')
        self.security_group_id = self.security_group['GroupId']

        # Pick up per-test quota overrides attached by the ec2_quotas /
        # ebs_quotas decorators (if any) on the running test method.
        test_name = self.id().split('.')[-1]
        test = getattr(self, test_name)
        self.patchAdapter(ec2_quotas=getattr(test, '__aws_ec2_quotas__', None),
                          ebs_quotas=getattr(test, '__aws_ebs_quotas__', None))
def setup_config(self, *args, **kw):
kw['subnet_id'] = self.subnet_id
kw['security_group_id'] = self.security_group_id
kw['instance_profile_name'] = self.instance_profile_name
kw['instance_profile_arn'] = self.instance_profile_arn
return super().setup_config(*args, **kw)
def patchAdapter(self, ec2_quotas=None, ebs_quotas=None):
default_ec2_quotas = {
'L-1216C47A': 100,
'L-43DA4232': 100,
'L-34B43A08': 100,
}
default_ebs_quotas = {
'L-D18FCD1D': 100.0,
'L-7A658B76': 100.0,
}
if ec2_quotas is None:
ec2_quotas = default_ec2_quotas
if ebs_quotas is None:
ebs_quotas = default_ebs_quotas
self.patch(nodepool.driver.aws.adapter, 'AwsAdapter', FakeAwsAdapter)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__testcase', self)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__ec2_quotas', ec2_quotas)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__ebs_quotas', ebs_quotas)
    def requestNode(self, config_path, label):
        # A helper method to perform a single node request
        """Start a pool from config_path, submit one request for label,
        and return the completed request.
        """
        configfile = self.setup_config(config_path)
        self.pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(self.pool)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.tenant_name = 'tenant-1'
        req.node_types.append(label)
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        return self.waitForNodeRequest(req)
    def assertSuccess(self, req):
        # Assert values common to most requests
        """Assert that req was fulfilled with a ready node and return
        that node.
        """
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertNotEqual(req.nodes, [])
        node = self.zk.getNode(req.nodes[0])
        self.assertEqual(node.allocated_to, req.id)
        self.assertEqual(node.state, zk.READY)
        self.assertIsNotNone(node.launcher)
        self.assertEqual(node.connection_type, 'ssh')
        self.assertEqual(node.attributes,
                         {'key1': 'value1', 'key2': 'value2'})
        return node
    def test_aws_multiple(self):
        """Launch several instances concurrently, then delete them."""
        # Test creating multiple instances at once.  This is most
        # useful to run manually during development to observe
        # behavior.
        configfile = self.setup_config('aws/aws-multiple.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        reqs = []
        for x in range(4):
            req = zk.NodeRequest()
            req.state = zk.REQUESTED
            req.node_types.append('ubuntu1404')
            self.zk.storeNodeRequest(req)
            reqs.append(req)

        nodes = []
        for req in reqs:
            self.log.debug("Waiting for request %s", req.id)
            req = self.waitForNodeRequest(req)
            nodes.append(self.assertSuccess(req))

        # Mark the nodes used and verify they are cleaned up.
        for node in nodes:
            node.state = zk.USED
            self.zk.storeNode(node)
        for node in nodes:
            self.waitForNodeDeletion(node)
    @ec2_quotas({
        'L-1216C47A': 2,
        'L-43DA4232': 448,
        'L-34B43A08': 2
    })
    def test_aws_multi_quota(self):
        """Exercise per-instance-type quotas: hitting the high-mem
        quota pauses the handler without affecting standard types.
        """
        # Test multiple instance type quotas (standard and high-mem)
        configfile = self.setup_config('aws/aws-quota.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        # Create a high-memory node request.
        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.node_types.append('high')
        self.zk.storeNodeRequest(req1)
        self.log.debug("Waiting for request %s", req1.id)
        req1 = self.waitForNodeRequest(req1)
        node1 = self.assertSuccess(req1)

        # Create a second high-memory node request; this should be
        # over quota so it won't be fulfilled.
        req2 = zk.NodeRequest()
        req2.state = zk.REQUESTED
        req2.node_types.append('high')
        self.zk.storeNodeRequest(req2)
        self.log.debug("Waiting for request %s", req2.id)
        req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

        # Make sure we're paused while we attempt to fulfill the
        # second request.
        pool_worker = pool.getPoolWorkers('ec2-us-west-2')
        for _ in iterate_timeout(30, Exception, 'paused handler'):
            if pool_worker[0].paused_handlers:
                break

        # Release the first node so that the second can be fulfilled.
        node1.state = zk.USED
        self.zk.storeNode(node1)
        self.waitForNodeDeletion(node1)

        # Make sure the second high node exists now.
        req2 = self.waitForNodeRequest(req2)
        self.assertSuccess(req2)

        # Create a standard node request which should succeed even
        # though we're at quota for high-mem (but not standard).
        req3 = zk.NodeRequest()
        req3.state = zk.REQUESTED
        req3.node_types.append('standard')
        self.zk.storeNodeRequest(req3)
        self.log.debug("Waiting for request %s", req3.id)
        req3 = self.waitForNodeRequest(req3)
        self.assertSuccess(req3)
@ec2_quotas({
'L-43DA4232': 448,
'L-1216C47A': 200,
'L-34B43A08': 200
})
def test_aws_multi_quota_spot(self):
# Test multiple instance type quotas (standard, high-mem and spot)
configfile = self.setup_config('aws/aws-quota.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
# Create a spot node request which should succeed.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('spot')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create an on-demand node request which should succeed.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('on-demand')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
# Create another spot node request which should be paused.
req3 = zk.NodeRequest()
req3.state = zk.REQUESTED
req3.node_types.append('spot')
self.zk.storeNodeRequest(req3)
self.log.debug("Waiting for request %s", req3.id)
req3 = self.waitForNodeRequest(req3, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# third request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first spot node so that the third can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
# Make sure the fourth spot node exists now.
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
def test_aws_multi_quota_unknown(self):
# Test multiple instance type quotas (standard, high-mem and spot)
configfile = self.setup_config('aws/aws-quota.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
# We don't have quota information for this node type; make
# sure we can still launch a node with it.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('unknown')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
self.assertSuccess(req1)
    @ec2_quotas({
        'L-1216C47A': 1000,
        'L-43DA4232': 1000,
    })
    def test_aws_multi_pool_limits(self):
        """Pool-level resource limits (max-cores) pause the handler
        even when the cloud quota is ample.
        """
        # Test multiple instance type quotas (standard and high-mem)
        # with pool resource limits
        configfile = self.setup_config('aws/aws-limits.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        # Create a standard node request.
        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.node_types.append('standard')
        self.zk.storeNodeRequest(req1)
        self.log.debug("Waiting for request %s", req1.id)
        req1 = self.waitForNodeRequest(req1)
        node1 = self.assertSuccess(req1)

        # Create a second standard node request; this should be
        # over max-cores so it won't be fulfilled.
        req2 = zk.NodeRequest()
        req2.state = zk.REQUESTED
        req2.node_types.append('standard')
        self.zk.storeNodeRequest(req2)
        self.log.debug("Waiting for request %s", req2.id)
        req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

        # Make sure we're paused while we attempt to fulfill the
        # second request.
        pool_worker = pool.getPoolWorkers('ec2-us-west-2')
        for _ in iterate_timeout(30, Exception, 'paused handler'):
            if pool_worker[0].paused_handlers:
                break

        # Release the first node so that the second can be fulfilled.
        node1.state = zk.USED
        self.zk.storeNode(node1)
        self.waitForNodeDeletion(node1)

        # Make sure the second standard node exists now.
        req2 = self.waitForNodeRequest(req2)
        self.assertSuccess(req2)
    @ec2_quotas({
        'L-1216C47A': 1000,
        'L-43DA4232': 1000,
    })
    def test_aws_multi_tenant_limits(self):
        """Tenant-level resource limits defer requests (kept in
        REQUESTED) rather than pausing the pool.
        """
        # Test multiple instance type quotas (standard and high-mem)
        # with tenant resource limits
        configfile = self.setup_config('aws/aws-limits.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        # Create a high node request.
        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.tenant_name = 'tenant-1'
        req1.node_types.append('high')
        self.zk.storeNodeRequest(req1)
        self.log.debug("Waiting for request %s", req1.id)
        req1 = self.waitForNodeRequest(req1)
        self.assertSuccess(req1)

        # Create a second high node request; this should be
        # over quota so it won't be fulfilled.
        req2 = zk.NodeRequest()
        req2.state = zk.REQUESTED
        req2.tenant_name = 'tenant-1'
        req2.node_types.append('high')
        self.zk.storeNodeRequest(req2)
        req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))

        # Create a standard node request which should succeed even
        # though we're at quota for high-mem (but not standard).
        req3 = zk.NodeRequest()
        req3.state = zk.REQUESTED
        req3.tenant_name = 'tenant-1'
        req3.node_types.append('standard')
        self.zk.storeNodeRequest(req3)
        self.log.debug("Waiting for request %s", req3.id)
        req3 = self.waitForNodeRequest(req3)
        self.assertSuccess(req3)

        # Assert that the second request is still being deferred
        req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
    @ec2_quotas({
        'L-1216C47A': 200,  # instance
    })
    @ebs_quotas({
        'L-D18FCD1D': 1.0,  # gp2 storage (TB)
        'L-7A658B76': 1.0,  # gp3 storage (TB)
    })
    def test_aws_volume_quota(self):
        """EBS storage quotas are tracked per volume type."""
        # Test volume quotas
        # Moto doesn't correctly pass through iops when creating
        # instances, so we can't test volume types that require iops.
        # Therefore in this test we only cover storage quotas.
        configfile = self.setup_config('aws/aws-volume-quota.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        # Create a gp2 request
        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.node_types.append('volume-gp2')
        self.zk.storeNodeRequest(req1)
        self.log.debug("Waiting for request %s", req1.id)
        req1 = self.waitForNodeRequest(req1)
        node1 = self.assertSuccess(req1)

        # Create a second gp2 node request; this should be
        # over quota so it won't be fulfilled.
        req2 = zk.NodeRequest()
        req2.state = zk.REQUESTED
        req2.node_types.append('volume-gp2')
        self.zk.storeNodeRequest(req2)
        self.log.debug("Waiting for request %s", req2.id)
        req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

        # Make sure we're paused while we attempt to fulfill the
        # second request.
        pool_worker = pool.getPoolWorkers('ec2-us-west-2')
        for _ in iterate_timeout(30, Exception, 'paused handler'):
            if pool_worker[0].paused_handlers:
                break

        # Release the first node so that the second can be fulfilled.
        node1.state = zk.USED
        self.zk.storeNode(node1)
        self.waitForNodeDeletion(node1)

        # Make sure the second gp2 node exists now.
        req2 = self.waitForNodeRequest(req2)
        self.assertSuccess(req2)

        # Create a gp3 node request which should succeed even
        # though we're at quota for gp2 (but not gp3).
        req3 = zk.NodeRequest()
        req3.state = zk.REQUESTED
        req3.node_types.append('volume-gp3')
        self.zk.storeNodeRequest(req3)
        self.log.debug("Waiting for request %s", req3.id)
        req3 = self.waitForNodeRequest(req3)
        self.assertSuccess(req3)
    def test_aws_node(self):
        """Basic launch: verify addresses, metadata and deletion of a
        single node.
        """
        req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        self.assertIsNotNone(node.public_ipv4)
        self.assertIsNotNone(node.private_ipv4)
        self.assertIsNone(node.public_ipv6)
        self.assertIsNotNone(node.interface_ip)
        self.assertEqual(node.public_ipv4, node.interface_ip)
        self.assertTrue(node.private_ipv4.startswith('203.0.113.'))
        self.assertFalse(node.public_ipv4.startswith('203.0.113.'))
        self.assertEqual(node.python_path, 'auto')
        self.assertEqual(node.cloud, 'AWS')
        self.assertEqual(node.region, 'us-west-2')

        # Like us-west-2x where x is random
        self.assertTrue(len(node.az) == len('us-west-2x'))

        instance = self.ec2.Instance(node.external_id['instance'])
        response = instance.describe_attribute(Attribute='ebsOptimized')
        self.assertFalse(response['EbsOptimized']['Value'])

        # No metadata options were configured for this label.
        self.assertFalse(
            'MetadataOptions' in self.run_instances_calls[0])

        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)
def test_aws_by_filters(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-by-filters')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-by-filters')
def test_aws_by_capitalized_filters(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-by-capitalized-filters')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-by-capitalized-filters')
def test_aws_bad_ami_name(self):
req = self.requestNode('aws/aws.yaml', 'ubuntu1404-bad-ami-name')
self.assertEqual(req.state, zk.FAILED)
self.assertEqual(req.nodes, [])
def test_aws_bad_config(self):
# This fails config schema validation
with testtools.ExpectedException(ValueError,
".*?could not be validated.*?"):
self.setup_config('aws/bad-config-images.yaml')
def test_aws_non_host_key_checking(self):
req = self.requestNode('aws/non-host-key-checking.yaml',
'ubuntu1404-non-host-key-checking')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, [])
    def test_aws_userdata(self):
        """Label userdata is passed through to the launched instance."""
        req = self.requestNode('aws/aws.yaml', 'ubuntu1404-userdata')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        instance = self.ec2.Instance(node.external_id['instance'])
        response = instance.describe_attribute(
            Attribute='userData')
        self.assertIn('UserData', response)
        # EC2 returns userdata base64-encoded.
        userdata = base64.b64decode(
            response['UserData']['Value']).decode()
        self.assertEqual('fake-user-data', userdata)
    def test_aws_iam_instance_profile_name(self):
        """An IAM instance profile configured by name is associated
        with the launched instance.
        """
        req = self.requestNode('aws/aws.yaml',
                               'ubuntu1404-iam-instance-profile-name')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        associations = self.ec2_client.\
            describe_iam_instance_profile_associations()[
                "IamInstanceProfileAssociations"]
        self.assertEqual(node.external_id['instance'],
                         associations[0]['InstanceId'])
        self.assertEqual(self.instance_profile_arn,
                         associations[0]['IamInstanceProfile']['Arn'])
    def test_aws_iam_instance_profile_arn(self):
        """An IAM instance profile configured by ARN is associated
        with the launched instance.
        """
        req = self.requestNode('aws/aws.yaml',
                               'ubuntu1404-iam-instance-profile-arn')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        associations = self.ec2_client.\
            describe_iam_instance_profile_associations()[
                "IamInstanceProfileAssociations"]
        self.assertEqual(node.external_id['instance'],
                         associations[0]['InstanceId'])
        self.assertEqual(self.instance_profile_arn,
                         associations[0]['IamInstanceProfile']['Arn'])
    def test_aws_private_ip(self):
        """A private-IP-only label produces a node reachable via its
        private address.
        """
        req = self.requestNode('aws/private-ip.yaml', 'ubuntu1404-private-ip')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')
        self.assertIsNone(node.public_ipv4)
        self.assertIsNotNone(node.private_ipv4)
        self.assertIsNone(node.public_ipv6)
        self.assertIsNotNone(node.interface_ip)
        self.assertEqual(node.private_ipv4, node.interface_ip)
        self.assertTrue(node.private_ipv4.startswith('203.0.113.'))
    def test_aws_ipv6(self):
        """Verify the IPv6 request parameters sent to AWS and the
        AwsInstance address parsing, since moto cannot assign IPv6
        addresses at instance creation.
        """
        req = self.requestNode('aws/ipv6.yaml', 'ubuntu1404-ipv6')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')
        self.assertIsNotNone(node.public_ipv4)
        self.assertIsNotNone(node.private_ipv4)
        # Not supported by moto
        # self.assertIsNotNone(node.public_ipv6)
        self.assertIsNotNone(node.interface_ip)
        self.assertEqual(node.public_ipv4, node.interface_ip)
        self.assertTrue(node.private_ipv4.startswith('203.0.113.'))

        # Moto doesn't support ipv6 assignment on creation, so we can
        # only unit test the parts.

        # Make sure we make the call to AWS as expected
        self.assertEqual(
            self.run_instances_calls[0]['NetworkInterfaces']
            [0]['Ipv6AddressCount'], 1)

        # This is like what we should get back from AWS, verify the
        # statemachine instance object has the parameters set
        # correctly.
        instance = {}
        instance['InstanceId'] = 'test'
        instance['Tags'] = []
        instance['PrivateIpAddress'] = '10.0.0.1'
        instance['PublicIpAddress'] = '1.2.3.4'
        instance['Placement'] = {'AvailabilityZone': 'us-west-2b'}
        iface = {'Ipv6Addresses': [{'Ipv6Address': 'fe80::dead:beef'}]}
        instance['NetworkInterfaces'] = [iface]
        instance['InstanceType'] = 'test'
        provider = Dummy()
        provider.region_name = 'us-west-2'
        awsi = AwsInstance(provider, instance, None, None, None)
        self.assertEqual(awsi.public_ipv4, '1.2.3.4')
        self.assertEqual(awsi.private_ipv4, '10.0.0.1')
        self.assertEqual(awsi.public_ipv6, 'fe80::dead:beef')
        self.assertIsNone(awsi.private_ipv6)
        self.assertEqual(awsi.public_ipv4, awsi.interface_ip)
    def test_aws_tags(self):
        """Static and dynamically-formatted tags are applied to the
        instance; the reserved Name tag cannot be overridden.
        """
        req = self.requestNode('aws/aws.yaml', 'ubuntu1404-with-tags')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        instance = self.ec2.Instance(node.external_id['instance'])
        tag_list = instance.tags
        self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
        self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
        self.assertNotIn({"Key": "Name", "Value": "ignored-name"}, tag_list)
        self.assertIn(
            {"Key": "dynamic-tenant", "Value": "Tenant is tenant-1"}, tag_list)
    def test_aws_min_ready(self):
        # Test dynamic tag formatting without a real node request
        """A min-ready node (launched with no request, so no tenant)
        formats dynamic tags with tenant None.
        """
        configfile = self.setup_config('aws/aws-min-ready.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)
        node = self.waitForNodes('ubuntu1404-with-tags')[0]
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        instance = self.ec2.Instance(node.external_id['instance'])
        tag_list = instance.tags
        self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
        self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
        self.assertNotIn({"Key": "Name", "Value": "ignored-name"}, tag_list)
        self.assertIn(
            {"Key": "dynamic-tenant", "Value": "Tenant is None"}, tag_list)
def test_aws_shell_type(self):
req = self.requestNode('aws/shell-type.yaml',
'ubuntu1404-with-shell-type')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404-with-shell-type')
self.assertEqual(node.shell_type, 'csh')
def test_aws_config(self):
configfile = self.setup_config('aws/config.yaml')
config = nodepool_config.loadConfig(configfile)
self.assertIn('ec2-us-west-2', config.providers)
config2 = nodepool_config.loadConfig(configfile)
self.assertEqual(config, config2)
def test_aws_ebs_optimized(self):
req = self.requestNode('aws/aws.yaml',
'ubuntu1404-ebs-optimized')
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertTrue(response['EbsOptimized']['Value'])
    def test_aws_imdsv2(self):
        """An imdsv2-required label sets the expected MetadataOptions
        in the run_instances call (moto does not model them, so the
        call arguments are inspected directly).
        """
        req = self.requestNode('aws/aws.yaml',
                               'ubuntu1404-imdsv2')
        node = self.assertSuccess(req)
        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
        self.assertEqual(node.image_id, 'ubuntu1404')

        self.assertEqual(
            self.run_instances_calls[0]['MetadataOptions']['HttpTokens'],
            'required')
        self.assertEqual(
            self.run_instances_calls[0]['MetadataOptions']['HttpEndpoint'],
            'enabled')
    def test_aws_invalid_instance_type(self):
        """A request for an invalid instance type fails without
        affecting subsequent requests for valid types.
        """
        req = self.requestNode('aws/aws-invalid.yaml', 'ubuntu-invalid')
        self.assertEqual(req.state, zk.FAILED)
        self.assertEqual(req.nodes, [])

        # Make sure other instance types are not affected
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.tenant_name = 'tenant-1'
        req.node_types.append('ubuntu')
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 1)
    def test_aws_diskimage_snapshot(self):
        """Build a diskimage via snapshot import (with one injected
        import failure to exercise the retry) and boot a node from it.
        """
        self.fake_aws.fail_import_count = 1
        configfile = self.setup_config('aws/diskimage.yaml')
        self.useBuilder(configfile)

        image = self.waitForImage('ec2-us-west-2', 'fake-image')
        self.assertEqual(image.username, 'another_user')

        ec2_image = self.ec2.Image(image.external_id)
        self.assertEqual(ec2_image.state, 'available')
        self.assertFalse('ImdsSupport' in self.register_image_calls[0])

        # As of 2024-07-09, moto does not set tags, but AWS itself does.
        tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
        self.assertIn(
            {'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
        self.assertIn(
            {'Key': 'provider_metadata', 'Value': 'provider'}, tags)

        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('diskimage')
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req)

        self.assertEqual(req.state, zk.FULFILLED)
        self.assertNotEqual(req.nodes, [])
        node = self.zk.getNode(req.nodes[0])
        self.assertEqual(node.allocated_to, req.id)
        self.assertEqual(node.state, zk.READY)
        self.assertIsNotNone(node.launcher)
        self.assertEqual(node.connection_type, 'ssh')
        self.assertEqual(node.shell_type, None)
        self.assertEqual(node.username, 'another_user')
        self.assertEqual(node.attributes,
                         {'key1': 'value1', 'key2': 'value2'})
        # Verify the configured volume parameters were requested.
        self.assertEqual(
            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
            ['Iops'], 2000)
        self.assertEqual(
            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
            ['Throughput'], 200)
    def test_aws_diskimage_image(self):
        """Build a diskimage via image import (with one injected import
        failure to exercise the retry) and boot a node from it.
        """
        self.fake_aws.fail_import_count = 1
        configfile = self.setup_config('aws/diskimage-import-image.yaml')
        self.useBuilder(configfile)

        image = self.waitForImage('ec2-us-west-2', 'fake-image')
        self.assertEqual(image.username, 'zuul')

        ec2_image = self.ec2.Image(image.external_id)
        self.assertEqual(ec2_image.state, 'available')
        self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
                        in ec2_image.tags)
        self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
                        in ec2_image.tags)

        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('diskimage')
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req)

        self.assertEqual(req.state, zk.FULFILLED)
        self.assertNotEqual(req.nodes, [])
        node = self.zk.getNode(req.nodes[0])
        self.assertEqual(node.allocated_to, req.id)
        self.assertEqual(node.state, zk.READY)
        self.assertIsNotNone(node.launcher)
        self.assertEqual(node.connection_type, 'ssh')
        self.assertEqual(node.shell_type, None)
        self.assertEqual(node.attributes,
                         {'key1': 'value1', 'key2': 'value2'})
        # Verify the configured volume parameters were requested.
        self.assertEqual(
            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
            ['Iops'], 2000)
        self.assertEqual(
            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
            ['Throughput'], 200)
def test_aws_diskimage_snapshot_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage-imdsv2-snapshot.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'another_user')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertEqual(
self.register_image_calls[0]['ImdsSupport'], 'v2.0')
# As of 2024-07-09, moto does not set tags, but AWS itself does.
tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
self.assertIn(
{'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
self.assertIn(
{'Key': 'provider_metadata', 'Value': 'provider'}, tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.username, 'another_user')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config('aws/diskimage-imdsv2-image.yaml')
with testtools.ExpectedException(Exception, "IMDSv2 requires"):
self.useBuilder(configfile)
def test_aws_diskimage_ebs_snapshot_imdsv2(self):
self.fake_aws.fail_import_count = 1
configfile = self.setup_config(
'aws/diskimage-imdsv2-ebs-snapshot.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'another_user')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertEqual(
self.register_image_calls[0]['ImdsSupport'], 'v2.0')
# As of 2024-07-09, moto does not set tags, but AWS itself does.
tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
self.assertIn(
{'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
self.assertIn(
{'Key': 'provider_metadata', 'Value': 'provider'}, tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.username, 'another_user')
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_removal(self):
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
self.waitForImage('ec2-us-west-2', 'fake-image')
self.replace_config(configfile, 'aws/config.yaml')
self.waitForImageDeletion('ec2-us-west-2', 'fake-image')
self.waitForBuildDeletion('fake-image', '0000000001')
    def test_aws_resource_cleanup(self):
        """Leaked instances, volumes, and S3 objects carrying nodepool
        metadata tags are removed by the provider's cleanup routine,
        and the corresponding leak statistics are reported.
        """
        # This tests everything except the image imports
        # Start by setting up leaked resources
        # Tags that mark the instance/volume as belonging to this
        # provider (but to a node id that does not exist in ZK).
        instance_tags = [
            {'Key': 'nodepool_node_id', 'Value': '0000000042'},
            {'Key': 'nodepool_pool_name', 'Value': 'main'},
            {'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
        ]
        # Tags that mark the S3 object as a leaked image upload.
        s3_tags = {
            'nodepool_build_id': '0000000042',
            'nodepool_upload_id': '0000000042',
            'nodepool_provider_name': 'ec2-us-west-2',
        }
        # DeleteOnTermination=False so the volume leaks independently
        # of the instance and must be cleaned up separately.
        reservation = self.ec2_client.run_instances(
            ImageId="ami-12c6146b", MinCount=1, MaxCount=1,
            BlockDeviceMappings=[{
                'DeviceName': '/dev/sda1',
                'Ebs': {
                    'VolumeSize': 80,
                    'DeleteOnTermination': False
                }
            }],
            TagSpecifications=[{
                'ResourceType': 'instance',
                'Tags': instance_tags
            }, {
                'ResourceType': 'volume',
                'Tags': instance_tags
            }]
        )
        instance_id = reservation['Instances'][0]['InstanceId']
        # Create the leaked S3 object.
        bucket = self.s3.Bucket('nodepool')
        bucket.put_object(Body=b'hi',
                          Key='testimage',
                          Tagging=urllib.parse.urlencode(s3_tags))
        obj = self.s3.Object('nodepool', 'testimage')
        # This effectively asserts the object exists
        self.s3_client.get_object_tagging(
            Bucket=obj.bucket_name, Key=obj.key)
        # Sanity-check the instance and volume are live before cleanup.
        instance = self.ec2.Instance(instance_id)
        self.assertEqual(instance.state['Name'], 'running')
        volume_id = list(instance.volumes.all())[0].id
        volume = self.ec2.Volume(volume_id)
        self.assertEqual(volume.state, 'in-use')
        # Now that the leaked resources exist, start the provider and
        # wait for it to clean them.
        configfile = self.setup_config('aws/diskimage.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)
        # Poll until the leaked instance is terminated.
        for _ in iterate_timeout(30, Exception, 'instance deletion'):
            instance = self.ec2.Instance(instance_id)
            if instance.state['Name'] == 'terminated':
                break
        # Poll until the leaked volume is deleted (a ClientError also
        # indicates the volume no longer exists).
        for _ in iterate_timeout(30, Exception, 'volume deletion'):
            volume = self.ec2.Volume(volume_id)
            try:
                if volume.state == 'deleted':
                    break
            except botocore.exceptions.ClientError:
                # Probably not found
                break
        # Poll until the leaked object is gone (NoSuchKey).
        for _ in iterate_timeout(30, Exception, 'object deletion'):
            obj = self.s3.Object('nodepool', 'testimage')
            try:
                self.s3_client.get_object_tagging(
                    Bucket=obj.bucket_name, Key=obj.key)
            except self.s3_client.exceptions.NoSuchKey:
                break
        # One leak of each kind should have been counted.
        self.assertReportedStat(
            'nodepool.provider.ec2-us-west-2.leaked.instances',
            value='1', kind='c')
        self.assertReportedStat(
            'nodepool.provider.ec2-us-west-2.leaked.volumes',
            value='1', kind='c')
        self.assertReportedStat(
            'nodepool.provider.ec2-us-west-2.leaked.objects',
            value='1', kind='c')
    def test_aws_resource_cleanup_import_snapshot(self):
        """Leaked AMIs/snapshots created via the import_snapshot path
        are retagged and cleaned up, while the current (non-leaked)
        image is left intact.
        """
        # This tests the import_snapshot path
        # Create a valid, non-leaked image to test id collisions, and
        # that it is not deleted.
        configfile = self.setup_config('aws/diskimage.yaml')
        self.useBuilder(configfile)
        current_image = self.waitForImage('ec2-us-west-2', 'fake-image')
        # Assert the image exists
        self.ec2.Image(current_image.external_id).state
        # Start by setting up leaked resources
        # Use low numbers to intentionally collide with the current
        # image to ensure we test the non-uniqueness of upload ids.
        image_tags = [
            {'Key': 'nodepool_build_id', 'Value': '0000000001'},
            {'Key': 'nodepool_upload_id', 'Value': '0000000001'},
            {'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
        ]
        # Only the import task carries the nodepool tags; the snapshot
        # and image derived from it are untagged (see below).
        task = self.fake_aws.import_snapshot(
            DiskContainer={
                'Format': 'ova',
                'UserBucket': {
                    'S3Bucket': 'nodepool',
                    'S3Key': 'testfile',
                }
            },
            TagSpecifications=[{
                'ResourceType': 'import-snapshot-task',
                'Tags': image_tags,
            }])
        snapshot_id = self.fake_aws.finish_import_snapshot(task)
        # Register an AMI backed by the imported snapshot, as the
        # builder would have done before leaking it.
        register_response = self.ec2_client.register_image(
            Architecture='amd64',
            BlockDeviceMappings=[
                {
                    'DeviceName': '/dev/sda1',
                    'Ebs': {
                        'DeleteOnTermination': True,
                        'SnapshotId': snapshot_id,
                        'VolumeSize': 20,
                        'VolumeType': 'gp2',
                    },
                },
            ],
            RootDeviceName='/dev/sda1',
            VirtualizationType='hvm',
            Name='testimage',
        )
        image_id = register_response['ImageId']
        ami = self.ec2.Image(image_id)
        # moto assigns the registered image a new snapshot id; point
        # the fake import task at it so lookups line up.
        new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
        self.fake_aws.change_snapshot_id(task, new_snapshot_id)
        # Note that the resulting image and snapshot do not have tags
        # applied, so we test the automatic retagging methods in the
        # adapter.
        image = self.ec2.Image(image_id)
        self.assertEqual(image.state, 'available')
        snap = self.ec2.Snapshot(snapshot_id)
        self.assertEqual(snap.state, 'completed')
        # Now that the leaked resources exist, start the provider and
        # wait for it to clean them.
        configfile = self.setup_config('aws/diskimage.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)
        # Poll until the leaked AMI is deregistered.
        for _ in iterate_timeout(30, Exception, 'ami deletion'):
            image = self.ec2.Image(image_id)
            try:
                # If this has a value the image was not deleted
                if image.state == 'available':
                    # Definitely not deleted yet
                    continue
            except AttributeError:
                # Per AWS API, a recently deleted image is empty and
                # looking at the state raises an AttributeFailure; see
                # https://github.com/boto/boto3/issues/2531. The image
                # was deleted, so we continue on here
                break
        # Poll until the leaked snapshot is deleted (a ClientError
        # also indicates it no longer exists).
        for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
            snap = self.ec2.Snapshot(new_snapshot_id)
            try:
                if snap.state == 'deleted':
                    break
            except botocore.exceptions.ClientError:
                # Probably not found
                break
        # Assert the non-leaked image still exists
        self.ec2.Image(current_image.external_id).state
    def test_aws_resource_cleanup_import_image(self):
        """Leaked AMIs/snapshots created via the import_image path are
        retagged and cleaned up, while the current (non-leaked) image
        is left intact.
        """
        # This tests the import_image path
        # Create a valid, non-leaked image to test id collisions, and
        # that it is not deleted.
        configfile = self.setup_config('aws/diskimage.yaml')
        self.useBuilder(configfile)
        current_image = self.waitForImage('ec2-us-west-2', 'fake-image')
        # Assert the image exists
        self.ec2.Image(current_image.external_id).state
        # Start by setting up leaked resources
        # Low build/upload ids intentionally collide with the current
        # image (same rationale as the import_snapshot variant).
        image_tags = [
            {'Key': 'nodepool_build_id', 'Value': '0000000001'},
            {'Key': 'nodepool_upload_id', 'Value': '0000000001'},
            {'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
        ]
        # The image import path:
        task = self.fake_aws.import_image(
            DiskContainers=[{
                'Format': 'ova',
                'UserBucket': {
                    'S3Bucket': 'nodepool',
                    'S3Key': 'testfile',
                }
            }],
            TagSpecifications=[{
                'ResourceType': 'import-image-task',
                'Tags': image_tags,
            }])
        image_id, snapshot_id = self.fake_aws.finish_import_image(task)
        # Note that the resulting image and snapshot do not have tags
        # applied, so we test the automatic retagging methods in the
        # adapter.
        image = self.ec2.Image(image_id)
        self.assertEqual(image.state, 'available')
        snap = self.ec2.Snapshot(snapshot_id)
        self.assertEqual(snap.state, 'completed')
        # Now that the leaked resources exist, start the provider and
        # wait for it to clean them.
        configfile = self.setup_config('aws/diskimage.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(pool)
        # Poll until the leaked AMI is deregistered.
        for _ in iterate_timeout(30, Exception, 'ami deletion'):
            image = self.ec2.Image(image_id)
            try:
                # If this has a value the image was not deleted
                if image.state == 'available':
                    # Definitely not deleted yet
                    continue
            except AttributeError:
                # Per AWS API, a recently deleted image is empty and
                # looking at the state raises an AttributeFailure; see
                # https://github.com/boto/boto3/issues/2531. The image
                # was deleted, so we continue on here
                break
        # Poll until the leaked snapshot is deleted (a ClientError
        # also indicates it no longer exists).
        for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
            snap = self.ec2.Snapshot(snapshot_id)
            try:
                if snap.state == 'deleted':
                    break
            except botocore.exceptions.ClientError:
                # Probably not found
                break
        # Assert the non-leaked image still exists
        self.ec2.Image(current_image.external_id).state
def test_aws_get_import_image_task(self):
# A unit test of the unusual error handling for missing tasks
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
adapter = pool.getProviderManager('ec2-us-west-2').adapter
self.assertIsNone(adapter._getImportImageTask("fake-id"))
def test_aws_provisioning_spot_instances(self):
# Test creating a spot instances instead of an on-demand on.
req = self.requestNode('aws/aws-spot.yaml', 'ubuntu1404-spot')
node = self.assertSuccess(req)
instance = self.ec2.Instance(node.external_id['instance'])
self.assertEqual(instance.instance_lifecycle, 'spot')
# moto doesn't provide the spot_instance_request_id
# self.assertIsNotNone(instance.spot_instance_request_id)
self.assertTrue(node.node_properties['spot'])
    def test_aws_provisioning_spot_early_reclaim(self):
        # Test that if AWS reclaims the spot instance before we're
        # done with the nodescan we fail the request.
        orig_notify = nodepool.driver.aws.adapter.AwsAdapter.\
            notifyNodescanResult
        # Wrap notifyNodescanResult so the instance is terminated (as
        # if reclaimed by AWS) after the nodescan finishes but before
        # the adapter acts on the result.
        def notify(*args, **kw):
            adapter = self.pool.getProviderManager('ec2-us-west-2').adapter
            for i in adapter._listInstances():
                self.ec2_client.terminate_instances(
                    InstanceIds=[i['InstanceId']])
            # Wait until the adapter's cached instance list reflects
            # the termination before letting the real notification run.
            for _ in iterate_timeout(60, Exception,
                                     "Instance list cache to update",
                                     interval=1):
                for i in adapter._listInstances():
                    if i['State']['Name'].lower() == 'running':
                        # An instance is still running; keep waiting.
                        break
                else:
                    # No running instances: cache has caught up.
                    break
            return orig_notify(*args, **kw)
        self.useFixture(fixtures.MonkeyPatch(
            'nodepool.driver.aws.adapter.AwsAdapter.notifyNodescanResult',
            notify))
        configfile = self.setup_config('aws/aws-spot.yaml')
        self.pool = self.useNodepool(configfile, watermark_sleep=1)
        self.startPool(self.pool)
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.tenant_name = 'tenant-1'
        req.node_types.append('ubuntu1404-spot')
        self.zk.storeNodeRequest(req)
        self.log.debug("Waiting for request %s", req.id)
        # The reclaimed instance should cause the request to fail.
        self.waitForNodeRequest(req, states=(zk.FAILED,))
def test_aws_dedicated_host(self):
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
for _ in iterate_timeout(60, Exception,
"Node request state transition",
interval=1):
# Ensure that we can render the node list (and that our
# use of a dictionary for external_id does not cause an
# error).
node_list = nodepool.status.node_list(self.zk)
nodepool.status.output(node_list, 'pretty')
nodepool.status.output(node_list, 'json')
req = self.zk.getNodeRequest(req.id)
if req.state in (zk.FULFILLED,):
break
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
# Verify instance and host are created
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 1)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 1)
node.state = zk.USED
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
# verify instance and host are deleted
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_dedicated_host_instance_failure(self):
self.run_instances_exception = Exception("some failure")
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
self.assertEqual(req.state, zk.FAILED)
# verify instance and host are deleted
provider = self.pool.getProviderManager('ec2-us-west-2')
for _ in iterate_timeout(60, Exception,
"Cloud cleanup",
interval=1):
if not (provider.launchers or provider.deleters):
break
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_dedicated_host_allocation_failure(self):
self.allocate_hosts_exception = Exception("some failure")
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
self.assertEqual(req.state, zk.FAILED)
# verify instance and host are deleted
provider = self.pool.getProviderManager('ec2-us-west-2')
for _ in iterate_timeout(60, Exception,
"Cloud cleanup",
interval=1):
if not (provider.launchers or provider.deleters):
break
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_create_launch_templates(self):
configfile = self.setup_config('aws/aws-fleet.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
launch_tempaltes = self.ec2_client.\
describe_launch_templates()['LaunchTemplates']
self.assertEqual(len(launch_tempaltes), 2)
lt1 = launch_tempaltes[0]
lt2 = launch_tempaltes[1]
self.assertTrue(lt1['LaunchTemplateName'].startswith(
'nodepool-launch-template'))
self.assertTrue(lt2['LaunchTemplateName'].startswith(
'nodepool-launch-template'))
# Get details from first launch template
lt1_version = self.ec2_client.\
describe_launch_template_versions(
LaunchTemplateId=lt1['LaunchTemplateId'])[
'LaunchTemplateVersions'][0]
lt1_data = lt1_version['LaunchTemplateData']
lt1_userdata = base64.b64decode(lt1_data['UserData']).decode()
self.assertEqual(lt1_userdata, 'some-command')
# Get details from second launch template
lt2_version = self.ec2_client.\
describe_launch_template_versions(
LaunchTemplateId=lt2['LaunchTemplateId'])[
'LaunchTemplateVersions'][0]
lt2_data = lt2_version['LaunchTemplateData']
self.assertIsNotNone(lt2_data.get('SecurityGroupIds'))
metadata = lt2_data['MetadataOptions']
self.assertEqual(metadata['HttpEndpoint'], 'enabled')
self.assertEqual(metadata['HttpTokens'], 'required')
ebs_settings = lt2_data['BlockDeviceMappings'][0]['Ebs']
self.assertTrue(ebs_settings['DeleteOnTermination'])
self.assertEqual(ebs_settings['Iops'], 1000)
self.assertEqual(ebs_settings['VolumeSize'], 40)
self.assertEqual(ebs_settings['VolumeType'], 'gp3')
self.assertEqual(ebs_settings['Throughput'], 200)
# Restart pool, the launch templates must be the same and
# must not be recreated
pool.stop()
configfile = self.setup_config('aws/aws-fleet.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
lt_2nd_run = self.ec2_client.\
describe_launch_templates()['LaunchTemplates']
self.assertEqual(len(lt_2nd_run), 2)
self.assertEqual(lt1['LaunchTemplateId'],
lt_2nd_run[0]['LaunchTemplateId'])
self.assertEqual(lt2['LaunchTemplateId'],
lt_2nd_run[1]['LaunchTemplateId'])
def test_aws_cleanup_launch_templates(self):
# start nodepool with old templates config
configfile = self.setup_config('aws/aws-fleet-old-template.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
launch_tempaltes = self.ec2_client.\
describe_launch_templates()['LaunchTemplates']
self.assertEqual(len(launch_tempaltes), 1)
# Restart pool with the config that not include the old template,
# the old template should be deleted.
pool.stop()
configfile = self.setup_config('aws/aws-fleet.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
lt_2nd_run = self.ec2_client.\
describe_launch_templates()['LaunchTemplates']
self.assertEqual(len(lt_2nd_run), 2)
def test_aws_create_fleet_on_demand(self):
req = self.requestNode('aws/aws-fleet.yaml', 'ubuntu1404-on-demand')
node = self.assertSuccess(req)
self.assertEqual(
self.create_fleet_calls[0]['OnDemandOptions']
['AllocationStrategy'], 'prioritized')
self.assertTrue(
self.create_fleet_calls[0]['LaunchTemplateConfigs'][0]
['LaunchTemplateSpecification']['LaunchTemplateName'].startswith(
'nodepool-launch-template'))
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][0]
['ResourceType'], 'instance')
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][0]
['Tags'][1]['Key'], 'nodepool_pool_name')
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][0]
['Tags'][1]['Value'], 'main')
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][1]
['ResourceType'], 'volume')
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][1]
['Tags'][1]['Key'], 'nodepool_pool_name')
self.assertEqual(self.create_fleet_calls[0]['TagSpecifications'][1]
['Tags'][1]['Value'], 'main')
self.assertEqual(
self.create_fleet_results[0]['Instances'][0]['Lifecycle'],
'on-demand')
self.assertIn(self.create_fleet_results[0]['Instances'][0]
['InstanceType'],
('t3.nano', 't3.micro', 't3.small', 't3.medium'))
node.state = zk.USED
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
def test_aws_create_fleet_spot(self):
req = self.requestNode('aws/aws-fleet.yaml', 'ubuntu1404-spot')
node = self.assertSuccess(req)
self.assertEqual(
self.create_fleet_calls[0]['SpotOptions']
['AllocationStrategy'], 'price-capacity-optimized')
self.assertEqual(
self.create_fleet_calls[0]['TargetCapacitySpecification']
['DefaultTargetCapacityType'], 'spot')
self.assertIn(self.create_fleet_results[0]['Instances'][0]
['InstanceType'],
('t3.nano', 't3.micro', 't3.small', 't3.medium'))
self.assertTrue(node.node_properties['fleet'])
self.assertTrue(node.node_properties['spot'])
node.state = zk.USED
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
@ec2_quotas({
'L-1216C47A': 6,
'L-34B43A08': 2
})
def test_aws_fleet_quota(self):
# Test if the quota used by instances launched by fleet API
# are taken into account.
configfile = self.setup_config('aws/aws-fleet.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
# Create a node request with fleet API.
req1 = zk.NodeRequest()
req1.state = zk.REQUESTED
req1.node_types.append('ubuntu1404-fleet-4core')
self.zk.storeNodeRequest(req1)
self.log.debug("Waiting for request %s", req1.id)
req1 = self.waitForNodeRequest(req1)
node1 = self.assertSuccess(req1)
# Create a second node request with non-fleet API; this should be
# over quota so it won't be fulfilled.
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('ubuntu1404-4core')
self.zk.storeNodeRequest(req2)
self.log.debug("Waiting for request %s", req2.id)
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
# Make sure we're paused while we attempt to fulfill the
# second request.
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
for _ in iterate_timeout(30, Exception, 'paused handler'):
if pool_worker[0].paused_handlers:
break
# Release the first node so that the second can be fulfilled.
node1.state = zk.USED
self.zk.storeNode(node1)
self.waitForNodeDeletion(node1)
# Make sure the second high node exists now.
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)