From 3d0f9f3f4f220a36c92c33cec68c863a88a67491 Mon Sep 17 00:00:00 2001 From: Miguel Lavalle <miguel.lavalle@huawei.com> Date: Tue, 23 Jan 2018 12:48:45 -0600 Subject: [PATCH] Removing remaining Tempest bits All the Neutron Stadium projects are importing Tempest code from the neutron-tempest-plugin repo. We can now safely remove the remaining bits of Tempest code. Depends-On: I34efe893b93240c4689de1dda978209926022dfd Change-Id: I2cf6e1f60926f36384bf056066bcdd4e0301494f --- neutron/tests/tempest/__init__.py | 0 neutron/tests/tempest/api/__init__.py | 0 neutron/tests/tempest/api/base.py | 834 --------------- neutron/tests/tempest/api/clients.py | 91 -- neutron/tests/tempest/common/__init__.py | 0 neutron/tests/tempest/common/ssh.py | 24 - .../tests/tempest/common/tempest_fixtures.py | 21 - neutron/tests/tempest/exceptions.py | 30 - neutron/tests/tempest/scenario/__init__.py | 0 neutron/tests/tempest/scenario/base.py | 317 ------ neutron/tests/tempest/scenario/constants.py | 18 - neutron/tests/tempest/scenario/exceptions.py | 33 - neutron/tests/tempest/services/__init__.py | 0 .../tempest/services/network/__init__.py | 0 .../tempest/services/network/json/__init__.py | 0 .../services/network/json/network_client.py | 974 ------------------ 16 files changed, 2342 deletions(-) delete mode 100644 neutron/tests/tempest/__init__.py delete mode 100644 neutron/tests/tempest/api/__init__.py delete mode 100644 neutron/tests/tempest/api/base.py delete mode 100644 neutron/tests/tempest/api/clients.py delete mode 100644 neutron/tests/tempest/common/__init__.py delete mode 100644 neutron/tests/tempest/common/ssh.py delete mode 100644 neutron/tests/tempest/common/tempest_fixtures.py delete mode 100644 neutron/tests/tempest/exceptions.py delete mode 100644 neutron/tests/tempest/scenario/__init__.py delete mode 100644 neutron/tests/tempest/scenario/base.py delete mode 100644 neutron/tests/tempest/scenario/constants.py delete mode 100644 neutron/tests/tempest/scenario/exceptions.py delete mode 100644 neutron/tests/tempest/services/__init__.py delete mode 100644 neutron/tests/tempest/services/network/__init__.py delete mode 100644 neutron/tests/tempest/services/network/json/__init__.py delete mode 100644 neutron/tests/tempest/services/network/json/network_client.py diff --git a/neutron/tests/tempest/__init__.py b/neutron/tests/tempest/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/api/__init__.py b/neutron/tests/tempest/api/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/api/base.py b/neutron/tests/tempest/api/base.py deleted file mode 100644 index 7ee6601ad26..00000000000 --- a/neutron/tests/tempest/api/base.py +++ /dev/null @@ -1,834 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import math - -import netaddr -from neutron_lib import constants as const -from tempest.common import utils as tutils -from tempest.lib.common.utils import data_utils -from tempest.lib import exceptions as lib_exc -from tempest import test - -from neutron.common import constants -from neutron.common import utils -from neutron.tests.tempest.api import clients -from neutron.tests.tempest import config -from neutron.tests.tempest import exceptions - -CONF = config.CONF - - -class BaseNetworkTest(test.BaseTestCase): - - """ - Base class for the Neutron tests that use the Tempest Neutron REST client - - Per the Neutron API Guide, API v1.x was removed from the source code tree - (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html) - Therefore, v2.x of the Neutron API is assumed. It is also assumed that the - following options are defined in the [network] section of etc/tempest.conf: - - project_network_cidr with a block of cidr's from which smaller blocks - can be allocated for tenant networks - - project_network_mask_bits with the mask bits to be used to partition - the block defined by tenant-network_cidr - - Finally, it is assumed that the following option is defined in the - [service_available] section of etc/tempest.conf - - neutron as True - """ - - force_tenant_isolation = False - credentials = ['primary'] - - # Default to ipv4. - _ip_version = 4 - - @classmethod - def get_client_manager(cls, credential_type=None, roles=None, - force_new=None): - manager = super(BaseNetworkTest, cls).get_client_manager( - credential_type=credential_type, - roles=roles, - force_new=force_new - ) - # Neutron uses a different clients manager than the one in the Tempest - return clients.Manager(manager.credentials) - - @classmethod - def skip_checks(cls): - super(BaseNetworkTest, cls).skip_checks() - if not CONF.service_available.neutron: - raise cls.skipException("Neutron support is required") - if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6: - raise cls.skipException("IPv6 Tests are disabled.") - for req_ext in getattr(cls, 'required_extensions', []): - if not tutils.is_extension_enabled(req_ext, 'network'): - msg = "%s extension not enabled." % req_ext - raise cls.skipException(msg) - - @classmethod - def setup_credentials(cls): - # Create no network resources for these test. - cls.set_network_resources() - super(BaseNetworkTest, cls).setup_credentials() - - @classmethod - def setup_clients(cls): - super(BaseNetworkTest, cls).setup_clients() - cls.client = cls.os_primary.network_client - - @classmethod - def resource_setup(cls): - super(BaseNetworkTest, cls).resource_setup() - - cls.networks = [] - cls.admin_networks = [] - cls.subnets = [] - cls.admin_subnets = [] - cls.ports = [] - cls.routers = [] - cls.floating_ips = [] - cls.metering_labels = [] - cls.service_profiles = [] - cls.flavors = [] - cls.metering_label_rules = [] - cls.qos_rules = [] - cls.qos_policies = [] - cls.ethertype = "IPv" + str(cls._ip_version) - cls.address_scopes = [] - cls.admin_address_scopes = [] - cls.subnetpools = [] - cls.admin_subnetpools = [] - cls.security_groups = [] - cls.projects = [] - - @classmethod - def resource_cleanup(cls): - if CONF.service_available.neutron: - # Clean up floating IPs - for floating_ip in cls.floating_ips: - cls._try_delete_resource(cls.client.delete_floatingip, - floating_ip['id']) - # Clean up routers - for router in cls.routers: - cls._try_delete_resource(cls.delete_router, - router) - # Clean up metering label rules - for metering_label_rule in cls.metering_label_rules: - cls._try_delete_resource( - cls.admin_client.delete_metering_label_rule, - metering_label_rule['id']) - # Clean up metering labels - for metering_label in cls.metering_labels: - cls._try_delete_resource( - cls.admin_client.delete_metering_label, - metering_label['id']) - # Clean up flavors - for flavor in cls.flavors: - cls._try_delete_resource( - cls.admin_client.delete_flavor, - flavor['id']) - # Clean up service profiles - for service_profile in cls.service_profiles: - cls._try_delete_resource( - cls.admin_client.delete_service_profile, - service_profile['id']) - # Clean up ports - for port in cls.ports: - cls._try_delete_resource(cls.client.delete_port, - port['id']) - # Clean up subnets - for subnet in cls.subnets: - cls._try_delete_resource(cls.client.delete_subnet, - subnet['id']) - # Clean up admin subnets - for subnet in cls.admin_subnets: - cls._try_delete_resource(cls.admin_client.delete_subnet, - subnet['id']) - # Clean up networks - for network in cls.networks: - cls._try_delete_resource(cls.client.delete_network, - network['id']) - - # Clean up admin networks - for network in cls.admin_networks: - cls._try_delete_resource(cls.admin_client.delete_network, - network['id']) - - # Clean up security groups - for secgroup in cls.security_groups: - cls._try_delete_resource(cls.client.delete_security_group, - secgroup['id']) - - for subnetpool in cls.subnetpools: - cls._try_delete_resource(cls.client.delete_subnetpool, - subnetpool['id']) - - for subnetpool in cls.admin_subnetpools: - cls._try_delete_resource(cls.admin_client.delete_subnetpool, - subnetpool['id']) - - for address_scope in cls.address_scopes: - cls._try_delete_resource(cls.client.delete_address_scope, - address_scope['id']) - - for address_scope in cls.admin_address_scopes: - cls._try_delete_resource( - cls.admin_client.delete_address_scope, - address_scope['id']) - - for project in cls.projects: - cls._try_delete_resource( - cls.identity_admin_client.delete_project, - project['id']) - - # Clean up QoS rules - for qos_rule in cls.qos_rules: - cls._try_delete_resource(cls.admin_client.delete_qos_rule, - qos_rule['id']) - # Clean up QoS policies - # as all networks and ports are already removed, QoS policies - # shouldn't be "in use" - for qos_policy in cls.qos_policies: - cls._try_delete_resource(cls.admin_client.delete_qos_policy, - qos_policy['id']) - - super(BaseNetworkTest, cls).resource_cleanup() - - @classmethod - def _try_delete_resource(cls, delete_callable, *args, **kwargs): - """Cleanup resources in case of test-failure - - Some resources are explicitly deleted by the test. - If the test failed to delete a resource, this method will execute - the appropriate delete methods. Otherwise, the method ignores NotFound - exceptions thrown for resources that were correctly deleted by the - test. - - :param delete_callable: delete method - :param args: arguments for delete method - :param kwargs: keyword arguments for delete method - """ - try: - delete_callable(*args, **kwargs) - # if resource is not found, this means it was deleted in the test - except lib_exc.NotFound: - pass - - @classmethod - def create_network(cls, network_name=None, client=None, **kwargs): - """Wrapper utility that returns a test network.""" - network_name = network_name or data_utils.rand_name('test-network-') - - client = client or cls.client - body = client.create_network(name=network_name, **kwargs) - network = body['network'] - if client is cls.client: - cls.networks.append(network) - else: - cls.admin_networks.append(network) - return network - - @classmethod - def create_shared_network(cls, network_name=None, **post_body): - network_name = network_name or data_utils.rand_name('sharednetwork-') - post_body.update({'name': network_name, 'shared': True}) - body = cls.admin_client.create_network(**post_body) - network = body['network'] - cls.admin_networks.append(network) - return network - - @classmethod - def create_network_keystone_v3(cls, network_name=None, project_id=None, - tenant_id=None, client=None): - """Wrapper utility that creates a test network with project_id.""" - client = client or cls.client - network_name = network_name or data_utils.rand_name( - 'test-network-with-project_id') - project_id = cls.client.tenant_id - body = client.create_network_keystone_v3(network_name, project_id, - tenant_id) - network = body['network'] - if client is cls.client: - cls.networks.append(network) - else: - cls.admin_networks.append(network) - return network - - @classmethod - def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None, - ip_version=None, client=None, **kwargs): - """Wrapper utility that returns a test subnet.""" - - # allow tests to use admin client - if not client: - client = cls.client - - # The cidr and mask_bits depend on the ip version. - ip_version = ip_version if ip_version is not None else cls._ip_version - gateway_not_set = gateway == '' - if ip_version == 4: - cidr = cidr or netaddr.IPNetwork( - config.safe_get_config_value( - 'network', 'project_network_cidr')) - mask_bits = ( - mask_bits or config.safe_get_config_value( - 'network', 'project_network_mask_bits')) - elif ip_version == 6: - cidr = ( - cidr or netaddr.IPNetwork( - config.safe_get_config_value( - 'network', 'project_network_v6_cidr'))) - mask_bits = ( - mask_bits or config.safe_get_config_value( - 'network', 'project_network_v6_mask_bits')) - # Find a cidr that is not in use yet and create a subnet with it - for subnet_cidr in cidr.subnet(mask_bits): - if gateway_not_set: - gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1) - else: - gateway_ip = gateway - try: - body = client.create_subnet( - network_id=network['id'], - cidr=str(subnet_cidr), - ip_version=ip_version, - gateway_ip=gateway_ip, - **kwargs) - break - except lib_exc.BadRequest as e: - is_overlapping_cidr = 'overlaps with another subnet' in str(e) - if not is_overlapping_cidr: - raise - else: - message = 'Available CIDR for subnet creation could not be found' - raise ValueError(message) - subnet = body['subnet'] - if client is cls.client: - cls.subnets.append(subnet) - else: - cls.admin_subnets.append(subnet) - return subnet - - @classmethod - def create_port(cls, network, **kwargs): - """Wrapper utility that returns a test port.""" - body = cls.client.create_port(network_id=network['id'], - **kwargs) - port = body['port'] - cls.ports.append(port) - return port - - @classmethod - def update_port(cls, port, **kwargs): - """Wrapper utility that updates a test port.""" - body = cls.client.update_port(port['id'], - **kwargs) - return body['port'] - - @classmethod - def _create_router_with_client( - cls, client, router_name=None, admin_state_up=False, - external_network_id=None, enable_snat=None, **kwargs - ): - ext_gw_info = {} - if external_network_id: - ext_gw_info['network_id'] = external_network_id - if enable_snat is not None: - ext_gw_info['enable_snat'] = enable_snat - body = client.create_router( - router_name, external_gateway_info=ext_gw_info, - admin_state_up=admin_state_up, **kwargs) - router = body['router'] - cls.routers.append(router) - return router - - @classmethod - def create_router(cls, *args, **kwargs): - return cls._create_router_with_client(cls.client, *args, **kwargs) - - @classmethod - def create_admin_router(cls, *args, **kwargs): - return cls._create_router_with_client(cls.os_admin.network_client, - *args, **kwargs) - - @classmethod - def create_floatingip(cls, external_network_id): - """Wrapper utility that returns a test floating IP.""" - body = cls.client.create_floatingip( - floating_network_id=external_network_id) - fip = body['floatingip'] - cls.floating_ips.append(fip) - return fip - - @classmethod - def create_router_interface(cls, router_id, subnet_id): - """Wrapper utility that returns a router interface.""" - interface = cls.client.add_router_interface_with_subnet_id( - router_id, subnet_id) - return interface - - @classmethod - def get_supported_qos_rule_types(cls): - body = cls.client.list_qos_rule_types() - return [rule_type['type'] for rule_type in body['rule_types']] - - @classmethod - def create_qos_policy(cls, name, description=None, shared=False, - tenant_id=None, is_default=False): - """Wrapper utility that returns a test QoS policy.""" - body = cls.admin_client.create_qos_policy( - name, description, shared, tenant_id, is_default) - qos_policy = body['policy'] - cls.qos_policies.append(qos_policy) - return qos_policy - - @classmethod - def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps, - max_burst_kbps, - direction=const.EGRESS_DIRECTION): - """Wrapper utility that returns a test QoS bandwidth limit rule.""" - body = cls.admin_client.create_bandwidth_limit_rule( - policy_id, max_kbps, max_burst_kbps, direction) - qos_rule = body['bandwidth_limit_rule'] - cls.qos_rules.append(qos_rule) - return qos_rule - - @classmethod - def delete_router(cls, router): - body = cls.client.list_router_interfaces(router['id']) - interfaces = [port for port in body['ports'] - if port['device_owner'] in const.ROUTER_INTERFACE_OWNERS] - for i in interfaces: - try: - cls.client.remove_router_interface_with_subnet_id( - router['id'], i['fixed_ips'][0]['subnet_id']) - except lib_exc.NotFound: - pass - cls.client.delete_router(router['id']) - - @classmethod - def create_address_scope(cls, name, is_admin=False, **kwargs): - if is_admin: - body = cls.admin_client.create_address_scope(name=name, **kwargs) - cls.admin_address_scopes.append(body['address_scope']) - else: - body = cls.client.create_address_scope(name=name, **kwargs) - cls.address_scopes.append(body['address_scope']) - return body['address_scope'] - - @classmethod - def create_subnetpool(cls, name, is_admin=False, **kwargs): - if is_admin: - body = cls.admin_client.create_subnetpool(name, **kwargs) - cls.admin_subnetpools.append(body['subnetpool']) - else: - body = cls.client.create_subnetpool(name, **kwargs) - cls.subnetpools.append(body['subnetpool']) - return body['subnetpool'] - - @classmethod - def create_project(cls, name=None, description=None): - test_project = name or data_utils.rand_name('test_project_') - test_description = description or data_utils.rand_name('desc_') - project = cls.identity_admin_client.create_project( - name=test_project, - description=test_description)['project'] - cls.projects.append(project) - return project - - @classmethod - def create_security_group(cls, name, **kwargs): - body = cls.client.create_security_group(name=name, **kwargs) - cls.security_groups.append(body['security_group']) - return body['security_group'] - - -class BaseAdminNetworkTest(BaseNetworkTest): - - credentials = ['primary', 'admin'] - - @classmethod - def setup_clients(cls): - super(BaseAdminNetworkTest, cls).setup_clients() - cls.admin_client = cls.os_admin.network_client - cls.identity_admin_client = cls.os_admin.projects_client - - @classmethod - def create_metering_label(cls, name, description): - """Wrapper utility that returns a test metering label.""" - body = cls.admin_client.create_metering_label( - description=description, - name=data_utils.rand_name("metering-label")) - metering_label = body['metering_label'] - cls.metering_labels.append(metering_label) - return metering_label - - @classmethod - def create_metering_label_rule(cls, remote_ip_prefix, direction, - metering_label_id): - """Wrapper utility that returns a test metering label rule.""" - body = cls.admin_client.create_metering_label_rule( - remote_ip_prefix=remote_ip_prefix, direction=direction, - metering_label_id=metering_label_id) - metering_label_rule = body['metering_label_rule'] - cls.metering_label_rules.append(metering_label_rule) - return metering_label_rule - - @classmethod - def create_flavor(cls, name, description, service_type): - """Wrapper utility that returns a test flavor.""" - body = cls.admin_client.create_flavor( - description=description, service_type=service_type, - name=name) - flavor = body['flavor'] - cls.flavors.append(flavor) - return flavor - - @classmethod - def create_service_profile(cls, description, metainfo, driver): - """Wrapper utility that returns a test service profile.""" - body = cls.admin_client.create_service_profile( - driver=driver, metainfo=metainfo, description=description) - service_profile = body['service_profile'] - cls.service_profiles.append(service_profile) - return service_profile - - @classmethod - def get_unused_ip(cls, net_id, ip_version=None): - """Get an unused ip address in a allocation pool of net""" - body = cls.admin_client.list_ports(network_id=net_id) - ports = body['ports'] - used_ips = [] - for port in ports: - used_ips.extend( - [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']]) - body = cls.admin_client.list_subnets(network_id=net_id) - subnets = body['subnets'] - - for subnet in subnets: - if ip_version and subnet['ip_version'] != ip_version: - continue - cidr = subnet['cidr'] - allocation_pools = subnet['allocation_pools'] - iterators = [] - if allocation_pools: - for allocation_pool in allocation_pools: - iterators.append(netaddr.iter_iprange( - allocation_pool['start'], allocation_pool['end'])) - else: - net = netaddr.IPNetwork(cidr) - - def _iterip(): - for ip in net: - if ip not in (net.network, net.broadcast): - yield ip - iterators.append(iter(_iterip())) - - for iterator in iterators: - for ip in iterator: - if str(ip) not in used_ips: - return str(ip) - - message = ( - "net(%s) has no usable IP address in allocation pools" % net_id) - raise exceptions.InvalidConfiguration(message) - - -def require_qos_rule_type(rule_type): - def decorator(f): - @functools.wraps(f) - def wrapper(self, *func_args, **func_kwargs): - if rule_type not in self.get_supported_qos_rule_types(): - raise self.skipException( - "%s rule type is required." % rule_type) - return f(self, *func_args, **func_kwargs) - return wrapper - return decorator - - -def _require_sorting(f): - @functools.wraps(f) - def inner(self, *args, **kwargs): - if not tutils.is_extension_enabled("sorting", "network"): - self.skipTest('Sorting feature is required') - return f(self, *args, **kwargs) - return inner - - -def _require_pagination(f): - @functools.wraps(f) - def inner(self, *args, **kwargs): - if not tutils.is_extension_enabled("pagination", "network"): - self.skipTest('Pagination feature is required') - return f(self, *args, **kwargs) - return inner - - -class BaseSearchCriteriaTest(BaseNetworkTest): - - # This should be defined by subclasses to reflect resource name to test - resource = None - - field = 'name' - - # NOTE(ihrachys): some names, like those starting with an underscore (_) - # are sorted differently depending on whether the plugin implements native - # sorting support, or not. So we avoid any such cases here, sticking to - # alphanumeric. Also test a case when there are multiple resources with the - # same name - resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',) - - force_tenant_isolation = True - - list_kwargs = {} - - list_as_admin = False - - def assertSameOrder(self, original, actual): - # gracefully handle iterators passed - original = list(original) - actual = list(actual) - self.assertEqual(len(original), len(actual)) - for expected, res in zip(original, actual): - self.assertEqual(expected[self.field], res[self.field]) - - @utils.classproperty - def plural_name(self): - return '%ss' % self.resource - - @property - def list_client(self): - return self.admin_client if self.list_as_admin else self.client - - def list_method(self, *args, **kwargs): - method = getattr(self.list_client, 'list_%s' % self.plural_name) - kwargs.update(self.list_kwargs) - return method(*args, **kwargs) - - def get_bare_url(self, url): - base_url = self.client.base_url - self.assertTrue(url.startswith(base_url)) - return url[len(base_url):] - - @classmethod - def _extract_resources(cls, body): - return body[cls.plural_name] - - def _test_list_sorts(self, direction): - sort_args = { - 'sort_dir': direction, - 'sort_key': self.field - } - body = self.list_method(**sort_args) - resources = self._extract_resources(body) - self.assertNotEmpty( - resources, "%s list returned is empty" % self.resource) - retrieved_names = [res[self.field] for res in resources] - expected = sorted(retrieved_names) - if direction == constants.SORT_DIRECTION_DESC: - expected = list(reversed(expected)) - self.assertEqual(expected, retrieved_names) - - @_require_sorting - def _test_list_sorts_asc(self): - self._test_list_sorts(constants.SORT_DIRECTION_ASC) - - @_require_sorting - def _test_list_sorts_desc(self): - self._test_list_sorts(constants.SORT_DIRECTION_DESC) - - @_require_pagination - def _test_list_pagination(self): - for limit in range(1, len(self.resource_names) + 1): - pagination_args = { - 'limit': limit, - } - body = self.list_method(**pagination_args) - resources = self._extract_resources(body) - self.assertEqual(limit, len(resources)) - - @_require_pagination - def _test_list_no_pagination_limit_0(self): - pagination_args = { - 'limit': 0, - } - body = self.list_method(**pagination_args) - resources = self._extract_resources(body) - self.assertGreaterEqual(len(resources), len(self.resource_names)) - - def _test_list_pagination_iteratively(self, lister): - # first, collect all resources for later comparison - sort_args = { - 'sort_dir': constants.SORT_DIRECTION_ASC, - 'sort_key': self.field - } - body = self.list_method(**sort_args) - expected_resources = self._extract_resources(body) - self.assertNotEmpty(expected_resources) - - resources = lister( - len(expected_resources), sort_args - ) - - # finally, compare that the list retrieved in one go is identical to - # the one containing pagination results - self.assertSameOrder(expected_resources, resources) - - def _list_all_with_marker(self, niterations, sort_args): - # paginate resources one by one, using last fetched resource as a - # marker - resources = [] - for i in range(niterations): - pagination_args = sort_args.copy() - pagination_args['limit'] = 1 - if resources: - pagination_args['marker'] = resources[-1]['id'] - body = self.list_method(**pagination_args) - resources_ = self._extract_resources(body) - self.assertEqual(1, len(resources_)) - resources.extend(resources_) - return resources - - @_require_pagination - @_require_sorting - def _test_list_pagination_with_marker(self): - self._test_list_pagination_iteratively(self._list_all_with_marker) - - def _list_all_with_hrefs(self, niterations, sort_args): - # paginate resources one by one, using next href links - resources = [] - prev_links = {} - - for i in range(niterations): - if prev_links: - uri = self.get_bare_url(prev_links['next']) - else: - sort_args.update(self.list_kwargs) - uri = self.list_client.build_uri( - self.plural_name, limit=1, **sort_args) - prev_links, body = self.list_client.get_uri_with_links( - self.plural_name, uri - ) - resources_ = self._extract_resources(body) - self.assertEqual(1, len(resources_)) - resources.extend(resources_) - - # The last element is empty and does not contain 'next' link - uri = self.get_bare_url(prev_links['next']) - prev_links, body = self.client.get_uri_with_links( - self.plural_name, uri - ) - self.assertNotIn('next', prev_links) - - # Now walk backwards and compare results - resources2 = [] - for i in range(niterations): - uri = self.get_bare_url(prev_links['previous']) - prev_links, body = self.list_client.get_uri_with_links( - self.plural_name, uri - ) - resources_ = self._extract_resources(body) - self.assertEqual(1, len(resources_)) - resources2.extend(resources_) - - self.assertSameOrder(resources, reversed(resources2)) - - return resources - - @_require_pagination - @_require_sorting - def _test_list_pagination_with_href_links(self): - self._test_list_pagination_iteratively(self._list_all_with_hrefs) - - @_require_pagination - @_require_sorting - def _test_list_pagination_page_reverse_with_href_links( - self, direction=constants.SORT_DIRECTION_ASC): - pagination_args = { - 'sort_dir': direction, - 'sort_key': self.field, - } - body = self.list_method(**pagination_args) - expected_resources = self._extract_resources(body) - - page_size = 2 - pagination_args['limit'] = page_size - - prev_links = {} - resources = [] - num_resources = len(expected_resources) - niterations = int(math.ceil(float(num_resources) / page_size)) - for i in range(niterations): - if prev_links: - uri = self.get_bare_url(prev_links['previous']) - else: - pagination_args.update(self.list_kwargs) - uri = self.list_client.build_uri( - self.plural_name, page_reverse=True, **pagination_args) - prev_links, body = self.list_client.get_uri_with_links( - self.plural_name, uri - ) - resources_ = self._extract_resources(body) - self.assertGreaterEqual(page_size, len(resources_)) - resources.extend(reversed(resources_)) - - self.assertSameOrder(expected_resources, reversed(resources)) - - @_require_pagination - @_require_sorting - def _test_list_pagination_page_reverse_asc(self): - self._test_list_pagination_page_reverse( - direction=constants.SORT_DIRECTION_ASC) - - @_require_pagination - @_require_sorting - def _test_list_pagination_page_reverse_desc(self): - self._test_list_pagination_page_reverse( - direction=constants.SORT_DIRECTION_DESC) - - def _test_list_pagination_page_reverse(self, direction): - pagination_args = { - 'sort_dir': direction, - 'sort_key': self.field, - 'limit': 3, - } - body = self.list_method(**pagination_args) - expected_resources = self._extract_resources(body) - - pagination_args['limit'] -= 1 - pagination_args['marker'] = expected_resources[-1]['id'] - pagination_args['page_reverse'] = True - body = self.list_method(**pagination_args) - - self.assertSameOrder( - # the last entry is not included in 2nd result when used as a - # marker - expected_resources[:-1], - self._extract_resources(body)) - - def _test_list_validation_filters(self): - validation_args = { - 'unknown_filter': 'value', - } - body = self.list_method(**validation_args) - resources = self._extract_resources(body) - for resource in resources: - self.assertIn(resource['name'], self.resource_names) diff --git a/neutron/tests/tempest/api/clients.py b/neutron/tests/tempest/api/clients.py deleted file mode 100644 index 949ce2ee2c0..00000000000 --- a/neutron/tests/tempest/api/clients.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest.lib.services.compute import keypairs_client -from tempest.lib.services.compute import servers_client -from tempest.lib.services.identity.v2 import tenants_client -from tempest.lib.services.identity.v3 import projects_client -from tempest import manager - -from neutron.tests.tempest import config -from neutron.tests.tempest.services.network.json import network_client - -CONF = config.CONF - - -class Manager(manager.Manager): - """ - Top level manager for OpenStack tempest clients - """ - default_params = { - 'disable_ssl_certificate_validation': - CONF.identity.disable_ssl_certificate_validation, - 'ca_certs': CONF.identity.ca_certificates_file, - 'trace_requests': CONF.debug.trace_requests - } - - # NOTE: Tempest uses timeout values of compute API if project specific - # timeout values don't exist. - default_params_with_timeout_values = { - 'build_interval': CONF.compute.build_interval, - 'build_timeout': CONF.compute.build_timeout - } - default_params_with_timeout_values.update(default_params) - - def __init__(self, credentials=None, service=None): - super(Manager, self).__init__(credentials=credentials) - - self._set_identity_clients() - - self.network_client = network_client.NetworkClientJSON( - self.auth_provider, - CONF.network.catalog_type, - CONF.network.region or CONF.identity.region, - endpoint_type=CONF.network.endpoint_type, - build_interval=CONF.network.build_interval, - build_timeout=CONF.network.build_timeout, - **self.default_params) - - params = { - 'service': CONF.compute.catalog_type, - 'region': CONF.compute.region or CONF.identity.region, - 'endpoint_type': CONF.compute.endpoint_type, - 'build_interval': CONF.compute.build_interval, - 'build_timeout': CONF.compute.build_timeout - } - params.update(self.default_params) - - self.servers_client = servers_client.ServersClient( - self.auth_provider, - enable_instance_password=CONF.compute_feature_enabled - .enable_instance_password, - **params) - self.keypairs_client = keypairs_client.KeyPairsClient( - self.auth_provider, **params) - - def _set_identity_clients(self): - params = { - 'service': CONF.identity.catalog_type, - 'region': CONF.identity.region - } - params.update(self.default_params_with_timeout_values) - params_v2_admin = params.copy() - params_v2_admin['endpoint_type'] = CONF.identity.v2_admin_endpoint_type - # Client uses admin endpoint type of Keystone API v2 - self.tenants_client = tenants_client.TenantsClient(self.auth_provider, - **params_v2_admin) - # Client uses admin endpoint type of Keystone API v3 - self.projects_client = projects_client.ProjectsClient( - self.auth_provider, **params_v2_admin) diff --git a/neutron/tests/tempest/common/__init__.py b/neutron/tests/tempest/common/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/common/ssh.py b/neutron/tests/tempest/common/ssh.py deleted file mode 100644 index 095a12de391..00000000000 --- a/neutron/tests/tempest/common/ssh.py +++ /dev/null @@ -1,24 +0,0 @@ -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest.lib.common import ssh - -from neutron.tests.tempest import config - - -class Client(ssh.Client): - def __init__(self, *args, **kwargs): - if 'timeout' not in kwargs: - kwargs['timeout'] = config.CONF.validation.ssh_timeout - super(Client, self).__init__(*args, **kwargs) diff --git a/neutron/tests/tempest/common/tempest_fixtures.py b/neutron/tests/tempest/common/tempest_fixtures.py deleted file mode 100644 index d416857ae63..00000000000 --- a/neutron/tests/tempest/common/tempest_fixtures.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2013 IBM Corp. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_concurrency.fixture import lockutils - - -class LockFixture(lockutils.LockFixture): - def __init__(self, name): - super(LockFixture, self).__init__(name, 'tempest-') diff --git a/neutron/tests/tempest/exceptions.py b/neutron/tests/tempest/exceptions.py deleted file mode 100644 index c9264ca1816..00000000000 --- a/neutron/tests/tempest/exceptions.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest.lib import exceptions - -TempestException = exceptions.TempestException - - -class InvalidConfiguration(TempestException): - message = "Invalid Configuration" - - -class InvalidCredentials(TempestException): - message = "Invalid Credentials" - - -class InvalidServiceTag(TempestException): - message = "Invalid service tag" diff --git a/neutron/tests/tempest/scenario/__init__.py b/neutron/tests/tempest/scenario/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/scenario/base.py b/neutron/tests/tempest/scenario/base.py deleted file mode 100644 index e2436b94868..00000000000 --- a/neutron/tests/tempest/scenario/base.py +++ /dev/null @@ -1,317 +0,0 @@ -# Copyright 2016 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import subprocess - -import netaddr -from oslo_log import log -from tempest.common.utils import net_utils -from tempest.common import waiters -from tempest.lib.common.utils import data_utils -from tempest.lib.common.utils import test_utils -from tempest.lib import exceptions as lib_exc - -from neutron.tests.tempest.api import base as base_api -from neutron.tests.tempest.common import ssh -from neutron.tests.tempest import config -from neutron.tests.tempest.scenario import constants - -CONF = config.CONF - -LOG = log.getLogger(__name__) - - -class BaseTempestTestCase(base_api.BaseNetworkTest): - @classmethod - def resource_setup(cls): - super(BaseTempestTestCase, cls).resource_setup() - - cls.keypairs = [] - - @classmethod - def resource_cleanup(cls): - for keypair in cls.keypairs: - cls.os_primary.keypairs_client.delete_keypair( - keypair_name=keypair['name']) - - super(BaseTempestTestCase, cls).resource_cleanup() - - def create_server(self, flavor_ref, image_ref, key_name, networks, - name=None, security_groups=None): - """Create a server using tempest lib - All the parameters are the ones used in Compute API - - Args: - flavor_ref(str): The flavor of the server to be provisioned. - image_ref(str): The image of the server to be provisioned. - key_name(str): SSH key to to be used to connect to the - provisioned server. - networks(list): List of dictionaries where each represent - an interface to be attached to the server. For network - it should be {'uuid': network_uuid} and for port it should - be {'port': port_uuid} - name(str): Name of the server to be provisioned. - security_groups(list): List of dictionaries where - the keys is 'name' and the value is the name of - the security group. If it's not passed the default - security group will be used. - """ - - name = name or data_utils.rand_name('server-test') - if not security_groups: - security_groups = [{'name': 'default'}] - - server = self.os_primary.servers_client.create_server( - name=name, - flavorRef=flavor_ref, - imageRef=image_ref, - key_name=key_name, - networks=networks, - security_groups=security_groups) - - self.addCleanup(test_utils.call_and_ignore_notfound_exc, - waiters.wait_for_server_termination, - self.os_primary.servers_client, server['server']['id']) - self.addCleanup(test_utils.call_and_ignore_notfound_exc, - self.os_primary.servers_client.delete_server, - server['server']['id']) - return server - - @classmethod - def create_keypair(cls, client=None): - client = client or cls.os_primary.keypairs_client - name = data_utils.rand_name('keypair-test') - body = client.create_keypair(name=name) - cls.keypairs.append(body['keypair']) - return body['keypair'] - - @classmethod - def create_secgroup_rules(cls, rule_list, secgroup_id=None): - client = cls.os_primary.network_client - if not secgroup_id: - sgs = client.list_security_groups()['security_groups'] - for sg in sgs: - if sg['name'] == constants.DEFAULT_SECURITY_GROUP: - secgroup_id = sg['id'] - break - - for rule in rule_list: - direction = rule.pop('direction') - client.create_security_group_rule( - direction=direction, - security_group_id=secgroup_id, - **rule) - - @classmethod - def create_loginable_secgroup_rule(cls, secgroup_id=None): - """This rule is intended to permit inbound ssh - - Allowing ssh traffic traffic from all sources, so no group_id is - provided. - Setting a group_id would only permit traffic from ports - belonging to the same security group. - """ - - rule_list = [{'protocol': 'tcp', - 'direction': 'ingress', - 'port_range_min': 22, - 'port_range_max': 22, - 'remote_ip_prefix': '0.0.0.0/0'}] - cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id) - - @classmethod - def create_pingable_secgroup_rule(cls, secgroup_id=None): - """This rule is intended to permit inbound ping - """ - - rule_list = [{'protocol': 'icmp', - 'direction': 'ingress', - 'port_range_min': 8, # type - 'port_range_max': 0, # code - 'remote_ip_prefix': '0.0.0.0/0'}] - cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id) - - @classmethod - def create_router_by_client(cls, is_admin=False, **kwargs): - kwargs.update({'router_name': data_utils.rand_name('router'), - 'admin_state_up': True, - 'external_network_id': CONF.network.public_network_id}) - if not is_admin: - router = cls.create_router(**kwargs) - else: - router = cls.create_admin_router(**kwargs) - LOG.debug("Created router %s", router['name']) - cls.routers.append(router) - return router - - def create_and_associate_floatingip(self, port_id): - fip = self.os_primary.network_client.create_floatingip( - CONF.network.public_network_id, - port_id=port_id)['floatingip'] - self.floating_ips.append(fip) - return fip - - def setup_network_and_server(self, router=None, **kwargs): - """Create network resources and a server. - - Creating a network, subnet, router, keypair, security group - and a server. - """ - self.network = self.create_network() - LOG.debug("Created network %s", self.network['name']) - self.subnet = self.create_subnet(self.network) - LOG.debug("Created subnet %s", self.subnet['id']) - - secgroup = self.os_primary.network_client.create_security_group( - name=data_utils.rand_name('secgroup')) - LOG.debug("Created security group %s", - secgroup['security_group']['name']) - self.security_groups.append(secgroup['security_group']) - if not router: - router = self.create_router_by_client(**kwargs) - self.create_router_interface(router['id'], self.subnet['id']) - self.keypair = self.create_keypair() - self.create_loginable_secgroup_rule( - secgroup_id=secgroup['security_group']['id']) - self.server = self.create_server( - flavor_ref=CONF.compute.flavor_ref, - image_ref=CONF.compute.image_ref, - key_name=self.keypair['name'], - networks=[{'uuid': self.network['id']}], - security_groups=[{'name': secgroup['security_group']['name']}]) - waiters.wait_for_server_status(self.os_primary.servers_client, - self.server['server']['id'], - constants.SERVER_STATUS_ACTIVE) - self.port = self.client.list_ports(network_id=self.network['id'], - device_id=self.server[ - 'server']['id'])['ports'][0] - self.fip = self.create_and_associate_floatingip(self.port['id']) - - def check_connectivity(self, host, ssh_user, ssh_key, servers=None): - ssh_client = ssh.Client(host, ssh_user, pkey=ssh_key) - try: - ssh_client.test_connection_auth() - except lib_exc.SSHTimeout as ssh_e: - LOG.debug(ssh_e) - self._log_console_output(servers) - raise - - def _log_console_output(self, servers=None): - if not CONF.compute_feature_enabled.console_output: - LOG.debug('Console output not supported, cannot log') - return - if not servers: - servers = self.os_primary.servers_client.list_servers() - servers = servers['servers'] - for server in servers: - try: - console_output = ( - self.os_primary.servers_client.get_console_output( - server['id'])['output']) - LOG.debug('Console output for %s\nbody=\n%s', - server['id'], console_output) - except lib_exc.NotFound: - LOG.debug("Server %s disappeared(deleted) while looking " - "for the console log", server['id']) - - def _check_remote_connectivity(self, source, dest, should_succeed=True, - nic=None, mtu=None, fragmentation=True): - """check ping server via source ssh connection - - :param source: RemoteClient: an ssh connection from which to ping - :param dest: and IP to ping against - :param should_succeed: boolean should ping succeed or not - :param nic: specific network interface to ping from - :param mtu: mtu size for the packet to be sent - :param fragmentation: Flag for packet fragmentation - :returns: boolean -- should_succeed == ping - :returns: ping is false if ping failed - """ - def ping_host(source, host, count=CONF.validation.ping_count, - size=CONF.validation.ping_size, nic=None, mtu=None, - fragmentation=True): - addr = netaddr.IPAddress(host) - cmd = 'ping6' if addr.version == 6 else 'ping' - if nic: - cmd = 'sudo {cmd} -I {nic}'.format(cmd=cmd, nic=nic) - if mtu: - if not fragmentation: - cmd += ' -M do' - size = str(net_utils.get_ping_payload_size( - mtu=mtu, ip_version=addr.version)) - cmd += ' -c{0} -w{0} -s{1} {2}'.format(count, size, host) - return source.exec_command(cmd) - - def ping_remote(): - try: - result = ping_host(source, dest, nic=nic, mtu=mtu, - fragmentation=fragmentation) - - except lib_exc.SSHExecCommandFailed: - LOG.warning('Failed to ping IP: %s via a ssh connection ' - 'from: %s.', dest, source.host) - return not should_succeed - LOG.debug('ping result: %s', result) - # Assert that the return traffic was from the correct - # source address. - from_source = 'from %s' % dest - self.assertIn(from_source, result) - return should_succeed - - return test_utils.call_until_true(ping_remote, - CONF.validation.ping_timeout, - 1) - - def check_remote_connectivity(self, source, dest, should_succeed=True, - nic=None, mtu=None, fragmentation=True): - self.assertTrue(self._check_remote_connectivity( - source, dest, should_succeed, nic, mtu, fragmentation)) - - def ping_ip_address(self, ip_address, should_succeed=True, - ping_timeout=None, mtu=None): - # the code is taken from tempest/scenario/manager.py in tempest git - timeout = ping_timeout or CONF.validation.ping_timeout - cmd = ['ping', '-c1', '-w1'] - - if mtu: - cmd += [ - # don't fragment - '-M', 'do', - # ping receives just the size of ICMP payload - '-s', str(net_utils.get_ping_payload_size(mtu, 4)) - ] - cmd.append(ip_address) - - def ping(): - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc.communicate() - - return (proc.returncode == 0) == should_succeed - - caller = test_utils.find_test_caller() - LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the' - ' expected result is %(should_succeed)s', { - 'caller': caller, 'ip': ip_address, 'timeout': timeout, - 'should_succeed': - 'reachable' if should_succeed else 'unreachable' - }) - result = test_utils.call_until_true(ping, timeout, 1) - LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the ' - 'ping result is %(result)s', { - 'caller': caller, 'ip': ip_address, 'timeout': timeout, - 'result': 'expected' if result else 'unexpected' - }) - return result diff --git a/neutron/tests/tempest/scenario/constants.py b/neutron/tests/tempest/scenario/constants.py deleted file mode 100644 index 258c5870b48..00000000000 --- a/neutron/tests/tempest/scenario/constants.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -SERVER_STATUS_ACTIVE = 'ACTIVE' -DEFAULT_SECURITY_GROUP = 'default' -LIMIT_KILO_BITS_PER_SECOND = 1000 -SOCKET_CONNECT_TIMEOUT = 60 diff --git a/neutron/tests/tempest/scenario/exceptions.py b/neutron/tests/tempest/scenario/exceptions.py deleted file mode 100644 index 369a85b3b5c..00000000000 --- a/neutron/tests/tempest/scenario/exceptions.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2016 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from tempest.lib import exceptions - -TempestException = exceptions.TempestException - - -class QoSLimitReached(TempestException): - message = "Limit reached, limit = %(limit)d" - - -class SocketConnectionRefused(TempestException): - message = "Unable to connect to %(host)s port %(port)d:Connection Refused" - - -class ConnectionTimeoutException(TempestException): - message = "Timeout connecting to %(host)s port %(port)d" - - -class FileCreationFailedException(TempestException): - message = "File %(file)s has not been created or has the wrong size" diff --git a/neutron/tests/tempest/services/__init__.py b/neutron/tests/tempest/services/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/services/network/__init__.py b/neutron/tests/tempest/services/network/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/services/network/json/__init__.py b/neutron/tests/tempest/services/network/json/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/tempest/services/network/json/network_client.py b/neutron/tests/tempest/services/network/json/network_client.py deleted file mode 100644 index 358dd64d848..00000000000 --- a/neutron/tests/tempest/services/network/json/network_client.py +++ /dev/null @@ -1,974 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -from oslo_serialization import jsonutils -from six.moves.urllib import parse as urlparse -from tempest.lib.common import rest_client as service_client -from tempest.lib import exceptions as lib_exc - -from neutron.tests.tempest import exceptions - - -class NetworkClientJSON(service_client.RestClient): - - """ - Tempest REST client for Neutron. Uses v2 of the Neutron API, since the - V1 API has been removed from the code base. - - Implements create, delete, update, list and show for the basic Neutron - abstractions (networks, sub-networks, routers, ports and floating IP): - - Implements add/remove interface to router using subnet ID / port ID - - It also implements list, show, update and reset for OpenStack Networking - quotas - """ - - version = '2.0' - uri_prefix = "v2.0" - - def get_uri(self, plural_name): - # get service prefix from resource name - - # The following list represents resource names that do not require - # changing underscore to a hyphen - hyphen_exceptions = ["service_profiles"] - # the following map is used to construct proper URI - # for the given neutron resource - service_resource_prefix_map = { - 'networks': '', - 'subnets': '', - 'subnetpools': '', - 'ports': '', - 'metering_labels': 'metering', - 'metering_label_rules': 'metering', - 'policies': 'qos', - 'bandwidth_limit_rules': 'qos', - 'minimum_bandwidth_rules': 'qos', - 'rule_types': 'qos', - 'rbac-policies': '', - } - service_prefix = service_resource_prefix_map.get( - plural_name) - if plural_name not in hyphen_exceptions: - plural_name = plural_name.replace("_", "-") - if service_prefix: - uri = '%s/%s/%s' % (self.uri_prefix, service_prefix, - plural_name) - else: - uri = '%s/%s' % (self.uri_prefix, plural_name) - return uri - - def build_uri(self, plural_name, **kwargs): - uri = self.get_uri(plural_name) - if kwargs: - uri += '?' + urlparse.urlencode(kwargs, doseq=1) - return uri - - def pluralize(self, resource_name): - # get plural from map or just add 's' - - # map from resource name to a plural name - # needed only for those which can't be constructed as name + 's' - resource_plural_map = { - 'security_groups': 'security_groups', - 'security_group_rules': 'security_group_rules', - 'quotas': 'quotas', - 'qos_policy': 'policies', - 'rbac_policy': 'rbac_policies', - 'network_ip_availability': 'network_ip_availabilities', - } - return resource_plural_map.get(resource_name, resource_name + 's') - - def get_uri_with_links(self, plural_name, uri): - resp, body = self.get(uri) - result = {plural_name: self.deserialize_list(body)} - links = self.deserialize_links(body) - self.expected_success(200, resp.status) - return links, service_client.ResponseBody(resp, result) - - def _lister(self, plural_name): - def _list(**filters): - uri = self.build_uri(plural_name, **filters) - resp, body = self.get(uri) - result = {plural_name: self.deserialize_list(body)} - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, result) - - return _list - - def _deleter(self, resource_name): - def _delete(resource_id): - plural = self.pluralize(resource_name) - uri = '%s/%s' % (self.get_uri(plural), resource_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - return _delete - - def _shower(self, resource_name): - def _show(resource_id, **fields): - # fields is a dict which key is 'fields' and value is a - # list of field's name. An example: - # {'fields': ['id', 'name']} - plural = self.pluralize(resource_name) - if 'details_quotas' in plural: - details, plural = plural.split('_') - uri = '%s/%s/%s' % (self.get_uri(plural), - resource_id, details) - else: - uri = '%s/%s' % (self.get_uri(plural), resource_id) - - if fields: - uri += '?' + urlparse.urlencode(fields, doseq=1) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - return _show - - def _creater(self, resource_name): - def _create(**kwargs): - plural = self.pluralize(resource_name) - uri = self.get_uri(plural) - post_data = self.serialize({resource_name: kwargs}) - resp, body = self.post(uri, post_data) - body = self.deserialize_single(body) - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - return _create - - def _updater(self, resource_name): - def _update(res_id, **kwargs): - headers = kwargs.pop('headers', {}) - plural = self.pluralize(resource_name) - uri = '%s/%s' % (self.get_uri(plural), res_id) - post_data = self.serialize({resource_name: kwargs}) - resp, body = self.put(uri, post_data, headers=headers) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - return _update - - def __getattr__(self, name): - method_prefixes = ["list_", "delete_", "show_", "create_", "update_"] - method_functors = [self._lister, - self._deleter, - self._shower, - self._creater, - self._updater] - for index, prefix in enumerate(method_prefixes): - prefix_len = len(prefix) - if name[:prefix_len] == prefix: - return method_functors[index](name[prefix_len:]) - raise AttributeError(name) - - # Subnetpool methods - def create_subnetpool(self, name, **kwargs): - subnetpool_data = {'name': name} - for arg in kwargs: - subnetpool_data[arg] = kwargs[arg] - - post_data = {'subnetpool': subnetpool_data} - body = self.serialize_list(post_data, "subnetpools", "subnetpool") - uri = self.get_uri("subnetpools") - resp, body = self.post(uri, body) - body = {'subnetpool': self.deserialize_list(body)} - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def get_subnetpool(self, id): - uri = self.get_uri("subnetpools") - subnetpool_uri = '%s/%s' % (uri, id) - resp, body = self.get(subnetpool_uri) - body = {'subnetpool': self.deserialize_list(body)} - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def delete_subnetpool(self, id): - uri = self.get_uri("subnetpools") - subnetpool_uri = '%s/%s' % (uri, id) - resp, body = self.delete(subnetpool_uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def list_subnetpools(self, **filters): - uri = self.get_uri("subnetpools") - if filters: - uri = '?'.join([uri, urlparse.urlencode(filters)]) - resp, body = self.get(uri) - body = {'subnetpools': self.deserialize_list(body)} - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def update_subnetpool(self, id, **kwargs): - subnetpool_data = {} - for arg in kwargs: - subnetpool_data[arg] = kwargs[arg] - - post_data = {'subnetpool': subnetpool_data} - body = self.serialize_list(post_data, "subnetpools", "subnetpool") - uri = self.get_uri("subnetpools") - subnetpool_uri = '%s/%s' % (uri, id) - resp, body = self.put(subnetpool_uri, body) - body = {'subnetpool': self.deserialize_list(body)} - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - # Common methods that are hard to automate - def create_bulk_network(self, names, shared=False): - network_list = [{'name': name, 'shared': shared} for name in names] - post_data = {'networks': network_list} - body = self.serialize_list(post_data, "networks", "network") - uri = self.get_uri("networks") - resp, body = self.post(uri, body) - body = {'networks': self.deserialize_list(body)} - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def create_bulk_subnet(self, subnet_list): - post_data = {'subnets': subnet_list} - body = self.serialize_list(post_data, 'subnets', 'subnet') - uri = self.get_uri('subnets') - resp, body = self.post(uri, body) - body = {'subnets': self.deserialize_list(body)} - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def create_bulk_port(self, port_list): - post_data = {'ports': port_list} - body = self.serialize_list(post_data, 'ports', 'port') - uri = self.get_uri('ports') - resp, body = self.post(uri, body) - body = {'ports': self.deserialize_list(body)} - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def create_bulk_security_groups(self, security_group_list): - group_list = [{'security_group': {'name': name}} - for name in security_group_list] - post_data = {'security_groups': group_list} - body = self.serialize_list(post_data, 'security_groups', - 'security_group') - uri = self.get_uri("security-groups") - resp, body = self.post(uri, body) - body = {'security_groups': self.deserialize_list(body)} - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def wait_for_resource_deletion(self, resource_type, id): - """Waits for a resource to be deleted.""" - start_time = int(time.time()) - while True: - if self.is_resource_deleted(resource_type, id): - return - if int(time.time()) - start_time >= self.build_timeout: - raise exceptions.TimeoutException - time.sleep(self.build_interval) - - def is_resource_deleted(self, resource_type, id): - method = 'show_' + resource_type - try: - getattr(self, method)(id) - except AttributeError: - raise Exception("Unknown resource type %s " % resource_type) - except lib_exc.NotFound: - return True - return False - - def deserialize_single(self, body): - return jsonutils.loads(body) - - def deserialize_list(self, body): - res = jsonutils.loads(body) - # expecting response in form - # {'resources': [ res1, res2] } => when pagination disabled - # {'resources': [..], 'resources_links': {}} => if pagination enabled - for k in res.keys(): - if k.endswith("_links"): - continue - return res[k] - - def deserialize_links(self, body): - res = jsonutils.loads(body) - # expecting response in form - # {'resources': [ res1, res2] } => when pagination disabled - # {'resources': [..], 'resources_links': {}} => if pagination enabled - for k in res.keys(): - if k.endswith("_links"): - return { - link['rel']: link['href'] - for link in res[k] - } - return {} - - def serialize(self, data): - return jsonutils.dumps(data) - - def serialize_list(self, data, root=None, item=None): - return self.serialize(data) - - def update_quotas(self, tenant_id, **kwargs): - put_body = {'quota': kwargs} - body = jsonutils.dumps(put_body) - uri = '%s/quotas/%s' % (self.uri_prefix, tenant_id) - resp, body = self.put(uri, body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body['quota']) - - def reset_quotas(self, tenant_id): - uri = '%s/quotas/%s' % (self.uri_prefix, tenant_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def create_router(self, name, admin_state_up=True, **kwargs): - post_body = {'router': kwargs} - post_body['router']['name'] = name - post_body['router']['admin_state_up'] = admin_state_up - body = jsonutils.dumps(post_body) - uri = '%s/routers' % (self.uri_prefix) - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def _update_router(self, router_id, set_enable_snat, **kwargs): - uri = '%s/routers/%s' % (self.uri_prefix, router_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - update_body = {} - update_body['name'] = kwargs.get('name', body['router']['name']) - update_body['admin_state_up'] = kwargs.get( - 'admin_state_up', body['router']['admin_state_up']) - if 'description' in kwargs: - update_body['description'] = kwargs['description'] - cur_gw_info = body['router']['external_gateway_info'] - if cur_gw_info: - # TODO(kevinbenton): setting the external gateway info is not - # allowed for a regular tenant. If the ability to update is also - # merged, a test case for this will need to be added similar to - # the SNAT case. - cur_gw_info.pop('external_fixed_ips', None) - if not set_enable_snat: - cur_gw_info.pop('enable_snat', None) - update_body['external_gateway_info'] = kwargs.get( - 'external_gateway_info', body['router']['external_gateway_info']) - if 'distributed' in kwargs: - update_body['distributed'] = kwargs['distributed'] - if 'ha' in kwargs: - update_body['ha'] = kwargs['ha'] - update_body = dict(router=update_body) - update_body = jsonutils.dumps(update_body) - resp, body = self.put(uri, update_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def update_router(self, router_id, **kwargs): - """Update a router leaving enable_snat to its default value.""" - # If external_gateway_info contains enable_snat the request will fail - # with 404 unless executed with admin client, and therefore we instruct - # _update_router to not set this attribute - # NOTE(salv-orlando): The above applies as long as Neutron's default - # policy is to restrict enable_snat usage to admins only. - return self._update_router(router_id, set_enable_snat=False, **kwargs) - - def update_router_with_snat_gw_info(self, router_id, **kwargs): - """Update a router passing also the enable_snat attribute. - - This method must be execute with admin credentials, otherwise the API - call will return a 404 error. - """ - return self._update_router(router_id, set_enable_snat=True, **kwargs) - - def add_router_interface_with_subnet_id(self, router_id, subnet_id): - uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix, - router_id) - update_body = {"subnet_id": subnet_id} - update_body = jsonutils.dumps(update_body) - resp, body = self.put(uri, update_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def add_router_interface_with_port_id(self, router_id, port_id): - uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix, - router_id) - update_body = {"port_id": port_id} - update_body = jsonutils.dumps(update_body) - resp, body = self.put(uri, update_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def remove_router_interface_with_subnet_id(self, router_id, subnet_id): - uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix, - router_id) - update_body = {"subnet_id": subnet_id} - update_body = jsonutils.dumps(update_body) - resp, body = self.put(uri, update_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def remove_router_interface_with_port_id(self, router_id, port_id): - uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix, - router_id) - update_body = {"port_id": port_id} - update_body = jsonutils.dumps(update_body) - resp, body = self.put(uri, update_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_router_interfaces(self, uuid): - uri = '%s/ports?device_id=%s' % (self.uri_prefix, uuid) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def update_agent(self, agent_id, agent_info): - """ - :param agent_info: Agent update information. - E.g {"admin_state_up": True} - """ - uri = '%s/agents/%s' % (self.uri_prefix, agent_id) - agent = {"agent": agent_info} - body = jsonutils.dumps(agent) - resp, body = self.put(uri, body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_routers_on_l3_agent(self, agent_id): - uri = '%s/agents/%s/l3-routers' % (self.uri_prefix, agent_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_l3_agents_hosting_router(self, router_id): - uri = '%s/routers/%s/l3-agents' % (self.uri_prefix, router_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def add_router_to_l3_agent(self, agent_id, router_id): - uri = '%s/agents/%s/l3-routers' % (self.uri_prefix, agent_id) - post_body = {"router_id": router_id} - body = jsonutils.dumps(post_body) - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def remove_router_from_l3_agent(self, agent_id, router_id): - uri = '%s/agents/%s/l3-routers/%s' % ( - self.uri_prefix, agent_id, router_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def list_dhcp_agent_hosting_network(self, network_id): - uri = '%s/networks/%s/dhcp-agents' % (self.uri_prefix, network_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_networks_hosted_by_one_dhcp_agent(self, agent_id): - uri = '%s/agents/%s/dhcp-networks' % (self.uri_prefix, agent_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def remove_network_from_dhcp_agent(self, agent_id, network_id): - uri = '%s/agents/%s/dhcp-networks/%s' % (self.uri_prefix, agent_id, - network_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def update_extra_routes(self, router_id, nexthop, destination): - uri = '%s/routers/%s' % (self.uri_prefix, router_id) - put_body = { - 'router': { - 'routes': [{'nexthop': nexthop, - "destination": destination}] - } - } - body = jsonutils.dumps(put_body) - resp, body = self.put(uri, body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def delete_extra_routes(self, router_id): - uri = '%s/routers/%s' % (self.uri_prefix, router_id) - null_routes = None - put_body = { - 'router': { - 'routes': null_routes - } - } - body = jsonutils.dumps(put_body) - resp, body = self.put(uri, body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def add_dhcp_agent_to_network(self, agent_id, network_id): - post_body = {'network_id': network_id} - body = jsonutils.dumps(post_body) - uri = '%s/agents/%s/dhcp-networks' % (self.uri_prefix, agent_id) - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_qos_policies(self, **filters): - if filters: - uri = '%s/qos/policies?%s' % (self.uri_prefix, - urlparse.urlencode(filters)) - else: - uri = '%s/qos/policies' % self.uri_prefix - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def create_qos_policy(self, name, description=None, shared=False, - tenant_id=None, is_default=False): - uri = '%s/qos/policies' % self.uri_prefix - post_data = { - 'policy': { - 'name': name, - 'shared': shared, - 'is_default': is_default - } - } - if description is not None: - post_data['policy']['description'] = description - if tenant_id is not None: - post_data['policy']['tenant_id'] = tenant_id - resp, body = self.post(uri, self.serialize(post_data)) - body = self.deserialize_single(body) - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def update_qos_policy(self, policy_id, **kwargs): - uri = '%s/qos/policies/%s' % (self.uri_prefix, policy_id) - post_data = self.serialize({'policy': kwargs}) - resp, body = self.put(uri, post_data) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def create_bandwidth_limit_rule(self, policy_id, max_kbps, - max_burst_kbps, direction=None): - uri = '%s/qos/policies/%s/bandwidth_limit_rules' % ( - self.uri_prefix, policy_id) - post_data = { - 'bandwidth_limit_rule': { - 'max_kbps': max_kbps, - 'max_burst_kbps': max_burst_kbps - } - } - if direction: - post_data['bandwidth_limit_rule']['direction'] = direction - resp, body = self.post(uri, self.serialize(post_data)) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_bandwidth_limit_rules(self, policy_id): - uri = '%s/qos/policies/%s/bandwidth_limit_rules' % ( - self.uri_prefix, policy_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def show_bandwidth_limit_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def update_bandwidth_limit_rule(self, policy_id, rule_id, **kwargs): - uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - if "direction" in kwargs and kwargs['direction'] is None: - kwargs.pop('direction') - post_data = {'bandwidth_limit_rule': kwargs} - resp, body = self.put(uri, jsonutils.dumps(post_data)) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def delete_bandwidth_limit_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def create_dscp_marking_rule(self, policy_id, dscp_mark): - uri = '%s/qos/policies/%s/dscp_marking_rules' % ( - self.uri_prefix, policy_id) - post_data = self.serialize({ - 'dscp_marking_rule': { - 'dscp_mark': dscp_mark - } - }) - resp, body = self.post(uri, post_data) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_dscp_marking_rules(self, policy_id): - uri = '%s/qos/policies/%s/dscp_marking_rules' % ( - self.uri_prefix, policy_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def show_dscp_marking_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def update_dscp_marking_rule(self, policy_id, rule_id, **kwargs): - uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - post_data = {'dscp_marking_rule': kwargs} - resp, body = self.put(uri, jsonutils.dumps(post_data)) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def delete_dscp_marking_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/dscp_marking_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def create_minimum_bandwidth_rule(self, policy_id, direction, - min_kbps=None): - uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % ( - self.uri_prefix, policy_id) - data = { - 'direction': direction, - } - if min_kbps is not None: - data['min_kbps'] = min_kbps - post_data = self.serialize({'minimum_bandwidth_rule': data}) - resp, body = self.post(uri, post_data) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_minimum_bandwidth_rules(self, policy_id): - uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % ( - self.uri_prefix, policy_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def show_minimum_bandwidth_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def update_minimum_bandwidth_rule(self, policy_id, rule_id, **kwargs): - uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - post_data = {'minimum_bandwidth_rule': kwargs} - resp, body = self.put(uri, jsonutils.dumps(post_data)) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def delete_minimum_bandwidth_rule(self, policy_id, rule_id): - uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % ( - self.uri_prefix, policy_id, rule_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def list_qos_rule_types(self): - uri = '%s/qos/rule-types' % self.uri_prefix - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def show_qos_rule_type(self, rule_type_name): - uri = '%s/qos/rule-types/%s' % ( - self.uri_prefix, rule_type_name) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def create_trunk(self, parent_port_id, subports, - tenant_id=None, name=None, admin_state_up=None, - description=None): - uri = '%s/trunks' % self.uri_prefix - post_data = { - 'trunk': { - 'port_id': parent_port_id, - } - } - if subports is not None: - post_data['trunk']['sub_ports'] = subports - if tenant_id is not None: - post_data['trunk']['tenant_id'] = tenant_id - if name is not None: - post_data['trunk']['name'] = name - if description is not None: - post_data['trunk']['description'] = description - if admin_state_up is not None: - post_data['trunk']['admin_state_up'] = admin_state_up - resp, body = self.post(uri, self.serialize(post_data)) - body = self.deserialize_single(body) - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def update_trunk(self, trunk_id, **kwargs): - put_body = {'trunk': kwargs} - body = jsonutils.dumps(put_body) - uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id) - resp, body = self.put(uri, body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def show_trunk(self, trunk_id): - uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id) - resp, body = self.get(uri) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def list_trunks(self, **kwargs): - uri = '%s/trunks' % self.uri_prefix - if kwargs: - uri += '?' + urlparse.urlencode(kwargs, doseq=1) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = self.deserialize_single(body) - return service_client.ResponseBody(resp, body) - - def delete_trunk(self, trunk_id): - uri = '%s/trunks/%s' % (self.uri_prefix, trunk_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def _subports_action(self, action, trunk_id, subports): - uri = '%s/trunks/%s/%s' % (self.uri_prefix, trunk_id, action) - resp, body = self.put(uri, jsonutils.dumps({'sub_ports': subports})) - body = self.deserialize_single(body) - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def add_subports(self, trunk_id, subports): - return self._subports_action('add_subports', trunk_id, subports) - - def remove_subports(self, trunk_id, subports): - return self._subports_action('remove_subports', trunk_id, subports) - - def get_subports(self, trunk_id): - uri = '%s/trunks/%s/%s' % (self.uri_prefix, trunk_id, 'get_subports') - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def get_auto_allocated_topology(self, tenant_id=None): - uri = '%s/auto-allocated-topology/%s' % (self.uri_prefix, tenant_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def delete_auto_allocated_topology(self, tenant_id=None): - uri = '%s/auto-allocated-topology/%s' % (self.uri_prefix, tenant_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def create_flavor_service_profile(self, flavor_id, service_profile_id): - body = jsonutils.dumps({'service_profile': {'id': service_profile_id}}) - uri = '%s/flavors/%s/service_profiles' % (self.uri_prefix, flavor_id) - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_flavor_service_profiles(self, flavor_id): - uri = '%s/flavors/%s/service_profiles' % (self.uri_prefix, flavor_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def delete_flavor_service_profile(self, flavor_id, service_profile_id): - uri = '%s/flavors/%s/service_profiles/%s' % (self.uri_prefix, - flavor_id, - service_profile_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def create_security_group_rule(self, direction, security_group_id, - **kwargs): - post_body = {'security_group_rule': kwargs} - post_body['security_group_rule']['direction'] = direction - post_body['security_group_rule'][ - 'security_group_id'] = security_group_id - body = jsonutils.dumps(post_body) - uri = '%s/security-group-rules' % self.uri_prefix - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def list_security_groups(self, **kwargs): - post_body = {'security_groups': kwargs} - body = jsonutils.dumps(post_body) - uri = '%s/security-groups' % self.uri_prefix - if kwargs: - uri += '?' + urlparse.urlencode(kwargs, doseq=1) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def delete_security_group(self, security_group_id): - uri = '%s/security-groups/%s' % ( - self.uri_prefix, security_group_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - return service_client.ResponseBody(resp, body) - - def list_ports(self, **kwargs): - post_body = {'ports': kwargs} - body = jsonutils.dumps(post_body) - uri = '%s/ports' % self.uri_prefix - if kwargs: - uri += '?' + urlparse.urlencode(kwargs, doseq=1) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def create_floatingip(self, floating_network_id, **kwargs): - post_body = {'floatingip': { - 'floating_network_id': floating_network_id}} - if kwargs: - post_body['floatingip'].update(kwargs) - body = jsonutils.dumps(post_body) - uri = '%s/floatingips' % self.uri_prefix - resp, body = self.post(uri, body) - self.expected_success(201, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def create_network_keystone_v3(self, name, project_id, tenant_id=None): - uri = '%s/networks' % self.uri_prefix - post_data = { - 'network': { - 'name': name, - 'project_id': project_id - } - } - if tenant_id is not None: - post_data['network']['tenant_id'] = tenant_id - resp, body = self.post(uri, self.serialize(post_data)) - body = self.deserialize_single(body) - self.expected_success(201, resp.status) - return service_client.ResponseBody(resp, body) - - def list_extensions(self, **filters): - uri = self.get_uri("extensions") - if filters: - uri = '?'.join([uri, urlparse.urlencode(filters)]) - resp, body = self.get(uri) - body = {'extensions': self.deserialize_list(body)} - self.expected_success(200, resp.status) - return service_client.ResponseBody(resp, body) - - def get_tags(self, resource_type, resource_id): - uri = '%s/%s/%s/tags' % ( - self.uri_prefix, resource_type, resource_id) - resp, body = self.get(uri) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def get_tag(self, resource_type, resource_id, tag): - uri = '%s/%s/%s/tags/%s' % ( - self.uri_prefix, resource_type, resource_id, tag) - resp, body = self.get(uri) - self.expected_success(204, resp.status) - - def update_tag(self, resource_type, resource_id, tag): - uri = '%s/%s/%s/tags/%s' % ( - self.uri_prefix, resource_type, resource_id, tag) - resp, body = self.put(uri, None) - self.expected_success(201, resp.status) - - def update_tags(self, resource_type, resource_id, tags): - uri = '%s/%s/%s/tags' % ( - self.uri_prefix, resource_type, resource_id) - req_body = jsonutils.dumps({'tags': tags}) - resp, body = self.put(uri, req_body) - self.expected_success(200, resp.status) - body = jsonutils.loads(body) - return service_client.ResponseBody(resp, body) - - def delete_tags(self, resource_type, resource_id): - uri = '%s/%s/%s/tags' % ( - self.uri_prefix, resource_type, resource_id) - resp, body = self.delete(uri) - self.expected_success(204, resp.status) - - def delete_tag(self, resource_type, resource_id, tag): - uri = '%s/%s/%s/tags/%s' % ( - self.uri_prefix, resource_type, resource_id, tag) - resp, body = self.delete(uri) - self.expected_success(204, resp.status)