Merge "Removing remaining Tempest bits"

This commit is contained in:
Zuul 2018-03-27 08:27:50 +00:00 committed by Gerrit Code Review
commit 5f306a8b24
16 changed files with 0 additions and 2342 deletions

View File

@ -1,834 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import math
import netaddr
from neutron_lib import constants as const
from tempest.common import utils as tutils
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron.common import constants
from neutron.common import utils
from neutron.tests.tempest.api import clients
from neutron.tests.tempest import config
from neutron.tests.tempest import exceptions
CONF = config.CONF
class BaseNetworkTest(test.BaseTestCase):
"""
Base class for the Neutron tests that use the Tempest Neutron REST client
Per the Neutron API Guide, API v1.x was removed from the source code tree
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
following options are defined in the [network] section of etc/tempest.conf:
project_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant networks
project_network_mask_bits with the mask bits to be used to partition
the block defined by tenant-network_cidr
Finally, it is assumed that the following option is defined in the
[service_available] section of etc/tempest.conf
neutron as True
"""
force_tenant_isolation = False
credentials = ['primary']
# Default to ipv4.
_ip_version = 4
@classmethod
def get_client_manager(cls, credential_type=None, roles=None,
force_new=None):
manager = super(BaseNetworkTest, cls).get_client_manager(
credential_type=credential_type,
roles=roles,
force_new=force_new
)
# Neutron uses a different clients manager than the one in the Tempest
return clients.Manager(manager.credentials)
@classmethod
def skip_checks(cls):
super(BaseNetworkTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException("Neutron support is required")
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
raise cls.skipException("IPv6 Tests are disabled.")
for req_ext in getattr(cls, 'required_extensions', []):
if not tutils.is_extension_enabled(req_ext, 'network'):
msg = "%s extension not enabled." % req_ext
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
# Create no network resources for these test.
cls.set_network_resources()
super(BaseNetworkTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseNetworkTest, cls).setup_clients()
cls.client = cls.os_primary.network_client
@classmethod
def resource_setup(cls):
super(BaseNetworkTest, cls).resource_setup()
cls.networks = []
cls.admin_networks = []
cls.subnets = []
cls.admin_subnets = []
cls.ports = []
cls.routers = []
cls.floating_ips = []
cls.metering_labels = []
cls.service_profiles = []
cls.flavors = []
cls.metering_label_rules = []
cls.qos_rules = []
cls.qos_policies = []
cls.ethertype = "IPv" + str(cls._ip_version)
cls.address_scopes = []
cls.admin_address_scopes = []
cls.subnetpools = []
cls.admin_subnetpools = []
cls.security_groups = []
cls.projects = []
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up floating IPs
for floating_ip in cls.floating_ips:
cls._try_delete_resource(cls.client.delete_floatingip,
floating_ip['id'])
# Clean up routers
for router in cls.routers:
cls._try_delete_resource(cls.delete_router,
router)
# Clean up metering label rules
for metering_label_rule in cls.metering_label_rules:
cls._try_delete_resource(
cls.admin_client.delete_metering_label_rule,
metering_label_rule['id'])
# Clean up metering labels
for metering_label in cls.metering_labels:
cls._try_delete_resource(
cls.admin_client.delete_metering_label,
metering_label['id'])
# Clean up flavors
for flavor in cls.flavors:
cls._try_delete_resource(
cls.admin_client.delete_flavor,
flavor['id'])
# Clean up service profiles
for service_profile in cls.service_profiles:
cls._try_delete_resource(
cls.admin_client.delete_service_profile,
service_profile['id'])
# Clean up ports
for port in cls.ports:
cls._try_delete_resource(cls.client.delete_port,
port['id'])
# Clean up subnets
for subnet in cls.subnets:
cls._try_delete_resource(cls.client.delete_subnet,
subnet['id'])
# Clean up admin subnets
for subnet in cls.admin_subnets:
cls._try_delete_resource(cls.admin_client.delete_subnet,
subnet['id'])
# Clean up networks
for network in cls.networks:
cls._try_delete_resource(cls.client.delete_network,
network['id'])
# Clean up admin networks
for network in cls.admin_networks:
cls._try_delete_resource(cls.admin_client.delete_network,
network['id'])
# Clean up security groups
for secgroup in cls.security_groups:
cls._try_delete_resource(cls.client.delete_security_group,
secgroup['id'])
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
subnetpool['id'])
for subnetpool in cls.admin_subnetpools:
cls._try_delete_resource(cls.admin_client.delete_subnetpool,
subnetpool['id'])
for address_scope in cls.address_scopes:
cls._try_delete_resource(cls.client.delete_address_scope,
address_scope['id'])
for address_scope in cls.admin_address_scopes:
cls._try_delete_resource(
cls.admin_client.delete_address_scope,
address_scope['id'])
for project in cls.projects:
cls._try_delete_resource(
cls.identity_admin_client.delete_project,
project['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
cls._try_delete_resource(cls.admin_client.delete_qos_rule,
qos_rule['id'])
# Clean up QoS policies
# as all networks and ports are already removed, QoS policies
# shouldn't be "in use"
for qos_policy in cls.qos_policies:
cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
def _try_delete_resource(cls, delete_callable, *args, **kwargs):
"""Cleanup resources in case of test-failure
Some resources are explicitly deleted by the test.
If the test failed to delete a resource, this method will execute
the appropriate delete methods. Otherwise, the method ignores NotFound
exceptions thrown for resources that were correctly deleted by the
test.
:param delete_callable: delete method
:param args: arguments for delete method
:param kwargs: keyword arguments for delete method
"""
try:
delete_callable(*args, **kwargs)
# if resource is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
@classmethod
def create_network(cls, network_name=None, client=None, **kwargs):
"""Wrapper utility that returns a test network."""
network_name = network_name or data_utils.rand_name('test-network-')
client = client or cls.client
body = client.create_network(name=network_name, **kwargs)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_shared_network(cls, network_name=None, **post_body):
network_name = network_name or data_utils.rand_name('sharednetwork-')
post_body.update({'name': network_name, 'shared': True})
body = cls.admin_client.create_network(**post_body)
network = body['network']
cls.admin_networks.append(network)
return network
@classmethod
def create_network_keystone_v3(cls, network_name=None, project_id=None,
tenant_id=None, client=None):
"""Wrapper utility that creates a test network with project_id."""
client = client or cls.client
network_name = network_name or data_utils.rand_name(
'test-network-with-project_id')
project_id = cls.client.tenant_id
body = client.create_network_keystone_v3(network_name, project_id,
tenant_id)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
ip_version=None, client=None, **kwargs):
"""Wrapper utility that returns a test subnet."""
# allow tests to use admin client
if not client:
client = cls.client
# The cidr and mask_bits depend on the ip version.
ip_version = ip_version if ip_version is not None else cls._ip_version
gateway_not_set = gateway == ''
if ip_version == 4:
cidr = cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_cidr'))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_mask_bits'))
elif ip_version == 6:
cidr = (
cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_v6_cidr')))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_v6_mask_bits'))
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
if gateway_not_set:
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
else:
gateway_ip = gateway
try:
body = client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=ip_version,
gateway_ip=gateway_ip,
**kwargs)
break
except lib_exc.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise ValueError(message)
subnet = body['subnet']
if client is cls.client:
cls.subnets.append(subnet)
else:
cls.admin_subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.client.create_port(network_id=network['id'],
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_port(cls, port, **kwargs):
"""Wrapper utility that updates a test port."""
body = cls.client.update_port(port['id'],
**kwargs)
return body['port']
@classmethod
def _create_router_with_client(
cls, client, router_name=None, admin_state_up=False,
external_network_id=None, enable_snat=None, **kwargs
):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
body = client.create_router(
router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
cls.routers.append(router)
return router
@classmethod
def create_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.client, *args, **kwargs)
@classmethod
def create_admin_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.os_admin.network_client,
*args, **kwargs)
@classmethod
def create_floatingip(cls, external_network_id):
"""Wrapper utility that returns a test floating IP."""
body = cls.client.create_floatingip(
floating_network_id=external_network_id)
fip = body['floatingip']
cls.floating_ips.append(fip)
return fip
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
interface = cls.client.add_router_interface_with_subnet_id(
router_id, subnet_id)
return interface
@classmethod
def get_supported_qos_rule_types(cls):
body = cls.client.list_qos_rule_types()
return [rule_type['type'] for rule_type in body['rule_types']]
@classmethod
def create_qos_policy(cls, name, description=None, shared=False,
tenant_id=None, is_default=False):
"""Wrapper utility that returns a test QoS policy."""
body = cls.admin_client.create_qos_policy(
name, description, shared, tenant_id, is_default)
qos_policy = body['policy']
cls.qos_policies.append(qos_policy)
return qos_policy
@classmethod
def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
max_burst_kbps,
direction=const.EGRESS_DIRECTION):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps, direction)
qos_rule = body['bandwidth_limit_rule']
cls.qos_rules.append(qos_rule)
return qos_rule
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
interfaces = [port for port in body['ports']
if port['device_owner'] in const.ROUTER_INTERFACE_OWNERS]
for i in interfaces:
try:
cls.client.remove_router_interface_with_subnet_id(
router['id'], i['fixed_ips'][0]['subnet_id'])
except lib_exc.NotFound:
pass
cls.client.delete_router(router['id'])
@classmethod
def create_address_scope(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_address_scope(name=name, **kwargs)
cls.admin_address_scopes.append(body['address_scope'])
else:
body = cls.client.create_address_scope(name=name, **kwargs)
cls.address_scopes.append(body['address_scope'])
return body['address_scope']
@classmethod
def create_subnetpool(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_subnetpool(name, **kwargs)
cls.admin_subnetpools.append(body['subnetpool'])
else:
body = cls.client.create_subnetpool(name, **kwargs)
cls.subnetpools.append(body['subnetpool'])
return body['subnetpool']
@classmethod
def create_project(cls, name=None, description=None):
test_project = name or data_utils.rand_name('test_project_')
test_description = description or data_utils.rand_name('desc_')
project = cls.identity_admin_client.create_project(
name=test_project,
description=test_description)['project']
cls.projects.append(project)
return project
@classmethod
def create_security_group(cls, name, **kwargs):
body = cls.client.create_security_group(name=name, **kwargs)
cls.security_groups.append(body['security_group'])
return body['security_group']
class BaseAdminNetworkTest(BaseNetworkTest):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseAdminNetworkTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
cls.identity_admin_client = cls.os_admin.projects_client
@classmethod
def create_metering_label(cls, name, description):
"""Wrapper utility that returns a test metering label."""
body = cls.admin_client.create_metering_label(
description=description,
name=data_utils.rand_name("metering-label"))
metering_label = body['metering_label']
cls.metering_labels.append(metering_label)
return metering_label
@classmethod
def create_metering_label_rule(cls, remote_ip_prefix, direction,
metering_label_id):
"""Wrapper utility that returns a test metering label rule."""
body = cls.admin_client.create_metering_label_rule(
remote_ip_prefix=remote_ip_prefix, direction=direction,
metering_label_id=metering_label_id)
metering_label_rule = body['metering_label_rule']
cls.metering_label_rules.append(metering_label_rule)
return metering_label_rule
@classmethod
def create_flavor(cls, name, description, service_type):
"""Wrapper utility that returns a test flavor."""
body = cls.admin_client.create_flavor(
description=description, service_type=service_type,
name=name)
flavor = body['flavor']
cls.flavors.append(flavor)
return flavor
@classmethod
def create_service_profile(cls, description, metainfo, driver):
"""Wrapper utility that returns a test service profile."""
body = cls.admin_client.create_service_profile(
driver=driver, metainfo=metainfo, description=description)
service_profile = body['service_profile']
cls.service_profiles.append(service_profile)
return service_profile
@classmethod
def get_unused_ip(cls, net_id, ip_version=None):
"""Get an unused ip address in a allocation pool of net"""
body = cls.admin_client.list_ports(network_id=net_id)
ports = body['ports']
used_ips = []
for port in ports:
used_ips.extend(
[fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
body = cls.admin_client.list_subnets(network_id=net_id)
subnets = body['subnets']
for subnet in subnets:
if ip_version and subnet['ip_version'] != ip_version:
continue
cidr = subnet['cidr']
allocation_pools = subnet['allocation_pools']
iterators = []
if allocation_pools:
for allocation_pool in allocation_pools:
iterators.append(netaddr.iter_iprange(
allocation_pool['start'], allocation_pool['end']))
else:
net = netaddr.IPNetwork(cidr)
def _iterip():
for ip in net:
if ip not in (net.network, net.broadcast):
yield ip
iterators.append(iter(_iterip()))
for iterator in iterators:
for ip in iterator:
if str(ip) not in used_ips:
return str(ip)
message = (
"net(%s) has no usable IP address in allocation pools" % net_id)
raise exceptions.InvalidConfiguration(message)
def require_qos_rule_type(rule_type):
def decorator(f):
@functools.wraps(f)
def wrapper(self, *func_args, **func_kwargs):
if rule_type not in self.get_supported_qos_rule_types():
raise self.skipException(
"%s rule type is required." % rule_type)
return f(self, *func_args, **func_kwargs)
return wrapper
return decorator
def _require_sorting(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not tutils.is_extension_enabled("sorting", "network"):
self.skipTest('Sorting feature is required')
return f(self, *args, **kwargs)
return inner
def _require_pagination(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not tutils.is_extension_enabled("pagination", "network"):
self.skipTest('Pagination feature is required')
return f(self, *args, **kwargs)
return inner
class BaseSearchCriteriaTest(BaseNetworkTest):
# This should be defined by subclasses to reflect resource name to test
resource = None
field = 'name'
# NOTE(ihrachys): some names, like those starting with an underscore (_)
# are sorted differently depending on whether the plugin implements native
# sorting support, or not. So we avoid any such cases here, sticking to
# alphanumeric. Also test a case when there are multiple resources with the
# same name
resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
force_tenant_isolation = True
list_kwargs = {}
list_as_admin = False
def assertSameOrder(self, original, actual):
# gracefully handle iterators passed
original = list(original)
actual = list(actual)
self.assertEqual(len(original), len(actual))
for expected, res in zip(original, actual):
self.assertEqual(expected[self.field], res[self.field])
@utils.classproperty
def plural_name(self):
return '%ss' % self.resource
@property
def list_client(self):
return self.admin_client if self.list_as_admin else self.client
def list_method(self, *args, **kwargs):
method = getattr(self.list_client, 'list_%s' % self.plural_name)
kwargs.update(self.list_kwargs)
return method(*args, **kwargs)
def get_bare_url(self, url):
base_url = self.client.base_url
self.assertTrue(url.startswith(base_url))
return url[len(base_url):]
@classmethod
def _extract_resources(cls, body):
return body[cls.plural_name]
def _test_list_sorts(self, direction):
sort_args = {
'sort_dir': direction,
'sort_key': self.field
}
body = self.list_method(**sort_args)
resources = self._extract_resources(body)
self.assertNotEmpty(
resources, "%s list returned is empty" % self.resource)
retrieved_names = [res[self.field] for res in resources]
expected = sorted(retrieved_names)
if direction == constants.SORT_DIRECTION_DESC:
expected = list(reversed(expected))
self.assertEqual(expected, retrieved_names)
@_require_sorting
def _test_list_sorts_asc(self):
self._test_list_sorts(constants.SORT_DIRECTION_ASC)
@_require_sorting
def _test_list_sorts_desc(self):
self._test_list_sorts(constants.SORT_DIRECTION_DESC)
@_require_pagination
def _test_list_pagination(self):
for limit in range(1, len(self.resource_names) + 1):
pagination_args = {
'limit': limit,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertEqual(limit, len(resources))
@_require_pagination
def _test_list_no_pagination_limit_0(self):
pagination_args = {
'limit': 0,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertGreaterEqual(len(resources), len(self.resource_names))
def _test_list_pagination_iteratively(self, lister):
# first, collect all resources for later comparison
sort_args = {
'sort_dir': constants.SORT_DIRECTION_ASC,
'sort_key': self.field
}
body = self.list_method(**sort_args)
expected_resources = self._extract_resources(body)
self.assertNotEmpty(expected_resources)
resources = lister(
len(expected_resources), sort_args
)
# finally, compare that the list retrieved in one go is identical to
# the one containing pagination results
self.assertSameOrder(expected_resources, resources)
def _list_all_with_marker(self, niterations, sort_args):
# paginate resources one by one, using last fetched resource as a
# marker
resources = []
for i in range(niterations):
pagination_args = sort_args.copy()
pagination_args['limit'] = 1
if resources:
pagination_args['marker'] = resources[-1]['id']
body = self.list_method(**pagination_args)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_marker(self):
self._test_list_pagination_iteratively(self._list_all_with_marker)
def _list_all_with_hrefs(self, niterations, sort_args):
# paginate resources one by one, using next href links
resources = []
prev_links = {}
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['next'])
else:
sort_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, limit=1, **sort_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
# The last element is empty and does not contain 'next' link
uri = self.get_bare_url(prev_links['next'])
prev_links, body = self.client.get_uri_with_links(
self.plural_name, uri
)
self.assertNotIn('next', prev_links)
# Now walk backwards and compare results
resources2 = []
for i in range(niterations):
uri = self.get_bare_url(prev_links['previous'])
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources2.extend(resources_)
self.assertSameOrder(resources, reversed(resources2))
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_href_links(self):
self._test_list_pagination_iteratively(self._list_all_with_hrefs)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_with_href_links(
self, direction=constants.SORT_DIRECTION_ASC):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
page_size = 2
pagination_args['limit'] = page_size
prev_links = {}
resources = []
num_resources = len(expected_resources)
niterations = int(math.ceil(float(num_resources) / page_size))
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['previous'])
else:
pagination_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, page_reverse=True, **pagination_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertGreaterEqual(page_size, len(resources_))
resources.extend(reversed(resources_))
self.assertSameOrder(expected_resources, reversed(resources))
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_asc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_ASC)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_desc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_DESC)
def _test_list_pagination_page_reverse(self, direction):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
'limit': 3,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
pagination_args['limit'] -= 1
pagination_args['marker'] = expected_resources[-1]['id']
pagination_args['page_reverse'] = True
body = self.list_method(**pagination_args)
self.assertSameOrder(
# the last entry is not included in 2nd result when used as a
# marker
expected_resources[:-1],
self._extract_resources(body))
def _test_list_validation_filters(self):
validation_args = {
'unknown_filter': 'value',
}
body = self.list_method(**validation_args)
resources = self._extract_resources(body)
for resource in resources:
self.assertIn(resource['name'], self.resource_names)

View File

@ -1,91 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.services.compute import keypairs_client
from tempest.lib.services.compute import servers_client
from tempest.lib.services.identity.v2 import tenants_client
from tempest.lib.services.identity.v3 import projects_client
from tempest import manager
from neutron.tests.tempest import config
from neutron.tests.tempest.services.network.json import network_client
CONF = config.CONF
class Manager(manager.Manager):
"""
Top level manager for OpenStack tempest clients
"""
default_params = {
'disable_ssl_certificate_validation':
CONF.identity.disable_ssl_certificate_validation,
'ca_certs': CONF.identity.ca_certificates_file,
'trace_requests': CONF.debug.trace_requests
}
# NOTE: Tempest uses timeout values of compute API if project specific
# timeout values don't exist.
default_params_with_timeout_values = {
'build_interval': CONF.compute.build_interval,
'build_timeout': CONF.compute.build_timeout
}
default_params_with_timeout_values.update(default_params)
def __init__(self, credentials=None, service=None):
super(Manager, self).__init__(credentials=credentials)
self._set_identity_clients()
self.network_client = network_client.NetworkClientJSON(
self.auth_provider,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type=CONF.network.endpoint_type,
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**self.default_params)
params = {
'service': CONF.compute.catalog_type,
'region': CONF.compute.region or CONF.identity.region,
'endpoint_type': CONF.compute.endpoint_type,
'build_interval': CONF.compute.build_interval,
'build_timeout': CONF.compute.build_timeout
}
params.update(self.default_params)
self.servers_client = servers_client.ServersClient(
self.auth_provider,
enable_instance_password=CONF.compute_feature_enabled
.enable_instance_password,
**params)
self.keypairs_client = keypairs_client.KeyPairsClient(
self.auth_provider, **params)
def _set_identity_clients(self):
params = {
'service': CONF.identity.catalog_type,
'region': CONF.identity.region
}
params.update(self.default_params_with_timeout_values)
params_v2_admin = params.copy()
params_v2_admin['endpoint_type'] = CONF.identity.v2_admin_endpoint_type
# Client uses admin endpoint type of Keystone API v2
self.tenants_client = tenants_client.TenantsClient(self.auth_provider,
**params_v2_admin)
# Client uses admin endpoint type of Keystone API v3
self.projects_client = projects_client.ProjectsClient(
self.auth_provider, **params_v2_admin)

View File

@ -1,24 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common import ssh
from neutron.tests.tempest import config
class Client(ssh.Client):
def __init__(self, *args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = config.CONF.validation.ssh_timeout
super(Client, self).__init__(*args, **kwargs)

View File

@ -1,21 +0,0 @@
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency.fixture import lockutils
class LockFixture(lockutils.LockFixture):
def __init__(self, name):
super(LockFixture, self).__init__(name, 'tempest-')

View File

@ -1,30 +0,0 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions
TempestException = exceptions.TempestException
class InvalidConfiguration(TempestException):
message = "Invalid Configuration"
class InvalidCredentials(TempestException):
message = "Invalid Credentials"
class InvalidServiceTag(TempestException):
message = "Invalid service tag"

View File

@ -1,317 +0,0 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import netaddr
from oslo_log import log
from tempest.common.utils import net_utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from neutron.tests.tempest.api import base as base_api
from neutron.tests.tempest.common import ssh
from neutron.tests.tempest import config
from neutron.tests.tempest.scenario import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
class BaseTempestTestCase(base_api.BaseNetworkTest):
@classmethod
def resource_setup(cls):
super(BaseTempestTestCase, cls).resource_setup()
cls.keypairs = []
@classmethod
def resource_cleanup(cls):
for keypair in cls.keypairs:
cls.os_primary.keypairs_client.delete_keypair(
keypair_name=keypair['name'])
super(BaseTempestTestCase, cls).resource_cleanup()
def create_server(self, flavor_ref, image_ref, key_name, networks,
name=None, security_groups=None):
"""Create a server using tempest lib
All the parameters are the ones used in Compute API
Args:
flavor_ref(str): The flavor of the server to be provisioned.
image_ref(str): The image of the server to be provisioned.
key_name(str): SSH key to to be used to connect to the
provisioned server.
networks(list): List of dictionaries where each represent
an interface to be attached to the server. For network
it should be {'uuid': network_uuid} and for port it should
be {'port': port_uuid}
name(str): Name of the server to be provisioned.
security_groups(list): List of dictionaries where
the keys is 'name' and the value is the name of
the security group. If it's not passed the default
security group will be used.
"""
name = name or data_utils.rand_name('server-test')
if not security_groups:
security_groups = [{'name': 'default'}]
server = self.os_primary.servers_client.create_server(
name=name,
flavorRef=flavor_ref,
imageRef=image_ref,
key_name=key_name,
networks=networks,
security_groups=security_groups)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
self.os_primary.servers_client, server['server']['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.os_primary.servers_client.delete_server,
server['server']['id'])
return server
@classmethod
def create_keypair(cls, client=None):
client = client or cls.os_primary.keypairs_client
name = data_utils.rand_name('keypair-test')
body = client.create_keypair(name=name)
cls.keypairs.append(body['keypair'])
return body['keypair']
@classmethod
def create_secgroup_rules(cls, rule_list, secgroup_id=None):
client = cls.os_primary.network_client
if not secgroup_id:
sgs = client.list_security_groups()['security_groups']
for sg in sgs:
if sg['name'] == constants.DEFAULT_SECURITY_GROUP:
secgroup_id = sg['id']
break
for rule in rule_list:
direction = rule.pop('direction')
client.create_security_group_rule(
direction=direction,
security_group_id=secgroup_id,
**rule)
@classmethod
def create_loginable_secgroup_rule(cls, secgroup_id=None):
"""This rule is intended to permit inbound ssh
Allowing ssh traffic traffic from all sources, so no group_id is
provided.
Setting a group_id would only permit traffic from ports
belonging to the same security group.
"""
rule_list = [{'protocol': 'tcp',
'direction': 'ingress',
'port_range_min': 22,
'port_range_max': 22,
'remote_ip_prefix': '0.0.0.0/0'}]
cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None):
"""This rule is intended to permit inbound ping
"""
rule_list = [{'protocol': 'icmp',
'direction': 'ingress',
'port_range_min': 8, # type
'port_range_max': 0, # code
'remote_ip_prefix': '0.0.0.0/0'}]
cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
@classmethod
def create_router_by_client(cls, is_admin=False, **kwargs):
kwargs.update({'router_name': data_utils.rand_name('router'),
'admin_state_up': True,
'external_network_id': CONF.network.public_network_id})
if not is_admin:
router = cls.create_router(**kwargs)
else:
router = cls.create_admin_router(**kwargs)
LOG.debug("Created router %s", router['name'])
cls.routers.append(router)
return router
def create_and_associate_floatingip(self, port_id):
fip = self.os_primary.network_client.create_floatingip(
CONF.network.public_network_id,
port_id=port_id)['floatingip']
self.floating_ips.append(fip)
return fip
def setup_network_and_server(self, router=None, **kwargs):
"""Create network resources and a server.
Creating a network, subnet, router, keypair, security group
and a server.
"""
self.network = self.create_network()
LOG.debug("Created network %s", self.network['name'])
self.subnet = self.create_subnet(self.network)
LOG.debug("Created subnet %s", self.subnet['id'])
secgroup = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup'))
LOG.debug("Created security group %s",
secgroup['security_group']['name'])
self.security_groups.append(secgroup['security_group'])
if not router:
router = self.create_router_by_client(**kwargs)
self.create_router_interface(router['id'], self.subnet['id'])
self.keypair = self.create_keypair()
self.create_loginable_secgroup_rule(
secgroup_id=secgroup['security_group']['id'])
self.server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': self.network['id']}],
security_groups=[{'name': secgroup['security_group']['name']}])
waiters.wait_for_server_status(self.os_primary.servers_client,
self.server['server']['id'],
constants.SERVER_STATUS_ACTIVE)
self.port = self.client.list_ports(network_id=self.network['id'],
device_id=self.server[
'server']['id'])['ports'][0]
self.fip = self.create_and_associate_floatingip(self.port['id'])
def check_connectivity(self, host, ssh_user, ssh_key, servers=None):
ssh_client = ssh.Client(host, ssh_user, pkey=ssh_key)
try:
ssh_client.test_connection_auth()
except lib_exc.SSHTimeout as ssh_e:
LOG.debug(ssh_e)
self._log_console_output(servers)
raise
def _log_console_output(self, servers=None):
if not CONF.compute_feature_enabled.console_output:
LOG.debug('Console output not supported, cannot log')
return
if not servers:
servers = self.os_primary.servers_client.list_servers()
servers = servers['servers']
for server in servers:
try:
console_output = (
self.os_primary.servers_client.get_console_output(
server['id'])['output'])
LOG.debug('Console output for %s\nbody=\n%s',
server['id'], console_output)
except lib_exc.NotFound:
LOG.debug("Server %s disappeared(deleted) while looking "
"for the console log", server['id'])
def _check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True):
"""check ping server via source ssh connection
:param source: RemoteClient: an ssh connection from which to ping
:param dest: and IP to ping against
:param should_succeed: boolean should ping succeed or not
:param nic: specific network interface to ping from
:param mtu: mtu size for the packet to be sent
:param fragmentation: Flag for packet fragmentation
:returns: boolean -- should_succeed == ping
:returns: ping is false if ping failed
"""
def ping_host(source, host, count=CONF.validation.ping_count,
size=CONF.validation.ping_size, nic=None, mtu=None,
fragmentation=True):
addr = netaddr.IPAddress(host)
cmd = 'ping6' if addr.version == 6 else 'ping'
if nic:
cmd = 'sudo {cmd} -I {nic}'.format(cmd=cmd, nic=nic)
if mtu:
if not fragmentation:
cmd += ' -M do'
size = str(net_utils.get_ping_payload_size(
mtu=mtu, ip_version=addr.version))
cmd += ' -c{0} -w{0} -s{1} {2}'.format(count, size, host)
return source.exec_command(cmd)
def ping_remote():
try:
result = ping_host(source, dest, nic=nic, mtu=mtu,
fragmentation=fragmentation)
except lib_exc.SSHExecCommandFailed:
LOG.warning('Failed to ping IP: %s via a ssh connection '
'from: %s.', dest, source.host)
return not should_succeed
LOG.debug('ping result: %s', result)
# Assert that the return traffic was from the correct
# source address.
from_source = 'from %s' % dest
self.assertIn(from_source, result)
return should_succeed
return test_utils.call_until_true(ping_remote,
CONF.validation.ping_timeout,
1)
def check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True):
self.assertTrue(self._check_remote_connectivity(
source, dest, should_succeed, nic, mtu, fragmentation))
def ping_ip_address(self, ip_address, should_succeed=True,
ping_timeout=None, mtu=None):
# the code is taken from tempest/scenario/manager.py in tempest git
timeout = ping_timeout or CONF.validation.ping_timeout
cmd = ['ping', '-c1', '-w1']
if mtu:
cmd += [
# don't fragment
'-M', 'do',
# ping receives just the size of ICMP payload
'-s', str(net_utils.get_ping_payload_size(mtu, 4))
]
cmd.append(ip_address)
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return (proc.returncode == 0) == should_succeed
caller = test_utils.find_test_caller()
LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
' expected result is %(should_succeed)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'should_succeed':
'reachable' if should_succeed else 'unreachable'
})
result = test_utils.call_until_true(ping, timeout, 1)
LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
'ping result is %(result)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'result': 'expected' if result else 'unexpected'
})
return result

View File

@ -1,18 +0,0 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SERVER_STATUS_ACTIVE = 'ACTIVE'
DEFAULT_SECURITY_GROUP = 'default'
LIMIT_KILO_BITS_PER_SECOND = 1000
SOCKET_CONNECT_TIMEOUT = 60

View File

@ -1,33 +0,0 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions
TempestException = exceptions.TempestException
class QoSLimitReached(TempestException):
message = "Limit reached, limit = %(limit)d"
class SocketConnectionRefused(TempestException):
message = "Unable to connect to %(host)s port %(port)d:Connection Refused"
class ConnectionTimeoutException(TempestException):
message = "Timeout connecting to %(host)s port %(port)d"
class FileCreationFailedException(TempestException):
message = "File %(file)s has not been created or has the wrong size"

View File

@ -1,974 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_serialization import jsonutils
from six.moves.urllib import parse as urlparse
from tempest.lib.common import rest_client as service_client
from tempest.lib import exceptions as lib_exc
from neutron.tests.tempest import exceptions
class NetworkClientJSON(service_client.RestClient):
"""
Tempest REST client for Neutron. Uses v2 of the Neutron API, since the
V1 API has been removed from the code base.
Implements create, delete, update, list and show for the basic Neutron
abstractions (networks, sub-networks, routers, ports and floating IP):
Implements add/remove interface to router using subnet ID / port ID
It also implements list, show, update and reset for OpenStack Networking
quotas
"""
version = '2.0'
uri_prefix = "v2.0"
def get_uri(self, plural_name):
# get service prefix from resource name
# The following list represents resource names that do not require
# changing underscore to a hyphen
hyphen_exceptions = ["service_profiles"]
# the following map is used to construct proper URI
# for the given neutron resource
service_resource_prefix_map = {
'networks': '',
'subnets': '',
'subnetpools': '',
'ports': '',
'metering_labels': 'metering',
'metering_label_rules': 'metering',
'policies': 'qos',
'bandwidth_limit_rules': 'qos',
'minimum_bandwidth_rules': 'qos',
'rule_types': 'qos',
'rbac-policies': '',
}
service_prefix = service_resource_prefix_map.get(
plural_name)
if plural_name not in hyphen_exceptions:
plural_name = plural_name.replace("_", "-")
if service_prefix:
uri = '%s/%s/%s' % (self.uri_prefix, service_prefix,
plural_name)
else:
uri = '%s/%s' % (self.uri_prefix, plural_name)
return uri
def build_uri(self, plural_name, **kwargs):
uri = self.get_uri(plural_name)
if kwargs:
uri += '?' + urlparse.urlencode(kwargs, doseq=1)
return uri
def pluralize(self, resource_name):
# get plural from map or just add 's'
# map from resource name to a plural name
# needed only for those which can't be constructed as name + 's'
resource_plural_map = {
'security_groups': 'security_groups',
'security_group_rules': 'security_group_rules',
'quotas': 'quotas',
'qos_policy': 'policies',
'rbac_policy': 'rbac_policies',
'network_ip_availability': 'network_ip_availabilities',
}
return resource_plural_map.get(resource_name, resource_name + 's')
def get_uri_with_links(self, plural_name, uri):
resp, body = self.get(uri)
result = {plural_name: self.deserialize_list(body)}
links = self.deserialize_links(body)
self.expected_success(200, resp.status)
return links, service_client.ResponseBody(resp, result)
def _lister(self, plural_name):
def _list(**filters):
uri = self.build_uri(plural_name, **filters)
resp, body = self.get(uri)
result = {plural_name: self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, result)
return _list
def _deleter(self, resource_name):
def _delete(resource_id):
plural = self.pluralize(resource_name)
uri = '%s/%s' % (self.get_uri(plural), resource_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
return _delete
def _shower(self, resource_name):
def _show(resource_id, **fields):
# fields is a dict which key is 'fields' and value is a
# list of field's name. An example:
# {'fields': ['id', 'name']}
plural = self.pluralize(resource_name)
if 'details_quotas' in plural:
details, plural = plural.split('_')
uri = '%s/%s/%s' % (self.get_uri(plural),
resource_id, details)
else:
uri = '%s/%s' % (self.get_uri(plural), resource_id)
if fields:
uri += '?' + urlparse.urlencode(fields, doseq=1)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
return _show
def _creater(self, resource_name):
def _create(**kwargs):
plural = self.pluralize(resource_name)
uri = self.get_uri(plural)
post_data = self.serialize({resource_name: kwargs})
resp, body = self.post(uri, post_data)
body = self.deserialize_single(body)
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
return _create
def _updater(self, resource_name):
def _update(res_id, **kwargs):
headers = kwargs.pop('headers', {})
plural = self.pluralize(resource_name)
uri = '%s/%s' % (self.get_uri(plural), res_id)
post_data = self.serialize({resource_name: kwargs})
resp, body = self.put(uri, post_data, headers=headers)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
return _update
def __getattr__(self, name):
method_prefixes = ["list_", "delete_", "show_", "create_", "update_"]
method_functors = [self._lister,
self._deleter,
self._shower,
self._creater,
self._updater]
for index, prefix in enumerate(method_prefixes):
prefix_len = len(prefix)
if name[:prefix_len] == prefix:
return method_functors[index](name[prefix_len:])
raise AttributeError(name)
# Subnetpool methods
def create_subnetpool(self, name, **kwargs):
subnetpool_data = {'name': name}
for arg in kwargs:
subnetpool_data[arg] = kwargs[arg]
post_data = {'subnetpool': subnetpool_data}
body = self.serialize_list(post_data, "subnetpools", "subnetpool")
uri = self.get_uri("subnetpools")
resp, body = self.post(uri, body)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def get_subnetpool(self, id):
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.get(subnetpool_uri)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_subnetpool(self, id):
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.delete(subnetpool_uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_subnetpools(self, **filters):
uri = self.get_uri("subnetpools")
if filters:
uri = '?'.join([uri, urlparse.urlencode(filters)])
resp, body = self.get(uri)
body = {'subnetpools': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_subnetpool(self, id, **kwargs):
subnetpool_data = {}
for arg in kwargs:
subnetpool_data[arg] = kwargs[arg]
post_data = {'subnetpool': subnetpool_data}
body = self.serialize_list(post_data, "subnetpools", "subnetpool")
uri = self.get_uri("subnetpools")
subnetpool_uri = '%s/%s' % (uri, id)
resp, body = self.put(subnetpool_uri, body)
body = {'subnetpool': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
# Common methods that are hard to automate
def create_bulk_network(self, names, shared=False):
network_list = [{'name': name, 'shared': shared} for name in names]
post_data = {'networks': network_list}
body = self.serialize_list(post_data, "networks", "network")
uri = self.get_uri("networks")
resp, body = self.post(uri, body)
body = {'networks': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def create_bulk_subnet(self, subnet_list):
post_data = {'subnets': subnet_list}
body = self.serialize_list(post_data, 'subnets', 'subnet')
uri = self.get_uri('subnets')
resp, body = self.post(uri, body)
body = {'subnets': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def create_bulk_port(self, port_list):
post_data = {'ports': port_list}
body = self.serialize_list(post_data, 'ports', 'port')
uri = self.get_uri('ports')
resp, body = self.post(uri, body)
body = {'ports': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def create_bulk_security_groups(self, security_group_list):
group_list = [{'security_group': {'name': name}}
for name in security_group_list]
post_data = {'security_groups': group_list}
body = self.serialize_list(post_data, 'security_groups',
'security_group')
uri = self.get_uri("security-groups")
resp, body = self.post(uri, body)
body = {'security_groups': self.deserialize_list(body)}
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def wait_for_resource_deletion(self, resource_type, id):
"""Waits for a resource to be deleted."""
start_time = int(time.time())
while True:
if self.is_resource_deleted(resource_type, id):
return
if int(time.time()) - start_time >= self.build_timeout:
raise exceptions.TimeoutException
time.sleep(self.build_interval)
def is_resource_deleted(self, resource_type, id):
method = 'show_' + resource_type
try:
getattr(self, method)(id)
except AttributeError:
raise Exception("Unknown resource type %s " % resource_type)
except lib_exc.NotFound:
return True
return False
def deserialize_single(self, body):
return jsonutils.loads(body)
def deserialize_list(self, body):
res = jsonutils.loads(body)
# expecting response in form
# {'resources': [ res1, res2] } => when pagination disabled
# {'resources': [..], 'resources_links': {}} => if pagination enabled
for k in res.keys():
if k.endswith("_links"):
continue
return res[k]
def deserialize_links(self, body):
res = jsonutils.loads(body)
# expecting response in form
# {'resources': [ res1, res2] } => when pagination disabled
# {'resources': [..], 'resources_links': {}} => if pagination enabled
for k in res.keys():
if k.endswith("_links"):
return {
link['rel']: link['href']
for link in res[k]
}
return {}
def serialize(self, data):
return jsonutils.dumps(data)
def serialize_list(self, data, root=None, item=None):
return self.serialize(data)
def update_quotas(self, tenant_id, **kwargs):
put_body = {'quota': kwargs}
body = jsonutils.dumps(put_body)
uri = '%s/quotas/%s' % (self.uri_prefix, tenant_id)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body['quota'])
def reset_quotas(self, tenant_id):
uri = '%s/quotas/%s' % (self.uri_prefix, tenant_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_router(self, name, admin_state_up=True, **kwargs):
post_body = {'router': kwargs}
post_body['router']['name'] = name
post_body['router']['admin_state_up'] = admin_state_up
body = jsonutils.dumps(post_body)
uri = '%s/routers' % (self.uri_prefix)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def _update_router(self, router_id, set_enable_snat, **kwargs):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
update_body = {}
update_body['name'] = kwargs.get('name', body['router']['name'])
update_body['admin_state_up'] = kwargs.get(
'admin_state_up', body['router']['admin_state_up'])
if 'description' in kwargs:
update_body['description'] = kwargs['description']
cur_gw_info = body['router']['external_gateway_info']
if cur_gw_info:
# TODO(kevinbenton): setting the external gateway info is not
# allowed for a regular tenant. If the ability to update is also
# merged, a test case for this will need to be added similar to
# the SNAT case.
cur_gw_info.pop('external_fixed_ips', None)
if not set_enable_snat:
cur_gw_info.pop('enable_snat', None)
update_body['external_gateway_info'] = kwargs.get(
'external_gateway_info', body['router']['external_gateway_info'])
if 'distributed' in kwargs:
update_body['distributed'] = kwargs['distributed']
if 'ha' in kwargs:
update_body['ha'] = kwargs['ha']
update_body = dict(router=update_body)
update_body = jsonutils.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def update_router(self, router_id, **kwargs):
"""Update a router leaving enable_snat to its default value."""
# If external_gateway_info contains enable_snat the request will fail
# with 404 unless executed with admin client, and therefore we instruct
# _update_router to not set this attribute
# NOTE(salv-orlando): The above applies as long as Neutron's default
# policy is to restrict enable_snat usage to admins only.
return self._update_router(router_id, set_enable_snat=False, **kwargs)
def update_router_with_snat_gw_info(self, router_id, **kwargs):
"""Update a router passing also the enable_snat attribute.
This method must be execute with admin credentials, otherwise the API
call will return a 404 error.
"""
return self._update_router(router_id, set_enable_snat=True, **kwargs)
def add_router_interface_with_subnet_id(self, router_id, subnet_id):
uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
router_id)
update_body = {"subnet_id": subnet_id}
update_body = jsonutils.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def add_router_interface_with_port_id(self, router_id, port_id):
uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
router_id)
update_body = {"port_id": port_id}
update_body = jsonutils.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def remove_router_interface_with_subnet_id(self, router_id, subnet_id):
uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix,
router_id)
update_body = {"subnet_id": subnet_id}
update_body = jsonutils.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def remove_router_interface_with_port_id(self, router_id, port_id):
uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix,
router_id)
update_body = {"port_id": port_id}
update_body = jsonutils.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_router_interfaces(self, uuid):
uri = '%s/ports?device_id=%s' % (self.uri_prefix, uuid)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def update_agent(self, agent_id, agent_info):
"""
:param agent_info: Agent update information.
E.g {"admin_state_up": True}
"""
uri = '%s/agents/%s' % (self.uri_prefix, agent_id)
agent = {"agent": agent_info}
body = jsonutils.dumps(agent)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_routers_on_l3_agent(self, agent_id):
uri = '%s/agents/%s/l3-routers' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_l3_agents_hosting_router(self, router_id):
uri = '%s/routers/%s/l3-agents' % (self.uri_prefix, router_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def add_router_to_l3_agent(self, agent_id, router_id):
uri = '%s/agents/%s/l3-routers' % (self.uri_prefix, agent_id)
post_body = {"router_id": router_id}
body = jsonutils.dumps(post_body)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def remove_router_from_l3_agent(self, agent_id, router_id):
uri = '%s/agents/%s/l3-routers/%s' % (
self.uri_prefix, agent_id, router_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_dhcp_agent_hosting_network(self, network_id):
uri = '%s/networks/%s/dhcp-agents' % (self.uri_prefix, network_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_networks_hosted_by_one_dhcp_agent(self, agent_id):
uri = '%s/agents/%s/dhcp-networks' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def remove_network_from_dhcp_agent(self, agent_id, network_id):
uri = '%s/agents/%s/dhcp-networks/%s' % (self.uri_prefix, agent_id,
network_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def update_extra_routes(self, router_id, nexthop, destination):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
put_body = {
'router': {
'routes': [{'nexthop': nexthop,
"destination": destination}]
}
}
body = jsonutils.dumps(put_body)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_extra_routes(self, router_id):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
null_routes = None
put_body = {
'router': {
'routes': null_routes
}
}
body = jsonutils.dumps(put_body)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def add_dhcp_agent_to_network(self, agent_id, network_id):
post_body = {'network_id': network_id}
body = jsonutils.dumps(post_body)
uri = '%s/agents/%s/dhcp-networks' % (self.uri_prefix, agent_id)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_qos_policies(self, **filters):
if filters:
uri = '%s/qos/policies?%s' % (self.uri_prefix,
urlparse.urlencode(filters))
else:
uri = '%s/qos/policies' % self.uri_prefix
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def create_qos_policy(self, name, description=None, shared=False,
tenant_id=None, is_default=False):
uri = '%s/qos/policies' % self.uri_prefix
post_data = {
'policy': {
'name': name,
'shared': shared,
'is_default': is_default
}
}
if description is not None:
post_data['policy']['description'] = description
if tenant_id is not None:
post_data['policy']['tenant_id'] = tenant_id
resp, body = self.post(uri, self.serialize(post_data))
body = self.deserialize_single(body)
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def update_qos_policy(self, policy_id, **kwargs):
uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id)
post_data = self.serialize({'policy': kwargs})
resp, body = self.put(uri, post_data)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def create_bandwidth_limit_rule(self, policy_id, max_kbps,
max_burst_kbps, direction=None):
uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
self.uri_prefix, policy_id)
post_data = {
'bandwidth_limit_rule': {
'max_kbps': max_kbps,
'max_burst_kbps': max_burst_kbps
}
}
if direction:
post_data['bandwidth_limit_rule']['direction'] = direction
resp, body = self.post(uri, self.serialize(post_data))
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_bandwidth_limit_rules(self, policy_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
self.uri_prefix, policy_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def show_bandwidth_limit_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_bandwidth_limit_rule(self, policy_id, rule_id, **kwargs):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
if "direction" in kwargs and kwargs['direction'] is None:
kwargs.pop('direction')
post_data = {'bandwidth_limit_rule': kwargs}
resp, body = self.put(uri, jsonutils.dumps(post_data))
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_bandwidth_limit_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_dscp_marking_rule(self, policy_id, dscp_mark):
uri = '%s/qos/policies/%s/dscp_marking_rules' % (
self.uri_prefix, policy_id)
post_data = self.serialize({
'dscp_marking_rule': {
'dscp_mark': dscp_mark
}
})
resp, body = self.post(uri, post_data)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_dscp_marking_rules(self, policy_id):
uri = '%s/qos/policies/%s/dscp_marking_rules' % (
self.uri_prefix, policy_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def show_dscp_marking_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_dscp_marking_rule(self, policy_id, rule_id, **kwargs):
uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
post_data = {'dscp_marking_rule': kwargs}
resp, body = self.put(uri, jsonutils.dumps(post_data))
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_dscp_marking_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_minimum_bandwidth_rule(self, policy_id, direction,
min_kbps=None):
uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % (
self.uri_prefix, policy_id)
data = {
'direction': direction,
}
if min_kbps is not None:
data['min_kbps'] = min_kbps
post_data = self.serialize({'minimum_bandwidth_rule': data})
resp, body = self.post(uri, post_data)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_minimum_bandwidth_rules(self, policy_id):
uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % (
self.uri_prefix, policy_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def show_minimum_bandwidth_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def update_minimum_bandwidth_rule(self, policy_id, rule_id, **kwargs):
uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
post_data = {'minimum_bandwidth_rule': kwargs}
resp, body = self.put(uri, jsonutils.dumps(post_data))
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def delete_minimum_bandwidth_rule(self, policy_id, rule_id):
uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
self.uri_prefix, policy_id, rule_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_qos_rule_types(self):
uri = '%s/qos/rule-types' % self.uri_prefix
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def show_qos_rule_type(self, rule_type_name):
uri = '%s/qos/rule-types/%s' % (
self.uri_prefix, rule_type_name)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def create_trunk(self, parent_port_id, subports,
tenant_id=None, name=None, admin_state_up=None,
description=None):
uri = '%s/trunks' % self.uri_prefix
post_data = {
'trunk': {
'port_id': parent_port_id,
}
}
if subports is not None:
post_data['trunk']['sub_ports'] = subports
if tenant_id is not None:
post_data['trunk']['tenant_id'] = tenant_id
if name is not None:
post_data['trunk']['name'] = name
if description is not None:
post_data['trunk']['description'] = description
if admin_state_up is not None:
post_data['trunk']['admin_state_up'] = admin_state_up
resp, body = self.post(uri, self.serialize(post_data))
body = self.deserialize_single(body)
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def update_trunk(self, trunk_id, **kwargs):
put_body = {'trunk': kwargs}
body = jsonutils.dumps(put_body)
uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def show_trunk(self, trunk_id):
uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id)
resp, body = self.get(uri)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def list_trunks(self, **kwargs):
uri = '%s/trunks' % self.uri_prefix
if kwargs:
uri += '?' + urlparse.urlencode(kwargs, doseq=1)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = self.deserialize_single(body)
return service_client.ResponseBody(resp, body)
def delete_trunk(self, trunk_id):
uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def _subports_action(self, action, trunk_id, subports):
uri = '%s/trunks/%s/%s' % (self.uri_prefix, trunk_id, action)
resp, body = self.put(uri, jsonutils.dumps({'sub_ports': subports}))
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def add_subports(self, trunk_id, subports):
return self._subports_action('add_subports', trunk_id, subports)
def remove_subports(self, trunk_id, subports):
return self._subports_action('remove_subports', trunk_id, subports)
def get_subports(self, trunk_id):
uri = '%s/trunks/%s/%s' % (self.uri_prefix, trunk_id, 'get_subports')
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def get_auto_allocated_topology(self, tenant_id=None):
uri = '%s/auto-allocated-topology/%s' % (self.uri_prefix, tenant_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_auto_allocated_topology(self, tenant_id=None):
uri = '%s/auto-allocated-topology/%s' % (self.uri_prefix, tenant_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_flavor_service_profile(self, flavor_id, service_profile_id):
body = jsonutils.dumps({'service_profile': {'id': service_profile_id}})
uri = '%s/flavors/%s/service_profiles' % (self.uri_prefix, flavor_id)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_flavor_service_profiles(self, flavor_id):
uri = '%s/flavors/%s/service_profiles' % (self.uri_prefix, flavor_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_flavor_service_profile(self, flavor_id, service_profile_id):
uri = '%s/flavors/%s/service_profiles/%s' % (self.uri_prefix,
flavor_id,
service_profile_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_security_group_rule(self, direction, security_group_id,
**kwargs):
post_body = {'security_group_rule': kwargs}
post_body['security_group_rule']['direction'] = direction
post_body['security_group_rule'][
'security_group_id'] = security_group_id
body = jsonutils.dumps(post_body)
uri = '%s/security-group-rules' % self.uri_prefix
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def list_security_groups(self, **kwargs):
post_body = {'security_groups': kwargs}
body = jsonutils.dumps(post_body)
uri = '%s/security-groups' % self.uri_prefix
if kwargs:
uri += '?' + urlparse.urlencode(kwargs, doseq=1)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_security_group(self, security_group_id):
uri = '%s/security-groups/%s' % (
self.uri_prefix, security_group_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_ports(self, **kwargs):
post_body = {'ports': kwargs}
body = jsonutils.dumps(post_body)
uri = '%s/ports' % self.uri_prefix
if kwargs:
uri += '?' + urlparse.urlencode(kwargs, doseq=1)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def create_floatingip(self, floating_network_id, **kwargs):
post_body = {'floatingip': {
'floating_network_id': floating_network_id}}
if kwargs:
post_body['floatingip'].update(kwargs)
body = jsonutils.dumps(post_body)
uri = '%s/floatingips' % self.uri_prefix
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def create_network_keystone_v3(self, name, project_id, tenant_id=None):
uri = '%s/networks' % self.uri_prefix
post_data = {
'network': {
'name': name,
'project_id': project_id
}
}
if tenant_id is not None:
post_data['network']['tenant_id'] = tenant_id
resp, body = self.post(uri, self.serialize(post_data))
body = self.deserialize_single(body)
self.expected_success(201, resp.status)
return service_client.ResponseBody(resp, body)
def list_extensions(self, **filters):
uri = self.get_uri("extensions")
if filters:
uri = '?'.join([uri, urlparse.urlencode(filters)])
resp, body = self.get(uri)
body = {'extensions': self.deserialize_list(body)}
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
def get_tags(self, resource_type, resource_id):
uri = '%s/%s/%s/tags' % (
self.uri_prefix, resource_type, resource_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def get_tag(self, resource_type, resource_id, tag):
uri = '%s/%s/%s/tags/%s' % (
self.uri_prefix, resource_type, resource_id, tag)
resp, body = self.get(uri)
self.expected_success(204, resp.status)
def update_tag(self, resource_type, resource_id, tag):
uri = '%s/%s/%s/tags/%s' % (
self.uri_prefix, resource_type, resource_id, tag)
resp, body = self.put(uri, None)
self.expected_success(201, resp.status)
def update_tags(self, resource_type, resource_id, tags):
uri = '%s/%s/%s/tags' % (
self.uri_prefix, resource_type, resource_id)
req_body = jsonutils.dumps({'tags': tags})
resp, body = self.put(uri, req_body)
self.expected_success(200, resp.status)
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
def delete_tags(self, resource_type, resource_id):
uri = '%s/%s/%s/tags' % (
self.uri_prefix, resource_type, resource_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
def delete_tag(self, resource_type, resource_id, tag):
uri = '%s/%s/%s/tags/%s' % (
self.uri_prefix, resource_type, resource_id, tag)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)