Clear old rules that have been applied before applying new rules.

To prevent data from being out of sync in the following situations:
1. Create a policy with two rules bound to the virtual machine
2. Stop l2-agent
3. Delete/change/clear policy rule
4. Start l2-agent (the rule is still there, out-of-sync)

Change-Id: I194c918d859172c31ae5ce1af925fdbb388f9cfb
Closes-Bug: #1812576
This commit is contained in:
liuchengqian90 2019-01-30 17:48:59 +08:00 committed by Chengqian Liu
parent 7b64623f96
commit 6f927cc119
3 changed files with 140 additions and 42 deletions

View File

@ -268,16 +268,11 @@ class QosAgentExtension(l2_extension.L2AgentExtension):
self._process_reset_port(port)
else:
old_qos_policy = self.policy_map.set_port_policy(port, qos_policy)
if old_qos_policy:
# Before applying the new rules, the old rules should be cleared,
# even if the old_qos_policy is None,
# to avoid the data being out of sync before the l2-agent starts.
self.qos_driver.delete(port, old_qos_policy)
self.qos_driver.update(port, qos_policy)
elif not qos_policy.rules:
# There are no rules in new qos_policy,
# so it should be deleted.
# But new policy has a relationship with the port,
# so reseting should not be called.
self.qos_driver.delete(port)
else:
if qos_policy.rules:
self.qos_driver.create(port, qos_policy)
def delete_port(self, context, port):

View File

@ -68,6 +68,9 @@ class BaseQoSRuleTestCase(object):
qos=True)
env = environment.Environment(env_desc, host_desc)
super(BaseQoSRuleTestCase, self).setUp(env)
self.l2_agent_process = self.environment.hosts[0].l2_agent
self.l2_agent = self.safe_client.client.list_agents(
agent_type=self.l2_agent_type)['agents'][0]
self.tenant_id = uuidutils.generate_uuid()
self.network = self.safe_client.create_network(self.tenant_id,
@ -130,6 +133,71 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase):
rule['qos_policy_id'] = qos_policy_id
qos_policy['rules'].append(rule)
def _create_vm_with_limit_rules(self):
# Create port with qos policy attached, with different direction
vm, qos_policy = self._prepare_vm_with_qos_policy(
[functools.partial(
self._add_bw_limit_rule,
BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction),
functools.partial(
self._add_bw_limit_rule,
BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.reverse_direction)])
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction)
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.reverse_direction)
return vm, qos_policy
def _restart_agent_and_check_rules_applied(self, policy_id, vm,
final_rules,
add_rules=None,
update_rules=None,
delete_rules=None):
# final_rules: the last valid rule after all operations
# (clear/update/reset rules during the l2-agent stop) are completed.
# add_rules: rules that need to be added during the l2-agent stop.
# update_rules: rules that need to be updated during the l2-agent stop.
# delete_rules:rules that need to be deleted during the l2-agent stop.
add_rules = list() if not add_rules else add_rules
update_rules = list() if not update_rules else update_rules
delete_rules = list() if not delete_rules else delete_rules
# Stop l2_agent and clear/update/reset the port qos rules
self.l2_agent_process.stop()
self._wait_until_agent_down(self.l2_agent['id'])
for rule in delete_rules:
self.client.delete_bandwidth_limit_rule(rule['id'],
policy_id)
for rule in add_rules:
self.safe_client.create_bandwidth_limit_rule(
self.tenant_id, policy_id,
rule.get('limit'), rule.get('burst'), rule['direction'])
for rule in update_rules:
self.client.update_bandwidth_limit_rule(
rule['id'], policy_id,
body={'bandwidth_limit_rule':
{'max_kbps': rule.get('limit'),
'max_burst_kbps': rule.get('burst'),
'direction': rule.get('direction')}})
# Start l2_agent to check if these rules is cleared
self.l2_agent_process.start()
self._wait_until_agent_up(self.l2_agent['id'])
all_directions = set([self.direction, self.reverse_direction])
for final_rule in final_rules:
all_directions -= set([final_rule['direction']])
self._wait_for_bw_rule_applied(
vm, final_rule.get('limit'),
final_rule.get('burst'), final_rule['direction'])
# Make sure there are no other rules.
for direction in list(all_directions):
self._wait_for_bw_rule_applied(vm, None, None, direction)
def test_bw_limit_qos_policy_rule_lifecycle(self):
new_limit = BANDWIDTH_LIMIT + 100
@ -191,46 +259,80 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase):
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.reverse_direction)
def test_bw_limit_qos_l2_agent_restart(self):
self.l2_agent_process = self.environment.hosts[0].l2_agent
self.l2_agent = self.safe_client.client.list_agents(
agent_type=self.l2_agent_type)['agents'][0]
# Create port with qos policy attached, with different direction
vm, qos_policy = self._prepare_vm_with_qos_policy(
[functools.partial(
self._add_bw_limit_rule,
BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction),
functools.partial(
self._add_bw_limit_rule,
BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.reverse_direction)])
def test_bw_limit_qos_no_rules_l2_agent_restart(self):
vm, qos_policy = self._create_vm_with_limit_rules()
bw_rule_1 = qos_policy['rules'][0]
bw_rule_2 = qos_policy['rules'][1]
qos_policy_id = qos_policy['id']
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.direction)
self._wait_for_bw_rule_applied(
vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST, self.reverse_direction)
# final_rules indicates the last valid rule after all operations
# (clear/update/reset rules during the l2-agent stop) are completed
final_rules = [{'direction': self.direction,
'limit': None},
{'direction': self.reverse_direction,
'limit': None}]
# Stop l2_agent and clear these rules
self.l2_agent_process.stop()
self._wait_until_agent_down(self.l2_agent['id'])
self._restart_agent_and_check_rules_applied(
qos_policy_id, vm, final_rules=final_rules,
delete_rules=[bw_rule_1, bw_rule_2])
self.client.delete_bandwidth_limit_rule(bw_rule_1['id'],
qos_policy_id)
self.client.delete_bandwidth_limit_rule(bw_rule_2['id'],
qos_policy_id)
def test_bw_limit_qos_rules_deleted_l2_agent_restart(self):
vm, qos_policy = self._create_vm_with_limit_rules()
# Start l2_agent to check if these rules is cleared
self.l2_agent_process.start()
self._wait_until_agent_up(self.l2_agent['id'])
bw_rule_1 = qos_policy['rules'][0]
qos_policy_id = qos_policy['id']
self._wait_for_bw_rule_applied(
vm, None, None, self.direction)
self._wait_for_bw_rule_applied(
vm, None, None, self.reverse_direction)
# final_rules indicates the last valid rule after all operations
# (clear/update/reset rules during the l2-agent stop) are completed
final_rules = [{'direction': self.direction,
'limit': None},
{'direction': self.reverse_direction,
'limit': BANDWIDTH_LIMIT,
'burst': BANDWIDTH_BURST}]
self._restart_agent_and_check_rules_applied(
qos_policy_id, vm, final_rules=final_rules,
delete_rules=[bw_rule_1])
def test_bw_limit_qos_rules_changed_l2_agent_restart(self):
vm, qos_policy = self._create_vm_with_limit_rules()
bw_rule_1 = qos_policy['rules'][0]
bw_rule_2 = qos_policy['rules'][1]
qos_policy_id = qos_policy['id']
add_rules = [{'direction': self.direction,
'limit': BANDWIDTH_LIMIT * 2,
'burst': BANDWIDTH_BURST * 2},
{'direction': self.reverse_direction,
'limit': BANDWIDTH_LIMIT * 2,
'burst': BANDWIDTH_BURST * 2}]
self._restart_agent_and_check_rules_applied(
qos_policy_id, vm, final_rules=add_rules,
add_rules=add_rules,
delete_rules=[bw_rule_1, bw_rule_2])
def test_bw_limit_qos_rules_updated_l2_agent_restart(self):
vm, qos_policy = self._create_vm_with_limit_rules()
bw_rule_1 = qos_policy['rules'][0]
bw_rule_2 = qos_policy['rules'][1]
qos_policy_id = qos_policy['id']
update_rules = [{'id': bw_rule_1['id'],
'direction': bw_rule_1['direction'],
'limit': BANDWIDTH_LIMIT * 2,
'burst': BANDWIDTH_BURST * 2},
{'id': bw_rule_2['id'],
'direction': bw_rule_2['direction'],
'limit': BANDWIDTH_LIMIT * 2,
'burst': BANDWIDTH_BURST * 2}]
self._restart_agent_and_check_rules_applied(
qos_policy_id, vm, final_rules=update_rules,
update_rules=update_rules)
class TestBwLimitQoSOvs(_TestBwLimitQoS, base.BaseFullStackTestCase):

View File

@ -296,7 +296,7 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
self.qos_ext.handle_port(self.context, port)
# we make sure the underlying qos driver is called with the
# right parameters
self.qos_ext.qos_driver.delete.assert_called_once_with(port)
self.qos_ext.qos_driver.delete.assert_called_once_with(port, None)
self.assertEqual(port,
self.qos_ext.policy_map.qos_policy_ports[qos_policy_id][port_id])
self.assertIn(port_id, self.qos_ext.policy_map.port_policies)
@ -320,6 +320,7 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
'id': uuidutils.generate_uuid()}
test_policy = policy.QosPolicy(**test_policy_with_rules)
setattr(test_policy, 'rules', [])
self.pull_mock.return_value = test_policy
port['qos_policy_id'] = test_policy.id
self.qos_ext.handle_port(self.context, port)