OVS agent functional test for policy rule delete

Partially-Implements: ml2-qos
Change-Id: I57a006352d97363005f4f2a7d79ec8f1c91d1555
This commit is contained in:
Jakub Libosvar 2015-08-11 12:36:05 +00:00 committed by Miguel Angel Ajo
parent 2aac5991ac
commit 8aedbd7ef5
1 changed files with 20 additions and 12 deletions

View File

@ -104,6 +104,15 @@ class OVSAgentQoSExtensionTestFramework(base.OVSAgentTestFramework):
l2_extensions.wait_until_bandwidth_limit_rule_applied( l2_extensions.wait_until_bandwidth_limit_rule_applied(
self.agent.int_br, port['vif_name'], rule) self.agent.int_br, port['vif_name'], rule)
def _create_port_with_qos(self):
port_dict = self._create_test_port_dict()
port_dict['qos_policy_id'] = TEST_POLICY_ID1
self.setup_agent_and_ports([port_dict])
self.wait_until_ports_state(self.ports, up=True)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_1)
return port_dict
class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework): class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
@ -156,12 +165,7 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
"""Test that qos_policy_id set to None will remove all qos rules from """Test that qos_policy_id set to None will remove all qos rules from
given port. given port.
""" """
port_dict = self._create_test_port_dict() port_dict = self._create_port_with_qos()
port_dict['qos_policy_id'] = TEST_POLICY_ID1
self.setup_agent_and_ports([port_dict])
self.wait_until_ports_state(self.ports, up=True)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_1)
port_dict['qos_policy_id'] = None port_dict['qos_policy_id'] = None
self.agent.port_update(None, port=port_dict) self.agent.port_update(None, port=port_dict)
@ -172,15 +176,19 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
"""Test that change of qos policy id on given port refreshes all its """Test that change of qos policy id on given port refreshes all its
rules. rules.
""" """
port_dict = self._create_test_port_dict() port_dict = self._create_port_with_qos()
port_dict['qos_policy_id'] = TEST_POLICY_ID1
self.setup_agent_and_ports([port_dict])
self.wait_until_ports_state(self.ports, up=True)
self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_1)
port_dict['qos_policy_id'] = TEST_POLICY_ID2 port_dict['qos_policy_id'] = TEST_POLICY_ID2
self.agent.port_update(None, port=port_dict) self.agent.port_update(None, port=port_dict)
self.wait_until_bandwidth_limit_rule_applied(port_dict, self.wait_until_bandwidth_limit_rule_applied(port_dict,
TEST_BW_LIMIT_RULE_2) TEST_BW_LIMIT_RULE_2)
def test_policy_rule_delete(self):
port_dict = self._create_port_with_qos()
policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1])
policy_copy.rules = list()
consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED)
self.wait_until_bandwidth_limit_rule_applied(port_dict, None)