List shared firewall policies/rules in Project panel

When firewall policy/rule is set to shared, it should be seen
by any users.

This commit reimplements the support of shared firewall policies
and rules with honoring Neutron listing behavior for admin role.

Closes-bug: #1294541
Change-Id: Ie7142b10234e720b65f6540f08c7a092939e3ea8
This commit is contained in:
Akihiro Motoki 2015-02-15 21:48:30 +09:00
parent df03be6aa3
commit 9736609e53
6 changed files with 214 additions and 116 deletions

View File

@ -74,10 +74,22 @@ def rule_list(request, **kwargs):
return _rule_list(request, expand_policy=True, **kwargs)
def rule_list_for_tenant(request, tenant_id, **kwargs):
"""Return a rule list available for the tenant.
The list contains rules owned by the tenant and shared rules.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
rules = rule_list(request, tenant_id=tenant_id, shared=False, **kwargs)
shared_rules = rule_list(request, shared=True, **kwargs)
return rules + shared_rules
def _rule_list(request, expand_policy, **kwargs):
rules = neutronclient(request).list_firewall_rules(
**kwargs).get('firewall_rules')
if expand_policy:
if expand_policy and rules:
policies = _policy_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for rule in rules:
@ -133,10 +145,23 @@ def policy_list(request, **kwargs):
return _policy_list(request, expand_rule=True, **kwargs)
def policy_list_for_tenant(request, tenant_id, **kwargs):
"""Return a policy list available for the tenant.
The list contains policies owned by the tenant and shared policies.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
policies = policy_list(request, tenant_id=tenant_id,
shared=False, **kwargs)
shared_policies = policy_list(request, shared=True, **kwargs)
return policies + shared_policies
def _policy_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_firewall_policies(
**kwargs).get('firewall_policies')
if expand_rule:
if expand_rule and policies:
rules = _rule_list(request, expand_policy=False)
rule_dict = SortedDict((rule.id, rule) for rule in rules)
for p in policies:
@ -206,10 +231,25 @@ def firewall_list(request, **kwargs):
return _firewall_list(request, expand_policy=True, **kwargs)
def firewall_list_for_tenant(request, tenant_id, **kwargs):
"""Return a firewall list available for the tenant.
The list contains firewalls owned by the tenant and shared firewalls.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
# NOTE(amotoki): At now 'shared' attribute is not visible in Neutron
# and there is no way to query shared firewalls explicitly.
# Thus this method returns the same as when tenant_id is specified,
# but I would like to have this method for symmetry to firewall
# rules and policies to avoid unnecessary confusion.
return firewall_list(request, tenant_id=tenant_id, **kwargs)
def _firewall_list(request, expand_policy, **kwargs):
firewalls = neutronclient(request).list_firewalls(
**kwargs).get('firewalls')
if expand_policy:
if expand_policy and firewalls:
policies = _policy_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for fw in firewalls:

View File

@ -139,7 +139,7 @@ class UpdateFirewall(forms.SelfHandlingForm):
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(request, tenant_id=tenant_id)
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception:
exceptions.handle(request,
@ -187,9 +187,9 @@ class InsertRuleToPolicy(forms.SelfHandlingForm):
def __init__(self, request, *args, **kwargs):
super(InsertRuleToPolicy, self).__init__(request, *args, **kwargs)
tenant_id = self.request.user.tenant_id
try:
all_rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
tenant_id = self.request.user.tenant_id
all_rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
all_rules = sorted(all_rules, key=lambda rule: rule.name_or_id)
available_rules = [r for r in all_rules
@ -246,9 +246,9 @@ class RemoveRuleFromPolicy(forms.SelfHandlingForm):
def __init__(self, request, *args, **kwargs):
super(RemoveRuleFromPolicy, self).__init__(request, *args, **kwargs)
tenant_id = request.user.tenant_id
try:
all_rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
tenant_id = request.user.tenant_id
all_rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
current_rules = []
for r in kwargs['initial']['firewall_rules']:

View File

@ -35,8 +35,8 @@ class RulesTab(tabs.TableTab):
def get_rulestable_data(self):
try:
tenant_id = self.request.user.tenant_id
rules = api.fwaas.rule_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
except Exception:
rules = []
exceptions.handle(self.tab_group.request,
@ -54,8 +54,8 @@ class PoliciesTab(tabs.TableTab):
def get_policiestable_data(self):
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
except Exception:
policies = []
exceptions.handle(self.tab_group.request,
@ -73,8 +73,8 @@ class FirewallsTab(tabs.TableTab):
def get_firewallstable_data(self):
try:
tenant_id = self.request.user.tenant_id
firewalls = api.fwaas.firewall_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
firewalls = api.fwaas.firewall_list_for_tenant(request, tenant_id)
except Exception:
firewalls = []
exceptions.handle(self.tab_group.request,

View File

@ -53,36 +53,36 @@ class FirewallTests(test.TestCase):
# retrieve rules
tenant_id = self.tenant.id
api.fwaas.rule_list(
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndReturn(self.fw_rules.list())
tenant_id).AndReturn(self.fw_rules.list())
# retrieves policies
policies = self.fw_policies.list()
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
# retrieves firewalls
firewalls = self.firewalls.list()
api.fwaas.firewall_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(firewalls)
api.fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(firewalls)
def set_up_expect_with_exception(self):
tenant_id = self.tenant.id
api.fwaas.rule_list(
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.policy_list(
tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.firewall_list(
tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
tenant_id).AndRaise(self.exceptions.neutron)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_firewalls(self):
self.set_up_expect()
@ -98,9 +98,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['table'].data),
len(self.firewalls.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_policies(self):
self.set_up_expect()
@ -117,9 +117,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['policiestable_table'].data),
len(self.fw_policies.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_rules(self):
self.set_up_expect()
@ -136,9 +136,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['rulestable_table'].data),
len(self.fw_rules.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_firewalls(self):
self.set_up_expect_with_exception()
@ -155,9 +155,9 @@ class FirewallTests(test.TestCase):
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), 0)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_policies(self):
self.set_up_expect_with_exception()
@ -175,9 +175,9 @@ class FirewallTests(test.TestCase):
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), 0)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_rules(self):
self.set_up_expect_with_exception()
@ -242,7 +242,8 @@ class FirewallTests(test.TestCase):
self.assertFormErrors(res, 2)
@test.create_stubs({api.fwaas: ('policy_create', 'rule_list'), })
@test.create_stubs({api.fwaas: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
@ -268,8 +269,8 @@ class FirewallTests(test.TestCase):
for rule in rules:
if rule.id in policy.firewall_rules:
rule.firewall_policy_id = rule.policy = None
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.policy_create(
IsA(http.HttpRequest), **form_data).AndReturn(policy)
@ -280,7 +281,8 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_create', 'rule_list'), })
@test.create_stubs({api.fwaas: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post_with_error(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
@ -290,8 +292,8 @@ class FirewallTests(test.TestCase):
'shared': policy.shared,
'audited': policy.audited
}
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
self.mox.ReplayAll()
@ -299,7 +301,8 @@ class FirewallTests(test.TestCase):
self.assertFormErrors(res, 1)
@test.create_stubs({api.fwaas: ('firewall_create', 'policy_list'), })
@test.create_stubs({api.fwaas: ('firewall_create',
'policy_list_for_tenant'), })
def test_add_firewall_post(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
@ -310,8 +313,8 @@ class FirewallTests(test.TestCase):
'shared': firewall.shared,
'admin_state_up': firewall.admin_state_up
}
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_create(
IsA(http.HttpRequest), **form_data).AndReturn(firewall)
@ -322,7 +325,8 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_create', 'policy_list'), })
@test.create_stubs({api.fwaas: ('firewall_create',
'policy_list_for_tenant'), })
def test_add_firewall_post_with_error(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
@ -333,8 +337,8 @@ class FirewallTests(test.TestCase):
'shared': firewall.shared,
'admin_state_up': firewall.admin_state_up
}
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
self.mox.ReplayAll()
@ -469,7 +473,7 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'project/firewalls/updatepolicy.html')
@test.create_stubs({api.fwaas: ('policy_get', 'policy_update',
'rule_list')})
'rule_list_for_tenant')})
def test_update_policy_post(self):
policy = self.fw_policies.first()
@ -493,14 +497,14 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list')})
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list_for_tenant')})
def test_update_firewall_get(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
tenant_id = self.tenant.id
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_get(IsA(http.HttpRequest),
firewall.id).AndReturn(firewall)
@ -512,7 +516,7 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'project/firewalls/updatefirewall.html')
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list',
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list_for_tenant',
'firewall_update')})
def test_update_firewall_post(self):
firewall = self.firewalls.first()
@ -527,8 +531,8 @@ class FirewallTests(test.TestCase):
}
policies = self.fw_policies.list()
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_update(IsA(http.HttpRequest), firewall.id, **data)\
.AndReturn(firewall)
@ -542,7 +546,7 @@ class FirewallTests(test.TestCase):
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get', 'policy_insert_rule',
'rule_list', 'rule_get')})
'rule_list_for_tenant', 'rule_get')})
def test_policy_insert_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
@ -561,8 +565,8 @@ class FirewallTests(test.TestCase):
new_rule_id,
rules[1].id]
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), new_rule_id).AndReturn(rules[2])
api.fwaas.policy_insert_rule(IsA(http.HttpRequest), policy.id, **data)\
@ -577,7 +581,7 @@ class FirewallTests(test.TestCase):
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get', 'policy_remove_rule',
'rule_list', 'rule_get')})
'rule_list_for_tenant', 'rule_get')})
def test_policy_remove_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
@ -599,8 +603,8 @@ class FirewallTests(test.TestCase):
api.fwaas.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), remove_rule_id).AndReturn(rules[0])
api.fwaas.policy_remove_rule(IsA(http.HttpRequest), policy.id, **data)\
@ -614,9 +618,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'rule_delete')})
def test_delete_rule(self):
self.set_up_expect()
@ -629,9 +633,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'policy_delete')})
def test_delete_policy(self):
self.set_up_expect()
@ -644,9 +648,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_delete')})
def test_delete_firewall(self):
self.set_up_expect()

View File

@ -141,7 +141,7 @@ class SelectRulesAction(workflows.Action):
def populate_rule_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
rules = sorted(rules,
key=lambda rule: rule.name_or_id)
rule_list = [(rule.id, rule.name_or_id) for rule in rules
@ -246,7 +246,7 @@ class AddFirewallAction(workflows.Action):
firewall_policy_id_choices = [('', _("Select a Policy"))]
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(request, tenant_id=tenant_id)
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception as e:
exceptions.handle(

View File

@ -44,6 +44,16 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(rule1.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_rule_return_value(self, ret_val, exp_rule):
self.assertIsInstance(ret_val, api.fwaas.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
if exp_rule.policy:
self.assertEqual(exp_rule.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_rule.policy.name, ret_val.policy.name)
else:
self.assertIsNone(ret_val.policy)
@test.create_stubs({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list(self):
@ -57,14 +67,27 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.rule_list(self.request)
for (v, d) in zip(ret_val, exp_rules):
self.assertIsInstance(v, api.fwaas.Rule)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
if d.policy:
self.assertEqual(d.firewall_policy_id, v.policy.id, )
self.assertEqual(d.policy.name, v.policy.name)
else:
self.assertIsNone(v.policy)
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_rules = self.fw_rules.list()
api_rules = {'firewall_rules': self.api_fw_rules.list()}
api_policies = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewall_rules(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_rules': []})
neutronclient.list_firewall_rules(shared=True) \
.AndReturn(api_rules)
neutronclient.list_firewall_policies().AndReturn(api_policies)
self.mox.ReplayAll()
ret_val = api.fwaas.rule_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall_rule',
'show_firewall_policy')})
@ -79,11 +102,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.rule_get(self.request, exp_rule.id)
self.assertIsInstance(ret_val, api.fwaas.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_rule.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_rule.policy.name, ret_val.policy.name)
self._assert_rule_return_value(ret_val, exp_rule)
@test.create_stubs({neutronclient: ('update_firewall_rule',)})
def test_rule_update(self):
@ -146,6 +165,16 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_policy_return_value(self, ret_val, exp_policy):
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.firewall_rules), len(ret_val.rules))
self.assertEqual(len(exp_policy.firewall_rules),
len(ret_val.firewall_rules))
for (r, exp_r) in zip(ret_val.rules, exp_policy.rules):
self.assertEqual(exp_r.id, r.id)
@test.create_stubs({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list(self):
@ -159,13 +188,27 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.policy_list(self.request)
for (v, d) in zip(ret_val, exp_policies):
self.assertIsInstance(v, api.fwaas.Policy)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
self.assertEqual(len(d.firewall_rules), len(v.rules))
self.assertEqual(len(d.firewall_rules), len(v.firewall_rules))
for (r, exp_r) in zip(v.rules, d.rules):
self.assertEqual(exp_r.id, r.id)
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_policies = self.fw_policies.list()
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
neutronclient.list_firewall_policies(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_policies': []})
neutronclient.list_firewall_policies(
shared=True).AndReturn(policies_dict)
neutronclient.list_firewall_rules().AndReturn(rules_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.policy_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall_policy',
'list_firewall_rules')})
@ -183,12 +226,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.rules), len(ret_val.rules))
for (exp, ret) in zip(exp_policy.rules, ret_val.rules):
self.assertEqual(exp.id, ret.id)
self._assert_policy_return_value(ret_val, exp_policy)
@test.create_stubs({neutronclient: ('show_firewall_policy',)})
def test_policy_get_no_rule(self):
@ -301,6 +339,13 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_firewall_return_value(self, ret_val, exp_firewall):
self.assertIsInstance(ret_val, api.fwaas.Firewall)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_firewall.policy.name, ret_val.policy.name)
@test.create_stubs({neutronclient: ('list_firewalls',
'list_firewall_policies')})
def test_firewall_list(self):
@ -314,11 +359,24 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.firewall_list(self.request)
for (v, d) in zip(ret_val, exp_firewalls):
self.assertIsInstance(v, api.fwaas.Firewall)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
self.assertEqual(d.firewall_policy_id, v.policy.id)
self.assertEqual(d.policy.name, v.policy.name)
self._assert_firewall_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewalls',
'list_firewall_policies')})
def test_firewall_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_firewalls = self.firewalls.list()
firewalls_dict = {'firewalls': self.api_firewalls.list()}
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewalls(tenant_id=tenant_id) \
.AndReturn(firewalls_dict)
neutronclient.list_firewall_policies().AndReturn(policies_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.firewall_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall',
'show_firewall_policy')})
@ -333,11 +391,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.firewall_get(self.request, exp_firewall.id)
self.assertIsInstance(ret_val, api.fwaas.Firewall)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_firewall.policy.name, ret_val.policy.name)
self._assert_firewall_return_value(ret_val, exp_firewall)
@test.create_stubs({neutronclient: ('update_firewall',)})
def test_firewall_update(self):