FWaaS v2 Database rule insert/remove operations support
This change implements insert and remove operations for firewall rules. Those rules can now be shared among firewall policies. Partial-Implements: blueprint fwaas-api-2.0 Change-Id: I975afb7b3bcfcc3b9da319c1c380870fc43c5eb3
This commit is contained in:
parent
ca7c5c2b72
commit
3788294812
|
@ -85,8 +85,8 @@ class FirewallGroupPortAssociation(model_base.BASEV2):
|
|||
ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
port_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey('ports.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
sa.ForeignKey('ports.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
|
||||
|
||||
class FirewallPolicyRuleAssociation(model_base.BASEV2):
|
||||
|
@ -96,12 +96,15 @@ class FirewallPolicyRuleAssociation(model_base.BASEV2):
|
|||
__tablename__ = 'firewall_policy_rule_associations_v2'
|
||||
|
||||
firewall_policy_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey('firewall_policies_v2.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
sa.ForeignKey('firewall_policies_v2.id',
|
||||
ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
firewall_rule_id = sa.Column(sa.String(36),
|
||||
sa.ForeignKey('firewall_rules_v2.id', ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
sa.ForeignKey('firewall_rules_v2.id',
|
||||
ondelete="CASCADE"),
|
||||
primary_key=True)
|
||||
position = sa.Column(sa.Integer)
|
||||
firewall_rule = orm.relationship('FirewallRuleV2')
|
||||
|
||||
|
||||
class FirewallPolicy(model_base.BASEV2, models_v2.HasId, HasName,
|
||||
|
@ -112,7 +115,7 @@ class FirewallPolicy(model_base.BASEV2, models_v2.HasId, HasName,
|
|||
public = sa.Column(sa.Boolean)
|
||||
rule_count = sa.Column(sa.Integer)
|
||||
audited = sa.Column(sa.Boolean)
|
||||
firewall_rules = orm.relationship(
|
||||
rule_associations = orm.relationship(
|
||||
FirewallPolicyRuleAssociation,
|
||||
backref=orm.backref('firewall_policies_v2', cascade='all, delete'),
|
||||
order_by='FirewallPolicyRuleAssociation.position',
|
||||
|
@ -205,8 +208,9 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
return self._fields(res, fields)
|
||||
|
||||
def _make_firewall_policy_dict(self, firewall_policy, fields=None):
|
||||
fw_rules = [rule.firewall_rule_id
|
||||
for rule in firewall_policy['firewall_rules']]
|
||||
fw_rules = [
|
||||
rule_association.firewall_rule_id
|
||||
for rule_association in firewall_policy['rule_associations']]
|
||||
res = {'id': firewall_policy['id'],
|
||||
'tenant_id': firewall_policy['tenant_id'],
|
||||
'name': firewall_policy['name'],
|
||||
|
@ -255,6 +259,64 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
firewall_group['egress_rule_list'] = []
|
||||
return firewall_group
|
||||
|
||||
def _check_firewall_rule_conflict(self, fwr_db, fwp_db):
|
||||
if not fwr_db['public']:
|
||||
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
|
||||
raise fw_ext.FirewallRuleConflict(
|
||||
firewall_rule_id=fwr_db['id'],
|
||||
tenant_id=fwr_db['tenant_id'])
|
||||
|
||||
def _process_rule_for_policy(self, context, firewall_policy_id,
|
||||
firewall_rule_id, position, association_db):
|
||||
with context.session.begin(subtransactions=True):
|
||||
fwp_query = context.session.query(
|
||||
FirewallPolicy).with_lockmode('update')
|
||||
fwp_db = fwp_query.filter_by(id=firewall_policy_id).one()
|
||||
if position:
|
||||
# Note that although position numbering starts at 1,
|
||||
# internal ordering of the list starts at 0, so we compensate.
|
||||
fwp_db.rule_associations.insert(
|
||||
position - 1,
|
||||
FirewallPolicyRuleAssociation(
|
||||
firewall_rule_id=firewall_rule_id))
|
||||
else:
|
||||
fwp_db.rule_associations.remove(association_db)
|
||||
context.session.delete(association_db)
|
||||
fwp_db.rule_associations.reorder()
|
||||
fwp_db.audited = False
|
||||
return self._make_firewall_policy_dict(fwp_db)
|
||||
|
||||
def _get_policy_rule_association_query(self, context, firewall_policy_id,
|
||||
firewall_rule_id):
|
||||
fwpra_query = context.session.query(FirewallPolicyRuleAssociation)
|
||||
return fwpra_query.filter_by(firewall_policy_id=firewall_policy_id,
|
||||
firewall_rule_id=firewall_rule_id)
|
||||
|
||||
def _ensure_rule_not_already_associated(self, context, firewall_policy_id,
|
||||
firewall_rule_id):
|
||||
"""Checks that a rule is not already associated with a particular
|
||||
policy. If it is the function will throw an exception.
|
||||
"""
|
||||
try:
|
||||
self._get_policy_rule_association_query(
|
||||
context, firewall_policy_id, firewall_rule_id).one()
|
||||
raise fw_ext.FirewallRuleAlreadyAssociated(
|
||||
firewall_rule_id=firewall_rule_id, firewall_policy_id=id)
|
||||
except exc.NoResultFound:
|
||||
return
|
||||
|
||||
def _get_policy_rule_association(self, context, firewall_policy_id,
|
||||
firewall_rule_id):
|
||||
"""Returns the association between a firewall rule and a firewall
|
||||
policy. Throws an exception if the assocaition does not exist.
|
||||
"""
|
||||
try:
|
||||
return self._get_policy_rule_association_query(
|
||||
context, firewall_policy_id, firewall_rule_id).one()
|
||||
except exc.NoResultFound:
|
||||
raise fw_ext.FirewallRuleNotAssociatedWithPolicy(
|
||||
firewall_rule_id=firewall_rule_id, firewall_policy_id=id)
|
||||
|
||||
def create_firewall_rule(self, context, firewall_rule):
|
||||
LOG.debug("create_firewall_rule() called")
|
||||
fwr = firewall_rule['firewall_rule']
|
||||
|
@ -330,12 +392,59 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
context.session.delete(fwr)
|
||||
|
||||
def insert_rule(self, context, id, rule_info):
|
||||
# TODO(sridar)
|
||||
pass
|
||||
LOG.debug("insert_rule() called")
|
||||
self._validate_insert_remove_rule_request(id, rule_info)
|
||||
firewall_rule_id = rule_info['firewall_rule_id']
|
||||
# ensure rule is not already assigned to the policy
|
||||
self._ensure_rule_not_already_associated(context, id, firewall_rule_id)
|
||||
insert_before = True
|
||||
ref_firewall_rule_id = None
|
||||
if not firewall_rule_id:
|
||||
raise fw_ext.FirewallRuleNotFound(firewall_rule_id=None)
|
||||
if 'insert_before' in rule_info:
|
||||
ref_firewall_rule_id = rule_info['insert_before']
|
||||
if not ref_firewall_rule_id and 'insert_after' in rule_info:
|
||||
# If insert_before is set, we will ignore insert_after.
|
||||
ref_firewall_rule_id = rule_info['insert_after']
|
||||
insert_before = False
|
||||
with context.session.begin(subtransactions=True):
|
||||
fwr_db = self._get_firewall_rule(context, firewall_rule_id)
|
||||
fwp_db = self._get_firewall_policy(context, id)
|
||||
self._check_firewall_rule_conflict(fwr_db, fwp_db)
|
||||
if ref_firewall_rule_id:
|
||||
# If reference_firewall_rule_id is set, the new rule
|
||||
# is inserted depending on the value of insert_before.
|
||||
# If insert_before is set, the new rule is inserted before
|
||||
# reference_firewall_rule_id, and if it is not set the new
|
||||
# rule is inserted after reference_firewall_rule_id.
|
||||
fwpra_db = self._get_policy_rule_association(
|
||||
context, id, ref_firewall_rule_id)
|
||||
if insert_before:
|
||||
position = fwpra_db.position
|
||||
else:
|
||||
position = fwpra_db.position + 1
|
||||
else:
|
||||
# If reference_firewall_rule_id is not set, it is assumed
|
||||
# that the new rule needs to be inserted at the top.
|
||||
# insert_before field is ignored.
|
||||
# So default insertion is always at the top.
|
||||
# Also note that position numbering starts at 1.
|
||||
position = 1
|
||||
return self._process_rule_for_policy(context, id, firewall_rule_id,
|
||||
position, None)
|
||||
|
||||
def remove_rule(self, context, id, rule_info):
|
||||
# TODO(sridar)
|
||||
pass
|
||||
LOG.debug("remove_rule() called")
|
||||
self._validate_insert_remove_rule_request(id, rule_info)
|
||||
firewall_rule_id = rule_info['firewall_rule_id']
|
||||
if not firewall_rule_id:
|
||||
raise fw_ext.FirewallRuleNotFound(firewall_rule_id=None)
|
||||
with context.session.begin(subtransactions=True):
|
||||
self._get_firewall_rule(context, firewall_rule_id)
|
||||
fwpra_db = self._get_policy_rule_association(context, id,
|
||||
firewall_rule_id)
|
||||
return self._process_rule_for_policy(context, id, firewall_rule_id,
|
||||
None, fwpra_db)
|
||||
|
||||
def get_firewall_rule(self, context, id, fields=None):
|
||||
LOG.debug("get_firewall_rule() called")
|
||||
|
@ -348,6 +457,10 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
self._make_firewall_rule_dict,
|
||||
filters=filters, fields=fields)
|
||||
|
||||
def _validate_insert_remove_rule_request(self, id, rule_info):
|
||||
if not rule_info or 'firewall_rule_id' not in rule_info:
|
||||
raise fw_ext.FirewallRuleInfoMissing()
|
||||
|
||||
def _delete_rules_in_policy(self, context, firewall_policy_id):
|
||||
"""Delete the rules in the firewall policy."""
|
||||
with context.session.begin(subtransactions=True):
|
||||
|
@ -423,10 +536,10 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
|
||||
def _check_if_rules_public_for_policy_public(self, context, fwp_db, fwp):
|
||||
if fwp['public']:
|
||||
rules_in_db = fwp_db['firewall_rules']
|
||||
rules_in_db = fwp_db.rule_associations
|
||||
for entry in rules_in_db:
|
||||
fwr_db = self._get_firewall_rule(context,
|
||||
entry.firewall_rule_id)
|
||||
entry.firewall_rule_id)
|
||||
if not fwr_db['public']:
|
||||
raise fw_ext.FirewallPolicySharingConflict(
|
||||
firewall_rule_id=fwr_db['id'],
|
||||
|
@ -464,7 +577,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
fwp_db = firewall_policy_db
|
||||
with context.session.begin(subtransactions=True):
|
||||
if not rule_id_list:
|
||||
fwp_db.firewall_rules = []
|
||||
fwp_db.rule_associations = []
|
||||
return
|
||||
# We will first check if the new list of rules is valid
|
||||
filters = {'firewall_rule_id': [r_id for r_id in rule_id_list]}
|
||||
|
@ -481,10 +594,10 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
filters=filters)
|
||||
rules_dict = dict((fpol_rul_db['firewall_rule_id'], fpol_rul_db)
|
||||
for fpol_rul_db in rules_in_fpol_rul_db)
|
||||
fwp_db.firewall_rules = []
|
||||
fwp_db.rule_associations = []
|
||||
for fwrule_id in rule_id_list:
|
||||
fwp_db.firewall_rules.append(rules_dict[fwrule_id])
|
||||
fwp_db.firewall_rules.reorder()
|
||||
fwp_db.rule_associations.append(rules_dict[fwrule_id])
|
||||
fwp_db.rule_associations.reorder()
|
||||
|
||||
def create_firewall_policy(self, context, firewall_policy):
|
||||
LOG.debug("create_firewall_policy() called")
|
||||
|
|
|
@ -167,6 +167,18 @@ class FirewallRuleConflict(nexception.Conflict):
|
|||
"another tenant %(tenant_id)s")
|
||||
|
||||
|
||||
class FirewallRuleAlreadyAssociated(nexception.Conflict):
|
||||
"""Firewall rule conflict exception.
|
||||
|
||||
Occurs when there is an attempt to assign a rule to a policy that
|
||||
the rule is already associated with.
|
||||
"""
|
||||
|
||||
message = _("Operation cannot be performed since Firewall Rule "
|
||||
"%(firewall_rule_id)s is already associated with Firewall"
|
||||
"Policy %(firewall_policy_id)s")
|
||||
|
||||
|
||||
RESOURCE_ATTRIBUTE_MAP = {
|
||||
'firewall_rules': {
|
||||
'id': {'allow_post': False, 'allow_put': False,
|
||||
|
|
|
@ -1216,6 +1216,278 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
|
|||
self.plugin.get_firewall_group,
|
||||
ctx, fw_id)
|
||||
|
||||
def test_insert_rule_in_policy_with_prior_rules_added_via_update(self):
|
||||
attrs = self._get_test_firewall_policy_attrs()
|
||||
attrs['audited'] = False
|
||||
with self.firewall_rule(name='fwr1') as fwr1, \
|
||||
self.firewall_rule(name='fwr2') as fwr2, \
|
||||
self.firewall_rule(name='fwr3') as fwr3:
|
||||
frs = [fwr1, fwr2, fwr3]
|
||||
fr1 = frs[0:2]
|
||||
fwr3 = frs[2]
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
attrs['id'] = fwp_id
|
||||
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
|
||||
attrs['firewall_rules'] = fw_rule_ids[:]
|
||||
data = {'firewall_policy': {'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp_id)
|
||||
req.get_response(self.ext_api)
|
||||
self._rule_action('insert', fwp_id, fw_rule_ids[0],
|
||||
insert_before=fw_rule_ids[0],
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPConflict.code,
|
||||
expected_body=None)
|
||||
fwr3_id = fwr3['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(0, fwr3_id)
|
||||
self._rule_action('insert', fwp_id, fwr3_id,
|
||||
insert_before=fw_rule_ids[0],
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
|
||||
def test_insert_rule_in_policy_failures(self):
|
||||
with self.firewall_rule(name='fwr1') as fr1:
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
fr1_id = fr1['firewall_rule']['id']
|
||||
fw_rule_ids = [fr1_id]
|
||||
data = {'firewall_policy':
|
||||
{'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp_id)
|
||||
req.get_response(self.ext_api)
|
||||
# test inserting with empty request body
|
||||
self._rule_action('insert', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None, body_data={})
|
||||
# test inserting when firewall_rule_id is missing in
|
||||
# request body
|
||||
insert_data = {'insert_before': '123',
|
||||
'insert_after': '456'}
|
||||
self._rule_action('insert', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None,
|
||||
body_data=insert_data)
|
||||
# test inserting when firewall_rule_id is None
|
||||
insert_data = {'firewall_rule_id': None,
|
||||
'insert_before': '123',
|
||||
'insert_after': '456'}
|
||||
self._rule_action('insert', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPNotFound.code,
|
||||
expected_body=None,
|
||||
body_data=insert_data)
|
||||
# test inserting when firewall_policy_id is incorrect
|
||||
self._rule_action('insert', '123', fr1_id,
|
||||
expected_code=webob.exc.HTTPNotFound.code,
|
||||
expected_body=None)
|
||||
# test inserting when firewall_policy_id is None
|
||||
self._rule_action('insert', None, fr1_id,
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None)
|
||||
|
||||
def test_insert_rule_for_previously_associated_rule(self):
|
||||
with self.firewall_rule() as fwr:
|
||||
fwr_id = fwr['firewall_rule']['id']
|
||||
fw_rule_ids = [fwr_id]
|
||||
with self.firewall_policy(firewall_rules=fw_rule_ids):
|
||||
with self.firewall_policy(name='firewall_policy2') as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
insert_data = {'firewall_rule_id': fwr_id}
|
||||
self._rule_action(
|
||||
'insert', fwp_id, fwr_id, insert_before=None,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=None, body_data=insert_data)
|
||||
|
||||
def test_insert_rule_for_previously_associated_rule_other_tenant(self):
|
||||
with self.firewall_rule(tenant_id='tenant-2') as fwr:
|
||||
fwr_id = fwr['firewall_rule']['id']
|
||||
fw_rule_ids = [fwr_id]
|
||||
with self.firewall_policy(tenant_id='tenant-2',
|
||||
firewall_rules=fw_rule_ids):
|
||||
with self.firewall_policy(name='firewall_policy2') as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
insert_data = {'firewall_rule_id': fwr_id}
|
||||
self._rule_action(
|
||||
'insert', fwp_id, fwr_id, insert_before=None,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=None, body_data=insert_data)
|
||||
|
||||
def test_insert_rule_for_prev_associated_ref_rule(self):
|
||||
with self.firewall_rule(name='fwr0') as fwr0, \
|
||||
self.firewall_rule(name='fwr1') as fwr1:
|
||||
fwr = [fwr0, fwr1]
|
||||
fwr0_id = fwr[0]['firewall_rule']['id']
|
||||
fwr1_id = fwr[1]['firewall_rule']['id']
|
||||
with self.firewall_policy(name='fwp0') as fwp0, \
|
||||
self.firewall_policy(name='fwp1',
|
||||
firewall_rules=[fwr1_id]) as fwp1:
|
||||
fwp = [fwp0, fwp1]
|
||||
fwp0_id = fwp[0]['firewall_policy']['id']
|
||||
# test inserting before a rule which
|
||||
# is associated with different policy
|
||||
self._rule_action('insert', fwp0_id, fwr0_id,
|
||||
insert_before=fwr1_id,
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None)
|
||||
# test inserting after a rule which
|
||||
# is associated with different policy
|
||||
self._rule_action('insert', fwp0_id, fwr0_id,
|
||||
insert_after=fwr1_id,
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None)
|
||||
|
||||
def test_insert_rule_for_policy_of_other_tenant(self):
|
||||
with self.firewall_rule(tenant_id='tenant-2', public=False) as fwr:
|
||||
fwr_id = fwr['firewall_rule']['id']
|
||||
with self.firewall_policy(name='firewall_policy') as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
insert_data = {'firewall_rule_id': fwr_id}
|
||||
self._rule_action(
|
||||
'insert', fwp_id, fwr_id, insert_before=None,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPConflict.code,
|
||||
expected_body=None, body_data=insert_data)
|
||||
|
||||
def test_insert_rule_in_policy(self):
|
||||
attrs = self._get_test_firewall_policy_attrs()
|
||||
attrs['audited'] = False
|
||||
with self.firewall_rule(name='fwr0') as fwr0, \
|
||||
self.firewall_rule(name='fwr1') as fwr1, \
|
||||
self.firewall_rule(name='fwr2') as fwr2, \
|
||||
self.firewall_rule(name='fwr3') as fwr3, \
|
||||
self.firewall_rule(name='fwr4') as fwr4, \
|
||||
self.firewall_rule(name='fwr5') as fwr5, \
|
||||
self.firewall_rule(name='fwr6') as fwr6:
|
||||
fwr = [fwr0, fwr1, fwr2, fwr3, fwr4, fwr5, fwr6]
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
attrs['id'] = fwp_id
|
||||
# test insert when rule list is empty
|
||||
fwr0_id = fwr[0]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(0, fwr0_id)
|
||||
self._rule_action('insert', fwp_id, fwr0_id,
|
||||
insert_before=None,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
# test insert at top of rule list, insert_before and
|
||||
# insert_after not provided
|
||||
fwr1_id = fwr[1]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(0, fwr1_id)
|
||||
insert_data = {'firewall_rule_id': fwr1_id}
|
||||
self._rule_action('insert', fwp_id, fwr0_id,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs, body_data=insert_data)
|
||||
# test insert at top of list above existing rule
|
||||
fwr2_id = fwr[2]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(0, fwr2_id)
|
||||
self._rule_action('insert', fwp_id, fwr2_id,
|
||||
insert_before=fwr1_id,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
# test insert at bottom of list
|
||||
fwr3_id = fwr[3]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].append(fwr3_id)
|
||||
self._rule_action('insert', fwp_id, fwr3_id,
|
||||
insert_before=None,
|
||||
insert_after=fwr0_id,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
# test insert in the middle of the list using
|
||||
# insert_before
|
||||
fwr4_id = fwr[4]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(1, fwr4_id)
|
||||
self._rule_action('insert', fwp_id, fwr4_id,
|
||||
insert_before=fwr1_id,
|
||||
insert_after=None,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
# test insert in the middle of the list using
|
||||
# insert_after
|
||||
fwr5_id = fwr[5]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(1, fwr5_id)
|
||||
self._rule_action('insert', fwp_id, fwr5_id,
|
||||
insert_before=None,
|
||||
insert_after=fwr2_id,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
# test insert when both insert_before and
|
||||
# insert_after are set
|
||||
fwr6_id = fwr[6]['firewall_rule']['id']
|
||||
attrs['firewall_rules'].insert(1, fwr6_id)
|
||||
self._rule_action('insert', fwp_id, fwr6_id,
|
||||
insert_before=fwr5_id,
|
||||
insert_after=fwr5_id,
|
||||
expected_code=webob.exc.HTTPOk.code,
|
||||
expected_body=attrs)
|
||||
|
||||
def test_remove_rule_from_policy(self):
|
||||
attrs = self._get_test_firewall_policy_attrs()
|
||||
attrs['audited'] = False
|
||||
with self.firewall_rule(name='fwr1') as fwr1, \
|
||||
self.firewall_rule(name='fwr2') as fwr2, \
|
||||
self.firewall_rule(name='fwr3') as fwr3:
|
||||
fr1 = [fwr1, fwr2, fwr3]
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
attrs['id'] = fwp_id
|
||||
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
|
||||
attrs['firewall_rules'] = fw_rule_ids[:]
|
||||
data = {'firewall_policy':
|
||||
{'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp_id)
|
||||
req.get_response(self.ext_api)
|
||||
# test removing a rule from a policy that does not exist
|
||||
self._rule_action('remove', '123', fw_rule_ids[1],
|
||||
expected_code=webob.exc.HTTPNotFound.code,
|
||||
expected_body=None)
|
||||
# test removing a rule in the middle of the list
|
||||
attrs['firewall_rules'].remove(fw_rule_ids[1])
|
||||
self._rule_action('remove', fwp_id, fw_rule_ids[1],
|
||||
expected_body=attrs)
|
||||
# test removing a rule at the top of the list
|
||||
attrs['firewall_rules'].remove(fw_rule_ids[0])
|
||||
self._rule_action('remove', fwp_id, fw_rule_ids[0],
|
||||
expected_body=attrs)
|
||||
# test removing remaining rule in the list
|
||||
attrs['firewall_rules'].remove(fw_rule_ids[2])
|
||||
self._rule_action('remove', fwp_id, fw_rule_ids[2],
|
||||
expected_body=attrs)
|
||||
# test removing rule that is not associated with the policy
|
||||
self._rule_action('remove', fwp_id, fw_rule_ids[2],
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None)
|
||||
|
||||
def test_remove_rule_from_policy_failures(self):
|
||||
with self.firewall_rule(name='fwr1') as fr1:
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
fw_rule_ids = [fr1['firewall_rule']['id']]
|
||||
data = {'firewall_policy':
|
||||
{'firewall_rules': fw_rule_ids}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp_id)
|
||||
req.get_response(self.ext_api)
|
||||
# test removing rule that does not exist
|
||||
self._rule_action('remove', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPNotFound.code,
|
||||
expected_body=None)
|
||||
# test removing rule with bad request
|
||||
self._rule_action('remove', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPBadRequest.code,
|
||||
expected_body=None, body_data={})
|
||||
# test removing rule with firewall_rule_id set to None
|
||||
self._rule_action('remove', fwp_id, '123',
|
||||
expected_code=webob.exc.HTTPNotFound.code,
|
||||
expected_body=None,
|
||||
body_data={'firewall_rule_id': None})
|
||||
|
||||
def test_show_firewall_rule_by_name(self):
|
||||
with self.firewall_rule(name='firewall_Rule1') as fw_rule:
|
||||
res = self._show('firewall_rules',
|
||||
|
|
Loading…
Reference in New Issue