Merge "shared policy shouldn't have unshared rules"
This commit is contained in:
commit
7155de3940
@ -164,7 +164,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
'enabled': firewall_rule['enabled']}
|
||||
return self._fields(res, fields)
|
||||
|
||||
def _set_rules_for_policy(self, context, firewall_policy_db, rule_id_list):
|
||||
def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
|
||||
rule_id_list = fwp['firewall_rules']
|
||||
fwp_db = firewall_policy_db
|
||||
with context.session.begin(subtransactions=True):
|
||||
if not rule_id_list:
|
||||
@ -188,6 +189,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
fwp_db['id']):
|
||||
raise firewall.FirewallRuleInUse(
|
||||
firewall_rule_id=fwrule_id)
|
||||
if 'shared' in fwp:
|
||||
if fwp['shared'] and not rules_dict[fwrule_id]['shared']:
|
||||
raise firewall.FirewallRuleSharingConflict(
|
||||
firewall_rule_id=fwrule_id,
|
||||
firewall_policy_id=fwp_db['id'])
|
||||
elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']:
|
||||
raise firewall.FirewallRuleSharingConflict(
|
||||
firewall_rule_id=fwrule_id,
|
||||
firewall_policy_id=fwp_db['id'])
|
||||
# New list of rules is valid so we will first reset the existing
|
||||
# list and then add each rule in order.
|
||||
# Note that the list could be empty in which case we interpret
|
||||
@ -198,6 +208,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
fwp_db.firewall_rules.reorder()
|
||||
fwp_db.audited = False
|
||||
|
||||
def _check_unshared_rules_for_policy(self, fwp_db, fwp):
|
||||
if fwp['shared']:
|
||||
rules_in_db = fwp_db['firewall_rules']
|
||||
for fwr_db in rules_in_db:
|
||||
if not fwr_db['shared']:
|
||||
raise firewall.FirewallPolicySharingConflict(
|
||||
firewall_rule_id=fwr_db['id'],
|
||||
firewall_policy_id=fwp_db['id'])
|
||||
|
||||
def _process_rule_for_policy(self, context, firewall_policy_id,
|
||||
firewall_rule_db, position):
|
||||
with context.session.begin(subtransactions=True):
|
||||
@ -303,8 +322,7 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
description=fwp['description'],
|
||||
shared=fwp['shared'])
|
||||
context.session.add(fwp_db)
|
||||
self._set_rules_for_policy(context, fwp_db,
|
||||
fwp['firewall_rules'])
|
||||
self._set_rules_for_policy(context, fwp_db, fwp)
|
||||
fwp_db.audited = fwp['audited']
|
||||
return self._make_firewall_policy_dict(fwp_db)
|
||||
|
||||
@ -313,9 +331,11 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
fwp = firewall_policy['firewall_policy']
|
||||
with context.session.begin(subtransactions=True):
|
||||
fwp_db = self._get_firewall_policy(context, id)
|
||||
if 'firewall_rules' in fwp:
|
||||
self._set_rules_for_policy(context, fwp_db,
|
||||
fwp['firewall_rules'])
|
||||
# check any existing rules are not shared
|
||||
if 'shared' in fwp and 'firewall_rules' not in fwp:
|
||||
self._check_unshared_rules_for_policy(fwp_db, fwp)
|
||||
elif 'firewall_rules' in fwp:
|
||||
self._set_rules_for_policy(context, fwp_db, fwp)
|
||||
del fwp['firewall_rules']
|
||||
if 'audited' not in fwp or fwp['audited']:
|
||||
fwp['audited'] = False
|
||||
|
@ -54,6 +54,30 @@ class FirewallPolicyInUse(qexception.InUse):
|
||||
message = _("Firewall Policy %(firewall_policy_id)s is being used.")
|
||||
|
||||
|
||||
class FirewallRuleSharingConflict(qexception.Conflict):
|
||||
|
||||
"""FWaaS exception for firewall rules
|
||||
|
||||
When a shared policy is created or updated with unshared rules,
|
||||
this exception will be raised.
|
||||
"""
|
||||
message = _("Operation cannot be performed since Firewall Policy "
|
||||
"%(firewall_policy_id)s is shared but Firewall Rule "
|
||||
"%(firewall_rule_id)s is not shared")
|
||||
|
||||
|
||||
class FirewallPolicySharingConflict(qexception.Conflict):
|
||||
|
||||
"""FWaaS exception for firewall policy
|
||||
|
||||
When a policy is shared without sharing its associated rules,
|
||||
this exception will be raised.
|
||||
"""
|
||||
message = _("Operation cannot be performed. Before sharing Firewall "
|
||||
"Policy %(firewall_policy_id)s, share associated Firewall "
|
||||
"Rule %(firewall_rule_id)s")
|
||||
|
||||
|
||||
class FirewallRuleNotFound(qexception.NotFound):
|
||||
message = _("Firewall Rule %(firewall_rule_id)s could not be found.")
|
||||
|
||||
|
@ -347,6 +347,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
|
||||
audited=AUDITED)
|
||||
self.assertEqual(res.status_int, 409)
|
||||
|
||||
def test_create_shared_firewall_policy_with_unshared_rule(self):
|
||||
with self.firewall_rule(shared=False) as fwr:
|
||||
fw_rule_ids = [fwr['firewall_rule']['id']]
|
||||
res = self._create_firewall_policy(
|
||||
None, 'firewall_policy1', description=DESCRIPTION, shared=True,
|
||||
firewall_rules=fw_rule_ids, audited=AUDITED)
|
||||
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
|
||||
|
||||
def test_show_firewall_policy(self):
|
||||
name = "firewall_policy1"
|
||||
attrs = self._get_test_firewall_policy_attrs(name)
|
||||
@ -520,6 +528,42 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
|
||||
for k, v in attrs.iteritems():
|
||||
self.assertEqual(res['firewall_policy'][k], v)
|
||||
|
||||
def test_update_shared_firewall_policy_with_unshared_rule(self):
|
||||
with self.firewall_rule(name='fwr1', shared=False) as fr:
|
||||
with self.firewall_policy() as fwp:
|
||||
fw_rule_ids = [fr['firewall_rule']['id']]
|
||||
# update shared policy with unshared rule
|
||||
data = {'firewall_policy':
|
||||
{'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp['firewall_policy']['id'])
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
|
||||
|
||||
def test_update_firewall_policy_with_shared_attr_unshared_rule(self):
|
||||
with self.firewall_rule(name='fwr1', shared=False) as fr:
|
||||
with self.firewall_policy(shared=False) as fwp:
|
||||
fw_rule_ids = [fr['firewall_rule']['id']]
|
||||
# update shared policy with shared attr and unshared rule
|
||||
data = {'firewall_policy': {'shared': True,
|
||||
'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp['firewall_policy']['id'])
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
|
||||
|
||||
def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self):
|
||||
with self.firewall_rule(name='fwr1', shared=False) as fr:
|
||||
fw_rule_ids = [fr['firewall_rule']['id']]
|
||||
with self.firewall_policy(shared=False,
|
||||
firewall_rules=fw_rule_ids) as fwp:
|
||||
# update policy with shared attr
|
||||
data = {'firewall_policy': {'shared': True}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp['firewall_policy']['id'])
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
|
||||
|
||||
def test_delete_firewall_policy(self):
|
||||
ctx = context.get_admin_context()
|
||||
with self.firewall_policy(do_delete=False) as fwp:
|
||||
|
Loading…
x
Reference in New Issue
Block a user