implement describing other network interface attributes

Change-Id: Ib1bc75d2fbbad113c5a9c4d390358cdf8e7d7f97
This commit is contained in:
Andrey Pavlov 2015-05-13 20:50:44 +03:00
parent 95c8930bfb
commit 6dd3cdbfcc
3 changed files with 217 additions and 153 deletions

View File

@ -1735,7 +1735,7 @@ class VpcCloudController(CloudController):
'str')
def describe_network_interface_attribute(self, context,
network_interface_id,
attribute):
attribute=None):
"""Describes the specified attribute of the specified network interface.

View File

@ -315,14 +315,49 @@ def unassign_private_ip_addresses(context, network_interface_id,
def describe_network_interface_attribute(context, network_interface_id,
attribute):
attribute=None):
if attribute is None:
raise exception.InvalidParameterCombination(
_('No attributes specified.'))
network_interface = ec2utils.get_db_item(context, network_interface_id)
# TODO(Alex): Implement attachments, groupSet
db_key = attribute if attribute == 'description' else 'source_dest_check'
default_value = '' if attribute == 'description' else True
return {'networkInterfaceId': network_interface['id'],
attribute: {'value': network_interface.get(db_key, default_value)}}
def _format_attr_description(result):
result['description'] = {
'value': network_interface.get('description', '')}
def _format_attr_source_dest_check(result):
result['sourceDestCheck'] = {
'value': network_interface.get('source_dest_check', True)}
def _format_attr_group_set(result):
ec2_network_interface = describe_network_interfaces(context,
network_interface_id=[network_interface_id]
)['networkInterfaceSet'][0]
result['groupSet'] = ec2_network_interface['groupSet']
def _format_attr_attachment(result):
ec2_network_interface = describe_network_interfaces(context,
network_interface_id=[network_interface_id]
)['networkInterfaceSet'][0]
if 'attachment' in ec2_network_interface:
result['attachment'] = ec2_network_interface['attachment']
attribute_formatter = {
'description': _format_attr_description,
'sourceDestCheck': _format_attr_source_dest_check,
'groupSet': _format_attr_group_set,
'attachment': _format_attr_attachment,
}
fn = attribute_formatter.get(attribute)
if fn is None:
raise exception.InvalidParameterValue(value=attribute,
parameter='attribute',
reason='Unknown attribute.')
result = {'networkInterfaceId': network_interface['id']}
fn(result)
return result
def modify_network_interface_attribute(context, network_interface_id,

View File

@ -69,42 +69,36 @@ class NetworkInterfaceTest(base.EC2TestCase):
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
kwargs = {
'SubnetId': subnet_id,
'Description': data_utils.rand_name('ni')
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean_ni = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
time.sleep(2)
self.assertRaises('DependencyViolation',
self.client.delete_subnet,
SubnetId=subnet_id)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean_ni)
self.get_network_interface_waiter().wait_delete(ni_id)
data = self.client.delete_subnet(SubnetId=subnet_id)
self.client.delete_subnet(SubnetId=subnet_id)
self.cancelResourceCleanUp(res_clean_subnet)
self.get_subnet_waiter().wait_delete(subnet_id)
def test_create_network_interface(self):
kwargs = {
'SubnetId': self.subnet_id,
'Description': data_utils.rand_name('ni')
}
data = self.client.create_network_interface(*[], **kwargs)
desc = data_utils.rand_name('ni')
data = self.client.create_network_interface(SubnetId=self.subnet_id,
Description=desc)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
ni = data['NetworkInterface']
self.assertEqual(self.vpc_id, ni['VpcId'])
self.assertEqual(self.subnet_id, ni['SubnetId'])
self.assertEqual(kwargs['Description'], ni['Description'])
self.assertEqual(desc, ni['Description'])
self.assertNotEmpty(ni.get('Groups'))
self.assertEqual('default', ni['Groups'][0]['GroupName'])
@ -124,8 +118,7 @@ class NetworkInterfaceTest(base.EC2TestCase):
self.get_network_interface_waiter().wait_available(ni_id)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
@ -142,13 +135,11 @@ class NetworkInterfaceTest(base.EC2TestCase):
data = self.client.describe_subnets(SubnetIds=[self.subnet_id])
count_before = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
addresses = []
while True:
try:
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(
SubnetId=self.subnet_id)
except botocore.exceptions.ClientError as e:
error_code = e.response['Error']['Code']
self.assertEqual('NetworkInterfaceLimitExceeded', error_code)
@ -167,18 +158,12 @@ class NetworkInterfaceTest(base.EC2TestCase):
self.assertEqual(len(addresses), count_before - count_after)
for addr in addresses:
kwargs = {
'NetworkInterfaceId': addr[0],
}
data = self.client.delete_network_interface(*[], **kwargs)
self.client.delete_network_interface(NetworkInterfaceId=addr[0])
self.cancelResourceCleanUp(addr[1])
self.get_network_interface_waiter().wait_delete(addr[0])
def test_unassign_primary_addresses(self):
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
@ -190,18 +175,14 @@ class NetworkInterfaceTest(base.EC2TestCase):
NetworkInterfaceId=ni_id,
PrivateIpAddresses=[primary_address])
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_assign_unassign_private_addresses_by_count(self):
data = self.client.describe_subnets(SubnetIds=[self.subnet_id])
count = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
@ -234,18 +215,14 @@ class NetworkInterfaceTest(base.EC2TestCase):
count_after = data['Subnets'][0]['AvailableIpAddressCount']
self.assertEqual(count - 1, count_after)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_assign_unassign_private_addresses_by_addresses(self):
data = self.client.describe_subnets(SubnetIds=[self.subnet_id])
count = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
@ -281,97 +258,13 @@ class NetworkInterfaceTest(base.EC2TestCase):
count_after = data['Subnets'][0]['AvailableIpAddressCount']
self.assertIn(count_after, [count - 1, count - 2])
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_network_interface_attribute(self):
desc = data_utils.rand_name('ni')
kwargs = {
'SubnetId': self.subnet_id,
'Description': desc
}
data = self.client.create_network_interface(*[], **kwargs)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id,
Attribute='description')
self.assertEqual(desc, data['Description']['Value'])
new_desc = data_utils.rand_name('new-ni')
kwargs = {
'NetworkInterfaceId': ni_id,
'Description': {'Value': new_desc}
}
data = self.client.modify_network_interface_attribute(*[], **kwargs)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id,
Attribute='description')
self.assertEqual(new_desc, data['Description']['Value'])
kwargs = {
'NetworkInterfaceId': ni_id,
'SourceDestCheck': {'Value': False}
}
data = self.client.modify_network_interface_attribute(*[], **kwargs)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id,
Attribute='sourceDestCheck')
self.assertEqual(False, data['SourceDestCheck']['Value'])
# NOTE(andrey-mp): ResetNetworkInterfaceAttribute had inadequate json
# scheme in botocore.
kwargs = {
'NetworkInterfaceId': ni_id,
'SourceDestCheck': {'Value': True}
}
data = self.client.modify_network_interface_attribute(*[], **kwargs)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id,
Attribute='sourceDestCheck')
self.assertEqual(True, data['SourceDestCheck']['Value'])
kwargs = {
'NetworkInterfaceId': ni_id,
'Attachment': {
'AttachmentId': 'fake'
}
}
self.assertRaises('MissingParameter',
self.client.modify_network_interface_attribute,
**kwargs)
kwargs = {
'NetworkInterfaceId': ni_id,
'Attachment': {
'AttachmentId': 'eni-attach-ffffffff',
'DeleteOnTermination': True
}
}
self.assertRaises('InvalidAttachmentID.NotFound',
self.client.modify_network_interface_attribute,
**kwargs)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
@testtools.skipUnless(CONF.aws.image_id, "image id is not defined")
def test_attach_network_interface(self):
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.delete_network_interface,
NetworkInterfaceId=ni_id)
@ -411,12 +304,9 @@ class NetworkInterfaceTest(base.EC2TestCase):
self.client.delete_network_interface,
NetworkInterfaceId=ni_id)
kwargs = {
'AttachmentId': attachment_id,
}
data = self.client.detach_network_interface(*[], **kwargs)
self.client.detach_network_interface(AttachmentId=attachment_id)
data = self.client.terminate_instances(InstanceIds=[instance_id])
self.client.terminate_instances(InstanceIds=[instance_id])
self.get_instance_waiter().wait_delete(instance_id)
@testtools.skipUnless(CONF.aws.image_id, "image id is not defined")
@ -441,10 +331,7 @@ class NetworkInterfaceTest(base.EC2TestCase):
clean_ni = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id2 = data['NetworkInterface']['NetworkInterfaceId']
clean_ni2 = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id2)
@ -466,19 +353,17 @@ class NetworkInterfaceTest(base.EC2TestCase):
self.assertEqual(attachment_id, ni['Attachment']['AttachmentId'])
self.assertFalse(ni['Attachment']['DeleteOnTermination'])
data = self.client.terminate_instances(InstanceIds=[instance_id])
self.client.terminate_instances(InstanceIds=[instance_id])
self.get_instance_waiter().wait_delete(instance_id)
self.get_network_interface_waiter().wait_available(ni_id)
self.get_network_interface_waiter().wait_available(ni_id2)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(clean_ni)
self.get_network_interface_waiter().wait_delete(ni_id)
data = self.client.delete_network_interface(
NetworkInterfaceId=ni_id2)
self.client.delete_network_interface(NetworkInterfaceId=ni_id2)
self.cancelResourceCleanUp(clean_ni2)
self.get_network_interface_waiter().wait_delete(ni_id2)
@ -492,10 +377,7 @@ class NetworkInterfaceTest(base.EC2TestCase):
self.assertTrue(nis[0]['Attachment']['DeleteOnTermination'])
ni_id = nis[0]['NetworkInterfaceId']
kwargs = {
'SubnetId': self.subnet_id,
}
data = self.client.create_network_interface(*[], **kwargs)
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id2 = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.delete_network_interface,
NetworkInterfaceId=ni_id2)
@ -515,10 +397,157 @@ class NetworkInterfaceTest(base.EC2TestCase):
'DeleteOnTermination': True,
}
}
data = self.client.modify_network_interface_attribute(*[], **kwargs)
self.client.modify_network_interface_attribute(*[], **kwargs)
data = self.client.terminate_instances(InstanceIds=[instance_id])
self.client.terminate_instances(InstanceIds=[instance_id])
self.get_instance_waiter().wait_delete(instance_id)
self.get_network_interface_waiter().wait_delete(ni_id)
self.get_network_interface_waiter().wait_delete(ni_id2)
def test_network_interface_attribute_description(self):
desc = data_utils.rand_name('ni')
data = self.client.create_network_interface(
SubnetId=self.subnet_id, Description=desc)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='description')
self.assertEqual(desc, data['Description']['Value'])
new_desc = data_utils.rand_name('new-ni')
self.client.modify_network_interface_attribute(
NetworkInterfaceId=ni_id, Description={'Value': new_desc})
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='description')
self.assertEqual(new_desc, data['Description']['Value'])
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_network_interface_attribute_source_dest_check(self):
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
self.client.modify_network_interface_attribute(
NetworkInterfaceId=ni_id, SourceDestCheck={'Value': False})
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='sourceDestCheck')
self.assertFalse(data['SourceDestCheck']['Value'])
# NOTE(andrey-mp): ResetNetworkInterfaceAttribute had inadequate json
# scheme in botocore and doesn't work against Amazon.
self.client.modify_network_interface_attribute(
NetworkInterfaceId=ni_id, SourceDestCheck={'Value': True})
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='sourceDestCheck')
self.assertEqual(True, data['SourceDestCheck']['Value'])
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
@testtools.skipUnless(CONF.aws.image_id, "image id is not defined")
def test_network_interface_attribute_attachment(self):
instance_id = self.run_instance(SubnetId=self.subnet_id)
instance = self.get_instance(instance_id)
nis = instance.get('NetworkInterfaces', [])
self.assertEqual(1, len(nis))
self.assertTrue(nis[0]['Attachment']['DeleteOnTermination'])
ni_id = nis[0]['NetworkInterfaceId']
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='attachment')
self.assertIn('Attachment', data)
self.assertTrue(data['Attachment'].get('AttachmentId'))
self.assertTrue(data['Attachment'].get('DeleteOnTermination'))
self.assertEqual(0, data['Attachment'].get('DeviceIndex'))
self.assertEqual(instance_id, data['Attachment'].get('InstanceId'))
self.assertEqual('attached', data['Attachment'].get('Status'))
self.client.terminate_instances(InstanceIds=[instance_id])
self.get_instance_waiter().wait_delete(instance_id)
def test_network_interface_attribute_empty_attachment(self):
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='attachment')
self.assertNotIn('Attachment', data)
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_network_interface_attribute_group_set(self):
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
data = self.client.describe_network_interface_attribute(
NetworkInterfaceId=ni_id, Attribute='groupSet')
self.assertIn('Groups', data)
self.assertEqual(1, len(data['Groups']))
self.assertEqual('default', data['Groups'][0]['GroupName'])
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_instance_attributes_negative(self):
data = self.client.create_network_interface(SubnetId=self.subnet_id)
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.delete_network_interface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
self.assertRaises('InvalidParameterCombination',
self.client.describe_network_interface_attribute,
NetworkInterfaceId=ni_id)
self.assertRaises('InvalidParameterValue',
self.client.describe_network_interface_attribute,
NetworkInterfaceId=ni_id, Attribute='fake')
self.assertRaises('InvalidNetworkInterfaceID.NotFound',
self.client.describe_network_interface_attribute,
NetworkInterfaceId='eni-0', Attribute='description')
self.assertRaises('InvalidParameterCombination',
self.client.modify_network_interface_attribute,
NetworkInterfaceId=ni_id)
self.assertRaises('MissingParameter',
self.client.modify_network_interface_attribute,
NetworkInterfaceId=ni_id,
Attachment={'AttachmentId': 'fake'})
self.assertRaises('InvalidAttachmentID.NotFound',
self.client.modify_network_interface_attribute,
NetworkInterfaceId=ni_id,
Attachment={'AttachmentId': 'eni-attach-0',
'DeleteOnTermination': True})
self.assertRaises('InvalidGroup.NotFound',
self.client.modify_network_interface_attribute,
NetworkInterfaceId=ni_id, Groups=['sg-0'])
self.assertRaises('InvalidParameterCombination',
self.client.modify_network_interface_attribute,
NetworkInterfaceId=ni_id, Description={})
self.client.delete_network_interface(NetworkInterfaceId=ni_id)
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)