c33bd50a79
Fixes bug #912650. Fixes the following test cases in smoketests/test_netadmin.py:SecurityGroupTests: 'test_004_can_access_metadata_over_public_ip' 'test_005_validate_metadata' 'test_999_tearDown' Change-Id: Iefa8034a364e13243d06200e8af1840df896baf3
203 lines
8.4 KiB
Python
203 lines
8.4 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import commands
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
|
|
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
|
# it will override what happens to be installed in /usr/(local/)lib/python...
|
|
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
|
os.pardir,
|
|
os.pardir))
|
|
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
|
sys.path.insert(0, possible_topdir)
|
|
|
|
from smoketests import flags
|
|
from smoketests import base
|
|
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
|
|
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
|
|
TEST_KEY = '%s_key' % TEST_PREFIX
|
|
TEST_GROUP = '%s_group' % TEST_PREFIX
|
|
|
|
|
|
class AddressTests(base.UserSmokeTestCase):
|
|
def test_000_setUp(self):
|
|
self.create_key_pair(self.conn, TEST_KEY)
|
|
reservation = self.conn.run_instances(FLAGS.test_image,
|
|
instance_type='m1.tiny',
|
|
key_name=TEST_KEY)
|
|
self.data['instance'] = reservation.instances[0]
|
|
if not self.wait_for_running(self.data['instance']):
|
|
self.fail('instance failed to start')
|
|
self.data['instance'].update()
|
|
if not self.wait_for_ping(self.data['instance'].private_ip_address):
|
|
self.fail('could not ping instance')
|
|
if not self.wait_for_ssh(self.data['instance'].private_ip_address,
|
|
TEST_KEY):
|
|
self.fail('could not ssh to instance')
|
|
|
|
def test_001_can_allocate_floating_ip(self):
|
|
result = self.conn.allocate_address()
|
|
self.assertTrue(hasattr(result, 'public_ip'))
|
|
self.data['public_ip'] = result.public_ip
|
|
|
|
def test_002_can_associate_ip_with_instance(self):
|
|
result = self.conn.associate_address(self.data['instance'].id,
|
|
self.data['public_ip'])
|
|
self.assertTrue(result)
|
|
|
|
def test_003_can_ssh_with_public_ip(self):
|
|
ssh_authorized = False
|
|
groups = self.conn.get_all_security_groups(['default'])
|
|
for rule in groups[0].rules:
|
|
if (rule.ip_protocol == 'tcp' and
|
|
int(rule.from_port) <= 22 and
|
|
int(rule.to_port) >= 22):
|
|
ssh_authorized = True
|
|
break
|
|
if not ssh_authorized:
|
|
self.conn.authorize_security_group('default',
|
|
ip_protocol='tcp',
|
|
from_port=22,
|
|
to_port=22)
|
|
try:
|
|
if not self.wait_for_ssh(self.data['public_ip'], TEST_KEY):
|
|
self.fail('could not ssh to public ip')
|
|
finally:
|
|
if not ssh_authorized:
|
|
self.conn.revoke_security_group('default',
|
|
ip_protocol='tcp',
|
|
from_port=22,
|
|
to_port=22)
|
|
|
|
def test_004_can_disassociate_ip_from_instance(self):
|
|
result = self.conn.disassociate_address(self.data['public_ip'])
|
|
self.assertTrue(result)
|
|
|
|
def test_005_can_deallocate_floating_ip(self):
|
|
result = self.conn.release_address(self.data['public_ip'])
|
|
self.assertTrue(result)
|
|
|
|
def test_999_tearDown(self):
|
|
self.delete_key_pair(self.conn, TEST_KEY)
|
|
self.conn.terminate_instances([self.data['instance'].id])
|
|
|
|
|
|
class SecurityGroupTests(base.UserSmokeTestCase):
|
|
|
|
def __get_metadata_item(self, category):
|
|
id_url = "latest/meta-data/%s" % category
|
|
options = "-f -s --max-time 1"
|
|
command = "curl %s %s/%s" % (options, self.data['public_ip'], id_url)
|
|
status, output = commands.getstatusoutput(command)
|
|
value = output.strip()
|
|
if status > 0:
|
|
return False
|
|
return value
|
|
|
|
def __public_instance_is_accessible(self):
|
|
instance_id = self.__get_metadata_item('instance-id')
|
|
if not instance_id:
|
|
return False
|
|
if instance_id != self.data['instance'].id:
|
|
raise Exception("Wrong instance id. Expected: %s, Got: %s" %
|
|
(self.data['instance'].id, instance_id))
|
|
return True
|
|
|
|
def test_001_can_create_security_group(self):
|
|
self.conn.create_security_group(TEST_GROUP, description='test')
|
|
|
|
groups = self.conn.get_all_security_groups()
|
|
self.assertTrue(TEST_GROUP in [group.name for group in groups])
|
|
|
|
def test_002_can_launch_instance_in_security_group(self):
|
|
with open("proxy.sh") as f:
|
|
user_data = f.read()
|
|
self.create_key_pair(self.conn, TEST_KEY)
|
|
reservation = self.conn.run_instances(FLAGS.test_image,
|
|
key_name=TEST_KEY,
|
|
security_groups=[TEST_GROUP],
|
|
user_data=user_data,
|
|
instance_type='m1.tiny')
|
|
|
|
self.data['instance'] = reservation.instances[0]
|
|
if not self.wait_for_running(self.data['instance']):
|
|
self.fail('instance failed to start')
|
|
self.data['instance'].update()
|
|
|
|
def test_003_can_authorize_security_group_ingress(self):
|
|
self.assertTrue(self.conn.authorize_security_group(TEST_GROUP,
|
|
ip_protocol='tcp',
|
|
from_port=80,
|
|
to_port=80))
|
|
|
|
def test_004_can_access_metadata_over_public_ip(self):
|
|
result = self.conn.allocate_address()
|
|
self.assertTrue(hasattr(result, 'public_ip'))
|
|
self.data['public_ip'] = result.public_ip
|
|
|
|
result = self.conn.associate_address(self.data['instance'].id,
|
|
self.data['public_ip'])
|
|
start_time = time.time()
|
|
while not self.__public_instance_is_accessible():
|
|
# 1 minute to launch
|
|
if time.time() - start_time > 60:
|
|
raise Exception("Timeout")
|
|
time.sleep(1)
|
|
|
|
def test_005_validate_metadata(self):
|
|
instance = self.data['instance']
|
|
start_time = time.time()
|
|
instance_type = False
|
|
while not instance_type:
|
|
instance_type = self.__get_metadata_item('instance-type')
|
|
# 10 seconds to restart proxy
|
|
if time.time() - start_time > 10:
|
|
raise Exception("Timeout")
|
|
time.sleep(1)
|
|
self.assertEqual(instance.instance_type, instance_type)
|
|
#FIXME(dprince): validate more metadata here
|
|
|
|
def test_006_can_revoke_security_group_ingress(self):
|
|
self.assertTrue(self.conn.revoke_security_group(TEST_GROUP,
|
|
ip_protocol='tcp',
|
|
from_port=80,
|
|
to_port=80))
|
|
start_time = time.time()
|
|
while self.__public_instance_is_accessible():
|
|
# 1 minute to teardown
|
|
if time.time() - start_time > 60:
|
|
raise Exception("Timeout")
|
|
time.sleep(1)
|
|
|
|
def test_999_tearDown(self):
|
|
self.conn.disassociate_address(self.data['public_ip'])
|
|
self.conn.delete_key_pair(TEST_KEY)
|
|
self.conn.delete_security_group(TEST_GROUP)
|
|
groups = self.conn.get_all_security_groups()
|
|
self.assertFalse(TEST_GROUP in [group.name for group in groups])
|
|
self.conn.terminate_instances([self.data['instance'].id])
|
|
self.assertTrue(self.conn.release_address(self.data['public_ip']))
|