nova/smoketests/smoketest.py

567 lines
20 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import commands
import os
import random
import re
import sys
import time
import unittest
import zipfile
import paramiko
from smoketests import flags
from smoketests import novatestcase
SUITE_NAMES = '[user, image, security, public_network, volume]'
FLAGS = flags.FLAGS
flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
# TODO(devamcar): Use random tempfile
ZIP_FILENAME = '/tmp/nova-me-x509.zip'
data = {}
test_prefix = 'test%s' % int(random.random()*1000000)
test_username = '%suser' % test_prefix
test_bucket = '%s_bucket' % test_prefix
test_key = '%s_key' % test_prefix
# Test admin credentials and user creation
class UserTests(novatestcase.NovaTestCase):
def test_001_admin_can_connect(self):
conn = self.connection_for('admin')
self.assert_(conn)
def test_002_admin_can_create_user(self):
userinfo = self.create_user(test_username)
self.assertEqual(userinfo.username, test_username)
def test_003_user_can_download_credentials(self):
buf = self.get_signed_zip(test_username)
output = open(ZIP_FILENAME, 'w')
output.write(buf)
output.close()
zip = zipfile.ZipFile(ZIP_FILENAME, 'a', zipfile.ZIP_DEFLATED)
bad = zip.testzip()
zip.close()
self.failIf(bad)
def test_999_tearDown(self):
self.delete_user(test_username)
user = self.get_user(test_username)
self.assert_(user is None)
try:
os.remove(ZIP_FILENAME)
except:
pass
# Test image bundling, registration, and launching
class ImageTests(novatestcase.NovaTestCase):
def test_000_setUp(self):
self.create_user(test_username)
def test_001_admin_can_bundle_image(self):
self.assertTrue(self.bundle_image(FLAGS.bundle_image))
def test_002_admin_can_upload_image(self):
self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_image))
def test_003_admin_can_register_image(self):
image_id = self.register_image(test_bucket, FLAGS.bundle_image)
self.assert_(image_id is not None)
data['image_id'] = image_id
def test_004_admin_can_bundle_kernel(self):
self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))
def test_005_admin_can_upload_kernel(self):
self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_kernel))
def test_006_admin_can_register_kernel(self):
# FIXME(devcamcar): registration should verify that bucket/manifest
# exists before returning successfully.
kernel_id = self.register_image(test_bucket, FLAGS.bundle_kernel)
self.assert_(kernel_id is not None)
data['kernel_id'] = kernel_id
def test_007_admin_images_are_available_within_10_seconds(self):
for i in xrange(10):
image = self.admin.get_image(data['image_id'])
if image and image.state == 'available':
break
time.sleep(1)
else:
print image.state
self.assert_(False) # wasn't available within 10 seconds
self.assert_(image.type == 'machine')
for i in xrange(10):
kernel = self.admin.get_image(data['kernel_id'])
if kernel and kernel.state == 'available':
break
time.sleep(1)
else:
self.assert_(False) # wasn't available within 10 seconds
self.assert_(kernel.type == 'kernel')
def test_008_admin_can_describe_image_attribute(self):
attrs = self.admin.get_image_attribute(data['image_id'],
'launchPermission')
self.assert_(attrs.name, 'launch_permission')
def test_009_me_cannot_see_non_public_images(self):
conn = self.connection_for(test_username)
images = conn.get_all_images(image_ids=[data['image_id']])
self.assertEqual(len(images), 0)
def test_010_admin_can_modify_image_launch_permission(self):
conn = self.connection_for(test_username)
self.admin.modify_image_attribute(image_id=data['image_id'],
operation='add',
attribute='launchPermission',
groups='all')
image = conn.get_image(data['image_id'])
self.assertEqual(image.id, data['image_id'])
def test_011_me_can_list_public_images(self):
conn = self.connection_for(test_username)
images = conn.get_all_images(image_ids=[data['image_id']])
self.assertEqual(len(images), 1)
pass
def test_012_me_can_see_launch_permission(self):
attrs = self.admin.get_image_attribute(data['image_id'],
'launchPermission')
self.assert_(attrs.name, 'launch_permission')
self.assert_(attrs.groups[0], 'all')
# FIXME: add tests that user can launch image
# def test_013_user_can_launch_admin_public_image(self):
# # TODO: Use openwrt kernel instead of default kernel
# conn = self.connection_for(test_username)
# reservation = conn.run_instances(data['image_id'])
# self.assertEqual(len(reservation.instances), 1)
# data['my_instance_id'] = reservation.instances[0].id
# def test_014_instances_launch_within_30_seconds(self):
# pass
# def test_015_user_can_terminate(self):
# conn = self.connection_for(test_username)
# terminated = conn.terminate_instances(
# instance_ids=[data['my_instance_id']])
# self.assertEqual(len(terminated), 1)
def test_016_admin_can_deregister_kernel(self):
self.assertTrue(self.admin.deregister_image(data['kernel_id']))
def test_017_admin_can_deregister_image(self):
self.assertTrue(self.admin.deregister_image(data['image_id']))
def test_018_admin_can_delete_bundle(self):
self.assertTrue(self.delete_bundle_bucket(test_bucket))
def test_999_tearDown(self):
data = {}
self.delete_user(test_username)
# Test key pairs and security groups
class SecurityTests(novatestcase.NovaTestCase):
def test_000_setUp(self):
self.create_user(test_username + '_me')
self.create_user(test_username + '_you')
data['image_id'] = 'ami-tiny'
def test_001_me_can_create_keypair(self):
conn = self.connection_for(test_username + '_me')
key = self.create_key_pair(conn, test_key)
self.assertEqual(key.name, test_key)
def test_002_you_can_create_keypair(self):
conn = self.connection_for(test_username + '_you')
key = self.create_key_pair(conn, test_key+ 'yourkey')
self.assertEqual(key.name, test_key+'yourkey')
def test_003_me_can_create_instance_with_keypair(self):
conn = self.connection_for(test_username + '_me')
reservation = conn.run_instances(data['image_id'], key_name=test_key)
self.assertEqual(len(reservation.instances), 1)
data['my_instance_id'] = reservation.instances[0].id
def test_004_me_can_obtain_private_ip_within_60_seconds(self):
conn = self.connection_for(test_username + '_me')
reservations = conn.get_all_instances([data['my_instance_id']])
instance = reservations[0].instances[0]
# allow 60 seconds to exit pending with IP
for x in xrange(60):
instance.update()
if instance.state != u'pending':
break
time.sleep(1)
else:
self.assert_(False)
# self.assertEqual(instance.state, u'running')
ip = reservations[0].instances[0].private_dns_name
self.failIf(ip == '0.0.0.0')
data['my_private_ip'] = ip
print data['my_private_ip'],
def test_005_can_ping_private_ip(self):
for x in xrange(120):
# ping waits for 1 second
status, output = commands.getstatusoutput(
'ping -c1 -w1 %s' % data['my_private_ip'])
if status == 0:
break
else:
self.assert_('could not ping instance')
#def test_005_me_cannot_ssh_when_unauthorized(self):
# self.assertRaises(paramiko.SSHException, self.connect_ssh,
# data['my_private_ip'], 'mykey')
#def test_006_me_can_authorize_ssh(self):
# conn = self.connection_for(test_username + '_me')
# self.assertTrue(
# conn.authorize_security_group(
# 'default',
# ip_protocol='tcp',
# from_port=22,
# to_port=22,
# cidr_ip='0.0.0.0/0'
# )
# )
def test_007_me_can_ssh_when_authorized(self):
conn = self.connect_ssh(data['my_private_ip'], test_key)
conn.close()
#def test_008_me_can_revoke_ssh_authorization(self):
# conn = self.connection_for('me')
# self.assertTrue(
# conn.revoke_security_group(
# 'default',
# ip_protocol='tcp',
# from_port=22,
# to_port=22,
# cidr_ip='0.0.0.0/0'
# )
# )
#def test_009_you_cannot_ping_my_instance(self):
# TODO: should ping my_private_ip from with an instance started by you.
#self.assertFalse(self.can_ping(data['my_private_ip']))
def test_010_you_cannot_ssh_to_my_instance(self):
try:
conn = self.connect_ssh(data['my_private_ip'],
test_key + 'yourkey')
conn.close()
except paramiko.SSHException:
pass
else:
self.fail("expected SSHException")
def test_999_tearDown(self):
conn = self.connection_for(test_username + '_me')
self.delete_key_pair(conn, test_key)
if data.has_key('my_instance_id'):
conn.terminate_instances([data['my_instance_id']])
conn = self.connection_for(test_username + '_you')
self.delete_key_pair(conn, test_key + 'yourkey')
conn = self.connection_for('admin')
self.delete_user(test_username + '_me')
self.delete_user(test_username + '_you')
#self.tearDown_test_image(conn, data['image_id'])
# TODO: verify wrt image boots
# build python into wrt image
# build boto/m2crypto into wrt image
# build euca2ools into wrt image
# build a script to download and unpack credentials
# - return "ok" to stdout for comparison in self.assertEqual()
# build a script to bundle the instance
# build a script to upload the bundle
# status, output = commands.getstatusoutput('cmd')
# if status == 0:
# print 'ok'
# else:
# print output
# Testing rebundling
class RebundlingTests(novatestcase.NovaTestCase):
def test_000_setUp(self):
self.create_user('me')
self.create_user('you')
# TODO: create keypair for me
# upload smoketest img
# run instance
def test_001_me_can_download_credentials_within_instance(self):
conn = self.connect_ssh(data['my_private_ip'], 'mykey')
stdin, stdout = conn.exec_command(
'python ~/smoketests/install-credentials.py')
conn.close()
self.assertEqual(stdout, 'ok')
def test_002_me_can_rebundle_within_instance(self):
conn = self.connect_ssh(data['my_private_ip'], 'mykey')
stdin, stdout = conn.exec_command(
'python ~/smoketests/rebundle-instance.py')
conn.close()
self.assertEqual(stdout, 'ok')
def test_003_me_can_upload_image_within_instance(self):
conn = self.connect_ssh(data['my_private_ip'], 'mykey')
stdin, stdout = conn.exec_command(
'python ~/smoketests/upload-bundle.py')
conn.close()
self.assertEqual(stdout, 'ok')
def test_004_me_can_register_image_within_instance(self):
conn = self.connect_ssh(data['my_private_ip'], 'mykey')
stdin, stdout = conn.exec_command(
'python ~/smoketests/register-image.py')
conn.close()
if re.matches('ami-{\w+}', stdout):
data['my_image_id'] = stdout.strip()
else:
self.fail('expected ami-nnnnnn, got:\n ' + stdout)
def test_005_you_cannot_see_my_private_image(self):
conn = self.connection_for('you')
image = conn.get_image(data['my_image_id'])
self.assertEqual(image, None)
def test_006_me_can_make_image_public(self):
conn = self.connection_for(test_username)
conn.modify_image_attribute(image_id=data['my_image_id'],
operation='add',
attribute='launchPermission',
groups='all')
def test_007_you_can_see_my_public_image(self):
conn = self.connection_for('you')
image = conn.get_image(data['my_image_id'])
self.assertEqual(image.id, data['my_image_id'])
def test_999_tearDown(self):
self.delete_user('me')
self.delete_user('you')
#if data.has_key('image_id'):
# deregister rebundled image
# TODO: tear down instance
# delete keypairs
data = {}
# Test elastic IPs
class ElasticIPTests(novatestcase.NovaTestCase):
def test_000_setUp(self):
data['image_id'] = 'ami-tiny'
self.create_user('me')
conn = self.connection_for('me')
self.create_key_pair(conn, 'mykey')
conn = self.connection_for('admin')
#data['image_id'] = self.setUp_test_image(FLAGS.bundle_image)
def test_001_me_can_launch_image_with_keypair(self):
conn = self.connection_for('me')
reservation = conn.run_instances(data['image_id'], key_name='mykey')
self.assertEqual(len(reservation.instances), 1)
data['my_instance_id'] = reservation.instances[0].id
def test_002_me_can_allocate_elastic_ip(self):
conn = self.connection_for('me')
data['my_public_ip'] = conn.allocate_address()
self.assert_(data['my_public_ip'].public_ip)
def test_003_me_can_associate_ip_with_instance(self):
self.assertTrue(data['my_public_ip'].associate(data['my_instance_id']))
def test_004_me_can_ssh_with_public_ip(self):
conn = self.connect_ssh(data['my_public_ip'].public_ip, 'mykey')
conn.close()
def test_005_me_can_disassociate_ip_from_instance(self):
self.assertTrue(data['my_public_ip'].disassociate())
def test_006_me_can_deallocate_elastic_ip(self):
self.assertTrue(data['my_public_ip'].delete())
def test_999_tearDown(self):
conn = self.connection_for('me')
self.delete_key_pair(conn, 'mykey')
conn = self.connection_for('admin')
#self.tearDown_test_image(conn, data['image_id'])
data = {}
ZONE = 'nova'
DEVICE = 'vdb'
# Test iscsi volumes
class VolumeTests(novatestcase.NovaTestCase):
def test_000_setUp(self):
self.create_user(test_username)
data['image_id'] = 'ami-tiny' # A7370FE3
conn = self.connection_for(test_username)
self.create_key_pair(conn, test_key)
reservation = conn.run_instances(data['image_id'],
instance_type='m1.tiny',
key_name=test_key)
data['instance_id'] = reservation.instances[0].id
data['private_ip'] = reservation.instances[0].private_dns_name
# wait for instance to show up
for x in xrange(120):
# ping waits for 1 second
status, output = commands.getstatusoutput(
'ping -c1 -w1 %s' % data['private_ip'])
if status == 0:
break
else:
self.fail('unable to ping instance')
def test_001_me_can_create_volume(self):
conn = self.connection_for(test_username)
volume = conn.create_volume(1, ZONE)
self.assertEqual(volume.size, 1)
data['volume_id'] = volume.id
# give network time to find volume
time.sleep(5)
def test_002_me_can_attach_volume(self):
conn = self.connection_for(test_username)
conn.attach_volume(
volume_id = data['volume_id'],
instance_id = data['instance_id'],
device = '/dev/%s' % DEVICE
)
# give instance time to recognize volume
time.sleep(5)
def test_003_me_can_mount_volume(self):
conn = self.connect_ssh(data['private_ip'], test_key)
# FIXME(devcamcar): the tiny image doesn't create the node properly
# this will make /dev/vd* if it doesn't exist
stdin, stdout, stderr = conn.exec_command(
'grep %s /proc/partitions |' + \
'`awk \'{print "mknod /dev/"$4" b "$1" "$2}\'`' % DEVICE)
commands = []
commands.append('mkdir -p /mnt/vol')
commands.append('mkfs.ext2 /dev/%s' % DEVICE)
commands.append('mount /dev/%s /mnt/vol' % DEVICE)
commands.append('echo success')
stdin, stdout, stderr = conn.exec_command(' && '.join(commands))
out = stdout.read()
conn.close()
if not out.strip().endswith('success'):
self.fail('Unable to mount: %s %s' % (out, stderr.read()))
def test_004_me_can_write_to_volume(self):
conn = self.connect_ssh(data['private_ip'], test_key)
# FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
stdin, stdout, stderr = conn.exec_command(
'echo hello > /mnt/vol/test.txt')
err = stderr.read()
conn.close()
if len(err) > 0:
self.fail('Unable to write to mount: %s' % (err))
def test_005_volume_is_correct_size(self):
conn = self.connect_ssh(data['private_ip'], test_key)
stdin, stdout, stderr = conn.exec_command(
"df -h | grep %s | awk {'print $2'}" % DEVICE)
out = stdout.read()
conn.close()
if not out.strip() == '1007.9M':
self.fail('Volume is not the right size: %s %s' % (out, stderr.read()))
def test_006_me_can_umount_volume(self):
conn = self.connect_ssh(data['private_ip'], test_key)
stdin, stdout, stderr = conn.exec_command('umount /mnt/vol')
err = stderr.read()
conn.close()
if len(err) > 0:
self.fail('Unable to unmount: %s' % (err))
def test_007_me_can_detach_volume(self):
conn = self.connection_for(test_username)
self.assertTrue(conn.detach_volume(volume_id = data['volume_id']))
def test_008_me_can_delete_volume(self):
conn = self.connection_for(test_username)
self.assertTrue(conn.delete_volume(data['volume_id']))
def test_009_volume_size_must_be_int(self):
conn = self.connection_for(test_username)
self.assertRaises(Exception, conn.create_volume, 'foo', ZONE)
def test_999_tearDown(self):
global data
conn = self.connection_for(test_username)
self.delete_key_pair(conn, test_key)
if data.has_key('instance_id'):
conn.terminate_instances([data['instance_id']])
self.delete_user(test_username)
data = {}
def build_suites():
return {
'user': unittest.makeSuite(UserTests),
'image': unittest.makeSuite(ImageTests),
'security': unittest.makeSuite(SecurityTests),
'public_network': unittest.makeSuite(ElasticIPTests),
'volume': unittest.makeSuite(VolumeTests),
}
def main():
argv = FLAGS(sys.argv)
suites = build_suites()
if FLAGS.suite:
try:
suite = suites[FLAGS.suite]
except KeyError:
print >> sys.stderr, 'Available test suites:', SUITE_NAMES
return 1
unittest.TextTestRunner(verbosity=2).run(suite)
else:
for suite in suites.itervalues():
unittest.TextTestRunner(verbosity=2).run(suite)
if __name__ == "__main__":
sys.exit(main())