Support IPv6

This commit is contained in:
NTT PF Lab.
2010-12-24 20:38:49 +09:00
parent 0a93a9298d
commit c5c58cb20d
24 changed files with 702 additions and 32 deletions

View File

@@ -36,7 +36,7 @@ flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
# TODO(devamcar): Use random tempfile
ZIP_FILENAME = '/tmp/nova-me-x509.zip'
TEST_PREFIX = 'test%s' % int(random.random()*1000000)
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
TEST_USERNAME = '%suser' % TEST_PREFIX
TEST_PROJECTNAME = '%sproject' % TEST_PREFIX
@@ -89,4 +89,3 @@ class UserTests(AdminSmokeTestCase):
if __name__ == "__main__":
suites = {'user': unittest.makeSuite(UserTests)}
sys.exit(base.run_tests(suites))

View File

@@ -17,6 +17,7 @@
# under the License.
import boto
import boto_v6
import commands
import httplib
import os
@@ -69,6 +70,17 @@ class SmokeTestCase(unittest.TestCase):
'test.')
parts = self.split_clc_url(clc_url)
if FLAGS.use_ipv6:
return boto_v6.connect_ec2(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=parts['is_secure'],
region=RegionInfo(None,
'nova',
parts['ip']),
port=parts['port'],
path='/services/Cloud',
**kwargs)
return boto.connect_ec2(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=parts['is_secure'],
@@ -115,7 +127,8 @@ class SmokeTestCase(unittest.TestCase):
return True
def upload_image(self, bucket_name, image):
cmd = 'euca-upload-bundle -b %s -m /tmp/%s.manifest.xml' % (bucket_name, image)
cmd = 'euca-upload-bundle -b '
cmd += '%s -m /tmp/%s.manifest.xml' % (bucket_name, image)
status, output = commands.getstatusoutput(cmd)
if status != 0:
print '%s -> \n %s' % (cmd, output)
@@ -130,6 +143,7 @@ class SmokeTestCase(unittest.TestCase):
raise Exception(output)
return True
def run_tests(suites):
argv = FLAGS(sys.argv)
@@ -151,4 +165,3 @@ def run_tests(suites):
else:
for suite in suites.itervalues():
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@@ -33,6 +33,7 @@ DEFINE_bool = DEFINE_bool
# __GLOBAL FLAGS ONLY__
# Define any app-specific flags in their own files, docs at:
# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39
DEFINE_string('region', 'nova', 'Region to use')
DEFINE_string('test_image', 'ami-tiny', 'Image to use for launch tests')
DEFINE_string('use_ipv6', True, 'use the ipv6 or not')

View File

@@ -0,0 +1,180 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import commands
import os
import random
import socket
import sys
import time
import unittest
from smoketests import flags
from smoketests import base
from smoketests import user_smoketests
#Note that this test should run from
#public network (outside of private network segments)
#Please set EC2_URL correctly
#You should use admin account in this test
FLAGS = flags.FLAGS
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
TEST_KEY = '%s_key' % TEST_PREFIX
TEST_KEY2 = '%s_key2' % TEST_PREFIX
TEST_DATA = {}
class InstanceTestsFromPublic(user_smoketests.UserSmokeTestCase):
def test_001_can_create_keypair(self):
key = self.create_key_pair(self.conn, TEST_KEY)
self.assertEqual(key.name, TEST_KEY)
def test_002_security_group(self):
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.conn.create_security_group(security_group_name,
'test group')
group.connection = self.conn
group.authorize('tcp', 22, 22, '0.0.0.0/0')
if FLAGS.use_ipv6:
group.authorize('tcp', 22, 22, '::/0')
reservation = self.conn.run_instances(FLAGS.test_image,
key_name=TEST_KEY,
security_groups=[security_group_name],
instance_type='m1.tiny')
self.data['security_group_name'] = security_group_name
self.data['group'] = group
self.data['instance_id'] = reservation.instances[0].id
def test_003_instance_with_group_runs_within_60_seconds(self):
reservations = self.conn.get_all_instances([self.data['instance_id']])
instance = reservations[0].instances[0]
# allow 60 seconds to exit pending with IP
for x in xrange(60):
instance.update()
if instance.state == u'running':
break
time.sleep(1)
else:
self.fail('instance failed to start')
ip = reservations[0].instances[0].private_dns_name
self.failIf(ip == '0.0.0.0')
self.data['private_ip'] = ip
if FLAGS.use_ipv6:
ipv6 = reservations[0].instances[0].dns_name_v6
self.failIf(ipv6 is None)
self.data['ip_v6'] = ipv6
def test_004_can_ssh_to_ipv6(self):
if FLAGS.use_ipv6:
for x in xrange(20):
try:
conn = self.connect_ssh(
self.data['ip_v6'], TEST_KEY)
conn.close()
except Exception as ex:
print ex
time.sleep(1)
else:
break
else:
self.fail('could not ssh to instance')
def test_012_can_create_instance_with_keypair(self):
if 'instance_id' in self.data:
self.conn.terminate_instances([self.data['instance_id']])
reservation = self.conn.run_instances(FLAGS.test_image,
key_name=TEST_KEY,
instance_type='m1.tiny')
self.assertEqual(len(reservation.instances), 1)
self.data['instance_id'] = reservation.instances[0].id
def test_013_instance_runs_within_60_seconds(self):
reservations = self.conn.get_all_instances([self.data['instance_id']])
instance = reservations[0].instances[0]
# allow 60 seconds to exit pending with IP
for x in xrange(60):
instance.update()
if instance.state == u'running':
break
time.sleep(1)
else:
self.fail('instance failed to start')
ip = reservations[0].instances[0].private_dns_name
self.failIf(ip == '0.0.0.0')
self.data['private_ip'] = ip
if FLAGS.use_ipv6:
ipv6 = reservations[0].instances[0].dns_name_v6
self.failIf(ipv6 is None)
self.data['ip_v6'] = ipv6
def test_014_can_not_ping_private_ip(self):
for x in xrange(4):
# ping waits for 1 second
status, output = commands.getstatusoutput(
'ping -c1 %s' % self.data['private_ip'])
if status == 0:
self.fail('can ping private ip from public network')
if FLAGS.use_ipv6:
status, output = commands.getstatusoutput(
'ping6 -c1 %s' % self.data['ip_v6'])
if status == 0:
self.fail('can ping ipv6 from public network')
else:
pass
def test_015_can_not_ssh_to_private_ip(self):
for x in xrange(1):
try:
conn = self.connect_ssh(self.data['private_ip'], TEST_KEY)
conn.close()
except Exception:
time.sleep(1)
else:
self.fail('can ssh for ipv4 address from public network')
if FLAGS.use_ipv6:
for x in xrange(1):
try:
conn = self.connect_ssh(
self.data['ip_v6'], TEST_KEY)
conn.close()
except Exception:
time.sleep(1)
else:
self.fail('can ssh for ipv6 address from public network')
def test_999_tearDown(self):
self.delete_key_pair(self.conn, TEST_KEY)
security_group_name = self.data['security_group_name']
group = self.data['group']
if group:
group.revoke('tcp', 22, 22, '0.0.0.0/0')
if FLAGS.use_ipv6:
group.revoke('tcp', 22, 22, '::/0')
self.conn.delete_security_group(security_group_name)
if 'instance_id' in self.data:
self.conn.terminate_instances([self.data['instance_id']])
if __name__ == "__main__":
suites = {'instance': unittest.makeSuite(InstanceTestsFromPublic)}
sys.exit(base.run_tests(suites))

View File

@@ -37,7 +37,7 @@ flags.DEFINE_string('bundle_kernel', 'openwrt-x86-vmlinuz',
flags.DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
'Local image file to use for bundling tests')
TEST_PREFIX = 'test%s' % int (random.random()*1000000)
TEST_PREFIX = 'test%s' % int(random.random() * 1000000)
TEST_BUCKET = '%s_bucket' % TEST_PREFIX
TEST_KEY = '%s_key' % TEST_PREFIX
TEST_DATA = {}
@@ -71,7 +71,7 @@ class ImageTests(UserSmokeTestCase):
def test_006_can_register_kernel(self):
kernel_id = self.conn.register_image('%s/%s.manifest.xml' %
(TEST_BUCKET, FLAGS.bundle_kernel))
(TEST_BUCKET, FLAGS.bundle_kernel))
self.assert_(kernel_id is not None)
self.data['kernel_id'] = kernel_id
@@ -83,7 +83,7 @@ class ImageTests(UserSmokeTestCase):
time.sleep(1)
else:
print image.state
self.assert_(False) # wasn't available within 10 seconds
self.assert_(False) # wasn't available within 10 seconds
self.assert_(image.type == 'machine')
for i in xrange(10):
@@ -92,7 +92,7 @@ class ImageTests(UserSmokeTestCase):
break
time.sleep(1)
else:
self.assert_(False) # wasn't available within 10 seconds
self.assert_(False) # wasn't available within 10 seconds
self.assert_(kernel.type == 'kernel')
def test_008_can_describe_image_attribute(self):
@@ -137,20 +137,23 @@ class InstanceTests(UserSmokeTestCase):
self.data['instance_id'] = reservation.instances[0].id
def test_003_instance_runs_within_60_seconds(self):
reservations = self.conn.get_all_instances([data['instance_id']])
reservations = self.conn.get_all_instances([self.data['instance_id']])
instance = reservations[0].instances[0]
# allow 60 seconds to exit pending with IP
for x in xrange(60):
instance.update()
if instance.state == u'running':
break
break
time.sleep(1)
else:
self.fail('instance failed to start')
ip = reservations[0].instances[0].private_dns_name
self.failIf(ip == '0.0.0.0')
self.data['private_ip'] = ip
print self.data['private_ip']
if FLAGS.use_ipv6:
ipv6 = reservations[0].instances[0].dns_name_v6
self.failIf(ipv6 is None)
self.data['ip_v6'] = ipv6
def test_004_can_ping_private_ip(self):
for x in xrange(120):
@@ -159,6 +162,11 @@ class InstanceTests(UserSmokeTestCase):
'ping -c1 %s' % self.data['private_ip'])
if status == 0:
break
if FLAGS.use_ipv6:
status, output = commands.getstatusoutput(
'ping6 -c1 %s' % self.data['ip_v6'])
if status == 0:
break
else:
self.fail('could not ping instance')
@@ -174,6 +182,19 @@ class InstanceTests(UserSmokeTestCase):
else:
self.fail('could not ssh to instance')
if FLAGS.use_ipv6:
for x in xrange(30):
try:
conn = self.connect_ssh(
self.data['ip_v6'], TEST_KEY)
conn.close()
except Exception:
time.sleep(1)
else:
break
else:
self.fail('could not ssh to instance')
def test_006_can_allocate_elastic_ip(self):
result = self.conn.allocate_address()
self.assertTrue(hasattr(result, 'public_ip'))
@@ -206,8 +227,8 @@ class InstanceTests(UserSmokeTestCase):
def test_999_tearDown(self):
self.delete_key_pair(self.conn, TEST_KEY)
if self.data.has_key('instance_id'):
self.conn.terminate_instances([data['instance_id']])
if 'instance_id' in self.data:
self.conn.terminate_instances([self.data['instance_id']])
class VolumeTests(UserSmokeTestCase):