Merge "Fix NVP FWaaS occurs error when deleting a shared rule"

This commit is contained in:
Jenkins 2014-06-20 05:19:57 +00:00 committed by Gerrit Code Review
commit ad67acfa79
3 changed files with 52 additions and 3 deletions

View File

@ -70,10 +70,10 @@ def add_vcns_edge_firewallrule_binding(session, map_info):
return binding
def delete_vcns_edge_firewallrule_binding(session, id):
def delete_vcns_edge_firewallrule_binding(session, id, edge_id):
with session.begin(subtransactions=True):
if not (session.query(vcns_models.VcnsEdgeFirewallRuleBinding).
filter_by(rule_id=id).delete()):
filter_by(rule_id=id, edge_id=edge_id).delete()):
msg = _("Rule Resource binding with id:%s not found!") % id
raise nsx_exc.NsxPluginException(err_msg=msg)

View File

@ -277,7 +277,7 @@ class EdgeFirewallDriver(db_base_plugin_v2.NeutronDbPluginV2):
{'rule_id': id,
'edge_id': edge_id})
vcns_db.delete_vcns_edge_firewallrule_binding(
context.session, id)
context.session, id, edge_id)
def _add_rule_above(self, context, ref_rule_id, edge_id, firewall_rule):
rule_map = vcns_db.get_vcns_edge_firewallrule_binding(

View File

@ -646,3 +646,52 @@ class FirewallPluginTestCase(test_db_firewall.FirewallPluginDbTestCase,
'remove', fwp_id, fw_rule_ids[2],
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)
def test_remove_rule_with_firewalls(self):
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['id'] = fwp_id
with contextlib.nested(
self.firewall(router_id=self._create_and_get_router(),
firewall_policy_id=fwp_id),
self.firewall(router_id=self._create_and_get_router(),
firewall_policy_id=fwp_id)) as (fw1, fw2):
attrs['firewall_list'].insert(0, fw1['firewall']['id'])
attrs['firewall_list'].insert(1, fw2['firewall']['id'])
with contextlib.nested(self.firewall_rule(name='fwr1'),
self.firewall_rule(name='fwr2'),
self.firewall_rule(name='fwr3')) as fr1:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request(
'firewall_policies', data, fwp_id)
req.get_response(self.ext_api)
# test removing a rule from a policy that does not exist
self._rule_action(
'remove', '123',
fw_rule_ids[1],
expected_code=webob.exc.HTTPNotFound.code,
expected_body=None)
# test removing a rule in the middle of the list
attrs['firewall_rules'].remove(fw_rule_ids[1])
self._rule_action('remove', fwp_id, fw_rule_ids[1],
expected_body=attrs)
# test removing a rule at the top of the list
attrs['firewall_rules'].remove(fw_rule_ids[0])
self._rule_action('remove', fwp_id, fw_rule_ids[0],
expected_body=attrs)
# test removing remaining rule in the list
attrs['firewall_rules'].remove(fw_rule_ids[2])
self._rule_action('remove', fwp_id, fw_rule_ids[2],
expected_body=attrs)
# test removing rule that is not
#associated with the policy
self._rule_action(
'remove', fwp_id, fw_rule_ids[2],
expected_code=webob.exc.HTTPBadRequest.code,
expected_body=None)