Create links between FWaaS resources

Part of blueprint fwaas-horizon

This commit creates cross links between FWaaS resources
and resolves FWaaS resource UUID in tables to its name.
This helps users to track FWaaS resources.

This commit also improves an error handling when an error occurs
during retrieving a resource detail via API call.
While previously a detail page without item values is displayed,
now the index page is displayed for such case.

Closes-Bug: #1229433

Change-Id: I4357b5c275ab75163f0afc7158b4c1edb8cecd6c
This commit is contained in:
Akihiro MOTOKI 2013-09-10 10:21:26 +09:00
parent ffdf190825
commit f3f6f05275
10 changed files with 326 additions and 109 deletions

View File

@ -16,6 +16,8 @@
from __future__ import absolute_import
from django.utils.datastructures import SortedDict # noqa
from openstack_dashboard.api import neutron
neutronclient = neutron.neutronclient
@ -71,14 +73,33 @@ def rule_create(request, **kwargs):
def rules_list(request, **kwargs):
return _rules_list(request, expand_policy=True, **kwargs)
def _rules_list(request, expand_policy, **kwargs):
rules = neutronclient(request).list_firewall_rules(
**kwargs).get('firewall_rules')
if expand_policy:
policies = _policies_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for rule in rules:
rule['policy'] = policy_dict.get(rule['firewall_policy_id'])
return [Rule(r) for r in rules]
def rule_get(request, rule_id):
return _rule_get(request, rule_id, expand_policy=True)
def _rule_get(request, rule_id, expand_policy):
rule = neutronclient(request).show_firewall_rule(
rule_id).get('firewall_rule')
if expand_policy:
if rule['firewall_policy_id']:
rule['policy'] = _policy_get(request, rule['firewall_policy_id'],
expand_rule=False)
else:
rule['policy'] = None
return Rule(rule)
@ -111,14 +132,36 @@ def policy_create(request, **kwargs):
def policies_list(request, **kwargs):
return _policies_list(request, expand_rule=True, **kwargs)
def _policies_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_firewall_policies(
**kwargs).get('firewall_policies')
if expand_rule:
rules = _rules_list(request, expand_policy=False)
rule_dict = SortedDict((rule.id, rule) for rule in rules)
for p in policies:
p['rules'] = [rule_dict.get(rule) for rule in p['firewall_rules']]
return [Policy(p) for p in policies]
def policy_get(request, policy_id):
return _policy_get(request, policy_id, expand_rule=True)
def _policy_get(request, policy_id, expand_rule):
policy = neutronclient(request).show_firewall_policy(
policy_id).get('firewall_policy')
if expand_rule:
policy_rules = policy['firewall_rules']
if policy_rules:
rules = _rules_list(request, expand_policy=False,
firewall_policy_id=policy_id)
rule_dict = SortedDict((rule.id, rule) for rule in rules)
policy['rules'] = [rule_dict.get(rule) for rule in policy_rules]
else:
policy['rules'] = []
return Policy(policy)
@ -162,14 +205,34 @@ def firewall_create(request, **kwargs):
def firewalls_list(request, **kwargs):
return _firewalls_list(request, expand_policy=True, **kwargs)
def _firewalls_list(request, expand_policy, **kwargs):
firewalls = neutronclient(request).list_firewalls(
**kwargs).get('firewalls')
if expand_policy:
policies = _policies_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for fw in firewalls:
fw['policy'] = policy_dict.get(fw['firewall_policy_id'])
return [Firewall(f) for f in firewalls]
def firewall_get(request, firewall_id):
return _firewall_get(request, firewall_id, expand_policy=True)
def _firewall_get(request, firewall_id, expand_policy):
firewall = neutronclient(request).show_firewall(
firewall_id).get('firewall')
if expand_policy:
policy_id = firewall['firewall_policy_id']
if policy_id:
firewall['policy'] = _policy_get(request, policy_id,
expand_rule=False)
else:
firewall['policy'] = None
return Firewall(firewall)

View File

@ -234,14 +234,18 @@ class InsertRuleToPolicy(forms.SelfHandlingForm):
def handle(self, request, context):
policy_id = self.initial['policy_id']
policy_name_or_id = self.initial['name'] or policy_id
try:
body = {'firewall_rule_id': context['firewall_rule_id'],
insert_rule_id = context['firewall_rule_id']
insert_rule = api.fwaas.rule_get(request, insert_rule_id)
body = {'firewall_rule_id': insert_rule_id,
'insert_before': context['insert_before'],
'insert_after': context['insert_after']}
policy = api.fwaas.policy_insert_rule(request, policy_id, **body)
msg = _('Rule %(rule)s was successfully inserted to policy '
'%(policy)s.' %
{'rule': context['firewall_rule_id'], 'policy': policy_id})
{'rule': insert_rule.name or insert_rule.id,
'policy': policy_name_or_id})
LOG.debug(msg)
messages.success(request, msg)
return policy
@ -286,13 +290,16 @@ class RemoveRuleFromPolicy(forms.SelfHandlingForm):
def handle(self, request, context):
policy_id = self.initial['policy_id']
policy_name_or_id = self.initial['name'] or policy_id
try:
body = {'firewall_rule_id': context['firewall_rule_id'], }
remove_rule_id = context['firewall_rule_id']
remove_rule = api.fwaas.rule_get(request, remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
policy = api.fwaas.policy_remove_rule(request, policy_id, **body)
msg = _('Rule %(rule)s was successfully removed from policy '
msg = _('Rule %(rule)s was successfully remove from policy '
'%(policy)s.' %
{'rule': context['firewall_rule_id'],
'policy': self.initial['name']})
{'rule': remove_rule.name or remove_rule.id,
'policy': policy_name_or_id})
LOG.debug(msg)
messages.success(request, msg)
return policy

View File

@ -18,6 +18,7 @@
import logging
from django.core.urlresolvers import reverse # noqa
from django.template import defaultfilters as filters
from django.utils.translation import ugettext_lazy as _ # noqa
from horizon import tables
@ -126,11 +127,28 @@ class RemoveRuleFromPolicyLink(tables.LinkAction):
return base_url
def get_rules_name(datum):
return ', '.join([rule.name or rule.id[:13]
for rule in datum.rules])
def get_policy_name(datum):
if datum.policy:
return datum.policy.name or datum.policy.id
def get_policy_link(datum):
return reverse('horizon:project:firewalls:policydetails',
kwargs={'policy_id': datum.policy.id})
class RulesTable(tables.DataTable):
name = tables.Column("name",
verbose_name=_("Name"),
link="horizon:project:firewalls:ruledetails")
protocol = tables.Column("protocol",
filters=(lambda v: filters.default(v, _("ANY")),
filters.upper,),
verbose_name=_("Protocol"))
source_ip_address = tables.Column("source_ip_address",
verbose_name=_("Source IP"))
@ -141,10 +159,12 @@ class RulesTable(tables.DataTable):
destination_port = tables.Column("destination_port",
verbose_name=_("Destination Port"))
action = tables.Column("action",
filters=(filters.upper,),
verbose_name=_("Action"))
enabled = tables.Column("enabled",
verbose_name=_("Enabled"))
firewall_policy_id = tables.Column("firewall_policy_id",
firewall_policy_id = tables.Column(get_policy_name,
link=get_policy_link,
verbose_name=_("In Policy"))
class Meta:
@ -158,7 +178,7 @@ class PoliciesTable(tables.DataTable):
name = tables.Column("name",
verbose_name=_("Name"),
link="horizon:project:firewalls:policydetails")
firewall_rules = tables.Column("firewall_rules",
firewall_rules = tables.Column(get_rules_name,
verbose_name=_("Rules"))
audited = tables.Column("audited",
verbose_name=_("Audited"))
@ -175,7 +195,8 @@ class FirewallsTable(tables.DataTable):
name = tables.Column("name",
verbose_name=_("Name"),
link="horizon:project:firewalls:firewalldetails")
firewall_policy_id = tables.Column("firewall_policy_id",
firewall_policy_id = tables.Column(get_policy_name,
link=get_policy_link,
verbose_name=_("Policy"))
status = tables.Column("status",
verbose_name=_("Status"))

View File

@ -15,6 +15,7 @@
#
# @author: KC Wang, Big Switch Networks
from django.core.urlresolvers import reverse_lazy # noqa
from django.utils.translation import ugettext_lazy as _ # noqa
from horizon import exceptions
@ -96,15 +97,16 @@ class RuleDetailsTab(tabs.Tab):
name = _("Firewall Rule Details")
slug = "ruledetails"
template_name = "project/firewalls/_rule_details.html"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, request):
rid = self.tab_group.kwargs['rule_id']
try:
rule = api.fwaas.rule_get(request, rid)
except Exception:
rule = []
exceptions.handle(request,
_('Unable to retrieve rule details.'))
_('Unable to retrieve rule details.'),
redirect=self.failure_url)
return {'rule': rule}
@ -112,15 +114,16 @@ class PolicyDetailsTab(tabs.Tab):
name = _("Firewall Policy Details")
slug = "policydetails"
template_name = "project/firewalls/_policy_details.html"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, request):
pid = self.tab_group.kwargs['policy_id']
try:
policy = api.fwaas.policy_get(request, pid)
except Exception:
policy = []
exceptions.handle(request,
_('Unable to retrieve policy details.'))
_('Unable to retrieve policy details.'),
redirect=self.failure_url)
return {'policy': policy}
@ -128,15 +131,16 @@ class FirewallDetailsTab(tabs.Tab):
name = _("Firewall Details")
slug = "firewalldetails"
template_name = "project/firewalls/_firewall_details.html"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, request):
fid = self.tab_group.kwargs['firewall_id']
try:
firewall = api.fwaas.firewall_get(request, fid)
except Exception:
firewall = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve firewall details.'))
exceptions.handle(request,
_('Unable to retrieve firewall details.'),
redirect=self.failure_url)
return {'firewall': firewall}

View File

@ -5,10 +5,10 @@
<hr class="header_rule">
<dl>
<dt>{% trans "Name" %}</dt>
<dd>{{ firewall.name|default:_("None") }}</dd>
<dd>{{ firewall.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ firewall.description|default:_("None") }}</dd>
<dd>{{ firewall.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ firewall.id }} </dd>
@ -19,13 +19,13 @@
<dt>{% trans "Policy ID" %}</dt>
<dd>
{% url 'horizon:project:firewalls:policydetails' firewall.firewall_policy_id as policy_url%}
<a href="{{ policy_url }}">{{ firewall.firewall_policy_id }}</a>
<a href="{{ policy_url }}">{{ firewall.policy.name|default:firewall.policy.id }}</a>
</dd>
<dt>{% trans "Status" %}</dt>
<dd>{{ firewall.status }}</dd>
<dt>{% trans "Admin State Up" %}</dt>
<dd>{{ firewall.admin_state_up }}</dd>
<dd>{{ firewall.admin_state_up|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -1,13 +1,14 @@
{% load i18n sizeformat parse_date %}
{% load url from future %}
<div class="info row-fluid detail">
<hr class="header_rule">
<dl>
<dt>{% trans "Name" %}</dt>
<dd>{{ policy.name|default:_("None") }}</dd>
<dd>{{ policy.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ policy.description|default:_("None") }}</dd>
<dd>{{ policy.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ policy.id }}</dd>
@ -16,12 +17,21 @@
<dd>{{ policy.tenant_id }}</dd>
<dt>{% trans "Rules" %}</dt>
<dd>{{ policy.firewall_rules }}</dd>
<dd>
{% if policy.rules %}
{% for rule in policy.rules %}
{% url 'horizon:project:firewalls:ruledetails' rule.id as rule_url %}
{{ rule.position }} : <a href="{{ rule_url }}">{{ rule.name|default:rule.id }}</a><br>
{% endfor %}
{% else %}
{% trans "-" %}
{% endif %}
</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ policy.shared }}</dd>
<dd>{{ policy.shared|yesno|capfirst }}</dd>
<dt>{% trans "Audited" %}</dt>
<dd>{{ policy.audited }}</dd>
<dd>{{ policy.audited|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -5,10 +5,10 @@
<hr class="header_rule">
<dl>
<dt>{% trans "Name" %}</dt>
<dd>{{ rule.name|default:_("None") }}</dd>
<dd>{{ rule.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ rule.description|default:_("None") }}</dd>
<dd>{{ rule.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ rule.id }}</dd>
@ -17,36 +17,40 @@
<dd>{{ rule.tenant_id }}</dd>
<dt>{% trans "Action" %}</dt>
<dd>{{ rule.action }}</dd>
<dd>{{ rule.action|upper }}</dd>
<dt>{% trans "Protocol" %}</dt>
<dd>{{ rule.protocol }}</dd>
<dd>{{ rule.protocol|default:_("ANY")|upper }}</dd>
<dt>{% trans "Source IP Address" %}</dt>
<dd>{{ rule.source_ip_address }}</dd>
<dd>{{ rule.source_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Source Port" %}</dt>
<dd>{{ rule.source_port }}</dd>
<dd>{{ rule.source_port|default:_("ANY") }}</dd>
<dt>{% trans "Destination IP Address" %}</dt>
<dd>{{ rule.destination_ip_address }}</dd>
<dd>{{ rule.destination_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Destination Port"%}</dt>
<dd>{{ rule.destination_port }}</dd>
<dd>{{ rule.destination_port|default:_("ANY") }}</dd>
<dt>{% trans "Used in Policy" %}</dt>
<dd>
{% url 'horizon:project:firewalls:policydetails' rule.firewall_policy_id as policy_url%}
<a href="{{ policy_url }}">{{ rule.firewall_policy_id }}</a>
{% if rule.policy %}
{% url 'horizon:project:firewalls:policydetails' rule.policy.id as policy_url %}
<a href="{{ policy_url }}">{{ rule.policy.name|default:rule.policy.id }}</a>
{% else %}
{% trans "-" %}
{% endif %}
</dd>
<dt>{% trans "Position in Policy" %}</dt>
<dd>{{ rule.position }}</dd>
<dd>{{ rule.position|default:_("-") }}</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ rule.shared }}</dd>
<dd>{{ rule.shared|yesno|capfirst }}</dd>
<dt>{% trans "Enabled" %}</dt>
<dd>{{ rule.enabled }}</dd>
<dd>{{ rule.enabled|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -449,9 +449,8 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get',
'policy_insert_rule',
'rules_list')})
@test.create_stubs({api.fwaas: ('policy_get', 'policy_insert_rule',
'rules_list', 'rule_get')})
def test_policy_insert_rule(self):
policy = self.fw_policies.first()
tenant_id = policy.tenant_id
@ -472,6 +471,8 @@ class FirewallTests(test.TestCase):
api.fwaas.rules_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), new_rule_id).AndReturn(rules[2])
api.fwaas.policy_insert_rule(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(policy)
@ -484,7 +485,7 @@ class FirewallTests(test.TestCase):
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get', 'policy_remove_rule',
'rules_list',)})
'rules_list', 'rule_get')})
def test_policy_remove_rule(self):
policy = self.fw_policies.first()
tenant_id = policy.tenant_id
@ -508,6 +509,8 @@ class FirewallTests(test.TestCase):
policy.id).AndReturn(policy)
api.fwaas.rules_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), remove_rule_id).AndReturn(rules[0])
api.fwaas.policy_remove_rule(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(after_remove_policy)

View File

@ -49,32 +49,46 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(rule1.name, ret_val.name)
self.assertTrue(ret_val.id)
@test.create_stubs({neutronclient: ('list_firewall_rules',)})
@test.create_stubs({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rules_list(self):
rules = {'firewall_rules': self.fw_rules.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
neutronclient.list_firewall_rules().AndReturn(rules_dict)
exp_rules = self.fw_rules.list()
api_rules = {'firewall_rules': self.api_fw_rules.list()}
api_policies = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewall_rules().AndReturn(api_rules)
neutronclient.list_firewall_policies().AndReturn(api_policies)
self.mox.ReplayAll()
ret_val = api.fwaas.rules_list(self.request)
for (v, d) in zip(ret_val, rules['firewall_rules']):
for (v, d) in zip(ret_val, exp_rules):
self.assertIsInstance(v, api.fwaas.Rule)
self.assertTrue(v.name, d.name)
self.assertEqual(v.name, d.name)
self.assertTrue(v.id)
if d.policy:
self.assertEqual(v.policy.id, d.firewall_policy_id)
self.assertEqual(v.policy.name, d.policy.name)
else:
self.assertIsNone(v.policy)
@test.create_stubs({neutronclient: ('show_firewall_rule',)})
@test.create_stubs({neutronclient: ('show_firewall_rule',
'show_firewall_policy')})
def test_rule_get(self):
rule = self.fw_rules.first()
rule_dict = self.api_fw_rules.first()
ret_dict = {'firewall_rule': rule_dict}
exp_rule = self.fw_rules.first()
ret_dict = {'firewall_rule': self.api_fw_rules.first()}
policy_dict = {'firewall_policy': self.api_fw_policies.first()}
neutronclient.show_firewall_rule(rule.id).AndReturn(ret_dict)
neutronclient.show_firewall_rule(exp_rule.id).AndReturn(ret_dict)
neutronclient.show_firewall_policy(
exp_rule.firewall_policy_id).AndReturn(policy_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.rule_get(self.request, rule.id)
ret_val = api.fwaas.rule_get(self.request, exp_rule.id)
self.assertIsInstance(ret_val, api.fwaas.Rule)
self.assertEqual(rule.name, ret_val.name)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(ret_val.policy.id, exp_rule.firewall_policy_id)
self.assertEqual(ret_val.policy.name, exp_rule.policy.name)
@test.create_stubs({neutronclient: ('update_firewall_rule',)})
def test_rule_update(self):
@ -137,33 +151,65 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
@test.create_stubs({neutronclient: ('list_firewall_policies',)})
@test.create_stubs({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policies_list(self):
policies = {'firewall_policies': self.fw_policies.list()}
exp_policies = self.fw_policies.list()
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
neutronclient.list_firewall_policies().AndReturn(policies_dict)
neutronclient.list_firewall_rules().AndReturn(rules_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.policies_list(self.request)
for (v, d) in zip(ret_val, policies['firewall_policies']):
for (v, d) in zip(ret_val, exp_policies):
self.assertIsInstance(v, api.fwaas.Policy)
self.assertTrue(v.name, d.name)
self.assertEqual(v.name, d.name)
self.assertTrue(v.id)
self.assertEqual(len(d.firewall_rules), len(v.rules))
self.assertEqual(len(d.firewall_rules), len(v.firewall_rules))
for (r, exp_r) in zip(v.rules, d.rules):
self.assertEqual(r.id, exp_r.id)
@test.create_stubs({neutronclient: ('show_firewall_policy',)})
@test.create_stubs({neutronclient: ('show_firewall_policy',
'list_firewall_rules')})
def test_policy_get(self):
policy = self.fw_policies.first()
exp_policy = self.fw_policies.first()
policy_dict = self.api_fw_policies.first()
# The first two rules are associated with the first policy.
api_rules = self.api_fw_rules.list()[:2]
ret_dict = {'firewall_policy': policy_dict}
neutronclient.show_firewall_policy(policy.id).AndReturn(ret_dict)
neutronclient.show_firewall_policy(exp_policy.id).AndReturn(ret_dict)
filters = {'firewall_policy_id': exp_policy.id}
ret_dict = {'firewall_rules': api_rules}
neutronclient.list_firewall_rules(**filters).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.policy_get(self.request, policy.id)
ret_val = api.fwaas.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(policy.name, ret_val.name)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.rules), len(ret_val.rules))
for (exp, ret) in zip(exp_policy.rules, ret_val.rules):
self.assertEqual(exp.id, ret.id)
@test.create_stubs({neutronclient: ('show_firewall_policy',)})
def test_policy_get_no_rule(self):
# 2nd policy is not associated with any rules.
exp_policy = self.fw_policies.list()[1]
policy_dict = self.api_fw_policies.list()[1]
ret_dict = {'firewall_policy': policy_dict}
neutronclient.show_firewall_policy(exp_policy.id).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertFalse(len(ret_val.rules))
@test.create_stubs({neutronclient: ('update_firewall_policy',)})
def test_policy_update(self):
@ -218,7 +264,7 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.policy_insert_rule(self.request,
policy.id, **body)
self.assertTrue(new_rule_id in ret_val.firewall_rules)
self.assertIn(new_rule_id, ret_val.firewall_rules)
@test.create_stubs({neutronclient: ('firewall_policy_remove_rule',)})
def test_policy_remove_rule(self):
@ -226,7 +272,7 @@ class FwaasApiTests(test.APITestCase):
policy_dict = self.api_fw_policies.first()
remove_rule_id = policy.firewall_rules[0]
policy.firewall_rules.remove(remove_rule_id)
policy_dict['firewall_rules'].remove(remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
@ -236,7 +282,7 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.policy_remove_rule(self.request,
policy.id, **body)
self.assertFalse(remove_rule_id in ret_val.firewall_rules)
self.assertNotIn(remove_rule_id, ret_val.firewall_rules)
@test.create_stubs({neutronclient: ('create_firewall', )})
def test_firewall_create(self):
@ -260,34 +306,43 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
@test.create_stubs({neutronclient: ('list_firewalls',)})
@test.create_stubs({neutronclient: ('list_firewalls',
'list_firewall_policies')})
def test_firewalls_list(self):
firewalls = {'firewalls': self.firewalls.list()}
exp_firewalls = self.firewalls.list()
firewalls_dict = {'firewalls': self.api_firewalls.list()}
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewalls().AndReturn(firewalls_dict)
neutronclient.list_firewall_policies().AndReturn(policies_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.firewalls_list(self.request)
for (v, d) in zip(ret_val, firewalls['firewalls']):
for (v, d) in zip(ret_val, exp_firewalls):
self.assertIsInstance(v, api.fwaas.Firewall)
self.assertTrue(v.name, d.name)
self.assertEqual(v.name, d.name)
self.assertTrue(v.id)
self.assertEqual(v.policy.id, d.firewall_policy_id)
self.assertEqual(v.policy.name, d.policy.name)
@test.create_stubs({neutronclient: ('show_firewall',)})
@test.create_stubs({neutronclient: ('show_firewall',
'show_firewall_policy')})
def test_firewall_get(self):
firewall = self.firewalls.first()
firewall_dict = self.api_firewalls.first()
exp_firewall = self.firewalls.first()
ret_dict = {'firewall': self.api_firewalls.first()}
policy_dict = {'firewall_policy': self.api_fw_policies.first()}
ret_dict = {'firewall': firewall_dict}
neutronclient.show_firewall(firewall.id).AndReturn(ret_dict)
neutronclient.show_firewall(exp_firewall.id).AndReturn(ret_dict)
neutronclient.show_firewall_policy(
exp_firewall.firewall_policy_id).AndReturn(policy_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.firewall_get(self.request, firewall.id)
ret_val = api.fwaas.firewall_get(self.request, exp_firewall.id)
self.assertIsInstance(ret_val, api.fwaas.Firewall)
self.assertEqual(firewall.name, ret_val.name)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(ret_val.policy.id, exp_firewall.firewall_policy_id)
self.assertEqual(ret_val.policy.name, exp_firewall.policy.name)
@test.create_stubs({neutronclient: ('update_firewall',)})
def test_firewall_update(self):

View File

@ -721,7 +721,7 @@ def data(TEST):
# FWaaS
# 1st rule
# 1st rule (used by 1st policy)
rule1_dict = {'id': 'f0881d38-c3eb-4fee-9763-12de3338041d',
'tenant_id': '1',
'name': 'rule1',
@ -737,13 +737,16 @@ def data(TEST):
'shared': True,
'enabled': True}
TEST.api_fw_rules.add(rule1_dict)
TEST.fw_rules.add(fwaas.Rule(rule1_dict))
# 2nd rule
rule2_dict = {'id': 'g0881d38-c3eb-4fee-9763-12de3338041d',
rule1 = fwaas.Rule(copy.deepcopy(rule1_dict))
# NOTE: rule1['policy'] is set below
TEST.fw_rules.add(rule1)
# 2nd rule (used by 2nd policy; no name)
rule2_dict = {'id': 'c6298a93-850f-4f64-b78a-959fd4f1e5df',
'tenant_id': '1',
'name': 'rule2',
'description': 'rule2 description',
'name': '',
'description': '',
'protocol': 'udp',
'action': 'deny',
'source_ip_address': '1.2.3.0/24',
@ -755,9 +758,12 @@ def data(TEST):
'shared': True,
'enabled': True}
TEST.api_fw_rules.add(rule2_dict)
TEST.fw_rules.add(fwaas.Rule(rule2_dict))
# 3rd rule
rule2 = fwaas.Rule(copy.deepcopy(rule2_dict))
# NOTE: rule2['policy'] is set below
TEST.fw_rules.add(rule2)
# 3rd rule (not used by any policy)
rule3_dict = {'id': 'h0881d38-c3eb-4fee-9763-12de3338041d',
'tenant_id': '1',
'name': 'rule3',
@ -773,28 +779,72 @@ def data(TEST):
'shared': True,
'enabled': True}
TEST.api_fw_rules.add(rule3_dict)
TEST.fw_rules.add(fwaas.Rule(rule3_dict))
# 1st policy
policy_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'tenant_id': '1',
'name': 'policy1',
'description': 'policy description',
'firewall_rules': [rule1_dict['id'], rule2_dict['id']],
'audited': True,
'shared': True}
TEST.api_fw_policies.add(policy_dict)
TEST.fw_policies.add(fwaas.Policy(policy_dict))
rule3 = fwaas.Rule(copy.deepcopy(rule3_dict))
# rule3 is not associated with any rules
rule3._apidict['policy'] = None
TEST.fw_rules.add(rule3)
# 1st policy (associated with 2 rules)
policy1_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'tenant_id': '1',
'name': 'policy1',
'description': 'policy with two rules',
'firewall_rules': [rule1_dict['id'], rule2_dict['id']],
'audited': True,
'shared': True}
TEST.api_fw_policies.add(policy1_dict)
policy1 = fwaas.Policy(copy.deepcopy(policy1_dict))
policy1._apidict['rules'] = [rule1, rule2]
TEST.fw_policies.add(policy1)
# Reverse relations (rule -> policy)
rule1._apidict['policy'] = policy1
rule2._apidict['policy'] = policy1
# 2nd policy (associated with no rules; no name)
policy2_dict = {'id': 'cf50b331-787a-4623-825e-da794c918d6a',
'tenant_id': '1',
'name': '',
'description': '',
'firewall_rules': [],
'audited': False,
'shared': False}
TEST.api_fw_policies.add(policy2_dict)
policy2 = fwaas.Policy(copy.deepcopy(policy2_dict))
policy2._apidict['rules'] = []
TEST.fw_policies.add(policy2)
# 1st firewall
firewall_dict = {'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d49',
'tenant_id': '1',
'firewall_policy_id':
'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'firewall1',
'description': 'firewall description',
'status': 'PENDING_CREATE',
'shared': True,
'admin_state_up': True}
TEST.api_firewalls.add(firewall_dict)
TEST.firewalls.add(fwaas.Firewall(firewall_dict))
fw1_dict = {'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d49',
'tenant_id': '1',
'firewall_policy_id':
'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'firewall1',
'description': 'firewall description',
'status': 'PENDING_CREATE',
'shared': True,
'admin_state_up': True}
TEST.api_firewalls.add(fw1_dict)
fw1 = fwaas.Firewall(copy.deepcopy(fw1_dict))
fw1._apidict['policy'] = policy1
TEST.firewalls.add(fw1)
# 2nd firewall (no name)
fw2_dict = {'id': '1aa75150-415f-458e-bae5-5a362a4fb1f7',
'tenant_id': '1',
'firewall_policy_id':
'abcdef-c3eb-4fee-9763-12de3338041e',
'name': '',
'description': '',
'status': 'PENDING_CREATE',
'shared': True,
'admin_state_up': True}
TEST.api_firewalls.add(fw1_dict)
fw2 = fwaas.Firewall(copy.deepcopy(fw2_dict))
fw2._apidict['policy'] = policy1
TEST.firewalls.add(fw1)