Add Neutron QoS bandwidth limit rule commands
Create/Update/List/Get/Delete QoS bandwidth limit rules is now possible to do with shade. Change-Id: Ife015dd3f9584df901462b3998a4d775374073cf
This commit is contained in:
parent
10e6fbe44f
commit
806378f4f3
@ -136,6 +136,7 @@ class ShadeAdapter(adapter.Adapter):
|
||||
'router', 'routers', 'floatingip', 'floatingips',
|
||||
'floating_ip', 'floating_ips', 'port', 'ports',
|
||||
'rule_types', 'policy', 'policies',
|
||||
'bandwidth_limit_rule', 'bandwidth_limit_rules',
|
||||
'stack', 'stacks', 'zones', 'events',
|
||||
'security_group', 'security_groups',
|
||||
'security_group_rule', 'security_group_rules',
|
||||
|
@ -3269,6 +3269,220 @@ class OpenStackCloud(
|
||||
|
||||
return True
|
||||
|
||||
def search_qos_bandwidth_limit_rules(self, policy_name_or_id, rule_id=None,
|
||||
filters=None):
|
||||
"""Search QoS bandwidth limit rules
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy to which
|
||||
rules should be associated.
|
||||
:param string rule_id: ID of searched rule.
|
||||
:param filters: a dict containing additional filters to use. e.g.
|
||||
{'max_kbps': 1000}
|
||||
|
||||
:returns: a list of ``munch.Munch`` containing the bandwidth limit
|
||||
rule descriptions.
|
||||
|
||||
:raises: ``OpenStackCloudException`` if something goes wrong during the
|
||||
OpenStack API call.
|
||||
"""
|
||||
rules = self.list_qos_bandwidth_limit_rules(policy_name_or_id, filters)
|
||||
return _utils._filter_list(rules, rule_id, filters)
|
||||
|
||||
def list_qos_bandwidth_limit_rules(self, policy_name_or_id, filters=None):
|
||||
"""List all available QoS bandwith limit rules.
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy from
|
||||
from rules should be listed.
|
||||
:param filters: (optional) dict of filter conditions to push down
|
||||
:returns: A list of ``munch.Munch`` containing rule info.
|
||||
|
||||
:raises: ``OpenStackCloudResourceNotFound`` if QoS policy will not be
|
||||
found.
|
||||
"""
|
||||
if not self._has_neutron_extension('qos'):
|
||||
raise OpenStackCloudUnavailableExtension(
|
||||
'QoS extension is not available on target cloud')
|
||||
|
||||
policy = self.get_qos_policy(policy_name_or_id)
|
||||
if not policy:
|
||||
raise OpenStackCloudResourceNotFound(
|
||||
"QoS policy {name_or_id} not Found.".format(
|
||||
name_or_id=policy_name_or_id))
|
||||
|
||||
# Translate None from search interface to empty {} for kwargs below
|
||||
if not filters:
|
||||
filters = {}
|
||||
|
||||
data = self._network_client.get(
|
||||
"/qos/policies/{policy_id}/bandwidth_limit_rules.json".format(
|
||||
policy_id=policy['id']),
|
||||
params=filters,
|
||||
error_message="Error fetching QoS bandwith limit rules from "
|
||||
"{policy}".format(policy=policy['id']))
|
||||
return meta.get_and_munchify('bandwidth_limit_rules', data)
|
||||
|
||||
def get_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id):
|
||||
"""Get a QoS bandwidth limit rule by name or ID.
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy to which
|
||||
rule should be associated.
|
||||
:param rule_id: ID of the rule.
|
||||
|
||||
:returns: A bandwidth limit rule ``munch.Munch`` or None if
|
||||
no matching rule is found.
|
||||
|
||||
"""
|
||||
if not self._has_neutron_extension('qos'):
|
||||
raise OpenStackCloudUnavailableExtension(
|
||||
'QoS extension is not available on target cloud')
|
||||
|
||||
policy = self.get_qos_policy(policy_name_or_id)
|
||||
if not policy:
|
||||
raise OpenStackCloudResourceNotFound(
|
||||
"QoS policy {name_or_id} not Found.".format(
|
||||
name_or_id=policy_name_or_id))
|
||||
|
||||
data = self._network_client.get(
|
||||
"/qos/policies/{policy_id}/bandwidth_limit_rules/{rule_id}.json".
|
||||
format(policy_id=policy['id'], rule_id=rule_id),
|
||||
error_message="Error fetching QoS bandwith limit rule {rule_id} "
|
||||
"from {policy}".format(rule_id=rule_id,
|
||||
policy=policy['id']))
|
||||
return meta.get_and_munchify('bandwidth_limit_rule', data)
|
||||
|
||||
def create_qos_bandwidth_limit_rule(self, policy_name_or_id, max_kbps=None,
|
||||
max_burst_kbps=None, direction=None):
|
||||
"""Create a QoS bandwidth limit rule.
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy to which
|
||||
rule should be associated.
|
||||
:param int max_kbps: Maximum bandwidth limit value
|
||||
(in kilobits per second).
|
||||
:param int max_burst_kbps: Maximum burst value (in kilobits).
|
||||
:param string direction: Ingress or egress.
|
||||
The direction in which the traffic will be limited.
|
||||
|
||||
:returns: The QoS bandwidth limit rule.
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
if not self._has_neutron_extension('qos'):
|
||||
raise OpenStackCloudUnavailableExtension(
|
||||
'QoS extension is not available on target cloud')
|
||||
|
||||
policy = self.get_qos_policy(policy_name_or_id)
|
||||
if not policy:
|
||||
raise OpenStackCloudResourceNotFound(
|
||||
"QoS policy {name_or_id} not Found.".format(
|
||||
name_or_id=policy_name_or_id))
|
||||
|
||||
rule = {}
|
||||
if max_kbps:
|
||||
rule['max_kbps'] = max_kbps
|
||||
if max_burst_kbps:
|
||||
rule['max_burst_kbps'] = max_burst_kbps
|
||||
if direction is not None:
|
||||
if self._has_neutron_extension('qos-bw-limit-direction'):
|
||||
rule['direction'] = direction
|
||||
else:
|
||||
self.log.debug(
|
||||
"'qos-bw-limit-direction' extension is not available on "
|
||||
"target cloud")
|
||||
|
||||
data = self._network_client.post(
|
||||
"/qos/policies/{policy_id}/bandwidth_limit_rules".format(
|
||||
policy_id=policy['id']),
|
||||
json={'bandwidth_limit_rule': rule})
|
||||
return meta.get_and_munchify('bandwidth_limit_rule', data)
|
||||
|
||||
def update_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id,
|
||||
max_kbps=None, max_burst_kbps=None,
|
||||
direction=None):
|
||||
"""Update a QoS bandwidth limit rule.
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy to which
|
||||
rule is associated.
|
||||
:param string rule_id: ID of rule to update.
|
||||
:param int max_kbps: Maximum bandwidth limit value
|
||||
(in kilobits per second).
|
||||
:param int max_burst_kbps: Maximum burst value (in kilobits).
|
||||
:param string direction: Ingress or egress.
|
||||
The direction in which the traffic will be limited.
|
||||
|
||||
:returns: The updated QoS bandwidth limit rule.
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
if not self._has_neutron_extension('qos'):
|
||||
raise OpenStackCloudUnavailableExtension(
|
||||
'QoS extension is not available on target cloud')
|
||||
|
||||
policy = self.get_qos_policy(policy_name_or_id)
|
||||
if not policy:
|
||||
raise OpenStackCloudResourceNotFound(
|
||||
"QoS policy {name_or_id} not Found.".format(
|
||||
name_or_id=policy_name_or_id))
|
||||
|
||||
rule = {}
|
||||
if max_kbps:
|
||||
rule['max_kbps'] = max_kbps
|
||||
if max_burst_kbps:
|
||||
rule['max_burst_kbps'] = max_burst_kbps
|
||||
if direction is not None:
|
||||
if self._has_neutron_extension('qos-bw-limit-direction'):
|
||||
rule['direction'] = direction
|
||||
else:
|
||||
self.log.debug(
|
||||
"'qos-bw-limit-direction' extension is not available on "
|
||||
"target cloud")
|
||||
if not rule:
|
||||
self.log.debug("No QoS bandwidth limit rule data to update")
|
||||
return
|
||||
|
||||
curr_rule = self.get_qos_bandwidth_limit_rule(
|
||||
policy_name_or_id, rule_id)
|
||||
if not curr_rule:
|
||||
raise OpenStackCloudException(
|
||||
"QoS bandwidth_limit_rule {rule_id} not found in policy "
|
||||
"{policy_id}".format(rule_id=rule_id,
|
||||
policy_id=policy['id']))
|
||||
|
||||
data = self._network_client.put(
|
||||
"/qos/policies/{policy_id}/bandwidth_limit_rules/{rule_id}.json".
|
||||
format(policy_id=policy['id'], rule_id=rule_id),
|
||||
json={'bandwidth_limit_rule': rule})
|
||||
return meta.get_and_munchify('bandwidth_limit_rule', data)
|
||||
|
||||
def delete_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id):
|
||||
"""Delete a QoS bandwidth limit rule.
|
||||
|
||||
:param string policy_name_or_id: Name or ID of the QoS policy to which
|
||||
rule is associated.
|
||||
:param string rule_id: ID of rule to update.
|
||||
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
if not self._has_neutron_extension('qos'):
|
||||
raise OpenStackCloudUnavailableExtension(
|
||||
'QoS extension is not available on target cloud')
|
||||
|
||||
policy = self.get_qos_policy(policy_name_or_id)
|
||||
if not policy:
|
||||
raise OpenStackCloudResourceNotFound(
|
||||
"QoS policy {name_or_id} not Found.".format(
|
||||
name_or_id=policy_name_or_id))
|
||||
|
||||
try:
|
||||
self._network_client.delete(
|
||||
"/qos/policies/{policy}/bandwidth_limit_rules/{rule}.json".
|
||||
format(policy=policy['id'], rule=rule_id))
|
||||
except OpenStackCloudURINotFound:
|
||||
self.log.debug(
|
||||
"QoS bandwidth limit rule {rule_id} not found in policy "
|
||||
"{policy_id}. Ignoring.".format(rule_id=rule_id,
|
||||
policy_id=policy['id']))
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _build_external_gateway_info(self, ext_gateway_net_id, enable_snat,
|
||||
ext_fixed_ips):
|
||||
info = {}
|
||||
|
397
shade/tests/unit/test_qos_bandwidth_limit_rule.py
Normal file
397
shade/tests/unit/test_qos_bandwidth_limit_rule.py
Normal file
@ -0,0 +1,397 @@
|
||||
# Copyright 2017 OVH SAS
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from shade import exc
|
||||
from shade.tests.unit import base
|
||||
|
||||
|
||||
class TestQosBandwidthLimitRule(base.RequestsMockTestCase):
|
||||
|
||||
policy_name = 'qos test policy'
|
||||
policy_id = '881d1bb7-a663-44c0-8f9f-ee2765b74486'
|
||||
project_id = 'c88fc89f-5121-4a4c-87fd-496b5af864e9'
|
||||
|
||||
rule_id = 'ed1a2b05-0ad7-45d7-873f-008b575a02b3'
|
||||
rule_max_kbps = 1000
|
||||
rule_max_burst = 100
|
||||
|
||||
mock_policy = {
|
||||
'id': policy_id,
|
||||
'name': policy_name,
|
||||
'description': '',
|
||||
'rules': [],
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id,
|
||||
'shared': False,
|
||||
'is_default': False
|
||||
}
|
||||
|
||||
mock_rule = {
|
||||
'id': rule_id,
|
||||
'max_kbps': rule_max_kbps,
|
||||
'max_burst_kbps': rule_max_burst,
|
||||
'direction': 'egress'
|
||||
}
|
||||
|
||||
qos_extension = {
|
||||
"updated": "2015-06-08T10:00:00-00:00",
|
||||
"name": "Quality of Service",
|
||||
"links": [],
|
||||
"alias": "qos",
|
||||
"description": "The Quality of Service extension."
|
||||
}
|
||||
|
||||
qos_bw_limit_direction_extension = {
|
||||
"updated": "2017-04-10T10:00:00-00:00",
|
||||
"name": "Direction for QoS bandwidth limit rule",
|
||||
"links": [],
|
||||
"alias": "qos-bw-limit-direction",
|
||||
"description": ("Allow to configure QoS bandwidth limit rule with "
|
||||
"specific direction: ingress or egress")
|
||||
}
|
||||
|
||||
enabled_neutron_extensions = [qos_extension,
|
||||
qos_bw_limit_direction_extension]
|
||||
|
||||
def test_get_qos_bandwidth_limit_rule(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={'bandwidth_limit_rule': self.mock_rule})
|
||||
])
|
||||
r = self.cloud.get_qos_bandwidth_limit_rule(self.policy_name,
|
||||
self.rule_id)
|
||||
self.assertDictEqual(self.mock_rule, r)
|
||||
self.assert_calls()
|
||||
|
||||
def test_get_qos_bandwidth_limit_rule_no_qos_policy_found(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': []})
|
||||
])
|
||||
self.assertRaises(
|
||||
exc.OpenStackCloudResourceNotFound,
|
||||
self.cloud.get_qos_bandwidth_limit_rule,
|
||||
self.policy_name, self.rule_id)
|
||||
self.assert_calls()
|
||||
|
||||
def test_get_qos_bandwidth_limit_rule_no_qos_extension(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': []})
|
||||
])
|
||||
self.assertRaises(
|
||||
exc.OpenStackCloudException,
|
||||
self.cloud.get_qos_bandwidth_limit_rule,
|
||||
self.policy_name, self.rule_id)
|
||||
self.assert_calls()
|
||||
|
||||
def test_create_qos_bandwidth_limit_rule(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='POST',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules']),
|
||||
json={'bandwidth_limit_rule': self.mock_rule})
|
||||
])
|
||||
rule = self.cloud.create_qos_bandwidth_limit_rule(
|
||||
self.policy_name, max_kbps=self.rule_max_kbps)
|
||||
self.assertDictEqual(self.mock_rule, rule)
|
||||
self.assert_calls()
|
||||
|
||||
def test_create_qos_bandwidth_limit_rule_no_qos_extension(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': []})
|
||||
])
|
||||
self.assertRaises(
|
||||
exc.OpenStackCloudException,
|
||||
self.cloud.create_qos_bandwidth_limit_rule, self.policy_name,
|
||||
max_kbps=100)
|
||||
self.assert_calls()
|
||||
|
||||
def test_create_qos_bandwidth_limit_rule_no_qos_direction_extension(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='POST',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules']),
|
||||
json={'bandwidth_limit_rule': self.mock_rule})
|
||||
])
|
||||
rule = self.cloud.create_qos_bandwidth_limit_rule(
|
||||
self.policy_name, max_kbps=self.rule_max_kbps, direction="ingress")
|
||||
self.assertDictEqual(self.mock_rule, rule)
|
||||
self.assert_calls()
|
||||
|
||||
def test_update_qos_bandwidth_limit_rule(self):
|
||||
expected_rule = copy.copy(self.mock_rule)
|
||||
expected_rule['max_kbps'] = self.rule_max_kbps + 100
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={'bandwidth_limit_rule': self.mock_rule}),
|
||||
dict(method='PUT',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={'bandwidth_limit_rule': expected_rule},
|
||||
validate=dict(
|
||||
json={'bandwidth_limit_rule': {
|
||||
'max_kbps': self.rule_max_kbps + 100}}))
|
||||
])
|
||||
rule = self.cloud.update_qos_bandwidth_limit_rule(
|
||||
self.policy_id, self.rule_id, max_kbps=self.rule_max_kbps + 100)
|
||||
self.assertDictEqual(expected_rule, rule)
|
||||
self.assert_calls()
|
||||
|
||||
def test_update_qos_bandwidth_limit_rule_no_qos_extension(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': []})
|
||||
])
|
||||
self.assertRaises(
|
||||
exc.OpenStackCloudException,
|
||||
self.cloud.update_qos_bandwidth_limit_rule,
|
||||
self.policy_id, self.rule_id, max_kbps=2000)
|
||||
self.assert_calls()
|
||||
|
||||
def test_update_qos_bandwidth_limit_rule_no_qos_direction_extension(self):
|
||||
expected_rule = copy.copy(self.mock_rule)
|
||||
expected_rule['direction'] = self.rule_max_kbps + 100
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': [self.qos_extension]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={'bandwidth_limit_rule': self.mock_rule}),
|
||||
dict(method='PUT',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={'bandwidth_limit_rule': expected_rule},
|
||||
validate=dict(
|
||||
json={'bandwidth_limit_rule': {
|
||||
'max_kbps': self.rule_max_kbps + 100}}))
|
||||
])
|
||||
rule = self.cloud.update_qos_bandwidth_limit_rule(
|
||||
self.policy_id, self.rule_id, max_kbps=self.rule_max_kbps + 100,
|
||||
direction="ingress")
|
||||
# Even if there was attempt to change direction to 'ingress' it should
|
||||
# be not changed in returned rule
|
||||
self.assertDictEqual(expected_rule, rule)
|
||||
self.assert_calls()
|
||||
|
||||
def test_delete_qos_bandwidth_limit_rule(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='DELETE',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
json={})
|
||||
])
|
||||
self.assertTrue(
|
||||
self.cloud.delete_qos_bandwidth_limit_rule(
|
||||
self.policy_name, self.rule_id))
|
||||
self.assert_calls()
|
||||
|
||||
def test_delete_qos_bandwidth_limit_rule_no_qos_extension(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': []})
|
||||
])
|
||||
self.assertRaises(
|
||||
exc.OpenStackCloudException,
|
||||
self.cloud.delete_qos_bandwidth_limit_rule,
|
||||
self.policy_name, self.rule_id)
|
||||
self.assert_calls()
|
||||
|
||||
def test_delete_qos_bandwidth_limit_rule_not_found(self):
|
||||
self.register_uris([
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public', append=['v2.0', 'extensions.json']),
|
||||
json={'extensions': self.enabled_neutron_extensions}),
|
||||
dict(method='GET',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies.json']),
|
||||
json={'policies': [self.mock_policy]}),
|
||||
dict(method='DELETE',
|
||||
uri=self.get_mock_url(
|
||||
'network', 'public',
|
||||
append=['v2.0', 'qos', 'policies', self.policy_id,
|
||||
'bandwidth_limit_rules',
|
||||
'%s.json' % self.rule_id]),
|
||||
status_code=404)
|
||||
])
|
||||
self.assertFalse(
|
||||
self.cloud.delete_qos_bandwidth_limit_rule(
|
||||
self.policy_name, self.rule_id))
|
||||
self.assert_calls()
|
Loading…
Reference in New Issue
Block a user