Update port functional tests for qos agent

Change-Id: I4a1f4ec1ed9a9104fe7e5bbce66147d8ea6c0f27
Partially-Implements: quantum-qos-api
This commit is contained in:
Jakub Libosvar 2015-08-10 16:55:11 +00:00
parent af2e56d86c
commit 2aac5991ac
2 changed files with 38 additions and 4 deletions

View File

@ -18,9 +18,10 @@ from neutron.agent.linux import utils as agent_utils
def wait_until_bandwidth_limit_rule_applied(bridge, port_vif, rule):
def _bandwidth_limit_rule_applied():
max_rate, burst = (
bridge.get_qos_bw_limit_for_port(port_vif))
return (max_rate == rule.max_kbps and
burst == rule.max_burst_kbps)
bw_rule = bridge.get_qos_bw_limit_for_port(port_vif)
expected = None, None
if rule:
expected = rule.max_kbps, rule.max_burst_kbps
return bw_rule == expected
agent_utils.wait_until_true(_bandwidth_limit_rule_applied)

View File

@ -151,3 +151,36 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
policy_copy.rules[0])
self._assert_bandwidth_limit_rule_is_set(self.ports[0],
policy_copy.rules[0])
def test_port_qos_disassociation(self):
"""Test that qos_policy_id set to None will remove all qos rules from
given port.
"""
port_dict = self._create_test_port_dict()
port_dict['qos_policy_id'] = TEST_POLICY_ID1
self.setup_agent_and_ports([port_dict])
self.wait_until_ports_state(self.ports, up=True)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_1)
port_dict['qos_policy_id'] = None
self.agent.port_update(None, port=port_dict)
self.wait_until_bandwidth_limit_rule_applied(port_dict, None)
def test_port_qos_update_policy_id(self):
"""Test that change of qos policy id on given port refreshes all its
rules.
"""
port_dict = self._create_test_port_dict()
port_dict['qos_policy_id'] = TEST_POLICY_ID1
self.setup_agent_and_ports([port_dict])
self.wait_until_ports_state(self.ports, up=True)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_1)
port_dict['qos_policy_id'] = TEST_POLICY_ID2
self.agent.port_update(None, port=port_dict)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_2)