neutron-tempest-plugin/neutron_tempest_plugin/api/test_qos.py
Slawek Kaplonski ec162e0f56 Remove usage of testscenarios and replace it with ddt library
As our internal QE reported to us, ddt is producing more consistent
names of the same tests with different variables then testscenarios.
With testscenarios it is something like:

foo_tempest_plugin.tests.something.SomeClass.test_foo[id-123456](c1,c2)

and with ddt is more similar to tests without scenarios:

foo_tempest_plugin.tests.something.SomeClass.test_foo_1[id-123456]

Apparently due to this inconsistency results of some tests couldn't be
properly reported properly to some tools which we are using downstream.

So this patch proposes to remove usage of testscenarios in
neutron_tempest_plugin and replace it with ddt where it is needed.
Actually ddt was already used in some of the tests here so this will
make it also more consistent across the tests in that repo.

Also as part of this effort I relalised that in some cases testscenarios
where used, tests were running e.g. 2 times but actually variable set to
different value in different scenarios were not used at all. So in such
case I simply dropped usage of testscenarios and left only one variant
of the test to be run.

Change-Id: Ieabab4ccaa0e2a365939425dff0c0776839251eb
2024-03-06 14:27:39 +01:00

1695 lines
76 KiB
Python

# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from neutron_lib.api.definitions import qos as qos_apidef
from neutron_lib import constants as n_constants
from neutron_lib.services.qos import constants as qos_consts
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
CONF = config.CONF
class QosTestJSON(base.BaseAdminNetworkTest):
required_extensions = [qos_apidef.ALIAS]
@classmethod
def setup_clients(cls):
super(QosTestJSON, cls).setup_clients()
cls.qos_bw_limit_rule_client = \
cls.os_admin.qos_limit_bandwidth_rules_client
def setUp(self):
super(QosTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
@staticmethod
def _get_driver_details(rule_type_details, driver_name):
for driver in rule_type_details['drivers']:
if driver['name'] == driver_name:
return driver
def _create_qos_bw_limit_rule(self, policy_id, rule_data):
rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
qos_policy_id=policy_id,
**rule_data)['bandwidth_limit_rule']
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
policy_id, rule['id'])
return rule
@decorators.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
def test_create_policy(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy desc1',
shared=False)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test-policy', retrieved_policy['name'])
self.assertEqual('test policy desc1', retrieved_policy['description'])
self.assertFalse(retrieved_policy['shared'])
# Test 'list policies'
policies = self.admin_client.list_qos_policies()['policies']
policies_ids = [p['id'] for p in policies]
self.assertIn(policy['id'], policies_ids)
@decorators.idempotent_id('606a48e2-5403-4052-b40f-4d54b855af76')
@utils.requires_ext(extension="project-id", service="network")
def test_show_policy_has_project_id(self):
policy = self.create_qos_policy(name=self.policy_name, shared=False)
body = self.admin_client.show_qos_policy(policy['id'])
show_policy = body['policy']
self.assertIn('project_id', show_policy)
self.assertEqual(self.admin_client.project_id,
show_policy['project_id'])
@decorators.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
def test_list_policy_filter_by_name(self):
policy1 = 'test' + data_utils.rand_name("policy")
policy2 = 'test' + data_utils.rand_name("policy")
self.create_qos_policy(name=policy1, description='test policy',
shared=False)
self.create_qos_policy(name=policy2, description='test policy',
shared=False)
policies = (self.admin_client.
list_qos_policies(name=policy1)['policies'])
self.assertEqual(1, len(policies))
retrieved_policy = policies[0]
self.assertEqual(policy1, retrieved_policy['name'])
@decorators.idempotent_id('dde0b449-a400-4a87-b5a5-4d1c413c917b')
def test_list_policy_sort_by_name(self):
policyA = 'A' + data_utils.rand_name("policy")
policyB = 'B' + data_utils.rand_name("policy")
self.create_qos_policy(name=policyA, description='test policy',
shared=False)
self.create_qos_policy(name=policyB, description='test policy',
shared=False)
param = {
'sort_key': 'name',
'sort_dir': 'asc'
}
policies = (self.admin_client.list_qos_policies(**param)['policies'])
policy_names = [p['name'] for p in policies]
self.assertLess(policy_names.index(policyA),
policy_names.index(policyB))
param = {
'sort_key': 'name',
'sort_dir': 'desc'
}
policies = (self.admin_client.list_qos_policies(**param)['policies'])
policy_names = [p['name'] for p in policies]
self.assertLess(policy_names.index(policyB),
policy_names.index(policyA))
@decorators.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
def test_policy_update(self):
policy = self.create_qos_policy(
name=self.policy_name,
description='',
shared=False,
project_id=self.admin_client.project_id)
self.admin_client.update_qos_policy(policy['id'],
description='test policy desc2',
shared=True)
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test policy desc2', retrieved_policy['description'])
self.assertTrue(retrieved_policy['shared'])
self.assertEqual([], retrieved_policy['rules'])
@decorators.idempotent_id('6e880e0f-bbfc-4e54-87c6-680f90e1b618')
def test_policy_update_forbidden_for_regular_tenants_own_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='',
shared=False,
project_id=self.client.project_id)
self.assertRaises(
exceptions.Forbidden,
self.client.update_qos_policy,
policy['id'], description='test policy')
@decorators.idempotent_id('4ecfd7e7-47b6-4702-be38-be9235901a87')
def test_policy_update_forbidden_for_regular_tenants_foreign_policy(self):
policy = self.create_qos_policy(
name=self.policy_name,
description='',
shared=False,
project_id=self.admin_client.project_id)
self.assertRaises(
exceptions.NotFound,
self.client.update_qos_policy,
policy['id'], description='test policy')
@decorators.idempotent_id('ee263db4-009a-4641-83e5-d0e83506ba4c')
def test_shared_policy_update(self):
policy = self.create_qos_policy(
name=self.policy_name,
description='',
shared=True,
project_id=self.admin_client.project_id)
self.admin_client.update_qos_policy(policy['id'],
description='test policy desc2')
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertTrue(retrieved_policy['shared'])
self.admin_client.update_qos_policy(policy['id'],
shared=False)
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertFalse(retrieved_policy['shared'])
@decorators.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
def test_delete_policy(self):
policy = self.admin_client.create_qos_policy(
'test-policy', 'desc', True)['policy']
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test-policy', retrieved_policy['name'])
self.admin_client.delete_qos_policy(policy['id'])
self.assertRaises(exceptions.NotFound,
self.admin_client.show_qos_policy, policy['id'])
@decorators.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
def test_list_admin_rule_types(self):
self._test_list_rule_types(self.admin_client)
@decorators.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
def test_list_regular_rule_types(self):
self._test_list_rule_types(self.client)
def _test_list_rule_types(self, client):
# List supported rule types
# Since returned rule types depends on loaded backend drivers this test
# is checking only if returned keys are same as expected keys
#
# In theory, we could make the test conditional on which ml2 drivers
# are enabled in gate (or more specifically, on which supported qos
# rules are claimed by core plugin), but that option doesn't seem to be
# available through tempest.lib framework
expected_rule_keys = ['type']
rule_types = client.list_qos_rule_types()
actual_list_rule_types = rule_types['rule_types']
# Verify that only required fields present in rule details
for rule in actual_list_rule_types:
self.assertEqual(tuple(expected_rule_keys), tuple(rule.keys()))
@decorators.idempotent_id('8ececa21-ef97-4904-a152-9f04c90f484d')
def test_show_rule_type_details_as_user(self):
self.assertRaises(
exceptions.Forbidden,
self.client.show_qos_rule_type,
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
@decorators.idempotent_id('d0a2460b-7325-481f-a531-050bd96ab25e')
def test_show_rule_type_details_as_admin(self):
# Since returned rule types depend on loaded backend drivers this test
# is checking only if returned keys are same as expected keys
# In theory, we could make the test conditional on which ml2 drivers
# are enabled in gate, but that option doesn't seem to be
# available through tempest.lib framework
expected_rule_type_details_keys = ['type', 'drivers']
rule_type_details = self.admin_client.show_qos_rule_type(
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT).get("rule_type")
# Verify that only required fields present in rule details
self.assertEqual(
sorted(tuple(expected_rule_type_details_keys)),
sorted(tuple(rule_type_details.keys())))
@decorators.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
def test_policy_association_with_admin_network(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
network = self.create_network('test network', shared=True,
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
@decorators.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
def test_policy_association_with_tenant_network(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=True)
network = self.create_network('test network',
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
@decorators.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
def test_policy_association_with_network_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
self.create_network,
'test network',
qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')
@decorators.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
def test_policy_association_with_network_non_shared_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.assertRaises(
exceptions.NotFound,
self.create_network,
'test network', qos_policy_id=policy['id'])
@decorators.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
def test_policy_update_association_with_admin_network(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
network = self.create_network('test network', shared=True)
retrieved_network = self.admin_client.show_network(network['id'])
self.assertIsNone(retrieved_network['network']['qos_policy_id'])
self.admin_client.update_network(network['id'],
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
@decorators.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
def test_policy_association_with_port_shared_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=True)
network = self.create_network('test network', shared=True)
port = self.create_port(network, qos_policy_id=policy['id'])
retrieved_port = self.admin_client.show_port(port['id'])
self.assertEqual(
policy['id'], retrieved_port['port']['qos_policy_id'])
@decorators.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
def test_policy_association_with_port_nonexistent_policy(self):
network = self.create_network('test network', shared=True)
self.assertRaises(
exceptions.NotFound,
self.create_port,
network,
qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
@decorators.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
def test_policy_association_with_port_non_shared_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
network = self.create_network('test network', shared=True)
self.assertRaises(
exceptions.NotFound,
self.create_port,
network, qos_policy_id=policy['id'])
@decorators.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
def test_policy_update_association_with_port_shared_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=True)
network = self.create_network('test network', shared=True)
port = self.create_port(network)
retrieved_port = self.admin_client.show_port(port['id'])
self.assertIsNone(retrieved_port['port']['qos_policy_id'])
self.client.update_port(port['id'], qos_policy_id=policy['id'])
retrieved_port = self.admin_client.show_port(port['id'])
self.assertEqual(
policy['id'], retrieved_port['port']['qos_policy_id'])
@decorators.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
def test_delete_not_allowed_if_policy_in_use_by_network(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=True)
self.create_network('test network', qos_policy_id=policy['id'],
shared=True)
self.assertRaises(
exceptions.Conflict,
self.admin_client.delete_qos_policy, policy['id'])
@decorators.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
def test_delete_not_allowed_if_policy_in_use_by_port(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=True)
network = self.create_network('test network', shared=True)
self.create_port(network, qos_policy_id=policy['id'])
self.assertRaises(
exceptions.Conflict,
self.admin_client.delete_qos_policy, policy['id'])
@decorators.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
def test_qos_policy_delete_with_rules(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
self.admin_client.delete_qos_policy(policy['id'])
with testtools.ExpectedException(exceptions.NotFound):
self.admin_client.show_qos_policy(policy['id'])
@decorators.idempotent_id('fb384bde-a973-41c3-a542-6f77a092155f')
def test_get_policy_that_is_shared(self):
policy = self.create_qos_policy(
name='test-policy-shared',
description='shared policy',
shared=True,
project_id=self.admin_client.project_id)
obtained_policy = self.client.show_qos_policy(policy['id'])['policy']
self.assertEqual(obtained_policy, policy)
@decorators.idempotent_id('aed8e2a6-22da-421b-89b9-935a2c1a1b50')
def test_policy_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
self.client.create_qos_policy,
'test-policy', 'test policy', False)
@decorators.idempotent_id('18d94f22-b9d5-4390-af12-d30a0cfc4cd3')
def test_default_policy_creating_network_without_policy(self):
project_id = self.create_project()['id']
policy = self.create_qos_policy(name=self.policy_name,
project_id=project_id,
is_default=True)
network = self.create_network('test network', client=self.admin_client,
project_id=project_id)
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
@decorators.idempotent_id('807cce45-38e5-482d-94db-36e1796aba73')
def test_default_policy_creating_network_with_policy(self):
project_id = self.create_project()['id']
self.create_qos_policy(name='test-policy',
project_id=project_id,
is_default=True)
policy = self.create_qos_policy(name='test-policy',
project_id=project_id)
network = self.create_network('test network', client=self.admin_client,
project_id=project_id,
qos_policy_id=policy['id'])
retrieved_network = self.admin_client.show_network(network['id'])
self.assertEqual(
policy['id'], retrieved_network['network']['qos_policy_id'])
@decorators.idempotent_id('06060880-2956-4c16-9a63-f284c3879229')
def test_user_create_port_with_admin_qos_policy(self):
qos_policy = self.create_qos_policy(
name=self.policy_name,
project_id=self.admin_client.project_id,
shared=False)
network = self.create_network(
'test network', client=self.admin_client,
project_id=self.client.project_id,
qos_policy_id=qos_policy['id'])
port = self.create_port(network)
self.assertEqual(network['id'], port['network_id'])
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
credentials = ['primary', 'admin']
direction = None
required_extensions = [qos_apidef.ALIAS]
@classmethod
def setup_clients(cls):
super(QosBandwidthLimitRuleTestJSON, cls).setup_clients()
cls.qos_bw_limit_rule_client = \
cls.os_admin.qos_limit_bandwidth_rules_client
cls.qos_bw_limit_rule_client_primary = \
cls.os_primary.qos_limit_bandwidth_rules_client
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
def resource_setup(cls):
super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
def setUp(self):
super(QosBandwidthLimitRuleTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
def _create_qos_bw_limit_rule(self, policy_id, rule_data):
rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
qos_policy_id=policy_id,
**rule_data)['bandwidth_limit_rule']
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
policy_id, rule['id'])
return rule
@decorators.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
def test_rule_create(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_bw_limit_rule(
policy['id'],
{'max_kbps': 200, 'max_burst_kbps': 1337, 'direction': 'ingress'})
# Test 'show rule'
retrieved_rule = \
self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
policy['id'], rule['id'])
retrieved_rule = retrieved_rule['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(200, retrieved_rule['max_kbps'])
self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
self.assertEqual('ingress', retrieved_rule['direction'])
# Test 'list rules'
rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
policy['id'])
rules = rules['bandwidth_limit_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
policy_rules[0]['type'])
@decorators.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
self.assertRaises(
exceptions.Conflict,
self._create_qos_bw_limit_rule,
policy['id'],
{'max_kbps': 201, 'max_burst_kbps': 1338})
@decorators.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
def test_rule_update(self):
self._test_rule_update(opposite_direction=None)
def _test_rule_update(self, opposite_direction=None):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
if opposite_direction:
self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
policy['id'], rule['id'],
**{'max_kbps': 200, 'max_burst_kbps': 1337,
'direction': opposite_direction})
else:
self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
policy['id'], rule['id'],
**{'max_kbps': 200, 'max_burst_kbps': 1337})
retrieved_policy = self.qos_bw_limit_rule_client.\
show_limit_bandwidth_rule(policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(200, retrieved_policy['max_kbps'])
self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
if opposite_direction:
self.assertEqual(opposite_direction,
retrieved_policy['direction'])
@decorators.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
def test_rule_delete(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
retrieved_policy = \
self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_policy['id'])
self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule(
policy['id'], rule['id'])
self.assertRaises(
exceptions.NotFound,
self.qos_bw_limit_rule_client.show_limit_bandwidth_rule,
policy['id'], rule['id'])
@decorators.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
self._create_qos_bw_limit_rule,
'policy', {'max_kbps': 200, 'max_burst_kbps': 1337})
@decorators.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
self.qos_bw_limit_rule_client_primary.create_limit_bandwidth_rule,
'policy', **{'max_kbps': 1, 'max_burst_kbps': 2})
@decorators.idempotent_id('1bfc55d9-6fd8-4293-ab3a-b1d69bf7cd2e')
def test_rule_update_forbidden_for_regular_tenants_own_policy(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False,
project_id=self.client.project_id)
rule = self._create_qos_bw_limit_rule(
policy['id'],
{'max_kbps': 1, 'max_burst_kbps': 1})
self.assertRaises(
exceptions.Forbidden,
self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})
@decorators.idempotent_id('9a607936-4b6f-4c2f-ad21-bd5b3d4fc91f')
def test_rule_update_forbidden_for_regular_tenants_foreign_policy(self):
policy = self.create_qos_policy(
name=self.policy_name,
description='test policy',
shared=False,
project_id=self.admin_client.project_id)
rule = self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
self.assertRaises(
exceptions.NotFound,
self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})
@decorators.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(
name='test-policy1',
description='test policy1',
shared=False)
rule1 = self._create_qos_bw_limit_rule(
policy1['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
policy2 = self.create_qos_policy(
name='test-policy2',
description='test policy2',
shared=False)
rule2 = self._create_qos_bw_limit_rule(
policy2['id'], {'max_kbps': 5000, 'max_burst_kbps': 2523})
# Test 'list rules'
rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
policy1['id'])
rules = rules['bandwidth_limit_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertNotIn(rule2['id'], rules_ids)
@testtools.skipUnless(
CONF.neutron_plugin_options.create_shared_resources,
"""Creation of shared resources should be allowed,
setting the create_shared_resources option as 'True' is needed""")
@decorators.idempotent_id('d911707e-fa2c-11e9-9553-5076af30bbf5')
def test_attach_and_detach_a_policy_by_a_tenant(self):
# As an admin create an non shared QoS policy,add a rule
# and associate it with a network
self.network = self.create_network()
policy = self.create_qos_policy(name=self.policy_name,
description='test policy for attach',
shared=False)
self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024})
self.admin_client.update_network(
self.network['id'], qos_policy_id=policy['id'])
# As a tenant, try to detach the policy from the network
# The operation should be forbidden
self.assertRaises(
exceptions.Forbidden,
self.client.update_network,
self.network['id'], qos_policy_id=None)
# As an admin, make the policy shared
self.admin_client.update_qos_policy(policy['id'], shared=True)
# As a tenant, try to detach the policy from the network
# The operation should be allowed
self.client.update_network(self.network['id'],
qos_policy_id=None)
retrieved_network = self.admin_client.show_network(self.network['id'])
self.assertIsNone(retrieved_network['network']['qos_policy_id'])
# As a tenant, try to delete the policy from the network
# should be forbidden
self.assertRaises(
exceptions.Forbidden,
self.client.delete_qos_policy,
policy['id'])
@ddt.ddt
class QosBandwidthLimitRuleWithDirectionTestJSON(
QosBandwidthLimitRuleTestJSON):
required_extensions = (
QosBandwidthLimitRuleTestJSON.required_extensions +
['qos-bw-limit-direction']
)
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
def resource_setup(cls):
super(QosBandwidthLimitRuleWithDirectionTestJSON, cls).resource_setup()
def setUp(self):
super(QosBandwidthLimitRuleWithDirectionTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
@decorators.idempotent_id('c8cbe502-0f7e-11ea-8d71-362b9e155667')
def test_create_policy_with_multiple_rules(self):
# Create a policy with multiple rules
policy = self.create_qos_policy(name=self.policy_name,
description='test policy1',
shared=False)
rule1 = self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
'direction': n_constants.EGRESS_DIRECTION})
rule2 = self._create_qos_bw_limit_rule(
policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
'direction': n_constants.INGRESS_DIRECTION})
# Check that the rules were added to the policy
rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
policy['id'])['bandwidth_limit_rules']
rules_ids = [rule['id'] for rule in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertIn(rule2['id'], rules_ids)
# Check that the rules creation fails for the same rule types
self.assertRaises(
exceptions.Conflict,
self._create_qos_bw_limit_rule,
policy['id'],
{'max_kbps': 1025, 'max_burst_kbps': 1025,
'direction': n_constants.EGRESS_DIRECTION})
self.assertRaises(
exceptions.Conflict,
self._create_qos_bw_limit_rule,
policy['id'],
{'max_kbps': 1025, 'max_burst_kbps': 1025,
'direction': n_constants.INGRESS_DIRECTION})
@decorators.idempotent_id('7ca7c83b-0555-4a0f-a422-4338f77f79e5')
@ddt.unpack
@ddt.data({'opposite_direction': 'ingress'},
{'opposite_direction': 'egress'})
def test_rule_update(self, opposite_direction):
self._test_rule_update(opposite_direction=opposite_direction)
class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
force_tenant_isolation = True
credentials = ['primary', 'alt', 'admin']
required_extensions = [qos_apidef.ALIAS]
@classmethod
def resource_setup(cls):
super(RbacSharedQosPoliciesTest, cls).resource_setup()
cls.client2 = cls.os_alt.network_client
def _create_qos_policy(self, project_id=None):
args = {'name': data_utils.rand_name('test-policy'),
'description': 'test policy',
'shared': False,
'project_id': project_id}
qos_policy = self.admin_client.create_qos_policy(**args)['policy']
self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])
return qos_policy
def _make_admin_policy_shared_to_project_id(self, project_id):
policy = self._create_qos_policy()
rbac_policy = self.admin_client.create_rbac_policy(
object_type='qos_policy',
object_id=policy['id'],
action='access_as_shared',
target_tenant=project_id,
)['rbac_policy']
return {'policy': policy, 'rbac_policy': rbac_policy}
def _create_network(self, qos_policy_id, client, should_cleanup=True):
net = client.create_network(
name=data_utils.rand_name('test-network'),
qos_policy_id=qos_policy_id)['network']
if should_cleanup:
self.addCleanup(client.delete_network, net['id'])
return net
@decorators.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
def test_policy_sharing_with_wildcard(self):
qos_pol = self.create_qos_policy(
name=data_utils.rand_name('test-policy'),
description='test-shared-policy', shared=False,
project_id=self.admin_client.project_id)
self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
# test update shared False -> True
self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
qos_pol['shared'] = True
self.client2.show_qos_policy(qos_pol['id'])
rbac_pol = {'target_tenant': '*',
'tenant_id': self.admin_client.project_id,
'project_id': self.admin_client.project_id,
'object_type': 'qos_policy',
'object_id': qos_pol['id'],
'action': 'access_as_shared'}
rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
rbac_policies = [r for r in rbac_policies if r.pop('id')]
self.assertIn(rbac_pol, rbac_policies)
# update shared True -> False should fail because the policy is bound
# to a network
net = self._create_network(qos_pol['id'], self.admin_client, False)
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
# delete the network, and update shared True -> False should pass now
self.admin_client.delete_network(net['id'])
self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
qos_pol['shared'] = False
self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
def _create_net_bound_qos_rbacs(self):
res = self._make_admin_policy_shared_to_project_id(
self.client.project_id)
qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']
# add a wildcard rbac rule - now the policy globally shared
rbac_wildcard = self.admin_client.create_rbac_policy(
object_type='qos_policy',
object_id=qos_policy['id'],
action='access_as_shared',
target_tenant='*',
)['rbac_policy']
# tenant1 now uses qos policy for net
self._create_network(qos_policy['id'], self.client)
return rbac_for_client_tenant, rbac_wildcard
@decorators.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
def test_net_bound_shared_policy_wildcard_and_project_id_wild_remove(self):
client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
# globally unshare the qos-policy, the specific share should remain
self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
self.client.list_rbac_policies(id=client_rbac['id'])
@decorators.idempotent_id('1997b00c-0c75-4e43-8ce2-999f9fa555ee')
def test_net_bound_shared_policy_wildcard_and_projectid_wild_remains(self):
client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
# remove client_rbac policy the wildcard share should remain
self.admin_client.delete_rbac_policy(client_rbac['id'])
self.client.list_rbac_policies(id=wildcard_rbac['id'])
@decorators.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
def test_policy_sharing_with_wildcard_and_project_id(self):
res = self._make_admin_policy_shared_to_project_id(
self.client.project_id)
qos_policy, rbac = res['policy'], res['rbac_policy']
qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
self.assertTrue(qos_pol['shared'])
with testtools.ExpectedException(exceptions.NotFound):
self.client2.show_qos_policy(qos_policy['id'])
# make the qos-policy globally shared
self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
self.assertTrue(qos_pol['shared'])
# globally unshare the qos-policy, the specific share should remain
self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
self.client.show_qos_policy(qos_policy['id'])
with testtools.ExpectedException(exceptions.NotFound):
self.client2.show_qos_policy(qos_policy['id'])
self.assertIn(rbac,
self.admin_client.list_rbac_policies()['rbac_policies'])
@decorators.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
def test_policy_target_update(self):
res = self._make_admin_policy_shared_to_project_id(
self.client.project_id)
# change to client2
update_res = self.admin_client.update_rbac_policy(
res['rbac_policy']['id'], target_tenant=self.client2.project_id)
self.assertEqual(self.client2.project_id,
update_res['rbac_policy']['target_tenant'])
# make sure everything else stayed the same
res['rbac_policy'].pop('target_tenant')
update_res['rbac_policy'].pop('target_tenant')
self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])
@decorators.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
def test_network_presence_prevents_policy_rbac_policy_deletion(self):
res = self._make_admin_policy_shared_to_project_id(
self.client2.project_id)
qos_policy_id = res['policy']['id']
self._create_network(qos_policy_id, self.client2)
# a network with shared qos-policy should prevent the deletion of an
# rbac-policy required for it to be shared
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
# a wildcard policy should allow the specific policy to be deleted
# since it allows the remaining port
wild = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=res['policy']['id'],
action='access_as_shared', target_tenant='*')['rbac_policy']
self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
# now that wildcard is the only remaining, it should be subjected to
# the same restriction
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.delete_rbac_policy(wild['id'])
# we can't update the policy to a different tenant
with testtools.ExpectedException(exceptions.Conflict):
self.admin_client.update_rbac_policy(
wild['id'], target_tenant=self.client2.project_id)
@decorators.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
def test_regular_client_shares_to_another_regular_client(self):
# owned by self.admin_client
policy = self._create_qos_policy()
with testtools.ExpectedException(exceptions.NotFound):
self.client.show_qos_policy(policy['id'])
rbac_policy = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.client.project_id)['rbac_policy']
self.client.show_qos_policy(policy['id'])
self.assertIn(rbac_policy,
self.admin_client.list_rbac_policies()['rbac_policies'])
# ensure that 'client2' can't see the rbac-policy sharing the
# qos-policy to it because the rbac-policy belongs to 'client'
self.assertNotIn(rbac_policy['id'], [p['id'] for p in
self.client2.list_rbac_policies()['rbac_policies']])
@decorators.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
def test_filter_fields(self):
policy = self._create_qos_policy()
self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared', target_tenant=self.client2.project_id)
field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
('project_id', 'target_tenant'))
for fields in field_args:
res = self.admin_client.list_rbac_policies(fields=fields)
self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
@decorators.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
def test_rbac_policy_show(self):
res = self._make_admin_policy_shared_to_project_id(
self.client.project_id)
p1 = res['rbac_policy']
p2 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=res['policy']['id'],
action='access_as_shared',
target_tenant='*')['rbac_policy']
self.assertEqual(
p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
self.assertEqual(
p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
@decorators.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
def test_filter_rbac_policies(self):
policy = self._create_qos_policy()
rbac_pol1 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.client2.project_id)['rbac_policy']
rbac_pol2 = self.admin_client.create_rbac_policy(
object_type='qos_policy', object_id=policy['id'],
action='access_as_shared',
target_tenant=self.admin_client.project_id)['rbac_policy']
res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
'rbac_policies']
res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
'rbac_policies']
self.assertEqual(1, len(res1))
self.assertEqual(1, len(res2))
self.assertEqual(rbac_pol1['id'], res1[0]['id'])
self.assertEqual(rbac_pol2['id'], res2[0]['id'])
@decorators.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
def test_regular_client_blocked_from_sharing_anothers_policy(self):
qos_policy = self._make_admin_policy_shared_to_project_id(
self.client.project_id)['policy']
with testtools.ExpectedException(exceptions.BadRequest):
self.client.create_rbac_policy(
object_type='qos_policy', object_id=qos_policy['id'],
action='access_as_shared',
target_tenant=self.client2.project_id)
# make sure the rbac-policy is invisible to the tenant for which it's
# being shared
self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])
class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
VALID_DSCP_MARK1 = 56
VALID_DSCP_MARK2 = 48
required_extensions = [qos_apidef.ALIAS]
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_DSCP_MARKING)
def resource_setup(cls):
super(QosDscpMarkingRuleTestJSON, cls).resource_setup()
def setUp(self):
super(QosDscpMarkingRuleTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
@decorators.idempotent_id('f5cbaceb-5829-497c-9c60-ad70969e9a08')
def test_rule_create(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.admin_client.create_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
# Test 'show rule'
retrieved_rule = self.admin_client.show_dscp_marking_rule(
policy['id'], rule['id'])
retrieved_rule = retrieved_rule['dscp_marking_rule']
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(self.VALID_DSCP_MARK1, retrieved_rule['dscp_mark'])
# Test 'list rules'
rules = self.admin_client.list_dscp_marking_rules(policy['id'])
rules = rules['dscp_marking_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARKING,
policy_rules[0]['type'])
@decorators.idempotent_id('08553ffe-030f-4037-b486-7e0b8fb9385a')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.admin_client.create_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
self.assertRaises(exceptions.Conflict,
self.admin_client.create_dscp_marking_rule,
policy_id=policy['id'],
dscp_mark=self.VALID_DSCP_MARK2)
@decorators.idempotent_id('76f632e5-3175-4408-9a32-3625e599c8a2')
def test_rule_update(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.admin_client.create_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
self.admin_client.update_dscp_marking_rule(
policy['id'], rule['id'], dscp_mark=self.VALID_DSCP_MARK2)
retrieved_policy = self.admin_client.show_dscp_marking_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy['dscp_marking_rule']
self.assertEqual(self.VALID_DSCP_MARK2, retrieved_policy['dscp_mark'])
@decorators.idempotent_id('74f81904-c35f-48a3-adae-1f5424cb3c18')
def test_rule_delete(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.admin_client.create_dscp_marking_rule(
policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
retrieved_policy = self.admin_client.show_dscp_marking_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy['dscp_marking_rule']
self.assertEqual(rule['id'], retrieved_policy['id'])
self.admin_client.delete_dscp_marking_rule(policy['id'], rule['id'])
self.assertRaises(exceptions.NotFound,
self.admin_client.show_dscp_marking_rule,
policy['id'], rule['id'])
@decorators.idempotent_id('9cb8ef5c-96fc-4978-9ee0-e3b02bab628a')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
self.admin_client.create_dscp_marking_rule,
'policy', self.VALID_DSCP_MARK1)
@decorators.idempotent_id('bf6002ea-29de-486f-b65d-08aea6d4c4e2')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
self.client.create_dscp_marking_rule,
'policy', self.VALID_DSCP_MARK1)
@decorators.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533')
def test_invalid_rule_create(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.assertRaises(
exceptions.BadRequest,
self.admin_client.create_dscp_marking_rule,
policy['id'], 58)
@decorators.idempotent_id('c565131d-4c80-4231-b0f3-9ae2be4de129')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(name='test-policy1',
description='test policy1',
shared=False)
rule1 = self.admin_client.create_dscp_marking_rule(
policy1['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
rule2 = self.admin_client.create_dscp_marking_rule(
policy2['id'], self.VALID_DSCP_MARK2)['dscp_marking_rule']
# Test 'list rules'
rules = self.admin_client.list_dscp_marking_rules(policy1['id'])
rules = rules['dscp_marking_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertNotIn(rule2['id'], rules_ids)
@decorators.idempotent_id('19ed2286-ccb1-11e9-87d7-525400d6f522')
def test_qos_dscp_create_and_update(self):
"""This test covers:
1.Creating a basic QoS policy with DSCP marking rule.
2.Updating QoS policy:
Administrator should have the ability to update existing QoS policy.
This test should verify that:
It's possible to update the existing DSCP marking rule with all of
the valid marks between 0-56, except of the invalid marks:
2-6, 42, 44, and 50-54 (which should be forbidden)
"""
def _test_update_dscp_mark_values(self, dscp_policy_id, rule_id,
valid_dscp_marks):
for mark in range(valid_dscp_marks[1], self.VALID_DSCP_MARK1 + 1):
if mark in valid_dscp_marks:
self.admin_client.update_dscp_marking_rule(
dscp_policy_id, rule_id, dscp_mark=mark)
retrieved_rule = self.admin_client.show_dscp_marking_rule(
dscp_policy_id, rule_id)['dscp_marking_rule']
self.assertEqual(mark, retrieved_rule['dscp_mark'],
"""current DSCP mark is incorrect:
expected value {0} actual value {1}
""".format(mark,
retrieved_rule['dscp_mark']))
else:
self.assertRaises(exceptions.BadRequest,
self.admin_client.create_dscp_marking_rule,
dscp_policy_id,
mark)
# Retrieve the valid range of DSCP marks from the API.
rule_type_details = self.admin_client.show_qos_rule_type(
qos_consts.RULE_TYPE_DSCP_MARKING).get('rule_type')
# There should be at least one driver supporting the DSCP marking rule.
dscp_driver = rule_type_details['drivers'][0]
for parameter in dscp_driver['supported_parameters']:
if parameter['parameter_name'] == qos_consts.DSCP_MARK:
valid_dscp_marks = parameter['parameter_values']
break
else:
self.fail('The DSCP marking rule does not have the %s parameter' %
qos_consts.DSCP_MARK)
# Setup network
self.network = self.create_network()
# Create QoS policy
dscp_policy_id = self.create_qos_policy(
name=self.policy_name,
description='test-qos-policy',
project_id=self.client.project_id,
shared=True)['id']
# Associate QoS to the network
self.admin_client.update_network(
self.network['id'], qos_policy_id=dscp_policy_id)
# Set a new DSCP rule with the first mark in range
rule_id = self.admin_client.create_dscp_marking_rule(
dscp_policy_id,
n_constants.VALID_DSCP_MARKS[0])[
'dscp_marking_rule']['id']
# Validate that the rule was set up properly
retrieved_rule = self.client.show_dscp_marking_rule(
dscp_policy_id, rule_id)['dscp_marking_rule']
self.assertEqual(n_constants.VALID_DSCP_MARKS[0],
retrieved_rule['dscp_mark'],
"""current DSCP mark is incorrect:
expected value {0} actual value {1}
""".format(n_constants.VALID_DSCP_MARKS[0],
retrieved_rule['dscp_mark']))
# Try to set marks in range 8:56 (invalid marks should raise an error)
_test_update_dscp_mark_values(self, dscp_policy_id, rule_id,
valid_dscp_marks)
class QosMinimumBandwidthRuleTestJSON(base.BaseAdminNetworkTest):
DIRECTION_EGRESS = "egress"
DIRECTION_INGRESS = "ingress"
RULE_NAME = qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH + "_rule"
RULES_NAME = RULE_NAME + "s"
required_extensions = [qos_apidef.ALIAS]
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH)
def resource_setup(cls):
super(QosMinimumBandwidthRuleTestJSON, cls).resource_setup()
@classmethod
def setup_clients(cls):
super(QosMinimumBandwidthRuleTestJSON, cls).setup_clients()
cls.qos_min_bw_rules_client = \
cls.os_admin.qos_minimum_bandwidth_rules_client
cls.qos_min_bw_rules_client_primary = \
cls.os_primary.qos_minimum_bandwidth_rules_client
def setUp(self):
super(QosMinimumBandwidthRuleTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
@decorators.idempotent_id('aa59b00b-3e9c-4787-92f8-93a5cdf5e378')
def test_rule_create(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 1138})[self.RULE_NAME]
# Test 'show rule'
retrieved_rule = \
self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
policy['id'], rule['id'])
retrieved_rule = retrieved_rule[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(1138, retrieved_rule['min_kbps'])
self.assertEqual(self.DIRECTION_EGRESS, retrieved_rule['direction'])
# Test 'list rules'
rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules(
policy['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
self.assertEqual(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH,
policy_rules[0]['type'])
@decorators.idempotent_id('266d9b87-e51c-48bd-9aa7-8269573621be')
def test_rule_create_fail_for_missing_min_kbps(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.assertRaises(
exceptions.BadRequest,
self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_EGRESS})
@decorators.idempotent_id('aa59b00b-ab01-4787-92f8-93a5cdf5e378')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 200})
self.assertRaises(
exceptions.Conflict,
self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 201})
@decorators.idempotent_id('35baf998-ae65-495c-9902-35a0d11e8936')
@utils.requires_ext(extension="qos-bw-minimum-ingress",
service="network")
def test_rule_create_pass_for_direction_ingress(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_INGRESS,
'min_kbps': 201})
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH,
policy_rules[0]['type'])
self.assertEqual(self.DIRECTION_INGRESS, policy_rules[0]['direction'])
@decorators.idempotent_id('a49a6988-2568-47d2-931e-2dbc858943b3')
def test_rule_update(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 300})[self.RULE_NAME]
self.qos_min_bw_rules_client.update_minimum_bandwidth_rule(
policy['id'], rule['id'],
**{'min_kbps': 350, 'direction': self.DIRECTION_EGRESS})
retrieved_policy = \
self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy[self.RULE_NAME]
self.assertEqual(350, retrieved_policy['min_kbps'])
self.assertEqual(self.DIRECTION_EGRESS, retrieved_policy['direction'])
@decorators.idempotent_id('a7ee6efd-7b33-4a68-927d-275b4f8ba958')
def test_rule_delete(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
policy['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 200})[self.RULE_NAME]
retrieved_policy = \
self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
policy['id'], rule['id'])
retrieved_policy = retrieved_policy[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_policy['id'])
self.qos_min_bw_rules_client.delete_minimum_bandwidth_rule(
policy['id'], rule['id'])
self.assertRaises(
exceptions.NotFound,
self.qos_min_bw_rules_client.show_minimum_bandwidth_rule,
policy['id'], rule['id'])
@decorators.idempotent_id('a211222c-5808-46cb-a961-983bbab6b852')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
'policy',
**{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200})
@decorators.idempotent_id('b4a2e7ad-786f-4927-a85a-e545a93bd274')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
self.qos_min_bw_rules_client_primary.create_minimum_bandwidth_rule,
'policy', **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 300})
@decorators.idempotent_id('de0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(name='test-policy1',
description='test policy1',
shared=False)
rule1 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy1['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 200})[self.RULE_NAME]
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
rule2 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
qos_policy_id=policy2['id'],
**{'direction': self.DIRECTION_EGRESS,
'min_kbps': 5000})[self.RULE_NAME]
# Test 'list rules'
rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules(
policy1['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertNotIn(rule2['id'], rules_ids)
class QosMinimumPpsRuleTestJSON(base.BaseAdminNetworkTest):
required_extensions = [qos_apidef.ALIAS]
@classmethod
@utils.requires_ext(service='network',
extension='port-resource-request-groups')
def resource_setup(cls):
super(QosMinimumPpsRuleTestJSON, cls).resource_setup()
@classmethod
def setup_clients(cls):
super(QosMinimumPpsRuleTestJSON, cls).setup_clients()
cls.min_pps_client = cls.os_admin.qos_minimum_packet_rate_rules_client
cls.min_pps_client_primary = \
cls.os_primary.qos_minimum_packet_rate_rules_client
def setUp(self):
super(QosMinimumPpsRuleTestJSON, self).setUp()
self.policy_name = data_utils.rand_name(name='test', prefix='policy')
self.RULE_NAME = qos_consts.RULE_TYPE_MINIMUM_PACKET_RATE + "_rule"
self.RULES_NAME = self.RULE_NAME + "s"
def _create_qos_min_pps_rule(self, policy_id, rule_data):
rule = self.min_pps_client.create_minimum_packet_rate_rule(
policy_id, **rule_data)['minimum_packet_rate_rule']
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.min_pps_client.delete_minimum_packet_rate_rule,
policy_id, rule['id'])
return rule
@decorators.idempotent_id('66a5b9b4-d4f9-4af8-b238-9e1881b78487')
def test_rule_create(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_min_pps_rule(
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 1138})
# Test 'show rule'
retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule(
policy['id'], rule['id'])[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(1138, retrieved_rule[qos_consts.MIN_KPPS])
self.assertEqual(n_constants.EGRESS_DIRECTION,
retrieved_rule[qos_consts.DIRECTION])
# Test 'list rules'
rules = self.min_pps_client.list_minimum_packet_rate_rules(
policy['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
self.assertEqual('minimum_packet_rate',
policy_rules[0]['type'])
@decorators.idempotent_id('6b656b57-d2bf-47f9-89a9-1baad1bd5418')
def test_rule_create_fail_for_missing_min_kpps(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self.assertRaises(exceptions.BadRequest,
self._create_qos_min_pps_rule,
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION})
@decorators.idempotent_id('f41213e5-2ab8-4916-b106-38d2cac5e18c')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self._create_qos_min_pps_rule(policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 200})
self.assertRaises(exceptions.Conflict,
self._create_qos_min_pps_rule,
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 201})
@decorators.idempotent_id('ceb8e41e-3d72-11ec-a446-d7faae6daec2')
def test_rule_create_any_direction_when_egress_direction_exists(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self._create_qos_min_pps_rule(policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 200})
self.assertRaises(exceptions.Conflict,
self._create_qos_min_pps_rule,
policy['id'],
{qos_consts.DIRECTION: n_constants.ANY_DIRECTION,
qos_consts.MIN_KPPS: 201})
@decorators.idempotent_id('a147a71e-3d7b-11ec-8097-278b1afd5fa2')
def test_rule_create_egress_direction_when_any_direction_exists(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
self._create_qos_min_pps_rule(policy['id'],
{qos_consts.DIRECTION: n_constants.ANY_DIRECTION,
qos_consts.MIN_KPPS: 200})
self.assertRaises(exceptions.Conflict,
self._create_qos_min_pps_rule,
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 201})
@decorators.idempotent_id('522ed09a-1d7f-4c1b-9195-61f19caf916f')
def test_rule_update(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_min_pps_rule(
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 300})
self.min_pps_client.update_minimum_packet_rate_rule(
policy['id'], rule['id'],
**{qos_consts.MIN_KPPS: 350,
qos_consts.DIRECTION: n_constants.ANY_DIRECTION})
retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule(
policy['id'], rule['id'])[self.RULE_NAME]
self.assertEqual(350, retrieved_rule[qos_consts.MIN_KPPS])
self.assertEqual(n_constants.ANY_DIRECTION,
retrieved_rule[qos_consts.DIRECTION])
@decorators.idempotent_id('a020e186-3d60-11ec-88ca-d7f5eec22764')
def test_rule_update_direction_conflict(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule1 = self._create_qos_min_pps_rule(
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 300})
rule2 = self._create_qos_min_pps_rule(
policy['id'],
{qos_consts.DIRECTION: n_constants.INGRESS_DIRECTION,
qos_consts.MIN_KPPS: 300})
retrieved_rule1 = self.min_pps_client.show_minimum_packet_rate_rule(
policy['id'], rule1['id'])[self.RULE_NAME]
self.assertEqual(n_constants.EGRESS_DIRECTION,
retrieved_rule1[qos_consts.DIRECTION])
retrieved_rule2 = self.min_pps_client.show_minimum_packet_rate_rule(
policy['id'], rule2['id'])[self.RULE_NAME]
self.assertEqual(n_constants.INGRESS_DIRECTION,
retrieved_rule2[qos_consts.DIRECTION])
self.assertRaises(exceptions.Conflict,
self.min_pps_client.update_minimum_packet_rate_rule,
policy['id'], rule2['id'],
**{qos_consts.DIRECTION: n_constants.ANY_DIRECTION})
@decorators.idempotent_id('c49018b6-d358-49a1-a94b-d53224165045')
def test_rule_delete(self):
policy = self.create_qos_policy(name=self.policy_name,
description='test policy',
shared=False)
rule = self._create_qos_min_pps_rule(
policy['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 200})
retrieved_rule = self.min_pps_client.show_minimum_packet_rate_rule(
policy['id'], rule['id'])[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_rule['id'])
self.min_pps_client.delete_minimum_packet_rate_rule(policy['id'],
rule['id'])
self.assertRaises(exceptions.NotFound,
self.min_pps_client.show_minimum_packet_rate_rule,
policy['id'], rule['id'])
@decorators.idempotent_id('1a6b6128-3d3e-11ec-bf49-57b326d417c0')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
self.min_pps_client_primary.create_minimum_packet_rate_rule,
'policy', **{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 300})
@decorators.idempotent_id('1b94f4e2-3d3e-11ec-bb21-6f98e4044b8b')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(name='test-policy1',
description='test policy1',
shared=False)
rule1 = self._create_qos_min_pps_rule(
policy1['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 200})
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
rule2 = self._create_qos_min_pps_rule(
policy2['id'],
{qos_consts.DIRECTION: n_constants.EGRESS_DIRECTION,
qos_consts.MIN_KPPS: 5000})
# Test 'list rules'
rules = self.min_pps_client.list_minimum_packet_rate_rules(
policy1['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertNotIn(rule2['id'], rules_ids)
class QosSearchCriteriaTest(base.BaseSearchCriteriaTest,
base.BaseAdminNetworkTest):
resource = 'policy'
plural_name = 'policies'
# Use unique description to isolate the tests from other QoS tests
list_kwargs = {'description': 'search-criteria-test'}
list_as_admin = True
required_extensions = [qos_apidef.ALIAS]
@classmethod
def resource_setup(cls):
super(QosSearchCriteriaTest, cls).resource_setup()
for name in cls.resource_names:
cls.create_qos_policy(
name=name, description='search-criteria-test')
@decorators.idempotent_id('55fc0103-fdc1-4d34-ab62-c579bb739a91')
def test_list_sorts_asc(self):
self._test_list_sorts_asc()
@decorators.idempotent_id('13e08ac3-bfed-426b-892a-b3b158560c23')
def test_list_sorts_desc(self):
self._test_list_sorts_desc()
@decorators.idempotent_id('719e61cc-e33c-4918-aa4d-1a791e6e0e86')
def test_list_pagination(self):
self._test_list_pagination()
@decorators.idempotent_id('3bd8fb58-c0f8-4954-87fb-f286e1eb096a')
def test_list_pagination_with_marker(self):
self._test_list_pagination_with_marker()
@decorators.idempotent_id('3bad0747-8082-46e9-be4d-c428a842db41')
def test_list_pagination_with_href_links(self):
self._test_list_pagination_with_href_links()
@decorators.idempotent_id('d6a8bacd-d5e8-4ef3-bc55-23ca6998d208')
def test_list_pagination_page_reverse_asc(self):
self._test_list_pagination_page_reverse_asc()
@decorators.idempotent_id('0b9aecdc-2b27-421b-b104-53d24e905ae8')
def test_list_pagination_page_reverse_desc(self):
self._test_list_pagination_page_reverse_desc()
@decorators.idempotent_id('1a3dc257-dafd-4870-8c71-639ae7ddc6ea')
def test_list_pagination_page_reverse_with_href_links(self):
self._test_list_pagination_page_reverse_with_href_links()
@decorators.idempotent_id('40e09b53-4eb8-4526-9181-d438c8005a20')
def test_list_no_pagination_limit_0(self):
self._test_list_no_pagination_limit_0()