Merge "List shared firewall policies/rules in Project panel"

This commit is contained in:
Jenkins 2015-02-18 08:45:31 +00:00 committed by Gerrit Code Review
commit 673bfed6c8
6 changed files with 214 additions and 116 deletions

View File

@ -74,10 +74,22 @@ def rule_list(request, **kwargs):
return _rule_list(request, expand_policy=True, **kwargs)
def rule_list_for_tenant(request, tenant_id, **kwargs):
"""Return a rule list available for the tenant.
The list contains rules owned by the tenant and shared rules.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
rules = rule_list(request, tenant_id=tenant_id, shared=False, **kwargs)
shared_rules = rule_list(request, shared=True, **kwargs)
return rules + shared_rules
def _rule_list(request, expand_policy, **kwargs):
rules = neutronclient(request).list_firewall_rules(
**kwargs).get('firewall_rules')
if expand_policy:
if expand_policy and rules:
policies = _policy_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for rule in rules:
@ -133,10 +145,23 @@ def policy_list(request, **kwargs):
return _policy_list(request, expand_rule=True, **kwargs)
def policy_list_for_tenant(request, tenant_id, **kwargs):
"""Return a policy list available for the tenant.
The list contains policies owned by the tenant and shared policies.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
policies = policy_list(request, tenant_id=tenant_id,
shared=False, **kwargs)
shared_policies = policy_list(request, shared=True, **kwargs)
return policies + shared_policies
def _policy_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_firewall_policies(
**kwargs).get('firewall_policies')
if expand_rule:
if expand_rule and policies:
rules = _rule_list(request, expand_policy=False)
rule_dict = SortedDict((rule.id, rule) for rule in rules)
for p in policies:
@ -206,10 +231,25 @@ def firewall_list(request, **kwargs):
return _firewall_list(request, expand_policy=True, **kwargs)
def firewall_list_for_tenant(request, tenant_id, **kwargs):
"""Return a firewall list available for the tenant.
The list contains firewalls owned by the tenant and shared firewalls.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
# NOTE(amotoki): At now 'shared' attribute is not visible in Neutron
# and there is no way to query shared firewalls explicitly.
# Thus this method returns the same as when tenant_id is specified,
# but I would like to have this method for symmetry to firewall
# rules and policies to avoid unnecessary confusion.
return firewall_list(request, tenant_id=tenant_id, **kwargs)
def _firewall_list(request, expand_policy, **kwargs):
firewalls = neutronclient(request).list_firewalls(
**kwargs).get('firewalls')
if expand_policy:
if expand_policy and firewalls:
policies = _policy_list(request, expand_rule=False)
policy_dict = SortedDict((p.id, p) for p in policies)
for fw in firewalls:

View File

@ -139,7 +139,7 @@ class UpdateFirewall(forms.SelfHandlingForm):
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(request, tenant_id=tenant_id)
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception:
exceptions.handle(request,
@ -187,9 +187,9 @@ class InsertRuleToPolicy(forms.SelfHandlingForm):
def __init__(self, request, *args, **kwargs):
super(InsertRuleToPolicy, self).__init__(request, *args, **kwargs)
tenant_id = self.request.user.tenant_id
try:
all_rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
tenant_id = self.request.user.tenant_id
all_rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
all_rules = sorted(all_rules, key=lambda rule: rule.name_or_id)
available_rules = [r for r in all_rules
@ -246,9 +246,9 @@ class RemoveRuleFromPolicy(forms.SelfHandlingForm):
def __init__(self, request, *args, **kwargs):
super(RemoveRuleFromPolicy, self).__init__(request, *args, **kwargs)
tenant_id = request.user.tenant_id
try:
all_rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
tenant_id = request.user.tenant_id
all_rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
current_rules = []
for r in kwargs['initial']['firewall_rules']:

View File

@ -35,8 +35,8 @@ class RulesTab(tabs.TableTab):
def get_rulestable_data(self):
try:
tenant_id = self.request.user.tenant_id
rules = api.fwaas.rule_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
except Exception:
rules = []
exceptions.handle(self.tab_group.request,
@ -54,8 +54,8 @@ class PoliciesTab(tabs.TableTab):
def get_policiestable_data(self):
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
except Exception:
policies = []
exceptions.handle(self.tab_group.request,
@ -73,8 +73,8 @@ class FirewallsTab(tabs.TableTab):
def get_firewallstable_data(self):
try:
tenant_id = self.request.user.tenant_id
firewalls = api.fwaas.firewall_list(self.tab_group.request,
tenant_id=tenant_id)
request = self.tab_group.request
firewalls = api.fwaas.firewall_list_for_tenant(request, tenant_id)
except Exception:
firewalls = []
exceptions.handle(self.tab_group.request,

View File

@ -53,36 +53,36 @@ class FirewallTests(test.TestCase):
# retrieve rules
tenant_id = self.tenant.id
api.fwaas.rule_list(
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndReturn(self.fw_rules.list())
tenant_id).AndReturn(self.fw_rules.list())
# retrieves policies
policies = self.fw_policies.list()
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
# retrieves firewalls
firewalls = self.firewalls.list()
api.fwaas.firewall_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(firewalls)
api.fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(firewalls)
def set_up_expect_with_exception(self):
tenant_id = self.tenant.id
api.fwaas.rule_list(
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.policy_list(
tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.firewall_list(
tenant_id).AndRaise(self.exceptions.neutron)
api.fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest),
tenant_id=tenant_id).AndRaise(self.exceptions.neutron)
tenant_id).AndRaise(self.exceptions.neutron)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_firewalls(self):
self.set_up_expect()
@ -98,9 +98,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['table'].data),
len(self.firewalls.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_policies(self):
self.set_up_expect()
@ -117,9 +117,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['policiestable_table'].data),
len(self.fw_policies.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_rules(self):
self.set_up_expect()
@ -136,9 +136,9 @@ class FirewallTests(test.TestCase):
self.assertEqual(len(res.context['rulestable_table'].data),
len(self.fw_rules.list()))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_firewalls(self):
self.set_up_expect_with_exception()
@ -155,9 +155,9 @@ class FirewallTests(test.TestCase):
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), 0)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_policies(self):
self.set_up_expect_with_exception()
@ -175,9 +175,9 @@ class FirewallTests(test.TestCase):
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), 0)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list')}, )
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant')}, )
def test_index_exception_rules(self):
self.set_up_expect_with_exception()
@ -242,7 +242,8 @@ class FirewallTests(test.TestCase):
self.assertFormErrors(res, 2)
@test.create_stubs({api.fwaas: ('policy_create', 'rule_list'), })
@test.create_stubs({api.fwaas: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
@ -268,8 +269,8 @@ class FirewallTests(test.TestCase):
for rule in rules:
if rule.id in policy.firewall_rules:
rule.firewall_policy_id = rule.policy = None
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.policy_create(
IsA(http.HttpRequest), **form_data).AndReturn(policy)
@ -280,7 +281,8 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_create', 'rule_list'), })
@test.create_stubs({api.fwaas: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post_with_error(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
@ -290,8 +292,8 @@ class FirewallTests(test.TestCase):
'shared': policy.shared,
'audited': policy.audited
}
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
self.mox.ReplayAll()
@ -299,7 +301,8 @@ class FirewallTests(test.TestCase):
self.assertFormErrors(res, 1)
@test.create_stubs({api.fwaas: ('firewall_create', 'policy_list'), })
@test.create_stubs({api.fwaas: ('firewall_create',
'policy_list_for_tenant'), })
def test_add_firewall_post(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
@ -310,8 +313,8 @@ class FirewallTests(test.TestCase):
'shared': firewall.shared,
'admin_state_up': firewall.admin_state_up
}
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_create(
IsA(http.HttpRequest), **form_data).AndReturn(firewall)
@ -322,7 +325,8 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_create', 'policy_list'), })
@test.create_stubs({api.fwaas: ('firewall_create',
'policy_list_for_tenant'), })
def test_add_firewall_post_with_error(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
@ -333,8 +337,8 @@ class FirewallTests(test.TestCase):
'shared': firewall.shared,
'admin_state_up': firewall.admin_state_up
}
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
self.mox.ReplayAll()
@ -469,7 +473,7 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'project/firewalls/updatepolicy.html')
@test.create_stubs({api.fwaas: ('policy_get', 'policy_update',
'rule_list')})
'rule_list_for_tenant')})
def test_update_policy_post(self):
policy = self.fw_policies.first()
@ -493,14 +497,14 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list')})
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list_for_tenant')})
def test_update_firewall_get(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
tenant_id = self.tenant.id
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_get(IsA(http.HttpRequest),
firewall.id).AndReturn(firewall)
@ -512,7 +516,7 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'project/firewalls/updatefirewall.html')
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list',
@test.create_stubs({api.fwaas: ('firewall_get', 'policy_list_for_tenant',
'firewall_update')})
def test_update_firewall_post(self):
firewall = self.firewalls.first()
@ -527,8 +531,8 @@ class FirewallTests(test.TestCase):
}
policies = self.fw_policies.list()
api.fwaas.policy_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(policies)
api.fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api.fwaas.firewall_update(IsA(http.HttpRequest), firewall.id, **data)\
.AndReturn(firewall)
@ -542,7 +546,7 @@ class FirewallTests(test.TestCase):
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get', 'policy_insert_rule',
'rule_list', 'rule_get')})
'rule_list_for_tenant', 'rule_get')})
def test_policy_insert_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
@ -561,8 +565,8 @@ class FirewallTests(test.TestCase):
new_rule_id,
rules[1].id]
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), new_rule_id).AndReturn(rules[2])
api.fwaas.policy_insert_rule(IsA(http.HttpRequest), policy.id, **data)\
@ -577,7 +581,7 @@ class FirewallTests(test.TestCase):
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('policy_get', 'policy_remove_rule',
'rule_list', 'rule_get')})
'rule_list_for_tenant', 'rule_get')})
def test_policy_remove_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
@ -599,8 +603,8 @@ class FirewallTests(test.TestCase):
api.fwaas.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
api.fwaas.rule_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(rules)
api.fwaas.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api.fwaas.rule_get(
IsA(http.HttpRequest), remove_rule_id).AndReturn(rules[0])
api.fwaas.policy_remove_rule(IsA(http.HttpRequest), policy.id, **data)\
@ -614,9 +618,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'rule_delete')})
def test_delete_rule(self):
self.set_up_expect()
@ -629,9 +633,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'policy_delete')})
def test_delete_policy(self):
self.set_up_expect()
@ -644,9 +648,9 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.fwaas: ('firewall_list',
'policy_list',
'rule_list',
@test.create_stubs({api.fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_delete')})
def test_delete_firewall(self):
self.set_up_expect()

View File

@ -141,7 +141,7 @@ class SelectRulesAction(workflows.Action):
def populate_rule_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
rules = api.fwaas.rule_list(request, tenant_id=tenant_id)
rules = api.fwaas.rule_list_for_tenant(request, tenant_id)
rules = sorted(rules,
key=lambda rule: rule.name_or_id)
rule_list = [(rule.id, rule.name_or_id) for rule in rules
@ -246,7 +246,7 @@ class AddFirewallAction(workflows.Action):
firewall_policy_id_choices = [('', _("Select a Policy"))]
try:
tenant_id = self.request.user.tenant_id
policies = api.fwaas.policy_list(request, tenant_id=tenant_id)
policies = api.fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception as e:
exceptions.handle(

View File

@ -44,6 +44,16 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(rule1.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_rule_return_value(self, ret_val, exp_rule):
self.assertIsInstance(ret_val, api.fwaas.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
if exp_rule.policy:
self.assertEqual(exp_rule.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_rule.policy.name, ret_val.policy.name)
else:
self.assertIsNone(ret_val.policy)
@test.create_stubs({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list(self):
@ -57,14 +67,27 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.rule_list(self.request)
for (v, d) in zip(ret_val, exp_rules):
self.assertIsInstance(v, api.fwaas.Rule)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
if d.policy:
self.assertEqual(d.firewall_policy_id, v.policy.id, )
self.assertEqual(d.policy.name, v.policy.name)
else:
self.assertIsNone(v.policy)
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_rules = self.fw_rules.list()
api_rules = {'firewall_rules': self.api_fw_rules.list()}
api_policies = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewall_rules(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_rules': []})
neutronclient.list_firewall_rules(shared=True) \
.AndReturn(api_rules)
neutronclient.list_firewall_policies().AndReturn(api_policies)
self.mox.ReplayAll()
ret_val = api.fwaas.rule_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall_rule',
'show_firewall_policy')})
@ -79,11 +102,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.rule_get(self.request, exp_rule.id)
self.assertIsInstance(ret_val, api.fwaas.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_rule.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_rule.policy.name, ret_val.policy.name)
self._assert_rule_return_value(ret_val, exp_rule)
@test.create_stubs({neutronclient: ('update_firewall_rule',)})
def test_rule_update(self):
@ -146,6 +165,16 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_policy_return_value(self, ret_val, exp_policy):
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.firewall_rules), len(ret_val.rules))
self.assertEqual(len(exp_policy.firewall_rules),
len(ret_val.firewall_rules))
for (r, exp_r) in zip(ret_val.rules, exp_policy.rules):
self.assertEqual(exp_r.id, r.id)
@test.create_stubs({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list(self):
@ -159,13 +188,27 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.policy_list(self.request)
for (v, d) in zip(ret_val, exp_policies):
self.assertIsInstance(v, api.fwaas.Policy)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
self.assertEqual(len(d.firewall_rules), len(v.rules))
self.assertEqual(len(d.firewall_rules), len(v.firewall_rules))
for (r, exp_r) in zip(v.rules, d.rules):
self.assertEqual(exp_r.id, r.id)
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_policies = self.fw_policies.list()
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
neutronclient.list_firewall_policies(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_policies': []})
neutronclient.list_firewall_policies(
shared=True).AndReturn(policies_dict)
neutronclient.list_firewall_rules().AndReturn(rules_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.policy_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall_policy',
'list_firewall_rules')})
@ -183,12 +226,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api.fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.rules), len(ret_val.rules))
for (exp, ret) in zip(exp_policy.rules, ret_val.rules):
self.assertEqual(exp.id, ret.id)
self._assert_policy_return_value(ret_val, exp_policy)
@test.create_stubs({neutronclient: ('show_firewall_policy',)})
def test_policy_get_no_rule(self):
@ -301,6 +339,13 @@ class FwaasApiTests(test.APITestCase):
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_firewall_return_value(self, ret_val, exp_firewall):
self.assertIsInstance(ret_val, api.fwaas.Firewall)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_firewall.policy.name, ret_val.policy.name)
@test.create_stubs({neutronclient: ('list_firewalls',
'list_firewall_policies')})
def test_firewall_list(self):
@ -314,11 +359,24 @@ class FwaasApiTests(test.APITestCase):
ret_val = api.fwaas.firewall_list(self.request)
for (v, d) in zip(ret_val, exp_firewalls):
self.assertIsInstance(v, api.fwaas.Firewall)
self.assertEqual(d.name, v.name)
self.assertTrue(v.id)
self.assertEqual(d.firewall_policy_id, v.policy.id)
self.assertEqual(d.policy.name, v.policy.name)
self._assert_firewall_return_value(v, d)
@test.create_stubs({neutronclient: ('list_firewalls',
'list_firewall_policies')})
def test_firewall_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_firewalls = self.firewalls.list()
firewalls_dict = {'firewalls': self.api_firewalls.list()}
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
neutronclient.list_firewalls(tenant_id=tenant_id) \
.AndReturn(firewalls_dict)
neutronclient.list_firewall_policies().AndReturn(policies_dict)
self.mox.ReplayAll()
ret_val = api.fwaas.firewall_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d)
@test.create_stubs({neutronclient: ('show_firewall',
'show_firewall_policy')})
@ -333,11 +391,7 @@ class FwaasApiTests(test.APITestCase):
self.mox.ReplayAll()
ret_val = api.fwaas.firewall_get(self.request, exp_firewall.id)
self.assertIsInstance(ret_val, api.fwaas.Firewall)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_firewall.policy.name, ret_val.policy.name)
self._assert_firewall_return_value(ret_val, exp_firewall)
@test.create_stubs({neutronclient: ('update_firewall',)})
def test_firewall_update(self):