Drop mox usage from FWaaS v1 panel tests

Note that test_add_firewall_post_with_router_extension() is also fixed
because we need to post unassociated router IDs to the form of creating
a new firewall but the previous test sends associated router ID.
This fix is required to pass the test after mock is used.
I am not sure why the test succeeded with mox though....

Also cleans up unnecessary mocked methods.

This is part of mox-removal community goal in Rocky.
Partial-Bug: #1753504

Change-Id: I033421aeb00d97c049bec3d04da6989c811d1514
This commit is contained in:
Akihiro Motoki 2018-03-11 09:28:33 +09:00 committed by Yushiro FURUKAWA
parent f85040ff65
commit b17392b5fa

View File

@ -12,24 +12,20 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from mox3.mox import IsA import mock
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django import http
from openstack_dashboard import api from openstack_dashboard import api
from openstack_dashboard.test import helpers
from neutron_fwaas_dashboard.api import fwaas as api_fwaas from neutron_fwaas_dashboard.api import fwaas as api_fwaas
from neutron_fwaas_dashboard.test import helpers as test from neutron_fwaas_dashboard.test import helpers as test
class FirewallTests(test.TestCase): class FirewallTests(test.TestCase):
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value): use_mox = False
self[attr] = value
DASHBOARD = 'project' DASHBOARD = 'project'
INDEX_URL = reverse('horizon:%s:firewalls:index' % DASHBOARD) INDEX_URL = reverse('horizon:%s:firewalls:index' % DASHBOARD)
@ -52,58 +48,59 @@ class FirewallTests(test.TestCase):
ADDROUTER_PATH = 'horizon:%s:firewalls:addrouter' % DASHBOARD ADDROUTER_PATH = 'horizon:%s:firewalls:addrouter' % DASHBOARD
REMOVEROUTER_PATH = 'horizon:%s:firewalls:removerouter' % DASHBOARD REMOVEROUTER_PATH = 'horizon:%s:firewalls:removerouter' % DASHBOARD
def set_up_expect(self, fwaas_router_extension=True): def setup_mocks(self, fwaas_router_extension=True):
# retrieve rules
tenant_id = self.tenant.id
api.neutron.is_extension_supported(
IsA(http.HttpRequest), 'fwaasrouterinsertion'
).MultipleTimes().AndReturn(fwaas_router_extension)
api_fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndReturn(self.fw_rules.list())
# retrieves policies
policies = self.fw_policies.list() policies = self.fw_policies.list()
api_fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
# retrieves firewalls
firewalls = self.firewalls.list() firewalls = self.firewalls.list()
api_fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(firewalls)
routers = self.routers.list() routers = self.routers.list()
api_fwaas.firewall_unassociated_routers_list(
IsA(http.HttpRequest), tenant_id).\
MultipleTimes().AndReturn(routers)
def set_up_expect_with_exception(self): self.mock_is_extension_supported.return_value = fwaas_router_extension
self.mock_rule_list_for_tenant.return_value = self.fw_rules.list()
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_list_for_tenant.return_value = firewalls
self.mock_firewall_unassociated_routers_list.return_value = routers
def check_mocks(self, fwaas_router_extension=True):
tenant_id = self.tenant.id tenant_id = self.tenant.id
api.neutron.is_extension_supported( self.assert_mock_multiple_calls_with_same_arguments(
IsA(http.HttpRequest), 'fwaasrouterinsertion').AndReturn(True) self.mock_is_extension_supported, 5,
mock.call(helpers.IsHttpRequest(), 'fwaasrouterinsertion'))
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_firewall_unassociated_routers_list, 2,
mock.call(helpers.IsHttpRequest(), tenant_id))
api_fwaas.rule_list_for_tenant( def setup_mocks_with_exception(self):
IsA(http.HttpRequest), self.mock_is_extension_supported.return_value = True
tenant_id).AndRaise(self.exceptions.neutron) self.mock_rule_list_for_tenant.side_effect = self.exceptions.neutron
api_fwaas.policy_list_for_tenant( self.mock_policy_list_for_tenant.side_effect = self.exceptions.neutron
IsA(http.HttpRequest), self.mock_firewall_list_for_tenant.side_effect = \
tenant_id).AndRaise(self.exceptions.neutron) self.exceptions.neutron
api_fwaas.firewall_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndRaise(self.exceptions.neutron)
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', def check_mocks_with_exception(self):
'policy_list_for_tenant', tenant_id = self.tenant.id
'rule_list_for_tenant',
'firewall_unassociated_routers_list',), self.mock_is_extension_supported.assert_called_once_with(
api.neutron: ('is_extension_supported',), }) helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), })
def test_index_firewalls(self): def test_index_firewalls(self):
self.set_up_expect() self.setup_mocks()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -114,19 +111,18 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html') self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), self.assertEqual(len(res.context['table'].data),
len(self.firewalls.list())) len(self.firewalls.list()))
self.check_mocks()
# TODO(absubram): Change test_index_firewalls for with and without # TODO(absubram): Change test_index_firewalls for with and without
# router extensions. # router extensions.
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant', 'policy_list_for_tenant',
'rule_list_for_tenant', 'rule_list_for_tenant',
'firewall_unassociated_routers_list',), 'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), }) api.neutron: ('is_extension_supported',), })
def test_index_policies(self): def test_index_policies(self):
self.set_up_expect() self.setup_mocks()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -138,16 +134,15 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html') self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), self.assertEqual(len(res.context['policiestable_table'].data),
len(self.fw_policies.list())) len(self.fw_policies.list()))
self.check_mocks()
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant', 'policy_list_for_tenant',
'rule_list_for_tenant', 'rule_list_for_tenant',
'firewall_unassociated_routers_list',), 'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), }) api.neutron: ('is_extension_supported',), })
def test_index_rules(self): def test_index_rules(self):
self.set_up_expect() self.setup_mocks()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -159,15 +154,14 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html') self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data), self.assertEqual(len(res.context['rulestable_table'].data),
len(self.fw_rules.list())) len(self.fw_rules.list()))
self.check_mocks()
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant', 'policy_list_for_tenant',
'rule_list_for_tenant'), 'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), }) api.neutron: ('is_extension_supported',), })
def test_index_exception_firewalls(self): def test_index_exception_firewalls(self):
self.set_up_expect_with_exception() self.setup_mocks_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -179,15 +173,14 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, self.assertTemplateUsed(res,
'horizon/common/_detail_table.html') 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), 0) self.assertEqual(len(res.context['table'].data), 0)
self.check_mocks_with_exception()
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant', 'policy_list_for_tenant',
'rule_list_for_tenant'), 'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), }) api.neutron: ('is_extension_supported',), })
def test_index_exception_policies(self): def test_index_exception_policies(self):
self.set_up_expect_with_exception() self.setup_mocks_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -200,15 +193,14 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, self.assertTemplateUsed(res,
'horizon/common/_detail_table.html') 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), 0) self.assertEqual(len(res.context['policiestable_table'].data), 0)
self.check_mocks_with_exception()
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant', 'policy_list_for_tenant',
'rule_list_for_tenant'), 'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), }) api.neutron: ('is_extension_supported',), })
def test_index_exception_rules(self): def test_index_exception_rules(self):
self.set_up_expect_with_exception() self.setup_mocks_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -221,8 +213,9 @@ class FirewallTests(test.TestCase):
self.assertTemplateUsed(res, self.assertTemplateUsed(res,
'horizon/common/_detail_table.html') 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data), 0) self.assertEqual(len(res.context['rulestable_table'].data), 0)
self.check_mocks_with_exception()
@test.create_stubs({api_fwaas: ('rule_create',), }) @helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post(self): def test_add_rule_post(self):
rule1 = self.fw_rules.first() rule1 = self.fw_rules.first()
@ -239,17 +232,16 @@ class FirewallTests(test.TestCase):
'ip_version': rule1.ip_version 'ip_version': rule1.ip_version
} }
api_fwaas.rule_create( self.mock_rule_create.return_value = rule1
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data) res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **form_data)
@test.create_stubs({api_fwaas: ('rule_create',), }) @helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post_src_None(self): def test_add_rule_post_src_None(self):
rule1 = self.fw_rules.first() rule1 = self.fw_rules.first()
form_data = {'name': rule1.name, form_data = {'name': rule1.name,
@ -263,17 +255,19 @@ class FirewallTests(test.TestCase):
'ip_version': rule1.ip_version 'ip_version': rule1.ip_version
} }
api_fwaas.rule_create( self.mock_rule_create.return_value = rule1
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data) res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
data = form_data.copy()
data['source_ip_address'] = None
data['source_port'] = None
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
@test.create_stubs({api_fwaas: ('rule_create',), }) @helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post_dest_None(self): def test_add_rule_post_dest_None(self):
rule1 = self.fw_rules.first() rule1 = self.fw_rules.first()
form_data = {'name': rule1.name, form_data = {'name': rule1.name,
@ -287,15 +281,17 @@ class FirewallTests(test.TestCase):
'ip_version': rule1.ip_version 'ip_version': rule1.ip_version
} }
api_fwaas.rule_create( self.mock_rule_create.return_value = rule1
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data) res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
data = form_data.copy()
data['destination_ip_address'] = None
data['destination_port'] = None
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
def test_add_rule_post_with_error(self): def test_add_rule_post_with_error(self):
rule1 = self.fw_rules.first() rule1 = self.fw_rules.first()
@ -313,14 +309,12 @@ class FirewallTests(test.TestCase):
'ip_version': 6 'ip_version': 6
} }
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data) res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertFormErrors(res, 3) self.assertFormErrors(res, 3)
@test.create_stubs({api_fwaas: ('policy_create', @helpers.create_mocks({api_fwaas: ('policy_create',
'rule_list_for_tenant'), }) 'rule_list_for_tenant'), })
def test_add_policy_post(self): def test_add_policy_post(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
rules = self.fw_rules.list() rules = self.fw_rules.list()
@ -346,20 +340,19 @@ class FirewallTests(test.TestCase):
for rule in rules: for rule in rules:
if rule.id in policy.firewall_rules: if rule.id in policy.firewall_rules:
rule.firewall_policy_id = rule.policy = None rule.firewall_policy_id = rule.policy = None
api_fwaas.rule_list_for_tenant( self.mock_rule_list_for_tenant.return_value = rules
IsA(http.HttpRequest), tenant_id).AndReturn(rules) self.mock_policy_create.return_value = policy
api_fwaas.policy_create(
IsA(http.HttpRequest), **form_data).AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDPOLICY_PATH), post_data) res = self.client.post(reverse(self.ADDPOLICY_PATH), post_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_create.assert_called_once_with(
helpers.IsHttpRequest(), **form_data)
@test.create_stubs({api_fwaas: ('policy_create', @helpers.create_mocks({api_fwaas: ('rule_list_for_tenant',), })
'rule_list_for_tenant'), })
def test_add_policy_post_with_error(self): def test_add_policy_post_with_error(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
rules = self.fw_rules.list() rules = self.fw_rules.list()
@ -369,15 +362,19 @@ class FirewallTests(test.TestCase):
'shared': policy.shared, 'shared': policy.shared,
'audited': policy.audited 'audited': policy.audited
} }
api_fwaas.rule_list_for_tenant( self.mock_rule_list_for_tenant.return_value = rules
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDPOLICY_PATH), form_data) res = self.client.post(reverse(self.ADDPOLICY_PATH), form_data)
self.assertFormErrors(res, 1) self.assertFormErrors(res, 1)
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@helpers.create_mocks({api_fwaas: ('firewall_create',
'policy_list_for_tenant',
'firewall_list_for_tenant',),
api.neutron: ('is_extension_supported',
'router_list'), })
def _test_add_firewall_post(self, router_extension=False): def _test_add_firewall_post(self, router_extension=False):
firewall = self.firewalls.first() firewall = self.firewalls.first()
policies = self.fw_policies.list() policies = self.fw_policies.list()
@ -391,46 +388,51 @@ class FirewallTests(test.TestCase):
'firewall_policy_id': firewall.firewall_policy_id, 'firewall_policy_id': firewall.firewall_policy_id,
'admin_state_up': firewall.admin_state_up 'admin_state_up': firewall.admin_state_up
} }
data = form_data.copy()
if router_extension: if router_extension:
form_data['router_ids'] = firewall.router_ids # Lookup for unassociated router(s)
api.neutron.router_list( associated = []
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(routers) for fw in firewalls:
api_fwaas.firewall_list_for_tenant( associated += fw.router_ids
IsA(http.HttpRequest), unassociated = [r.id for r in routers if r.id not in associated]
tenant_id=tenant_id).AndReturn(firewalls) form_data['router'] = unassociated
data['router_ids'] = unassociated
self.mock_router_list.return_value = routers
self.mock_firewall_list_for_tenant.return_value = firewalls
api.neutron.is_extension_supported( self.mock_is_extension_supported.return_value = router_extension
IsA(http.HttpRequest), self.mock_policy_list_for_tenant.return_value = policies
'fwaasrouterinsertion').AndReturn(router_extension) self.mock_firewall_create.return_value = firewall
api_fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas.firewall_create(
IsA(http.HttpRequest), **form_data).AndReturn(firewall)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data) res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) # self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
if router_extension:
self.mock_router_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
else:
self.mock_router_list.assert_not_called()
self.mock_firewall_list_for_tenant.assert_not_called()
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
@test.create_stubs({api_fwaas: ('firewall_create',
'policy_list_for_tenant',),
api.neutron: ('is_extension_supported',), })
def test_add_firewall_post(self): def test_add_firewall_post(self):
self._test_add_firewall_post() self._test_add_firewall_post()
@test.create_stubs({api_fwaas: ('firewall_create',
'policy_list_for_tenant',
'firewall_list_for_tenant',),
api.neutron: ('is_extension_supported',
'router_list'), })
def test_add_firewall_post_with_router_extension(self): def test_add_firewall_post_with_router_extension(self):
self._test_add_firewall_post(router_extension=True) self._test_add_firewall_post(router_extension=True)
@test.create_stubs({api_fwaas: ('firewall_create', @helpers.create_mocks({api_fwaas: ('policy_list_for_tenant',),
'policy_list_for_tenant',), api.neutron: ('is_extension_supported',), })
api.neutron: ('is_extension_supported',), })
def test_add_firewall_post_with_error(self): def test_add_firewall_post_with_error(self):
firewall = self.firewalls.first() firewall = self.firewalls.first()
policies = self.fw_policies.list() policies = self.fw_policies.list()
@ -440,35 +442,34 @@ class FirewallTests(test.TestCase):
'firewall_policy_id': None, 'firewall_policy_id': None,
'admin_state_up': firewall.admin_state_up 'admin_state_up': firewall.admin_state_up
} }
api.neutron.is_extension_supported( self.mock_is_extension_supported.return_value = False
IsA(http.HttpRequest), self.mock_policy_list_for_tenant.return_value = policies
'fwaasrouterinsertion').AndReturn(False)
api_fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data) res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data)
self.assertFormErrors(res, 1) self.assertFormErrors(res, 1)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@test.create_stubs({api_fwaas: ('rule_get',)}) @helpers.create_mocks({api_fwaas: ('rule_get',)})
def test_update_rule_get(self): def test_update_rule_get(self):
rule = self.fw_rules.first() rule = self.fw_rules.first()
api_fwaas.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule) self.mock_rule_get.return_value = rule
self.mox.ReplayAll()
res = self.client.get(reverse(self.UPDATERULE_PATH, args=(rule.id,))) res = self.client.get(reverse(self.UPDATERULE_PATH, args=(rule.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updaterule.html') self.assertTemplateUsed(res, 'project/firewalls/updaterule.html')
self.mock_rule_get.assert_called_once_with(helpers.IsHttpRequest(),
rule.id)
@test.create_stubs({api_fwaas: ('rule_get', 'rule_update')}) @helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_rule_post(self): def test_update_rule_post(self):
rule = self.fw_rules.first() rule = self.fw_rules.first()
api_fwaas.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule) self.mock_rule_get.return_value = rule
data = {'name': 'new name', data = {'name': 'new name',
'description': 'new desc', 'description': 'new desc',
@ -483,10 +484,7 @@ class FirewallTests(test.TestCase):
'destination_port': rule.destination_port, 'destination_port': rule.destination_port,
} }
api_fwaas.rule_update(IsA(http.HttpRequest), rule.id, **data)\ self.mock_rule_update.return_value = rule
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy() form_data = data.copy()
form_data['destination_ip_address'] = '' form_data['destination_ip_address'] = ''
@ -498,12 +496,18 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('rule_get', 'rule_update')}) self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_protocol_any_rule_post(self): def test_update_protocol_any_rule_post(self):
# protocol any means protocol == None in neutron context. # protocol any means protocol == None in neutron context.
rule = self.fw_rules.get(protocol=None) rule = self.fw_rules.get(protocol=None)
api_fwaas.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule) self.mock_rule_get.return_value = rule
self.mock_rule_update.return_value = rule
data = {'name': 'new name', data = {'name': 'new name',
'description': 'new desc', 'description': 'new desc',
@ -518,11 +522,6 @@ class FirewallTests(test.TestCase):
'destination_port': rule.destination_port, 'destination_port': rule.destination_port,
} }
api_fwaas.rule_update(IsA(http.HttpRequest), rule.id, **data)\
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy() form_data = data.copy()
form_data['destination_ip_address'] = '' form_data['destination_ip_address'] = ''
form_data['source_port'] = '' form_data['source_port'] = ''
@ -533,11 +532,17 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('rule_get', 'rule_update')}) self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_rule_protocol_to_any_post(self): def test_update_rule_protocol_to_any_post(self):
rule = self.fw_rules.first() rule = self.fw_rules.first()
api_fwaas.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule) self.mock_rule_get.return_value = rule
self.mock_rule_update.return_value = rule
data = {'name': 'new name', data = {'name': 'new name',
'description': 'new desc', 'description': 'new desc',
@ -551,10 +556,6 @@ class FirewallTests(test.TestCase):
'source_port': None, 'source_port': None,
'destination_port': rule.destination_port, 'destination_port': rule.destination_port,
} }
api_fwaas.rule_update(IsA(http.HttpRequest), rule.id, **data)\
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy() form_data = data.copy()
form_data['destination_ip_address'] = '' form_data['destination_ip_address'] = ''
@ -567,27 +568,31 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('policy_get',)}) self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',)})
def test_update_policy_get(self): def test_update_policy_get(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
api_fwaas.policy_get(IsA(http.HttpRequest), self.mock_policy_get.return_value = policy
policy.id).AndReturn(policy)
self.mox.ReplayAll()
res = self.client.get( res = self.client.get(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,))) reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updatepolicy.html') self.assertTemplateUsed(res, 'project/firewalls/updatepolicy.html')
@test.create_stubs({api_fwaas: ('policy_get', 'policy_update', self.mock_policy_get.assert_called_once_with(
'rule_list_for_tenant')}) helpers.IsHttpRequest(), policy.id)
@helpers.create_mocks({api_fwaas: ('policy_get', 'policy_update')})
def test_update_policy_post(self): def test_update_policy_post(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
api_fwaas.policy_get(IsA(http.HttpRequest), self.mock_policy_get.return_value = policy
policy.id).AndReturn(policy) self.mock_policy_update.return_value = policy
data = {'name': 'new name', data = {'name': 'new name',
'description': 'new desc', 'description': 'new desc',
@ -595,43 +600,47 @@ class FirewallTests(test.TestCase):
'audited': False 'audited': False
} }
api_fwaas.policy_update(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post( res = self.client.post(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)), data) reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('firewall_get', 'policy_list_for_tenant')}) self.mock_policy_get.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
self.mock_policy_update.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'policy_list_for_tenant')})
def test_update_firewall_get(self): def test_update_firewall_get(self):
firewall = self.firewalls.first() firewall = self.firewalls.first()
policies = self.fw_policies.list() policies = self.fw_policies.list()
tenant_id = self.tenant.id tenant_id = self.tenant.id
api_fwaas.policy_list_for_tenant( self.mock_policy_list_for_tenant.return_value = policies
IsA(http.HttpRequest), tenant_id).AndReturn(policies) self.mock_firewall_get.return_value = firewall
api_fwaas.firewall_get(IsA(http.HttpRequest),
firewall.id).AndReturn(firewall)
self.mox.ReplayAll()
res = self.client.get( res = self.client.get(
reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,))) reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updatefirewall.html') self.assertTemplateUsed(res, 'project/firewalls/updatefirewall.html')
@test.create_stubs({api_fwaas: ('firewall_get', 'policy_list_for_tenant', self.mock_policy_list_for_tenant.assert_called_once_with(
'firewall_update')}) helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_get.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'policy_list_for_tenant',
'firewall_update')})
def test_update_firewall_post(self): def test_update_firewall_post(self):
firewall = self.firewalls.first() firewall = self.firewalls.first()
tenant_id = self.tenant.id tenant_id = self.tenant.id
api_fwaas.firewall_get(IsA(http.HttpRequest), policies = self.fw_policies.list()
firewall.id).AndReturn(firewall) self.mock_firewall_get.return_value = firewall
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_update.return_value = firewall
data = {'name': 'new name', data = {'name': 'new name',
'description': 'new desc', 'description': 'new desc',
@ -639,23 +648,23 @@ class FirewallTests(test.TestCase):
'admin_state_up': False 'admin_state_up': False
} }
policies = self.fw_policies.list()
api_fwaas.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas.firewall_update(IsA(http.HttpRequest), firewall.id, **data)\
.AndReturn(firewall)
self.mox.ReplayAll()
res = self.client.post( res = self.client.post(
reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,)), data) reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,)), data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('policy_get', 'policy_insert_rule', self.mock_firewall_get.assert_called_once_with(
'rule_list_for_tenant', 'rule_get')}) helpers.IsHttpRequest(), firewall.id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',
'policy_insert_rule',
'rule_list_for_tenant',
'rule_get')})
def test_policy_insert_rule(self): def test_policy_insert_rule(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -667,21 +676,14 @@ class FirewallTests(test.TestCase):
'insert_before': rules[1].id, 'insert_before': rules[1].id,
'insert_after': rules[0].id} 'insert_after': rules[0].id}
api_fwaas.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
policy.firewall_rules = [rules[0].id, policy.firewall_rules = [rules[0].id,
new_rule_id, new_rule_id,
rules[1].id] rules[1].id]
api_fwaas.rule_list_for_tenant( self.mock_policy_get.return_value = policy
IsA(http.HttpRequest), tenant_id).AndReturn(rules) self.mock_rule_list_for_tenant.return_value = rules
api_fwaas.rule_get( self.mock_rule_get.return_value = rules[2]
IsA(http.HttpRequest), new_rule_id).AndReturn(rules[2]) self.mock_policy_insert_rule.return_value = policy
api_fwaas.policy_insert_rule(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post( res = self.client.post(
reverse(self.INSERTRULE_PATH, args=(policy.id,)), data) reverse(self.INSERTRULE_PATH, args=(policy.id,)), data)
@ -689,8 +691,19 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('policy_get', 'policy_remove_rule', self.mock_policy_get.assert_called_once_with(
'rule_list_for_tenant', 'rule_get')}) helpers.IsHttpRequest(), policy.id)
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), new_rule_id)
self.mock_policy_insert_rule.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',
'policy_remove_rule',
'rule_list_for_tenant',
'rule_get')})
def test_policy_remove_rule(self): def test_policy_remove_rule(self):
policy = self.fw_policies.first() policy = self.fw_policies.first()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -710,16 +723,10 @@ class FirewallTests(test.TestCase):
'shared': True} 'shared': True}
after_remove_policy = api_fwaas.Policy(after_remove_policy_dict) after_remove_policy = api_fwaas.Policy(after_remove_policy_dict)
api_fwaas.policy_get(IsA(http.HttpRequest), self.mock_policy_get.return_value = policy
policy.id).AndReturn(policy) self.mock_rule_list_for_tenant.return_value = rules
api_fwaas.rule_list_for_tenant( self.mock_rule_get.return_value = rules[0]
IsA(http.HttpRequest), tenant_id).AndReturn(rules) self.mock_policy_remove_rule.return_value = after_remove_policy
api_fwaas.rule_get(
IsA(http.HttpRequest), remove_rule_id).AndReturn(rules[0])
api_fwaas.policy_remove_rule(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(after_remove_policy)
self.mox.ReplayAll()
res = self.client.post( res = self.client.post(
reverse(self.REMOVERULE_PATH, args=(policy.id,)), data) reverse(self.REMOVERULE_PATH, args=(policy.id,)), data)
@ -727,10 +734,19 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('firewall_get', self.mock_policy_get.assert_called_once_with(
'firewall_list_for_tenant', helpers.IsHttpRequest(), policy.id)
'firewall_update', self.mock_rule_list_for_tenant.assert_called_once_with(
'firewall_unassociated_routers_list')}) helpers.IsHttpRequest(), tenant_id)
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), remove_rule_id)
self.mock_policy_remove_rule.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'firewall_list_for_tenant',
'firewall_update',
'firewall_unassociated_routers_list')})
def test_firewall_add_router(self): def test_firewall_add_router(self):
tenant_id = self.tenant.id tenant_id = self.tenant.id
firewall = self.firewalls.first() firewall = self.firewalls.first()
@ -742,18 +758,11 @@ class FirewallTests(test.TestCase):
form_data = {'router_ids': add_router_ids} form_data = {'router_ids': add_router_ids}
post_data = {'router_ids': add_router_ids + existing_router_ids} post_data = {'router_ids': add_router_ids + existing_router_ids}
api_fwaas.firewall_get(
IsA(http.HttpRequest), firewall.id).AndReturn(firewall)
api_fwaas.firewall_unassociated_routers_list(
IsA(http.HttpRequest), tenant_id).AndReturn(routers)
firewall.router_ids = [add_router_ids, existing_router_ids] firewall.router_ids = [add_router_ids, existing_router_ids]
api_fwaas.firewall_update( self.mock_firewall_get.return_value = firewall
IsA(http.HttpRequest), self.mock_firewall_unassociated_routers_list.return_value = routers
firewall.id, **post_data).AndReturn(firewall) self.mock_firewall_update.return_value = firewall
self.mox.ReplayAll()
res = self.client.post( res = self.client.post(
reverse(self.ADDROUTER_PATH, args=(firewall.id,)), form_data) reverse(self.ADDROUTER_PATH, args=(firewall.id,)), form_data)
@ -761,9 +770,16 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('firewall_get', self.mock_firewall_get.assert_called_once_with(
'firewall_update'), helpers.IsHttpRequest(), firewall.id)
api.neutron: ('router_list',), }) self.mock_firewall_unassociated_routers_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **post_data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'firewall_update'),
api.neutron: ('router_list',), })
def test_firewall_remove_router(self): def test_firewall_remove_router(self):
firewall = self.firewalls.first() firewall = self.firewalls.first()
tenant_id = self.tenant.id tenant_id = self.tenant.id
@ -772,16 +788,11 @@ class FirewallTests(test.TestCase):
form_data = {'router_ids': existing_router_ids} form_data = {'router_ids': existing_router_ids}
api_fwaas.firewall_get(
IsA(http.HttpRequest), firewall.id).AndReturn(firewall)
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=tenant_id).AndReturn(routers)
firewall.router_ids = [] firewall.router_ids = []
api_fwaas.firewall_update(
IsA(http.HttpRequest),
firewall.id, **form_data).AndReturn(firewall)
self.mox.ReplayAll() self.mock_firewall_get.return_value = firewall
self.mock_router_list.return_value = routers
self.mock_firewall_update.return_value = firewall
res = self.client.post( res = self.client.post(
reverse(self.REMOVEROUTER_PATH, args=(firewall.id,)), form_data) reverse(self.REMOVEROUTER_PATH, args=(firewall.id,)), form_data)
@ -789,56 +800,70 @@ class FirewallTests(test.TestCase):
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL)) self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas: ('rule_list_for_tenant', self.mock_firewall_get.assert_called_once_with(
'rule_delete'), helpers.IsHttpRequest(), firewall.id)
api.neutron: ('is_extension_supported',)}) api.neutron.router_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **form_data)
@helpers.create_mocks({api_fwaas: ('rule_list_for_tenant',
'rule_delete'),
api.neutron: ('is_extension_supported',)})
def test_delete_rule(self): def test_delete_rule(self):
api.neutron.is_extension_supported( self.mock_is_extension_supported.return_value = True
IsA(http.HttpRequest), 'fwaasrouterinsertion').AndReturn(True) self.mock_rule_list_for_tenant.return_value = self.fw_rules.list()
self.mock_rule_delete.return_value = None
rule = self.fw_rules.list()[2] rule = self.fw_rules.list()[2]
api_fwaas.rule_list_for_tenant(
IsA(http.HttpRequest),
self.tenant.id).AndReturn(self.fw_rules.list())
api_fwaas.rule_delete(IsA(http.HttpRequest), rule.id)
self.mox.ReplayAll()
form_data = {"action": "rulestable__deleterule__%s" % rule.id} form_data = {"action": "rulestable__deleterule__%s" % rule.id}
res = self.client.post(self.INDEX_URL, form_data) res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
@test.create_stubs({api_fwaas: ('policy_list_for_tenant', self.mock_is_extension_supported.assert_called_once_with(
'policy_delete'), helpers.IsHttpRequest(), 'fwaasrouterinsertion')
api.neutron: ('is_extension_supported',)}) self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_rule_delete.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
@helpers.create_mocks({api_fwaas: ('policy_list_for_tenant',
'policy_delete'),
api.neutron: ('is_extension_supported',)})
def test_delete_policy(self): def test_delete_policy(self):
api.neutron.is_extension_supported( self.mock_is_extension_supported.return_value = True
IsA(http.HttpRequest), 'fwaasrouterinsertion').AndReturn(True) self.mock_policy_list_for_tenant.return_value = self.fw_policies.list()
self.mock_policy_delete.return_value = None
policy = self.fw_policies.first() policy = self.fw_policies.first()
api_fwaas.policy_list_for_tenant(
IsA(http.HttpRequest),
self.tenant.id).AndReturn(self.fw_policies.list())
api_fwaas.policy_delete(IsA(http.HttpRequest), policy.id)
self.mox.ReplayAll()
form_data = {"action": "policiestable__deletepolicy__%s" % policy.id} form_data = {"action": "policiestable__deletepolicy__%s" % policy.id}
res = self.client.post(self.INDEX_URL, form_data) res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
api.neutron.is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_policy_delete.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
@test.create_stubs({api_fwaas: ('firewall_list_for_tenant', @helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'firewall_delete'), 'firewall_delete'),
api.neutron: ('is_extension_supported', api.neutron: ('is_extension_supported',)})
'router_list',)})
def test_delete_firewall(self): def test_delete_firewall(self):
fwl = self.firewalls.first() fwl = self.firewalls.first()
api_fwaas.firewall_list_for_tenant( self.mock_firewall_list_for_tenant.return_value = [fwl]
IsA(http.HttpRequest), self.tenant.id).AndReturn([fwl]) self.mock_firewall_delete.return_value = None
api_fwaas.firewall_delete(IsA(http.HttpRequest), fwl.id) self.mock_is_extension_supported.return_value = False
self.mox.ReplayAll()
form_data = {"action": "firewallstable__deletefirewall__%s" % fwl.id} form_data = {"action": "firewallstable__deletefirewall__%s" % fwl.id}
res = self.client.post(self.INDEX_URL, form_data) res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res) self.assertNoFormErrors(res)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_firewall_delete.assert_called_once_with(
helpers.IsHttpRequest(), fwl.id)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')