Removing remaining Tempest bits

All the Neutron Stadium projects are importing Tempest code from the
neutron-tempest-plugin repo. We can now safely remove the remaining
bits of Tempest code.

Depends-On: I34efe893b93240c4689de1dda978209926022dfd
Change-Id: I2cf6e1f60926f36384bf056066bcdd4e0301494f
changes/31/536931/2
Miguel Lavalle 5 years ago
parent bfefbaf1c7
commit 3d0f9f3f4f
  1. 0
      neutron/tests/tempest/__init__.py
  2. 0
      neutron/tests/tempest/api/__init__.py
  3. 834
      neutron/tests/tempest/api/base.py
  4. 91
      neutron/tests/tempest/api/clients.py
  5. 0
      neutron/tests/tempest/common/__init__.py
  6. 24
      neutron/tests/tempest/common/ssh.py
  7. 21
      neutron/tests/tempest/common/tempest_fixtures.py
  8. 30
      neutron/tests/tempest/exceptions.py
  9. 0
      neutron/tests/tempest/scenario/__init__.py
  10. 317
      neutron/tests/tempest/scenario/base.py
  11. 18
      neutron/tests/tempest/scenario/constants.py
  12. 33
      neutron/tests/tempest/scenario/exceptions.py
  13. 0
      neutron/tests/tempest/services/__init__.py
  14. 0
      neutron/tests/tempest/services/network/__init__.py
  15. 0
      neutron/tests/tempest/services/network/json/__init__.py
  16. 974
      neutron/tests/tempest/services/network/json/network_client.py

@ -1,834 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import math
import netaddr
from neutron_lib import constants as const
from tempest.common import utils as tutils
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron.common import constants
from neutron.common import utils
from neutron.tests.tempest.api import clients
from neutron.tests.tempest import config
from neutron.tests.tempest import exceptions
CONF = config.CONF
class BaseNetworkTest(test.BaseTestCase):
"""
Base class for the Neutron tests that use the Tempest Neutron REST client
Per the Neutron API Guide, API v1.x was removed from the source code tree
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
following options are defined in the [network] section of etc/tempest.conf:
project_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant networks
project_network_mask_bits with the mask bits to be used to partition
the block defined by tenant-network_cidr
Finally, it is assumed that the following option is defined in the
[service_available] section of etc/tempest.conf
neutron as True
"""
force_tenant_isolation = False
credentials = ['primary']
# Default to ipv4.
_ip_version = 4
@classmethod
def get_client_manager(cls, credential_type=None, roles=None,
force_new=None):
manager = super(BaseNetworkTest, cls).get_client_manager(
credential_type=credential_type,
roles=roles,
force_new=force_new
)
# Neutron uses a different clients manager than the one in the Tempest
return clients.Manager(manager.credentials)
@classmethod
def skip_checks(cls):
super(BaseNetworkTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException("Neutron support is required")
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
raise cls.skipException("IPv6 Tests are disabled.")
for req_ext in getattr(cls, 'required_extensions', []):
if not tutils.is_extension_enabled(req_ext, 'network'):
msg = "%s extension not enabled." % req_ext
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
# Create no network resources for these test.
cls.set_network_resources()
super(BaseNetworkTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseNetworkTest, cls).setup_clients()
cls.client = cls.os_primary.network_client
@classmethod
def resource_setup(cls):
super(BaseNetworkTest, cls).resource_setup()
cls.networks = []
cls.admin_networks = []
cls.subnets = []
cls.admin_subnets = []
cls.ports = []
cls.routers = []
cls.floating_ips = []
cls.metering_labels = []
cls.service_profiles = []
cls.flavors = []
cls.metering_label_rules = []
cls.qos_rules = []
cls.qos_policies = []
cls.ethertype = "IPv" + str(cls._ip_version)
cls.address_scopes = []
cls.admin_address_scopes = []
cls.subnetpools = []
cls.admin_subnetpools = []
cls.security_groups = []
cls.projects = []
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up floating IPs
for floating_ip in cls.floating_ips:
cls._try_delete_resource(cls.client.delete_floatingip,
floating_ip['id'])
# Clean up routers
for router in cls.routers:
cls._try_delete_resource(cls.delete_router,
router)
# Clean up metering label rules
for metering_label_rule in cls.metering_label_rules:
cls._try_delete_resource(
cls.admin_client.delete_metering_label_rule,
metering_label_rule['id'])
# Clean up metering labels
for metering_label in cls.metering_labels:
cls._try_delete_resource(
cls.admin_client.delete_metering_label,
metering_label['id'])
# Clean up flavors
for flavor in cls.flavors:
cls._try_delete_resource(
cls.admin_client.delete_flavor,
flavor['id'])
# Clean up service profiles
for service_profile in cls.service_profiles:
cls._try_delete_resource(
cls.admin_client.delete_service_profile,
service_profile['id'])
# Clean up ports
for port in cls.ports:
cls._try_delete_resource(cls.client.delete_port,
port['id'])
# Clean up subnets
for subnet in cls.subnets:
cls._try_delete_resource(cls.client.delete_subnet,
subnet['id'])
# Clean up admin subnets
for subnet in cls.admin_subnets:
cls._try_delete_resource(cls.admin_client.delete_subnet,
subnet['id'])
# Clean up networks
for network in cls.networks:
cls._try_delete_resource(cls.client.delete_network,
network['id'])
# Clean up admin networks
for network in cls.admin_networks:
cls._try_delete_resource(cls.admin_client.delete_network,
network['id'])
# Clean up security groups
for secgroup in cls.security_groups:
cls._try_delete_resource(cls.client.delete_security_group,
secgroup['id'])
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
subnetpool['id'])
for subnetpool in cls.admin_subnetpools:
cls._try_delete_resource(cls.admin_client.delete_subnetpool,
subnetpool['id'])
for address_scope in cls.address_scopes:
cls._try_delete_resource(cls.client.delete_address_scope,
address_scope['id'])
for address_scope in cls.admin_address_scopes:
cls._try_delete_resource(
cls.admin_client.delete_address_scope,
address_scope['id'])
for project in cls.projects:
cls._try_delete_resource(
cls.identity_admin_client.delete_project,
project['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
cls._try_delete_resource(cls.admin_client.delete_qos_rule,
qos_rule['id'])
# Clean up QoS policies
# as all networks and ports are already removed, QoS policies
# shouldn't be "in use"
for qos_policy in cls.qos_policies:
cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
def _try_delete_resource(cls, delete_callable, *args, **kwargs):
"""Cleanup resources in case of test-failure
Some resources are explicitly deleted by the test.
If the test failed to delete a resource, this method will execute
the appropriate delete methods. Otherwise, the method ignores NotFound
exceptions thrown for resources that were correctly deleted by the
test.
:param delete_callable: delete method
:param args: arguments for delete method
:param kwargs: keyword arguments for delete method
"""
try:
delete_callable(*args, **kwargs)
# if resource is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
@classmethod
def create_network(cls, network_name=None, client=None, **kwargs):
"""Wrapper utility that returns a test network."""
network_name = network_name or data_utils.rand_name('test-network-')
client = client or cls.client
body = client.create_network(name=network_name, **kwargs)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_shared_network(cls, network_name=None, **post_body):
network_name = network_name or data_utils.rand_name('sharednetwork-')
post_body.update({'name': network_name, 'shared': True})
body = cls.admin_client.create_network(**post_body)
network = body['network']
cls.admin_networks.append(network)
return network
@classmethod
def create_network_keystone_v3(cls, network_name=None, project_id=None,
tenant_id=None, client=None):
"""Wrapper utility that creates a test network with project_id."""
client = client or cls.client
network_name = network_name or data_utils.rand_name(
'test-network-with-project_id')
project_id = cls.client.tenant_id
body = client.create_network_keystone_v3(network_name, project_id,
tenant_id)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
ip_version=None, client=None, **kwargs):
"""Wrapper utility that returns a test subnet."""
# allow tests to use admin client
if not client:
client = cls.client
# The cidr and mask_bits depend on the ip version.
ip_version = ip_version if ip_version is not None else cls._ip_version
gateway_not_set = gateway == ''
if ip_version == 4:
cidr = cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_cidr'))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_mask_bits'))
elif ip_version == 6:
cidr = (
cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_v6_cidr')))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_v6_mask_bits'))
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
if gateway_not_set:
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
else:
gateway_ip = gateway
try:
body = client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=ip_version,
gateway_ip=gateway_ip,
**kwargs)
break
except lib_exc.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise ValueError(message)
subnet = body['subnet']
if client is cls.client:
cls.subnets.append(subnet)
else:
cls.admin_subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.client.create_port(network_id=network['id'],
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_port(cls, port, **kwargs):
"""Wrapper utility that updates a test port."""
body = cls.client.update_port(port['id'],
**kwargs)
return body['port']
@classmethod
def _create_router_with_client(
cls, client, router_name=None, admin_state_up=False,
external_network_id=None, enable_snat=None, **kwargs
):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
body = client.create_router(
router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
cls.routers.append(router)
return router
@classmethod
def create_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.client, *args, **kwargs)
@classmethod
def create_admin_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.os_admin.network_client,
*args, **kwargs)
@classmethod
def create_floatingip(cls, external_network_id):
"""Wrapper utility that returns a test floating IP."""
body = cls.client.create_floatingip(
floating_network_id=external_network_id)
fip = body['floatingip']
cls.floating_ips.append(fip)
return fip
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
interface = cls.client.add_router_interface_with_subnet_id(
router_id, subnet_id)
return interface
@classmethod
def get_supported_qos_rule_types(cls):
body = cls.client.list_qos_rule_types()
return [rule_type['type'] for rule_type in body['rule_types']]
@classmethod
def create_qos_policy(cls, name, description=None, shared=False,
tenant_id=None, is_default=False):
"""Wrapper utility that returns a test QoS policy."""
body = cls.admin_client.create_qos_policy(
name, description, shared, tenant_id, is_default)
qos_policy = body['policy']
cls.qos_policies.append(qos_policy)
return qos_policy
@classmethod
def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
max_burst_kbps,
direction=const.EGRESS_DIRECTION):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps, direction)
qos_rule = body['bandwidth_limit_rule']
cls.qos_rules.append(qos_rule)
return qos_rule
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
interfaces = [port for port in body['ports']
if port['device_owner'] in const.ROUTER_INTERFACE_OWNERS]
for i in interfaces:
try:
cls.client.remove_router_interface_with_subnet_id(
router['id'], i['fixed_ips'][0]['subnet_id'])
except lib_exc.NotFound:
pass
cls.client.delete_router(router['id'])
@classmethod
def create_address_scope(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_address_scope(name=name, **kwargs)
cls.admin_address_scopes.append(body['address_scope'])
else:
body = cls.client.create_address_scope(name=name, **kwargs)
cls.address_scopes.append(body['address_scope'])
return body['address_scope']
@classmethod
def create_subnetpool(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_subnetpool(name, **kwargs)
cls.admin_subnetpools.append(body['subnetpool'])
else:
body = cls.client.create_subnetpool(name, **kwargs)
cls.subnetpools.append(body['subnetpool'])
return body['subnetpool']
@classmethod
def create_project(cls, name=None, description=None):
test_project = name or data_utils.rand_name('test_project_')
test_description = description or data_utils.rand_name('desc_')
project = cls.identity_admin_client.create_project(
name=test_project,
description=test_description)['project']
cls.projects.append(project)
return project
@classmethod
def create_security_group(cls, name, **kwargs):
body = cls.client.create_security_group(name=name, **kwargs)
cls.security_groups.append(body['security_group'])
return body['security_group']
class BaseAdminNetworkTest(BaseNetworkTest):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseAdminNetworkTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
cls.identity_admin_client = cls.os_admin.projects_client
@classmethod
def create_metering_label(cls, name, description):
"""Wrapper utility that returns a test metering label."""
body = cls.admin_client.create_metering_label(
description=description,
name=data_utils.rand_name("metering-label"))
metering_label = body['metering_label']
cls.metering_labels.append(metering_label)
return metering_label
@classmethod
def create_metering_label_rule(cls, remote_ip_prefix, direction,
metering_label_id):
"""Wrapper utility that returns a test metering label rule."""
body = cls.admin_client.create_metering_label_rule(
remote_ip_prefix=remote_ip_prefix, direction=direction,
metering_label_id=metering_label_id)
metering_label_rule = body['metering_label_rule']
cls.metering_label_rules.append(metering_label_rule)
return metering_label_rule
@classmethod
def create_flavor(cls, name, description, service_type):
"""Wrapper utility that returns a test flavor."""
body = cls.admin_client.create_flavor(
description=description, service_type=service_type,
name=name)
flavor = body['flavor']
cls.flavors.append(flavor)
return flavor
@classmethod
def create_service_profile(cls, description, metainfo, driver):
"""Wrapper utility that returns a test service profile."""
body = cls.admin_client.create_service_profile(
driver=driver, metainfo=metainfo, description=description)
service_profile = body['service_profile']
cls.service_profiles.append(service_profile)
return service_profile
@classmethod
def get_unused_ip(cls, net_id, ip_version=None):
"""Get an unused ip address in a allocation pool of net"""
body = cls.admin_client.list_ports(network_id=net_id)
ports = body['ports']
used_ips = []
for port in ports:
used_ips.extend(
[fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
body = cls.admin_client.list_subnets(network_id=net_id)
subnets = body['subnets']
for subnet in subnets:
if ip_version and subnet['ip_version'] != ip_version:
continue
cidr = subnet['cidr']
allocation_pools = subnet['allocation_pools']
iterators = []
if allocation_pools:
for allocation_pool in allocation_pools:
iterators.append(netaddr.iter_iprange(
allocation_pool['start'], allocation_pool['end']))
else:
net = netaddr.IPNetwork(cidr)
def _iterip():
for ip in net:
if ip not in (net.network, net.broadcast):
yield ip
iterators.append(iter(_iterip()))
for iterator in iterators:
for ip in iterator:
if str(ip) not in used_ips:
return str(ip)
message = (
"net(%s) has no usable IP address in allocation pools" % net_id)
raise exceptions.InvalidConfiguration(message)
def require_qos_rule_type(rule_type):
def decorator(f):
@functools.wraps(f)
def wrapper(self, *func_args, **func_kwargs):
if rule_type not in self.get_supported_qos_rule_types():
raise self.skipException(
"%s rule type is required." % rule_type)
return f(self, *func_args, **func_kwargs)
return wrapper
return decorator
def _require_sorting(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not tutils.is_extension_enabled("sorting", "network"):
self.skipTest('Sorting feature is required')
return f(self, *args, **kwargs)
return inner
def _require_pagination(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not tutils.is_extension_enabled("pagination", "network"):
self.skipTest('Pagination feature is required')
return f(self, *args, **kwargs)
return inner
class BaseSearchCriteriaTest(BaseNetworkTest):
# This should be defined by subclasses to reflect resource name to test
resource = None
field = 'name'
# NOTE(ihrachys): some names, like those starting with an underscore (_)
# are sorted differently depending on whether the plugin implements native
# sorting support, or not. So we avoid any such cases here, sticking to
# alphanumeric. Also test a case when there are multiple resources with the
# same name
resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
force_tenant_isolation = True
list_kwargs = {}
list_as_admin = False
def assertSameOrder(self, original, actual):
# gracefully handle iterators passed
original = list(original)
actual = list(actual)
self.assertEqual(len(original), len(actual))
for expected, res in zip(original, actual):
self.assertEqual(expected[self.field], res[self.field])
@utils.classproperty
def plural_name(self):
return '%ss' % self.resource
@property
def list_client(self):
return self.admin_client if self.list_as_admin else self.client
def list_method(self, *args, **kwargs):
method = getattr(self.list_client, 'list_%s' % self.plural_name)
kwargs.update(self.list_kwargs)
return method(*args, **kwargs)
def get_bare_url(self, url):
base_url = self.client.base_url
self.assertTrue(url.startswith(base_url))
return url[len(base_url):]
@classmethod
def _extract_resources(cls, body):
return body[cls.plural_name]
def _test_list_sorts(self, direction):
sort_args = {
'sort_dir': direction,
'sort_key': self.field
}
body = self.list_method(**sort_args)
resources = self._extract_resources(body)
self.assertNotEmpty(
resources, "%s list returned is empty" % self.resource)
retrieved_names = [res[self.field] for res in resources]
expected = sorted(retrieved_names)
if direction == constants.SORT_DIRECTION_DESC:
expected = list(reversed(expected))
self.assertEqual(expected, retrieved_names)
@_require_sorting
def _test_list_sorts_asc(self):
self._test_list_sorts(constants.SORT_DIRECTION_ASC)
@_require_sorting
def _test_list_sorts_desc(self):
self._test_list_sorts(constants.SORT_DIRECTION_DESC)
@_require_pagination
def _test_list_pagination(self):
for limit in range(1, len(self.resource_names) + 1):
pagination_args = {
'limit': limit,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertEqual(limit, len(resources))
@_require_pagination
def _test_list_no_pagination_limit_0(self):
pagination_args = {
'limit': 0,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertGreaterEqual(len(resources), len(self.resource_names))
def _test_list_pagination_iteratively(self, lister):
# first, collect all resources for later comparison
sort_args = {
'sort_dir': constants.SORT_DIRECTION_ASC,
'sort_key': self.field
}
body = self.list_method(**sort_args)
expected_resources = self._extract_resources(body)
self.assertNotEmpty(expected_resources)
resources = lister(
len(expected_resources), sort_args
)
# finally, compare that the list retrieved in one go is identical to
# the one containing pagination results
self.assertSameOrder(expected_resources, resources)
def _list_all_with_marker(self, niterations, sort_args):
# paginate resources one by one, using last fetched resource as a
# marker
resources = []
for i in range(niterations):
pagination_args = sort_args.copy()
pagination_args['limit'] = 1
if resources:
pagination_args['marker'] = resources[-1]['id']
body = self.list_method(**pagination_args)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_marker(self):
self._test_list_pagination_iteratively(self._list_all_with_marker)
def _list_all_with_hrefs(self, niterations, sort_args):
# paginate resources one by one, using next href links
resources = []
prev_links = {}
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['next'])
else:
sort_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, limit=1, **sort_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
# The last element is empty and does not contain 'next' link
uri = self.get_bare_url(prev_links['next'])
prev_links, body = self.client.get_uri_with_links(
self.plural_name, uri
)
self.assertNotIn('next', prev_links)
# Now walk backwards and compare results
resources2 = []
for i in range(niterations):
uri = self.get_bare_url(prev_links['previous'])
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources2.extend(resources_)
self.assertSameOrder(resources, reversed(resources2))
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_href_links(self):
self._test_list_pagination_iteratively(self._list_all_with_hrefs)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_with_href_links(
self, direction=constants.SORT_DIRECTION_ASC):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
page_size = 2
pagination_args['limit'] = page_size
prev_links = {}
resources = []
num_resources = len(expected_resources)
niterations = int(math.ceil(float(num_resources) / page_size))
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['previous'])
else:
pagination_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, page_reverse=True, **pagination_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertGreaterEqual(page_size, len(resources_))
resources.extend(reversed(resources_))
self.assertSameOrder(expected_resources, reversed(resources))
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_asc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_ASC)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_desc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_DESC)
def _test_list_pagination_page_reverse(self, direction):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
'limit': 3,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
pagination_args['limit'] -= 1
pagination_args['marker'] = expected_resources[-1]['id']
pagination_args['page_reverse'] = True
body = self.list_method(**pagination_args)
self.assertSameOrder(
# the last entry is not included in 2nd result when used as a
# marker
expected_resources[:-1],
self._extract_resources(body))
def _test_list_validation_filters(self):
validation_args = {
'unknown_filter': 'value',
}
body = self.list_method(**validation_args)
resources = self._extract_resources(body)
for resource in resources:
self.assertIn(resource['name'], self.resource_names)

@ -1,91 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.services.compute import keypairs_client
from tempest.lib.services.compute import servers_client
from tempest.lib.services.identity.v2 import tenants_client
from tempest.lib.services.identity.v3 import projects_client
from tempest import manager
from neutron.tests.tempest import config
from neutron.tests.tempest.services.network.json import network_client
CONF = config.CONF
class Manager(manager.Manager):
"""
Top level manager for OpenStack tempest clients
"""
default_params = {
'disable_ssl_certificate_validation':
CONF.identity.disable_ssl_certificate_validation,
'ca_certs': CONF.identity.ca_certificates_file,
'trace_requests': CONF.debug.trace_requests
}
# NOTE: Tempest uses timeout values of compute API if project specific
# timeout values don't exist.
default_params_with_timeout_values = {
'build_interval': CONF.compute.build_interval,
'build_timeout': CONF.compute.build_timeout
}
default_params_with_timeout_values.update(default_params)
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials=credentials)
self._set_identity_clients()
self.network_client = network_client.NetworkClientJSON(
self.auth_provider,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type=CONF.network.endpoint_type,
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**self.default_params)
params = {
'service': CONF.compute.catalog_type,
'region': CONF.compute.region or CONF.identity.region,
'endpoint_type': CONF.compute.endpoint_type,
'build_interval': CONF.compute.build_interval,
'build_timeout': CONF.compute.build_timeout
}
params.update(self.default_params)
self.servers_client = servers_client.ServersClient(
self.auth_provider,
enable_instance_password=CONF.compute_feature_enabled
.enable_instance_password,
**params)
self.keypairs_client = keypairs_client.KeyPairsClient(
self.auth_provider, **params)
def _set_identity_clients(self):
params = {
'service': CONF.identity.catalog_type,
'region': CONF.identity.region
}
params.update(self.default_params_with_timeout_values)
params_v2_admin = params.copy()
params_v2_admin['endpoint_type'] = CONF.identity.v2_admin_endpoint_type
# Client uses admin endpoint type of Keystone API v2
self.tenants_client = tenants_client.TenantsClient(self.auth_provider,
**params_v2_admin)
# Client uses admin endpoint type of Keystone API v3
self.projects_client = projects_client.ProjectsClient(
self.auth_provider, **params_v2_admin)

@ -1,24 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common import ssh
from neutron.tests.tempest import config
class Client(ssh.Client):
def __init__(self, *args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = config.CONF.validation.ssh_timeout
super(Client, self).__init__(*args, **kwargs)

@ -1,21 +0,0 @@
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency.fixture import lockutils
class LockFixture(lockutils.LockFixture):
def __init__(self, name):
super(LockFixture, self).__init__(name, 'tempest-')

@ -1,30 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions
TempestException = exceptions.TempestException
class InvalidConfiguration(TempestException):
message = "Invalid Configuration"
class InvalidCredentials(TempestException):
message = "Invalid Credentials"
class InvalidServiceTag(TempestException):
message = "Invalid service tag"

@ -1,317 +0,0 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import netaddr
from oslo_log import log
from tempest.common.utils import net_utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from neutron.tests.tempest.api import base as base_api
from neutron.tests.tempest.common import ssh
from neutron.tests.tempest import config
from neutron.tests.tempest.scenario import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
class BaseTempestTestCase(base_api.BaseNetworkTest):
@classmethod
def resource_setup(cls):
super(BaseTempestTestCase, cls).resource_setup()
cls.keypairs = []
@classmethod
def resource_cleanup(cls):
for keypair in cls.keypairs:
cls.os_primary.keypairs_client.delete_keypair(
keypair_name=keypair['name'])
super(BaseTempestTestCase, cls).resource_cleanup()
def create_server(self, flavor_ref, image_ref, key_name, networks,
name=None, security_groups=None):
"""Create a server using tempest lib
All the parameters are the ones used in Compute API
Args:
flavor_ref(str): The flavor of the server to be provisioned.
image_ref(str): The image of the server to be provisioned.
key_name(str): SSH key to to be used to connect to the
provisioned server.
networks(list): List of dictionaries where each represent
an interface to be attached to the server. For network
it should be {'uuid': network_uuid} and for port it should
be {'port': port_uuid}
name(str): Name of the server to be provisioned.
security_groups(list): List of dictionaries where
the keys is 'name' and the value is the name of
the security group. If it's not passed the default
security group will be used.
"""
name = name or data_utils.rand_name('server-test')
if not security_groups:
security_groups = [{'name': 'default'}]
server = self.os_primary.servers_client.create_server(
name=name,
flavorRef=flavor_ref,
imageRef=image_ref,
key_name=key_name,
networks=networks,
security_groups=security_groups)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
self.os_primary.servers_client, server['server']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.os_primary.servers_client.delete_server,
server['server']['id'])
return server
@classmethod
def create_keypair(cls, client=None):
client = client or cls.os_primary.keypairs_client
name = data_utils.rand_name('keypair-test')
body = client.create_keypair(name=name)
cls.keypairs.append(body['keypair'])
return body['keypair']
@classmethod
def create_secgroup_rules(cls, rule_list, secgroup_id=None):
client = cls.os_primary.network_client
if not secgroup_id:
sgs = client.list_security_groups()['security_groups']
for sg in sgs:
if sg['name'] == constants.DEFAULT_SECURITY_GROUP:
secgroup_id = sg['id']
break
for rule in rule_list:
direction = rule.pop('direction')
client.create_security_group_rule(
direction=direction,
security_group_id=secgroup_id,
**rule)
@classmethod
def create_loginable_secgroup_rule(cls, secgroup_id=None):
"""This rule is intended to permit inbound ssh
Allowing ssh traffic traffic from all sources, so no group_id is
provided.
Setting a group_id would only permit traffic from ports
belonging to the same security group.
"""
rule_list = [{'protocol': 'tcp',
'direction': 'ingress',
'port_range_min': 22,
'port_range_max': 22,
'remote_ip_prefix': '0.0.0.0/0'}]
cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None):
"""This rule is intended to permit inbound ping
"""
rule_list = [{'protocol': 'icmp',
'direction': 'ingress',
'port_range_min': 8, # type
'port_range_max': 0, # code
'remote_ip_prefix': '0.0.0.0/0'}]
cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
@classmethod
def create_router_by_client(cls, is_admin=False, **kwargs):
kwargs.update({'router_name': data_utils.rand_name('router'),
'admin_state_up': True,
'external_network_id': CONF.network.public_network_id})
if not is_admin:
router = cls.create_router(**kwargs)
else:
router = cls.create_admin_router(**kwargs)
LOG.debug("Created router %s", router['name'])
cls.routers.append(router)
return router
def create_and_associate_floatingip(self, port_id):
fip = self.os_primary.network_client.create_floatingip(
CONF.network.public_network_id,
port_id=port_id)['floatingip']
self.floating_ips.append(fip)
return fip
def setup_network_and_server(self, router=None, **kwargs):
"""Create network resources and a server.
Creating a network, subnet, router, keypair, security group
and a server.
"""
self.network = self.create_network()
LOG.debug("Created network %s", self.network['name'])
self.subnet = self.create_subnet(self.network)
LOG.debug("Created subnet %s", self.subnet['id'])
secgroup = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup'))
LOG.debug("Created security group %s",
secgroup['security_group']['name'])
self.security_groups.append(secgroup['security_group'])
if not router:
router = self.create_router_by_client(**kwargs)
self.create_router_interface(router['id'], self.subnet['id'])
self.keypair = self.create_keypair()
self.create_loginable_secgroup_rule(
secgroup_id=secgroup['security_group']['id'])
self.server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': self.network['id']}],
security_groups=[{'name': secgroup['security_group']['name']}])
waiters.wait_for_server_status(self.os_primary.servers_client,
self.server['server']['id'],
constants.SERVER_STATUS_ACTIVE)
self.port = self.client.list_ports(network_id=self.network['id'],
device_id=self.server[
'server']['id'])['ports'][0]
self.fip = self.create_and_associate_floatingip(self.port['id'])
def check_connectivity(self, host, ssh_user, ssh_key, servers=None):
ssh_client = ssh.Client(host, ssh_user, pkey=ssh_key)
try:
ssh_client.test_connection_auth()
except lib_exc.SSHTimeout as ssh_e:
LOG.debug(ssh_e)
self._log_console_output(servers)
raise
def _log_console_output(self, servers=None):
if not CONF.compute_feature_enabled.console_output:
LOG.debug('Console output not supported, cannot log')
return
if not servers:
servers = self.os_primary.servers_client.list_servers()
servers = servers['servers']
for server in servers:
try:
console_output = (
self.os_primary.servers_client.get_console_output(
server['id'])['output'])
LOG.debug('Console output for %s\nbody=\n%s',
server['id'], console_output)
except lib_exc.NotFound:
LOG.debug("Server %s disappeared(deleted) while looking "
"for the console log", server['id'])
def _check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True):
"""check ping server via source ssh connection
:param source: RemoteClient: an ssh connection from which to ping
:param dest: and IP to ping against
:param should_succeed: boolean should ping succeed or not
:param nic: specific network interface to ping from
:param mtu: mtu size for the packet to be sent
:param fragmentation: Flag for packet fragmentation
:returns: boolean -- should_succeed == ping
:returns: ping is false if ping failed
"""
def ping_host(source, host, count=CONF.validation.ping_count,
size=CONF.validation.ping_size, nic=None, mtu=None,
fragmentation=True):
addr = netaddr.IPAddress(host)
cmd = 'ping6' if addr.version == 6 else 'ping'
if nic:
cmd = 'sudo {cmd} -I {nic}'.format(cmd=cmd, nic=nic)
if mtu:
if not fragmentation:
cmd += ' -M do'
size = str(net_utils.get_ping_payload_size(
mtu=mtu, ip_version=addr.version))
cmd += ' -c{0} -w{0} -s{1} {2}'.format(count, size, host)
return source.exec_command(cmd)
def ping_remote():
try:
result = ping_host(source, dest, nic=nic, mtu=mtu,
fragmentation=fragmentation)
except lib_exc.SSHExecCommandFailed:
LOG.warning('Failed to ping IP: %s via a ssh connection '
'from: %s.', dest, source.host)
return not should_succeed
LOG.debug('ping result: %s', result)
# Assert that the return traffic was from the correct
# source address.
from_source = 'from %s' % dest
self.assertIn(from_source, result)
return should_succeed
return test_utils.call_until_true(ping_remote,
CONF.validation.ping_timeout,
1)
def check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True):
self.assertTrue(self._check_remote_connectivity(
source, dest, should_succeed, nic, mtu, fragmentation))
def ping_ip_address(self, ip_address, should_succeed=True,
ping_timeout=None, mtu=None):
# the code is taken from tempest/scenario/manager.py in tempest git
timeout = ping_timeout or CONF.validation.ping_timeout
cmd = ['ping', '-c1', '-w1']
if mtu:
cmd += [
# don't fragment
'-M', 'do',
# ping receives just the size of ICMP payload
'-s', str(net_utils.get_ping_payload_size(mtu, 4))
]
cmd.append(ip_address)
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return (proc.returncode == 0) == should_succeed
caller = test_utils.find_test_caller()
LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
' expected result is %(should_succeed)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'should_succeed':
'reachable' if should_succeed else 'unreachable'
})
result = test_utils.call_until_true(ping, timeout, 1)
LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
'ping result is %(result)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'result': 'expected' if result else 'unexpected'
})
return result

@ -1,18 +0,0 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SERVER_STATUS_ACTIVE = 'ACTIVE'
DEFAULT_SECURITY_GROUP = 'default'
LIMIT_KILO_BITS_PER_SECOND = 1000