add scenario directory

this adds the scenario directory as part of the tempest
restructure, and moves scenario tests over to the new directory.

To ensure we got everything that used the old DefaultClient and
DefaultManager classes these are refactored out into the scenario
directory as OfficialClient and OfficialClientManager.

The Nework Smoke test base class becomes NetworkScenarioTest, and
the two test files that used that come over to scenario directory
in the process.

These are all done with a single tempest/scenario/manager.py file
for now. As the scenario tests grow we'll figure out better
refactorings there.

This gives us 4 scenario test files as a starting point for future
scenario work.

Fixed remaining refactor issues with network tests

Clean up the documentation on the extracted classes

Part of bp:tempest-repo-restructure

Change-Id: I143b282c58cfac3bf979ba5ac68226155beff343
This commit is contained in:
Sean Dague 2013-05-08 17:49:46 -04:00
parent 75c3409bfe
commit 6dbc6da416
11 changed files with 478 additions and 408 deletions

View File

@ -17,15 +17,6 @@
import logging import logging
# Default client libs
import glanceclient
import keystoneclient.v2_0.client
import novaclient.client
try:
import quantumclient.v2_0.client
except ImportError:
pass
import tempest.config import tempest.config
from tempest import exceptions from tempest import exceptions
# Tempest REST Fuzz testing client libs # Tempest REST Fuzz testing client libs
@ -86,121 +77,6 @@ class FuzzClientManager(Manager):
pass pass
class DefaultClientManager(Manager):
"""
Manager that provides the default clients to access the various
OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
def __init__(self):
super(DefaultClientManager, self).__init__()
self.compute_client = self._get_compute_client()
self.image_client = self._get_image_client()
self.identity_client = self._get_identity_client()
self.network_client = self._get_network_client()
self.client_attr_names = [
'compute_client',
'image_client',
'identity_client',
'network_client',
]
def _get_compute_client(self, username=None, password=None,
tenant_name=None):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
if not username:
username = self.config.identity.username
if not password:
password = self.config.identity.password
if not tenant_name:
tenant_name = self.config.identity.tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for compute client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
client_args = (username, password, tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = self.config.compute.catalog_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
no_cache=True,
insecure=dscv)
def _get_image_client(self):
keystone = self._get_identity_client()
token = keystone.auth_token
endpoint = keystone.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
dscv = self.config.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_identity_client(self, username=None, password=None,
tenant_name=None):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
if not username:
username = self.config.identity.admin_username
if not password:
password = self.config.identity.admin_password
if not tenant_name:
tenant_name = self.config.identity.admin_tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for identity client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
username = self.config.identity.admin_username
password = self.config.identity.admin_password
tenant_name = self.config.identity.admin_tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for network client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return quantumclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
class ComputeFuzzClientManager(FuzzClientManager): class ComputeFuzzClientManager(FuzzClientManager):
""" """

View File

@ -0,0 +1,45 @@
Tempest Guide to Scenario tests
========
What are these tests?
--------
Scenario tests are "through path" tests of OpenStack
function. Complicated setups where one part might depend on completion
of a previous part. They ideally involve the integration between
multiple OpenStack services to exercise the touch points between them.
An example would be: start with a blank environment, upload a glance
image, deploy a vm from it, ssh to the guest, make changes, capture
that vm's image back into glance as a snapshot, and launch a second vm
from that snapshot.
Why are these tests in tempest?
--------
This is one of tempests core purposes, testing the integration between
projects.
Scope of these tests
--------
Scenario tests should always test at least 2 services in
interaction. They should use the official python client libraries for
OpenStack, as they provide a more realistic approach in how people
will interact with the services.
TODO: once we have service tags, tests should be tagged with which
services they exercise.
Example of a good test
--------
While we are looking for interaction of 2 or more services, be
specific in your interactions. A giant "this is my data center" smoke
test is hard to debug when it goes wrong.
A flow of interactions between glance and nova, like in the
introduction, is a good example. Especially if it involves a repeated
interaction when a resource is setup, modified, detached, and then
reused later again.

View File

422
tempest/scenario/manager.py Normal file
View File

@ -0,0 +1,422 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import subprocess
# Default client libs
import glanceclient
import keystoneclient.v2_0.client
import netaddr
import novaclient.client
try:
# TODO(sdague): is there are reason this is still optional
from quantumclient.common import exceptions as exc
import quantumclient.v2_0.client
except ImportError:
pass
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
import tempest.manager
import tempest.test
from tempest.tests.network import common as net_common
LOG = logging.getLogger(__name__)
class OfficialClientManager(tempest.manager.Manager):
"""
Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
def __init__(self):
super(OfficialClientManager, self).__init__()
self.compute_client = self._get_compute_client()
self.image_client = self._get_image_client()
self.identity_client = self._get_identity_client()
self.network_client = self._get_network_client()
self.client_attr_names = [
'compute_client',
'image_client',
'identity_client',
'network_client',
]
def _get_compute_client(self, username=None, password=None,
tenant_name=None):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
if not username:
username = self.config.identity.username
if not password:
password = self.config.identity.password
if not tenant_name:
tenant_name = self.config.identity.tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for compute client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
client_args = (username, password, tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = self.config.compute.catalog_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
no_cache=True,
insecure=dscv)
def _get_image_client(self):
keystone = self._get_identity_client()
token = keystone.auth_token
endpoint = keystone.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
dscv = self.config.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_identity_client(self, username=None, password=None,
tenant_name=None):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
if not username:
username = self.config.identity.admin_username
if not password:
password = self.config.identity.admin_password
if not tenant_name:
tenant_name = self.config.identity.admin_tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for identity client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
username = self.config.identity.admin_username
password = self.config.identity.admin_password
tenant_name = self.config.identity.admin_tenant_name
if None in (username, password, tenant_name):
msg = ("Missing required credentials for network client. "
"username: %(username)s, password: %(password)s, "
"tenant_name: %(tenant_name)s") % locals()
raise exceptions.InvalidConfiguration(msg)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return quantumclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
class OfficialClientTest(tempest.test.TestCase):
"""
Official Client test base class for scenario testing.
Official Client tests are tests that have the following characteristics:
* Test basic operations of an API, typically in an order that
a regular user would perform those operations
* Test only the correct inputs and action paths -- no fuzz or
random input data is sent, only valid inputs.
* Use only the default client tool for calling an API
"""
manager_class = OfficialClientManager
@classmethod
def tearDownClass(cls):
# NOTE(jaypipes): Because scenario tests are typically run in a
# specific order, and because test methods in scenario tests
# generally create resources in a particular order, we destroy
# resources in the reverse order in which resources are added to
# the scenario test class object
while cls.os_resources:
thing = cls.os_resources.pop()
LOG.debug("Deleting %r from shared resources of %s" %
(thing, cls.__name__))
try:
# OpenStack resources are assumed to have a delete()
# method which destroys the resource...
thing.delete()
except Exception as e:
# If the resource is already missing, mission accomplished.
if e.__class__.__name__ == 'NotFound':
continue
raise
def is_deletion_complete():
# Deletion testing is only required for objects whose
# existence cannot be checked via retrieval.
if isinstance(thing, dict):
return True
try:
thing.get()
except Exception as e:
# Clients are expected to return an exception
# called 'NotFound' if retrieval fails.
if e.__class__.__name__ == 'NotFound':
return True
raise
return False
# Block until resource deletion has completed or timed-out
tempest.test.call_until_true(is_deletion_complete, 10, 1)
class NetworkScenarioTest(OfficialClientTest):
"""
Base class for network scenario tests
"""
@classmethod
def check_preconditions(cls):
if (cls.config.network.quantum_available):
cls.enabled = True
#verify that quantum_available is telling the truth
try:
cls.network_client.list_networks()
except exc.EndpointNotFound:
cls.enabled = False
raise
else:
cls.enabled = False
msg = 'Quantum not available'
raise cls.skipException(msg)
@classmethod
def setUpClass(cls):
super(NetworkScenarioTest, cls).setUpClass()
cls.tenant_id = cls.manager._get_identity_client(
cls.config.identity.username,
cls.config.identity.password,
cls.config.identity.tenant_name).tenant_id
def _create_keypair(self, client, namestart='keypair-smoke-'):
kp_name = rand_name(namestart)
keypair = client.keypairs.create(kp_name)
try:
self.assertEqual(keypair.id, kp_name)
self.set_resource(kp_name, keypair)
except AttributeError:
self.fail("Keypair object not successfully created.")
return keypair
def _create_security_group(self, client, namestart='secgroup-smoke-'):
# Create security group
sg_name = rand_name(namestart)
sg_desc = sg_name + " description"
secgroup = client.security_groups.create(sg_name, sg_desc)
try:
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
except AttributeError:
self.fail("SecurityGroup object not successfully created.")
# Add rules to the security group
rulesets = [
{
# ssh
'ip_protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'cidr': '0.0.0.0/0',
'group_id': secgroup.id
},
{
# ping
'ip_protocol': 'icmp',
'from_port': -1,
'to_port': -1,
'cidr': '0.0.0.0/0',
'group_id': secgroup.id
}
]
for ruleset in rulesets:
try:
client.security_group_rules.create(secgroup.id, **ruleset)
except Exception:
self.fail("Failed to create rule in security group.")
return secgroup
def _create_network(self, tenant_id, namestart='network-smoke-'):
name = rand_name(namestart)
body = dict(
network=dict(
name=name,
tenant_id=tenant_id,
),
)
result = self.network_client.create_network(body=body)
network = net_common.DeletableNetwork(client=self.network_client,
**result['network'])
self.assertEqual(network.name, name)
self.set_resource(name, network)
return network
def _list_networks(self):
nets = self.network_client.list_networks()
return nets['networks']
def _list_subnets(self):
subnets = self.network_client.list_subnets()
return subnets['subnets']
def _list_routers(self):
routers = self.network_client.list_routers()
return routers['routers']
def _create_subnet(self, network, namestart='subnet-smoke-'):
"""
Create a subnet for the given network within the cidr block
configured for tenant networks.
"""
cfg = self.config.network
tenant_cidr = netaddr.IPNetwork(cfg.tenant_network_cidr)
result = None
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(cfg.tenant_network_mask_bits):
body = dict(
subnet=dict(
ip_version=4,
network_id=network.id,
tenant_id=network.tenant_id,
cidr=str(subnet_cidr),
),
)
try:
result = self.network_client.create_subnet(body=body)
break
except exc.QuantumClientException as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = net_common.DeletableSubnet(client=self.network_client,
**result['subnet'])
self.assertEqual(subnet.cidr, str(subnet_cidr))
self.set_resource(rand_name(namestart), subnet)
return subnet
def _create_port(self, network, namestart='port-quotatest-'):
name = rand_name(namestart)
body = dict(
port=dict(name=name,
network_id=network.id,
tenant_id=network.tenant_id))
result = self.network_client.create_port(body=body)
self.assertIsNotNone(result, 'Unable to allocate port')
port = net_common.DeletablePort(client=self.network_client,
**result['port'])
self.set_resource(name, port)
return port
def _create_server(self, client, network, name, key_name, security_groups):
flavor_id = self.config.compute.flavor_ref
base_image_id = self.config.compute.image_ref
create_kwargs = {
'nics': [
{'net-id': network.id},
],
'key_name': key_name,
'security_groups': security_groups,
}
server = client.servers.create(name, base_image_id, flavor_id,
**create_kwargs)
try:
self.assertEqual(server.name, name)
self.set_resource(name, server)
except AttributeError:
self.fail("Server not successfully created.")
self.status_timeout(client.servers, server.id, 'ACTIVE')
# The instance retrieved on creation is missing network
# details, necessitating retrieval after it becomes active to
# ensure correct details.
server = client.servers.get(server.id)
self.set_resource(name, server)
return server
def _create_floating_ip(self, server, external_network_id):
result = self.network_client.list_ports(device_id=server.id)
ports = result.get('ports', [])
self.assertEqual(len(ports), 1,
"Unable to determine which port to target.")
port_id = ports[0]['id']
body = dict(
floatingip=dict(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=server.tenant_id,
)
)
result = self.network_client.create_floatingip(body=body)
floating_ip = net_common.DeletableFloatingIp(
client=self.network_client,
**result['floatingip'])
self.set_resource(rand_name('floatingip-'), floating_ip)
return floating_ip
def _ping_ip_address(self, ip_address):
cmd = ['ping', '-c1', '-w1', ip_address]
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.wait()
if proc.returncode == 0:
return True
# TODO(mnewby) Allow configuration of execution and sleep duration.
return tempest.test.call_until_true(ping, 20, 1)

View File

@ -17,11 +17,12 @@
# under the License. # under the License.
from tempest.common.utils.data_utils import rand_name from tempest.common.utils.data_utils import rand_name
from tempest.scenario import manager
from tempest.test import attr from tempest.test import attr
import tempest.tests.network.common as net_common from tempest.tests.network import common as net_common
class TestNetworkBasicOps(net_common.TestNetworkSmokeCommon): class TestNetworkBasicOps(manager.NetworkScenarioTest):
""" """
This smoke test suite assumes that Nova has been configured to This smoke test suite assumes that Nova has been configured to

View File

@ -16,12 +16,12 @@
# under the License. # under the License.
from quantumclient.common import exceptions as exc from quantumclient.common import exceptions as exc
from tempest.tests.network.common import TestNetworkSmokeCommon from tempest.scenario.manager import NetworkScenarioTest
MAX_REASONABLE_ITERATIONS = 51 # more than enough. Default for port is 50. MAX_REASONABLE_ITERATIONS = 51 # more than enough. Default for port is 50.
class TestNetworkQuotaBasic(TestNetworkSmokeCommon): class TestNetworkQuotaBasic(NetworkScenarioTest):
""" """
This test suite contains tests that each loop trying to grab a This test suite contains tests that each loop trying to grab a
particular resource until a quota limit is hit. particular resource until a quota limit is hit.

View File

@ -19,12 +19,12 @@ import logging
from tempest.common.utils.data_utils import rand_name from tempest.common.utils.data_utils import rand_name
from tempest import test from tempest.scenario import manager
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TestServerAdvancedOps(test.DefaultClientSmokeTest): class TestServerAdvancedOps(manager.OfficialClientTest):
""" """
This test case stresses some advanced server instance operations: This test case stresses some advanced server instance operations:

View File

@ -18,12 +18,12 @@
import logging import logging
from tempest.common.utils.data_utils import rand_name from tempest.common.utils.data_utils import rand_name
from tempest import test from tempest.scenario import manager
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TestServerBasicOps(test.DefaultClientSmokeTest): class TestServerBasicOps(manager.OfficialClientTest):
""" """
This smoke test case follows this basic set of operations: This smoke test case follows this basic set of operations:

View File

@ -146,63 +146,6 @@ class TestCase(BaseTestCase):
% (thing_id, expected_status)) % (thing_id, expected_status))
class DefaultClientSmokeTest(TestCase):
"""
Base smoke test case class that provides the default clients to
access the various OpenStack APIs.
Smoke tests are tests that have the following characteristics:
* Test basic operations of an API, typically in an order that
a regular user would perform those operations
* Test only the correct inputs and action paths -- no fuzz or
random input data is sent, only valid inputs.
* Use only the default client tool for calling an API
"""
manager_class = manager.DefaultClientManager
@classmethod
def tearDownClass(cls):
# NOTE(jaypipes): Because smoke tests are typically run in a specific
# order, and because test methods in smoke tests generally create
# resources in a particular order, we destroy resources in the reverse
# order in which resources are added to the smoke test class object
while cls.os_resources:
thing = cls.os_resources.pop()
LOG.debug("Deleting %r from shared resources of %s" %
(thing, cls.__name__))
try:
# OpenStack resources are assumed to have a delete()
# method which destroys the resource...
thing.delete()
except Exception as e:
# If the resource is already missing, mission accomplished.
if e.__class__.__name__ == 'NotFound':
continue
raise
def is_deletion_complete():
# Deletion testing is only required for objects whose
# existence cannot be checked via retrieval.
if isinstance(thing, dict):
return True
try:
thing.get()
except Exception as e:
# Clients are expected to return an exception
# called 'NotFound' if retrieval fails.
if e.__class__.__name__ == 'NotFound':
return True
raise
return False
# Block until resource deletion has completed or timed-out
call_until_true(is_deletion_complete, 10, 1)
class ComputeFuzzClientTest(TestCase): class ComputeFuzzClientTest(TestCase):
""" """

View File

@ -15,14 +15,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import subprocess
import netaddr
from quantumclient.common import exceptions as exc
from tempest.common.utils.data_utils import rand_name
from tempest import test
class AttributeDict(dict): class AttributeDict(dict):
@ -100,212 +92,3 @@ class DeletablePort(DeletableResource):
def delete(self): def delete(self):
self.client.delete_port(self.id) self.client.delete_port(self.id)
class TestNetworkSmokeCommon(test.DefaultClientSmokeTest):
"""
Base class for network smoke tests
"""
@classmethod
def check_preconditions(cls):
if (cls.config.network.quantum_available):
cls.enabled = True
#verify that quantum_available is telling the truth
try:
cls.network_client.list_networks()
except exc.EndpointNotFound:
cls.enabled = False
raise
else:
cls.enabled = False
msg = 'Quantum not available'
raise cls.skipException(msg)
@classmethod
def setUpClass(cls):
super(TestNetworkSmokeCommon, cls).setUpClass()
cls.tenant_id = cls.manager._get_identity_client(
cls.config.identity.username,
cls.config.identity.password,
cls.config.identity.tenant_name).tenant_id
def _create_keypair(self, client, namestart='keypair-smoke-'):
kp_name = rand_name(namestart)
keypair = client.keypairs.create(kp_name)
try:
self.assertEqual(keypair.id, kp_name)
self.set_resource(kp_name, keypair)
except AttributeError:
self.fail("Keypair object not successfully created.")
return keypair
def _create_security_group(self, client, namestart='secgroup-smoke-'):
# Create security group
sg_name = rand_name(namestart)
sg_desc = sg_name + " description"
secgroup = client.security_groups.create(sg_name, sg_desc)
try:
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
except AttributeError:
self.fail("SecurityGroup object not successfully created.")
# Add rules to the security group
rulesets = [
{
# ssh
'ip_protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'cidr': '0.0.0.0/0',
'group_id': secgroup.id
},
{
# ping
'ip_protocol': 'icmp',
'from_port': -1,
'to_port': -1,
'cidr': '0.0.0.0/0',
'group_id': secgroup.id
}
]
for ruleset in rulesets:
try:
client.security_group_rules.create(secgroup.id, **ruleset)
except Exception:
self.fail("Failed to create rule in security group.")
return secgroup
def _create_network(self, tenant_id, namestart='network-smoke-'):
name = rand_name(namestart)
body = dict(
network=dict(
name=name,
tenant_id=tenant_id,
),
)
result = self.network_client.create_network(body=body)
network = DeletableNetwork(client=self.network_client,
**result['network'])
self.assertEqual(network.name, name)
self.set_resource(name, network)
return network
def _list_networks(self):
nets = self.network_client.list_networks()
return nets['networks']
def _list_subnets(self):
subnets = self.network_client.list_subnets()
return subnets['subnets']
def _list_routers(self):
routers = self.network_client.list_routers()
return routers['routers']
def _create_subnet(self, network, namestart='subnet-smoke-'):
"""
Create a subnet for the given network within the cidr block
configured for tenant networks.
"""
cfg = self.config.network
tenant_cidr = netaddr.IPNetwork(cfg.tenant_network_cidr)
result = None
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(cfg.tenant_network_mask_bits):
body = dict(
subnet=dict(
ip_version=4,
network_id=network.id,
tenant_id=network.tenant_id,
cidr=str(subnet_cidr),
),
)
try:
result = self.network_client.create_subnet(body=body)
break
except exc.QuantumClientException as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = DeletableSubnet(client=self.network_client,
**result['subnet'])
self.assertEqual(subnet.cidr, str(subnet_cidr))
self.set_resource(rand_name(namestart), subnet)
return subnet
def _create_port(self, network, namestart='port-quotatest-'):
name = rand_name(namestart)
body = dict(
port=dict(name=name,
network_id=network.id,
tenant_id=network.tenant_id))
result = self.network_client.create_port(body=body)
self.assertIsNotNone(result, 'Unable to allocate port')
port = DeletablePort(client=self.network_client,
**result['port'])
self.set_resource(name, port)
return port
def _create_server(self, client, network, name, key_name, security_groups):
flavor_id = self.config.compute.flavor_ref
base_image_id = self.config.compute.image_ref
create_kwargs = {
'nics': [
{'net-id': network.id},
],
'key_name': key_name,
'security_groups': security_groups,
}
server = client.servers.create(name, base_image_id, flavor_id,
**create_kwargs)
try:
self.assertEqual(server.name, name)
self.set_resource(name, server)
except AttributeError:
self.fail("Server not successfully created.")
self.status_timeout(client.servers, server.id, 'ACTIVE')
# The instance retrieved on creation is missing network
# details, necessitating retrieval after it becomes active to
# ensure correct details.
server = client.servers.get(server.id)
self.set_resource(name, server)
return server
def _create_floating_ip(self, server, external_network_id):
result = self.network_client.list_ports(device_id=server.id)
ports = result.get('ports', [])
self.assertEqual(len(ports), 1,
"Unable to determine which port to target.")
port_id = ports[0]['id']
body = dict(
floatingip=dict(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=server.tenant_id,
)
)
result = self.network_client.create_floatingip(body=body)
floating_ip = DeletableFloatingIp(client=self.network_client,
**result['floatingip'])
self.set_resource(rand_name('floatingip-'), floating_ip)
return floating_ip
def _ping_ip_address(self, ip_address):
cmd = ['ping', '-c1', '-w1', ip_address]
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.wait()
if proc.returncode == 0:
return True
# TODO(mnewby) Allow configuration of execution and sleep duration.
return test.call_until_true(ping, 20, 1)

View File

@ -20,7 +20,7 @@ setenv = VIRTUAL_ENV={envdir}
NOSE_OPENSTACK_SHOW_ELAPSED=1 NOSE_OPENSTACK_SHOW_ELAPSED=1
NOSE_OPENSTACK_STDOUT=1 NOSE_OPENSTACK_STDOUT=1
commands = commands =
nosetests --logging-format '%(asctime)-15s %(message)s' --with-xunit --xunit-file=nosetests-full.xml -sv tempest/tests tempest/cli nosetests --logging-format '%(asctime)-15s %(message)s' --with-xunit --xunit-file=nosetests-full.xml -sv tempest/tests tempest/scenario tempest/cli
[testenv:smoke] [testenv:smoke]
sitepackages = True sitepackages = True
@ -46,7 +46,7 @@ setenv = VIRTUAL_ENV={envdir}
NOSE_OPENSTACK_STDOUT=1 NOSE_OPENSTACK_STDOUT=1
commands = commands =
python -m tools/tempest_coverage -c start --combine python -m tools/tempest_coverage -c start --combine
nosetests --logging-format '%(asctime)-15s %(message)s' --with-xunit --xunit-file=nosetests-full.xml -sv tempest/tests tempest/cli nosetests --logging-format '%(asctime)-15s %(message)s' --with-xunit --xunit-file=nosetests-full.xml -sv tempest/tests tempest/scenario tempest/cli
python -m tools/tempest_coverage -c report --html python -m tools/tempest_coverage -c report --html
[testenv:pep8] [testenv:pep8]