3a0d78e057
Fixes bug 1108978 Change-Id: I5113e67bad1aaafdbaa0adc202cac97ba8d0dfdb
181 lines
7.0 KiB
Python
181 lines
7.0 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import commands
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
|
|
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
|
# it will override what happens to be installed in /usr/(local/)lib/python...
|
|
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
|
os.pardir,
|
|
os.pardir))
|
|
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
|
|
sys.path.insert(0, possible_topdir)
|
|
|
|
from smoketests import base
|
|
from smoketests import flags
|
|
|
|
#Note that this test should run from
|
|
#public network (outside of private network segments)
|
|
#Please set EC2_URL correctly
|
|
#You should use admin account in this test
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
|
|
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
|
|
TEST_KEY = '%s_key' % TEST_PREFIX
|
|
TEST_KEY2 = '%s_key2' % TEST_PREFIX
|
|
TEST_DATA = {}
|
|
|
|
|
|
class InstanceTestsFromPublic(base.UserSmokeTestCase):
|
|
def test_001_can_create_keypair(self):
|
|
key = self.create_key_pair(self.conn, TEST_KEY)
|
|
self.assertEqual(key.name, TEST_KEY)
|
|
|
|
def test_002_security_group(self):
|
|
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
|
|
for x in range(random.randint(4, 8)))
|
|
group = self.conn.create_security_group(security_group_name,
|
|
'test group')
|
|
group.connection = self.conn
|
|
group.authorize('tcp', 22, 22, '0.0.0.0/0')
|
|
if FLAGS.use_ipv6:
|
|
group.authorize('tcp', 22, 22, '::/0')
|
|
|
|
reservation = self.conn.run_instances(FLAGS.test_image,
|
|
key_name=TEST_KEY,
|
|
security_groups=[security_group_name],
|
|
instance_type='m1.tiny')
|
|
self.data['security_group_name'] = security_group_name
|
|
self.data['group'] = group
|
|
self.data['instance_id'] = reservation.instances[0].id
|
|
|
|
def test_003_instance_with_group_runs_within_60_seconds(self):
|
|
reservations = self.conn.get_all_instances([self.data['instance_id']])
|
|
instance = reservations[0].instances[0]
|
|
# allow 60 seconds to exit pending with IP
|
|
for x in xrange(60):
|
|
instance.update()
|
|
if instance.state == u'running':
|
|
break
|
|
time.sleep(1)
|
|
else:
|
|
self.fail('instance failed to start')
|
|
ip = reservations[0].instances[0].private_ip_address
|
|
self.failIf(ip == '0.0.0.0')
|
|
self.data['private_ip'] = ip
|
|
if FLAGS.use_ipv6:
|
|
ipv6 = reservations[0].instances[0].dns_name_v6
|
|
self.failIf(ipv6 is None)
|
|
self.data['ip_v6'] = ipv6
|
|
|
|
def test_004_can_ssh_to_ipv6(self):
|
|
if FLAGS.use_ipv6:
|
|
for x in xrange(20):
|
|
try:
|
|
conn = self.connect_ssh(
|
|
self.data['ip_v6'], TEST_KEY)
|
|
conn.close()
|
|
except Exception as ex:
|
|
time.sleep(1)
|
|
else:
|
|
break
|
|
else:
|
|
self.fail('could not ssh to instance')
|
|
|
|
def test_012_can_create_instance_with_keypair(self):
|
|
if 'instance_id' in self.data:
|
|
self.conn.terminate_instances([self.data['instance_id']])
|
|
reservation = self.conn.run_instances(FLAGS.test_image,
|
|
key_name=TEST_KEY,
|
|
instance_type='m1.tiny')
|
|
self.assertEqual(len(reservation.instances), 1)
|
|
self.data['instance_id'] = reservation.instances[0].id
|
|
|
|
def test_013_instance_runs_within_60_seconds(self):
|
|
reservations = self.conn.get_all_instances([self.data['instance_id']])
|
|
instance = reservations[0].instances[0]
|
|
# allow 60 seconds to exit pending with IP
|
|
for x in xrange(60):
|
|
instance.update()
|
|
if instance.state == u'running':
|
|
break
|
|
time.sleep(1)
|
|
else:
|
|
self.fail('instance failed to start')
|
|
ip = reservations[0].instances[0].private_ip_address
|
|
self.failIf(ip == '0.0.0.0')
|
|
self.data['private_ip'] = ip
|
|
if FLAGS.use_ipv6:
|
|
ipv6 = reservations[0].instances[0].dns_name_v6
|
|
self.failIf(ipv6 is None)
|
|
self.data['ip_v6'] = ipv6
|
|
|
|
def test_014_can_not_ping_private_ip(self):
|
|
for x in xrange(4):
|
|
# ping waits for 1 second
|
|
status, output = commands.getstatusoutput(
|
|
'ping -c1 %s' % self.data['private_ip'])
|
|
if status == 0:
|
|
self.fail('can ping private ip from public network')
|
|
if FLAGS.use_ipv6:
|
|
status, output = commands.getstatusoutput(
|
|
'ping6 -c1 %s' % self.data['ip_v6'])
|
|
if status == 0:
|
|
self.fail('can ping ipv6 from public network')
|
|
else:
|
|
pass
|
|
|
|
def test_015_can_not_ssh_to_private_ip(self):
|
|
for x in xrange(1):
|
|
try:
|
|
conn = self.connect_ssh(self.data['private_ip'], TEST_KEY)
|
|
conn.close()
|
|
except Exception:
|
|
time.sleep(1)
|
|
else:
|
|
self.fail('can ssh for ipv4 address from public network')
|
|
|
|
if FLAGS.use_ipv6:
|
|
for x in xrange(1):
|
|
try:
|
|
conn = self.connect_ssh(
|
|
self.data['ip_v6'], TEST_KEY)
|
|
conn.close()
|
|
except Exception:
|
|
time.sleep(1)
|
|
else:
|
|
self.fail('can ssh for ipv6 address from public network')
|
|
|
|
def test_999_tearDown(self):
|
|
self.delete_key_pair(self.conn, TEST_KEY)
|
|
security_group_name = self.data['security_group_name']
|
|
group = self.data['group']
|
|
if group:
|
|
group.revoke('tcp', 22, 22, '0.0.0.0/0')
|
|
if FLAGS.use_ipv6:
|
|
group.revoke('tcp', 22, 22, '::/0')
|
|
self.conn.delete_security_group(security_group_name)
|
|
if 'instance_id' in self.data:
|
|
self.conn.terminate_instances([self.data['instance_id']])
|