1.Created tests package.

2.Moved sanity tests to tests package.
3.Implemented smoke tests, added them to tests package.
4.Implemented base.py, __init__.py modules for smoke tests.
5.Modified config.py, test.conf to enable smoke tests.

Change-Id: Id29f8c74284ba82fb93c4537ae29a3e770de0173
Note: smoke tests might not work while test.conf is not set up properly.

Implemented allocate floating ip, create volume, create flavor tests (which don`t work on local PC - I`ll test them on Chech lab).
Implemented test for fixed ips.
Implemented test image and test flavor creation (added args to base.test, modified images client). Doesn`t work so far (some issues with modules).
Added small image to /etc directory.

Change-Id: I8cb10fed65ad358a5aa5e47b01ac4d83b556359f

Add smoke tests for next objects:

* Create flavor
* added args to base.test, modified images client
* Added small image to /etc directory.

Change-Id: I8cb10fed65ad358a5aa5e47b01ac4d83b556359f

Conflicts:

	fuel/tests/smoke/base.py

Conflicts:

	fuel/tests/smoke/base.py
	fuel/tests/smoke/test_create_network.py
	fuel/tests/smoke/test_create_sec_group.py
This commit is contained in:
anaboikina 2013-06-19 12:04:38 +03:00
parent 452150945a
commit 599a8b1263
13 changed files with 235 additions and 26 deletions

BIN
etc/ostf_test_image.vdi Normal file

Binary file not shown.

View File

@ -11,7 +11,7 @@ catalog_type = identity
# environments that have self-signed SSL certs.
disable_ssl_certificate_validation = False
# URL for where to find the OpenStack Identity API endpoint (Keystone)
uri = http://172.18.164.38:5000/v2.0/
uri = http://172.18.194.41:5000/v2.0/
# URL for where to find the OpenStack V3 Identity API endpoint (Keystone)
#uri_v3 = http://127.0.0.1:5000/v3/
# Should typically be left as keystone unless you have a non-Keystone
@ -46,18 +46,6 @@ admin_tenant_name = admin
# This section contains configuration options used when executing tests
# against the OpenStack Compute API.
#One of the controller nodes
controller_node = 172.18.164.38
#Controller node user who able connect via ssh
controller_node_ssh_user = root
#Controller node ssh user's password
controller_node_ssh_password = r00tme
#The list of the services should be enabled
enabled_services=nova-cert, nova-consoleauth, nova-scheduler, nova-conductor, nova-compute, nova-compute
# Allows test cases to create/destroy tenants and users. This option
# enables isolated test cases and better parallel execution,
# but also requires that OpenStack Identity API admin credentials
@ -198,7 +186,7 @@ public_network_id =
public_router_id =
# Whether or not quantum is expected to be available
quantum_available = true
quantum_available = false
[volume]
# This section contains the configuration options used when executing tests

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import time
import urllib
@ -49,6 +50,54 @@ class ImagesClientJSON(RestClient):
post_body, self.headers)
return resp, body
def _image_meta_to_headers(self, fields):
headers = {}
fields_copy = copy.deepcopy(fields)
copy_from = fields_copy.pop('copy_from', None)
if copy_from is not None:
headers['x-glance-api-copy-from'] = copy_from
for key, value in fields_copy.pop('properties', {}).iteritems():
headers['x-image-meta-property-%s' % key] = str(value)
for key, value in fields_copy.pop('api', {}).iteritems():
headers['x-glance-api-property-%s' % key] = str(value)
for key, value in fields_copy.iteritems():
headers['x-image-meta-%s' % key] = str(value)
return headers
def _create_with_data(self, headers, data):
resp, body_iter = self.http.raw_request('POST', '/v1/images',
headers=headers, body=data)
self._error_checker('POST', '/v1/images', headers, data, resp,
body_iter)
body = json.loads(''.join([c for c in body_iter]))
return resp, body['image']
def create_image_by_file(self, name, container_format, disk_format, **kwargs):
"""
Create a test image based on located file.
"""
params = {
"name": name,
"container_format": container_format,
"disk_format": disk_format,
}
headers = {}
for option in ['is_public', 'location', 'properties',
'copy_from', 'min_ram']:
if option in kwargs:
params[option] = kwargs.get(option)
headers.update(self._image_meta_to_headers(params))
if 'data' in kwargs:
return self._create_with_data(headers, kwargs.get('data'))
resp, body = self.post('v1/images', None, headers)
body = json.loads(body)
return resp, body['image']
def list_images(self, params=None):
"""Returns a list of all images filtered by any parameters."""
url = 'images'

View File

@ -128,7 +128,6 @@ class TestCase(BaseTestCase):
self.os_resources.remove(thing)
del self.resource_keys[key]
def status_timeout(self, things, thing_id, expected_status):
"""
Given a thing and an expected status, do a loop, sleeping

View File

@ -3,10 +3,11 @@ import time
from fuel import clients
from fuel import exceptions
from fuel import clients
import fuel.test
from fuel.common import log as logging
from fuel.common.utils.data_utils import rand_name
from fuel.tests import smoke
import fuel.test
LOG = logging.getLogger(__name__)
@ -56,10 +57,40 @@ class BaseComputeTest(fuel.test.BaseTestCase):
if os.config.network.quantum_available:
cls.network_client = os.config.network
# TODO: Deal with exception:
# {"forbidden": {"message": "Policy doesn't allow compute_extension:flavormanage to be performed.", "code": 403}}
# cls.test_flavor = cls.create_test_flavor()
# # TODO: Resolve issue with paths (doesn`t work so far):
# path = fuel.test.__file__
# path = path[:path.find("fuel")]
# etc_path = path + "etc/"
# cls.test_image = os.images_client.create_image_by_file(
# name='test_image',
# container_format='bare',
# disk_format='vdi',
# is_public=True,
# location=etc_path+'ostf_test_image.vdi')
cls.servers = []
cls.servers_client_v3_auth = os.servers_client_v3_auth
@classmethod
def create_test_flavor(cls):
os = clients.AdminManager(interface=cls._interface)
admin_username = cls.config.compute_admin.username
admin_password = cls.config.compute_admin.password
admin_tenant = cls.config.compute_admin.tenant_name
token_client = os.token_client
token_client.auth(admin_username, admin_password, admin_tenant)
cls.flavors_client.create_flavor(name='test_flavor',
ram=256,
vcpus=4,
disk=4,
flavor_id=111111)
@classmethod
def _get_identity_admin_client(cls):
"""
@ -392,4 +423,4 @@ class FixedIPsBase(BaseComputeAdminTest):
cls.ip = ip['addr']
break
if cls.ip:
break
break

View File

@ -0,0 +1,59 @@
from fuel.common.utils.data_utils import rand_name
from fuel.test import attr
from fuel.tests.smoke import base
class FloatingIPsTestJSON(base.BaseComputeAdminTest):
_interface = 'json'
server_id = None
floating_ip = None
@classmethod
def setUpClass(cls):
super(FloatingIPsTestJSON, cls).setUpClass()
cls.client = cls.floating_ips_client
cls.servers_client = cls.servers_client
#Server creation
resp, server = cls.create_server(wait_until='ACTIVE')
cls.server_id = server['id']
resp, body = cls.servers_client.get_server(server['id'])
#Floating IP creation
resp, body = cls.client.create_floating_ip()
cls.floating_ip_id = body['id']
cls.floating_ip = body['ip']
#Generating a nonexistent floatingIP id
cls.floating_ip_ids = []
resp, body = cls.client.list_floating_ips()
for i in range(len(body)):
cls.floating_ip_ids.append(body[i]['id'])
while True:
cls.non_exist_id = rand_name('999')
if cls.non_exist_id not in cls.floating_ip_ids:
break
@attr(type=['fuel', 'smoke'])
def test_allocate_floating_ip(self):
# Positive test:Allocation of a new floating IP to a project
# should be successful
resp, ip_body = self.client.create_floating_ip()
self.assertEqual(200, resp.status)
floating_ip_id_allocated = ip_body['id']
resp, floating_ip_details = \
self.client.get_floating_ip_details(floating_ip_id_allocated)
#Checking if the details of allocated IP is in list of floating IP
resp, body = self.client.list_floating_ips()
self.assertTrue(floating_ip_details in body)
resp, body = self.client.associate_floating_ip_to_server(floating_ip_details['ip'], self.server_id)
self.assertTrue(200, resp.status)
resp, body = self.client.disassociate_floating_ip_from_server(floating_ip_details['ip'], self.server_id)
self.assertTrue(200, resp.status)
#Deleting the floating IP which is created in this method
self.client.delete_floating_ip(floating_ip_id_allocated)
self.assertTrue(200, resp.status)

View File

@ -3,6 +3,7 @@ from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
""" Test module contains tests for flavor creation/deletion. """
class FlavorsAdminTest(base.BaseComputeAdminTest):
@ -27,7 +28,6 @@ class FlavorsAdminTest(base.BaseComputeAdminTest):
cls.swap = 256
cls.rxtx = 2
@attr(type=["fuel", "smoke"])
def test_create_flavor(self):
"""
@ -46,7 +46,7 @@ class FlavorsAdminTest(base.BaseComputeAdminTest):
ephemeral=self.ephemeral,
swap=self.swap,
rxtx=self.rxtx)
self.flavor = flavor
self.assertEqual(200, resp.status)
self.assertEqual(flavor['name'], flavor_name)
self.assertEqual(flavor['vcpus'], self.vcpus)
@ -57,7 +57,7 @@ class FlavorsAdminTest(base.BaseComputeAdminTest):
self.assertEqual(flavor['rxtx_factor'], self.rxtx)
self.assertEqual(flavor['OS-FLV-EXT-DATA:ephemeral'],
self.ephemeral)
#Verify flavor is retrieved
resp, flavor = self.client.get_flavor_details(new_flavor_id)
self.assertEqual(resp.status, 200)

View File

@ -1,4 +1,3 @@
from fuel.common.utils import data_utils
from fuel.test import attr
from fuel.tests.smoke import base
@ -12,10 +11,12 @@ class KeyPairsTestJSON(base.BaseComputeTest):
super(KeyPairsTestJSON, cls).setUpClass()
cls.client = cls.keypairs_client
@attr(type=['fuel', 'smoke'])
def test_keypair_create_delete(self):
""" Test keypair creation and deletion. """
k_name = data_utils.rand_name('keypair-')
resp, keypair = self.client.create_keypair(k_name)
self.assertEqual(200, resp.status)
private_key = keypair['private_key']
@ -27,6 +28,7 @@ class KeyPairsTestJSON(base.BaseComputeTest):
"Field private_key is empty or not found.")
resp, _ = self.client.delete_keypair(k_name)
self.assertEqual(202, resp.status)
# self.assertEqual(202, self.image)
# TODO: add teardown for this test.

View File

@ -5,7 +5,6 @@ from fuel.test import attr
from fuel.tests.smoke import base
"""
Test module for network creation.

View File

@ -23,11 +23,12 @@ class SecurityGroupsTest(base.BaseComputeTest):
# Security Group should be created, verified and deleted
s_name = data_utils.rand_name('securitygroup-')
s_description = data_utils.rand_name('description-')
resp, securitygroup = \
self.client.create_security_group(s_name, s_description)
self.assertTrue('id' in securitygroup)
securitygroup_id = securitygroup['id']
self.id = securitygroup_id
self.addCleanup(self._delete_security_group,
securitygroup_id)
self.assertEqual(200, resp.status)
@ -35,6 +36,5 @@ class SecurityGroupsTest(base.BaseComputeTest):
self.assertTrue('name' in securitygroup)
securitygroup_name = securitygroup['name']
self.assertEqual(securitygroup_name, s_name,
"The created Security Group name is "
"not equal to the requested name")
"The created Security Group name is not equal to the requested name")

View File

@ -0,0 +1,57 @@
from fuel.common.utils.data_utils import rand_name
from fuel.test import attr
from fuel.tests.smoke import base
class VolumesGetTestJSON(base.BaseComputeAdminTest):
"""
Test class contains volume actions checks.
"""
_interface = 'json'
@classmethod
def setUpClass(cls):
super(VolumesGetTestJSON, cls).setUpClass()
cls.server = cls.create_server(wait_until='ACTIVE')[1]
cls.client = cls.volumes_client
cls.snapshot_client = cls.snapshots_client
@attr(type=['fuel', 'smoke'])
def test_volume_create_attach_delete(self):
"""
Test contains the following steps:
- create volume;
- attach it to a new instance;
- delete volume;
"""
volume = None
v_name = rand_name('Volume-%s-') % self._interface
metadata = {'Type': 'work'}
#Create volume
resp, volume = self.client.create_volume(size=1,
display_name=v_name,
metadata=metadata)
self.assertEqual(200, resp.status)
self.assertTrue('id' in volume)
# Wait for Volume status to become ACTIVE
self.client.wait_for_volume_status(volume['id'], 'available')
#GET Volume
resp, fetched_volume = self.client.get_volume(volume['id'])
self.assertEqual(200, resp.status)
resp, body = self.client.attach_volume(volume, self.server)
self.assertEqual(200, resp.status)
if volume:
#Delete the Volume created in this method
resp, _ = self.client.delete_volume(volume['id'])
self.assertEqual(202, resp.status)
#Checking if the deleted Volume still exists
self.client.wait_for_resource_deletion(volume['id'])

View File

@ -0,0 +1,25 @@
from fuel.test import attr
from fuel.tests.smoke import base
class FixedIPsTestJson(base.FixedIPsBase):
"""
Test class contains the following verifications:
- fixed ip reservation test;
- fixed ip unreservation test.
"""
_interface = 'json'
@attr(type=['fuel', 'smoke'])
def test_set_reserve(self):
body = {"reserve": "None"}
resp, body = self.client.reserve_fixed_ip(self.ip, body)
self.assertEqual(resp.status, 202)
@attr(type=['fuel', 'smoke'])
def test_set_unreserve(self):
body = {"unreserve": "None"}
resp, body = self.client.reserve_fixed_ip(self.ip, body)
self.assertEqual(resp.status, 202)

View File

@ -48,7 +48,7 @@ class TestUserTenantRole(base.BaseIdentityAdminTest):
client = requests.session()
url = self.config.identity.url
# Retrieve the CSRF token first
# Retrieve the CSRF token first
client.get(url) # sets cookie
if len(client.cookies) == 0:
login_data = dict(username=user['name'],