diff --git a/neutron/plugins/vmware/dbexts/vcns_db.py b/neutron/plugins/vmware/dbexts/vcns_db.py index bfb14aa29..24b3e5b8a 100644 --- a/neutron/plugins/vmware/dbexts/vcns_db.py +++ b/neutron/plugins/vmware/dbexts/vcns_db.py @@ -70,10 +70,10 @@ def add_vcns_edge_firewallrule_binding(session, map_info): return binding -def delete_vcns_edge_firewallrule_binding(session, id): +def delete_vcns_edge_firewallrule_binding(session, id, edge_id): with session.begin(subtransactions=True): if not (session.query(vcns_models.VcnsEdgeFirewallRuleBinding). - filter_by(rule_id=id).delete()): + filter_by(rule_id=id, edge_id=edge_id).delete()): msg = _("Rule Resource binding with id:%s not found!") % id raise nsx_exc.NsxPluginException(err_msg=msg) diff --git a/neutron/plugins/vmware/vshield/edge_firewall_driver.py b/neutron/plugins/vmware/vshield/edge_firewall_driver.py index d07c0456a..f2e899645 100644 --- a/neutron/plugins/vmware/vshield/edge_firewall_driver.py +++ b/neutron/plugins/vmware/vshield/edge_firewall_driver.py @@ -277,7 +277,7 @@ class EdgeFirewallDriver(db_base_plugin_v2.NeutronDbPluginV2): {'rule_id': id, 'edge_id': edge_id}) vcns_db.delete_vcns_edge_firewallrule_binding( - context.session, id) + context.session, id, edge_id) def _add_rule_above(self, context, ref_rule_id, edge_id, firewall_rule): rule_map = vcns_db.get_vcns_edge_firewallrule_binding( diff --git a/neutron/tests/unit/vmware/vshield/test_fwaas_plugin.py b/neutron/tests/unit/vmware/vshield/test_fwaas_plugin.py index acd8e7da7..ff54be72a 100644 --- a/neutron/tests/unit/vmware/vshield/test_fwaas_plugin.py +++ b/neutron/tests/unit/vmware/vshield/test_fwaas_plugin.py @@ -626,3 +626,52 @@ class FirewallPluginTestCase(test_db_firewall.FirewallPluginDbTestCase, 'remove', fwp_id, fw_rule_ids[2], expected_code=webob.exc.HTTPBadRequest.code, expected_body=None) + + def test_remove_rule_with_firewalls(self): + attrs = self._get_test_firewall_policy_attrs() + attrs['audited'] = False + attrs['firewall_list'] = [] + with self.firewall_policy() as fwp: + fwp_id = fwp['firewall_policy']['id'] + attrs['id'] = fwp_id + with contextlib.nested( + self.firewall(router_id=self._create_and_get_router(), + firewall_policy_id=fwp_id), + self.firewall(router_id=self._create_and_get_router(), + firewall_policy_id=fwp_id)) as (fw1, fw2): + attrs['firewall_list'].insert(0, fw1['firewall']['id']) + attrs['firewall_list'].insert(1, fw2['firewall']['id']) + with contextlib.nested(self.firewall_rule(name='fwr1'), + self.firewall_rule(name='fwr2'), + self.firewall_rule(name='fwr3')) as fr1: + fw_rule_ids = [r['firewall_rule']['id'] for r in fr1] + attrs['firewall_rules'] = fw_rule_ids[:] + data = {'firewall_policy': + {'firewall_rules': fw_rule_ids}} + req = self.new_update_request( + 'firewall_policies', data, fwp_id) + req.get_response(self.ext_api) + # test removing a rule from a policy that does not exist + self._rule_action( + 'remove', '123', + fw_rule_ids[1], + expected_code=webob.exc.HTTPNotFound.code, + expected_body=None) + # test removing a rule in the middle of the list + attrs['firewall_rules'].remove(fw_rule_ids[1]) + self._rule_action('remove', fwp_id, fw_rule_ids[1], + expected_body=attrs) + # test removing a rule at the top of the list + attrs['firewall_rules'].remove(fw_rule_ids[0]) + self._rule_action('remove', fwp_id, fw_rule_ids[0], + expected_body=attrs) + # test removing remaining rule in the list + attrs['firewall_rules'].remove(fw_rule_ids[2]) + self._rule_action('remove', fwp_id, fw_rule_ids[2], + expected_body=attrs) + # test removing rule that is not + #associated with the policy + self._rule_action( + 'remove', fwp_id, fw_rule_ids[2], + expected_code=webob.exc.HTTPBadRequest.code, + expected_body=None)