# Copyright 2015 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import ddt
from neutron_lib.api.definitions import qos as qos_apidef
from neutron_lib import constants as n_constants
from neutron_lib.services.qos import constants as qos_consts
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools

from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config

CONF = config.CONF


class QosTestJSON(base.BaseAdminNetworkTest):
    """API tests for QoS policies and their network/port associations.

    ``self.admin_client`` and ``self.client`` are provided by the base
    class; the tests below use the former for privileged calls and the
    latter to exercise regular-tenant behaviour.
    """

    required_extensions = [qos_apidef.ALIAS]

    @classmethod
    def setup_clients(cls):
        """Expose the admin bandwidth-limit rule client on the class."""
        super(QosTestJSON, cls).setup_clients()
        cls.qos_bw_limit_rule_client = \
            cls.os_admin.qos_limit_bandwidth_rules_client

    def setUp(self):
        """Generate a fresh random policy name for every test."""
        super(QosTestJSON, self).setUp()
        self.policy_name = data_utils.rand_name(name='test', prefix='policy')

    @staticmethod
    def _get_driver_details(rule_type_details, driver_name):
        """Return the entry of ``rule_type_details['drivers']`` by name.

        :param rule_type_details: dict with a ``'drivers'`` list of dicts,
            each carrying a ``'name'`` key.
        :param driver_name: value to match against ``driver['name']``.
        :returns: the first matching driver dict, or ``None`` if none
            matches.
        """
        for driver in rule_type_details['drivers']:
            if driver['name'] == driver_name:
                return driver
        return None

    def _create_qos_bw_limit_rule(self, policy_id, rule_data):
        """Create a bandwidth-limit rule and schedule its deletion.

        :param policy_id: id of the QoS policy the rule is created under.
        :param rule_data: dict of rule attributes, passed as keyword
            arguments to the rule client.
        :returns: the ``'bandwidth_limit_rule'`` dict from the response.
        """
        rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
            qos_policy_id=policy_id,
            **rule_data)['bandwidth_limit_rule']
        # The cleanup tolerates NotFound because some tests delete the rule
        # (or its policy) themselves.
        self.addCleanup(
            test_utils.call_and_ignore_notfound_exc,
            self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
            policy_id, rule['id'])
        return rule

    @decorators.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
    def test_create_policy(self):
        """Create a policy and verify it through 'show' and 'list'."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy desc1',
                                        shared=False)

        # Test 'show policy'
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test-policy', retrieved_policy['name'])
        self.assertEqual('test policy desc1',
                         retrieved_policy['description'])
        self.assertFalse(retrieved_policy['shared'])

        # Test 'list policies'
        policies = self.admin_client.list_qos_policies()['policies']
        policies_ids = [p['id'] for p in policies]
        self.assertIn(policy['id'], policies_ids)

    @decorators.idempotent_id('606a48e2-5403-4052-b40f-4d54b855af76')
    @utils.requires_ext(extension="project-id", service="network")
    def test_show_policy_has_project_id(self):
        """A shown policy carries the admin client's project_id."""
        policy = self.create_qos_policy(name=self.policy_name, shared=False)
        body = self.admin_client.show_qos_policy(policy['id'])
        show_policy = body['policy']
        self.assertIn('project_id', show_policy)
        self.assertEqual(self.admin_client.project_id,
                         show_policy['project_id'])

    @decorators.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
    def test_list_policy_filter_by_name(self):
        """Listing with a name filter returns only the matching policy."""
        policy1 = 'test' + data_utils.rand_name("policy")
        policy2 = 'test' + data_utils.rand_name("policy")
        self.create_qos_policy(name=policy1, description='test policy',
                               shared=False)
        self.create_qos_policy(name=policy2, description='test policy',
                               shared=False)

        policies = self.admin_client.list_qos_policies(
            name=policy1)['policies']
        self.assertEqual(1, len(policies))

        retrieved_policy = policies[0]
        self.assertEqual(policy1, retrieved_policy['name'])

    @decorators.idempotent_id('dde0b449-a400-4a87-b5a5-4d1c413c917b')
    def test_list_policy_sort_by_name(self):
        """Listing sorted by name honours both 'asc' and 'desc'."""
        policy_a = 'A' + data_utils.rand_name("policy")
        policy_b = 'B' + data_utils.rand_name("policy")
        self.create_qos_policy(name=policy_a, description='test policy',
                               shared=False)
        self.create_qos_policy(name=policy_b, description='test policy',
                               shared=False)

        param = {
            'sort_key': 'name',
            'sort_dir': 'asc'
        }
        policies = self.admin_client.list_qos_policies(**param)['policies']
        policy_names = [p['name'] for p in policies]
        self.assertLess(policy_names.index(policy_a),
                        policy_names.index(policy_b))

        param = {
            'sort_key': 'name',
            'sort_dir': 'desc'
        }
        policies = self.admin_client.list_qos_policies(**param)['policies']
        policy_names = [p['name'] for p in policies]
        self.assertLess(policy_names.index(policy_b),
                        policy_names.index(policy_a))

    @decorators.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
    def test_policy_update(self):
        """Admin can update description and shared flag of a policy."""
        policy = self.create_qos_policy(
            name=self.policy_name,
            description='',
            shared=False,
            project_id=self.admin_client.project_id)
        self.admin_client.update_qos_policy(policy['id'],
                                            description='test policy desc2',
                                            shared=True)

        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test policy desc2',
                         retrieved_policy['description'])
        self.assertTrue(retrieved_policy['shared'])
        self.assertEqual([], retrieved_policy['rules'])

    @decorators.idempotent_id('6e880e0f-bbfc-4e54-87c6-680f90e1b618')
    def test_policy_update_forbidden_for_regular_tenants_own_policy(self):
        """Regular client gets Forbidden updating a policy in its project."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='',
                                        shared=False,
                                        project_id=self.client.project_id)
        self.assertRaises(
            exceptions.Forbidden,
            self.client.update_qos_policy,
            policy['id'], description='test policy')

    @decorators.idempotent_id('4ecfd7e7-47b6-4702-be38-be9235901a87')
    def test_policy_update_forbidden_for_regular_tenants_foreign_policy(self):
        """Regular client gets NotFound updating the admin's policy."""
        policy = self.create_qos_policy(
            name=self.policy_name,
            description='',
            shared=False,
            project_id=self.admin_client.project_id)
        self.assertRaises(
            exceptions.NotFound,
            self.client.update_qos_policy,
            policy['id'], description='test policy')

    @decorators.idempotent_id('ee263db4-009a-4641-83e5-d0e83506ba4c')
    def test_shared_policy_update(self):
        """Updating the description keeps 'shared'; it can then be unset."""
        policy = self.create_qos_policy(
            name=self.policy_name,
            description='',
            shared=True,
            project_id=self.admin_client.project_id)

        self.admin_client.update_qos_policy(policy['id'],
                                            description='test policy desc2')
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertTrue(retrieved_policy['shared'])

        self.admin_client.update_qos_policy(policy['id'],
                                            shared=False)
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertFalse(retrieved_policy['shared'])

    @decorators.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
    def test_delete_policy(self):
        """A deleted policy can no longer be shown (NotFound)."""
        # Created through the client directly (not the create_qos_policy
        # helper) because the test deletes the policy itself.
        policy = self.admin_client.create_qos_policy(
            'test-policy', 'desc', True)['policy']

        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test-policy', retrieved_policy['name'])

        self.admin_client.delete_qos_policy(policy['id'])
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.show_qos_policy, policy['id'])

    @decorators.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
    def test_list_admin_rule_types(self):
        """Check the rule-type listing as seen by the admin client."""
        self._test_list_rule_types(self.admin_client)

    @decorators.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
    def test_list_regular_rule_types(self):
        """Check the rule-type listing as seen by the regular client."""
        self._test_list_rule_types(self.client)

    def _test_list_rule_types(self, client):
        """Assert each listed rule type exposes exactly the 'type' key.

        :param client: network client used to list the QoS rule types.
        """
        # List supported rule types
        # Since the returned rule types depend on the loaded backend drivers,
        # this test only checks that the returned keys are the expected keys.
        #
        # In theory, we could make the test conditional on which ml2 drivers
        # are enabled in gate (or more specifically, on which supported qos
        # rules are claimed by core plugin), but that option doesn't seem to
        # be available through tempest.lib framework
        expected_rule_keys = ['type']

        rule_types = client.list_qos_rule_types()
        actual_list_rule_types = rule_types['rule_types']

        # Verify that only required fields present in rule details
        for rule in actual_list_rule_types:
            self.assertEqual(tuple(expected_rule_keys), tuple(rule.keys()))

    @decorators.idempotent_id('8ececa21-ef97-4904-a152-9f04c90f484d')
    def test_show_rule_type_details_as_user(self):
        """Regular client gets Forbidden showing rule type details."""
        self.assertRaises(
            exceptions.Forbidden,
            self.client.show_qos_rule_type,
            qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)

    @decorators.idempotent_id('d0a2460b-7325-481f-a531-050bd96ab25e')
    def test_show_rule_type_details_as_admin(self):
        """Admin sees exactly the 'type' and 'drivers' keys for a rule type."""
        # Since the returned rule types depend on the loaded backend drivers,
        # this test only checks that the returned keys are the expected keys.

        # In theory, we could make the test conditional on which ml2 drivers
        # are enabled in gate, but that option doesn't seem to be
        # available through tempest.lib framework
        expected_rule_type_details_keys = ['type', 'drivers']

        rule_type_details = self.admin_client.show_qos_rule_type(
            qos_consts.RULE_TYPE_BANDWIDTH_LIMIT).get("rule_type")

        # Verify that only required fields present in rule details
        self.assertEqual(
            sorted(expected_rule_type_details_keys),
            sorted(rule_type_details.keys()))

    @decorators.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
    def test_policy_association_with_admin_network(self):
        """A network created with qos_policy_id reports that policy."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        network = self.create_network('test network', shared=True,
                                      qos_policy_id=policy['id'])

        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

    @decorators.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
    def test_policy_association_with_tenant_network(self):
        """A network can be created with a shared policy attached."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=True)
        network = self.create_network('test network',
                                      qos_policy_id=policy['id'])

        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

    @decorators.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
    def test_policy_association_with_network_nonexistent_policy(self):
        """Creating a network with an unknown policy id raises NotFound."""
        self.assertRaises(
            exceptions.NotFound,
            self.create_network,
            'test network',
            qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')

    @decorators.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
    def test_policy_association_with_network_non_shared_policy(self):
        """Creating a network with a non-shared policy raises NotFound."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        self.assertRaises(
            exceptions.NotFound,
            self.create_network,
            'test network', qos_policy_id=policy['id'])

    @decorators.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
    def test_policy_update_association_with_admin_network(self):
        """Admin can attach a policy to an existing network via update."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        network = self.create_network('test network', shared=True)
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertIsNone(retrieved_network['network']['qos_policy_id'])

        self.admin_client.update_network(network['id'],
                                         qos_policy_id=policy['id'])
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

    @decorators.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
    def test_policy_association_with_port_shared_policy(self):
        """A port created with a shared policy reports that policy."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=True)
        network = self.create_network('test network', shared=True)
        port = self.create_port(network, qos_policy_id=policy['id'])

        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertEqual(
            policy['id'], retrieved_port['port']['qos_policy_id'])

    @decorators.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
    def test_policy_association_with_port_nonexistent_policy(self):
        """Creating a port with an unknown policy id raises NotFound."""
        network = self.create_network('test network', shared=True)
        self.assertRaises(
            exceptions.NotFound,
            self.create_port,
            network,
            qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')

    @decorators.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
    def test_policy_association_with_port_non_shared_policy(self):
        """Creating a port with a non-shared policy raises NotFound."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        network = self.create_network('test network', shared=True)
        self.assertRaises(
            exceptions.NotFound,
            self.create_port,
            network, qos_policy_id=policy['id'])

    @decorators.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
    def test_policy_update_association_with_port_shared_policy(self):
        """Regular client can attach a shared policy to an existing port."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=True)
        network = self.create_network('test network', shared=True)
        port = self.create_port(network)
        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertIsNone(retrieved_port['port']['qos_policy_id'])

        self.client.update_port(port['id'], qos_policy_id=policy['id'])
        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertEqual(
            policy['id'], retrieved_port['port']['qos_policy_id'])

    @decorators.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_network(self):
        """Deleting a policy attached to a network raises Conflict."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=True)
        self.create_network('test network', qos_policy_id=policy['id'],
                            shared=True)
        self.assertRaises(
            exceptions.Conflict,
            self.admin_client.delete_qos_policy, policy['id'])

    @decorators.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_port(self):
        """Deleting a policy attached to a port raises Conflict."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=True)
        network = self.create_network('test network', shared=True)
        self.create_port(network, qos_policy_id=policy['id'])
        self.assertRaises(
            exceptions.Conflict,
            self.admin_client.delete_qos_policy, policy['id'])

    @decorators.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
    def test_qos_policy_delete_with_rules(self):
        """A policy that still has a rule can be deleted."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})

        self.admin_client.delete_qos_policy(policy['id'])

        with testtools.ExpectedException(exceptions.NotFound):
            self.admin_client.show_qos_policy(policy['id'])

    @decorators.idempotent_id('fb384bde-a973-41c3-a542-6f77a092155f')
    def test_get_policy_that_is_shared(self):
        """Regular client sees a shared admin policy identical to the admin."""
        policy = self.create_qos_policy(
            name='test-policy-shared',
            description='shared policy',
            shared=True,
            project_id=self.admin_client.project_id)
        obtained_policy = self.client.show_qos_policy(policy['id'])['policy']
        self.assertEqual(obtained_policy, policy)

    @decorators.idempotent_id('aed8e2a6-22da-421b-89b9-935a2c1a1b50')
    def test_policy_create_forbidden_for_regular_tenants(self):
        """Regular client gets Forbidden when creating a policy."""
        self.assertRaises(
            exceptions.Forbidden,
            self.client.create_qos_policy,
            'test-policy', 'test policy', False)

    @decorators.idempotent_id('18d94f22-b9d5-4390-af12-d30a0cfc4cd3')
    def test_default_policy_creating_network_without_policy(self):
        """A new network in a project gets the project's default policy."""
        project_id = self.create_project()['id']
        policy = self.create_qos_policy(name=self.policy_name,
                                        project_id=project_id,
                                        is_default=True)
        network = self.create_network('test network',
                                      client=self.admin_client,
                                      project_id=project_id)
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

    @decorators.idempotent_id('807cce45-38e5-482d-94db-36e1796aba73')
    def test_default_policy_creating_network_with_policy(self):
        """An explicit qos_policy_id wins over the project's default."""
        project_id = self.create_project()['id']
        self.create_qos_policy(name='test-policy',
                               project_id=project_id,
                               is_default=True)
        policy = self.create_qos_policy(name='test-policy',
                                        project_id=project_id)
        network = self.create_network('test network',
                                      client=self.admin_client,
                                      project_id=project_id,
                                      qos_policy_id=policy['id'])
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

    @decorators.idempotent_id('06060880-2956-4c16-9a63-f284c3879229')
    def test_user_create_port_with_admin_qos_policy(self):
        """A port can be created on a network carrying an admin policy."""
        qos_policy = self.create_qos_policy(
            name=self.policy_name,
            project_id=self.admin_client.project_id,
            shared=False)
        network = self.create_network(
            'test network', client=self.admin_client,
            project_id=self.client.project_id,
            qos_policy_id=qos_policy['id'])
        port = self.create_port(network)
        self.assertEqual(network['id'], port['network_id'])


class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
    """API tests for bandwidth-limit rules of QoS policies."""

    credentials = ['primary', 'admin']
    direction = None
    required_extensions = [qos_apidef.ALIAS]

    @classmethod
    def setup_clients(cls):
        """Expose admin and primary bandwidth-limit rule clients."""
        super(QosBandwidthLimitRuleTestJSON, cls).setup_clients()
        cls.qos_bw_limit_rule_client = \
            cls.os_admin.qos_limit_bandwidth_rules_client
        cls.qos_bw_limit_rule_client_primary = \
            cls.os_primary.qos_limit_bandwidth_rules_client

    @classmethod
    @base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
    def resource_setup(cls):
        """Run the parent resource setup under the rule-type requirement."""
        super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()

    def setUp(self):
        """Generate a fresh random policy name for every test."""
        super(QosBandwidthLimitRuleTestJSON, self).setUp()
        self.policy_name = data_utils.rand_name(name='test', prefix='policy')

    def _create_qos_bw_limit_rule(self, policy_id, rule_data):
        """Create a bandwidth-limit rule and schedule its deletion.

        :param policy_id: id of the QoS policy the rule is created under.
        :param rule_data: dict of rule attributes, passed as keyword
            arguments to the rule client.
        :returns: the ``'bandwidth_limit_rule'`` dict from the response.
        """
        rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
            qos_policy_id=policy_id,
            **rule_data)['bandwidth_limit_rule']
        # The cleanup tolerates NotFound because some tests delete the rule
        # (or its policy) themselves.
        self.addCleanup(
            test_utils.call_and_ignore_notfound_exc,
            self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
            policy_id, rule['id'])
        return rule

    @decorators.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
    def test_rule_create(self):
        """Create a rule and verify it via rule show/list and policy show."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        rule = self._create_qos_bw_limit_rule(
            policy['id'],
            {'max_kbps': 200, 'max_burst_kbps': 1337, 'direction': 'ingress'})

        # Test 'show rule'
        retrieved_rule = \
            self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
                policy['id'], rule['id'])

        retrieved_rule = retrieved_rule['bandwidth_limit_rule']
        self.assertEqual(rule['id'], retrieved_rule['id'])
        self.assertEqual(200, retrieved_rule['max_kbps'])
        self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
        self.assertEqual('ingress', retrieved_rule['direction'])

        # Test 'list rules'
        rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
            policy['id'])
        rules = rules['bandwidth_limit_rules']
        rules_ids = [r['id'] for r in rules]
        self.assertIn(rule['id'], rules_ids)

        # Test 'show policy'
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        policy_rules = retrieved_policy['policy']['rules']
        self.assertEqual(1, len(policy_rules))
        self.assertEqual(rule['id'], policy_rules[0]['id'])
        self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
                         policy_rules[0]['type'])

    @decorators.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
    def test_rule_create_fail_for_the_same_type(self):
        """A second rule without a direction on one policy raises Conflict."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})

        self.assertRaises(
            exceptions.Conflict,
            self._create_qos_bw_limit_rule,
            policy['id'],
            {'max_kbps': 201, 'max_burst_kbps': 1338})

    @decorators.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
    def test_rule_update(self):
        """Update a rule's limits without touching its direction."""
        self._test_rule_update(opposite_direction=None)

    def _test_rule_update(self, opposite_direction=None):
        """Update a rule and verify the stored values.

        :param opposite_direction: when truthy, it is also sent as the
            rule's new ``direction`` and checked in the result; otherwise
            only ``max_kbps`` and ``max_burst_kbps`` are updated.
        """
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        rule = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})

        if opposite_direction:
            self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
                policy['id'], rule['id'],
                **{'max_kbps': 200, 'max_burst_kbps': 1337,
                   'direction': opposite_direction})
        else:
            self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
                policy['id'], rule['id'],
                **{'max_kbps': 200, 'max_burst_kbps': 1337})

        retrieved_rule = \
            self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
                policy['id'], rule['id'])
        retrieved_rule = retrieved_rule['bandwidth_limit_rule']
        self.assertEqual(200, retrieved_rule['max_kbps'])
        self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
        if opposite_direction:
            self.assertEqual(opposite_direction,
                             retrieved_rule['direction'])

    @decorators.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
    def test_rule_delete(self):
        """A deleted rule can no longer be shown (NotFound)."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False)
        rule = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})

        retrieved_rule = \
            self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
                policy['id'], rule['id'])
        retrieved_rule = retrieved_rule['bandwidth_limit_rule']
        self.assertEqual(rule['id'], retrieved_rule['id'])

        self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule(
            policy['id'], rule['id'])
        self.assertRaises(
            exceptions.NotFound,
            self.qos_bw_limit_rule_client.show_limit_bandwidth_rule,
            policy['id'], rule['id'])

    @decorators.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
    def test_rule_create_rule_nonexistent_policy(self):
        """Creating a rule under an unknown policy id raises NotFound."""
        self.assertRaises(
            exceptions.NotFound,
            self._create_qos_bw_limit_rule,
            'policy', {'max_kbps': 200, 'max_burst_kbps': 1337})

    @decorators.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
    def test_rule_create_forbidden_for_regular_tenants(self):
        """Primary (non-admin) client gets Forbidden creating a rule."""
        self.assertRaises(
            exceptions.Forbidden,
            self.qos_bw_limit_rule_client_primary.create_limit_bandwidth_rule,
            'policy', **{'max_kbps': 1, 'max_burst_kbps': 2})

    @decorators.idempotent_id('1bfc55d9-6fd8-4293-ab3a-b1d69bf7cd2e')
    def test_rule_update_forbidden_for_regular_tenants_own_policy(self):
        """Primary client gets Forbidden updating a rule in its project."""
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy',
                                        shared=False,
                                        project_id=self.client.project_id)
        rule = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
        self.assertRaises(
            exceptions.Forbidden,
            self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
            policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})

    @decorators.idempotent_id('9a607936-4b6f-4c2f-ad21-bd5b3d4fc91f')
    def test_rule_update_forbidden_for_regular_tenants_foreign_policy(self):
        """Primary client gets NotFound updating a rule of the admin."""
        policy = self.create_qos_policy(
            name=self.policy_name,
            description='test policy',
            shared=False,
            project_id=self.admin_client.project_id)
        rule = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
        self.assertRaises(
            exceptions.NotFound,
            self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
            policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})

    @decorators.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
    def test_get_rules_by_policy(self):
        """Listing rules of one policy excludes rules of another policy."""
        policy1 = self.create_qos_policy(
            name='test-policy1',
            description='test policy1',
            shared=False)
        rule1 = self._create_qos_bw_limit_rule(
            policy1['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})

        policy2 = self.create_qos_policy(
            name='test-policy2',
            description='test policy2',
            shared=False)
        rule2 = self._create_qos_bw_limit_rule(
            policy2['id'], {'max_kbps': 5000, 'max_burst_kbps': 2523})

        # Test 'list rules'
        rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
            policy1['id'])
        rules = rules['bandwidth_limit_rules']
        rules_ids = [r['id'] for r in rules]
        self.assertIn(rule1['id'], rules_ids)
        self.assertNotIn(rule2['id'], rules_ids)

    @testtools.skipUnless(
        CONF.neutron_plugin_options.create_shared_resources,
        "Creation of shared resources should be allowed, "
        "setting the create_shared_resources option as 'True' is needed")
    @decorators.idempotent_id('d911707e-fa2c-11e9-9553-5076af30bbf5')
    def test_attach_and_detach_a_policy_by_a_tenant(self):
        """Tenant may detach a policy from its network only once shared."""
        # As an admin, create a non-shared QoS policy, add a rule
        # and associate it with a network
        self.network = self.create_network()
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy for attach',
                                        shared=False)
        self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024})

        self.admin_client.update_network(
            self.network['id'], qos_policy_id=policy['id'])

        # As a tenant, try to detach the policy from the network
        # The operation should be forbidden
        self.assertRaises(
            exceptions.Forbidden,
            self.client.update_network,
            self.network['id'], qos_policy_id=None)

        # As an admin, make the policy shared
        self.admin_client.update_qos_policy(policy['id'], shared=True)

        # As a tenant, try to detach the policy from the network
        # The operation should be allowed
        self.client.update_network(self.network['id'],
                                   qos_policy_id=None)

        retrieved_network = self.admin_client.show_network(
            self.network['id'])
        self.assertIsNone(retrieved_network['network']['qos_policy_id'])

        # As a tenant, try to delete the policy; this should be forbidden
        self.assertRaises(
            exceptions.Forbidden,
            self.client.delete_qos_policy,
            policy['id'])


@ddt.ddt
class QosBandwidthLimitRuleWithDirectionTestJSON(
        QosBandwidthLimitRuleTestJSON):
    """Bandwidth-limit rule tests that also need 'qos-bw-limit-direction'.

    Inherits every test of the parent class and replaces
    ``test_rule_update`` with a data-driven variant.
    """

    required_extensions = (
        QosBandwidthLimitRuleTestJSON.required_extensions +
        ['qos-bw-limit-direction']
    )

    @classmethod
    @base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
    def resource_setup(cls):
        """Run the parent resource setup under the rule-type requirement."""
        super(QosBandwidthLimitRuleWithDirectionTestJSON,
              cls).resource_setup()

    def setUp(self):
        """Generate a fresh random policy name for every test."""
        super(QosBandwidthLimitRuleWithDirectionTestJSON, self).setUp()
        self.policy_name = data_utils.rand_name(name='test', prefix='policy')

    @decorators.idempotent_id('c8cbe502-0f7e-11ea-8d71-362b9e155667')
    def test_create_policy_with_multiple_rules(self):
        """One rule per direction is accepted; a duplicate raises Conflict."""
        # Create a policy with multiple rules
        policy = self.create_qos_policy(name=self.policy_name,
                                        description='test policy1',
                                        shared=False)

        rule1 = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
                           'direction': n_constants.EGRESS_DIRECTION})
        rule2 = self._create_qos_bw_limit_rule(
            policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
                           'direction': n_constants.INGRESS_DIRECTION})

        # Check that the rules were added to the policy
        rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
            policy['id'])['bandwidth_limit_rules']

        rules_ids = [rule['id'] for rule in rules]
        self.assertIn(rule1['id'], rules_ids)
        self.assertIn(rule2['id'], rules_ids)

        # Check that the rules creation fails for the same rule types
        self.assertRaises(
            exceptions.Conflict,
            self._create_qos_bw_limit_rule,
            policy['id'],
            {'max_kbps': 1025, 'max_burst_kbps': 1025,
             'direction': n_constants.EGRESS_DIRECTION})

        self.assertRaises(
            exceptions.Conflict,
            self._create_qos_bw_limit_rule,
            policy['id'],
            {'max_kbps': 1025, 'max_burst_kbps': 1025,
             'direction': n_constants.INGRESS_DIRECTION})

    @decorators.idempotent_id('7ca7c83b-0555-4a0f-a422-4338f77f79e5')
    @ddt.unpack
    @ddt.data({'opposite_direction': 'ingress'},
              {'opposite_direction': 'egress'})
    def test_rule_update(self, opposite_direction):
        """Update a rule's limits together with its direction."""
        self._test_rule_update(opposite_direction=opposite_direction)


class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
    """API tests for sharing QoS policies through RBAC policies.

    Uses three credential sets: ``self.client`` (primary),
    ``self.client2`` (alt) and ``self.admin_client``.
    """

    force_tenant_isolation = True
    credentials = ['primary', 'alt', 'admin']
    required_extensions = [qos_apidef.ALIAS]

    @classmethod
    def resource_setup(cls):
        """Expose the alt credentials' network client as ``client2``."""
        super(RbacSharedQosPoliciesTest, cls).resource_setup()
        cls.client2 = cls.os_alt.network_client

    def _create_qos_policy(self, project_id=None):
        """Create a non-shared policy as admin and schedule its deletion.

        :param project_id: value sent as the policy's ``project_id``
            (``None`` by default).
        :returns: the ``'policy'`` dict from the response.
        """
        args = {'name': data_utils.rand_name('test-policy'),
                'description': 'test policy',
                'shared': False,
                'project_id': project_id}
        qos_policy = self.admin_client.create_qos_policy(**args)['policy']
        self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])

        return qos_policy

    def _make_admin_policy_shared_to_project_id(self, project_id):
        """Create an admin policy and share it with one project via RBAC.

        :param project_id: used as ``target_tenant`` of the RBAC policy.
        :returns: dict with the created ``'policy'`` and ``'rbac_policy'``.
        """
        policy = self._create_qos_policy()
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=policy['id'],
            action='access_as_shared',
            target_tenant=project_id,
        )['rbac_policy']

        return {'policy': policy, 'rbac_policy': rbac_policy}

    def _create_network(self, qos_policy_id, client, should_cleanup=True):
        """Create a randomly named network bound to a QoS policy.

        :param qos_policy_id: policy id the network is created with.
        :param client: network client used for creation (and cleanup).
        :param should_cleanup: when true, schedule deletion of the network.
        :returns: the ``'network'`` dict from the response.
        """
        net = client.create_network(
            name=data_utils.rand_name('test-network'),
            qos_policy_id=qos_policy_id)['network']
        if should_cleanup:
            self.addCleanup(client.delete_network, net['id'])

        return net

    @decorators.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
    def test_policy_sharing_with_wildcard(self):
        """Toggling 'shared' creates a wildcard RBAC entry; in use blocks it."""
        qos_pol = self.create_qos_policy(
            name=data_utils.rand_name('test-policy'),
            description='test-shared-policy', shared=False,
            project_id=self.admin_client.project_id)
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

        # test update shared False -> True
        self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
        qos_pol['shared'] = True
        self.client2.show_qos_policy(qos_pol['id'])
        rbac_pol = {'target_tenant': '*',
                    'tenant_id': self.admin_client.project_id,
                    'project_id': self.admin_client.project_id,
                    'object_type': 'qos_policy',
                    'object_id': qos_pol['id'],
                    'action': 'access_as_shared'}

        rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
        # Strip the 'id' key, which the expected dict above does not
        # contain, so the listed entries can be compared with it directly.
        for listed_rbac in rbac_policies:
            listed_rbac.pop('id')
        self.assertIn(rbac_pol, rbac_policies)

        # update shared True -> False should fail because the policy is bound
        # to a network
        net = self._create_network(qos_pol['id'], self.admin_client, False)
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_qos_policy(qos_pol['id'], shared=False)

        # delete the network, and update shared True -> False should pass now
        self.admin_client.delete_network(net['id'])
        self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
        qos_pol['shared'] = False
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

    def _create_net_bound_qos_rbacs(self):
        """Share a policy to the primary project and to '*', then bind a net.

        :returns: tuple of (RBAC policy targeting ``self.client``'s
            project, wildcard RBAC policy).
        """
        res = self._make_admin_policy_shared_to_project_id(
            self.client.project_id)
        qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']

        # add a wildcard rbac rule - now the policy is globally shared
        rbac_wildcard = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=qos_policy['id'],
            action='access_as_shared',
            target_tenant='*',
        )['rbac_policy']

        # tenant1 now uses qos policy for net
        self._create_network(qos_policy['id'], self.client)

        return rbac_for_client_tenant, rbac_wildcard

    @decorators.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
    def test_net_bound_shared_policy_wildcard_and_project_id_wild_remove(self):
        """The wildcard RBAC can be deleted while a specific share exists."""
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
        self.client.list_rbac_policies(id=client_rbac['id'])

    @decorators.idempotent_id('1997b00c-0c75-4e43-8ce2-999f9fa555ee')
    def test_net_bound_shared_policy_wildcard_and_projectid_wild_remains(self):
        """The specific RBAC can be deleted while the wildcard exists."""
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # remove client_rbac policy the wildcard share should remain
        self.admin_client.delete_rbac_policy(client_rbac['id'])
        self.client.list_rbac_policies(id=wildcard_rbac['id'])

    @decorators.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
    def test_policy_sharing_with_wildcard_and_project_id(self):
        """Global share/unshare leaves the project-specific share intact."""
        res = self._make_admin_policy_shared_to_project_id(
            self.client.project_id)
        qos_policy, rbac = res['policy'], res['rbac_policy']
        qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])

        # make the qos-policy globally shared
        self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
        qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])

        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
        self.client.show_qos_policy(qos_policy['id'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])
        self.assertIn(rbac,
                      self.admin_client.list_rbac_policies()['rbac_policies'])

    @decorators.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
    def test_policy_target_update(self):
        """Updating target_tenant changes only that RBAC field."""
        res = self._make_admin_policy_shared_to_project_id(
            self.client.project_id)
        # change to client2
        update_res = self.admin_client.update_rbac_policy(
            res['rbac_policy']['id'], target_tenant=self.client2.project_id)
        self.assertEqual(self.client2.project_id,
                         update_res['rbac_policy']['target_tenant'])
        # make sure everything else stayed the same
        res['rbac_policy'].pop('target_tenant')
        update_res['rbac_policy'].pop('target_tenant')
        self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])

    @decorators.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
    def test_network_presence_prevents_policy_rbac_policy_deletion(self):
        """An RBAC share needed by a bound network cannot be removed."""
        res = self._make_admin_policy_shared_to_project_id(
            self.client2.project_id)
        qos_policy_id = res['policy']['id']
        self._create_network(qos_policy_id, self.client2)
        # a network with shared qos-policy should prevent the deletion of an
        # rbac-policy required for it to be shared
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # a wildcard policy should allow the specific policy to be deleted
        # since it allows the remaining port
        wild = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=res['policy']['id'],
            action='access_as_shared', target_tenant='*')['rbac_policy']
        self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # now that wildcard is the only remaining, it should be subjected to
        # the same restriction
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(wild['id'])

        # we can't update the policy to a different tenant
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_rbac_policy(
                wild['id'], target_tenant=self.client2.project_id)

    @decorators.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
    def test_regular_client_shares_to_another_regular_client(self):
        """A share to the primary project is not listed for the alt one."""
        # owned by self.admin_client
        policy = self._create_qos_policy()
        with testtools.ExpectedException(exceptions.NotFound):
            self.client.show_qos_policy(policy['id'])
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.client.project_id)['rbac_policy']
        self.client.show_qos_policy(policy['id'])

        self.assertIn(rbac_policy,
                      self.admin_client.list_rbac_policies()['rbac_policies'])
        # ensure that 'client2' can't see the rbac-policy sharing the
        # qos-policy to it because the rbac-policy belongs to 'client'
        self.assertNotIn(rbac_policy['id'], [
            p['id']
            for p in self.client2.list_rbac_policies()['rbac_policies']])

    @decorators.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
    def test_filter_fields(self):
        """Listing RBAC policies with 'fields' returns exactly those keys."""
        policy = self._create_qos_policy()
        self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared', target_tenant=self.client2.project_id)
        field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
                      ('project_id', 'target_tenant'))
        for fields in field_args:
            res = self.admin_client.list_rbac_policies(fields=fields)
            self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))

    @decorators.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
    def test_rbac_policy_show(self):
        res = self._make_admin_policy_shared_to_project_id(
            self.client.project_id)
p1 = res['rbac_policy'] p2 = self.admin_client.create_rbac_policy( object_type='qos_policy', object_id=res['policy']['id'], action='access_as_shared', target_tenant='*')['rbac_policy'] self.assertEqual( p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy']) self.assertEqual( p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy']) @decorators.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df') def test_filter_rbac_policies(self): policy = self._create_qos_policy() rbac_pol1 = self.admin_client.create_rbac_policy( object_type='qos_policy', object_id=policy['id'], action='access_as_shared', target_tenant=self.client2.project_id)['rbac_policy'] rbac_pol2 = self.admin_client.create_rbac_policy( object_type='qos_policy', object_id=policy['id'], action='access_as_shared', target_tenant=self.admin_client.project_id)['rbac_policy'] res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[ 'rbac_policies'] res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[ 'rbac_policies'] self.assertEqual(1, len(res1)) self.assertEqual(1, len(res2)) self.assertEqual(rbac_pol1['id'], res1[0]['id']) self.assertEqual(rbac_pol2['id'], res2[0]['id']) @decorators.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df') def test_regular_client_blocked_from_sharing_anothers_policy(self): qos_policy = self._make_admin_policy_shared_to_project_id( self.client.project_id)['policy'] with testtools.ExpectedException(exceptions.BadRequest): self.client.create_rbac_policy( object_type='qos_policy', object_id=qos_policy['id'], action='access_as_shared', target_tenant=self.client2.project_id) # make sure the rbac-policy is invisible to the tenant for which it's # being shared self.assertFalse(self.client.list_rbac_policies()['rbac_policies']) class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest): VALID_DSCP_MARK1 = 56 VALID_DSCP_MARK2 = 48 required_extensions = [qos_apidef.ALIAS] @classmethod @base.require_qos_rule_type(qos_consts.RULE_TYPE_DSCP_MARKING) def 
resource_setup(cls): super(QosDscpMarkingRuleTestJSON, cls).resource_setup() def setUp(self): super(QosDscpMarkingRuleTestJSON, self).setUp() self.policy_name = data_utils.rand_name(name='test', prefix='policy') @decorators.idempotent_id('f5cbaceb-5829-497c-9c60-ad70969e9a08') def test_rule_create(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.admin_client.create_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule'] # Test 'show rule' retrieved_rule = self.admin_client.show_dscp_marking_rule( policy['id'], rule['id']) retrieved_rule = retrieved_rule['dscp_marking_rule'] self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(self.VALID_DSCP_MARK1, retrieved_rule['dscp_mark']) # Test 'list rules' rules = self.admin_client.list_dscp_marking_rules(policy['id']) rules = rules['dscp_marking_rules'] rules_ids = [r['id'] for r in rules] self.assertIn(rule['id'], rules_ids) # Test 'show policy' retrieved_policy = self.admin_client.show_qos_policy(policy['id']) policy_rules = retrieved_policy['policy']['rules'] self.assertEqual(1, len(policy_rules)) self.assertEqual(rule['id'], policy_rules[0]['id']) self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARKING, policy_rules[0]['type']) @decorators.idempotent_id('08553ffe-030f-4037-b486-7e0b8fb9385a') def test_rule_create_fail_for_the_same_type(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self.admin_client.create_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule'] self.assertRaises(exceptions.Conflict, self.admin_client.create_dscp_marking_rule, policy_id=policy['id'], dscp_mark=self.VALID_DSCP_MARK2) @decorators.idempotent_id('76f632e5-3175-4408-9a32-3625e599c8a2') def test_rule_update(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.admin_client.create_dscp_marking_rule( policy['id'], 
self.VALID_DSCP_MARK1)['dscp_marking_rule'] self.admin_client.update_dscp_marking_rule( policy['id'], rule['id'], dscp_mark=self.VALID_DSCP_MARK2) retrieved_policy = self.admin_client.show_dscp_marking_rule( policy['id'], rule['id']) retrieved_policy = retrieved_policy['dscp_marking_rule'] self.assertEqual(self.VALID_DSCP_MARK2, retrieved_policy['dscp_mark']) @decorators.idempotent_id('74f81904-c35f-48a3-adae-1f5424cb3c18') def test_rule_delete(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.admin_client.create_dscp_marking_rule( policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule'] retrieved_policy = self.admin_client.show_dscp_marking_rule( policy['id'], rule['id']) retrieved_policy = retrieved_policy['dscp_marking_rule'] self.assertEqual(rule['id'], retrieved_policy['id']) self.admin_client.delete_dscp_marking_rule(policy['id'], rule['id']) self.assertRaises(exceptions.NotFound, self.admin_client.show_dscp_marking_rule, policy['id'], rule['id']) @decorators.idempotent_id('9cb8ef5c-96fc-4978-9ee0-e3b02bab628a') def test_rule_create_rule_nonexistent_policy(self): self.assertRaises( exceptions.NotFound, self.admin_client.create_dscp_marking_rule, 'policy', self.VALID_DSCP_MARK1) @decorators.idempotent_id('bf6002ea-29de-486f-b65d-08aea6d4c4e2') def test_rule_create_forbidden_for_regular_tenants(self): self.assertRaises( exceptions.Forbidden, self.client.create_dscp_marking_rule, 'policy', self.VALID_DSCP_MARK1) @decorators.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533') def test_invalid_rule_create(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self.assertRaises( exceptions.BadRequest, self.admin_client.create_dscp_marking_rule, policy['id'], 58) @decorators.idempotent_id('c565131d-4c80-4231-b0f3-9ae2be4de129') def test_get_rules_by_policy(self): policy1 = self.create_qos_policy(name='test-policy1', description='test policy1', 
shared=False) rule1 = self.admin_client.create_dscp_marking_rule( policy1['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule'] policy2 = self.create_qos_policy(name='test-policy2', description='test policy2', shared=False) rule2 = self.admin_client.create_dscp_marking_rule( policy2['id'], self.VALID_DSCP_MARK2)['dscp_marking_rule'] # Test 'list rules' rules = self.admin_client.list_dscp_marking_rules(policy1['id']) rules = rules['dscp_marking_rules'] rules_ids = [r['id'] for r in rules] self.assertIn(rule1['id'], rules_ids) self.assertNotIn(rule2['id'], rules_ids) @decorators.idempotent_id('19ed2286-ccb1-11e9-87d7-525400d6f522') def test_qos_dscp_create_and_update(self): """This test covers: 1.Creating a basic QoS policy with DSCP marking rule. 2.Updating QoS policy: Administrator should have the ability to update existing QoS policy. This test should verify that: It's possible to update the existing DSCP marking rule with all of the valid marks between 0-56, except of the invalid marks: 2-6, 42, 44, and 50-54 (which should be forbidden) """ def _test_update_dscp_mark_values(self, dscp_policy_id, rule_id, valid_dscp_marks): for mark in range(valid_dscp_marks[1], self.VALID_DSCP_MARK1 + 1): if mark in valid_dscp_marks: self.admin_client.update_dscp_marking_rule( dscp_policy_id, rule_id, dscp_mark=mark) retrieved_rule = self.admin_client.show_dscp_marking_rule( dscp_policy_id, rule_id)['dscp_marking_rule'] self.assertEqual(mark, retrieved_rule['dscp_mark'], """current DSCP mark is incorrect: expected value {0} actual value {1} """.format(mark, retrieved_rule['dscp_mark'])) else: self.assertRaises(exceptions.BadRequest, self.admin_client.create_dscp_marking_rule, dscp_policy_id, mark) # Retrieve the valid range of DSCP marks from the API. rule_type_details = self.admin_client.show_qos_rule_type( qos_consts.RULE_TYPE_DSCP_MARKING).get('rule_type') # There should be at least one driver supporting the DSCP marking rule. 
dscp_driver = rule_type_details['drivers'][0] for parameter in dscp_driver['supported_parameters']: if parameter['parameter_name'] == qos_consts.DSCP_MARK: valid_dscp_marks = parameter['parameter_values'] break else: self.fail('The DSCP marking rule does not have the %s parameter' % qos_consts.DSCP_MARK) # Setup network self.network = self.create_network() # Create QoS policy dscp_policy_id = self.create_qos_policy( name=self.policy_name, description='test-qos-policy', project_id=self.client.project_id, shared=True)['id'] # Associate QoS to the network self.admin_client.update_network( self.network['id'], qos_policy_id=dscp_policy_id) # Set a new DSCP rule with the first mark in range rule_id = self.admin_client.create_dscp_marking_rule( dscp_policy_id, n_constants.VALID_DSCP_MARKS[0])[ 'dscp_marking_rule']['id'] # Validate that the rule was set up properly retrieved_rule = self.client.show_dscp_marking_rule( dscp_policy_id, rule_id)['dscp_marking_rule'] self.assertEqual(n_constants.VALID_DSCP_MARKS[0], retrieved_rule['dscp_mark'], """current DSCP mark is incorrect: expected value {0} actual value {1} """.format(n_constants.VALID_DSCP_MARKS[0], retrieved_rule['dscp_mark'])) # Try to set marks in range 8:56 (invalid marks should raise an error) _test_update_dscp_mark_values(self, dscp_policy_id, rule_id, valid_dscp_marks) class QosMinimumBandwidthRuleTestJSON(base.BaseAdminNetworkTest): DIRECTION_EGRESS = "egress" DIRECTION_INGRESS = "ingress" RULE_NAME = qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH + "_rule" RULES_NAME = RULE_NAME + "s" required_extensions = [qos_apidef.ALIAS] @classmethod @base.require_qos_rule_type(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH) def resource_setup(cls): super(QosMinimumBandwidthRuleTestJSON, cls).resource_setup() @classmethod def setup_clients(cls): super(QosMinimumBandwidthRuleTestJSON, cls).setup_clients() cls.qos_min_bw_rules_client = \ cls.os_admin.qos_minimum_bandwidth_rules_client cls.qos_min_bw_rules_client_primary = \ 
cls.os_primary.qos_minimum_bandwidth_rules_client def setUp(self): super(QosMinimumBandwidthRuleTestJSON, self).setUp() self.policy_name = data_utils.rand_name(name='test', prefix='policy') @decorators.idempotent_id('aa59b00b-3e9c-4787-92f8-93a5cdf5e378') def test_rule_create(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 1138})[self.RULE_NAME] # Test 'show rule' retrieved_rule = \ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule( policy['id'], rule['id']) retrieved_rule = retrieved_rule[self.RULE_NAME] self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(1138, retrieved_rule['min_kbps']) self.assertEqual(self.DIRECTION_EGRESS, retrieved_rule['direction']) # Test 'list rules' rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules( policy['id']) rules = rules[self.RULES_NAME] rules_ids = [r['id'] for r in rules] self.assertIn(rule['id'], rules_ids) # Test 'show policy' retrieved_policy = self.admin_client.show_qos_policy(policy['id']) policy_rules = retrieved_policy['policy']['rules'] self.assertEqual(1, len(policy_rules)) self.assertEqual(rule['id'], policy_rules[0]['id']) self.assertEqual(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH, policy_rules[0]['type']) @decorators.idempotent_id('266d9b87-e51c-48bd-9aa7-8269573621be') def test_rule_create_fail_for_missing_min_kbps(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self.assertRaises( exceptions.BadRequest, self.qos_min_bw_rules_client.create_minimum_bandwidth_rule, qos_policy_id=policy['id'], **{'direction': self.DIRECTION_EGRESS}) @decorators.idempotent_id('aa59b00b-ab01-4787-92f8-93a5cdf5e378') def test_rule_create_fail_for_the_same_type(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', 
shared=False) self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200}) self.assertRaises( exceptions.Conflict, self.qos_min_bw_rules_client.create_minimum_bandwidth_rule, qos_policy_id=policy['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 201}) @decorators.idempotent_id('35baf998-ae65-495c-9902-35a0d11e8936') @utils.requires_ext(extension="qos-bw-minimum-ingress", service="network") def test_rule_create_pass_for_direction_ingress(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy['id'], **{'direction': self.DIRECTION_INGRESS, 'min_kbps': 201}) retrieved_policy = self.admin_client.show_qos_policy(policy['id']) policy_rules = retrieved_policy['policy']['rules'] self.assertEqual(1, len(policy_rules)) self.assertEqual(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH, policy_rules[0]['type']) self.assertEqual(self.DIRECTION_INGRESS, policy_rules[0]['direction']) @decorators.idempotent_id('a49a6988-2568-47d2-931e-2dbc858943b3') def test_rule_update(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 300})[self.RULE_NAME] self.qos_min_bw_rules_client.update_minimum_bandwidth_rule( policy['id'], rule['id'], **{'min_kbps': 350, 'direction': self.DIRECTION_EGRESS}) retrieved_policy = \ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule( policy['id'], rule['id']) retrieved_policy = retrieved_policy[self.RULE_NAME] self.assertEqual(350, retrieved_policy['min_kbps']) self.assertEqual(self.DIRECTION_EGRESS, retrieved_policy['direction']) @decorators.idempotent_id('a7ee6efd-7b33-4a68-927d-275b4f8ba958') def test_rule_delete(self): policy = 
self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( policy['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200})[self.RULE_NAME] retrieved_policy = \ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule( policy['id'], rule['id']) retrieved_policy = retrieved_policy[self.RULE_NAME] self.assertEqual(rule['id'], retrieved_policy['id']) self.qos_min_bw_rules_client.delete_minimum_bandwidth_rule( policy['id'], rule['id']) self.assertRaises( exceptions.NotFound, self.qos_min_bw_rules_client.show_minimum_bandwidth_rule, policy['id'], rule['id']) @decorators.idempotent_id('a211222c-5808-46cb-a961-983bbab6b852') def test_rule_create_rule_nonexistent_policy(self): self.assertRaises( exceptions.NotFound, self.qos_min_bw_rules_client.create_minimum_bandwidth_rule, 'policy', **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200}) @decorators.idempotent_id('b4a2e7ad-786f-4927-a85a-e545a93bd274') def test_rule_create_forbidden_for_regular_tenants(self): self.assertRaises( exceptions.Forbidden, self.qos_min_bw_rules_client_primary.create_minimum_bandwidth_rule, 'policy', **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 300}) @decorators.idempotent_id('de0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2') def test_get_rules_by_policy(self): policy1 = self.create_qos_policy(name='test-policy1', description='test policy1', shared=False) rule1 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy1['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200})[self.RULE_NAME] policy2 = self.create_qos_policy(name='test-policy2', description='test policy2', shared=False) rule2 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule( qos_policy_id=policy2['id'], **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 5000})[self.RULE_NAME] # Test 'list rules' rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules( policy1['id']) rules = 
rules[self.RULES_NAME] rules_ids = [r['id'] for r in rules] self.assertIn(rule1['id'], rules_ids) self.assertNotIn(rule2['id'], rules_ids) class QosMinimumPpsRuleTestJSON(base.BaseAdminNetworkTest): required_extensions = [qos_apidef.ALIAS] @classmethod @utils.requires_ext(service='network', extension='port-resource-request-groups') def resource_setup(cls): super(QosMinimumPpsRuleTestJSON, cls).resource_setup() @classmethod def setup_clients(cls): super(QosMinimumPpsRuleTestJSON, cls).setup_clients() cls.min_pps_client = cls.os_admin.qos_minimum_packet_rate_rules_client cls.min_pps_client_primary = \ cls.os_primary.qos_minimum_packet_rate_rules_client def setUp(self): super(QosMinimumPpsRuleTestJSON, self).setUp() self.policy_name = data_utils.rand_name(name='test', prefix='policy') self.RULE_NAME = qos_consts.RULE_TYPE_MINIMUM_PACKET_RATE + "_rule" self.RULES_NAME = self.RULE_NAME + "s" def _create_qos_min_pps_rule(self, policy_id, rule_data): rule = self.min_pps_client.create_minimum_packet_rate_rule( policy_id, **rule_data)['minimum_packet_rate_rule'] self.addCleanup( test_utils.call_and_ignore_notfound_exc, self.min_pps_client.delete_minimum_packet_rate_rule, policy_id, rule['id']) return rule @decorators.idempotent_id('66a5b9b4-d4f9-4af8-b238-9e1881b78487') def test_rule_create(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self._create_qos_min_pps_rule( policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 1138}) # Test 'show rule' retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule( policy['id'], rule['id'])[self.RULE_NAME] self.assertEqual(rule['id'], retrieved_rule['id']) self.assertEqual(1138, retrieved_rule[qos_consts.MIN_KPPS]) self.assertEqual(n_constants.EGRESS_DIRECTION, retrieved_rule[qos_consts.DIRECTION]) # Test 'list rules' rules = self.min_pps_client.list_minimum_packet_rate_rules( policy['id']) rules = rules[self.RULES_NAME] rules_ids 
= [r['id'] for r in rules] self.assertIn(rule['id'], rules_ids) # Test 'show policy' retrieved_policy = self.admin_client.show_qos_policy(policy['id']) policy_rules = retrieved_policy['policy']['rules'] self.assertEqual(1, len(policy_rules)) self.assertEqual(rule['id'], policy_rules[0]['id']) self.assertEqual('minimum_packet_rate', policy_rules[0]['type']) @decorators.idempotent_id('6b656b57-d2bf-47f9-89a9-1baad1bd5418') def test_rule_create_fail_for_missing_min_kpps(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self.assertRaises(exceptions.BadRequest, self._create_qos_min_pps_rule, policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION}) @decorators.idempotent_id('f41213e5-2ab8-4916-b106-38d2cac5e18c') def test_rule_create_fail_for_the_same_type(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self._create_qos_min_pps_rule(policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 200}) self.assertRaises(exceptions.Conflict, self._create_qos_min_pps_rule, policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 201}) @decorators.idempotent_id('ceb8e41e-3d72-11ec-a446-d7faae6daec2') def test_rule_create_any_direction_when_egress_direction_exists(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) self._create_qos_min_pps_rule(policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 200}) self.assertRaises(exceptions.Conflict, self._create_qos_min_pps_rule, policy['id'], {qos_consts.DIRECTION: n_constants.ANY_DIRECTION, qos_consts.MIN_KPPS: 201}) @decorators.idempotent_id('a147a71e-3d7b-11ec-8097-278b1afd5fa2') def test_rule_create_egress_direction_when_any_direction_exists(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) 
self._create_qos_min_pps_rule(policy['id'], {qos_consts.DIRECTION: n_constants.ANY_DIRECTION, qos_consts.MIN_KPPS: 200}) self.assertRaises(exceptions.Conflict, self._create_qos_min_pps_rule, policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 201}) @decorators.idempotent_id('522ed09a-1d7f-4c1b-9195-61f19caf916f') def test_rule_update(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self._create_qos_min_pps_rule( policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 300}) self.min_pps_client.update_minimum_packet_rate_rule( policy['id'], rule['id'], **{qos_consts.MIN_KPPS: 350, qos_consts.DIRECTION: n_constants.ANY_DIRECTION}) retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule( policy['id'], rule['id'])[self.RULE_NAME] self.assertEqual(350, retrieved_rule[qos_consts.MIN_KPPS]) self.assertEqual(n_constants.ANY_DIRECTION, retrieved_rule[qos_consts.DIRECTION]) @decorators.idempotent_id('a020e186-3d60-11ec-88ca-d7f5eec22764') def test_rule_update_direction_conflict(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule1 = self._create_qos_min_pps_rule( policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 300}) rule2 = self._create_qos_min_pps_rule( policy['id'], {qos_consts.DIRECTION: n_constants.INGRESS_DIRECTION, qos_consts.MIN_KPPS: 300}) retrieved_rule1 = self.min_pps_client.show_minimum_packet_rate_rule( policy['id'], rule1['id'])[self.RULE_NAME] self.assertEqual(n_constants.EGRESS_DIRECTION, retrieved_rule1[qos_consts.DIRECTION]) retrieved_rule2 = self.min_pps_client.show_minimum_packet_rate_rule( policy['id'], rule2['id'])[self.RULE_NAME] self.assertEqual(n_constants.INGRESS_DIRECTION, retrieved_rule2[qos_consts.DIRECTION]) self.assertRaises(exceptions.Conflict, self.min_pps_client.update_minimum_packet_rate_rule, policy['id'], 
rule2['id'], **{qos_consts.DIRECTION: n_constants.ANY_DIRECTION}) @decorators.idempotent_id('c49018b6-d358-49a1-a94b-d53224165045') def test_rule_delete(self): policy = self.create_qos_policy(name=self.policy_name, description='test policy', shared=False) rule = self._create_qos_min_pps_rule( policy['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 200}) retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule( policy['id'], rule['id'])[self.RULE_NAME] self.assertEqual(rule['id'], retrieved_rule['id']) self.min_pps_client.delete_minimum_packet_rate_rule(policy['id'], rule['id']) self.assertRaises(exceptions.NotFound, self.min_pps_client.show_minimum_packet_rate_rule, policy['id'], rule['id']) @decorators.idempotent_id('1a6b6128-3d3e-11ec-bf49-57b326d417c0') def test_rule_create_forbidden_for_regular_tenants(self): self.assertRaises( exceptions.Forbidden, self.min_pps_client_primary.create_minimum_packet_rate_rule, 'policy', **{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 300}) @decorators.idempotent_id('1b94f4e2-3d3e-11ec-bb21-6f98e4044b8b') def test_get_rules_by_policy(self): policy1 = self.create_qos_policy(name='test-policy1', description='test policy1', shared=False) rule1 = self._create_qos_min_pps_rule( policy1['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 200}) policy2 = self.create_qos_policy(name='test-policy2', description='test policy2', shared=False) rule2 = self._create_qos_min_pps_rule( policy2['id'], {qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION, qos_consts.MIN_KPPS: 5000}) # Test 'list rules' rules = self.min_pps_client.list_minimum_packet_rate_rules( policy1['id']) rules = rules[self.RULES_NAME] rules_ids = [r['id'] for r in rules] self.assertIn(rule1['id'], rules_ids) self.assertNotIn(rule2['id'], rules_ids) class QosSearchCriteriaTest(base.BaseSearchCriteriaTest, base.BaseAdminNetworkTest): resource = 'policy' plural_name = 'policies' # 
Use unique description to isolate the tests from other QoS tests list_kwargs = {'description': 'search-criteria-test'} list_as_admin = True required_extensions = [qos_apidef.ALIAS] @classmethod def resource_setup(cls): super(QosSearchCriteriaTest, cls).resource_setup() for name in cls.resource_names: cls.create_qos_policy( name=name, description='search-criteria-test') @decorators.idempotent_id('55fc0103-fdc1-4d34-ab62-c579bb739a91') def test_list_sorts_asc(self): self._test_list_sorts_asc() @decorators.idempotent_id('13e08ac3-bfed-426b-892a-b3b158560c23') def test_list_sorts_desc(self): self._test_list_sorts_desc() @decorators.idempotent_id('719e61cc-e33c-4918-aa4d-1a791e6e0e86') def test_list_pagination(self): self._test_list_pagination() @decorators.idempotent_id('3bd8fb58-c0f8-4954-87fb-f286e1eb096a') def test_list_pagination_with_marker(self): self._test_list_pagination_with_marker() @decorators.idempotent_id('3bad0747-8082-46e9-be4d-c428a842db41') def test_list_pagination_with_href_links(self): self._test_list_pagination_with_href_links() @decorators.idempotent_id('d6a8bacd-d5e8-4ef3-bc55-23ca6998d208') def test_list_pagination_page_reverse_asc(self): self._test_list_pagination_page_reverse_asc() @decorators.idempotent_id('0b9aecdc-2b27-421b-b104-53d24e905ae8') def test_list_pagination_page_reverse_desc(self): self._test_list_pagination_page_reverse_desc() @decorators.idempotent_id('1a3dc257-dafd-4870-8c71-639ae7ddc6ea') def test_list_pagination_page_reverse_with_href_links(self): self._test_list_pagination_page_reverse_with_href_links() @decorators.idempotent_id('40e09b53-4eb8-4526-9181-d438c8005a20') def test_list_no_pagination_limit_0(self): self._test_list_no_pagination_limit_0()